001/*- 002 * #%L 003 * HAPI FHIR JPA Server - Batch2 Task Processor 004 * %% 005 * Copyright (C) 2014 - 2024 Smile CDR, Inc. 006 * %% 007 * Licensed under the Apache License, Version 2.0 (the "License"); 008 * you may not use this file except in compliance with the License. 009 * You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 * #L% 019 */ 020package ca.uhn.fhir.batch2.api; 021 022import ca.uhn.fhir.batch2.model.WorkChunk; 023import ca.uhn.fhir.batch2.model.WorkChunkCompletionEvent; 024import ca.uhn.fhir.batch2.model.WorkChunkCreateEvent; 025import ca.uhn.fhir.batch2.model.WorkChunkErrorEvent; 026import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum; 027import org.springframework.transaction.annotation.Propagation; 028import org.springframework.transaction.annotation.Transactional; 029 030import java.util.Date; 031import java.util.Iterator; 032import java.util.List; 033import java.util.Optional; 034import java.util.stream.Stream; 035 036/** 037 * Work Chunk api, implementing the WorkChunk state machine. 038 * Test specification is in {@link ca.uhn.hapi.fhir.batch2.test.AbstractIJobPersistenceSpecificationTest}. 039 * Note on transaction boundaries: these are messy - some methods expect an existing transaction and are 040 * marked with {@code @Transactional(propagation = Propagation.MANDATORY)}, some will create a tx as needed 041 * and are marked {@code @Transactional(propagation = Propagation.REQUIRED)}, and some run in a NEW transaction 042 * and are not marked on the interface, but on the implementors instead. We had a bug where interface 043 * methods marked {@code @Transactional(propagation = Propagation.REQUIRES_NEW)} were starting two (2!) 044 * transactions because of our synchronized wrapper. 045 * 046 * @see hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/docs/server_jpa_batch/batch2_states.md 047 */ 048public interface IWorkChunkPersistence { 049 050 ////////////////////////////////// 051 // WorkChunk calls 052 ////////////////////////////////// 053 054 /** 055 * Stores a chunk of work for later retrieval. 056 * The first state event, as the chunk is created. 057 * This method should be atomic and should only 058 * return when the chunk has been successfully stored in the database. 059 * Chunk should be stored with a status of {@link WorkChunkStatusEnum#READY} or 060 * {@link WorkChunkStatusEnum#GATE_WAITING} for ungated and gated jobs, respectively. 061 * 062 * @param theBatchWorkChunk the batch work chunk to be stored 063 * @return a globally unique identifier for this chunk. 064 */ 065 String onWorkChunkCreate(WorkChunkCreateEvent theBatchWorkChunk); 066 067 /** 068 * On arrival at a worker. 069 * The second state event, as the worker starts processing. 070 * Transition to {@link WorkChunkStatusEnum#IN_PROGRESS} if unless not in QUEUED or ERRORRED state. 071 * 072 * @param theChunkId The ID from {@link #onWorkChunkCreate} 073 * @return The WorkChunk or empty if no chunk exists, or not in a runnable state (QUEUED or ERRORRED) 074 */ 075 @Transactional(propagation = Propagation.MANDATORY) 076 Optional<WorkChunk> onWorkChunkDequeue(String theChunkId); 077 078 /** 079 * A retryable error. 080 * Transition to {@link WorkChunkStatusEnum#ERRORED} unless max-retries passed, then 081 * transition to {@link WorkChunkStatusEnum#FAILED}. 082 * 083 * @param theParameters - the error message and max retry count. 084 * @return - the new status - ERRORED or ERRORED, depending on retry count 085 */ 086 // on impl - @Transactional(propagation = Propagation.REQUIRES_NEW) 087 WorkChunkStatusEnum onWorkChunkError(WorkChunkErrorEvent theParameters); 088 089 /** 090 * Updates the specified Work Chunk to set the next polling interval. 091 * It wil also: 092 * * update the poll attempts 093 * * sets the workchunk status to POLL_WAITING (if it's not already in this state) 094 * @param theChunkId the id of the chunk to update 095 * @param theNewDeadline the time when polling should be redone 096 */ 097 @Transactional 098 void onWorkChunkPollDelay(String theChunkId, Date theNewDeadline); 099 100 /** 101 * An unrecoverable error. 102 * Transition to {@link WorkChunkStatusEnum#FAILED} 103 * 104 * @param theChunkId The chunk ID 105 */ 106 @Transactional(propagation = Propagation.REQUIRED) 107 void onWorkChunkFailed(String theChunkId, String theErrorMessage); 108 109 /** 110 * Report success and complete the chunk. 111 * Transition to {@link WorkChunkStatusEnum#COMPLETED} 112 * 113 * @param theEvent with record and error count 114 */ 115 @Transactional(propagation = Propagation.REQUIRED) 116 void onWorkChunkCompletion(WorkChunkCompletionEvent theEvent); 117 118 /** 119 * Marks all work chunks with the provided status and erases the data 120 * 121 * @param theInstanceId - the instance id 122 * @param theChunkIds - the ids of work chunks being reduced to single chunk 123 * @param theStatus - the status to mark 124 * @param theErrorMsg - error message (if status warrants it) 125 */ 126 @Transactional(propagation = Propagation.MANDATORY) 127 void markWorkChunksWithStatusAndWipeData( 128 String theInstanceId, List<String> theChunkIds, WorkChunkStatusEnum theStatus, String theErrorMsg); 129 130 /** 131 * Fetch all chunks for a given instance. 132 * @param theInstanceId - instance id 133 * @param theWithData - whether to include the data - not needed for stats collection 134 * @return - an iterator for fetching work chunks 135 * wipmb replace with a stream and a consumer in 6.8 136 */ 137 Iterator<WorkChunk> fetchAllWorkChunksIterator(String theInstanceId, boolean theWithData); 138 139 /** 140 * Fetch all chunks with data for a given instance for a given step id 141 * 142 * @return - a stream for fetching work chunks 143 */ 144 @Transactional(propagation = Propagation.MANDATORY, readOnly = true) 145 Stream<WorkChunk> fetchAllWorkChunksForStepStream(String theInstanceId, String theStepId); 146}