/*-
 * #%L
 * HAPI FHIR JPA Server - Batch2 Task Processor
 * %%
 * Copyright (C) 2014 - 2024 Smile CDR, Inc.
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */
package ca.uhn.fhir.batch2.coordinator;

import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.api.IJobStepWorker;
import ca.uhn.fhir.batch2.api.JobExecutionFailedException;
import ca.uhn.fhir.batch2.api.JobStepFailedException;
import ca.uhn.fhir.batch2.api.RetryChunkLaterException;
import ca.uhn.fhir.batch2.api.RunOutcome;
import ca.uhn.fhir.batch2.api.StepExecutionDetails;
import ca.uhn.fhir.batch2.model.WorkChunkCompletionEvent;
import ca.uhn.fhir.batch2.model.WorkChunkErrorEvent;
import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.model.api.IModelJson;
import ca.uhn.fhir.util.Logs;
import org.apache.commons.lang3.Validate;
import org.slf4j.Logger;

import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Date;

/**
 * Runs a single job step worker and reports the result of that run (completion,
 * poll delay, retriable error or failure) to the supplied {@link IJobPersistence}.
 */
public class StepExecutor {
	private static final Logger ourLog = Logs.getBatchTroubleshootingLog();
	private final IJobPersistence myJobPersistence;

	/**
	 * @param theJobPersistence the persistence service that receives work chunk
	 *                          completion, poll-delay, error and failure notifications
	 */
	public StepExecutor(IJobPersistence theJobPersistence) {
		myJobPersistence = theJobPersistence;
	}

	/**
	 * Calls the worker execution step, and performs error handling logic for jobs that failed.
	 *
	 * <p>Outcomes, in the order they are checked:
	 * <ul>
	 * <li>The worker returns an outcome: if the execution has an associated work chunk, a
	 * {@link WorkChunkCompletionEvent} is sent to the persistence service; returns {@code true}.</li>
	 * <li>The worker throws {@link RetryChunkLaterException}: the next poll time (now plus the
	 * duration carried by the exception) is sent to the persistence service; returns {@code false}.</li>
	 * <li>The worker throws {@link JobExecutionFailedException}: the chunk (if any) is reported as
	 * failed; returns {@code false}.</li>
	 * <li>Any other {@link Exception}: the chunk (if any) is reported as errored. If the persistence
	 * service answers with {@link WorkChunkStatusEnum#FAILED}, returns {@code false}; in every other
	 * case a {@link JobStepFailedException} wrapping the original exception is thrown.</li>
	 * <li>Any other {@link Throwable}: the chunk (if any) is reported as failed; returns {@code false}.</li>
	 * </ul>
	 *
	 * @param theStepExecutionDetails the details handed to the worker; also supplies the chunk id
	 * @param theStepWorker           the worker to run
	 * @param theDataSink             the sink handed to the worker; also supplies the job definition id,
	 *                                target step id and recovered error/warning information
	 * @param <PT>                    the job parameters type
	 * @param <IT>                    the step input type
	 * @param <OT>                    the step output type
	 * @return {@code true} if the worker produced an outcome, {@code false} if the run was deferred or
	 *         ended in a handled failure
	 * @throws JobStepFailedException if the worker threw an exception that was not handled by one of
	 *                                the {@code false}-returning branches described above
	 */
	<PT extends IModelJson, IT extends IModelJson, OT extends IModelJson> boolean executeStep(
			StepExecutionDetails<PT, IT> theStepExecutionDetails,
			IJobStepWorker<PT, IT, OT> theStepWorker,
			BaseDataSink<PT, IT, OT> theDataSink) {
		String jobDefinitionId = theDataSink.getJobDefinitionId();
		String targetStepId = theDataSink.getTargetStep().getStepId();
		String chunkId = theStepExecutionDetails.getChunkId();

		RunOutcome outcome;
		try {
			outcome = theStepWorker.run(theStepExecutionDetails, theDataSink);
			// A null outcome is rejected inside the try block, so the resulting exception
			// is processed by the generic Exception handler below.
			Validate.notNull(outcome, "Step theWorker returned null: %s", theStepWorker.getClass());
		} catch (RetryChunkLaterException ex) {
			Date nextPollTime = Date.from(Instant.now().plus(ex.getNextPollDuration()));
			ourLog.debug(
					"Polling job encountered; will retry chunk {} after {}s",
					chunkId,
					ex.getNextPollDuration().get(ChronoUnit.SECONDS));
			myJobPersistence.onWorkChunkPollDelay(chunkId, nextPollTime);
			return false;
		} catch (JobExecutionFailedException e) {
			ourLog.error(
					"Unrecoverable failure executing job {} step {} chunk {}",
					jobDefinitionId,
					targetStepId,
					chunkId,
					e);
			if (theStepExecutionDetails.hasAssociatedWorkChunk()) {
				myJobPersistence.onWorkChunkFailed(chunkId, e.toString());
			}
			return false;
		} catch (Exception e) {
			if (theStepExecutionDetails.hasAssociatedWorkChunk()) {
				ourLog.info(
						"Temporary problem executing job {} step {}, marking chunk {} as retriable ERRORED",
						jobDefinitionId,
						targetStepId,
						chunkId);
				WorkChunkErrorEvent parameters = new WorkChunkErrorEvent(chunkId, e.getMessage());
				WorkChunkStatusEnum newStatus = myJobPersistence.onWorkChunkError(parameters);
				if (newStatus == WorkChunkStatusEnum.FAILED) {
					// The persistence service reported FAILED rather than a retriable status,
					// so the message names the status that was actually returned.
					ourLog.error(
							"Exhausted retries: Failure executing job {} step {}, marking chunk {} as FAILED",
							jobDefinitionId,
							targetStepId,
							chunkId,
							e);
					return false;
				}
			} else {
				ourLog.error(
						"Failure executing job {} step {}, no associated work chunk", jobDefinitionId, targetStepId, e);
			}
			// Reached when the chunk is still retriable or there is no chunk at all:
			// surface the problem to the caller with the original exception as cause.
			throw new JobStepFailedException(Msg.code(2041) + e.getMessage(), e);
		} catch (Throwable t) {
			// Non-Exception throwables (e.g. Errors) are not retried.
			ourLog.error("Unexpected failure executing job {} step {}", jobDefinitionId, targetStepId, t);
			if (theStepExecutionDetails.hasAssociatedWorkChunk()) {
				myJobPersistence.onWorkChunkFailed(chunkId, t.toString());
			}
			return false;
		}

		if (theStepExecutionDetails.hasAssociatedWorkChunk()) {
			int recordsProcessed = outcome.getRecordsProcessed();
			int recoveredErrorCount = theDataSink.getRecoveredErrorCount();
			WorkChunkCompletionEvent event = new WorkChunkCompletionEvent(
					chunkId, recordsProcessed, recoveredErrorCount, theDataSink.getRecoveredWarning());

			myJobPersistence.onWorkChunkCompletion(event);
		}

		return true;
	}
}