Interface IWorkChunkPersistence

All Known Subinterfaces:
IJobPersistence

public interface IWorkChunkPersistence
Work Chunk api, implementing the WorkChunk state machine. Test specification is in ca.uhn.hapi.fhir.batch2.test.AbstractIJobPersistenceSpecificationTest. Note on transaction boundaries: these are messy - some methods expect an existing transaction and are marked with @Transactional(propagation = Propagation.MANDATORY), some will create a tx as needed and are marked @Transactional(propagation = Propagation.REQUIRED), and some run in a NEW transaction and are not marked on the interface, but on the implementors instead. We had a bug where interface methods marked @Transactional(propagation = Propagation.REQUIRES_NEW) were starting two (2!) transactions because of our synchronized wrapper.
  • Method Details

    • onWorkChunkCreate

      Stores a chunk of work for later retrieval. The first state event, as the chunk is created. This method should be atomic and should only return when the chunk has been successfully stored in the database. Chunk should be stored with a status of WorkChunkStatusEnum.READY or WorkChunkStatusEnum.GATE_WAITING for ungated and gated jobs, respectively.
      Parameters:
      theBatchWorkChunk - the batch work chunk to be stored
      Returns:
      a globally unique identifier for this chunk.
    • onWorkChunkDequeue

      @Transactional(propagation=MANDATORY) Optional<WorkChunk> onWorkChunkDequeue(String theChunkId)
      On arrival at a worker. The second state event, as the worker starts processing. Transition to WorkChunkStatusEnum.IN_PROGRESS if unless not in QUEUED or ERRORRED state.
      Parameters:
      theChunkId - The ID from onWorkChunkCreate(ca.uhn.fhir.batch2.model.WorkChunkCreateEvent)
      Returns:
      The WorkChunk or empty if no chunk exists, or not in a runnable state (QUEUED or ERRORRED)
    • onWorkChunkError

      A retryable error. Transition to WorkChunkStatusEnum.ERRORED unless max-retries passed, then transition to WorkChunkStatusEnum.FAILED.
      Parameters:
      theParameters - - the error message and max retry count.
      Returns:
      - the new status - ERRORED or ERRORED, depending on retry count
    • onWorkChunkPollDelay

      @Transactional void onWorkChunkPollDelay(String theChunkId, Date theNewDeadline)
      Updates the specified Work Chunk to set the next polling interval. It wil also: * update the poll attempts * sets the workchunk status to POLL_WAITING (if it's not already in this state)
      Parameters:
      theChunkId - the id of the chunk to update
      theNewDeadline - the time when polling should be redone
    • onWorkChunkFailed

      @Transactional(propagation=REQUIRED) void onWorkChunkFailed(String theChunkId, String theErrorMessage)
      An unrecoverable error. Transition to WorkChunkStatusEnum.FAILED
      Parameters:
      theChunkId - The chunk ID
    • onWorkChunkCompletion

      @Transactional(propagation=REQUIRED) void onWorkChunkCompletion(WorkChunkCompletionEvent theEvent)
      Report success and complete the chunk. Transition to WorkChunkStatusEnum.COMPLETED
      Parameters:
      theEvent - with record and error count
    • markWorkChunksWithStatusAndWipeData

      @Transactional(propagation=MANDATORY) void markWorkChunksWithStatusAndWipeData(String theInstanceId, List<String> theChunkIds, WorkChunkStatusEnum theStatus, String theErrorMsg)
      Marks all work chunks with the provided status and erases the data
      Parameters:
      theInstanceId - - the instance id
      theChunkIds - - the ids of work chunks being reduced to single chunk
      theStatus - - the status to mark
      theErrorMsg - - error message (if status warrants it)
    • fetchAllWorkChunksIterator

      Iterator<WorkChunk> fetchAllWorkChunksIterator(String theInstanceId, boolean theWithData)
      Fetch all chunks for a given instance.
      Parameters:
      theInstanceId - - instance id
      theWithData - - whether to include the data - not needed for stats collection
      Returns:
      - an iterator for fetching work chunks wipmb replace with a stream and a consumer in 6.8
    • fetchAllWorkChunksForStepStream

      @Transactional(propagation=MANDATORY, readOnly=true) Stream<WorkChunk> fetchAllWorkChunksForStepStream(String theInstanceId, String theStepId)
      Fetch all chunks with data for a given instance for a given step id
      Returns:
      - a stream for fetching work chunks