Package ca.uhn.fhir.batch2.api
Interface IWorkChunkPersistence
- All Known Subinterfaces:
IJobPersistence
public interface IWorkChunkPersistence
Work Chunk API, implementing the WorkChunk state machine.
Test specification is in
ca.uhn.hapi.fhir.batch2.test.AbstractIJobPersistenceSpecificationTest.
Note on transaction boundaries: these are messy - some methods expect an existing transaction and are
marked with @Transactional(propagation = Propagation.MANDATORY), some will create a tx as needed
and are marked @Transactional(propagation = Propagation.REQUIRED), and some run in a NEW transaction
and are not marked on the interface, but on the implementors instead. We had a bug where interface
methods marked @Transactional(propagation = Propagation.REQUIRES_NEW) were starting two (2!)
transactions because of our synchronized wrapper.
Method Summary
Modifier and Type, Method, Description
- fetchAllWorkChunksForStepStream(String theInstanceId, String theStepId): Fetch all chunks with data for a given instance for a given step id
- fetchAllWorkChunksIterator(String theInstanceId, boolean theWithData): Fetch all chunks for a given instance.
- void markWorkChunksWithStatusAndWipeData(String theInstanceId, List<String> theChunkIds, WorkChunkStatusEnum theStatus, String theErrorMsg): Marks all work chunks with the provided status and erases the data
- void onWorkChunkCompletion: Report success and complete the chunk.
- onWorkChunkCreate(WorkChunkCreateEvent theBatchWorkChunk): Stores a chunk of work for later retrieval.
- onWorkChunkDequeue(String theChunkId): On arrival at a worker.
- onWorkChunkError(WorkChunkErrorEvent theParameters): A retryable error.
- void onWorkChunkFailed(String theChunkId, String theErrorMessage): An unrecoverable error.
- void onWorkChunkPollDelay(String theChunkId, Date theNewDeadline): Updates the specified Work Chunk to set the next polling interval.
-
Method Details
-
onWorkChunkCreate
Stores a chunk of work for later retrieval. The first state event, as the chunk is created. This method should be atomic and should only return when the chunk has been successfully stored in the database. The chunk should be stored with a status of WorkChunkStatusEnum.READY or WorkChunkStatusEnum.GATE_WAITING for ungated and gated jobs, respectively.
- Parameters:
theBatchWorkChunk - the batch work chunk to be stored
- Returns:
a globally unique identifier for this chunk.
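The creation contract can be illustrated with a minimal in-memory sketch. Everything here is a hypothetical stand-in: the CreateSketch class, its Status enum, the boolean gated flag, and the UUID-based identifier are assumptions for illustration and are not the real WorkChunkCreateEvent, WorkChunkStatusEnum, or persistence implementation.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory stand-in for the chunk store; not the HAPI FHIR implementation.
final class CreateSketch {
    // Hypothetical subset of statuses, mirroring the names used in the text above.
    enum Status { READY, GATE_WAITING }

    private final Map<String, Status> store = new ConcurrentHashMap<>();

    // Stores a new chunk and returns its identifier only after the put has completed.
    // The 'gated' flag is an assumption; the real event type carries the job details.
    String onCreate(boolean gated) {
        String id = UUID.randomUUID().toString();
        store.put(id, gated ? Status.GATE_WAITING : Status.READY);
        return id;
    }

    // Looks up the stored status, or null if the id is unknown.
    Status statusOf(String id) {
        return store.get(id);
    }
}
```

In this sketch a chunk of an ungated job starts in READY and a chunk of a gated job starts in GATE_WAITING, matching the rule stated above.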
-
onWorkChunkDequeue
On arrival at a worker. The second state event, as the worker starts processing. Transition to WorkChunkStatusEnum.IN_PROGRESS, unless the chunk is not in the QUEUED or ERRORED state.
- Parameters:
theChunkId - The ID from onWorkChunkCreate(ca.uhn.fhir.batch2.model.WorkChunkCreateEvent)
- Returns:
The WorkChunk, or empty if no chunk exists or the chunk is not in a runnable state (QUEUED or ERRORED)
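The dequeue rule can be read as a guarded state transition. The following is a minimal sketch under stated assumptions: DequeueSketch and its Status enum are hypothetical, the store is an in-memory map, and the method returns the new status where the real method returns the WorkChunk itself.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical in-memory model of the dequeue transition; not the HAPI FHIR implementation.
final class DequeueSketch {
    // Hypothetical subset of statuses, mirroring the names used in the text above.
    enum Status { READY, QUEUED, IN_PROGRESS, ERRORED, COMPLETED }

    private final Map<String, Status> chunks = new HashMap<>();

    // Test helper: seeds a chunk with a given status.
    void put(String id, Status status) {
        chunks.put(id, status);
    }

    // Moves the chunk to IN_PROGRESS only when it is currently runnable (QUEUED or ERRORED).
    // Returns empty when the chunk does not exist or is in any other state.
    Optional<Status> onDequeue(String id) {
        Status current = chunks.get(id);
        if (current != Status.QUEUED && current != Status.ERRORED) {
            return Optional.empty();
        }
        chunks.put(id, Status.IN_PROGRESS);
        return Optional.of(Status.IN_PROGRESS);
    }
}
```

A second dequeue of the same chunk returns empty in this sketch, because the chunk is already IN_PROGRESS and therefore no longer runnable.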
-
onWorkChunkError
A retryable error. Transition to WorkChunkStatusEnum.ERRORED unless max-retries passed, then transition to WorkChunkStatusEnum.FAILED.
- Parameters:
theParameters - the error message and max retry count.
- Returns:
the new status - ERRORED or FAILED, depending on retry count
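The retry decision can be sketched as a pure function. This is an illustration only: ErrorSketch, its Status enum, and the errorCount parameter are hypothetical, and it assumes that "max-retries passed" means the error count strictly exceeds the maximum, which the text above does not pin down.

```java
// Hypothetical model of the retryable-error transition; not the HAPI FHIR implementation.
final class ErrorSketch {
    // Hypothetical subset of statuses, mirroring the names used in the text above.
    enum Status { ERRORED, FAILED }

    // errorCount is the number of errors recorded for the chunk, including the current one.
    // Assumption: the chunk fails only once errorCount is strictly greater than maxRetries.
    static Status onError(int errorCount, int maxRetries) {
        return errorCount > maxRetries ? Status.FAILED : Status.ERRORED;
    }
}
```

Under this assumption, a chunk with a maximum of 3 retries stays ERRORED for its first three errors and becomes FAILED on the fourth.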
-
onWorkChunkPollDelay
Updates the specified Work Chunk to set the next polling interval. It will also:
* update the poll attempts
* set the work chunk status to POLL_WAITING (if it is not already in this state)
- Parameters:
theChunkId - the id of the chunk to update
theNewDeadline - the time when polling should be redone
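The combined effect of a poll delay can be sketched on a single in-memory chunk. PollDelaySketch, its fields, and its Status enum are hypothetical, and the sketch assumes that "update the poll attempts" means incrementing a counter by one.

```java
import java.util.Date;

// Hypothetical in-memory model of one chunk receiving a poll delay; not the HAPI FHIR implementation.
final class PollDelaySketch {
    // Hypothetical subset of statuses, mirroring the names used in the text above.
    enum Status { IN_PROGRESS, POLL_WAITING }

    Status status = Status.IN_PROGRESS;
    int pollAttempts = 0;
    Date nextPollTime;

    // Records the new deadline, counts the attempt (assumed to be an increment),
    // and moves the chunk to POLL_WAITING; the assignment is a no-op if it is already there.
    void onPollDelay(Date newDeadline) {
        nextPollTime = newDeadline;
        pollAttempts++;
        status = Status.POLL_WAITING;
    }
}
```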
-
onWorkChunkFailed
@Transactional(propagation=REQUIRED) void onWorkChunkFailed(String theChunkId, String theErrorMessage)
An unrecoverable error. Transition to WorkChunkStatusEnum.FAILED
- Parameters:
theChunkId - The chunk ID
-
onWorkChunkCompletion
Report success and complete the chunk. Transition to WorkChunkStatusEnum.COMPLETED
- Parameters:
theEvent - with record and error count
-
markWorkChunksWithStatusAndWipeData
@Transactional(propagation=MANDATORY) void markWorkChunksWithStatusAndWipeData(String theInstanceId, List<String> theChunkIds, WorkChunkStatusEnum theStatus, String theErrorMsg)
Marks all work chunks with the provided status and erases the data
- Parameters:
theInstanceId - the instance id
theChunkIds - the ids of work chunks being reduced to a single chunk
theStatus - the status to mark
theErrorMsg - error message (if status warrants it)
-
fetchAllWorkChunksIterator
Fetch all chunks for a given instance.
- Parameters:
theInstanceId - instance id
theWithData - whether to include the data - not needed for stats collection
- Returns:
an iterator for fetching work chunks
wipmb replace with a stream and a consumer in 6.8
-
fetchAllWorkChunksForStepStream
@Transactional(propagation=MANDATORY, readOnly=true) Stream<WorkChunk> fetchAllWorkChunksForStepStream(String theInstanceId, String theStepId)
Fetch all chunks with data for a given instance for a given step id
- Returns:
a stream for fetching work chunks
-