/*-
 * #%L
 * HAPI FHIR JPA Server - Batch2 Task Processor
 * %%
 * Copyright (C) 2014 - 2024 Smile CDR, Inc.
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */
package ca.uhn.fhir.batch2.coordinator;

import ca.uhn.fhir.batch2.api.JobExecutionFailedException;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.JobDefinitionStep;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.context.ConfigurationException;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.model.api.IModelJson;
import ca.uhn.fhir.util.Logs;
import com.google.common.collect.ImmutableSortedMap;
import jakarta.annotation.Nonnull;
import org.apache.commons.lang3.Validate;
import org.slf4j.Logger;

import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

/**
 * Registry of job definitions, indexed first by job definition ID and then by
 * job definition version.
 * <p>
 * The methods that add or remove definitions are {@code synchronized} on this
 * instance; the lookup methods are not.
 * </p>
 */
public class JobDefinitionRegistry {
	private static final Logger ourLog = Logs.getBatchTroubleshootingLog();

	/** Job definition ID -> (version -> definition), versions sorted ascending. */
	private final Map<String, NavigableMap<Integer, JobDefinition<?>>> myJobDefinitions = new ConcurrentHashMap<>();

	/**
	 * Add a job definition only if it is not registered
	 *
	 * @param theDefinition the definition to register
	 * @param <PT> the job parameter type for the definition
	 * @return true if it did not already exist and was registered
	 */
	public synchronized <PT extends IModelJson> boolean addJobDefinitionIfNotRegistered(
			@Nonnull JobDefinition<PT> theDefinition) {
		Optional<JobDefinition<?>> orig =
				getJobDefinition(theDefinition.getJobDefinitionId(), theDefinition.getJobDefinitionVersion());
		if (orig.isPresent()) {
			return false;
		}
		addJobDefinition(theDefinition);
		return true;
	}

	/**
	 * Register a job definition under its ID and version.
	 * <p>
	 * The definition must be non-null, have a non-blank ID, a version of at least 1
	 * and more than one step. Registering the very same instance a second time only
	 * logs a warning.
	 * </p>
	 *
	 * @param theDefinition the definition to register
	 * @param <PT> the job parameter type for the definition
	 * @throws ConfigurationException if two steps of the definition share a step ID, or if a
	 *                                different definition instance is already registered
	 *                                under the same ID and version
	 */
	public synchronized <PT extends IModelJson> void addJobDefinition(@Nonnull JobDefinition<PT> theDefinition) {
		Validate.notNull(theDefinition);
		String jobDefinitionId = theDefinition.getJobDefinitionId();
		int jobDefinitionVersion = theDefinition.getJobDefinitionVersion();
		Validate.notBlank(jobDefinitionId);
		Validate.isTrue(jobDefinitionVersion >= 1);
		Validate.isTrue(theDefinition.getSteps().size() > 1);

		// Set.add returns false for an element that is already present
		Set<String> stepIds = new HashSet<>();
		for (JobDefinitionStep<?, ?, ?> next : theDefinition.getSteps()) {
			if (!stepIds.add(next.getStepId())) {
				throw new ConfigurationException(
						Msg.code(2046) + "Duplicate step[" + next.getStepId() + "] in definition[" + jobDefinitionId
								+ "] version: " + jobDefinitionVersion);
			}
		}

		NavigableMap<Integer, JobDefinition<?>> versionMap =
				myJobDefinitions.computeIfAbsent(jobDefinitionId, t -> new TreeMap<>());
		// Null definitions are rejected above, so a null here means "version not registered"
		JobDefinition<?> existing = versionMap.get(jobDefinitionVersion);
		if (existing != null) {
			// Identity comparison: only the exact same instance counts as a harmless re-registration
			if (existing == theDefinition) {
				ourLog.warn(
						"job[{}] version: {} already registered. Not registering again.",
						jobDefinitionId,
						jobDefinitionVersion);
				return;
			}
			throw new ConfigurationException(Msg.code(2047) + "Multiple definitions for job[" + jobDefinitionId
					+ "] version: " + jobDefinitionVersion);
		}
		versionMap.put(jobDefinitionVersion, theDefinition);
	}

	/**
	 * Remove the definition registered under the given ID and version, if any. When the
	 * last version of an ID is removed, the ID itself is dropped from the registry.
	 *
	 * @param theDefinitionId the job definition ID, must not be blank
	 * @param theVersion the job definition version, must be at least 1
	 */
	public synchronized void removeJobDefinition(@Nonnull String theDefinitionId, int theVersion) {
		Validate.notBlank(theDefinitionId);
		Validate.isTrue(theVersion >= 1);

		NavigableMap<Integer, JobDefinition<?>> versionMap = myJobDefinitions.get(theDefinitionId);
		if (versionMap != null) {
			versionMap.remove(theVersion);
			if (versionMap.isEmpty()) {
				myJobDefinitions.remove(theDefinitionId);
			}
		}
	}

	/**
	 * @param theJobDefinitionId the job definition ID to look up
	 * @return the definition with the highest registered version for the given ID,
	 *         or an empty Optional if the ID is not registered
	 */
	public Optional<JobDefinition<?>> getLatestJobDefinition(@Nonnull String theJobDefinitionId) {
		NavigableMap<Integer, JobDefinition<?>> versionMap = myJobDefinitions.get(theJobDefinitionId);
		if (versionMap == null || versionMap.isEmpty()) {
			return Optional.empty();
		}
		return Optional.of(versionMap.lastEntry().getValue());
	}

	/**
	 * @param theJobDefinitionId the job definition ID to look up
	 * @param theJobDefinitionVersion the job definition version to look up
	 * @return the definition registered under the given ID and version, or an empty
	 *         Optional if either the ID or that version of it is not registered
	 */
	public Optional<JobDefinition<?>> getJobDefinition(
			@Nonnull String theJobDefinitionId, int theJobDefinitionVersion) {
		NavigableMap<Integer, JobDefinition<?>> versionMap = myJobDefinitions.get(theJobDefinitionId);
		if (versionMap == null || versionMap.isEmpty()) {
			return Optional.empty();
		}
		// ofNullable, not of: the ID may be registered without this particular version, in
		// which case get() returns null and Optional.of would throw a NullPointerException
		return Optional.ofNullable(versionMap.get(theJobDefinitionVersion));
	}

	/**
	 * @param theJobDefinitionId the job definition ID to look up
	 * @param theJobDefinitionVersion the job definition version to look up
	 * @return the definition registered under the given ID and version
	 * @throws JobExecutionFailedException if the job definition can not be found
	 */
	public JobDefinition<?> getJobDefinitionOrThrowException(String theJobDefinitionId, int theJobDefinitionVersion) {
		Optional<JobDefinition<?>> opt = getJobDefinition(theJobDefinitionId, theJobDefinitionVersion);
		if (opt.isEmpty()) {
			String msg =
					"Unknown job definition ID[" + theJobDefinitionId + "] version[" + theJobDefinitionVersion + "]";
			ourLog.warn(msg);
			throw new JobExecutionFailedException(Msg.code(2043) + msg);
		}
		return opt.get();
	}

	/**
	 * Look up the definition matching the instance's job definition ID and version and
	 * set it on the instance.
	 *
	 * @param theInstance the instance to populate
	 * @throws JobExecutionFailedException if the job definition can not be found
	 */
	public void setJobDefinition(JobInstance theInstance) {
		JobDefinition<?> jobDefinition = getJobDefinitionOrThrowException(theInstance);
		theInstance.setJobDefinition(jobDefinition);
	}

	/**
	 * @return a list of Job Definition Ids in alphabetical order
	 */
	public List<String> getJobDefinitionIds() {
		return myJobDefinitions.keySet().stream().sorted().collect(Collectors.toList());
	}

	/**
	 * @return true if no job definition is registered
	 */
	public boolean isEmpty() {
		return myJobDefinitions.isEmpty();
	}

	/**
	 * Variant of {@link #getJobDefinitionOrThrowException(String, int)} that takes the ID and
	 * version from a job instance. The result is cast, unchecked, to the parameter type
	 * expected by the caller.
	 *
	 * @param theJobInstance the instance whose definition ID and version are looked up
	 * @param <T> the job parameter type expected by the caller
	 * @return the definition registered under the instance's ID and version
	 * @throws JobExecutionFailedException if the job definition can not be found
	 */
	@SuppressWarnings("unchecked")
	public <T extends IModelJson> JobDefinition<T> getJobDefinitionOrThrowException(JobInstance theJobInstance) {
		return (JobDefinition<T>) getJobDefinitionOrThrowException(
				theJobInstance.getJobDefinitionId(), theJobInstance.getJobDefinitionVersion());
	}

	/**
	 * @param theDefinitionId the job definition ID to look up
	 * @return the versions registered for the given ID, or an empty collection if the ID is
	 *         not registered. For a registered ID this is the key set of the registry's own
	 *         version map, not a copy.
	 */
	public Collection<Integer> getJobDefinitionVersions(String theDefinitionId) {
		return myJobDefinitions
				.getOrDefault(theDefinitionId, ImmutableSortedMap.of())
				.keySet();
	}
}