001/*- 002 * #%L 003 * HAPI FHIR JPA Server - Batch2 Task Processor 004 * %% 005 * Copyright (C) 2014 - 2024 Smile CDR, Inc. 006 * %% 007 * Licensed under the Apache License, Version 2.0 (the "License"); 008 * you may not use this file except in compliance with the License. 009 * You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 * #L% 019 */ 020package ca.uhn.fhir.batch2.coordinator; 021 022import ca.uhn.fhir.batch2.api.IJobMaintenanceService; 023import ca.uhn.fhir.batch2.api.IJobPersistence; 024import ca.uhn.fhir.batch2.model.JobDefinition; 025import ca.uhn.fhir.batch2.model.JobInstance; 026import ca.uhn.fhir.batch2.model.JobWorkCursor; 027import ca.uhn.fhir.batch2.model.StatusEnum; 028import ca.uhn.fhir.batch2.model.WorkChunk; 029import ca.uhn.fhir.batch2.progress.JobInstanceStatusUpdater; 030import ca.uhn.fhir.batch2.util.BatchJobOpenTelemetryUtils; 031import ca.uhn.fhir.model.api.IModelJson; 032import ca.uhn.fhir.util.Logs; 033import io.opentelemetry.instrumentation.annotations.WithSpan; 034import jakarta.annotation.Nonnull; 035import org.slf4j.Logger; 036 037import java.util.Date; 038 039import static ca.uhn.fhir.batch2.util.BatchJobOpenTelemetryUtils.JOB_STEP_EXECUTION_SPAN_NAME; 040 041public class JobStepExecutor<PT extends IModelJson, IT extends IModelJson, OT extends IModelJson> { 042 private static final Logger ourLog = Logs.getBatchTroubleshootingLog(); 043 044 private final IJobPersistence myJobPersistence; 045 private final WorkChunkProcessor myJobExecutorSvc; 046 private final IJobMaintenanceService myJobMaintenanceService; 047 private final JobInstanceStatusUpdater myJobInstanceStatusUpdater; 048 049 private final JobDefinition<PT> myDefinition; 050 private final JobInstance myInstance; 051 private final String myInstanceId; 052 private final WorkChunk myWorkChunk; 053 private final JobWorkCursor<PT, IT, OT> myCursor; 054 055 JobStepExecutor( 056 @Nonnull IJobPersistence theJobPersistence, 057 @Nonnull JobInstance theInstance, 058 WorkChunk theWorkChunk, 059 @Nonnull JobWorkCursor<PT, IT, OT> theCursor, 060 @Nonnull WorkChunkProcessor theExecutor, 061 @Nonnull IJobMaintenanceService theJobMaintenanceService, 062 @Nonnull JobDefinitionRegistry theJobDefinitionRegistry) { 063 myJobPersistence = theJobPersistence; 064 myDefinition = theCursor.jobDefinition; 065 myInstance = theInstance; 066 myInstanceId = theInstance.getInstanceId(); 067 myWorkChunk = theWorkChunk; 068 myCursor = theCursor; 069 myJobExecutorSvc = theExecutor; 070 myJobMaintenanceService = theJobMaintenanceService; 071 myJobInstanceStatusUpdater = new JobInstanceStatusUpdater(theJobDefinitionRegistry); 072 } 073 074 @WithSpan(JOB_STEP_EXECUTION_SPAN_NAME) 075 public void executeStep() { 076 077 BatchJobOpenTelemetryUtils.addAttributesToCurrentSpan( 078 myInstance.getJobDefinitionId(), 079 myInstance.getJobDefinitionVersion(), 080 myInstance.getInstanceId(), 081 myCursor.getCurrentStepId(), 082 myWorkChunk == null ? null : myWorkChunk.getId()); 083 084 JobStepExecutorOutput<PT, IT, OT> stepExecutorOutput = 085 myJobExecutorSvc.doExecution(myCursor, myInstance, myWorkChunk); 086 087 if (!stepExecutorOutput.isSuccessful()) { 088 return; 089 } 090 091 /** 092 * Jobs are completed in {@link ca.uhn.fhir.batch2.progress.JobInstanceProgressCalculator#calculateInstanceProgress} 093 * We determine if the job is complete based on if there are *any* completed work chunks. 094 * So if there are no COMPLETED work chunks (ie, first step produces no work chunks) 095 * we must complete it here. 096 */ 097 if (stepExecutorOutput.getDataSink().firstStepProducedNothing() && !myDefinition.isLastStepReduction()) { 098 ourLog.info( 099 "First step of job myInstance {} produced no work chunks and last step is not a reduction, " 100 + "marking as completed and setting end date", 101 myInstanceId); 102 myJobPersistence.updateInstance(myInstance.getInstanceId(), instance -> { 103 instance.setEndTime(new Date()); 104 myJobInstanceStatusUpdater.updateInstanceStatus(instance, StatusEnum.COMPLETED); 105 return true; 106 }); 107 } 108 109 // This flag could be stale, but checking for fast-track is a safe operation. 110 if (myInstance.isFastTracking()) { 111 handleFastTracking(stepExecutorOutput.getDataSink()); 112 } 113 } 114 115 private void handleFastTracking(BaseDataSink<PT, IT, OT> theDataSink) { 116 if (theDataSink.getWorkChunkCount() <= 1) { 117 ourLog.debug( 118 "Gated job {} step {} produced exactly one chunk: Triggering a maintenance pass.", 119 myDefinition.getJobDefinitionId(), 120 myCursor.currentStep.getStepId()); 121 // wipmb 6.8 either delete fast-tracking, or narrow this call to just this instance and step 122 // This runs full maintenance for EVERY job as each chunk completes in a fast tracked job. That's a LOT of 123 // work. 124 boolean success = myJobMaintenanceService.triggerMaintenancePass(); 125 if (!success) { 126 myJobPersistence.updateInstance(myInstance.getInstanceId(), instance -> { 127 instance.setFastTracking(false); 128 return true; 129 }); 130 } 131 } else { 132 ourLog.debug( 133 "Gated job {} step {} produced {} chunks: Disabling fast tracking.", 134 myDefinition.getJobDefinitionId(), 135 myCursor.currentStep.getStepId(), 136 theDataSink.getWorkChunkCount()); 137 myJobPersistence.updateInstance(myInstance.getInstanceId(), instance -> { 138 instance.setFastTracking(false); 139 return true; 140 }); 141 } 142 } 143}