001/*-
002 * #%L
003 * HAPI FHIR JPA Server - Batch2 Task Processor
004 * %%
005 * Copyright (C) 2014 - 2024 Smile CDR, Inc.
006 * %%
007 * Licensed under the Apache License, Version 2.0 (the "License");
008 * you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 *      http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 * #L%
019 */
020package ca.uhn.fhir.batch2.coordinator;
021
022import ca.uhn.fhir.batch2.api.JobExecutionFailedException;
023import ca.uhn.fhir.batch2.model.JobDefinition;
024import ca.uhn.fhir.batch2.model.JobDefinitionStep;
025import ca.uhn.fhir.batch2.model.JobInstance;
026import ca.uhn.fhir.context.ConfigurationException;
027import ca.uhn.fhir.i18n.Msg;
028import ca.uhn.fhir.model.api.IModelJson;
029import ca.uhn.fhir.util.Logs;
030import com.google.common.collect.ImmutableSortedMap;
031import jakarta.annotation.Nonnull;
032import org.apache.commons.lang3.Validate;
033import org.slf4j.Logger;
034
035import java.util.Collection;
036import java.util.HashSet;
037import java.util.List;
038import java.util.Map;
039import java.util.NavigableMap;
040import java.util.Optional;
041import java.util.Set;
042import java.util.TreeMap;
043import java.util.concurrent.ConcurrentHashMap;
044import java.util.stream.Collectors;
045
046public class JobDefinitionRegistry {
047        private static final Logger ourLog = Logs.getBatchTroubleshootingLog();
048
049        private final Map<String, NavigableMap<Integer, JobDefinition<?>>> myJobDefinitions = new ConcurrentHashMap<>();
050
051        /**
052         * Add a job definition only if it is not registered
053         *
054         * @param <PT> the job parameter type for the definition
055         * @return true if it did not already exist and was registered
056         */
057        public synchronized <PT extends IModelJson> boolean addJobDefinitionIfNotRegistered(
058                        @Nonnull JobDefinition<PT> theDefinition) {
059                Optional<JobDefinition<?>> orig =
060                                getJobDefinition(theDefinition.getJobDefinitionId(), theDefinition.getJobDefinitionVersion());
061                if (orig.isPresent()) {
062                        return false;
063                }
064                addJobDefinition(theDefinition);
065                return true;
066        }
067
068        public synchronized <PT extends IModelJson> void addJobDefinition(@Nonnull JobDefinition<PT> theDefinition) {
069                Validate.notNull(theDefinition);
070                String jobDefinitionId = theDefinition.getJobDefinitionId();
071                Validate.notBlank(jobDefinitionId);
072                Validate.isTrue(theDefinition.getJobDefinitionVersion() >= 1);
073                Validate.isTrue(theDefinition.getSteps().size() > 1);
074
075                Set<String> stepIds = new HashSet<>();
076                for (JobDefinitionStep<?, ?, ?> next : theDefinition.getSteps()) {
077                        if (!stepIds.add(next.getStepId())) {
078                                throw new ConfigurationException(
079                                                Msg.code(2046) + "Duplicate step[" + next.getStepId() + "] in definition[" + jobDefinitionId
080                                                                + "] version: " + theDefinition.getJobDefinitionVersion());
081                        }
082                }
083
084                NavigableMap<Integer, JobDefinition<?>> versionMap =
085                                myJobDefinitions.computeIfAbsent(jobDefinitionId, t -> new TreeMap<>());
086                if (versionMap.containsKey(theDefinition.getJobDefinitionVersion())) {
087                        if (versionMap.get(theDefinition.getJobDefinitionVersion()) == theDefinition) {
088                                ourLog.warn(
089                                                "job[{}] version: {} already registered.  Not registering again.",
090                                                jobDefinitionId,
091                                                theDefinition.getJobDefinitionVersion());
092                                return;
093                        }
094                        throw new ConfigurationException(Msg.code(2047) + "Multiple definitions for job[" + jobDefinitionId
095                                        + "] version: " + theDefinition.getJobDefinitionVersion());
096                }
097                versionMap.put(theDefinition.getJobDefinitionVersion(), theDefinition);
098        }
099
100        public synchronized void removeJobDefinition(@Nonnull String theDefinitionId, int theVersion) {
101                Validate.notBlank(theDefinitionId);
102                Validate.isTrue(theVersion >= 1);
103
104                NavigableMap<Integer, JobDefinition<?>> versionMap = myJobDefinitions.get(theDefinitionId);
105                if (versionMap != null) {
106                        versionMap.remove(theVersion);
107                        if (versionMap.isEmpty()) {
108                                myJobDefinitions.remove(theDefinitionId);
109                        }
110                }
111        }
112
113        public Optional<JobDefinition<?>> getLatestJobDefinition(@Nonnull String theJobDefinitionId) {
114                NavigableMap<Integer, JobDefinition<?>> versionMap = myJobDefinitions.get(theJobDefinitionId);
115                if (versionMap == null || versionMap.isEmpty()) {
116                        return Optional.empty();
117                }
118                return Optional.of(versionMap.lastEntry().getValue());
119        }
120
121        public Optional<JobDefinition<?>> getJobDefinition(
122                        @Nonnull String theJobDefinitionId, int theJobDefinitionVersion) {
123                NavigableMap<Integer, JobDefinition<?>> versionMap = myJobDefinitions.get(theJobDefinitionId);
124                if (versionMap == null || versionMap.isEmpty()) {
125                        return Optional.empty();
126                }
127                return Optional.of(versionMap.get(theJobDefinitionVersion));
128        }
129
130        /**
131         * @throws JobExecutionFailedException if the job definition can not be found
132         */
133        public JobDefinition<?> getJobDefinitionOrThrowException(String theJobDefinitionId, int theJobDefinitionVersion) {
134                Optional<JobDefinition<?>> opt = getJobDefinition(theJobDefinitionId, theJobDefinitionVersion);
135                if (opt.isEmpty()) {
136                        String msg =
137                                        "Unknown job definition ID[" + theJobDefinitionId + "] version[" + theJobDefinitionVersion + "]";
138                        ourLog.warn(msg);
139                        throw new JobExecutionFailedException(Msg.code(2043) + msg);
140                }
141                return opt.get();
142        }
143
144        public void setJobDefinition(JobInstance theInstance) {
145                JobDefinition<?> jobDefinition = getJobDefinitionOrThrowException(theInstance);
146                theInstance.setJobDefinition(jobDefinition);
147        }
148
149        /**
150         * @return a list of Job Definition Ids in alphabetical order
151         */
152        public List<String> getJobDefinitionIds() {
153                return myJobDefinitions.keySet().stream().sorted().collect(Collectors.toList());
154        }
155
156        public boolean isEmpty() {
157                return myJobDefinitions.isEmpty();
158        }
159
160        @SuppressWarnings("unchecked")
161        public <T extends IModelJson> JobDefinition<T> getJobDefinitionOrThrowException(JobInstance theJobInstance) {
162                return (JobDefinition<T>) getJobDefinitionOrThrowException(
163                                theJobInstance.getJobDefinitionId(), theJobInstance.getJobDefinitionVersion());
164        }
165
166        public Collection<Integer> getJobDefinitionVersions(String theDefinitionId) {
167                return myJobDefinitions
168                                .getOrDefault(theDefinitionId, ImmutableSortedMap.of())
169                                .keySet();
170        }
171}