001/*- 002 * #%L 003 * HAPI FHIR JPA Server - Batch2 Task Processor 004 * %% 005 * Copyright (C) 2014 - 2024 Smile CDR, Inc. 006 * %% 007 * Licensed under the Apache License, Version 2.0 (the "License"); 008 * you may not use this file except in compliance with the License. 009 * You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 * #L% 019 */ 020package ca.uhn.fhir.batch2.progress; 021 022import ca.uhn.fhir.batch2.api.IJobPersistence; 023import ca.uhn.fhir.batch2.coordinator.JobDefinitionRegistry; 024import ca.uhn.fhir.batch2.maintenance.JobChunkProgressAccumulator; 025import ca.uhn.fhir.batch2.model.JobDefinition; 026import ca.uhn.fhir.batch2.model.JobInstance; 027import ca.uhn.fhir.batch2.model.WorkChunk; 028import ca.uhn.fhir.i18n.Msg; 029import ca.uhn.fhir.model.api.IModelJson; 030import ca.uhn.fhir.rest.server.exceptions.InternalErrorException; 031import ca.uhn.fhir.util.Logs; 032import ca.uhn.fhir.util.StopWatch; 033import jakarta.annotation.Nonnull; 034import org.slf4j.Logger; 035 036import java.util.Iterator; 037import java.util.Optional; 038 039public class JobInstanceProgressCalculator { 040 private static final Logger ourLog = Logs.getBatchTroubleshootingLog(); 041 private final IJobPersistence myJobPersistence; 042 private final JobChunkProgressAccumulator myProgressAccumulator; 043 private final JobInstanceStatusUpdater myJobInstanceStatusUpdater; 044 private final JobDefinitionRegistry myJobDefinitionRegistry; 045 046 public JobInstanceProgressCalculator( 047 IJobPersistence theJobPersistence, 048 JobChunkProgressAccumulator theProgressAccumulator, 049 JobDefinitionRegistry theJobDefinitionRegistry) { 050 myJobPersistence = theJobPersistence; 051 myProgressAccumulator = theProgressAccumulator; 052 myJobDefinitionRegistry = theJobDefinitionRegistry; 053 myJobInstanceStatusUpdater = new JobInstanceStatusUpdater(theJobDefinitionRegistry); 054 } 055 056 public void calculateAndStoreInstanceProgress(String theInstanceId) { 057 StopWatch stopWatch = new StopWatch(); 058 ourLog.trace("calculating progress: {}", theInstanceId); 059 060 // calculate progress based on number of work chunks in COMPLETE state 061 InstanceProgress instanceProgress = calculateInstanceProgress(theInstanceId); 062 063 myJobPersistence.updateInstance(theInstanceId, currentInstance -> { 064 instanceProgress.updateInstance(currentInstance); 065 066 if (currentInstance.getCombinedRecordsProcessed() > 0) { 067 ourLog.info( 068 "Job {} of type {} has status {} - {} records processed ({}/sec) - ETA: {}", 069 currentInstance.getInstanceId(), 070 currentInstance.getJobDefinitionId(), 071 currentInstance.getStatus(), 072 currentInstance.getCombinedRecordsProcessed(), 073 currentInstance.getCombinedRecordsProcessedPerSecond(), 074 currentInstance.getEstimatedTimeRemaining()); 075 } else { 076 ourLog.info( 077 "Job {} of type {} has status {} - {} records processed", 078 currentInstance.getInstanceId(), 079 currentInstance.getJobDefinitionId(), 080 currentInstance.getStatus(), 081 currentInstance.getCombinedRecordsProcessed()); 082 } 083 ourLog.debug(instanceProgress.toString()); 084 085 if (instanceProgress.hasNewStatus()) { 086 myJobInstanceStatusUpdater.updateInstanceStatus(currentInstance, instanceProgress.getNewStatus()); 087 } 088 089 return true; 090 }); 091 ourLog.trace("calculating progress: {} - complete in {}", theInstanceId, stopWatch); 092 } 093 094 @Nonnull 095 public InstanceProgress calculateInstanceProgress(String instanceId) { 096 InstanceProgress instanceProgress = new InstanceProgress(); 097 Iterator<WorkChunk> workChunkIterator = myJobPersistence.fetchAllWorkChunksIterator(instanceId, false); 098 099 while (workChunkIterator.hasNext()) { 100 WorkChunk next = workChunkIterator.next(); 101 102 // global stats 103 myProgressAccumulator.addChunk(next); 104 // instance stats 105 instanceProgress.addChunk(next); 106 } 107 108 // wipmb separate status update from stats collection in 6.8 109 instanceProgress.calculateNewStatus(lastStepIsReduction(instanceId)); 110 111 return instanceProgress; 112 } 113 114 private boolean lastStepIsReduction(String theInstanceId) { 115 JobInstance jobInstance = getJobInstance(theInstanceId); 116 JobDefinition<IModelJson> jobDefinition = myJobDefinitionRegistry.getJobDefinitionOrThrowException(jobInstance); 117 return jobDefinition.isLastStepReduction(); 118 } 119 120 private JobInstance getJobInstance(String theInstanceId) { 121 Optional<JobInstance> oInstance = myJobPersistence.fetchInstance(theInstanceId); 122 return oInstance.orElseThrow(() -> 123 new InternalErrorException(Msg.code(2486) + "Failed to fetch JobInstance with id: " + theInstanceId)); 124 } 125}