/*-
 * #%L
 * HAPI FHIR JPA Server - Batch2 Task Processor
 * %%
 * Copyright (C) 2014 - 2024 Smile CDR, Inc.
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */
package ca.uhn.fhir.batch2.maintenance;

import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.api.IReductionStepExecutorService;
import ca.uhn.fhir.batch2.channel.BatchJobSender;
import ca.uhn.fhir.batch2.coordinator.JobDefinitionRegistry;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.JobWorkCursor;
import ca.uhn.fhir.batch2.model.JobWorkNotification;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunkMetadata;
import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
import ca.uhn.fhir.batch2.progress.JobInstanceProgressCalculator;
import ca.uhn.fhir.batch2.progress.JobInstanceStatusUpdater;
import ca.uhn.fhir.model.api.IModelJson;
import ca.uhn.fhir.model.api.PagingIterator;
import ca.uhn.fhir.util.Logs;
import ca.uhn.fhir.util.StopWatch;
import org.apache.commons.lang3.time.DateUtils;
import org.slf4j.Logger;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;

import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Set;

/**
 * Runs one maintenance pass over a single batch2 job instance.
 *
 * <p>A pass ({@link #process()}) performs, in order: cancellation handling, moving
 * POLL_WAITING chunks to READY, progress calculation / purging of finished instances,
 * advancing gated steps, and finally either triggering the reduction step or
 * enqueueing all READY work chunks.
 */
public class JobInstanceProcessor {
	private static final Logger ourLog = Logs.getBatchTroubleshootingLog();

	/**
	 * Age, in milliseconds (7 days), past which an instance with an end time is
	 * deleted together with its chunks (see {@link #purgeExpiredInstance(JobInstance)}).
	 */
	public static final long PURGE_THRESHOLD = 7L * DateUtils.MILLIS_PER_DAY;

	// 10k; we want to get as many as we can
	private static final int WORK_CHUNK_METADATA_BATCH_SIZE = 10000;

	private final IJobPersistence myJobPersistence;
	private final BatchJobSender myBatchJobSender;
	private final JobChunkProgressAccumulator myProgressAccumulator;
	private final JobInstanceProgressCalculator myJobInstanceProgressCalculator;
	private final JobInstanceStatusUpdater myJobInstanceStatusUpdater;
	private final IReductionStepExecutorService myReductionStepExecutorService;
	private final String myInstanceId;
	private final JobDefinitionRegistry myJobDefinitionRegistry;

	/**
	 * @param theJobPersistence persistence layer used to read and update the instance and its work chunks
	 * @param theBatchJobSender sender used to publish work notifications for enqueued chunks
	 * @param theInstanceId id of the job instance this processor handles
	 * @param theProgressAccumulator accumulator queried for chunk ids/counts per step
	 * @param theReductionStepExecutorService service that reduction steps are handed to instead of the queue
	 * @param theJobDefinitionRegistry registry used to resolve the job definition of the instance
	 */
	public JobInstanceProcessor(
			IJobPersistence theJobPersistence,
			BatchJobSender theBatchJobSender,
			String theInstanceId,
			JobChunkProgressAccumulator theProgressAccumulator,
			IReductionStepExecutorService theReductionStepExecutorService,
			JobDefinitionRegistry theJobDefinitionRegistry) {
		myJobPersistence = theJobPersistence;
		myBatchJobSender = theBatchJobSender;
		myInstanceId = theInstanceId;
		myProgressAccumulator = theProgressAccumulator;
		myReductionStepExecutorService = theReductionStepExecutorService;
		myJobDefinitionRegistry = theJobDefinitionRegistry;
		myJobInstanceProgressCalculator =
				new JobInstanceProgressCalculator(theJobPersistence, theProgressAccumulator, theJobDefinitionRegistry);
		myJobInstanceStatusUpdater = new JobInstanceStatusUpdater(theJobDefinitionRegistry);
	}

	/**
	 * Executes a single maintenance pass for the instance this processor was created for.
	 * Returns silently if the instance cannot be found.
	 */
	public void process() {
		ourLog.debug("Starting job processing: {}", myInstanceId);
		StopWatch stopWatch = new StopWatch();

		JobInstance instance = myJobPersistence.fetchInstance(myInstanceId).orElse(null);
		if (instance == null) {
			return;
		}

		boolean cancelUpdate = handleCancellation(instance);
		if (cancelUpdate) {
			// reload after update
			instance = myJobPersistence.fetchInstance(myInstanceId).orElseThrow();
		}

		JobDefinition<? extends IModelJson> jobDefinition =
				myJobDefinitionRegistry.getJobDefinitionOrThrowException(instance);

		// move POLL_WAITING -> READY
		processPollingChunks(instance.getInstanceId());
		// determine job progress; delete CANCELED/COMPLETE/FAILED jobs that are no longer needed
		cleanupInstance(instance);
		// move gated jobs to the next step, if needed
		// moves GATE_WAITING / QUEUED (legacy) chunks to:
		// READY (for regular gated jobs)
		// REDUCTION_READY (if it's the final reduction step)
		triggerGatedExecutions(instance, jobDefinition);

		if (instance.hasGatedStep() && instance.isRunning()) {
			// re-read the instance: the gated step may have been advanced just above
			Optional<JobInstance> updatedInstance = myJobPersistence.fetchInstance(instance.getInstanceId());

			if (updatedInstance.isEmpty()) {
				return;
			}

			JobWorkCursor<?, ?, ?> jobWorkCursor = JobWorkCursor.fromJobDefinitionAndRequestedStepId(
					jobDefinition, updatedInstance.get().getCurrentGatedStepId());
			if (jobWorkCursor.isReductionStep()) {
				// Reduction step work chunks should never be sent to the queue but to its specific service instead.
				triggerReductionStep(instance, jobWorkCursor);
				return;
			}
		}

		// enqueue all READY chunks
		enqueueReadyChunks(instance, jobDefinition);

		ourLog.debug("Finished job processing: {} - {}", myInstanceId, stopWatch);
	}

	/**
	 * If the instance has a pending cancellation request, attempts to move it to CANCELLED
	 * and, when the status actually changed, records a cancellation message on it.
	 *
	 * @return the result of the persistence update when a cancellation was pending
	 *         ({@link #process()} reloads the instance when this is true); false otherwise
	 */
	private boolean handleCancellation(JobInstance theInstance) {
		if (!theInstance.isPendingCancellationRequest()) {
			return false;
		}

		String errorMessage = buildCancelledMessage(theInstance);
		ourLog.info("Job {} moving to CANCELLED", theInstance.getInstanceId());
		return myJobPersistence.updateInstance(theInstance.getInstanceId(), instance -> {
			boolean changed = myJobInstanceStatusUpdater.updateInstanceStatus(instance, StatusEnum.CANCELLED);
			if (changed) {
				instance.setErrorMessage(errorMessage);
			}
			return changed;
		});
	}

	/**
	 * Builds the message stored on a cancelled instance, including the current gated step id when there is one.
	 */
	private String buildCancelledMessage(JobInstance theInstance) {
		String msg = "Job instance cancelled";
		if (theInstance.hasGatedStep()) {
			msg += " while running step " + theInstance.getCurrentGatedStepId();
		}
		return msg;
	}

	/**
	 * Status-dependent housekeeping: recalculates progress for IN_PROGRESS/ERRORED instances,
	 * purges expired COMPLETED/FAILED/CANCELLED instances, and deletes the work chunks of
	 * finished instances whose chunks have not been purged yet.
	 */
	private void cleanupInstance(JobInstance theInstance) {
		switch (theInstance.getStatus()) {
			case QUEUED:
				// If we're still QUEUED, there are no stats to calculate
				break;
			case FINALIZE:
				// If we're in FINALIZE, the reduction step is working, so we should stay out of the way until it
				// marks the job as COMPLETED
				return;
			case IN_PROGRESS:
			case ERRORED:
				myJobInstanceProgressCalculator.calculateAndStoreInstanceProgress(theInstance.getInstanceId());
				break;
			case COMPLETED:
			case FAILED:
				if (purgeExpiredInstance(theInstance)) {
					// instance and chunks are gone; nothing left to clean up
					return;
				}
				break;
			case CANCELLED:
				purgeExpiredInstance(theInstance);
				// wipmb For 6.8 - Are we deliberately not purging chunks for cancelled jobs? This is a very
				// complicated way to say that.
				return;
			default:
				// any other status: fall through to the chunk purge check below
				break;
		}

		if (theInstance.isFinished() && !theInstance.isWorkChunksPurged()) {
			myJobPersistence.deleteChunksAndMarkInstanceAsChunksPurged(theInstance.getInstanceId());
		}
	}

	/**
	 * Deletes the instance and its chunks if it has an end time older than {@link #PURGE_THRESHOLD}.
	 *
	 * @return true if the instance was deleted, false if it has no end time or is not old enough
	 */
	private boolean purgeExpiredInstance(JobInstance theInstance) {
		if (theInstance.getEndTime() == null) {
			return false;
		}

		long cutoff = System.currentTimeMillis() - PURGE_THRESHOLD;
		if (theInstance.getEndTime().getTime() < cutoff) {
			ourLog.info("Deleting old job instance {}", theInstance.getInstanceId());
			myJobPersistence.deleteInstanceAndChunks(theInstance.getInstanceId());
			return true;
		}
		return false;
	}

	/**
	 * For an instance with a gated step, advances it to the next step once the current
	 * step can be advanced (see {@link #canAdvanceGatedJob(JobInstance)}). Final steps
	 * are never advanced. Does nothing for instances without a gated step.
	 */
	private void triggerGatedExecutions(JobInstance theInstance, JobDefinition<?> theJobDefinition) {
		// QUEUE'd jobs that are gated need to start; this step will do that
		if (!theInstance.isRunning()
				&& (theInstance.getStatus() != StatusEnum.QUEUED && theJobDefinition.isGatedExecution())) {
			ourLog.debug(
					"JobInstance {} is not in a \"running\" state. Status {}",
					theInstance.getInstanceId(),
					theInstance.getStatus());
			return;
		}

		if (!theInstance.hasGatedStep()) {
			return;
		}

		JobWorkCursor<?, ?, ?> jobWorkCursor = JobWorkCursor.fromJobDefinitionAndRequestedStepId(
				theJobDefinition, theInstance.getCurrentGatedStepId());
		String instanceId = theInstance.getInstanceId();
		String currentStepId = jobWorkCursor.getCurrentStepId();

		if (!canAdvanceGatedJob(theInstance)) {
			// for logging only: fall back to the current step when there is no next step
			String stepId = jobWorkCursor.nextStep != null ? jobWorkCursor.nextStep.getStepId() : currentStepId;
			ourLog.debug(
					"Not ready to advance gated execution of instance {} from step {} to {}.",
					instanceId,
					currentStepId,
					stepId);
			return;
		}

		if (jobWorkCursor.isFinalStep()) {
			ourLog.info(
					"Ready to advance gated execution of instance {} but already at the final step {}. Not proceeding to advance steps.",
					instanceId,
					currentStepId);
			return;
		}

		// all other gated job steps except for final steps - final steps does not need to be advanced
		String nextStepId = jobWorkCursor.nextStep.getStepId();
		ourLog.info(
				"All processing is complete for gated execution of instance {} step {}. Proceeding to step {}",
				instanceId,
				currentStepId,
				nextStepId);

		processChunksForNextGatedSteps(theInstance, theJobDefinition, nextStepId);
	}

	/**
	 * Decides whether the current gated step is done.
	 *
	 * @return false if the instance no longer exists; true if the current step has no work
	 *         chunks at all, or if COMPLETED is the only status among its work chunks
	 */
	private boolean canAdvanceGatedJob(JobInstance theInstance) {
		// make sure our instance still exists
		if (myJobPersistence.fetchInstance(theInstance.getInstanceId()).isEmpty()) {
			// no more job
			return false;
		}
		String currentGatedStepId = theInstance.getCurrentGatedStepId();

		Set<WorkChunkStatusEnum> workChunkStatuses = myJobPersistence.getDistinctWorkChunkStatesForJobAndStep(
				theInstance.getInstanceId(), currentGatedStepId);

		if (workChunkStatuses.isEmpty()) {
			// no work chunks = no output
			// trivial to advance to next step
			ourLog.info("No workchunks for {} in step id {}", theInstance.getInstanceId(), currentGatedStepId);
			return true;
		}

		// all workchunks for the current step are in COMPLETED -> proceed.
		return workChunkStatuses.equals(Set.of(WorkChunkStatusEnum.COMPLETED));
	}

	/**
	 * Returns an iterator over the metadata of all READY work chunks of this instance,
	 * fetched from persistence in pages of {@link #WORK_CHUNK_METADATA_BATCH_SIZE}.
	 */
	protected PagingIterator<WorkChunkMetadata> getReadyChunks() {
		return new PagingIterator<>(WORK_CHUNK_METADATA_BATCH_SIZE, (pageIndex, pageSize, consumer) -> {
			Pageable pageable = Pageable.ofSize(pageSize).withPage(pageIndex);
			Page<WorkChunkMetadata> results = myJobPersistence.fetchAllWorkChunkMetadataForJobInStates(
					pageable, myInstanceId, Set.of(WorkChunkStatusEnum.READY));
			for (WorkChunkMetadata metadata : results) {
				consumer.accept(metadata);
			}
		});
	}

	/**
	 * Trigger the reduction step for the given job instance. Reduction step chunks should never be queued.
	 */
	private void triggerReductionStep(JobInstance theInstance, JobWorkCursor<?, ?, ?> jobWorkCursor) {
		String instanceId = theInstance.getInstanceId();
		ourLog.debug("Triggering Reduction step {} of instance {}.", jobWorkCursor.getCurrentStepId(), instanceId);
		myReductionStepExecutorService.triggerReductionStep(instanceId, jobWorkCursor);
	}

	/**
	 * Chunks are initially created in READY state.
	 * We will move READY chunks to QUEUE'd and send them to the queue/topic (kafka)
	 * for processing.
	 */
	private void enqueueReadyChunks(JobInstance theJobInstance, JobDefinition<?> theJobDefinition) {
		Iterator<WorkChunkMetadata> iter = getReadyChunks();

		int counter = 0;
		while (iter.hasNext()) {
			WorkChunkMetadata metadata = iter.next();

			/*
			 * For each chunk id
			 * * Move to QUEUE'd
			 * * Send to topic
			 * * flush changes
			 * * commit
			 */
			updateChunkAndSendToQueue(metadata);
			counter++;
		}
		ourLog.debug(
				"Encountered {} READY work chunks for job {} of type {}",
				counter,
				theJobInstance.getInstanceId(),
				theJobDefinition.getJobDefinitionId());
	}

	/**
	 * Asks persistence to enqueue the given work chunk and, if exactly one chunk was
	 * updated, sends the corresponding work notification. Any other update count is
	 * logged as an error and the chunk is skipped.
	 *
	 * <p>Reduction step chunks are not expected here: {@link #process()} hands a gated
	 * reduction step to the reduction service and returns before enqueueing.
	 */
	private void updateChunkAndSendToQueue(WorkChunkMetadata theChunk) {
		String chunkId = theChunk.getId();
		myJobPersistence.enqueueWorkChunkForProcessing(chunkId, updated -> {
			ourLog.info("Updated {} workchunk with id {}", updated, chunkId);
			if (updated == 1) {
				sendNotification(theChunk);
			} else {
				// means the work chunk is likely already gone...
				// we'll log and skip it. If it's still in the DB, the next pass
				// will pick it up. Otherwise, it's no longer important
				ourLog.error(
						"Job Instance {} failed to transition work chunk with id {} from READY to QUEUED; found {}, expected 1; skipping work chunk.",
						theChunk.getInstanceId(),
						theChunk.getId(),
						updated);
			}
		});
	}

	/**
	 * Publishes a work notification for the given chunk on the work channel.
	 */
	private void sendNotification(WorkChunkMetadata theChunk) {
		// send to the queue
		// we use current step id because it has not been moved to the next step (yet)
		JobWorkNotification workNotification = new JobWorkNotification(
				theChunk.getJobDefinitionId(),
				theChunk.getJobDefinitionVersion(),
				theChunk.getInstanceId(),
				theChunk.getTargetStepId(),
				theChunk.getId());
		myBatchJobSender.sendWorkChannelMessage(workNotification);
	}

	/**
	 * Advances the instance to {@code nextStepId} and updates the status of that step's chunks.
	 * Before doing so, logs (at debug) when the accumulator's total chunk count for the next
	 * step differs from its count of GATE_WAITING/QUEUED chunks; the mismatch does not block the advance.
	 */
	private void processChunksForNextGatedSteps(
			JobInstance theInstance, JobDefinition<?> theJobDefinition, String nextStepId) {
		String instanceId = theInstance.getInstanceId();

		List<String> gateWaitingChunksForNextStep = myProgressAccumulator.getChunkIdsWithStatus(
				instanceId, nextStepId, WorkChunkStatusEnum.GATE_WAITING, WorkChunkStatusEnum.QUEUED);
		int totalChunksForNextStep = myProgressAccumulator.getTotalChunkCountForInstanceAndStep(instanceId, nextStepId);
		if (totalChunksForNextStep != gateWaitingChunksForNextStep.size()) {
			ourLog.debug(
					"Total ProgressAccumulator GATE_WAITING chunk count does not match GATE_WAITING chunk size! [instanceId={}, stepId={}, totalChunks={}, queuedChunks={}]",
					instanceId,
					nextStepId,
					totalChunksForNextStep,
					gateWaitingChunksForNextStep.size());
		}

		JobWorkCursor<?, ?, ?> jobWorkCursor =
				JobWorkCursor.fromJobDefinitionAndRequestedStepId(theJobDefinition, nextStepId);

		// update the job step so the workers will process them.
		// Sets all chunks from QUEUED/GATE_WAITING -> READY (REDUCTION_READY for reduction jobs)
		myJobPersistence.advanceJobStepAndUpdateChunkStatus(instanceId, nextStepId, jobWorkCursor.isReductionStep());
	}

	/**
	 * Moves all POLL_WAITING work chunks to READY for work chunks whose
	 * nextPollTime has expired.
	 */
	private void processPollingChunks(String theInstanceId) {
		int updatedChunkCount = myJobPersistence.updatePollWaitingChunksForJobIfReady(theInstanceId);

		ourLog.debug(
				"Moved {} Work Chunks in POLL_WAITING to READY for Job Instance {}", updatedChunkCount, theInstanceId);
	}
}