001/*-
002 * #%L
003 * HAPI FHIR JPA Server - Batch2 Task Processor
004 * %%
005 * Copyright (C) 2014 - 2024 Smile CDR, Inc.
006 * %%
007 * Licensed under the Apache License, Version 2.0 (the "License");
008 * you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 *      http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 * #L%
019 */
020package ca.uhn.fhir.batch2.coordinator;
021
022import ca.uhn.fhir.batch2.api.IJobPersistence;
023import ca.uhn.fhir.batch2.api.IJobStepWorker;
024import ca.uhn.fhir.batch2.api.StepExecutionDetails;
025import ca.uhn.fhir.batch2.api.VoidModel;
026import ca.uhn.fhir.batch2.channel.BatchJobSender;
027import ca.uhn.fhir.batch2.model.JobDefinition;
028import ca.uhn.fhir.batch2.model.JobDefinitionStep;
029import ca.uhn.fhir.batch2.model.JobInstance;
030import ca.uhn.fhir.batch2.model.JobWorkCursor;
031import ca.uhn.fhir.batch2.model.WorkChunk;
032import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
033import ca.uhn.fhir.model.api.IModelJson;
034import ca.uhn.fhir.util.Logs;
035import jakarta.annotation.Nullable;
036import org.apache.commons.lang3.Validate;
037import org.slf4j.Logger;
038
039import java.util.Optional;
040
041import static org.apache.commons.lang3.StringUtils.isBlank;
042
043public class WorkChunkProcessor {
044        /**
045         * This retry only works if your channel producer supports
046         * retries on message processing exceptions.
047         * <p>
048         * What's more, we may one day want to have this configurable
049         * by the caller.
050         * But since this is not a feature of HAPI,
051         * this has not been done yet.
052         */
053        public static final int MAX_CHUNK_ERROR_COUNT = 3;
054
055        private static final Logger ourLog = Logs.getBatchTroubleshootingLog();
056        private final IJobPersistence myJobPersistence;
057        private final BatchJobSender myBatchJobSender;
058        private final StepExecutor myStepExecutor;
059        private final IHapiTransactionService myHapiTransactionService;
060
061        public WorkChunkProcessor(
062                        IJobPersistence theJobPersistence,
063                        BatchJobSender theSender,
064                        IHapiTransactionService theHapiTransactionService) {
065                myJobPersistence = theJobPersistence;
066                myBatchJobSender = theSender;
067                myStepExecutor = new StepExecutor(theJobPersistence);
068                myHapiTransactionService = theHapiTransactionService;
069        }
070
071        /**
072         * Execute the work chunk.
073         *
074         * @param theCursor    - work cursor
075         * @param theInstance  - the job instance
076         * @param theWorkChunk - the work chunk (if available); can be null (for reduction step only!)
077         * @param <PT>         - Job parameters Type
078         * @param <IT>         - Step input parameters Type
079         * @param <OT>         - Step output parameters Type
080         * @return - JobStepExecution output. Contains the datasink and whether or not the execution had succeeded.
081         */
082        public <PT extends IModelJson, IT extends IModelJson, OT extends IModelJson>
083                        JobStepExecutorOutput<PT, IT, OT> doExecution(
084                                        JobWorkCursor<PT, IT, OT> theCursor, JobInstance theInstance, @Nullable WorkChunk theWorkChunk) {
085                JobDefinitionStep<PT, IT, OT> step = theCursor.getCurrentStep();
086                JobDefinition<PT> jobDefinition = theCursor.getJobDefinition();
087                String instanceId = theInstance.getInstanceId();
088                Class<IT> inputType = step.getInputType();
089                PT parameters = theInstance.getParameters(jobDefinition.getParametersType());
090
091                IJobStepWorker<PT, IT, OT> worker = step.getJobStepWorker();
092                BaseDataSink<PT, IT, OT> dataSink = getDataSink(theCursor, jobDefinition, instanceId);
093
094                assert !step.isReductionStep();
095
096                // all other kinds of steps
097                Validate.notNull(theWorkChunk);
098                Optional<StepExecutionDetails<PT, IT>> stepExecutionDetailsOpt =
099                                getExecutionDetailsForNonReductionStep(theWorkChunk, theInstance, inputType, parameters);
100                if (!stepExecutionDetailsOpt.isPresent()) {
101                        return new JobStepExecutorOutput<>(false, dataSink);
102                }
103
104                StepExecutionDetails<PT, IT> stepExecutionDetails = stepExecutionDetailsOpt.get();
105
106                // execute the step
107                boolean success = myStepExecutor.executeStep(stepExecutionDetails, worker, dataSink);
108
109                // return results with data sink
110                return new JobStepExecutorOutput<>(success, dataSink);
111        }
112
113        /**
114         * Get the correct datasink for the cursor/job provided.
115         */
116        @SuppressWarnings("unchecked")
117        protected <PT extends IModelJson, IT extends IModelJson, OT extends IModelJson>
118                        BaseDataSink<PT, IT, OT> getDataSink(
119                                        JobWorkCursor<PT, IT, OT> theCursor, JobDefinition<PT> theJobDefinition, String theInstanceId) {
120                BaseDataSink<PT, IT, OT> dataSink;
121
122                assert !theCursor.isReductionStep();
123                if (theCursor.isFinalStep()) {
124                        dataSink = (BaseDataSink<PT, IT, OT>) new FinalStepDataSink<>(
125                                        theJobDefinition.getJobDefinitionId(), theInstanceId, theCursor.asFinalCursor());
126                } else {
127                        dataSink = new JobDataSink<>(
128                                        myBatchJobSender,
129                                        myJobPersistence,
130                                        theJobDefinition,
131                                        theInstanceId,
132                                        theCursor,
133                                        myHapiTransactionService);
134                }
135                return dataSink;
136        }
137
138        /**
139         * Construct execution details for non-reduction step
140         */
141        private <PT extends IModelJson, IT extends IModelJson>
142                        Optional<StepExecutionDetails<PT, IT>> getExecutionDetailsForNonReductionStep(
143                                        WorkChunk theWorkChunk, JobInstance theInstance, Class<IT> theInputType, PT theParameters) {
144                IT inputData = null;
145
146                if (!theInputType.equals(VoidModel.class)) {
147                        if (isBlank(theWorkChunk.getData())) {
148                                ourLog.info(
149                                                "Ignoring chunk[{}] for step[{}] in status[{}] because it has no data",
150                                                theWorkChunk.getId(),
151                                                theWorkChunk.getTargetStepId(),
152                                                theWorkChunk.getStatus());
153                                return Optional.empty();
154                        }
155                        inputData = theWorkChunk.getData(theInputType);
156                }
157
158                return Optional.of(new StepExecutionDetails<>(theParameters, inputData, theInstance, theWorkChunk));
159        }
160}