001/*-
002 * #%L
003 * HAPI FHIR JPA Server - Batch2 Task Processor
004 * %%
005 * Copyright (C) 2014 - 2024 Smile CDR, Inc.
006 * %%
007 * Licensed under the Apache License, Version 2.0 (the "License");
008 * you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 *      http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 * #L%
019 */
020package ca.uhn.fhir.batch2.progress;
021
022import ca.uhn.fhir.batch2.model.JobInstance;
023import ca.uhn.fhir.batch2.model.StatusEnum;
024import ca.uhn.fhir.batch2.model.WorkChunk;
025import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
026import ca.uhn.fhir.util.Logs;
027import ca.uhn.fhir.util.StopWatch;
028import org.apache.commons.lang3.StringUtils;
029import org.apache.commons.lang3.builder.ToStringBuilder;
030import org.slf4j.Logger;
031
032import java.util.Date;
033import java.util.HashMap;
034import java.util.HashSet;
035import java.util.Map;
036import java.util.Set;
037import java.util.concurrent.TimeUnit;
038
039public class InstanceProgress {
040        private static final Logger ourLog = Logs.getBatchTroubleshootingLog();
041
042        private int myRecordsProcessed = 0;
043
044        // these 4 cover all chunks
045        private int myIncompleteChunkCount = 0;
046        private int myCompleteChunkCount = 0;
047        private int myErroredChunkCount = 0;
048        private int myFailedChunkCount = 0;
049
050        private int myErrorCountForAllStatuses = 0;
051        private Date myEarliestStartTime = null;
052        private Date myLatestEndTime = null;
053        private String myErrormessage = null;
054        private StatusEnum myNewStatus = null;
055        private final Map<String, Map<WorkChunkStatusEnum, Integer>> myStepToStatusCountMap = new HashMap<>();
056        private final Set<String> myWarningMessages = new HashSet<>();
057
058        public void addChunk(WorkChunk theChunk) {
059                myErrorCountForAllStatuses += theChunk.getErrorCount();
060                if (theChunk.getWarningMessage() != null) {
061                        myWarningMessages.add(theChunk.getWarningMessage());
062                }
063                updateRecordsProcessed(theChunk);
064                updateEarliestTime(theChunk);
065                updateLatestEndTime(theChunk);
066                updateCompletionStatus(theChunk);
067        }
068
069        private void updateCompletionStatus(WorkChunk theChunk) {
070                // Update the status map first.
071                Map<WorkChunkStatusEnum, Integer> statusToCountMap =
072                                myStepToStatusCountMap.getOrDefault(theChunk.getTargetStepId(), new HashMap<>());
073                statusToCountMap.put(theChunk.getStatus(), statusToCountMap.getOrDefault(theChunk.getStatus(), 0) + 1);
074
075                switch (theChunk.getStatus()) {
076                        case GATE_WAITING:
077                        case READY:
078                        case QUEUED:
079                        case POLL_WAITING:
080                        case IN_PROGRESS:
081                                myIncompleteChunkCount++;
082                                break;
083                        case COMPLETED:
084                                myCompleteChunkCount++;
085                                break;
086                        case ERRORED:
087                                myErroredChunkCount++;
088                                if (myErrormessage == null) {
089                                        myErrormessage = theChunk.getErrorMessage();
090                                }
091                                break;
092                        case FAILED:
093                                myFailedChunkCount++;
094                                myErrormessage = theChunk.getErrorMessage();
095                                break;
096                }
097                ourLog.trace("Chunk has status {} with errored chunk count {}", theChunk.getStatus(), myErroredChunkCount);
098        }
099
100        private void updateLatestEndTime(WorkChunk theChunk) {
101                if (theChunk.getEndTime() != null
102                                && (myLatestEndTime == null || myLatestEndTime.before(theChunk.getEndTime()))) {
103                        myLatestEndTime = theChunk.getEndTime();
104                }
105        }
106
107        private void updateEarliestTime(WorkChunk theChunk) {
108                if (theChunk.getStartTime() != null
109                                && (myEarliestStartTime == null || myEarliestStartTime.after(theChunk.getStartTime()))) {
110                        myEarliestStartTime = theChunk.getStartTime();
111                }
112        }
113
114        private void updateRecordsProcessed(WorkChunk theChunk) {
115                if (theChunk.getRecordsProcessed() != null) {
116                        myRecordsProcessed += theChunk.getRecordsProcessed();
117                }
118        }
119
120        /**
121         * Signal to the progress calculator to skip the incomplete work chunk count when determining the completed percentage.
122         * <p/>
123         * This is a hack:  The reason we do this is to get around a race condition in which all work chunks are complete but
124         * the last chunk is * still in QUEUED status and will only be marked COMPLETE later.
125         *
126         * @param theInstance The Batch 2 {@link JobInstance} that we're updating
127         */
128        public void updateInstanceForReductionStep(JobInstance theInstance) {
129                updateInstance(theInstance, true);
130        }
131
132        public void updateInstance(JobInstance theInstance) {
133                updateInstance(theInstance, false);
134
135                String newWarningMessage = StringUtils.right(String.join("\n", myWarningMessages), 4000);
136                theInstance.setWarningMessages(newWarningMessage);
137        }
138
139        /**
140         * Update the job instance with status information.
141         * We shouldn't read any values from theInstance here -- just write.
142         *
143         * @param theInstance the instance to update with progress statistics
144         */
145        public void updateInstance(JobInstance theInstance, boolean theCalledFromReducer) {
146                ourLog.debug("updateInstance {}: {}", theInstance.getInstanceId(), this);
147                if (myEarliestStartTime != null) {
148                        theInstance.setStartTime(myEarliestStartTime);
149                }
150                if (myLatestEndTime != null && hasNewStatus() && myNewStatus.isEnded()) {
151                        theInstance.setEndTime(myLatestEndTime);
152                }
153                theInstance.setErrorCount(myErrorCountForAllStatuses);
154                theInstance.setCombinedRecordsProcessed(myRecordsProcessed);
155
156                if (getChunkCount() > 0) {
157                        final int chunkCount = getChunkCount();
158                        final int conditionalChunkCount = theCalledFromReducer ? (chunkCount - myIncompleteChunkCount) : chunkCount;
159                        final double percentComplete = (double) (myCompleteChunkCount) / (double) conditionalChunkCount;
160                        theInstance.setProgress(percentComplete);
161                }
162
163                if (myEarliestStartTime != null && myLatestEndTime != null) {
164                        long elapsedTime = myLatestEndTime.getTime() - myEarliestStartTime.getTime();
165                        if (elapsedTime > 0) {
166                                double throughput = StopWatch.getThroughput(myRecordsProcessed, elapsedTime, TimeUnit.SECONDS);
167                                theInstance.setCombinedRecordsProcessedPerSecond(throughput);
168
169                                String estimatedTimeRemaining =
170                                                StopWatch.formatEstimatedTimeRemaining(myCompleteChunkCount, getChunkCount(), elapsedTime);
171                                theInstance.setEstimatedTimeRemaining(estimatedTimeRemaining);
172                        }
173                }
174
175                theInstance.setErrorMessage(myErrormessage);
176
177                if (hasNewStatus()) {
178                        ourLog.trace("Status will change for {}: {}", theInstance.getInstanceId(), myNewStatus);
179                }
180
181                ourLog.trace("Updating status for instance with errors: {}", myErroredChunkCount);
182                ourLog.trace(
183                                "Statistics for job {}: complete/in-progress/errored/failed chunk count {}/{}/{}/{}",
184                                theInstance.getInstanceId(),
185                                myCompleteChunkCount,
186                                myIncompleteChunkCount,
187                                myErroredChunkCount,
188                                myFailedChunkCount);
189        }
190
191        private int getChunkCount() {
192                return myIncompleteChunkCount + myCompleteChunkCount + myFailedChunkCount + myErroredChunkCount;
193        }
194
195        /**
196         * Transitions from IN_PROGRESS/ERRORED based on chunk statuses.
197         */
198        public void calculateNewStatus(boolean theLastStepIsReduction) {
199                if (myFailedChunkCount > 0) {
200                        myNewStatus = StatusEnum.FAILED;
201                } else if (myErroredChunkCount > 0) {
202                        myNewStatus = StatusEnum.ERRORED;
203                } else if (myIncompleteChunkCount == 0 && myCompleteChunkCount > 0 && !theLastStepIsReduction) {
204                        myNewStatus = StatusEnum.COMPLETED;
205                }
206        }
207
208        @Override
209        public String toString() {
210                ToStringBuilder builder = new ToStringBuilder(this)
211                                .append("myIncompleteChunkCount", myIncompleteChunkCount)
212                                .append("myCompleteChunkCount", myCompleteChunkCount)
213                                .append("myErroredChunkCount", myErroredChunkCount)
214                                .append("myFailedChunkCount", myFailedChunkCount)
215                                .append("myErrormessage", myErrormessage)
216                                .append("myRecordsProcessed", myRecordsProcessed);
217
218                builder.append("myStepToStatusCountMap", myStepToStatusCountMap);
219
220                return builder.toString();
221        }
222
223        public StatusEnum getNewStatus() {
224                return myNewStatus;
225        }
226
227        public boolean hasNewStatus() {
228                return myNewStatus != null;
229        }
230}