001/*-
002 * #%L
003 * HAPI FHIR JPA Server - Batch2 Task Processor
004 * %%
005 * Copyright (C) 2014 - 2024 Smile CDR, Inc.
006 * %%
007 * Licensed under the Apache License, Version 2.0 (the "License");
008 * you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 *      http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 * #L%
019 */
020package ca.uhn.fhir.batch2.coordinator;
021
022import ca.uhn.fhir.batch2.api.IJobMaintenanceService;
023import ca.uhn.fhir.batch2.api.IJobPersistence;
024import ca.uhn.fhir.batch2.model.JobDefinition;
025import ca.uhn.fhir.batch2.model.JobInstance;
026import ca.uhn.fhir.batch2.model.JobWorkCursor;
027import ca.uhn.fhir.batch2.model.StatusEnum;
028import ca.uhn.fhir.batch2.model.WorkChunk;
029import ca.uhn.fhir.batch2.progress.JobInstanceStatusUpdater;
030import ca.uhn.fhir.batch2.util.BatchJobOpenTelemetryUtils;
031import ca.uhn.fhir.model.api.IModelJson;
032import ca.uhn.fhir.util.Logs;
033import io.opentelemetry.instrumentation.annotations.WithSpan;
034import jakarta.annotation.Nonnull;
035import org.slf4j.Logger;
036
037import java.util.Date;
038
039import static ca.uhn.fhir.batch2.util.BatchJobOpenTelemetryUtils.JOB_STEP_EXECUTION_SPAN_NAME;
040
041public class JobStepExecutor<PT extends IModelJson, IT extends IModelJson, OT extends IModelJson> {
042        private static final Logger ourLog = Logs.getBatchTroubleshootingLog();
043
044        private final IJobPersistence myJobPersistence;
045        private final WorkChunkProcessor myJobExecutorSvc;
046        private final IJobMaintenanceService myJobMaintenanceService;
047        private final JobInstanceStatusUpdater myJobInstanceStatusUpdater;
048
049        private final JobDefinition<PT> myDefinition;
050        private final JobInstance myInstance;
051        private final String myInstanceId;
052        private final WorkChunk myWorkChunk;
053        private final JobWorkCursor<PT, IT, OT> myCursor;
054
055        JobStepExecutor(
056                        @Nonnull IJobPersistence theJobPersistence,
057                        @Nonnull JobInstance theInstance,
058                        WorkChunk theWorkChunk,
059                        @Nonnull JobWorkCursor<PT, IT, OT> theCursor,
060                        @Nonnull WorkChunkProcessor theExecutor,
061                        @Nonnull IJobMaintenanceService theJobMaintenanceService,
062                        @Nonnull JobDefinitionRegistry theJobDefinitionRegistry) {
063                myJobPersistence = theJobPersistence;
064                myDefinition = theCursor.jobDefinition;
065                myInstance = theInstance;
066                myInstanceId = theInstance.getInstanceId();
067                myWorkChunk = theWorkChunk;
068                myCursor = theCursor;
069                myJobExecutorSvc = theExecutor;
070                myJobMaintenanceService = theJobMaintenanceService;
071                myJobInstanceStatusUpdater = new JobInstanceStatusUpdater(theJobDefinitionRegistry);
072        }
073
074        @WithSpan(JOB_STEP_EXECUTION_SPAN_NAME)
075        public void executeStep() {
076
077                BatchJobOpenTelemetryUtils.addAttributesToCurrentSpan(
078                                myInstance.getJobDefinitionId(),
079                                myInstance.getJobDefinitionVersion(),
080                                myInstance.getInstanceId(),
081                                myCursor.getCurrentStepId(),
082                                myWorkChunk == null ? null : myWorkChunk.getId());
083
084                JobStepExecutorOutput<PT, IT, OT> stepExecutorOutput =
085                                myJobExecutorSvc.doExecution(myCursor, myInstance, myWorkChunk);
086
087                if (!stepExecutorOutput.isSuccessful()) {
088                        return;
089                }
090
091                /**
092                 * Jobs are completed in {@link ca.uhn.fhir.batch2.progress.JobInstanceProgressCalculator#calculateInstanceProgress}
093                 * We determine if the job is complete based on if there are *any* completed work chunks.
094                 * So if there are no COMPLETED work chunks (ie, first step produces no work chunks)
095                 * we must complete it here.
096                 */
097                if (stepExecutorOutput.getDataSink().firstStepProducedNothing() && !myDefinition.isLastStepReduction()) {
098                        ourLog.info(
099                                        "First step of job myInstance {} produced no work chunks and last step is not a reduction, "
100                                                        + "marking as completed and setting end date",
101                                        myInstanceId);
102                        myJobPersistence.updateInstance(myInstance.getInstanceId(), instance -> {
103                                instance.setEndTime(new Date());
104                                myJobInstanceStatusUpdater.updateInstanceStatus(instance, StatusEnum.COMPLETED);
105                                return true;
106                        });
107                }
108
109                // This flag could be stale, but checking for fast-track is a safe operation.
110                if (myInstance.isFastTracking()) {
111                        handleFastTracking(stepExecutorOutput.getDataSink());
112                }
113        }
114
115        private void handleFastTracking(BaseDataSink<PT, IT, OT> theDataSink) {
116                if (theDataSink.getWorkChunkCount() <= 1) {
117                        ourLog.debug(
118                                        "Gated job {} step {} produced exactly one chunk:  Triggering a maintenance pass.",
119                                        myDefinition.getJobDefinitionId(),
120                                        myCursor.currentStep.getStepId());
121                        // wipmb 6.8 either delete fast-tracking, or narrow this call to just this instance and step
122                        // This runs full maintenance for EVERY job as each chunk completes in a fast tracked job.  That's a LOT of
123                        // work.
124                        boolean success = myJobMaintenanceService.triggerMaintenancePass();
125                        if (!success) {
126                                myJobPersistence.updateInstance(myInstance.getInstanceId(), instance -> {
127                                        instance.setFastTracking(false);
128                                        return true;
129                                });
130                        }
131                } else {
132                        ourLog.debug(
133                                        "Gated job {} step {} produced {} chunks:  Disabling fast tracking.",
134                                        myDefinition.getJobDefinitionId(),
135                                        myCursor.currentStep.getStepId(),
136                                        theDataSink.getWorkChunkCount());
137                        myJobPersistence.updateInstance(myInstance.getInstanceId(), instance -> {
138                                instance.setFastTracking(false);
139                                return true;
140                        });
141                }
142        }
143}