001/*- 002 * #%L 003 * HAPI FHIR JPA Server - Batch2 Task Processor 004 * %% 005 * Copyright (C) 2014 - 2024 Smile CDR, Inc. 006 * %% 007 * Licensed under the Apache License, Version 2.0 (the "License"); 008 * you may not use this file except in compliance with the License. 009 * You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 * #L% 019 */ 020package ca.uhn.fhir.batch2.coordinator; 021 022import ca.uhn.fhir.batch2.api.IJobPersistence; 023import ca.uhn.fhir.batch2.api.IJobStepWorker; 024import ca.uhn.fhir.batch2.api.StepExecutionDetails; 025import ca.uhn.fhir.batch2.api.VoidModel; 026import ca.uhn.fhir.batch2.channel.BatchJobSender; 027import ca.uhn.fhir.batch2.model.JobDefinition; 028import ca.uhn.fhir.batch2.model.JobDefinitionStep; 029import ca.uhn.fhir.batch2.model.JobInstance; 030import ca.uhn.fhir.batch2.model.JobWorkCursor; 031import ca.uhn.fhir.batch2.model.WorkChunk; 032import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService; 033import ca.uhn.fhir.model.api.IModelJson; 034import ca.uhn.fhir.util.Logs; 035import jakarta.annotation.Nullable; 036import org.apache.commons.lang3.Validate; 037import org.slf4j.Logger; 038 039import java.util.Optional; 040 041import static org.apache.commons.lang3.StringUtils.isBlank; 042 043public class WorkChunkProcessor { 044 /** 045 * This retry only works if your channel producer supports 046 * retries on message processing exceptions. 047 * <p> 048 * What's more, we may one day want to have this configurable 049 * by the caller. 050 * But since this is not a feature of HAPI, 051 * this has not been done yet. 052 */ 053 public static final int MAX_CHUNK_ERROR_COUNT = 3; 054 055 private static final Logger ourLog = Logs.getBatchTroubleshootingLog(); 056 private final IJobPersistence myJobPersistence; 057 private final BatchJobSender myBatchJobSender; 058 private final StepExecutor myStepExecutor; 059 private final IHapiTransactionService myHapiTransactionService; 060 061 public WorkChunkProcessor( 062 IJobPersistence theJobPersistence, 063 BatchJobSender theSender, 064 IHapiTransactionService theHapiTransactionService) { 065 myJobPersistence = theJobPersistence; 066 myBatchJobSender = theSender; 067 myStepExecutor = new StepExecutor(theJobPersistence); 068 myHapiTransactionService = theHapiTransactionService; 069 } 070 071 /** 072 * Execute the work chunk. 073 * 074 * @param theCursor - work cursor 075 * @param theInstance - the job instance 076 * @param theWorkChunk - the work chunk (if available); can be null (for reduction step only!) 077 * @param <PT> - Job parameters Type 078 * @param <IT> - Step input parameters Type 079 * @param <OT> - Step output parameters Type 080 * @return - JobStepExecution output. Contains the datasink and whether or not the execution had succeeded. 081 */ 082 public <PT extends IModelJson, IT extends IModelJson, OT extends IModelJson> 083 JobStepExecutorOutput<PT, IT, OT> doExecution( 084 JobWorkCursor<PT, IT, OT> theCursor, JobInstance theInstance, @Nullable WorkChunk theWorkChunk) { 085 JobDefinitionStep<PT, IT, OT> step = theCursor.getCurrentStep(); 086 JobDefinition<PT> jobDefinition = theCursor.getJobDefinition(); 087 String instanceId = theInstance.getInstanceId(); 088 Class<IT> inputType = step.getInputType(); 089 PT parameters = theInstance.getParameters(jobDefinition.getParametersType()); 090 091 IJobStepWorker<PT, IT, OT> worker = step.getJobStepWorker(); 092 BaseDataSink<PT, IT, OT> dataSink = getDataSink(theCursor, jobDefinition, instanceId); 093 094 assert !step.isReductionStep(); 095 096 // all other kinds of steps 097 Validate.notNull(theWorkChunk); 098 Optional<StepExecutionDetails<PT, IT>> stepExecutionDetailsOpt = 099 getExecutionDetailsForNonReductionStep(theWorkChunk, theInstance, inputType, parameters); 100 if (!stepExecutionDetailsOpt.isPresent()) { 101 return new JobStepExecutorOutput<>(false, dataSink); 102 } 103 104 StepExecutionDetails<PT, IT> stepExecutionDetails = stepExecutionDetailsOpt.get(); 105 106 // execute the step 107 boolean success = myStepExecutor.executeStep(stepExecutionDetails, worker, dataSink); 108 109 // return results with data sink 110 return new JobStepExecutorOutput<>(success, dataSink); 111 } 112 113 /** 114 * Get the correct datasink for the cursor/job provided. 115 */ 116 @SuppressWarnings("unchecked") 117 protected <PT extends IModelJson, IT extends IModelJson, OT extends IModelJson> 118 BaseDataSink<PT, IT, OT> getDataSink( 119 JobWorkCursor<PT, IT, OT> theCursor, JobDefinition<PT> theJobDefinition, String theInstanceId) { 120 BaseDataSink<PT, IT, OT> dataSink; 121 122 assert !theCursor.isReductionStep(); 123 if (theCursor.isFinalStep()) { 124 dataSink = (BaseDataSink<PT, IT, OT>) new FinalStepDataSink<>( 125 theJobDefinition.getJobDefinitionId(), theInstanceId, theCursor.asFinalCursor()); 126 } else { 127 dataSink = new JobDataSink<>( 128 myBatchJobSender, 129 myJobPersistence, 130 theJobDefinition, 131 theInstanceId, 132 theCursor, 133 myHapiTransactionService); 134 } 135 return dataSink; 136 } 137 138 /** 139 * Construct execution details for non-reduction step 140 */ 141 private <PT extends IModelJson, IT extends IModelJson> 142 Optional<StepExecutionDetails<PT, IT>> getExecutionDetailsForNonReductionStep( 143 WorkChunk theWorkChunk, JobInstance theInstance, Class<IT> theInputType, PT theParameters) { 144 IT inputData = null; 145 146 if (!theInputType.equals(VoidModel.class)) { 147 if (isBlank(theWorkChunk.getData())) { 148 ourLog.info( 149 "Ignoring chunk[{}] for step[{}] in status[{}] because it has no data", 150 theWorkChunk.getId(), 151 theWorkChunk.getTargetStepId(), 152 theWorkChunk.getStatus()); 153 return Optional.empty(); 154 } 155 inputData = theWorkChunk.getData(theInputType); 156 } 157 158 return Optional.of(new StepExecutionDetails<>(theParameters, inputData, theInstance, theWorkChunk)); 159 } 160}