001/*-
002 * #%L
003 * HAPI FHIR JPA Server - Batch2 Task Processor
004 * %%
005 * Copyright (C) 2014 - 2024 Smile CDR, Inc.
006 * %%
007 * Licensed under the Apache License, Version 2.0 (the "License");
008 * you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 *      http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 * #L%
019 */
020package ca.uhn.fhir.batch2.progress;
021
022import ca.uhn.fhir.batch2.api.IJobPersistence;
023import ca.uhn.fhir.batch2.coordinator.JobDefinitionRegistry;
024import ca.uhn.fhir.batch2.maintenance.JobChunkProgressAccumulator;
025import ca.uhn.fhir.batch2.model.JobDefinition;
026import ca.uhn.fhir.batch2.model.JobInstance;
027import ca.uhn.fhir.batch2.model.WorkChunk;
028import ca.uhn.fhir.i18n.Msg;
029import ca.uhn.fhir.model.api.IModelJson;
030import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
031import ca.uhn.fhir.util.Logs;
032import ca.uhn.fhir.util.StopWatch;
033import jakarta.annotation.Nonnull;
034import org.slf4j.Logger;
035
036import java.util.Iterator;
037import java.util.Optional;
038
039public class JobInstanceProgressCalculator {
040        private static final Logger ourLog = Logs.getBatchTroubleshootingLog();
041        private final IJobPersistence myJobPersistence;
042        private final JobChunkProgressAccumulator myProgressAccumulator;
043        private final JobInstanceStatusUpdater myJobInstanceStatusUpdater;
044        private final JobDefinitionRegistry myJobDefinitionRegistry;
045
046        public JobInstanceProgressCalculator(
047                        IJobPersistence theJobPersistence,
048                        JobChunkProgressAccumulator theProgressAccumulator,
049                        JobDefinitionRegistry theJobDefinitionRegistry) {
050                myJobPersistence = theJobPersistence;
051                myProgressAccumulator = theProgressAccumulator;
052                myJobDefinitionRegistry = theJobDefinitionRegistry;
053                myJobInstanceStatusUpdater = new JobInstanceStatusUpdater(theJobDefinitionRegistry);
054        }
055
056        public void calculateAndStoreInstanceProgress(String theInstanceId) {
057                StopWatch stopWatch = new StopWatch();
058                ourLog.trace("calculating progress: {}", theInstanceId);
059
060                // calculate progress based on number of work chunks in COMPLETE state
061                InstanceProgress instanceProgress = calculateInstanceProgress(theInstanceId);
062
063                myJobPersistence.updateInstance(theInstanceId, currentInstance -> {
064                        instanceProgress.updateInstance(currentInstance);
065
066                        if (currentInstance.getCombinedRecordsProcessed() > 0) {
067                                ourLog.info(
068                                                "Job {} of type {} has status {} - {} records processed ({}/sec) - ETA: {}",
069                                                currentInstance.getInstanceId(),
070                                                currentInstance.getJobDefinitionId(),
071                                                currentInstance.getStatus(),
072                                                currentInstance.getCombinedRecordsProcessed(),
073                                                currentInstance.getCombinedRecordsProcessedPerSecond(),
074                                                currentInstance.getEstimatedTimeRemaining());
075                        } else {
076                                ourLog.info(
077                                                "Job {} of type {} has status {} - {} records processed",
078                                                currentInstance.getInstanceId(),
079                                                currentInstance.getJobDefinitionId(),
080                                                currentInstance.getStatus(),
081                                                currentInstance.getCombinedRecordsProcessed());
082                        }
083                        ourLog.debug(instanceProgress.toString());
084
085                        if (instanceProgress.hasNewStatus()) {
086                                myJobInstanceStatusUpdater.updateInstanceStatus(currentInstance, instanceProgress.getNewStatus());
087                        }
088
089                        return true;
090                });
091                ourLog.trace("calculating progress: {} - complete in {}", theInstanceId, stopWatch);
092        }
093
094        @Nonnull
095        public InstanceProgress calculateInstanceProgress(String instanceId) {
096                InstanceProgress instanceProgress = new InstanceProgress();
097                Iterator<WorkChunk> workChunkIterator = myJobPersistence.fetchAllWorkChunksIterator(instanceId, false);
098
099                while (workChunkIterator.hasNext()) {
100                        WorkChunk next = workChunkIterator.next();
101
102                        // global stats
103                        myProgressAccumulator.addChunk(next);
104                        // instance stats
105                        instanceProgress.addChunk(next);
106                }
107
108                // wipmb separate status update from stats collection in 6.8
109                instanceProgress.calculateNewStatus(lastStepIsReduction(instanceId));
110
111                return instanceProgress;
112        }
113
114        private boolean lastStepIsReduction(String theInstanceId) {
115                JobInstance jobInstance = getJobInstance(theInstanceId);
116                JobDefinition<IModelJson> jobDefinition = myJobDefinitionRegistry.getJobDefinitionOrThrowException(jobInstance);
117                return jobDefinition.isLastStepReduction();
118        }
119
120        private JobInstance getJobInstance(String theInstanceId) {
121                Optional<JobInstance> oInstance = myJobPersistence.fetchInstance(theInstanceId);
122                return oInstance.orElseThrow(() ->
123                                new InternalErrorException(Msg.code(2486) + "Failed to fetch JobInstance with id: " + theInstanceId));
124        }
125}