001/*- 002 * #%L 003 * HAPI FHIR JPA Server - Batch2 Task Processor 004 * %% 005 * Copyright (C) 2014 - 2024 Smile CDR, Inc. 006 * %% 007 * Licensed under the Apache License, Version 2.0 (the "License"); 008 * you may not use this file except in compliance with the License. 009 * You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 * #L% 019 */ 020package ca.uhn.fhir.batch2.coordinator; 021 022import ca.uhn.fhir.batch2.api.ChunkExecutionDetails; 023import ca.uhn.fhir.batch2.api.IJobCompletionHandler; 024import ca.uhn.fhir.batch2.api.IJobPersistence; 025import ca.uhn.fhir.batch2.api.IReductionStepExecutorService; 026import ca.uhn.fhir.batch2.api.IReductionStepWorker; 027import ca.uhn.fhir.batch2.api.JobCompletionDetails; 028import ca.uhn.fhir.batch2.api.StepExecutionDetails; 029import ca.uhn.fhir.batch2.model.ChunkOutcome; 030import ca.uhn.fhir.batch2.model.JobDefinitionStep; 031import ca.uhn.fhir.batch2.model.JobInstance; 032import ca.uhn.fhir.batch2.model.JobWorkCursor; 033import ca.uhn.fhir.batch2.model.StatusEnum; 034import ca.uhn.fhir.batch2.model.WorkChunk; 035import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum; 036import ca.uhn.fhir.batch2.util.BatchJobOpenTelemetryUtils; 037import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService; 038import ca.uhn.fhir.jpa.model.sched.HapiJob; 039import ca.uhn.fhir.jpa.model.sched.IHasScheduledJobs; 040import ca.uhn.fhir.jpa.model.sched.ISchedulerService; 041import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition; 042import ca.uhn.fhir.model.api.IModelJson; 043import ca.uhn.fhir.rest.server.exceptions.InternalErrorException; 044import com.google.common.annotations.VisibleForTesting; 045import io.opentelemetry.instrumentation.annotations.WithSpan; 046import jakarta.annotation.Nonnull; 047import org.apache.commons.lang3.time.DateUtils; 048import org.quartz.JobExecutionContext; 049import org.slf4j.Logger; 050import org.slf4j.LoggerFactory; 051import org.springframework.beans.factory.annotation.Autowired; 052import org.springframework.context.event.ContextClosedEvent; 053import org.springframework.context.event.ContextRefreshedEvent; 054import org.springframework.context.event.EventListener; 055import org.springframework.scheduling.concurrent.CustomizableThreadFactory; 056import org.springframework.transaction.annotation.Propagation; 057 058import java.util.Collections; 059import java.util.EnumSet; 060import java.util.LinkedHashMap; 061import java.util.Map; 062import java.util.Timer; 063import java.util.TimerTask; 064import java.util.concurrent.Callable; 065import java.util.concurrent.ExecutorService; 066import java.util.concurrent.Executors; 067import java.util.concurrent.Semaphore; 068import java.util.concurrent.atomic.AtomicReference; 069import java.util.stream.Stream; 070 071import static ca.uhn.fhir.batch2.model.StatusEnum.COMPLETED; 072import static ca.uhn.fhir.batch2.model.StatusEnum.ERRORED; 073import static ca.uhn.fhir.batch2.model.StatusEnum.FINALIZE; 074import static ca.uhn.fhir.batch2.model.StatusEnum.IN_PROGRESS; 075import static ca.uhn.fhir.batch2.util.BatchJobOpenTelemetryUtils.JOB_STEP_EXECUTION_SPAN_NAME; 076 077public class ReductionStepExecutorServiceImpl implements IReductionStepExecutorService, IHasScheduledJobs { 078 public static final String SCHEDULED_JOB_ID = ReductionStepExecutorScheduledJob.class.getName(); 079 private static final Logger ourLog = LoggerFactory.getLogger(ReductionStepExecutorServiceImpl.class); 080 private final Map<String, JobWorkCursor> myInstanceIdToJobWorkCursor = 081 Collections.synchronizedMap(new LinkedHashMap<>()); 082 private final ExecutorService myReducerExecutor; 083 private final IJobPersistence myJobPersistence; 084 private final IHapiTransactionService myTransactionService; 085 private final Semaphore myCurrentlyExecuting = new Semaphore(1); 086 private final AtomicReference<String> myCurrentlyFinalizingInstanceId = new AtomicReference<>(); 087 private final JobDefinitionRegistry myJobDefinitionRegistry; 088 private Timer myHeartbeatTimer; 089 090 /** 091 * Constructor 092 */ 093 public ReductionStepExecutorServiceImpl( 094 IJobPersistence theJobPersistence, 095 IHapiTransactionService theTransactionService, 096 JobDefinitionRegistry theJobDefinitionRegistry) { 097 myJobPersistence = theJobPersistence; 098 myTransactionService = theTransactionService; 099 myJobDefinitionRegistry = theJobDefinitionRegistry; 100 101 myReducerExecutor = Executors.newSingleThreadExecutor(new CustomizableThreadFactory("batch2-reducer")); 102 } 103 104 @EventListener(ContextRefreshedEvent.class) 105 public void start() { 106 if (myHeartbeatTimer == null) { 107 myHeartbeatTimer = new Timer("batch2-reducer-heartbeat"); 108 myHeartbeatTimer.schedule( 109 new HeartbeatTimerTask(), DateUtils.MILLIS_PER_MINUTE, DateUtils.MILLIS_PER_MINUTE); 110 } 111 } 112 113 private void runHeartbeat() { 114 String currentlyFinalizingInstanceId = myCurrentlyFinalizingInstanceId.get(); 115 if (currentlyFinalizingInstanceId != null) { 116 ourLog.info("Running heartbeat for instance: {}", currentlyFinalizingInstanceId); 117 executeInTransactionWithSynchronization(() -> { 118 myJobPersistence.updateInstanceUpdateTime(currentlyFinalizingInstanceId); 119 return null; 120 }); 121 } 122 } 123 124 @EventListener(ContextClosedEvent.class) 125 public void shutdown() { 126 if (myHeartbeatTimer != null) { 127 myHeartbeatTimer.cancel(); 128 myHeartbeatTimer = null; 129 } 130 } 131 132 @Override 133 public void triggerReductionStep(String theInstanceId, JobWorkCursor<?, ?, ?> theJobWorkCursor) { 134 myInstanceIdToJobWorkCursor.putIfAbsent(theInstanceId, theJobWorkCursor); 135 if (myCurrentlyExecuting.availablePermits() > 0) { 136 myReducerExecutor.submit(this::reducerPass); 137 } 138 } 139 140 @Override 141 public void reducerPass() { 142 if (myCurrentlyExecuting.tryAcquire()) { 143 try { 144 String[] instanceIds = myInstanceIdToJobWorkCursor.keySet().toArray(new String[0]); 145 if (instanceIds.length > 0) { 146 String instanceId = instanceIds[0]; 147 myCurrentlyFinalizingInstanceId.set(instanceId); 148 JobWorkCursor<?, ?, ?> jobWorkCursor = myInstanceIdToJobWorkCursor.get(instanceId); 149 executeReductionStep(instanceId, jobWorkCursor); 150 151 // If we get here, this succeeded. Purge the instance from the work queue 152 myInstanceIdToJobWorkCursor.remove(instanceId); 153 } 154 155 } catch (Exception e) { 156 ourLog.error("Failed to execute reducer pass", e); 157 } finally { 158 myCurrentlyFinalizingInstanceId.set(null); 159 myCurrentlyExecuting.release(); 160 } 161 } 162 } 163 164 @VisibleForTesting 165 @WithSpan(JOB_STEP_EXECUTION_SPAN_NAME) 166 <PT extends IModelJson, IT extends IModelJson, OT extends IModelJson> 167 ReductionStepChunkProcessingResponse executeReductionStep( 168 String theInstanceId, JobWorkCursor<PT, IT, OT> theJobWorkCursor) { 169 170 BatchJobOpenTelemetryUtils.addAttributesToCurrentSpan( 171 theJobWorkCursor.getJobDefinition().getJobDefinitionId(), 172 theJobWorkCursor.getJobDefinition().getJobDefinitionVersion(), 173 theInstanceId, 174 theJobWorkCursor.getCurrentStepId(), 175 null); 176 177 JobDefinitionStep<PT, IT, OT> step = theJobWorkCursor.getCurrentStep(); 178 179 // wipmb For 6.8 - this runs four tx. That's at least 2 too many 180 // combine the fetch and the case statement. Use optional for the boolean. 181 JobInstance instance = executeInTransactionWithSynchronization(() -> myJobPersistence 182 .fetchInstance(theInstanceId) 183 .orElseThrow(() -> new InternalErrorException("Unknown instance: " + theInstanceId))); 184 185 boolean shouldProceed = false; 186 switch (instance.getStatus()) { 187 case IN_PROGRESS: 188 case ERRORED: 189 // this will take a write lock on the JobInstance, preventing duplicates. 190 boolean changed = 191 executeInTransactionWithSynchronization(() -> myJobPersistence.markInstanceAsStatusWhenStatusIn( 192 instance.getInstanceId(), FINALIZE, EnumSet.of(IN_PROGRESS, ERRORED))); 193 if (changed) { 194 ourLog.info( 195 "Job instance {} has been set to FINALIZE state - Beginning reducer step", 196 instance.getInstanceId()); 197 shouldProceed = true; 198 } 199 break; 200 case FINALIZE: 201 case COMPLETED: 202 case FAILED: 203 case QUEUED: 204 case CANCELLED: 205 break; 206 } 207 208 if (!shouldProceed) { 209 ourLog.warn( 210 "JobInstance[{}] should not be finalized at this time. In memory status is {}. Reduction step will not rerun!" 211 + " This could be a long running reduction job resulting in the processed msg not being acknowledged," 212 + " or the result of a failed process or server restarting.", 213 instance.getInstanceId(), 214 instance.getStatus()); 215 return new ReductionStepChunkProcessingResponse(false); 216 } 217 218 PT parameters = 219 instance.getParameters(theJobWorkCursor.getJobDefinition().getParametersType()); 220 IReductionStepWorker<PT, IT, OT> reductionStepWorker = 221 (IReductionStepWorker<PT, IT, OT>) step.getJobStepWorker(); 222 223 instance.setStatus(FINALIZE); 224 225 boolean defaultSuccessValue = true; 226 ReductionStepChunkProcessingResponse response = new ReductionStepChunkProcessingResponse(defaultSuccessValue); 227 228 try { 229 processChunksAndCompleteJob(theJobWorkCursor, step, instance, parameters, reductionStepWorker, response); 230 } catch (Exception ex) { 231 ourLog.error("Job completion failed for Job {}", instance.getInstanceId(), ex); 232 233 executeInTransactionWithSynchronization(() -> { 234 myJobPersistence.updateInstance(instance.getInstanceId(), theInstance -> { 235 theInstance.setStatus(StatusEnum.FAILED); 236 return true; 237 }); 238 return null; 239 }); 240 response.setSuccessful(false); 241 } 242 243 // if no successful chunks, return false 244 if (!response.hasSuccessfulChunksIds()) { 245 response.setSuccessful(false); 246 } 247 248 return response; 249 } 250 251 private <PT extends IModelJson, IT extends IModelJson, OT extends IModelJson> void processChunksAndCompleteJob( 252 JobWorkCursor<PT, IT, OT> theJobWorkCursor, 253 JobDefinitionStep<PT, IT, OT> step, 254 JobInstance instance, 255 PT parameters, 256 IReductionStepWorker<PT, IT, OT> reductionStepWorker, 257 ReductionStepChunkProcessingResponse response) { 258 try { 259 executeInTransactionWithSynchronization(() -> { 260 try (Stream<WorkChunk> chunkIterator = 261 myJobPersistence.fetchAllWorkChunksForStepStream(instance.getInstanceId(), step.getStepId())) { 262 chunkIterator.forEach(chunk -> 263 processChunk(chunk, instance, parameters, reductionStepWorker, response, theJobWorkCursor)); 264 } 265 return null; 266 }); 267 } finally { 268 executeInTransactionWithSynchronization(() -> { 269 ourLog.info( 270 "Reduction step for instance[{}] produced {} successful and {} failed chunks", 271 instance.getInstanceId(), 272 response.getSuccessfulChunkIds().size(), 273 response.getFailedChunksIds().size()); 274 275 ReductionStepDataSink<PT, IT, OT> dataSink = new ReductionStepDataSink<>( 276 instance.getInstanceId(), theJobWorkCursor, myJobPersistence, myJobDefinitionRegistry); 277 StepExecutionDetails<PT, IT> chunkDetails = 278 StepExecutionDetails.createReductionStepDetails(parameters, null, instance); 279 280 if (response.isSuccessful()) { 281 reductionStepWorker.run(chunkDetails, dataSink); 282 283 // the ReductionStepDataSink will update the job status to COMPLETED 284 // we should update instance here to keep it consistent with the newest version in persistence 285 instance.setStatus(COMPLETED); 286 } 287 288 if (response.hasSuccessfulChunksIds()) { 289 // complete the steps without making a new work chunk 290 myJobPersistence.markWorkChunksWithStatusAndWipeData( 291 instance.getInstanceId(), 292 response.getSuccessfulChunkIds(), 293 WorkChunkStatusEnum.COMPLETED, 294 null // error message - none 295 ); 296 } 297 298 if (response.hasFailedChunkIds()) { 299 // mark any failed chunks as failed for aborting 300 myJobPersistence.markWorkChunksWithStatusAndWipeData( 301 instance.getInstanceId(), 302 response.getFailedChunksIds(), 303 WorkChunkStatusEnum.FAILED, 304 "JOB ABORTED"); 305 } 306 307 if (response.isSuccessful()) { 308 /** 309 * All reduction steps are final steps. 310 */ 311 IJobCompletionHandler<PT> completionHandler = 312 theJobWorkCursor.getJobDefinition().getCompletionHandler(); 313 if (completionHandler != null) { 314 completionHandler.jobComplete(new JobCompletionDetails<>(parameters, instance)); 315 } 316 } 317 318 return null; 319 }); 320 } 321 } 322 323 private <T> T executeInTransactionWithSynchronization(Callable<T> runnable) { 324 return myTransactionService 325 .withRequest(null) 326 .withPropagation(Propagation.REQUIRES_NEW) 327 .execute(runnable); 328 } 329 330 @Override 331 public void scheduleJobs(ISchedulerService theSchedulerService) { 332 theSchedulerService.scheduleClusteredJob(10 * DateUtils.MILLIS_PER_SECOND, buildJobDefinition()); 333 } 334 335 @Nonnull 336 private ScheduledJobDefinition buildJobDefinition() { 337 ScheduledJobDefinition jobDefinition = new ScheduledJobDefinition(); 338 jobDefinition.setId(SCHEDULED_JOB_ID); 339 jobDefinition.setJobClass(ReductionStepExecutorScheduledJob.class); 340 return jobDefinition; 341 } 342 343 private <PT extends IModelJson, IT extends IModelJson, OT extends IModelJson> void processChunk( 344 WorkChunk theChunk, 345 JobInstance theInstance, 346 PT theParameters, 347 IReductionStepWorker<PT, IT, OT> theReductionStepWorker, 348 ReductionStepChunkProcessingResponse theResponseObject, 349 JobWorkCursor<PT, IT, OT> theJobWorkCursor) { 350 351 /* 352 * Reduction steps are done inline and only on gated jobs. 353 */ 354 if (theChunk.getStatus() == WorkChunkStatusEnum.COMPLETED) { 355 // This should never happen since jobs with reduction are required to be gated 356 ourLog.error( 357 "Unexpected chunk {} with status {} found while reducing {}. No chunks feeding into a reduction step should be in a state other than READY.", 358 theChunk.getId(), 359 theChunk.getStatus(), 360 theInstance); 361 return; 362 } 363 364 if (theResponseObject.hasFailedChunkIds()) { 365 // we are going to fail all future chunks now 366 theResponseObject.addFailedChunkId(theChunk); 367 } else { 368 try { 369 // feed them into our reduction worker 370 // this is the most likely area to throw, 371 // as this is where db actions and processing is likely to happen 372 IT chunkData = 373 theChunk.getData(theJobWorkCursor.getCurrentStep().getInputType()); 374 ChunkExecutionDetails<PT, IT> chunkDetails = new ChunkExecutionDetails<>( 375 chunkData, theParameters, theInstance.getInstanceId(), theChunk.getId()); 376 377 ChunkOutcome outcome = theReductionStepWorker.consume(chunkDetails); 378 379 switch (outcome.getStatus()) { 380 case SUCCESS: 381 theResponseObject.addSuccessfulChunkId(theChunk); 382 break; 383 384 case FAILED: 385 ourLog.error("Processing of work chunk {} resulted in aborting job.", theChunk.getId()); 386 387 // fail entire job - including all future workchunks 388 theResponseObject.addFailedChunkId(theChunk); 389 theResponseObject.setSuccessful(false); 390 break; 391 } 392 } catch (Exception e) { 393 String msg = String.format( 394 "Reduction step failed to execute chunk reduction for chunk %s with exception: %s.", 395 theChunk.getId(), e.getMessage()); 396 // we got a failure in a reduction 397 ourLog.error(msg, e); 398 theResponseObject.setSuccessful(false); 399 400 myJobPersistence.onWorkChunkFailed(theChunk.getId(), msg); 401 } 402 } 403 } 404 405 private class HeartbeatTimerTask extends TimerTask { 406 @Override 407 public void run() { 408 runHeartbeat(); 409 } 410 } 411 412 public static class ReductionStepExecutorScheduledJob implements HapiJob { 413 @Autowired 414 private IReductionStepExecutorService myTarget; 415 416 @Override 417 public void execute(JobExecutionContext theContext) { 418 myTarget.reducerPass(); 419 } 420 } 421}