001/*- 002 * #%L 003 * HAPI FHIR JPA Server - Batch2 Task Processor 004 * %% 005 * Copyright (C) 2014 - 2024 Smile CDR, Inc. 006 * %% 007 * Licensed under the Apache License, Version 2.0 (the "License"); 008 * you may not use this file except in compliance with the License. 009 * You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 * #L% 019 */ 020package ca.uhn.fhir.batch2.progress; 021 022import ca.uhn.fhir.batch2.model.JobInstance; 023import ca.uhn.fhir.batch2.model.StatusEnum; 024import ca.uhn.fhir.batch2.model.WorkChunk; 025import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum; 026import ca.uhn.fhir.util.Logs; 027import ca.uhn.fhir.util.StopWatch; 028import org.apache.commons.lang3.StringUtils; 029import org.apache.commons.lang3.builder.ToStringBuilder; 030import org.slf4j.Logger; 031 032import java.util.Date; 033import java.util.HashMap; 034import java.util.HashSet; 035import java.util.Map; 036import java.util.Set; 037import java.util.concurrent.TimeUnit; 038 039public class InstanceProgress { 040 private static final Logger ourLog = Logs.getBatchTroubleshootingLog(); 041 042 private int myRecordsProcessed = 0; 043 044 // these 4 cover all chunks 045 private int myIncompleteChunkCount = 0; 046 private int myCompleteChunkCount = 0; 047 private int myErroredChunkCount = 0; 048 private int myFailedChunkCount = 0; 049 050 private int myErrorCountForAllStatuses = 0; 051 private Date myEarliestStartTime = null; 052 private Date myLatestEndTime = null; 053 private String myErrormessage = null; 054 private StatusEnum myNewStatus = null; 055 private final Map<String, Map<WorkChunkStatusEnum, Integer>> myStepToStatusCountMap = new HashMap<>(); 056 private final Set<String> myWarningMessages = new HashSet<>(); 057 058 public void addChunk(WorkChunk theChunk) { 059 myErrorCountForAllStatuses += theChunk.getErrorCount(); 060 if (theChunk.getWarningMessage() != null) { 061 myWarningMessages.add(theChunk.getWarningMessage()); 062 } 063 updateRecordsProcessed(theChunk); 064 updateEarliestTime(theChunk); 065 updateLatestEndTime(theChunk); 066 updateCompletionStatus(theChunk); 067 } 068 069 private void updateCompletionStatus(WorkChunk theChunk) { 070 // Update the status map first. 071 Map<WorkChunkStatusEnum, Integer> statusToCountMap = 072 myStepToStatusCountMap.getOrDefault(theChunk.getTargetStepId(), new HashMap<>()); 073 statusToCountMap.put(theChunk.getStatus(), statusToCountMap.getOrDefault(theChunk.getStatus(), 0) + 1); 074 075 switch (theChunk.getStatus()) { 076 case GATE_WAITING: 077 case READY: 078 case QUEUED: 079 case POLL_WAITING: 080 case IN_PROGRESS: 081 myIncompleteChunkCount++; 082 break; 083 case COMPLETED: 084 myCompleteChunkCount++; 085 break; 086 case ERRORED: 087 myErroredChunkCount++; 088 if (myErrormessage == null) { 089 myErrormessage = theChunk.getErrorMessage(); 090 } 091 break; 092 case FAILED: 093 myFailedChunkCount++; 094 myErrormessage = theChunk.getErrorMessage(); 095 break; 096 } 097 ourLog.trace("Chunk has status {} with errored chunk count {}", theChunk.getStatus(), myErroredChunkCount); 098 } 099 100 private void updateLatestEndTime(WorkChunk theChunk) { 101 if (theChunk.getEndTime() != null 102 && (myLatestEndTime == null || myLatestEndTime.before(theChunk.getEndTime()))) { 103 myLatestEndTime = theChunk.getEndTime(); 104 } 105 } 106 107 private void updateEarliestTime(WorkChunk theChunk) { 108 if (theChunk.getStartTime() != null 109 && (myEarliestStartTime == null || myEarliestStartTime.after(theChunk.getStartTime()))) { 110 myEarliestStartTime = theChunk.getStartTime(); 111 } 112 } 113 114 private void updateRecordsProcessed(WorkChunk theChunk) { 115 if (theChunk.getRecordsProcessed() != null) { 116 myRecordsProcessed += theChunk.getRecordsProcessed(); 117 } 118 } 119 120 /** 121 * Signal to the progress calculator to skip the incomplete work chunk count when determining the completed percentage. 122 * <p/> 123 * This is a hack: The reason we do this is to get around a race condition in which all work chunks are complete but 124 * the last chunk is * still in QUEUED status and will only be marked COMPLETE later. 125 * 126 * @param theInstance The Batch 2 {@link JobInstance} that we're updating 127 */ 128 public void updateInstanceForReductionStep(JobInstance theInstance) { 129 updateInstance(theInstance, true); 130 } 131 132 public void updateInstance(JobInstance theInstance) { 133 updateInstance(theInstance, false); 134 135 String newWarningMessage = StringUtils.right(String.join("\n", myWarningMessages), 4000); 136 theInstance.setWarningMessages(newWarningMessage); 137 } 138 139 /** 140 * Update the job instance with status information. 141 * We shouldn't read any values from theInstance here -- just write. 142 * 143 * @param theInstance the instance to update with progress statistics 144 */ 145 public void updateInstance(JobInstance theInstance, boolean theCalledFromReducer) { 146 ourLog.debug("updateInstance {}: {}", theInstance.getInstanceId(), this); 147 if (myEarliestStartTime != null) { 148 theInstance.setStartTime(myEarliestStartTime); 149 } 150 if (myLatestEndTime != null && hasNewStatus() && myNewStatus.isEnded()) { 151 theInstance.setEndTime(myLatestEndTime); 152 } 153 theInstance.setErrorCount(myErrorCountForAllStatuses); 154 theInstance.setCombinedRecordsProcessed(myRecordsProcessed); 155 156 if (getChunkCount() > 0) { 157 final int chunkCount = getChunkCount(); 158 final int conditionalChunkCount = theCalledFromReducer ? (chunkCount - myIncompleteChunkCount) : chunkCount; 159 final double percentComplete = (double) (myCompleteChunkCount) / (double) conditionalChunkCount; 160 theInstance.setProgress(percentComplete); 161 } 162 163 if (myEarliestStartTime != null && myLatestEndTime != null) { 164 long elapsedTime = myLatestEndTime.getTime() - myEarliestStartTime.getTime(); 165 if (elapsedTime > 0) { 166 double throughput = StopWatch.getThroughput(myRecordsProcessed, elapsedTime, TimeUnit.SECONDS); 167 theInstance.setCombinedRecordsProcessedPerSecond(throughput); 168 169 String estimatedTimeRemaining = 170 StopWatch.formatEstimatedTimeRemaining(myCompleteChunkCount, getChunkCount(), elapsedTime); 171 theInstance.setEstimatedTimeRemaining(estimatedTimeRemaining); 172 } 173 } 174 175 theInstance.setErrorMessage(myErrormessage); 176 177 if (hasNewStatus()) { 178 ourLog.trace("Status will change for {}: {}", theInstance.getInstanceId(), myNewStatus); 179 } 180 181 ourLog.trace("Updating status for instance with errors: {}", myErroredChunkCount); 182 ourLog.trace( 183 "Statistics for job {}: complete/in-progress/errored/failed chunk count {}/{}/{}/{}", 184 theInstance.getInstanceId(), 185 myCompleteChunkCount, 186 myIncompleteChunkCount, 187 myErroredChunkCount, 188 myFailedChunkCount); 189 } 190 191 private int getChunkCount() { 192 return myIncompleteChunkCount + myCompleteChunkCount + myFailedChunkCount + myErroredChunkCount; 193 } 194 195 /** 196 * Transitions from IN_PROGRESS/ERRORED based on chunk statuses. 197 */ 198 public void calculateNewStatus(boolean theLastStepIsReduction) { 199 if (myFailedChunkCount > 0) { 200 myNewStatus = StatusEnum.FAILED; 201 } else if (myErroredChunkCount > 0) { 202 myNewStatus = StatusEnum.ERRORED; 203 } else if (myIncompleteChunkCount == 0 && myCompleteChunkCount > 0 && !theLastStepIsReduction) { 204 myNewStatus = StatusEnum.COMPLETED; 205 } 206 } 207 208 @Override 209 public String toString() { 210 ToStringBuilder builder = new ToStringBuilder(this) 211 .append("myIncompleteChunkCount", myIncompleteChunkCount) 212 .append("myCompleteChunkCount", myCompleteChunkCount) 213 .append("myErroredChunkCount", myErroredChunkCount) 214 .append("myFailedChunkCount", myFailedChunkCount) 215 .append("myErrormessage", myErrormessage) 216 .append("myRecordsProcessed", myRecordsProcessed); 217 218 builder.append("myStepToStatusCountMap", myStepToStatusCountMap); 219 220 return builder.toString(); 221 } 222 223 public StatusEnum getNewStatus() { 224 return myNewStatus; 225 } 226 227 public boolean hasNewStatus() { 228 return myNewStatus != null; 229 } 230}