001/*-
002 * #%L
003 * HAPI FHIR JPA Server - Batch2 Task Processor
004 * %%
005 * Copyright (C) 2014 - 2024 Smile CDR, Inc.
006 * %%
007 * Licensed under the Apache License, Version 2.0 (the "License");
008 * you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 *      http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 * #L%
019 */
020package ca.uhn.fhir.batch2.maintenance;
021
022import ca.uhn.fhir.batch2.api.IJobMaintenanceService;
023import ca.uhn.fhir.batch2.api.IJobPersistence;
024import ca.uhn.fhir.batch2.api.IReductionStepExecutorService;
025import ca.uhn.fhir.batch2.channel.BatchJobSender;
026import ca.uhn.fhir.batch2.coordinator.JobDefinitionRegistry;
027import ca.uhn.fhir.batch2.coordinator.WorkChunkProcessor;
028import ca.uhn.fhir.batch2.model.JobInstance;
029import ca.uhn.fhir.i18n.Msg;
030import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
031import ca.uhn.fhir.jpa.model.sched.HapiJob;
032import ca.uhn.fhir.jpa.model.sched.IHasScheduledJobs;
033import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
034import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition;
035import ca.uhn.fhir.util.Logs;
036import com.google.common.annotations.VisibleForTesting;
037import jakarta.annotation.Nonnull;
038import org.apache.commons.lang3.Validate;
039import org.apache.commons.lang3.time.DateUtils;
040import org.quartz.JobExecutionContext;
041import org.slf4j.Logger;
042import org.springframework.beans.factory.annotation.Autowired;
043
044import java.util.HashSet;
045import java.util.List;
046import java.util.Set;
047import java.util.concurrent.Semaphore;
048import java.util.concurrent.TimeUnit;
049
050/**
051 * This class performs regular polls of the stored jobs in order to
052 * perform maintenance. This includes two major functions.
053 *
054 * <p>
055 * First, we calculate statistics and delete expired tasks. This class does
056 * the following things:
057 * <ul>
058 *    <li>For instances that are IN_PROGRESS, calculates throughput and percent complete</li>
059 *    <li>For instances that are IN_PROGRESS where all chunks are COMPLETE, marks instance as COMPLETE</li>
060 *    <li>For instances that are COMPLETE, purges chunk data</li>
061 *    <li>For instances that are IN_PROGRESS where at least one chunk is FAILED, marks instance as FAILED and propagates the error message to the instance, and purges chunk data</li>
062 *    <li>For instances that are IN_PROGRESS with an error message set where no chunks are ERRORED or FAILED, clears the error message in the instance (meaning presumably there was an error but it cleared)</li>
063 *    <li>For instances that are IN_PROGRESS and isCancelled flag is set marks them as ERRORED and indicating the current running step if any</li>
064 *    <li>For instances that are COMPLETE or FAILED and are old, delete them entirely</li>
065 * </ul>
066 *      </p>
067 *
068 *      <p>
069 * Second, we check for any job instances where the job is configured to
070 * have gated execution. For these instances, we check if the current step
071 * is complete (all chunks are in COMPLETE status) and trigger the next step.
072 * </p>
073 *
074 * <p>
075 *    The maintenance pass is run once per minute.  However if a gated job is fast-tracking (i.e. every step produced
076 *    exactly one chunk, then the maintenance task will be triggered earlier than scheduled by the step executor.
077 * </p>
078 */
079public class JobMaintenanceServiceImpl implements IJobMaintenanceService, IHasScheduledJobs {
080        static final Logger ourLog = Logs.getBatchTroubleshootingLog();
081
082        public static final int INSTANCES_PER_PASS = 100;
083        public static final String SCHEDULED_JOB_ID = JobMaintenanceScheduledJob.class.getName();
084        public static final int MAINTENANCE_TRIGGER_RUN_WITHOUT_SCHEDULER_TIMEOUT = 5;
085
086        private final IJobPersistence myJobPersistence;
087        private final ISchedulerService mySchedulerService;
088        private final JpaStorageSettings myStorageSettings;
089        private final JobDefinitionRegistry myJobDefinitionRegistry;
090        private final BatchJobSender myBatchJobSender;
091        private final WorkChunkProcessor myJobExecutorSvc;
092
093        private final Semaphore myRunMaintenanceSemaphore = new Semaphore(1);
094
095        private long myScheduledJobFrequencyMillis = DateUtils.MILLIS_PER_MINUTE;
096        private Runnable myMaintenanceJobStartedCallback = () -> {};
097        private Runnable myMaintenanceJobFinishedCallback = () -> {};
098        private final IReductionStepExecutorService myReductionStepExecutorService;
099
100        private boolean myEnabledBool = true;
101
102        /**
103         * Constructor
104         */
105        public JobMaintenanceServiceImpl(
106                        @Nonnull ISchedulerService theSchedulerService,
107                        @Nonnull IJobPersistence theJobPersistence,
108                        JpaStorageSettings theStorageSettings,
109                        @Nonnull JobDefinitionRegistry theJobDefinitionRegistry,
110                        @Nonnull BatchJobSender theBatchJobSender,
111                        @Nonnull WorkChunkProcessor theExecutor,
112                        @Nonnull IReductionStepExecutorService theReductionStepExecutorService) {
113                myStorageSettings = theStorageSettings;
114                myReductionStepExecutorService = theReductionStepExecutorService;
115                Validate.notNull(theSchedulerService);
116                Validate.notNull(theJobPersistence);
117                Validate.notNull(theJobDefinitionRegistry);
118                Validate.notNull(theBatchJobSender);
119
120                myJobPersistence = theJobPersistence;
121                mySchedulerService = theSchedulerService;
122                myJobDefinitionRegistry = theJobDefinitionRegistry;
123                myBatchJobSender = theBatchJobSender;
124                myJobExecutorSvc = theExecutor;
125        }
126
127        @Override
128        public void scheduleJobs(ISchedulerService theSchedulerService) {
129                mySchedulerService.scheduleClusteredJob(myScheduledJobFrequencyMillis, buildJobDefinition());
130        }
131
132        @Nonnull
133        private ScheduledJobDefinition buildJobDefinition() {
134                ScheduledJobDefinition jobDefinition = new ScheduledJobDefinition();
135                jobDefinition.setId(SCHEDULED_JOB_ID);
136                jobDefinition.setJobClass(JobMaintenanceScheduledJob.class);
137                return jobDefinition;
138        }
139
140        public void setScheduledJobFrequencyMillis(long theScheduledJobFrequencyMillis) {
141                myScheduledJobFrequencyMillis = theScheduledJobFrequencyMillis;
142        }
143
144        /**
145         * @return true if a request to run a maintance pass was submitted
146         */
147        @Override
148        public boolean triggerMaintenancePass() {
149                if (!myStorageSettings.isJobFastTrackingEnabled()) {
150                        return false;
151                }
152                if (mySchedulerService.isClusteredSchedulingEnabled()) {
153                        mySchedulerService.triggerClusteredJobImmediately(buildJobDefinition());
154                        return true;
155                } else {
156                        // We are probably running a unit test
157                        return runMaintenanceDirectlyWithTimeout();
158                }
159        }
160
161        private boolean runMaintenanceDirectlyWithTimeout() {
162                if (getQueueLength() > 0) {
163                        ourLog.debug(
164                                        "There are already {} threads waiting to run a maintenance pass.  Ignoring request.",
165                                        getQueueLength());
166                        return false;
167                }
168
169                try {
170                        ourLog.debug(
171                                        "There is no clustered scheduling service.  Requesting semaphore to run maintenance pass directly.");
172                        // Some unit test, esp. the Loinc terminology tests, depend on this maintenance pass being run shortly after
173                        // it is requested
174                        if (myRunMaintenanceSemaphore.tryAcquire(
175                                        MAINTENANCE_TRIGGER_RUN_WITHOUT_SCHEDULER_TIMEOUT, TimeUnit.MINUTES)) {
176                                ourLog.debug("Semaphore acquired.  Starting maintenance pass.");
177                                doMaintenancePass();
178                        }
179                        return true;
180                } catch (InterruptedException e) {
181                        throw new RuntimeException(Msg.code(2134) + "Timed out waiting to run a maintenance pass", e);
182                } finally {
183                        ourLog.debug("Maintenance pass complete.  Releasing semaphore.");
184                        myRunMaintenanceSemaphore.release();
185                }
186        }
187
188        @VisibleForTesting
189        int getQueueLength() {
190                return myRunMaintenanceSemaphore.getQueueLength();
191        }
192
193        @Override
194        @VisibleForTesting
195        public void forceMaintenancePass() {
196                // to simulate a long running job!
197                ourLog.info("Forcing a maintenance pass run; semaphore at {}", getQueueLength());
198                doMaintenancePass();
199        }
200
201        @Override
202        public void enableMaintenancePass(boolean theToEnable) {
203                myEnabledBool = theToEnable;
204        }
205
206        @Override
207        public void runMaintenancePass() {
208                if (!myEnabledBool) {
209                        ourLog.error("Maintenance job is disabled! This will affect all batch2 jobs!");
210                }
211
212                if (!myRunMaintenanceSemaphore.tryAcquire()) {
213                        ourLog.debug("Another maintenance pass is already in progress.  Ignoring request.");
214                        return;
215                }
216                try {
217                        ourLog.debug("Maintenance pass starting.");
218                        doMaintenancePass();
219                } catch (Exception e) {
220                        ourLog.error("Maintenance pass failed", e);
221                } finally {
222                        myRunMaintenanceSemaphore.release();
223                }
224        }
225
226        private void doMaintenancePass() {
227                myMaintenanceJobStartedCallback.run();
228                Set<String> processedInstanceIds = new HashSet<>();
229                JobChunkProgressAccumulator progressAccumulator = new JobChunkProgressAccumulator();
230                for (int page = 0; ; page++) {
231                        List<JobInstance> instances = myJobPersistence.fetchInstances(INSTANCES_PER_PASS, page);
232
233                        for (JobInstance instance : instances) {
234                                String instanceId = instance.getInstanceId();
235                                if (myJobDefinitionRegistry
236                                                .getJobDefinition(instance.getJobDefinitionId(), instance.getJobDefinitionVersion())
237                                                .isPresent()) {
238                                        if (processedInstanceIds.add(instanceId)) {
239                                                myJobDefinitionRegistry.setJobDefinition(instance);
240                                                JobInstanceProcessor jobInstanceProcessor = new JobInstanceProcessor(
241                                                                myJobPersistence,
242                                                                myBatchJobSender,
243                                                                instanceId,
244                                                                progressAccumulator,
245                                                                myReductionStepExecutorService,
246                                                                myJobDefinitionRegistry);
247                                                ourLog.debug(
248                                                                "Triggering maintenance process for instance {} in status {}",
249                                                                instanceId,
250                                                                instance.getStatus());
251                                                jobInstanceProcessor.process();
252                                        }
253                                } else {
254                                        ourLog.warn(
255                                                        "Job definition {} for instance {} is currently unavailable",
256                                                        instance.getJobDefinitionId(),
257                                                        instanceId);
258                                }
259                        }
260
261                        if (instances.size() < INSTANCES_PER_PASS) {
262                                break;
263                        }
264                }
265                myMaintenanceJobFinishedCallback.run();
266        }
267
268        public void setMaintenanceJobStartedCallback(Runnable theMaintenanceJobStartedCallback) {
269                myMaintenanceJobStartedCallback = theMaintenanceJobStartedCallback;
270        }
271
272        public void setMaintenanceJobFinishedCallback(Runnable theMaintenanceJobFinishedCallback) {
273                myMaintenanceJobFinishedCallback = theMaintenanceJobFinishedCallback;
274        }
275
276        public static class JobMaintenanceScheduledJob implements HapiJob {
277                @Autowired
278                private IJobMaintenanceService myTarget;
279
280                @Override
281                public void execute(JobExecutionContext theContext) {
282                        myTarget.runMaintenancePass();
283                }
284        }
285}