/*-
 * #%L
 * HAPI FHIR JPA Server - Batch2 Task Processor
 * %%
 * Copyright (C) 2014 - 2024 Smile CDR, Inc.
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */
package ca.uhn.fhir.batch2.maintenance;

import ca.uhn.fhir.batch2.api.IJobMaintenanceService;
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.api.IReductionStepExecutorService;
import ca.uhn.fhir.batch2.channel.BatchJobSender;
import ca.uhn.fhir.batch2.coordinator.JobDefinitionRegistry;
import ca.uhn.fhir.batch2.coordinator.WorkChunkProcessor;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
import ca.uhn.fhir.jpa.model.sched.HapiJob;
import ca.uhn.fhir.jpa.model.sched.IHasScheduledJobs;
import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition;
import ca.uhn.fhir.util.Logs;
import com.google.common.annotations.VisibleForTesting;
import jakarta.annotation.Nonnull;
import org.apache.commons.lang3.Validate;
import org.apache.commons.lang3.time.DateUtils;
import org.quartz.JobExecutionContext;
import org.slf4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;

import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/**
 * This class performs regular polls of the stored jobs in order to
 * perform maintenance. This includes two major functions.
 *
 * <p>
 * First, we calculate statistics and delete expired tasks. This class does
 * the following things:
 * <ul>
 *    <li>For instances that are IN_PROGRESS, calculates throughput and percent complete</li>
 *    <li>For instances that are IN_PROGRESS where all chunks are COMPLETE, marks instance as COMPLETE</li>
 *    <li>For instances that are COMPLETE, purges chunk data</li>
 *    <li>For instances that are IN_PROGRESS where at least one chunk is FAILED, marks instance as FAILED and propagates the error message to the instance, and purges chunk data</li>
 *    <li>For instances that are IN_PROGRESS with an error message set where no chunks are ERRORED or FAILED, clears the error message in the instance (meaning presumably there was an error but it cleared)</li>
 *    <li>For instances that are IN_PROGRESS and isCancelled flag is set marks them as ERRORED and indicating the current running step if any</li>
 *    <li>For instances that are COMPLETE or FAILED and are old, delete them entirely</li>
 * </ul>
 * </p>
 *
 * <p>
 * Second, we check for any job instances where the job is configured to
 * have gated execution. For these instances, we check if the current step
 * is complete (all chunks are in COMPLETE status) and trigger the next step.
 * </p>
 *
 * <p>
 * By default the maintenance pass is run once per minute. However, if a gated job is fast-tracking (i.e. every
 * step produced exactly one chunk), then the maintenance task will be triggered earlier than scheduled by the
 * step executor.
 * </p>
 */
public class JobMaintenanceServiceImpl implements IJobMaintenanceService, IHasScheduledJobs {
	static final Logger ourLog = Logs.getBatchTroubleshootingLog();

	/** Page size used when fetching job instances during a maintenance pass. */
	public static final int INSTANCES_PER_PASS = 100;

	/** Scheduler id of the maintenance job; the fully qualified name of {@link JobMaintenanceScheduledJob}. */
	public static final String SCHEDULED_JOB_ID = JobMaintenanceScheduledJob.class.getName();

	/** How long (in minutes) a directly-triggered pass waits for the maintenance semaphore. */
	public static final int MAINTENANCE_TRIGGER_RUN_WITHOUT_SCHEDULER_TIMEOUT = 5;

	private final IJobPersistence myJobPersistence;
	private final ISchedulerService mySchedulerService;
	private final JpaStorageSettings myStorageSettings;
	private final JobDefinitionRegistry myJobDefinitionRegistry;
	private final BatchJobSender myBatchJobSender;
	private final WorkChunkProcessor myJobExecutorSvc;

	/** Single permit: at most one semaphore-guarded maintenance pass runs at a time in this instance. */
	private final Semaphore myRunMaintenanceSemaphore = new Semaphore(1);

	private long myScheduledJobFrequencyMillis = DateUtils.MILLIS_PER_MINUTE;
	private Runnable myMaintenanceJobStartedCallback = () -> {};
	private Runnable myMaintenanceJobFinishedCallback = () -> {};
	private final IReductionStepExecutorService myReductionStepExecutorService;

	private boolean myEnabledBool = true;

	/**
	 * Constructor
	 *
	 * @param theSchedulerService            scheduler used to register and trigger the maintenance job; must not be null
	 * @param theJobPersistence              persistence used to page through job instances; must not be null
	 * @param theStorageSettings             settings consulted for the fast-tracking flag
	 * @param theJobDefinitionRegistry       registry used to resolve job definitions; must not be null
	 * @param theBatchJobSender              sender handed to each {@link JobInstanceProcessor}; must not be null
	 * @param theExecutor                    work chunk processor retained by this service
	 * @param theReductionStepExecutorService reduction step service handed to each {@link JobInstanceProcessor}
	 */
	public JobMaintenanceServiceImpl(
			@Nonnull ISchedulerService theSchedulerService,
			@Nonnull IJobPersistence theJobPersistence,
			JpaStorageSettings theStorageSettings,
			@Nonnull JobDefinitionRegistry theJobDefinitionRegistry,
			@Nonnull BatchJobSender theBatchJobSender,
			@Nonnull WorkChunkProcessor theExecutor,
			@Nonnull IReductionStepExecutorService theReductionStepExecutorService) {
		myStorageSettings = theStorageSettings;
		myReductionStepExecutorService = theReductionStepExecutorService;
		Validate.notNull(theSchedulerService);
		Validate.notNull(theJobPersistence);
		Validate.notNull(theJobDefinitionRegistry);
		Validate.notNull(theBatchJobSender);

		myJobPersistence = theJobPersistence;
		mySchedulerService = theSchedulerService;
		myJobDefinitionRegistry = theJobDefinitionRegistry;
		myBatchJobSender = theBatchJobSender;
		myJobExecutorSvc = theExecutor;
	}

	/**
	 * Registers the maintenance job as a clustered job at the configured frequency.
	 *
	 * @param theSchedulerService not used here; the scheduler supplied to the constructor is used instead
	 */
	@Override
	public void scheduleJobs(ISchedulerService theSchedulerService) {
		mySchedulerService.scheduleClusteredJob(myScheduledJobFrequencyMillis, buildJobDefinition());
	}

	/**
	 * @return a new job definition identified by {@link #SCHEDULED_JOB_ID} and backed by
	 * {@link JobMaintenanceScheduledJob}
	 */
	@Nonnull
	private ScheduledJobDefinition buildJobDefinition() {
		ScheduledJobDefinition jobDefinition = new ScheduledJobDefinition();
		jobDefinition.setId(SCHEDULED_JOB_ID);
		jobDefinition.setJobClass(JobMaintenanceScheduledJob.class);
		return jobDefinition;
	}

	/**
	 * Sets the frequency passed to the scheduler by {@link #scheduleJobs(ISchedulerService)}.
	 *
	 * @param theScheduledJobFrequencyMillis the frequency, in milliseconds
	 */
	public void setScheduledJobFrequencyMillis(long theScheduledJobFrequencyMillis) {
		myScheduledJobFrequencyMillis = theScheduledJobFrequencyMillis;
	}

	/**
	 * Requests an out-of-schedule maintenance pass. Does nothing when job fast-tracking is disabled
	 * in the storage settings. When clustered scheduling is enabled the request is handed to the
	 * scheduler; otherwise the pass is run directly on the calling thread.
	 *
	 * @return true if a request to run a maintenance pass was submitted
	 */
	@Override
	public boolean triggerMaintenancePass() {
		if (!myStorageSettings.isJobFastTrackingEnabled()) {
			return false;
		}
		if (mySchedulerService.isClusteredSchedulingEnabled()) {
			mySchedulerService.triggerClusteredJobImmediately(buildJobDefinition());
			return true;
		} else {
			// We are probably running a unit test
			return runMaintenanceDirectlyWithTimeout();
		}
	}

	/**
	 * Runs a maintenance pass on the calling thread, waiting up to
	 * {@link #MAINTENANCE_TRIGGER_RUN_WITHOUT_SCHEDULER_TIMEOUT} minutes for the maintenance semaphore.
	 *
	 * @return false if other threads are already queued on the semaphore (the request is ignored),
	 * true otherwise - including when the wait for the semaphore timed out and no pass was run
	 * @throws RuntimeException if the calling thread is interrupted while waiting for the semaphore
	 */
	private boolean runMaintenanceDirectlyWithTimeout() {
		if (getQueueLength() > 0) {
			ourLog.debug(
					"There are already {} threads waiting to run a maintenance pass. Ignoring request.",
					getQueueLength());
			return false;
		}

		// Tracks whether this thread actually holds the permit, so that the finally block never
		// releases a permit it did not take (which would grow the semaphore beyond one permit
		// and allow concurrent maintenance passes).
		boolean permitAcquired = false;
		try {
			ourLog.debug(
					"There is no clustered scheduling service. Requesting semaphore to run maintenance pass directly.");
			// Some unit test, esp. the Loinc terminology tests, depend on this maintenance pass being run shortly after
			// it is requested
			permitAcquired = myRunMaintenanceSemaphore.tryAcquire(
					MAINTENANCE_TRIGGER_RUN_WITHOUT_SCHEDULER_TIMEOUT, TimeUnit.MINUTES);
			if (permitAcquired) {
				ourLog.debug("Semaphore acquired. Starting maintenance pass.");
				doMaintenancePass();
			}
			return true;
		} catch (InterruptedException e) {
			// Restore the interrupt flag so code further up the stack can still observe the interruption.
			Thread.currentThread().interrupt();
			throw new RuntimeException(Msg.code(2134) + "Timed out waiting to run a maintenance pass", e);
		} finally {
			if (permitAcquired) {
				ourLog.debug("Maintenance pass complete. Releasing semaphore.");
				myRunMaintenanceSemaphore.release();
			}
		}
	}

	/**
	 * @return an estimate of the number of threads waiting to acquire the maintenance semaphore
	 */
	@VisibleForTesting
	int getQueueLength() {
		return myRunMaintenanceSemaphore.getQueueLength();
	}

	/**
	 * Runs a maintenance pass immediately on the calling thread without acquiring the
	 * maintenance semaphore and without consulting the enabled flag.
	 */
	@Override
	@VisibleForTesting
	public void forceMaintenancePass() {
		// to simulate a long running job!
		ourLog.info("Forcing a maintenance pass run; semaphore at {}", getQueueLength());
		doMaintenancePass();
	}

	/**
	 * Enables or disables {@link #runMaintenancePass()}.
	 *
	 * @param theToEnable true to allow maintenance passes, false to suppress them
	 */
	@Override
	public void enableMaintenancePass(boolean theToEnable) {
		myEnabledBool = theToEnable;
	}

	/**
	 * Runs a maintenance pass unless maintenance is disabled or another semaphore-guarded pass is
	 * already in progress. Exceptions thrown by the pass are logged and not propagated.
	 */
	@Override
	public void runMaintenancePass() {
		if (!myEnabledBool) {
			ourLog.error("Maintenance job is disabled! This will affect all batch2 jobs!");
			// Honour the flag: a disabled maintenance job must not process any instances.
			return;
		}

		if (!myRunMaintenanceSemaphore.tryAcquire()) {
			ourLog.debug("Another maintenance pass is already in progress. Ignoring request.");
			return;
		}
		try {
			ourLog.debug("Maintenance pass starting.");
			doMaintenancePass();
		} catch (Exception e) {
			ourLog.error("Maintenance pass failed", e);
		} finally {
			myRunMaintenanceSemaphore.release();
		}
	}

	/**
	 * Pages through the stored job instances, {@link #INSTANCES_PER_PASS} at a time, and runs a
	 * {@link JobInstanceProcessor} once for every instance whose job definition is registered.
	 * The started callback runs before the first page is fetched and the finished callback after
	 * the last page has been handled.
	 */
	private void doMaintenancePass() {
		myMaintenanceJobStartedCallback.run();
		// An instance may show up on more than one page; this set ensures it is processed once per pass.
		Set<String> processedInstanceIds = new HashSet<>();
		JobChunkProgressAccumulator progressAccumulator = new JobChunkProgressAccumulator();
		for (int page = 0; ; page++) {
			List<JobInstance> instances = myJobPersistence.fetchInstances(INSTANCES_PER_PASS, page);

			for (JobInstance instance : instances) {
				String instanceId = instance.getInstanceId();
				boolean definitionAvailable = myJobDefinitionRegistry
						.getJobDefinition(instance.getJobDefinitionId(), instance.getJobDefinitionVersion())
						.isPresent();
				if (!definitionAvailable) {
					ourLog.warn(
							"Job definition {} for instance {} is currently unavailable",
							instance.getJobDefinitionId(),
							instanceId);
					continue;
				}
				if (processedInstanceIds.add(instanceId)) {
					myJobDefinitionRegistry.setJobDefinition(instance);
					JobInstanceProcessor jobInstanceProcessor = new JobInstanceProcessor(
							myJobPersistence,
							myBatchJobSender,
							instanceId,
							progressAccumulator,
							myReductionStepExecutorService,
							myJobDefinitionRegistry);
					ourLog.debug(
							"Triggering maintenance process for instance {} in status {}",
							instanceId,
							instance.getStatus());
					jobInstanceProcessor.process();
				}
			}

			// A short page means there are no further instances to fetch.
			if (instances.size() < INSTANCES_PER_PASS) {
				break;
			}
		}
		myMaintenanceJobFinishedCallback.run();
	}

	/**
	 * @param theMaintenanceJobStartedCallback callback run at the start of every maintenance pass
	 */
	public void setMaintenanceJobStartedCallback(Runnable theMaintenanceJobStartedCallback) {
		myMaintenanceJobStartedCallback = theMaintenanceJobStartedCallback;
	}

	/**
	 * @param theMaintenanceJobFinishedCallback callback run at the end of every maintenance pass
	 * that finishes without throwing
	 */
	public void setMaintenanceJobFinishedCallback(Runnable theMaintenanceJobFinishedCallback) {
		myMaintenanceJobFinishedCallback = theMaintenanceJobFinishedCallback;
	}

	/**
	 * Scheduler entry point: delegates each execution to the injected
	 * {@link IJobMaintenanceService#runMaintenancePass()}.
	 */
	public static class JobMaintenanceScheduledJob implements HapiJob {
		@Autowired
		private IJobMaintenanceService myTarget;

		@Override
		public void execute(JobExecutionContext theContext) {
			myTarget.runMaintenancePass();
		}
	}
}