001/*-
002 * #%L
003 * HAPI FHIR JPA Server - Batch2 Task Processor
004 * %%
005 * Copyright (C) 2014 - 2024 Smile CDR, Inc.
006 * %%
007 * Licensed under the Apache License, Version 2.0 (the "License");
008 * you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 *      http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 * #L%
019 */
020package ca.uhn.fhir.batch2.coordinator;
021
022import ca.uhn.fhir.batch2.api.IJobPersistence;
023import ca.uhn.fhir.batch2.api.IJobStepWorker;
024import ca.uhn.fhir.batch2.api.JobExecutionFailedException;
025import ca.uhn.fhir.batch2.api.JobStepFailedException;
026import ca.uhn.fhir.batch2.api.RetryChunkLaterException;
027import ca.uhn.fhir.batch2.api.RunOutcome;
028import ca.uhn.fhir.batch2.api.StepExecutionDetails;
029import ca.uhn.fhir.batch2.model.WorkChunkCompletionEvent;
030import ca.uhn.fhir.batch2.model.WorkChunkErrorEvent;
031import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
032import ca.uhn.fhir.i18n.Msg;
033import ca.uhn.fhir.model.api.IModelJson;
034import ca.uhn.fhir.util.Logs;
035import org.apache.commons.lang3.Validate;
036import org.slf4j.Logger;
037
038import java.time.Instant;
039import java.time.temporal.ChronoUnit;
040import java.util.Date;
041
042public class StepExecutor {
043        private static final Logger ourLog = Logs.getBatchTroubleshootingLog();
044        private final IJobPersistence myJobPersistence;
045
046        public StepExecutor(IJobPersistence theJobPersistence) {
047                myJobPersistence = theJobPersistence;
048        }
049
050        /**
051         * Calls the worker execution step, and performs error handling logic for jobs that failed.
052         */
053        <PT extends IModelJson, IT extends IModelJson, OT extends IModelJson> boolean executeStep(
054                        StepExecutionDetails<PT, IT> theStepExecutionDetails,
055                        IJobStepWorker<PT, IT, OT> theStepWorker,
056                        BaseDataSink<PT, IT, OT> theDataSink) {
057                String jobDefinitionId = theDataSink.getJobDefinitionId();
058                String targetStepId = theDataSink.getTargetStep().getStepId();
059                String chunkId = theStepExecutionDetails.getChunkId();
060
061                RunOutcome outcome;
062                try {
063                        outcome = theStepWorker.run(theStepExecutionDetails, theDataSink);
064                        Validate.notNull(outcome, "Step theWorker returned null: %s", theStepWorker.getClass());
065                } catch (RetryChunkLaterException ex) {
066                        Date nextPollTime = Date.from(Instant.now().plus(ex.getNextPollDuration()));
067                        ourLog.debug(
068                                        "Polling job encountered; will retry chunk {} after after {}s",
069                                        theStepExecutionDetails.getChunkId(),
070                                        ex.getNextPollDuration().get(ChronoUnit.SECONDS));
071                        myJobPersistence.onWorkChunkPollDelay(theStepExecutionDetails.getChunkId(), nextPollTime);
072                        return false;
073                } catch (JobExecutionFailedException e) {
074                        ourLog.error(
075                                        "Unrecoverable failure executing job {} step {} chunk {}",
076                                        jobDefinitionId,
077                                        targetStepId,
078                                        chunkId,
079                                        e);
080                        if (theStepExecutionDetails.hasAssociatedWorkChunk()) {
081                                myJobPersistence.onWorkChunkFailed(chunkId, e.toString());
082                        }
083                        return false;
084                } catch (Exception e) {
085                        if (theStepExecutionDetails.hasAssociatedWorkChunk()) {
086                                ourLog.info(
087                                                "Temporary problem executing job {} step {}, marking chunk {} as retriable ERRORED",
088                                                jobDefinitionId,
089                                                targetStepId,
090                                                chunkId);
091                                WorkChunkErrorEvent parameters = new WorkChunkErrorEvent(chunkId, e.getMessage());
092                                WorkChunkStatusEnum newStatus = myJobPersistence.onWorkChunkError(parameters);
093                                if (newStatus == WorkChunkStatusEnum.FAILED) {
094                                        ourLog.error(
095                                                        "Exhausted retries:  Failure executing job {} step {}, marking chunk {} as ERRORED",
096                                                        jobDefinitionId,
097                                                        targetStepId,
098                                                        chunkId,
099                                                        e);
100                                        return false;
101                                }
102                        } else {
103                                ourLog.error(
104                                                "Failure executing job {} step {}, no associated work chunk", jobDefinitionId, targetStepId, e);
105                        }
106                        throw new JobStepFailedException(Msg.code(2041) + e.getMessage(), e);
107                } catch (Throwable t) {
108                        ourLog.error("Unexpected failure executing job {} step {}", jobDefinitionId, targetStepId, t);
109                        if (theStepExecutionDetails.hasAssociatedWorkChunk()) {
110                                myJobPersistence.onWorkChunkFailed(chunkId, t.toString());
111                        }
112                        return false;
113                }
114
115                if (theStepExecutionDetails.hasAssociatedWorkChunk()) {
116                        int recordsProcessed = outcome.getRecordsProcessed();
117                        int recoveredErrorCount = theDataSink.getRecoveredErrorCount();
118                        WorkChunkCompletionEvent event = new WorkChunkCompletionEvent(
119                                        chunkId, recordsProcessed, recoveredErrorCount, theDataSink.getRecoveredWarning());
120
121                        myJobPersistence.onWorkChunkCompletion(event);
122                }
123
124                return true;
125        }
126}