001/*-
002 * #%L
003 * HAPI FHIR JPA Server - Batch2 Task Processor
004 * %%
005 * Copyright (C) 2014 - 2024 Smile CDR, Inc.
006 * %%
007 * Licensed under the Apache License, Version 2.0 (the "License");
008 * you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 *      http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 * #L%
019 */
020package ca.uhn.fhir.batch2.coordinator;
021
022import ca.uhn.fhir.batch2.api.IJobCoordinator;
023import ca.uhn.fhir.batch2.api.IJobMaintenanceService;
024import ca.uhn.fhir.batch2.api.IJobPersistence;
025import ca.uhn.fhir.batch2.api.JobOperationResultJson;
026import ca.uhn.fhir.batch2.channel.BatchJobSender;
027import ca.uhn.fhir.batch2.model.FetchJobInstancesRequest;
028import ca.uhn.fhir.batch2.model.JobDefinition;
029import ca.uhn.fhir.batch2.model.JobInstance;
030import ca.uhn.fhir.batch2.model.JobInstanceStartRequest;
031import ca.uhn.fhir.batch2.model.StatusEnum;
032import ca.uhn.fhir.batch2.models.JobInstanceFetchRequest;
033import ca.uhn.fhir.i18n.Msg;
034import ca.uhn.fhir.jpa.batch.models.Batch2JobStartResponse;
035import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
036import ca.uhn.fhir.jpa.subscription.channel.api.IChannelReceiver;
037import ca.uhn.fhir.rest.api.server.RequestDetails;
038import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
039import ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException;
040import ca.uhn.fhir.util.Logs;
041import jakarta.annotation.Nonnull;
042import jakarta.annotation.Nullable;
043import jakarta.annotation.PostConstruct;
044import jakarta.annotation.PreDestroy;
045import org.apache.commons.lang3.Validate;
046import org.slf4j.Logger;
047import org.springframework.data.domain.Page;
048import org.springframework.messaging.MessageHandler;
049import org.springframework.transaction.annotation.Propagation;
050
051import java.util.Arrays;
052import java.util.HashSet;
053import java.util.List;
054import java.util.Set;
055
056import static org.apache.commons.lang3.StringUtils.isBlank;
057
058public class JobCoordinatorImpl implements IJobCoordinator {
059        private static final Logger ourLog = Logs.getBatchTroubleshootingLog();
060
061        private final IJobPersistence myJobPersistence;
062        private final BatchJobSender myBatchJobSender;
063        private final IChannelReceiver myWorkChannelReceiver;
064        private final JobDefinitionRegistry myJobDefinitionRegistry;
065        private final MessageHandler myReceiverHandler;
066        private final JobQuerySvc myJobQuerySvc;
067        private final JobParameterJsonValidator myJobParameterJsonValidator;
068        private final IHapiTransactionService myTransactionService;
069
070        /**
071         * Constructor
072         */
073        public JobCoordinatorImpl(
074                        @Nonnull BatchJobSender theBatchJobSender,
075                        @Nonnull IChannelReceiver theWorkChannelReceiver,
076                        @Nonnull IJobPersistence theJobPersistence,
077                        @Nonnull JobDefinitionRegistry theJobDefinitionRegistry,
078                        @Nonnull WorkChunkProcessor theExecutorSvc,
079                        @Nonnull IJobMaintenanceService theJobMaintenanceService,
080                        @Nonnull IHapiTransactionService theTransactionService) {
081                Validate.notNull(theJobPersistence);
082
083                myJobPersistence = theJobPersistence;
084                myBatchJobSender = theBatchJobSender;
085                myWorkChannelReceiver = theWorkChannelReceiver;
086                myJobDefinitionRegistry = theJobDefinitionRegistry;
087
088                myReceiverHandler = new WorkChannelMessageHandler(
089                                theJobPersistence,
090                                theJobDefinitionRegistry,
091                                theBatchJobSender,
092                                theExecutorSvc,
093                                theJobMaintenanceService,
094                                theTransactionService);
095                myJobQuerySvc = new JobQuerySvc(theJobPersistence, theJobDefinitionRegistry);
096                myJobParameterJsonValidator = new JobParameterJsonValidator();
097                myTransactionService = theTransactionService;
098        }
099
100        @Override
101        public Batch2JobStartResponse startInstance(
102                        RequestDetails theRequestDetails, JobInstanceStartRequest theStartRequest) {
103                String paramsString = theStartRequest.getParameters();
104                if (isBlank(paramsString)) {
105                        throw new InvalidRequestException(Msg.code(2065) + "No parameters supplied");
106                }
107                Validate.notBlank(theStartRequest.getJobDefinitionId(), "No job definition ID supplied in start request");
108
109                // if cache - use that first
110                if (theStartRequest.isUseCache()) {
111                        FetchJobInstancesRequest request = new FetchJobInstancesRequest(
112                                        theStartRequest.getJobDefinitionId(), theStartRequest.getParameters(), getStatesThatTriggerCache());
113
114                        List<JobInstance> existing = myJobPersistence.fetchInstances(request, 0, 1000);
115                        if (!existing.isEmpty()) {
116                                // we'll look for completed ones first... otherwise, take any of the others
117                                existing.sort(
118                                                (o1, o2) -> -(o1.getStatus().ordinal() - o2.getStatus().ordinal()));
119
120                                JobInstance first = existing.stream().findFirst().orElseThrow();
121
122                                Batch2JobStartResponse response = new Batch2JobStartResponse();
123                                response.setInstanceId(first.getInstanceId());
124                                response.setUsesCachedResult(true);
125
126                                ourLog.info(
127                                                "Reusing cached {} job with status {} and id {}",
128                                                first.getJobDefinitionId(),
129                                                first.getStatus(),
130                                                first.getInstanceId());
131
132                                return response;
133                        }
134                }
135
136                JobDefinition<?> jobDefinition = myJobDefinitionRegistry
137                                .getLatestJobDefinition(theStartRequest.getJobDefinitionId())
138                                .orElseThrow(() -> new IllegalArgumentException(
139                                                Msg.code(2063) + "Unknown job definition ID: " + theStartRequest.getJobDefinitionId()));
140
141                myJobParameterJsonValidator.validateJobParameters(theRequestDetails, theStartRequest, jobDefinition);
142
143                // we only create the first chunk amd job here
144                // JobMaintenanceServiceImpl.doMaintenancePass will handle the rest
145                IJobPersistence.CreateResult instanceAndFirstChunk = myTransactionService
146                                .withSystemRequestOnDefaultPartition()
147                                .withPropagation(Propagation.REQUIRES_NEW)
148                                .execute(() -> myJobPersistence.onCreateWithFirstChunk(jobDefinition, theStartRequest.getParameters()));
149
150                Batch2JobStartResponse response = new Batch2JobStartResponse();
151                response.setInstanceId(instanceAndFirstChunk.jobInstanceId);
152                return response;
153        }
154
155        /**
156         * Cache will be used if an identical job is QUEUED or IN_PROGRESS. Otherwise a new one will kickoff.
157         */
158        private StatusEnum[] getStatesThatTriggerCache() {
159                return new StatusEnum[] {StatusEnum.QUEUED, StatusEnum.IN_PROGRESS};
160        }
161
162        @Override
163        @Nonnull
164        public JobInstance getInstance(String theInstanceId) {
165                return myJobQuerySvc.fetchInstance(theInstanceId);
166        }
167
168        @Override
169        public List<JobInstance> getInstances(int thePageSize, int thePageIndex) {
170                return myJobQuerySvc.fetchInstances(thePageSize, thePageIndex);
171        }
172
173        @Override
174        public List<JobInstance> getRecentInstances(int theCount, int theStart) {
175                return myJobQuerySvc.fetchRecentInstances(theCount, theStart);
176        }
177
178        @Override
179        public List<JobInstance> getInstancesbyJobDefinitionIdAndEndedStatus(
180                        String theJobDefinitionId, @Nullable Boolean theEnded, int theCount, int theStart) {
181                return myJobQuerySvc.getInstancesByJobDefinitionIdAndEndedStatus(
182                                theJobDefinitionId, theEnded, theCount, theStart);
183        }
184
185        @Override
186        public List<JobInstance> getJobInstancesByJobDefinitionIdAndStatuses(
187                        String theJobDefinitionId, Set<StatusEnum> theStatuses, int theCount, int theStart) {
188                return myJobQuerySvc.getInstancesByJobDefinitionAndStatuses(
189                                theJobDefinitionId, theStatuses, theCount, theStart);
190        }
191
192        @Override
193        public List<JobInstance> getJobInstancesByJobDefinitionId(String theJobDefinitionId, int theCount, int theStart) {
194                return getJobInstancesByJobDefinitionIdAndStatuses(
195                                theJobDefinitionId, new HashSet<>(Arrays.asList(StatusEnum.values())), theCount, theStart);
196        }
197
198        @Override
199        public Page<JobInstance> fetchAllJobInstances(JobInstanceFetchRequest theFetchRequest) {
200                return myJobQuerySvc.fetchAllInstances(theFetchRequest);
201        }
202
203        // wipmb For 6.8 - Clarify this interface. We currently return a JobOperationResultJson, and don't throw
204        // ResourceNotFoundException
205        @Override
206        public JobOperationResultJson cancelInstance(String theInstanceId) throws ResourceNotFoundException {
207                return myJobPersistence.cancelInstance(theInstanceId);
208        }
209
210        @PostConstruct
211        public void start() {
212                myWorkChannelReceiver.subscribe(myReceiverHandler);
213        }
214
215        @PreDestroy
216        public void stop() {
217                myWorkChannelReceiver.unsubscribe(myReceiverHandler);
218        }
219}