001/*-
002 * #%L
003 * HAPI FHIR JPA Server - Batch2 Task Processor
004 * %%
005 * Copyright (C) 2014 - 2024 Smile CDR, Inc.
006 * %%
007 * Licensed under the Apache License, Version 2.0 (the "License");
008 * you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 *      http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 * #L%
019 */
020package ca.uhn.fhir.batch2.coordinator;
021
022import ca.uhn.fhir.batch2.api.ChunkExecutionDetails;
023import ca.uhn.fhir.batch2.api.IJobCompletionHandler;
024import ca.uhn.fhir.batch2.api.IJobPersistence;
025import ca.uhn.fhir.batch2.api.IReductionStepExecutorService;
026import ca.uhn.fhir.batch2.api.IReductionStepWorker;
027import ca.uhn.fhir.batch2.api.JobCompletionDetails;
028import ca.uhn.fhir.batch2.api.StepExecutionDetails;
029import ca.uhn.fhir.batch2.model.ChunkOutcome;
030import ca.uhn.fhir.batch2.model.JobDefinitionStep;
031import ca.uhn.fhir.batch2.model.JobInstance;
032import ca.uhn.fhir.batch2.model.JobWorkCursor;
033import ca.uhn.fhir.batch2.model.StatusEnum;
034import ca.uhn.fhir.batch2.model.WorkChunk;
035import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
036import ca.uhn.fhir.batch2.util.BatchJobOpenTelemetryUtils;
037import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
038import ca.uhn.fhir.jpa.model.sched.HapiJob;
039import ca.uhn.fhir.jpa.model.sched.IHasScheduledJobs;
040import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
041import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition;
042import ca.uhn.fhir.model.api.IModelJson;
043import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
044import com.google.common.annotations.VisibleForTesting;
045import io.opentelemetry.instrumentation.annotations.WithSpan;
046import jakarta.annotation.Nonnull;
047import org.apache.commons.lang3.time.DateUtils;
048import org.quartz.JobExecutionContext;
049import org.slf4j.Logger;
050import org.slf4j.LoggerFactory;
051import org.springframework.beans.factory.annotation.Autowired;
052import org.springframework.context.event.ContextClosedEvent;
053import org.springframework.context.event.ContextRefreshedEvent;
054import org.springframework.context.event.EventListener;
055import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
056import org.springframework.transaction.annotation.Propagation;
057
058import java.util.Collections;
059import java.util.EnumSet;
060import java.util.LinkedHashMap;
061import java.util.Map;
062import java.util.Timer;
063import java.util.TimerTask;
064import java.util.concurrent.Callable;
065import java.util.concurrent.ExecutorService;
066import java.util.concurrent.Executors;
067import java.util.concurrent.Semaphore;
068import java.util.concurrent.atomic.AtomicReference;
069import java.util.stream.Stream;
070
071import static ca.uhn.fhir.batch2.model.StatusEnum.COMPLETED;
072import static ca.uhn.fhir.batch2.model.StatusEnum.ERRORED;
073import static ca.uhn.fhir.batch2.model.StatusEnum.FINALIZE;
074import static ca.uhn.fhir.batch2.model.StatusEnum.IN_PROGRESS;
075import static ca.uhn.fhir.batch2.util.BatchJobOpenTelemetryUtils.JOB_STEP_EXECUTION_SPAN_NAME;
076
077public class ReductionStepExecutorServiceImpl implements IReductionStepExecutorService, IHasScheduledJobs {
078        public static final String SCHEDULED_JOB_ID = ReductionStepExecutorScheduledJob.class.getName();
079        private static final Logger ourLog = LoggerFactory.getLogger(ReductionStepExecutorServiceImpl.class);
080        private final Map<String, JobWorkCursor> myInstanceIdToJobWorkCursor =
081                        Collections.synchronizedMap(new LinkedHashMap<>());
082        private final ExecutorService myReducerExecutor;
083        private final IJobPersistence myJobPersistence;
084        private final IHapiTransactionService myTransactionService;
085        private final Semaphore myCurrentlyExecuting = new Semaphore(1);
086        private final AtomicReference<String> myCurrentlyFinalizingInstanceId = new AtomicReference<>();
087        private final JobDefinitionRegistry myJobDefinitionRegistry;
088        private Timer myHeartbeatTimer;
089
090        /**
091         * Constructor
092         */
093        public ReductionStepExecutorServiceImpl(
094                        IJobPersistence theJobPersistence,
095                        IHapiTransactionService theTransactionService,
096                        JobDefinitionRegistry theJobDefinitionRegistry) {
097                myJobPersistence = theJobPersistence;
098                myTransactionService = theTransactionService;
099                myJobDefinitionRegistry = theJobDefinitionRegistry;
100
101                myReducerExecutor = Executors.newSingleThreadExecutor(new CustomizableThreadFactory("batch2-reducer"));
102        }
103
104        @EventListener(ContextRefreshedEvent.class)
105        public void start() {
106                if (myHeartbeatTimer == null) {
107                        myHeartbeatTimer = new Timer("batch2-reducer-heartbeat");
108                        myHeartbeatTimer.schedule(
109                                        new HeartbeatTimerTask(), DateUtils.MILLIS_PER_MINUTE, DateUtils.MILLIS_PER_MINUTE);
110                }
111        }
112
113        private void runHeartbeat() {
114                String currentlyFinalizingInstanceId = myCurrentlyFinalizingInstanceId.get();
115                if (currentlyFinalizingInstanceId != null) {
116                        ourLog.info("Running heartbeat for instance: {}", currentlyFinalizingInstanceId);
117                        executeInTransactionWithSynchronization(() -> {
118                                myJobPersistence.updateInstanceUpdateTime(currentlyFinalizingInstanceId);
119                                return null;
120                        });
121                }
122        }
123
124        @EventListener(ContextClosedEvent.class)
125        public void shutdown() {
126                if (myHeartbeatTimer != null) {
127                        myHeartbeatTimer.cancel();
128                        myHeartbeatTimer = null;
129                }
130        }
131
132        @Override
133        public void triggerReductionStep(String theInstanceId, JobWorkCursor<?, ?, ?> theJobWorkCursor) {
134                myInstanceIdToJobWorkCursor.putIfAbsent(theInstanceId, theJobWorkCursor);
135                if (myCurrentlyExecuting.availablePermits() > 0) {
136                        myReducerExecutor.submit(this::reducerPass);
137                }
138        }
139
140        @Override
141        public void reducerPass() {
142                if (myCurrentlyExecuting.tryAcquire()) {
143                        try {
144                                String[] instanceIds = myInstanceIdToJobWorkCursor.keySet().toArray(new String[0]);
145                                if (instanceIds.length > 0) {
146                                        String instanceId = instanceIds[0];
147                                        myCurrentlyFinalizingInstanceId.set(instanceId);
148                                        JobWorkCursor<?, ?, ?> jobWorkCursor = myInstanceIdToJobWorkCursor.get(instanceId);
149                                        executeReductionStep(instanceId, jobWorkCursor);
150
151                                        // If we get here, this succeeded. Purge the instance from the work queue
152                                        myInstanceIdToJobWorkCursor.remove(instanceId);
153                                }
154
155                        } catch (Exception e) {
156                                ourLog.error("Failed to execute reducer pass", e);
157                        } finally {
158                                myCurrentlyFinalizingInstanceId.set(null);
159                                myCurrentlyExecuting.release();
160                        }
161                }
162        }
163
164        @VisibleForTesting
165        @WithSpan(JOB_STEP_EXECUTION_SPAN_NAME)
166        <PT extends IModelJson, IT extends IModelJson, OT extends IModelJson>
167                        ReductionStepChunkProcessingResponse executeReductionStep(
168                                        String theInstanceId, JobWorkCursor<PT, IT, OT> theJobWorkCursor) {
169
170                BatchJobOpenTelemetryUtils.addAttributesToCurrentSpan(
171                                theJobWorkCursor.getJobDefinition().getJobDefinitionId(),
172                                theJobWorkCursor.getJobDefinition().getJobDefinitionVersion(),
173                                theInstanceId,
174                                theJobWorkCursor.getCurrentStepId(),
175                                null);
176
177                JobDefinitionStep<PT, IT, OT> step = theJobWorkCursor.getCurrentStep();
178
179                // wipmb For 6.8 - this runs four tx. That's at least 2 too many
180                // combine the fetch and the case statement.  Use optional for the boolean.
181                JobInstance instance = executeInTransactionWithSynchronization(() -> myJobPersistence
182                                .fetchInstance(theInstanceId)
183                                .orElseThrow(() -> new InternalErrorException("Unknown instance: " + theInstanceId)));
184
185                boolean shouldProceed = false;
186                switch (instance.getStatus()) {
187                        case IN_PROGRESS:
188                        case ERRORED:
189                                // this will take a write lock on the JobInstance, preventing duplicates.
190                                boolean changed =
191                                                executeInTransactionWithSynchronization(() -> myJobPersistence.markInstanceAsStatusWhenStatusIn(
192                                                                instance.getInstanceId(), FINALIZE, EnumSet.of(IN_PROGRESS, ERRORED)));
193                                if (changed) {
194                                        ourLog.info(
195                                                        "Job instance {} has been set to FINALIZE state - Beginning reducer step",
196                                                        instance.getInstanceId());
197                                        shouldProceed = true;
198                                }
199                                break;
200                        case FINALIZE:
201                        case COMPLETED:
202                        case FAILED:
203                        case QUEUED:
204                        case CANCELLED:
205                                break;
206                }
207
208                if (!shouldProceed) {
209                        ourLog.warn(
210                                        "JobInstance[{}] should not be finalized at this time. In memory status is {}. Reduction step will not rerun!"
211                                                        + " This could be a long running reduction job resulting in the processed msg not being acknowledged,"
212                                                        + " or the result of a failed process or server restarting.",
213                                        instance.getInstanceId(),
214                                        instance.getStatus());
215                        return new ReductionStepChunkProcessingResponse(false);
216                }
217
218                PT parameters =
219                                instance.getParameters(theJobWorkCursor.getJobDefinition().getParametersType());
220                IReductionStepWorker<PT, IT, OT> reductionStepWorker =
221                                (IReductionStepWorker<PT, IT, OT>) step.getJobStepWorker();
222
223                instance.setStatus(FINALIZE);
224
225                boolean defaultSuccessValue = true;
226                ReductionStepChunkProcessingResponse response = new ReductionStepChunkProcessingResponse(defaultSuccessValue);
227
228                try {
229                        processChunksAndCompleteJob(theJobWorkCursor, step, instance, parameters, reductionStepWorker, response);
230                } catch (Exception ex) {
231                        ourLog.error("Job completion failed for Job {}", instance.getInstanceId(), ex);
232
233                        executeInTransactionWithSynchronization(() -> {
234                                myJobPersistence.updateInstance(instance.getInstanceId(), theInstance -> {
235                                        theInstance.setStatus(StatusEnum.FAILED);
236                                        return true;
237                                });
238                                return null;
239                        });
240                        response.setSuccessful(false);
241                }
242
243                // if no successful chunks, return false
244                if (!response.hasSuccessfulChunksIds()) {
245                        response.setSuccessful(false);
246                }
247
248                return response;
249        }
250
251        private <PT extends IModelJson, IT extends IModelJson, OT extends IModelJson> void processChunksAndCompleteJob(
252                        JobWorkCursor<PT, IT, OT> theJobWorkCursor,
253                        JobDefinitionStep<PT, IT, OT> step,
254                        JobInstance instance,
255                        PT parameters,
256                        IReductionStepWorker<PT, IT, OT> reductionStepWorker,
257                        ReductionStepChunkProcessingResponse response) {
258                try {
259                        executeInTransactionWithSynchronization(() -> {
260                                try (Stream<WorkChunk> chunkIterator =
261                                                myJobPersistence.fetchAllWorkChunksForStepStream(instance.getInstanceId(), step.getStepId())) {
262                                        chunkIterator.forEach(chunk ->
263                                                        processChunk(chunk, instance, parameters, reductionStepWorker, response, theJobWorkCursor));
264                                }
265                                return null;
266                        });
267                } finally {
268                        executeInTransactionWithSynchronization(() -> {
269                                ourLog.info(
270                                                "Reduction step for instance[{}] produced {} successful and {} failed chunks",
271                                                instance.getInstanceId(),
272                                                response.getSuccessfulChunkIds().size(),
273                                                response.getFailedChunksIds().size());
274
275                                ReductionStepDataSink<PT, IT, OT> dataSink = new ReductionStepDataSink<>(
276                                                instance.getInstanceId(), theJobWorkCursor, myJobPersistence, myJobDefinitionRegistry);
277                                StepExecutionDetails<PT, IT> chunkDetails =
278                                                StepExecutionDetails.createReductionStepDetails(parameters, null, instance);
279
280                                if (response.isSuccessful()) {
281                                        reductionStepWorker.run(chunkDetails, dataSink);
282
283                                        // the ReductionStepDataSink will update the job status to COMPLETED
284                                        // we should update instance here to keep it consistent with the newest version in persistence
285                                        instance.setStatus(COMPLETED);
286                                }
287
288                                if (response.hasSuccessfulChunksIds()) {
289                                        // complete the steps without making a new work chunk
290                                        myJobPersistence.markWorkChunksWithStatusAndWipeData(
291                                                        instance.getInstanceId(),
292                                                        response.getSuccessfulChunkIds(),
293                                                        WorkChunkStatusEnum.COMPLETED,
294                                                        null // error message - none
295                                                        );
296                                }
297
298                                if (response.hasFailedChunkIds()) {
299                                        // mark any failed chunks as failed for aborting
300                                        myJobPersistence.markWorkChunksWithStatusAndWipeData(
301                                                        instance.getInstanceId(),
302                                                        response.getFailedChunksIds(),
303                                                        WorkChunkStatusEnum.FAILED,
304                                                        "JOB ABORTED");
305                                }
306
307                                if (response.isSuccessful()) {
308                                        /**
309                                         * All reduction steps are final steps.
310                                         */
311                                        IJobCompletionHandler<PT> completionHandler =
312                                                        theJobWorkCursor.getJobDefinition().getCompletionHandler();
313                                        if (completionHandler != null) {
314                                                completionHandler.jobComplete(new JobCompletionDetails<>(parameters, instance));
315                                        }
316                                }
317
318                                return null;
319                        });
320                }
321        }
322
323        private <T> T executeInTransactionWithSynchronization(Callable<T> runnable) {
324                return myTransactionService
325                                .withRequest(null)
326                                .withPropagation(Propagation.REQUIRES_NEW)
327                                .execute(runnable);
328        }
329
330        @Override
331        public void scheduleJobs(ISchedulerService theSchedulerService) {
332                theSchedulerService.scheduleClusteredJob(10 * DateUtils.MILLIS_PER_SECOND, buildJobDefinition());
333        }
334
335        @Nonnull
336        private ScheduledJobDefinition buildJobDefinition() {
337                ScheduledJobDefinition jobDefinition = new ScheduledJobDefinition();
338                jobDefinition.setId(SCHEDULED_JOB_ID);
339                jobDefinition.setJobClass(ReductionStepExecutorScheduledJob.class);
340                return jobDefinition;
341        }
342
343        private <PT extends IModelJson, IT extends IModelJson, OT extends IModelJson> void processChunk(
344                        WorkChunk theChunk,
345                        JobInstance theInstance,
346                        PT theParameters,
347                        IReductionStepWorker<PT, IT, OT> theReductionStepWorker,
348                        ReductionStepChunkProcessingResponse theResponseObject,
349                        JobWorkCursor<PT, IT, OT> theJobWorkCursor) {
350
351                /*
352                 * Reduction steps are done inline and only on gated jobs.
353                 */
354                if (theChunk.getStatus() == WorkChunkStatusEnum.COMPLETED) {
355                        // This should never happen since jobs with reduction are required to be gated
356                        ourLog.error(
357                                        "Unexpected chunk {} with status {} found while reducing {}.  No chunks feeding into a reduction step should be in a state other than READY.",
358                                        theChunk.getId(),
359                                        theChunk.getStatus(),
360                                        theInstance);
361                        return;
362                }
363
364                if (theResponseObject.hasFailedChunkIds()) {
365                        // we are going to fail all future chunks now
366                        theResponseObject.addFailedChunkId(theChunk);
367                } else {
368                        try {
369                                // feed them into our reduction worker
370                                // this is the most likely area to throw,
371                                // as this is where db actions and processing is likely to happen
372                                IT chunkData =
373                                                theChunk.getData(theJobWorkCursor.getCurrentStep().getInputType());
374                                ChunkExecutionDetails<PT, IT> chunkDetails = new ChunkExecutionDetails<>(
375                                                chunkData, theParameters, theInstance.getInstanceId(), theChunk.getId());
376
377                                ChunkOutcome outcome = theReductionStepWorker.consume(chunkDetails);
378
379                                switch (outcome.getStatus()) {
380                                        case SUCCESS:
381                                                theResponseObject.addSuccessfulChunkId(theChunk);
382                                                break;
383
384                                        case FAILED:
385                                                ourLog.error("Processing of work chunk {} resulted in aborting job.", theChunk.getId());
386
387                                                // fail entire job - including all future workchunks
388                                                theResponseObject.addFailedChunkId(theChunk);
389                                                theResponseObject.setSuccessful(false);
390                                                break;
391                                }
392                        } catch (Exception e) {
393                                String msg = String.format(
394                                                "Reduction step failed to execute chunk reduction for chunk %s with exception: %s.",
395                                                theChunk.getId(), e.getMessage());
396                                // we got a failure in a reduction
397                                ourLog.error(msg, e);
398                                theResponseObject.setSuccessful(false);
399
400                                myJobPersistence.onWorkChunkFailed(theChunk.getId(), msg);
401                        }
402                }
403        }
404
405        private class HeartbeatTimerTask extends TimerTask {
406                @Override
407                public void run() {
408                        runHeartbeat();
409                }
410        }
411
        /**
         * Scheduled job that triggers a reducer pass each time it fires. Its class is the one registered
         * in the job definition built by {@code buildJobDefinition()}.
         */
        public static class ReductionStepExecutorScheduledJob implements HapiJob {
                // Injected by Spring; the service on which the reducer pass is invoked
                @Autowired
                private IReductionStepExecutorService myTarget;

                /**
                 * Delegates to {@link IReductionStepExecutorService#reducerPass()}.
                 *
                 * @param theContext the Quartz execution context (unused)
                 */
                @Override
                public void execute(JobExecutionContext theContext) {
                        myTarget.reducerPass();
                }
        }
421}