Package ca.uhn.fhir.batch2
A running job corresponds to a JobInstance.
Jobs are modeled as a sequence of steps, operating on WorkChunks
containing JSON data. The first step is special: its input chunk is empty, and the job parameters are used as its data.
A JobDefinition defines the sequence of JobDefinitionSteps.
Each step defines the input chunk type, the output chunk type, and a procedure that receives the input and emits 0 or more outputs.
We have a special kind of final step called a reducer, which corresponds to the stream Collector concept.
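The step and reducer model can be sketched in plain Java. This is a minimal illustration under stated assumptions, not the batch2 API. Step, JobSketch.runJob, and the word-count job are hypothetical. Chunks are in-memory objects instead of JSON, and everything runs in one thread. The first step receives the job parameters. Each later step maps one input chunk to zero or more output chunks. The final reducer is an ordinary java.util.stream.Collector that folds every output of the previous step into one result.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collector;
import java.util.stream.Collectors;

// Hypothetical step contract: one input chunk in, zero or more output chunks out.
interface Step<I, O> {
    List<O> run(I input);
}

final class JobSketch {
    private JobSketch() {}

    // Runs two steps and a reducer. The first step's input is the job parameters.
    static <P, A, B, R> R runJob(P parameters,
                                 Step<P, A> firstStep,
                                 Step<A, B> secondStep,
                                 Collector<B, ?, R> reducer) {
        List<A> firstOutputs = firstStep.run(parameters);
        List<B> secondOutputs = new ArrayList<>();
        for (A chunk : firstOutputs) {
            // Each chunk is processed on its own; in batch2 a worker would pick it up from the queue.
            secondOutputs.addAll(secondStep.run(chunk));
        }
        // The reducer sees every output of the previous step at once, like Stream.collect.
        return secondOutputs.stream().collect(reducer);
    }

    // Hypothetical example job: the parameters are lines of text, step 1 emits one chunk
    // per line, step 2 splits a line into words, and the reducer counts the words.
    static long countWords(List<String> lines) {
        Step<List<String>, String> emitLines = params -> new ArrayList<>(params);
        Step<String, String> splitWords = line -> {
            List<String> words = new ArrayList<>();
            for (String word : line.trim().split("\\s+")) {
                if (!word.isEmpty()) {
                    words.add(word);
                }
            }
            return words;
        };
        return runJob(lines, emitLines, splitWords, Collectors.counting());
    }
}
```

For example, JobSketch.countWords(List.of("a b c", "d e")) runs both steps and returns 5. Expressing the reducer as a Collector keeps the reduction logic independent of how the chunks were produced.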
Job instances and work chunks are stored in the database. Work is distributed to workers via queues.
The queue message is just the ids of the chunk (chunk id, step id, instance id, job definition id, etc.).
The worker receives the notification from Spring Messaging (MessageHandler.handleMessage(org.springframework.messaging.Message<?>)),
fetches the chunk data from the store, and processes it using the handler defined for the step.
The worker updates chunk state as appropriate. It may also update the job instance state.
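The split between the queue and the store can be illustrated with a small sketch. ChunkNotification, InMemoryChunkStore, and SketchWorker are hypothetical names. A HashMap stands in for the database, and a direct method call stands in for Spring Messaging delivery. The sketch shows that the message carries only identifiers. The worker looks up both the chunk data and the step handler before doing any work.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

// Hypothetical queue payload: only identifiers travel on the channel, never chunk data.
record ChunkNotification(String jobDefinitionId, String instanceId, String stepId, String chunkId) {}

// Hypothetical stand-in for the database table that holds work chunk JSON.
final class InMemoryChunkStore {
    private final Map<String, String> dataByChunkId = new HashMap<>();

    void save(String chunkId, String json) {
        dataByChunkId.put(chunkId, json);
    }

    Optional<String> fetchData(String chunkId) {
        return Optional.ofNullable(dataByChunkId.get(chunkId));
    }
}

// Hypothetical worker: resolves a notification against the store, then runs the step's handler.
final class SketchWorker {
    private final InMemoryChunkStore store;
    private final Map<String, Function<String, String>> handlersByStepId;

    SketchWorker(InMemoryChunkStore store, Map<String, Function<String, String>> handlersByStepId) {
        this.store = store;
        this.handlersByStepId = handlersByStepId;
    }

    // Returns the handler's output, or empty if the step has no handler or the chunk is missing.
    Optional<String> handle(ChunkNotification message) {
        Function<String, String> handler = handlersByStepId.get(message.stepId());
        if (handler == null) {
            return Optional.empty();
        }
        return store.fetchData(message.chunkId()).map(handler);
    }
}
```

One plausible benefit of sending only ids is that the queue never holds a second copy of the chunk data. The store remains the single place where chunk contents and state live.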
Once a minute, Quartz runs the maintenance pass.
This loop inspects every job, and dispatches to the JobInstanceProcessor.
The JobInstanceProcessor counts the outstanding chunks for a job, and uses these statistics to fire the working state transitions (below).
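A simple tally can approximate the statistics the JobInstanceProcessor works from. In this hypothetical sketch, ChunkState and ChunkStats are illustrative names, not the classes batch2 actually uses. The sketch counts chunks per state with an EnumMap and treats any chunk that is not COMPLETED or FAILED as outstanding. It reports progress as the percentage of COMPLETED chunks.

```java
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

// Hypothetical chunk states, named after the states described in this package overview.
enum ChunkState { READY, POLL_WAITING, QUEUED, IN_PROGRESS, ERRORED, FAILED, COMPLETED }

// Hypothetical per-instance statistics of the kind a maintenance pass could derive.
final class ChunkStats {
    private final Map<ChunkState, Integer> counts = new EnumMap<>(ChunkState.class);

    private ChunkStats() {}

    static ChunkStats of(List<ChunkState> chunkStates) {
        ChunkStats stats = new ChunkStats();
        for (ChunkState state : chunkStates) {
            stats.counts.merge(state, 1, Integer::sum);
        }
        return stats;
    }

    int count(ChunkState state) {
        return counts.getOrDefault(state, 0);
    }

    int total() {
        int sum = 0;
        for (int c : counts.values()) {
            sum += c;
        }
        return sum;
    }

    // Chunks that are in neither final state still need work.
    int outstanding() {
        return total() - count(ChunkState.COMPLETED) - count(ChunkState.FAILED);
    }

    // Percentage of chunks in COMPLETED state; an instance with no chunks reports 0.
    double percentComplete() {
        int total = total();
        return total == 0 ? 0.0 : 100.0 * count(ChunkState.COMPLETED) / total;
    }
}
```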
Job and chunk processing follow the state machines described in hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/docs/server_jpa_batch/batch2_states.md.
Chunks have a simple state system with states
READY, POLL_WAITING, QUEUED, IN_PROGRESS, ERRORED, FAILED, and COMPLETED.
The initial state is READY, and the final states are FAILED and COMPLETED.
There are 2 primary systems in play during Batch2 jobs: a Maintenance Job and the Batch2 Job Notification topic.
The Maintenance Job:
- Moves POLL_WAITING work chunks to READY if their nextPollTime has expired.
- Moves READY work chunks to QUEUED and publishes them to the Batch2 Notification topic.
- Calculates job progress (percentage of work chunks in COMPLETED status).
- If the job is finished, purges any leftover work chunks.
- Cleans up any complete, failed, or cancelled jobs.
- Moves any gated jobs onto their next step.
- If the final step of a (gated) job is a reduction step, a reduction step execution will be triggered.
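The first two maintenance duties above can be sketched as a single pass over an instance's chunks: reviving POLL_WAITING chunks and queueing READY ones. Chunk, ChunkStatus, and MaintenanceSketch.runPass are hypothetical. The publish callback stands in for sending to the Batch2 Notification topic. Treating a nextPollTime equal to the current time as expired is an assumption, as is queueing a revived chunk in the same pass.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical subset of chunk states needed for this sketch.
enum ChunkStatus { READY, POLL_WAITING, QUEUED, IN_PROGRESS, COMPLETED }

// Hypothetical mutable chunk row; nextPollTime only matters while POLL_WAITING.
final class Chunk {
    final String id;
    ChunkStatus status;
    Instant nextPollTime;

    Chunk(String id, ChunkStatus status, Instant nextPollTime) {
        this.id = id;
        this.status = status;
        this.nextPollTime = nextPollTime;
    }
}

final class MaintenanceSketch {
    private MaintenanceSketch() {}

    // One maintenance pass over an instance's chunks; returns the ids that were published.
    static List<String> runPass(List<Chunk> chunks, Instant now, Consumer<String> publish) {
        // 1. POLL_WAITING chunks whose nextPollTime is not in the future go back to READY.
        for (Chunk chunk : chunks) {
            if (chunk.status == ChunkStatus.POLL_WAITING
                    && chunk.nextPollTime != null
                    && !chunk.nextPollTime.isAfter(now)) {
                chunk.status = ChunkStatus.READY;
                chunk.nextPollTime = null;
            }
        }
        // 2. READY chunks (in this sketch, including ones just revived) move to QUEUED
        //    and their ids are published.
        List<String> published = new ArrayList<>();
        for (Chunk chunk : chunks) {
            if (chunk.status == ChunkStatus.READY) {
                chunk.status = ChunkStatus.QUEUED;
                publish.accept(chunk.id);
                published.add(chunk.id);
            }
        }
        return published;
    }
}
```

This sketch updates the chunk state before publishing. If the process dies between the two, the chunk is left QUEUED with no message in flight, so a real implementation has to account for that ordering.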
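When a worker receives a work chunk notification from the Batch2 Job Notification topic, it:
- Changes the work chunk status from QUEUED to IN_PROGRESS.
- Changes the job instance status from QUEUED to IN_PROGRESS.
- If the job instance has been cancelled, changes its status to CANCELLED and aborts processing.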
- If the step creates new work chunks, each work chunk will be created in the READY state
- If the step succeeds, the work chunk status is changed from IN_PROGRESS to COMPLETED
- If the step throws a RetryChunkLaterException, the work chunk status is changed from IN_PROGRESS to POLL_WAITING and a nextPollTime value is set.
- If the step fails, the work chunk status is changed from IN_PROGRESS to either ERRORED or FAILED depending on the severity of the error
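The worker's outcome rules can be condensed into one method. In this hypothetical sketch, RetryLaterSignal stands in for RetryChunkLaterException and FatalStepError stands in for a hard failure. Neither is a batch2 class. The retry budget (MAX_ERRORS = 3) is an assumption, since the text only says the result is ERRORED or FAILED depending on the severity of the error. Cancellation and child-chunk creation are left out.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical stand-in for RetryChunkLaterException: asks for a retry after a delay.
class RetryLaterSignal extends RuntimeException {
    final Duration delay;

    RetryLaterSignal(Duration delay) {
        super("retry later");
        this.delay = delay;
    }
}

// Hypothetical marker for a hard failure that should not be retried.
class FatalStepError extends RuntimeException {
    FatalStepError(String message) {
        super(message);
    }
}

enum WorkerChunkStatus { QUEUED, IN_PROGRESS, POLL_WAITING, ERRORED, FAILED, COMPLETED }

// Hypothetical worker-side view of a single chunk.
final class WorkerSketch {
    // Assumed retry budget; the real limit is not specified in this overview.
    static final int MAX_ERRORS = 3;

    WorkerChunkStatus status = WorkerChunkStatus.QUEUED;
    int errorCount = 0;
    Instant nextPollTime;

    // Runs the step once and applies the outcome rules from the list above.
    void process(Runnable step, Instant now) {
        status = WorkerChunkStatus.IN_PROGRESS; // dequeued (first delivery or redelivery after ERRORED)
        try {
            step.run();
            status = WorkerChunkStatus.COMPLETED;
        } catch (RetryLaterSignal e) {
            // Not an error: park the chunk until a maintenance pass revives it.
            status = WorkerChunkStatus.POLL_WAITING;
            nextPollTime = now.plus(e.delay);
        } catch (FatalStepError e) {
            status = WorkerChunkStatus.FAILED;
        } catch (RuntimeException e) {
            // Transient error: ERRORED until the retry budget runs out, then FAILED.
            errorCount++;
            status = errorCount >= MAX_ERRORS ? WorkerChunkStatus.FAILED : WorkerChunkStatus.ERRORED;
        }
    }
}
```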
Chunk transitions:
- Chunks are created READY (NB: should be READY or WAITING), and a notification is posted to the channel for non-gated steps.
- Workers receive a notification and advance QUEUED->IN_PROGRESS.
  See IWorkChunkPersistence.onWorkChunkDequeue(String).
- On normal execution, the chunk advances IN_PROGRESS->COMPLETED.
  See IWorkChunkPersistence.onWorkChunkCompletion(ca.uhn.fhir.batch2.model.WorkChunkCompletionEvent).
- On a retryable error, the chunk moves IN_PROGRESS->ERRORED with an error message, and the chunk is put back on the queue.
  See IWorkChunkPersistence.onWorkChunkError(ca.uhn.fhir.batch2.model.WorkChunkErrorEvent).
- On a RetryChunkLaterException, the chunk moves IN_PROGRESS->POLL_WAITING with a nextPollTime set. The chunk is *not* put back on the queue, but is left for the maintenance job to manage.
- On a hard failure, or too many errors, the chunk moves IN_PROGRESS->FAILED with the error message.
  See IWorkChunkPersistence.onWorkChunkFailed(java.lang.String, java.lang.String).
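The chunk transitions above can be encoded as a table of allowed moves. This is a hypothetical sketch: ChunkStatusSketch is an illustrative enum, not the batch2 one. The ERRORED -> IN_PROGRESS edge is an assumption based on the statement that an errored chunk is put back on the queue and then dequeued again. A table like this makes it straightforward to reject an illegal update, such as READY -> COMPLETED, before it is persisted.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical chunk state enum with its allowed transitions; not the batch2 enum.
enum ChunkStatusSketch {
    READY, POLL_WAITING, QUEUED, IN_PROGRESS, ERRORED, FAILED, COMPLETED;

    // States this state may move to, following the chunk transitions listed above.
    Set<ChunkStatusSketch> allowedNext() {
        switch (this) {
            case READY:
                return EnumSet.of(QUEUED);                 // maintenance queues the chunk
            case QUEUED:
                return EnumSet.of(IN_PROGRESS);            // a worker dequeues it
            case IN_PROGRESS:
                return EnumSet.of(COMPLETED, ERRORED, POLL_WAITING, FAILED);
            case ERRORED:
                return EnumSet.of(IN_PROGRESS);            // assumed: redelivered after a retryable error
            case POLL_WAITING:
                return EnumSet.of(READY);                  // nextPollTime has passed
            default:
                return EnumSet.noneOf(ChunkStatusSketch.class); // FAILED and COMPLETED are final
        }
    }

    boolean canTransitionTo(ChunkStatusSketch next) {
        return allowedNext().contains(next);
    }

    // A state with no outgoing transitions is final.
    boolean isFinal() {
        return allowedNext().isEmpty();
    }
}
```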
Jobs have a state machine with states:
QUEUED, IN_PROGRESS, ERRORED, COMPLETED, FINALIZE, CANCELLED, and FAILED.
ERRORED is a near synonym for IN_PROGRESS and indicates that a chunk has reported a transient error during this job.
Hard failures move the job to the final state FAILED.
The initial state is QUEUED, and the terminal states are COMPLETED, CANCELLED, and FAILED.
Most transitions happen during the maintenance run, but some are triggered by the worker.
- Jobs are created in QUEUED state, along with their first chunk. The chunk is also sent to the channel.
  See JobCoordinatorImpl.startInstance(ca.uhn.fhir.rest.api.server.RequestDetails, ca.uhn.fhir.batch2.model.JobInstanceStartRequest).
- When workers dequeue a chunk, they trigger a QUEUED->IN_PROGRESS transition to report status.
  See WorkChannelMessageHandler.MessageProcess.updateAndValidateJobStatus().
- As a special case, if the first chunk produces no children, the job advances IN_PROGRESS->COMPLETED.
  See JobStepExecutor.executeStep().
Other transitions happen during maintenance runs:
- If a job is running and the user has requested cancellation, the job transitions (IN_PROGRESS or ERRORED) -> CANCELLED.
- Then the processor looks at chunk statuses. If any chunks are FAILED, the job moves (IN_PROGRESS or ERRORED) -> FAILED.
  See InstanceProgress.calculateNewStatus(boolean).
- If any chunk is currently in ERRORED state, the job progresses IN_PROGRESS->ERRORED, and the error message is copied over.
- If all chunks are COMPLETED, the job moves (IN_PROGRESS or ERRORED) -> COMPLETED.
- Gated jobs that have a reducer step transition (IN_PROGRESS or ERRORED) -> FINALIZE when starting the reduction step.
  See JobInstanceProcessor.triggerGatedExecutions(ca.uhn.fhir.batch2.model.JobInstance, ca.uhn.fhir.batch2.model.JobDefinition<?>).
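The maintenance-run rules for jobs can be sketched as a pure function of three inputs: the current job status, the cancellation flag, and the set of distinct chunk statuses. JobStatus, ChunkStatus, and JobStatusCalculator are hypothetical names, not batch2's own types. The sketch omits the worker-triggered transitions, the first-chunk special case, and the FINALIZE step for gated reducers. The checks run in the order listed: cancellation first, then FAILED, then ERRORED, then COMPLETED.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical job states, matching the names in this overview; not batch2's own enum.
enum JobStatus {
    QUEUED, IN_PROGRESS, ERRORED, FINALIZE, COMPLETED, FAILED, CANCELLED;

    boolean isTerminal() {
        return this == COMPLETED || this == FAILED || this == CANCELLED;
    }
}

// Hypothetical chunk states as seen by the job-level calculation.
enum ChunkStatus { READY, QUEUED, IN_PROGRESS, POLL_WAITING, ERRORED, FAILED, COMPLETED }

final class JobStatusCalculator {
    private JobStatusCalculator() {}

    // Next job status after one maintenance run, given the distinct statuses of the job's chunks.
    static JobStatus next(JobStatus current, boolean cancelRequested, Set<ChunkStatus> chunkStatuses) {
        boolean running = current == JobStatus.IN_PROGRESS || current == JobStatus.ERRORED;
        if (!running) {
            // QUEUED, FINALIZE, and terminal states are not changed by this sketch.
            return current;
        }
        if (cancelRequested) {
            return JobStatus.CANCELLED;
        }
        if (chunkStatuses.contains(ChunkStatus.FAILED)) {
            return JobStatus.FAILED;
        }
        if (chunkStatuses.contains(ChunkStatus.ERRORED)) {
            // Copying the chunk's error message onto the job is omitted here.
            return JobStatus.ERRORED;
        }
        if (chunkStatuses.equals(EnumSet.of(ChunkStatus.COMPLETED))) {
            return JobStatus.COMPLETED;
        }
        return current;
    }
}
```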
Known issues:
- If the maintenance job is killed while sending notifications about a gated step advance, the remaining chunks will never be notified. A READY state before QUEUED would catch this. A WAITING state for gated chunks would simplify that handling.
- A running reduction step will not restart if the server is killed.