001/*- 002 * #%L 003 * HAPI FHIR JPA Server - Batch2 Task Processor 004 * %% 005 * Copyright (C) 2014 - 2024 Smile CDR, Inc. 006 * %% 007 * Licensed under the Apache License, Version 2.0 (the "License"); 008 * you may not use this file except in compliance with the License. 009 * You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 * #L% 019 */ 020package ca.uhn.fhir.batch2.api; 021 022import ca.uhn.fhir.batch2.model.FetchJobInstancesRequest; 023import ca.uhn.fhir.batch2.model.JobDefinition; 024import ca.uhn.fhir.batch2.model.JobInstance; 025import ca.uhn.fhir.batch2.model.StatusEnum; 026import ca.uhn.fhir.batch2.model.WorkChunk; 027import ca.uhn.fhir.batch2.model.WorkChunkCreateEvent; 028import ca.uhn.fhir.batch2.model.WorkChunkMetadata; 029import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum; 030import ca.uhn.fhir.batch2.models.JobInstanceFetchRequest; 031import com.google.common.annotations.VisibleForTesting; 032import jakarta.annotation.Nonnull; 033import org.apache.commons.lang3.builder.ToStringBuilder; 034import org.slf4j.Logger; 035import org.slf4j.LoggerFactory; 036import org.springframework.data.domain.Page; 037import org.springframework.data.domain.Pageable; 038import org.springframework.transaction.annotation.Propagation; 039import org.springframework.transaction.annotation.Transactional; 040 041import java.util.Collections; 042import java.util.Date; 043import java.util.Iterator; 044import java.util.List; 045import java.util.Optional; 046import java.util.Set; 047import java.util.function.Consumer; 048import java.util.stream.Stream; 049 050/** 051 * 052 * Some of this is tested in {@link ca.uhn.hapi.fhir.batch2.test.AbstractIJobPersistenceSpecificationTest} 053 * This is a transactional interface, but we have pushed the declaration of calls that have 054 * {@code @Transactional(propagation = Propagation.REQUIRES_NEW)} down to the implementations since we have a synchronized 055 * wrapper that was double-creating the NEW transaction. 056 */ 057// wipmb For 6.8 - regularize the tx boundary. Probably make them all MANDATORY 058public interface IJobPersistence extends IWorkChunkPersistence { 059 Logger ourLog = LoggerFactory.getLogger(IJobPersistence.class); 060 061 /** 062 * Store a new job instance. This will be called when a new job instance is being kicked off. 063 * 064 * @param theInstance The details 065 */ 066 @Transactional(propagation = Propagation.REQUIRED) 067 String storeNewInstance(JobInstance theInstance); 068 069 /** 070 * Fetch an instance 071 * 072 * @param theInstanceId The instance ID 073 */ 074 Optional<JobInstance> fetchInstance(String theInstanceId); 075 076 // on implementations @Transactional(propagation = Propagation.REQUIRES_NEW) 077 List<JobInstance> fetchInstances( 078 String theJobDefinitionId, Set<StatusEnum> theStatuses, Date theCutoff, Pageable thePageable); 079 080 /** 081 * Fetches any existing jobs matching provided request parameters 082 * 083 */ 084 // on implementations @Transactional(propagation = Propagation.REQUIRES_NEW) 085 List<JobInstance> fetchInstances(FetchJobInstancesRequest theRequest, int theStart, int theBatchSize); 086 087 /** 088 * Fetch all instances 089 */ 090 // on implementations @Transactional(propagation = Propagation.REQUIRES_NEW) 091 List<JobInstance> fetchInstances(int thePageSize, int thePageIndex); 092 093 @Transactional(propagation = Propagation.REQUIRES_NEW) 094 void enqueueWorkChunkForProcessing(String theChunkId, Consumer<Integer> theCallback); 095 096 /** 097 * Updates all Work Chunks in POLL_WAITING if their nextPollTime <= now 098 * for the given Job Instance. 099 * @param theInstanceId the instance id 100 * @return the number of updated chunks 101 */ 102 @Transactional 103 int updatePollWaitingChunksForJobIfReady(String theInstanceId); 104 105 /** 106 * Fetch instances ordered by myCreateTime DESC 107 */ 108 // on implementations @Transactional(propagation = Propagation.REQUIRES_NEW) 109 List<JobInstance> fetchRecentInstances(int thePageSize, int thePageIndex); 110 111 // on implementations @Transactional(propagation = Propagation.REQUIRES_NEW) 112 List<JobInstance> fetchInstancesByJobDefinitionIdAndStatus( 113 String theJobDefinitionId, Set<StatusEnum> theRequestedStatuses, int thePageSize, int thePageIndex); 114 115 /** 116 * Fetch all job instances for a given job definition id 117 * 118 */ 119 // on implementations @Transactional(propagation = Propagation.REQUIRES_NEW) 120 List<JobInstance> fetchInstancesByJobDefinitionId(String theJobDefinitionId, int theCount, int theStart); 121 122 /** 123 * Fetches all job instances based on the JobFetchRequest 124 * 125 * @param theRequest - the job fetch request 126 * @return - a page of job instances 127 */ 128 // on implementations @Transactional(propagation = Propagation.REQUIRES_NEW) 129 Page<JobInstance> fetchJobInstances(JobInstanceFetchRequest theRequest); 130 131 /** 132 * Returns set of all distinct states for the specified job instance id 133 * and step id. 134 */ 135 @Transactional 136 Set<WorkChunkStatusEnum> getDistinctWorkChunkStatesForJobAndStep(String theInstanceId, String theCurrentStepId); 137 138 /** 139 * Fetch all chunks for a given instance. 140 * 141 * @param theInstanceId - instance id 142 * @param theWithData - whether or not to include the data 143 * @return - an iterator for fetching work chunks 144 */ 145 Iterator<WorkChunk> fetchAllWorkChunksIterator(String theInstanceId, boolean theWithData); 146 147 /** 148 * Fetch all chunks with data for a given instance for a given step id - read-only. 149 * 150 * @return - a stream for fetching work chunks 151 */ 152 Stream<WorkChunk> fetchAllWorkChunksForStepStream(String theInstanceId, String theStepId); 153 154 /** 155 * Fetches an iterator that retrieves WorkChunkMetadata from the db. 156 * @param theInstanceId instance id of job of interest 157 * @param theStates states of interset 158 * @return an iterator for the workchunks 159 */ 160 @Transactional(propagation = Propagation.SUPPORTS) 161 Page<WorkChunkMetadata> fetchAllWorkChunkMetadataForJobInStates( 162 Pageable thePageable, String theInstanceId, Set<WorkChunkStatusEnum> theStates); 163 164 /** 165 * Callback to update a JobInstance within a locked transaction. 166 * Return true from the callback if the record write should continue, or false if 167 * the change should be discarded. 168 */ 169 interface JobInstanceUpdateCallback { 170 /** 171 * Modify theInstance within a write-lock transaction. 172 * @param theInstance a copy of the instance to modify. 173 * @return true if the change to theInstance should be written back to the db. 174 */ 175 boolean doUpdate(JobInstance theInstance); 176 } 177 178 /** 179 * Brute-force hack for now to create a tx boundary - takes a write-lock on the instance 180 * while the theModifier runs. 181 * Keep the callback short to keep the lock-time short. 182 * If the status is changing, use {@link ca.uhn.fhir.batch2.progress.JobInstanceStatusUpdater} 183 * inside theModifier to ensure state-change callbacks are invoked properly. 184 * 185 * @param theInstanceId the id of the instance to modify 186 * @param theModifier a hook to modify the instance - return true to finish the record write 187 * @return true if the instance was modified 188 */ 189 // wipmb For 6.8 - consider changing callers to actual objects we can unit test 190 @Transactional 191 boolean updateInstance(String theInstanceId, JobInstanceUpdateCallback theModifier); 192 193 /** 194 * Deletes the instance and all associated work chunks 195 * 196 * @param theInstanceId The instance ID 197 */ 198 // on implementations @Transactional(propagation = Propagation.REQUIRES_NEW) 199 void deleteInstanceAndChunks(String theInstanceId); 200 201 /** 202 * Deletes all work chunks associated with the instance 203 * 204 * @param theInstanceId The instance ID 205 */ 206 // on implementations @Transactional(propagation = Propagation.REQUIRES_NEW) 207 void deleteChunksAndMarkInstanceAsChunksPurged(String theInstanceId); 208 209 @Transactional(propagation = Propagation.MANDATORY) 210 boolean markInstanceAsStatusWhenStatusIn( 211 String theInstance, StatusEnum theStatusEnum, Set<StatusEnum> thePriorStates); 212 213 /** 214 * Marks an instance as cancelled 215 * 216 * @param theInstanceId The instance ID 217 */ 218 // on implementations @Transactional(propagation = Propagation.REQUIRES_NEW) 219 JobOperationResultJson cancelInstance(String theInstanceId); 220 221 @Transactional(propagation = Propagation.MANDATORY) 222 void updateInstanceUpdateTime(String theInstanceId); 223 224 /* 225 * State transition events for job instances. 226 * These cause the transitions along {@link ca.uhn.fhir.batch2.model.StatusEnum} 227 * 228 * @see hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/docs/server_jpa_batch/batch2_states.md 229 */ 230 231 class CreateResult { 232 public final String jobInstanceId; 233 public final String workChunkId; 234 235 public CreateResult(String theJobInstanceId, String theWorkChunkId) { 236 jobInstanceId = theJobInstanceId; 237 workChunkId = theWorkChunkId; 238 } 239 240 @Override 241 public String toString() { 242 return new ToStringBuilder(this) 243 .append("jobInstanceId", jobInstanceId) 244 .append("workChunkId", workChunkId) 245 .toString(); 246 } 247 } 248 249 /** 250 * Create the job, and it's first chunk. 251 * 252 * We create the chunk atomically with the job so that we never have a state with 253 * zero unfinished chunks left until the job is complete. Makes the maintenance run simpler. 254 * 255 * @param theJobDefinition what kind of job 256 * @param theParameters params for the job 257 * @return the ids of the instance and first chunk 258 */ 259 @Nonnull 260 @Transactional(propagation = Propagation.MANDATORY) 261 default CreateResult onCreateWithFirstChunk(JobDefinition<?> theJobDefinition, String theParameters) { 262 JobInstance instance = JobInstance.fromJobDefinition(theJobDefinition); 263 instance.setParameters(theParameters); 264 instance.setStatus(StatusEnum.QUEUED); 265 266 String instanceId = storeNewInstance(instance); 267 ourLog.info( 268 "Stored new {} job {} with status {}", 269 theJobDefinition.getJobDefinitionId(), 270 instanceId, 271 instance.getStatus()); 272 ourLog.debug("Job parameters: {}", instance.getParameters()); 273 274 WorkChunkCreateEvent batchWorkChunk = WorkChunkCreateEvent.firstChunk(theJobDefinition, instanceId); 275 String chunkId = onWorkChunkCreate(batchWorkChunk); 276 return new CreateResult(instanceId, chunkId); 277 } 278 279 /** 280 * Move from QUEUED->IN_PROGRESS when a work chunk arrives. 281 * Ignore other prior states. 282 * @return did the transition happen 283 */ 284 @Transactional(propagation = Propagation.MANDATORY) 285 default boolean onChunkDequeued(String theJobInstanceId) { 286 return markInstanceAsStatusWhenStatusIn( 287 theJobInstanceId, StatusEnum.IN_PROGRESS, Collections.singleton(StatusEnum.QUEUED)); 288 } 289 290 @VisibleForTesting 291 WorkChunk createWorkChunk(WorkChunk theWorkChunk); 292 293 /** 294 * Atomically advance the given job to the given step and change the status of all QUEUED and GATE_WAITING chunks 295 * in the next step to READY 296 * @param theJobInstanceId the id of the job instance to be updated 297 * @param theNextStepId the id of the next job step 298 * @return whether any changes were made 299 */ 300 @Transactional(propagation = Propagation.REQUIRES_NEW) 301 boolean advanceJobStepAndUpdateChunkStatus( 302 String theJobInstanceId, String theNextStepId, boolean theIsReductionStepBoolean); 303}