001/*-
002 * #%L
003 * HAPI FHIR JPA Server - Batch2 Task Processor
004 * %%
005 * Copyright (C) 2014 - 2024 Smile CDR, Inc.
006 * %%
007 * Licensed under the Apache License, Version 2.0 (the "License");
008 * you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 *      http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 * #L%
019 */
020package ca.uhn.fhir.batch2.api;
021
022import ca.uhn.fhir.batch2.model.FetchJobInstancesRequest;
023import ca.uhn.fhir.batch2.model.JobDefinition;
024import ca.uhn.fhir.batch2.model.JobInstance;
025import ca.uhn.fhir.batch2.model.StatusEnum;
026import ca.uhn.fhir.batch2.model.WorkChunk;
027import ca.uhn.fhir.batch2.model.WorkChunkCreateEvent;
028import ca.uhn.fhir.batch2.model.WorkChunkMetadata;
029import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
030import ca.uhn.fhir.batch2.models.JobInstanceFetchRequest;
031import com.google.common.annotations.VisibleForTesting;
032import jakarta.annotation.Nonnull;
033import org.apache.commons.lang3.builder.ToStringBuilder;
034import org.slf4j.Logger;
035import org.slf4j.LoggerFactory;
036import org.springframework.data.domain.Page;
037import org.springframework.data.domain.Pageable;
038import org.springframework.transaction.annotation.Propagation;
039import org.springframework.transaction.annotation.Transactional;
040
041import java.util.Collections;
042import java.util.Date;
043import java.util.Iterator;
044import java.util.List;
045import java.util.Optional;
046import java.util.Set;
047import java.util.function.Consumer;
048import java.util.stream.Stream;
049
050/**
051 *
052 * Some of this is tested in {@link ca.uhn.hapi.fhir.batch2.test.AbstractIJobPersistenceSpecificationTest}
053 * This is a transactional interface, but we have pushed the declaration of calls that have
054 * {@code @Transactional(propagation = Propagation.REQUIRES_NEW)} down to the implementations since we have a synchronized
055 * wrapper that was double-creating the NEW transaction.
056 */
057// wipmb For 6.8 - regularize the tx boundary.  Probably make them all MANDATORY
058public interface IJobPersistence extends IWorkChunkPersistence {
059        Logger ourLog = LoggerFactory.getLogger(IJobPersistence.class);
060
061        /**
062         * Store a new job instance. This will be called when a new job instance is being kicked off.
063         *
064         * @param theInstance The details
065         */
066        @Transactional(propagation = Propagation.REQUIRED)
067        String storeNewInstance(JobInstance theInstance);
068
069        /**
070         * Fetch an instance
071         *
072         * @param theInstanceId The instance ID
073         */
074        Optional<JobInstance> fetchInstance(String theInstanceId);
075
076        // on implementations @Transactional(propagation = Propagation.REQUIRES_NEW)
077        List<JobInstance> fetchInstances(
078                        String theJobDefinitionId, Set<StatusEnum> theStatuses, Date theCutoff, Pageable thePageable);
079
080        /**
081         * Fetches any existing jobs matching provided request parameters
082         *
083         */
084        // on implementations @Transactional(propagation = Propagation.REQUIRES_NEW)
085        List<JobInstance> fetchInstances(FetchJobInstancesRequest theRequest, int theStart, int theBatchSize);
086
087        /**
088         * Fetch all instances
089         */
090        // on implementations @Transactional(propagation = Propagation.REQUIRES_NEW)
091        List<JobInstance> fetchInstances(int thePageSize, int thePageIndex);
092
093        @Transactional(propagation = Propagation.REQUIRES_NEW)
094        void enqueueWorkChunkForProcessing(String theChunkId, Consumer<Integer> theCallback);
095
096        /**
097         * Updates all Work Chunks in POLL_WAITING if their nextPollTime <= now
098         * for the given Job Instance.
099         * @param theInstanceId the instance id
100         * @return the number of updated chunks
101         */
102        @Transactional
103        int updatePollWaitingChunksForJobIfReady(String theInstanceId);
104
105        /**
106         * Fetch instances ordered by myCreateTime DESC
107         */
108        // on implementations @Transactional(propagation = Propagation.REQUIRES_NEW)
109        List<JobInstance> fetchRecentInstances(int thePageSize, int thePageIndex);
110
111        // on implementations @Transactional(propagation = Propagation.REQUIRES_NEW)
112        List<JobInstance> fetchInstancesByJobDefinitionIdAndStatus(
113                        String theJobDefinitionId, Set<StatusEnum> theRequestedStatuses, int thePageSize, int thePageIndex);
114
115        /**
116         * Fetch all job instances for a given job definition id
117         *
118         */
119        // on implementations @Transactional(propagation = Propagation.REQUIRES_NEW)
120        List<JobInstance> fetchInstancesByJobDefinitionId(String theJobDefinitionId, int theCount, int theStart);
121
122        /**
123         * Fetches all job instances based on the JobFetchRequest
124         *
125         * @param theRequest - the job fetch request
126         * @return - a page of job instances
127         */
128        // on implementations @Transactional(propagation = Propagation.REQUIRES_NEW)
129        Page<JobInstance> fetchJobInstances(JobInstanceFetchRequest theRequest);
130
131        /**
132         * Returns set of all distinct states for the specified job instance id
133         * and step id.
134         */
135        @Transactional
136        Set<WorkChunkStatusEnum> getDistinctWorkChunkStatesForJobAndStep(String theInstanceId, String theCurrentStepId);
137
138        /**
139         * Fetch all chunks for a given instance.
140         *
141         * @param theInstanceId - instance id
142         * @param theWithData - whether or not to include the data
143         * @return - an iterator for fetching work chunks
144         */
145        Iterator<WorkChunk> fetchAllWorkChunksIterator(String theInstanceId, boolean theWithData);
146
147        /**
148         * Fetch all chunks with data for a given instance for a given step id - read-only.
149         *
150         * @return - a stream for fetching work chunks
151         */
152        Stream<WorkChunk> fetchAllWorkChunksForStepStream(String theInstanceId, String theStepId);
153
154        /**
155         * Fetches an iterator that retrieves WorkChunkMetadata from the db.
156         * @param theInstanceId instance id of job of interest
157         * @param theStates states of interset
158         * @return an iterator for the workchunks
159         */
160        @Transactional(propagation = Propagation.SUPPORTS)
161        Page<WorkChunkMetadata> fetchAllWorkChunkMetadataForJobInStates(
162                        Pageable thePageable, String theInstanceId, Set<WorkChunkStatusEnum> theStates);
163
164        /**
165         * Callback to update a JobInstance within a locked transaction.
166         * Return true from the callback if the record write should continue, or false if
167         * the change should be discarded.
168         */
169        interface JobInstanceUpdateCallback {
170                /**
171                 * Modify theInstance within a write-lock transaction.
172                 * @param theInstance a copy of the instance to modify.
173                 * @return true if the change to theInstance should be written back to the db.
174                 */
175                boolean doUpdate(JobInstance theInstance);
176        }
177
178        /**
179         * Brute-force hack for now to create a tx boundary - takes a write-lock on the instance
180         * while the theModifier runs.
181         * Keep the callback short to keep the lock-time short.
182         * If the status is changing, use {@link ca.uhn.fhir.batch2.progress.JobInstanceStatusUpdater}
183         *      inside theModifier to ensure state-change callbacks are invoked properly.
184         *
185         * @param theInstanceId the id of the instance to modify
186         * @param theModifier a hook to modify the instance - return true to finish the record write
187         * @return true if the instance was modified
188         */
189        // wipmb For 6.8 - consider changing callers to actual objects we can unit test
190        @Transactional
191        boolean updateInstance(String theInstanceId, JobInstanceUpdateCallback theModifier);
192
193        /**
194         * Deletes the instance and all associated work chunks
195         *
196         * @param theInstanceId The instance ID
197         */
198        // on implementations @Transactional(propagation = Propagation.REQUIRES_NEW)
199        void deleteInstanceAndChunks(String theInstanceId);
200
201        /**
202         * Deletes all work chunks associated with the instance
203         *
204         * @param theInstanceId The instance ID
205         */
206        // on implementations @Transactional(propagation = Propagation.REQUIRES_NEW)
207        void deleteChunksAndMarkInstanceAsChunksPurged(String theInstanceId);
208
209        @Transactional(propagation = Propagation.MANDATORY)
210        boolean markInstanceAsStatusWhenStatusIn(
211                        String theInstance, StatusEnum theStatusEnum, Set<StatusEnum> thePriorStates);
212
213        /**
214         * Marks an instance as cancelled
215         *
216         * @param theInstanceId The instance ID
217         */
218        // on implementations @Transactional(propagation = Propagation.REQUIRES_NEW)
219        JobOperationResultJson cancelInstance(String theInstanceId);
220
221        @Transactional(propagation = Propagation.MANDATORY)
222        void updateInstanceUpdateTime(String theInstanceId);
223
224        /*
225         * State transition events for job instances.
226         * These cause the transitions along {@link ca.uhn.fhir.batch2.model.StatusEnum}
227         *
228         * @see hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/docs/server_jpa_batch/batch2_states.md
229         */
230
231        class CreateResult {
232                public final String jobInstanceId;
233                public final String workChunkId;
234
235                public CreateResult(String theJobInstanceId, String theWorkChunkId) {
236                        jobInstanceId = theJobInstanceId;
237                        workChunkId = theWorkChunkId;
238                }
239
240                @Override
241                public String toString() {
242                        return new ToStringBuilder(this)
243                                        .append("jobInstanceId", jobInstanceId)
244                                        .append("workChunkId", workChunkId)
245                                        .toString();
246                }
247        }
248
249        /**
250         * Create the job, and it's first chunk.
251         *
252         * We create the chunk atomically with the job so that we never have a state with
253         * zero unfinished chunks left until the job is complete.  Makes the maintenance run simpler.
254         *
255         * @param theJobDefinition what kind of job
256         * @param theParameters params for the job
257         * @return the ids of the instance and first chunk
258         */
259        @Nonnull
260        @Transactional(propagation = Propagation.MANDATORY)
261        default CreateResult onCreateWithFirstChunk(JobDefinition<?> theJobDefinition, String theParameters) {
262                JobInstance instance = JobInstance.fromJobDefinition(theJobDefinition);
263                instance.setParameters(theParameters);
264                instance.setStatus(StatusEnum.QUEUED);
265
266                String instanceId = storeNewInstance(instance);
267                ourLog.info(
268                                "Stored new {} job {} with status {}",
269                                theJobDefinition.getJobDefinitionId(),
270                                instanceId,
271                                instance.getStatus());
272                ourLog.debug("Job parameters: {}", instance.getParameters());
273
274                WorkChunkCreateEvent batchWorkChunk = WorkChunkCreateEvent.firstChunk(theJobDefinition, instanceId);
275                String chunkId = onWorkChunkCreate(batchWorkChunk);
276                return new CreateResult(instanceId, chunkId);
277        }
278
279        /**
280         * Move from QUEUED->IN_PROGRESS when a work chunk arrives.
281         * Ignore other prior states.
282         * @return did the transition happen
283         */
284        @Transactional(propagation = Propagation.MANDATORY)
285        default boolean onChunkDequeued(String theJobInstanceId) {
286                return markInstanceAsStatusWhenStatusIn(
287                                theJobInstanceId, StatusEnum.IN_PROGRESS, Collections.singleton(StatusEnum.QUEUED));
288        }
289
290        @VisibleForTesting
291        WorkChunk createWorkChunk(WorkChunk theWorkChunk);
292
293        /**
294         * Atomically advance the given job to the given step and change the status of all QUEUED and GATE_WAITING chunks
295         * in the next step to READY
296         * @param theJobInstanceId the id of the job instance to be updated
297         * @param theNextStepId the id of the next job step
298         * @return whether any changes were made
299         */
300        @Transactional(propagation = Propagation.REQUIRES_NEW)
301        boolean advanceJobStepAndUpdateChunkStatus(
302                        String theJobInstanceId, String theNextStepId, boolean theIsReductionStepBoolean);
303}