001/*-
002 * #%L
003 * HAPI FHIR JPA Server - Batch2 Task Processor
004 * %%
005 * Copyright (C) 2014 - 2024 Smile CDR, Inc.
006 * %%
007 * Licensed under the Apache License, Version 2.0 (the "License");
008 * you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 *      http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 * #L%
019 */
020package ca.uhn.fhir.batch2.maintenance;
021
022import ca.uhn.fhir.batch2.api.IJobPersistence;
023import ca.uhn.fhir.batch2.api.IReductionStepExecutorService;
024import ca.uhn.fhir.batch2.channel.BatchJobSender;
025import ca.uhn.fhir.batch2.coordinator.JobDefinitionRegistry;
026import ca.uhn.fhir.batch2.model.JobDefinition;
027import ca.uhn.fhir.batch2.model.JobInstance;
028import ca.uhn.fhir.batch2.model.JobWorkCursor;
029import ca.uhn.fhir.batch2.model.JobWorkNotification;
030import ca.uhn.fhir.batch2.model.StatusEnum;
031import ca.uhn.fhir.batch2.model.WorkChunkMetadata;
032import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
033import ca.uhn.fhir.batch2.progress.JobInstanceProgressCalculator;
034import ca.uhn.fhir.batch2.progress.JobInstanceStatusUpdater;
035import ca.uhn.fhir.model.api.IModelJson;
036import ca.uhn.fhir.model.api.PagingIterator;
037import ca.uhn.fhir.util.Logs;
038import ca.uhn.fhir.util.StopWatch;
039import org.apache.commons.lang3.time.DateUtils;
040import org.slf4j.Logger;
041import org.springframework.data.domain.Page;
042import org.springframework.data.domain.Pageable;
043
044import java.util.Iterator;
045import java.util.List;
046import java.util.Optional;
047import java.util.Set;
048
049public class JobInstanceProcessor {
050        private static final Logger ourLog = Logs.getBatchTroubleshootingLog();
051        public static final long PURGE_THRESHOLD = 7L * DateUtils.MILLIS_PER_DAY;
052
053        // 10k; we want to get as many as we can
054        private static final int WORK_CHUNK_METADATA_BATCH_SIZE = 10000;
055        private final IJobPersistence myJobPersistence;
056        private final BatchJobSender myBatchJobSender;
057        private final JobChunkProgressAccumulator myProgressAccumulator;
058        private final JobInstanceProgressCalculator myJobInstanceProgressCalculator;
059        private final JobInstanceStatusUpdater myJobInstanceStatusUpdater;
060        private final IReductionStepExecutorService myReductionStepExecutorService;
061        private final String myInstanceId;
062        private final JobDefinitionRegistry myJobDefinitionegistry;
063
064        public JobInstanceProcessor(
065                        IJobPersistence theJobPersistence,
066                        BatchJobSender theBatchJobSender,
067                        String theInstanceId,
068                        JobChunkProgressAccumulator theProgressAccumulator,
069                        IReductionStepExecutorService theReductionStepExecutorService,
070                        JobDefinitionRegistry theJobDefinitionRegistry) {
071                myJobPersistence = theJobPersistence;
072                myBatchJobSender = theBatchJobSender;
073                myInstanceId = theInstanceId;
074                myProgressAccumulator = theProgressAccumulator;
075                myReductionStepExecutorService = theReductionStepExecutorService;
076                myJobDefinitionegistry = theJobDefinitionRegistry;
077                myJobInstanceProgressCalculator =
078                                new JobInstanceProgressCalculator(theJobPersistence, theProgressAccumulator, theJobDefinitionRegistry);
079                myJobInstanceStatusUpdater = new JobInstanceStatusUpdater(theJobDefinitionRegistry);
080        }
081
082        public void process() {
083                ourLog.debug("Starting job processing: {}", myInstanceId);
084                StopWatch stopWatch = new StopWatch();
085
086                JobInstance theInstance = myJobPersistence.fetchInstance(myInstanceId).orElse(null);
087                if (theInstance == null) {
088                        return;
089                }
090
091                boolean cancelUpdate = handleCancellation(theInstance);
092                if (cancelUpdate) {
093                        // reload after update
094                        theInstance = myJobPersistence.fetchInstance(myInstanceId).orElseThrow();
095                }
096
097                JobDefinition<? extends IModelJson> jobDefinition =
098                                myJobDefinitionegistry.getJobDefinitionOrThrowException(theInstance);
099
100                // move POLL_WAITING -> READY
101                processPollingChunks(theInstance.getInstanceId());
102                // determine job progress; delete CANCELED/COMPLETE/FAILED jobs that are no longer needed
103                cleanupInstance(theInstance);
104                // move gated jobs to the next step, if needed
105                // moves GATE_WAITING / QUEUED (legacy) chunks to:
106                // READY (for regular gated jobs)
107                // REDUCTION_READY (if it's the final reduction step)
108                triggerGatedExecutions(theInstance, jobDefinition);
109
110                if (theInstance.hasGatedStep() && theInstance.isRunning()) {
111                        Optional<JobInstance> updatedInstance = myJobPersistence.fetchInstance(theInstance.getInstanceId());
112
113                        if (updatedInstance.isEmpty()) {
114                                return;
115                        }
116
117                        JobWorkCursor<?, ?, ?> jobWorkCursor = JobWorkCursor.fromJobDefinitionAndRequestedStepId(
118                                        jobDefinition, updatedInstance.get().getCurrentGatedStepId());
119                        if (jobWorkCursor.isReductionStep()) {
120                                // Reduction step work chunks should never be sent to the queue but to its specific service instead.
121                                triggerReductionStep(theInstance, jobWorkCursor);
122                                return;
123                        }
124                }
125
126                // enqueue all READY chunks
127                enqueueReadyChunks(theInstance, jobDefinition);
128
129                ourLog.debug("Finished job processing: {} - {}", myInstanceId, stopWatch);
130        }
131
132        private boolean handleCancellation(JobInstance theInstance) {
133                if (theInstance.isPendingCancellationRequest()) {
134                        String errorMessage = buildCancelledMessage(theInstance);
135                        ourLog.info("Job {} moving to CANCELLED", theInstance.getInstanceId());
136                        return myJobPersistence.updateInstance(theInstance.getInstanceId(), instance -> {
137                                boolean changed = myJobInstanceStatusUpdater.updateInstanceStatus(instance, StatusEnum.CANCELLED);
138                                if (changed) {
139                                        instance.setErrorMessage(errorMessage);
140                                }
141                                return changed;
142                        });
143                }
144                return false;
145        }
146
147        private String buildCancelledMessage(JobInstance theInstance) {
148                String msg = "Job instance cancelled";
149                if (theInstance.hasGatedStep()) {
150                        msg += " while running step " + theInstance.getCurrentGatedStepId();
151                }
152                return msg;
153        }
154
155        private void cleanupInstance(JobInstance theInstance) {
156                switch (theInstance.getStatus()) {
157                        case QUEUED:
158                                // If we're still QUEUED, there are no stats to calculate
159                                break;
160                        case FINALIZE:
161                                // If we're in FINALIZE, the reduction step is working, so we should stay out of the way until it
162                                // marks the job as COMPLETED
163                                return;
164                        case IN_PROGRESS:
165                        case ERRORED:
166                                myJobInstanceProgressCalculator.calculateAndStoreInstanceProgress(theInstance.getInstanceId());
167                                break;
168                        case COMPLETED:
169                        case FAILED:
170                                if (purgeExpiredInstance(theInstance)) {
171                                        return;
172                                }
173                                break;
174                        case CANCELLED:
175                                purgeExpiredInstance(theInstance);
176                                // wipmb For 6.8 - Are we deliberately not purging chunks for cancelled jobs?  This is a very
177                                // complicated way to say that.
178                                return;
179                }
180
181                if (theInstance.isFinished() && !theInstance.isWorkChunksPurged()) {
182                        myJobPersistence.deleteChunksAndMarkInstanceAsChunksPurged(theInstance.getInstanceId());
183                }
184        }
185
186        private boolean purgeExpiredInstance(JobInstance theInstance) {
187                if (theInstance.getEndTime() != null) {
188                        long cutoff = System.currentTimeMillis() - PURGE_THRESHOLD;
189                        if (theInstance.getEndTime().getTime() < cutoff) {
190                                ourLog.info("Deleting old job instance {}", theInstance.getInstanceId());
191                                myJobPersistence.deleteInstanceAndChunks(theInstance.getInstanceId());
192                                return true;
193                        }
194                }
195                return false;
196        }
197
198        private void triggerGatedExecutions(JobInstance theInstance, JobDefinition<?> theJobDefinition) {
199                // QUEUE'd jobs that are gated need to start; this step will do that
200                if (!theInstance.isRunning()
201                                && (theInstance.getStatus() != StatusEnum.QUEUED && theJobDefinition.isGatedExecution())) {
202                        ourLog.debug(
203                                        "JobInstance {} is not in a \"running\" state. Status {}",
204                                        theInstance.getInstanceId(),
205                                        theInstance.getStatus());
206                        return;
207                }
208
209                if (!theInstance.hasGatedStep()) {
210                        return;
211                }
212
213                JobWorkCursor<?, ?, ?> jobWorkCursor = JobWorkCursor.fromJobDefinitionAndRequestedStepId(
214                                theJobDefinition, theInstance.getCurrentGatedStepId());
215                String instanceId = theInstance.getInstanceId();
216                String currentStepId = jobWorkCursor.getCurrentStepId();
217                boolean canAdvance = canAdvanceGatedJob(theInstance);
218                if (canAdvance) {
219                        if (!jobWorkCursor.isFinalStep()) {
220                                // all other gated job steps except for final steps - final steps does not need to be advanced
221                                String nextStepId = jobWorkCursor.nextStep.getStepId();
222                                ourLog.info(
223                                                "All processing is complete for gated execution of instance {} step {}. Proceeding to step {}",
224                                                instanceId,
225                                                currentStepId,
226                                                nextStepId);
227
228                                processChunksForNextGatedSteps(theInstance, theJobDefinition, nextStepId);
229                        } else {
230                                ourLog.info(
231                                                "Ready to advance gated execution of instance {} but already at the final step {}. Not proceeding to advance steps.",
232                                                instanceId,
233                                                jobWorkCursor.getCurrentStepId());
234                        }
235                } else {
236                        String stepId = jobWorkCursor.nextStep != null
237                                        ? jobWorkCursor.nextStep.getStepId()
238                                        : jobWorkCursor.getCurrentStepId();
239                        ourLog.debug(
240                                        "Not ready to advance gated execution of instance {} from step {} to {}.",
241                                        instanceId,
242                                        currentStepId,
243                                        stepId);
244                }
245        }
246
247        private boolean canAdvanceGatedJob(JobInstance theInstance) {
248                // make sure our instance still exists
249                if (myJobPersistence.fetchInstance(theInstance.getInstanceId()).isEmpty()) {
250                        // no more job
251                        return false;
252                }
253                String currentGatedStepId = theInstance.getCurrentGatedStepId();
254
255                Set<WorkChunkStatusEnum> workChunkStatuses = myJobPersistence.getDistinctWorkChunkStatesForJobAndStep(
256                                theInstance.getInstanceId(), currentGatedStepId);
257
258                if (workChunkStatuses.isEmpty()) {
259                        // no work chunks = no output
260                        // trivial to advance to next step
261                        ourLog.info("No workchunks for {} in step id {}", theInstance.getInstanceId(), currentGatedStepId);
262                        return true;
263                }
264
265                // all workchunks for the current step are in COMPLETED -> proceed.
266                return workChunkStatuses.equals(Set.of(WorkChunkStatusEnum.COMPLETED));
267        }
268
269        protected PagingIterator<WorkChunkMetadata> getReadyChunks() {
270                return new PagingIterator<>(WORK_CHUNK_METADATA_BATCH_SIZE, (index, batchsize, consumer) -> {
271                        Pageable pageable = Pageable.ofSize(batchsize).withPage(index);
272                        Page<WorkChunkMetadata> results = myJobPersistence.fetchAllWorkChunkMetadataForJobInStates(
273                                        pageable, myInstanceId, Set.of(WorkChunkStatusEnum.READY));
274                        for (WorkChunkMetadata metadata : results) {
275                                consumer.accept(metadata);
276                        }
277                });
278        }
279
280        /**
281         * Trigger the reduction step for the given job instance. Reduction step chunks should never be queued.
282         */
283        private void triggerReductionStep(JobInstance theInstance, JobWorkCursor<?, ?, ?> jobWorkCursor) {
284                String instanceId = theInstance.getInstanceId();
285                ourLog.debug("Triggering Reduction step {} of instance {}.", jobWorkCursor.getCurrentStepId(), instanceId);
286                myReductionStepExecutorService.triggerReductionStep(instanceId, jobWorkCursor);
287        }
288
289        /**
290         * Chunks are initially created in READY state.
291         * We will move READY chunks to QUEUE'd and send them to the queue/topic (kafka)
292         * for processing.
293         */
294        private void enqueueReadyChunks(JobInstance theJobInstance, JobDefinition<?> theJobDefinition) {
295                Iterator<WorkChunkMetadata> iter = getReadyChunks();
296
297                int counter = 0;
298                while (iter.hasNext()) {
299                        WorkChunkMetadata metadata = iter.next();
300
301                        /*
302                         * For each chunk id
303                         * * Move to QUEUE'd
304                         * * Send to topic
305                         * * flush changes
306                         * * commit
307                         */
308                        updateChunkAndSendToQueue(metadata);
309                        counter++;
310                }
311                ourLog.debug(
312                                "Encountered {} READY work chunks for job {} of type {}",
313                                counter,
314                                theJobInstance.getInstanceId(),
315                                theJobDefinition.getJobDefinitionId());
316        }
317
318        /**
319         * Updates the Work Chunk and sends it to the queue.
320         *
321         * Because ReductionSteps are done inline by the maintenance pass,
322         * those will not be sent to the queue (but they will still have their
323         * status updated from READY -> QUEUED).
324         *
325         * Returns true after processing.
326         */
327        private void updateChunkAndSendToQueue(WorkChunkMetadata theChunk) {
328                String chunkId = theChunk.getId();
329                myJobPersistence.enqueueWorkChunkForProcessing(chunkId, updated -> {
330                        ourLog.info("Updated {} workchunk with id {}", updated, chunkId);
331                        if (updated == 1) {
332                                sendNotification(theChunk);
333                        } else {
334                                // means the work chunk is likely already gone...
335                                // we'll log and skip it. If it's still in the DB, the next pass
336                                // will pick it up. Otherwise, it's no longer important
337                                ourLog.error(
338                                                "Job Instance {} failed to transition work chunk with id {} from READY to QUEUED; found {}, expected 1; skipping work chunk.",
339                                                theChunk.getInstanceId(),
340                                                theChunk.getId(),
341                                                updated);
342                        }
343                });
344        }
345
346        private void sendNotification(WorkChunkMetadata theChunk) {
347                // send to the queue
348                // we use current step id because it has not been moved to the next step (yet)
349                JobWorkNotification workNotification = new JobWorkNotification(
350                                theChunk.getJobDefinitionId(),
351                                theChunk.getJobDefinitionVersion(),
352                                theChunk.getInstanceId(),
353                                theChunk.getTargetStepId(),
354                                theChunk.getId());
355                myBatchJobSender.sendWorkChannelMessage(workNotification);
356        }
357
358        private void processChunksForNextGatedSteps(
359                        JobInstance theInstance, JobDefinition<?> theJobDefinition, String nextStepId) {
360                String instanceId = theInstance.getInstanceId();
361
362                List<String> gateWaitingChunksForNextStep = myProgressAccumulator.getChunkIdsWithStatus(
363                                instanceId, nextStepId, WorkChunkStatusEnum.GATE_WAITING, WorkChunkStatusEnum.QUEUED);
364                int totalChunksForNextStep = myProgressAccumulator.getTotalChunkCountForInstanceAndStep(instanceId, nextStepId);
365                if (totalChunksForNextStep != gateWaitingChunksForNextStep.size()) {
366                        ourLog.debug(
367                                        "Total ProgressAccumulator GATE_WAITING chunk count does not match GATE_WAITING chunk size! [instanceId={}, stepId={}, totalChunks={}, queuedChunks={}]",
368                                        instanceId,
369                                        nextStepId,
370                                        totalChunksForNextStep,
371                                        gateWaitingChunksForNextStep.size());
372                }
373
374                JobWorkCursor<?, ?, ?> jobWorkCursor =
375                                JobWorkCursor.fromJobDefinitionAndRequestedStepId(theJobDefinition, nextStepId);
376
377                // update the job step so the workers will process them.
378                // Sets all chunks from QUEUED/GATE_WAITING -> READY (REDUCTION_READY for reduction jobs)
379                myJobPersistence.advanceJobStepAndUpdateChunkStatus(instanceId, nextStepId, jobWorkCursor.isReductionStep());
380        }
381
382        /**
383         * Moves all POLL_WAITING work chunks to READY for work chunks whose
384         * nextPollTime has expired.
385         */
386        private void processPollingChunks(String theInstanceId) {
387                int updatedChunkCount = myJobPersistence.updatePollWaitingChunksForJobIfReady(theInstanceId);
388
389                ourLog.debug(
390                                "Moved {} Work Chunks in POLL_WAITING to READY for Job Instance {}", updatedChunkCount, theInstanceId);
391        }
392}