001/*-
002 * #%L
003 * HAPI FHIR JPA Server - Batch2 Task Processor
004 * %%
005 * Copyright (C) 2014 - 2024 Smile CDR, Inc.
006 * %%
007 * Licensed under the Apache License, Version 2.0 (the "License");
008 * you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 *      http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 * #L%
019 */
020package ca.uhn.fhir.batch2.api;
021
022import ca.uhn.fhir.batch2.model.WorkChunk;
023import ca.uhn.fhir.batch2.model.WorkChunkCompletionEvent;
024import ca.uhn.fhir.batch2.model.WorkChunkCreateEvent;
025import ca.uhn.fhir.batch2.model.WorkChunkErrorEvent;
026import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
027import org.springframework.transaction.annotation.Propagation;
028import org.springframework.transaction.annotation.Transactional;
029
030import java.util.Date;
031import java.util.Iterator;
032import java.util.List;
033import java.util.Optional;
034import java.util.stream.Stream;
035
036/**
037 * Work Chunk api, implementing the WorkChunk state machine.
038 * Test specification is in {@link ca.uhn.hapi.fhir.batch2.test.AbstractIJobPersistenceSpecificationTest}.
039 * Note on transaction boundaries:  these are messy - some methods expect an existing transaction and are
040 * marked with {@code @Transactional(propagation = Propagation.MANDATORY)}, some will create a tx as needed
041 * and are marked {@code @Transactional(propagation = Propagation.REQUIRED)}, and some run in a NEW transaction
042 * and are not marked on the interface, but on the implementors instead.  We had a bug where interface
043 * methods marked {@code @Transactional(propagation = Propagation.REQUIRES_NEW)} were starting two (2!)
044 * transactions because of our synchronized wrapper.
045 *
046 * @see "hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/docs/server_jpa_batch/batch2_states.md"
047 */
048public interface IWorkChunkPersistence {
049
050        //////////////////////////////////
051        // WorkChunk calls
052        //////////////////////////////////
053
054        /**
055         * Stores a chunk of work for later retrieval.
056         * The first state event, as the chunk is created.
057         * This method should be atomic and should only
058         * return when the chunk has been successfully stored in the database.
059         * Chunk should be stored with a status of {@link WorkChunkStatusEnum#READY} or
060         * {@link WorkChunkStatusEnum#GATE_WAITING} for ungated and gated jobs, respectively.
061         *
062         * @param theBatchWorkChunk the batch work chunk to be stored
063         * @return a globally unique identifier for this chunk.
064         */
065        String onWorkChunkCreate(WorkChunkCreateEvent theBatchWorkChunk);
066
067        /**
068         * On arrival at a worker.
069         * The second state event, as the worker starts processing.
070         * Transition to {@link WorkChunkStatusEnum#IN_PROGRESS} unless the chunk is not in QUEUED or ERRORED state.
071         *
072         * @param theChunkId The ID from {@link #onWorkChunkCreate}
073         * @return The WorkChunk, or empty if no chunk exists or it is not in a runnable state (QUEUED or ERRORED)
074         */
075        @Transactional(propagation = Propagation.MANDATORY)
076        Optional<WorkChunk> onWorkChunkDequeue(String theChunkId);
077
078        /**
079         * A retryable error.
080         * Transition to {@link WorkChunkStatusEnum#ERRORED} unless max-retries passed, then
081         * transition to {@link WorkChunkStatusEnum#FAILED}.
082         *
083         * @param theParameters - the error message and max retry count.
084         * @return - the new status - ERRORED or FAILED, depending on retry count
085         */
086        // on impl - @Transactional(propagation = Propagation.REQUIRES_NEW)
087        WorkChunkStatusEnum onWorkChunkError(WorkChunkErrorEvent theParameters);
088
089        /**
090         * Updates the specified Work Chunk to set the next polling interval.
091         * It will also:
092         * <ul><li>update the poll attempts</li>
093         * <li>set the work chunk status to POLL_WAITING (if it's not already in this state)</li></ul>
094         * @param theChunkId the id of the chunk to update
095         * @param theNewDeadline the time when polling should be redone
096         */
097        @Transactional // NOTE(review): no propagation given - presumably Spring's default (REQUIRED) applies; confirm
098        void onWorkChunkPollDelay(String theChunkId, Date theNewDeadline);
099
100        /**
101         * An unrecoverable error. Transition to {@link WorkChunkStatusEnum#FAILED}.
102         *
103         * @param theChunkId The chunk ID
104         * @param theErrorMessage the error message associated with the failure
105         */
106        @Transactional(propagation = Propagation.REQUIRED)
107        void onWorkChunkFailed(String theChunkId, String theErrorMessage);
108
109        /**
110         * Report success and complete the chunk.
111         * Transition to {@link WorkChunkStatusEnum#COMPLETED}
112         *
113         * @param theEvent with record and error count
114         */
115        @Transactional(propagation = Propagation.REQUIRED)
116        void onWorkChunkCompletion(WorkChunkCompletionEvent theEvent);
117
118        /**
119         * Marks all work chunks with the provided status and erases the data
120         *
121         * @param theInstanceId - the instance id
122         * @param theChunkIds   - the ids of work chunks being reduced to single chunk
123         * @param theStatus     - the status to mark
124         * @param theErrorMsg   - error message (if status warrants it)
125         */
126        @Transactional(propagation = Propagation.MANDATORY)
127        void markWorkChunksWithStatusAndWipeData(
128                        String theInstanceId, List<String> theChunkIds, WorkChunkStatusEnum theStatus, String theErrorMsg);
129
130        /**
131         * Fetch all chunks for a given instance.
132         * @param theInstanceId - instance id
133         * @param theWithData - whether to include the data - not needed for stats collection
134         * @return - an iterator for fetching work chunks
135         * wipmb replace with a stream and a consumer in 6.8
136         */
137        Iterator<WorkChunk> fetchAllWorkChunksIterator(String theInstanceId, boolean theWithData);
138
139        /**
140         * Fetch all chunks with data for the given instance ({@code theInstanceId}) and step ({@code theStepId}).
141         *
142         * @return - a stream for fetching work chunks
143         */
144        @Transactional(propagation = Propagation.MANDATORY, readOnly = true)
145        Stream<WorkChunk> fetchAllWorkChunksForStepStream(String theInstanceId, String theStepId);
146}