001/*- 002 * #%L 003 * HAPI FHIR JPA Server 004 * %% 005 * Copyright (C) 2014 - 2023 Smile CDR, Inc. 006 * %% 007 * Licensed under the Apache License, Version 2.0 (the "License"); 008 * you may not use this file except in compliance with the License. 009 * You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 * #L% 019 */ 020package ca.uhn.fhir.jpa.batch2; 021 022import ca.uhn.fhir.batch2.api.IJobPersistence; 023import ca.uhn.fhir.batch2.api.JobOperationResultJson; 024import ca.uhn.fhir.batch2.model.FetchJobInstancesRequest; 025import ca.uhn.fhir.batch2.model.JobInstance; 026import ca.uhn.fhir.batch2.model.StatusEnum; 027import ca.uhn.fhir.batch2.model.WorkChunk; 028import ca.uhn.fhir.batch2.model.WorkChunkCompletionEvent; 029import ca.uhn.fhir.batch2.model.WorkChunkCreateEvent; 030import ca.uhn.fhir.batch2.model.WorkChunkErrorEvent; 031import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum; 032import ca.uhn.fhir.batch2.models.JobInstanceFetchRequest; 033import ca.uhn.fhir.jpa.dao.data.IBatch2JobInstanceRepository; 034import ca.uhn.fhir.jpa.dao.data.IBatch2WorkChunkRepository; 035import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService; 036import ca.uhn.fhir.jpa.entity.Batch2JobInstanceEntity; 037import ca.uhn.fhir.jpa.entity.Batch2WorkChunkEntity; 038import ca.uhn.fhir.model.api.PagingIterator; 039import ca.uhn.fhir.util.Batch2JobDefinitionConstants; 040import ca.uhn.fhir.util.Logs; 041import com.fasterxml.jackson.core.JsonParser; 042import com.fasterxml.jackson.databind.JsonNode; 043import com.fasterxml.jackson.databind.ObjectMapper; 044import com.fasterxml.jackson.databind.node.ObjectNode; 045import org.apache.commons.collections4.ListUtils; 046import org.apache.commons.lang3.Validate; 047import org.slf4j.Logger; 048import org.springframework.data.domain.Page; 049import org.springframework.data.domain.PageImpl; 050import org.springframework.data.domain.PageRequest; 051import org.springframework.data.domain.Pageable; 052import org.springframework.data.domain.Sort; 053import org.springframework.transaction.annotation.Propagation; 054import org.springframework.transaction.annotation.Transactional; 055import org.springframework.transaction.support.TransactionSynchronizationManager; 056 057import java.util.Date; 058import java.util.Iterator; 059import java.util.List; 060import java.util.Objects; 061import java.util.Optional; 062import java.util.Set; 063import java.util.UUID; 064import java.util.function.Consumer; 065import java.util.stream.Collectors; 066import java.util.stream.Stream; 067import javax.annotation.Nonnull; 068import javax.annotation.Nullable; 069import javax.persistence.EntityManager; 070import javax.persistence.LockModeType; 071import javax.persistence.Query; 072 073import static ca.uhn.fhir.batch2.coordinator.WorkChunkProcessor.MAX_CHUNK_ERROR_COUNT; 074import static ca.uhn.fhir.jpa.entity.Batch2WorkChunkEntity.ERROR_MSG_MAX_LENGTH; 075import static org.apache.commons.lang3.StringUtils.isBlank; 076 077public class JpaJobPersistenceImpl implements IJobPersistence { 078 private static final Logger ourLog = Logs.getBatchTroubleshootingLog(); 079 public static final String CREATE_TIME = "myCreateTime"; 080 081 private final IBatch2JobInstanceRepository myJobInstanceRepository; 082 private final IBatch2WorkChunkRepository myWorkChunkRepository; 083 private final EntityManager myEntityManager; 084 private final IHapiTransactionService myTransactionService; 085 086 /** 087 * Constructor 088 */ 089 public JpaJobPersistenceImpl( 090 IBatch2JobInstanceRepository theJobInstanceRepository, 091 IBatch2WorkChunkRepository theWorkChunkRepository, 092 IHapiTransactionService theTransactionService, 093 EntityManager theEntityManager) { 094 Validate.notNull(theJobInstanceRepository); 095 Validate.notNull(theWorkChunkRepository); 096 myJobInstanceRepository = theJobInstanceRepository; 097 myWorkChunkRepository = theWorkChunkRepository; 098 myTransactionService = theTransactionService; 099 myEntityManager = theEntityManager; 100 } 101 102 @Override 103 public String onWorkChunkCreate(WorkChunkCreateEvent theBatchWorkChunk) { 104 Batch2WorkChunkEntity entity = new Batch2WorkChunkEntity(); 105 entity.setId(UUID.randomUUID().toString()); 106 entity.setSequence(theBatchWorkChunk.sequence); 107 entity.setJobDefinitionId(theBatchWorkChunk.jobDefinitionId); 108 entity.setJobDefinitionVersion(theBatchWorkChunk.jobDefinitionVersion); 109 entity.setTargetStepId(theBatchWorkChunk.targetStepId); 110 entity.setInstanceId(theBatchWorkChunk.instanceId); 111 entity.setSerializedData(theBatchWorkChunk.serializedData); 112 entity.setCreateTime(new Date()); 113 entity.setStartTime(new Date()); 114 entity.setStatus(WorkChunkStatusEnum.QUEUED); 115 ourLog.debug("Create work chunk {}/{}/{}", entity.getInstanceId(), entity.getId(), entity.getTargetStepId()); 116 ourLog.trace( 117 "Create work chunk data {}/{}: {}", entity.getInstanceId(), entity.getId(), entity.getSerializedData()); 118 myWorkChunkRepository.save(entity); 119 return entity.getId(); 120 } 121 122 @Override 123 @Transactional(propagation = Propagation.REQUIRED) 124 public Optional<WorkChunk> onWorkChunkDequeue(String theChunkId) { 125 // NOTE: Ideally, IN_PROGRESS wouldn't be allowed here. On chunk failure, we probably shouldn't be allowed. 126 // But how does re-run happen if k8s kills a processor mid run? 127 List<WorkChunkStatusEnum> priorStates = 128 List.of(WorkChunkStatusEnum.QUEUED, WorkChunkStatusEnum.ERRORED, WorkChunkStatusEnum.IN_PROGRESS); 129 int rowsModified = myWorkChunkRepository.updateChunkStatusForStart( 130 theChunkId, new Date(), WorkChunkStatusEnum.IN_PROGRESS, priorStates); 131 if (rowsModified == 0) { 132 ourLog.info("Attempting to start chunk {} but it was already started.", theChunkId); 133 return Optional.empty(); 134 } else { 135 Optional<Batch2WorkChunkEntity> chunk = myWorkChunkRepository.findById(theChunkId); 136 return chunk.map(this::toChunk); 137 } 138 } 139 140 @Override 141 @Transactional(propagation = Propagation.REQUIRED) 142 public String storeNewInstance(JobInstance theInstance) { 143 Validate.isTrue(isBlank(theInstance.getInstanceId())); 144 145 Batch2JobInstanceEntity entity = new Batch2JobInstanceEntity(); 146 entity.setId(UUID.randomUUID().toString()); 147 entity.setDefinitionId(theInstance.getJobDefinitionId()); 148 entity.setDefinitionVersion(theInstance.getJobDefinitionVersion()); 149 entity.setStatus(theInstance.getStatus()); 150 entity.setParams(theInstance.getParameters()); 151 entity.setCurrentGatedStepId(theInstance.getCurrentGatedStepId()); 152 entity.setFastTracking(theInstance.isFastTracking()); 153 entity.setCreateTime(new Date()); 154 entity.setStartTime(new Date()); 155 entity.setReport(theInstance.getReport()); 156 157 entity = myJobInstanceRepository.save(entity); 158 return entity.getId(); 159 } 160 161 @Override 162 @Transactional(propagation = Propagation.REQUIRES_NEW) 163 public List<JobInstance> fetchInstances( 164 String theJobDefinitionId, Set<StatusEnum> theStatuses, Date theCutoff, Pageable thePageable) { 165 return toInstanceList(myJobInstanceRepository.findInstancesByJobIdAndStatusAndExpiry( 166 theJobDefinitionId, theStatuses, theCutoff, thePageable)); 167 } 168 169 @Override 170 @Transactional(propagation = Propagation.REQUIRES_NEW) 171 public List<JobInstance> fetchInstancesByJobDefinitionIdAndStatus( 172 String theJobDefinitionId, Set<StatusEnum> theRequestedStatuses, int thePageSize, int thePageIndex) { 173 PageRequest pageRequest = PageRequest.of(thePageIndex, thePageSize, Sort.Direction.ASC, CREATE_TIME); 174 return toInstanceList(myJobInstanceRepository.fetchInstancesByJobDefinitionIdAndStatus( 175 theJobDefinitionId, theRequestedStatuses, pageRequest)); 176 } 177 178 @Override 179 @Transactional(propagation = Propagation.REQUIRES_NEW) 180 public List<JobInstance> fetchInstancesByJobDefinitionId( 181 String theJobDefinitionId, int thePageSize, int thePageIndex) { 182 PageRequest pageRequest = PageRequest.of(thePageIndex, thePageSize, Sort.Direction.ASC, CREATE_TIME); 183 return toInstanceList(myJobInstanceRepository.findInstancesByJobDefinitionId(theJobDefinitionId, pageRequest)); 184 } 185 186 @Override 187 @Transactional(propagation = Propagation.REQUIRES_NEW) 188 public Page<JobInstance> fetchJobInstances(JobInstanceFetchRequest theRequest) { 189 PageRequest pageRequest = 190 PageRequest.of(theRequest.getPageStart(), theRequest.getBatchSize(), theRequest.getSort()); 191 192 String jobStatus = theRequest.getJobStatus(); 193 if (Objects.equals(jobStatus, "")) { 194 Page<Batch2JobInstanceEntity> pageOfEntities = myJobInstanceRepository.findAll(pageRequest); 195 return pageOfEntities.map(this::toInstance); 196 } 197 198 StatusEnum status = StatusEnum.valueOf(jobStatus); 199 List<JobInstance> jobs = toInstanceList(myJobInstanceRepository.findInstancesByJobStatus(status, pageRequest)); 200 Integer jobsOfStatus = myJobInstanceRepository.findTotalJobsOfStatus(status); 201 return new PageImpl<>(jobs, pageRequest, jobsOfStatus); 202 } 203 204 private List<JobInstance> toInstanceList(List<Batch2JobInstanceEntity> theInstancesByJobDefinitionId) { 205 return theInstancesByJobDefinitionId.stream().map(this::toInstance).collect(Collectors.toList()); 206 } 207 208 @Override 209 @Nonnull 210 public Optional<JobInstance> fetchInstance(String theInstanceId) { 211 return myTransactionService 212 .withSystemRequest() 213 .execute(() -> myJobInstanceRepository.findById(theInstanceId).map(this::toInstance)); 214 } 215 216 @Override 217 @Transactional(propagation = Propagation.REQUIRES_NEW) 218 public List<JobInstance> fetchInstances(FetchJobInstancesRequest theRequest, int thePage, int theBatchSize) { 219 String definitionId = theRequest.getJobDefinition(); 220 String params = theRequest.getParameters(); 221 Set<StatusEnum> statuses = theRequest.getStatuses(); 222 223 Pageable pageable = PageRequest.of(thePage, theBatchSize); 224 225 List<Batch2JobInstanceEntity> instanceEntities; 226 227 if (statuses != null && !statuses.isEmpty()) { 228 if (definitionId.equals(Batch2JobDefinitionConstants.BULK_EXPORT)) { 229 if (originalRequestUrlTruncation(params) != null) { 230 params = originalRequestUrlTruncation(params); 231 } 232 } 233 instanceEntities = myJobInstanceRepository.findInstancesByJobIdParamsAndStatus( 234 definitionId, params, statuses, pageable); 235 } else { 236 instanceEntities = myJobInstanceRepository.findInstancesByJobIdAndParams(definitionId, params, pageable); 237 } 238 return toInstanceList(instanceEntities); 239 } 240 241 private String originalRequestUrlTruncation(String theParams) { 242 try { 243 ObjectMapper mapper = new ObjectMapper(); 244 mapper.configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, true); 245 mapper.configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true); 246 JsonNode rootNode = mapper.readTree(theParams); 247 String originalUrl = "originalRequestUrl"; 248 249 if (rootNode instanceof ObjectNode) { 250 ObjectNode objectNode = (ObjectNode) rootNode; 251 252 if (objectNode.has(originalUrl)) { 253 String url = objectNode.get(originalUrl).asText(); 254 if (url.contains("?")) { 255 objectNode.put(originalUrl, url.split("\\?")[0]); 256 } 257 } 258 return mapper.writeValueAsString(objectNode); 259 } 260 } catch (Exception e) { 261 ourLog.info("Error Truncating Original Request Url", e); 262 } 263 return null; 264 } 265 266 @Override 267 @Transactional(propagation = Propagation.REQUIRES_NEW) 268 public List<JobInstance> fetchInstances(int thePageSize, int thePageIndex) { 269 // default sort is myCreateTime Asc 270 PageRequest pageRequest = PageRequest.of(thePageIndex, thePageSize, Sort.Direction.ASC, CREATE_TIME); 271 return myJobInstanceRepository.findAll(pageRequest).stream() 272 .map(this::toInstance) 273 .collect(Collectors.toList()); 274 } 275 276 @Override 277 @Transactional(propagation = Propagation.REQUIRES_NEW) 278 public List<JobInstance> fetchRecentInstances(int thePageSize, int thePageIndex) { 279 PageRequest pageRequest = PageRequest.of(thePageIndex, thePageSize, Sort.Direction.DESC, CREATE_TIME); 280 return myJobInstanceRepository.findAll(pageRequest).stream() 281 .map(this::toInstance) 282 .collect(Collectors.toList()); 283 } 284 285 private WorkChunk toChunk(Batch2WorkChunkEntity theEntity) { 286 return JobInstanceUtil.fromEntityToWorkChunk(theEntity); 287 } 288 289 private JobInstance toInstance(Batch2JobInstanceEntity theEntity) { 290 return JobInstanceUtil.fromEntityToInstance(theEntity); 291 } 292 293 @Override 294 @Transactional(propagation = Propagation.REQUIRES_NEW) 295 public WorkChunkStatusEnum onWorkChunkError(WorkChunkErrorEvent theParameters) { 296 String chunkId = theParameters.getChunkId(); 297 String errorMessage = truncateErrorMessage(theParameters.getErrorMsg()); 298 int changeCount = myWorkChunkRepository.updateChunkStatusAndIncrementErrorCountForEndError( 299 chunkId, new Date(), errorMessage, WorkChunkStatusEnum.ERRORED); 300 Validate.isTrue(changeCount > 0, "changed chunk matching %s", chunkId); 301 302 Query query = myEntityManager.createQuery("update Batch2WorkChunkEntity " + "set myStatus = :failed " 303 + ",myErrorMessage = CONCAT('Too many errors: ', myErrorCount, '. Last error msg was ', myErrorMessage) " 304 + "where myId = :chunkId and myErrorCount > :maxCount"); 305 query.setParameter("chunkId", chunkId); 306 query.setParameter("failed", WorkChunkStatusEnum.FAILED); 307 query.setParameter("maxCount", MAX_CHUNK_ERROR_COUNT); 308 int failChangeCount = query.executeUpdate(); 309 310 if (failChangeCount > 0) { 311 return WorkChunkStatusEnum.FAILED; 312 } else { 313 return WorkChunkStatusEnum.ERRORED; 314 } 315 } 316 317 @Override 318 @Transactional 319 public void onWorkChunkFailed(String theChunkId, String theErrorMessage) { 320 ourLog.info("Marking chunk {} as failed with message: {}", theChunkId, theErrorMessage); 321 String errorMessage = truncateErrorMessage(theErrorMessage); 322 myWorkChunkRepository.updateChunkStatusAndIncrementErrorCountForEndError( 323 theChunkId, new Date(), errorMessage, WorkChunkStatusEnum.FAILED); 324 } 325 326 @Override 327 @Transactional 328 public void onWorkChunkCompletion(WorkChunkCompletionEvent theEvent) { 329 myWorkChunkRepository.updateChunkStatusAndClearDataForEndSuccess( 330 theEvent.getChunkId(), 331 new Date(), 332 theEvent.getRecordsProcessed(), 333 theEvent.getRecoveredErrorCount(), 334 WorkChunkStatusEnum.COMPLETED, 335 theEvent.getRecoveredWarningMessage()); 336 } 337 338 @Nullable 339 private static String truncateErrorMessage(String theErrorMessage) { 340 String errorMessage; 341 if (theErrorMessage != null && theErrorMessage.length() > ERROR_MSG_MAX_LENGTH) { 342 ourLog.warn("Truncating error message that is too long to store in database: {}", theErrorMessage); 343 errorMessage = theErrorMessage.substring(0, ERROR_MSG_MAX_LENGTH); 344 } else { 345 errorMessage = theErrorMessage; 346 } 347 return errorMessage; 348 } 349 350 @Override 351 public void markWorkChunksWithStatusAndWipeData( 352 String theInstanceId, List<String> theChunkIds, WorkChunkStatusEnum theStatus, String theErrorMessage) { 353 assert TransactionSynchronizationManager.isActualTransactionActive(); 354 355 ourLog.debug("Marking all chunks for instance {} to status {}", theInstanceId, theStatus); 356 String errorMessage = truncateErrorMessage(theErrorMessage); 357 List<List<String>> listOfListOfIds = ListUtils.partition(theChunkIds, 100); 358 for (List<String> idList : listOfListOfIds) { 359 myWorkChunkRepository.updateAllChunksForInstanceStatusClearDataAndSetError( 360 idList, new Date(), theStatus, errorMessage); 361 } 362 } 363 364 @Override 365 @Transactional(propagation = Propagation.REQUIRES_NEW) 366 public boolean canAdvanceInstanceToNextStep(String theInstanceId, String theCurrentStepId) { 367 Optional<Batch2JobInstanceEntity> instance = myJobInstanceRepository.findById(theInstanceId); 368 if (instance.isEmpty()) { 369 return false; 370 } 371 if (instance.get().getStatus().isEnded()) { 372 return false; 373 } 374 Set<WorkChunkStatusEnum> statusesForStep = 375 myWorkChunkRepository.getDistinctStatusesForStep(theInstanceId, theCurrentStepId); 376 377 ourLog.debug( 378 "Checking whether gated job can advanced to next step. [instanceId={}, currentStepId={}, statusesForStep={}]", 379 theInstanceId, 380 theCurrentStepId, 381 statusesForStep); 382 return statusesForStep.isEmpty() || statusesForStep.equals(Set.of(WorkChunkStatusEnum.COMPLETED)); 383 } 384 385 private void fetchChunks( 386 String theInstanceId, 387 boolean theIncludeData, 388 int thePageSize, 389 int thePageIndex, 390 Consumer<WorkChunk> theConsumer) { 391 myTransactionService 392 .withSystemRequest() 393 .withPropagation(Propagation.REQUIRES_NEW) 394 .execute(() -> { 395 List<Batch2WorkChunkEntity> chunks; 396 if (theIncludeData) { 397 chunks = myWorkChunkRepository.fetchChunks( 398 PageRequest.of(thePageIndex, thePageSize), theInstanceId); 399 } else { 400 chunks = myWorkChunkRepository.fetchChunksNoData( 401 PageRequest.of(thePageIndex, thePageSize), theInstanceId); 402 } 403 for (Batch2WorkChunkEntity chunk : chunks) { 404 theConsumer.accept(toChunk(chunk)); 405 } 406 }); 407 } 408 409 @Override 410 public List<String> fetchAllChunkIdsForStepWithStatus( 411 String theInstanceId, String theStepId, WorkChunkStatusEnum theStatusEnum) { 412 return myTransactionService 413 .withSystemRequest() 414 .withPropagation(Propagation.REQUIRES_NEW) 415 .execute(() -> myWorkChunkRepository.fetchAllChunkIdsForStepWithStatus( 416 theInstanceId, theStepId, theStatusEnum)); 417 } 418 419 @Override 420 public void updateInstanceUpdateTime(String theInstanceId) { 421 myJobInstanceRepository.updateInstanceUpdateTime(theInstanceId, new Date()); 422 } 423 424 /** 425 * Note: Not @Transactional because the transaction happens in a lambda that's called outside of this method's scope 426 */ 427 @Override 428 public Iterator<WorkChunk> fetchAllWorkChunksIterator(String theInstanceId, boolean theWithData) { 429 return new PagingIterator<>((thePageIndex, theBatchSize, theConsumer) -> 430 fetchChunks(theInstanceId, theWithData, theBatchSize, thePageIndex, theConsumer)); 431 } 432 433 @Override 434 public Stream<WorkChunk> fetchAllWorkChunksForStepStream(String theInstanceId, String theStepId) { 435 return myWorkChunkRepository 436 .fetchChunksForStep(theInstanceId, theStepId) 437 .map(this::toChunk); 438 } 439 440 @Override 441 public boolean updateInstance(String theInstanceId, JobInstanceUpdateCallback theModifier) { 442 Batch2JobInstanceEntity instanceEntity = 443 myEntityManager.find(Batch2JobInstanceEntity.class, theInstanceId, LockModeType.PESSIMISTIC_WRITE); 444 if (null == instanceEntity) { 445 ourLog.error("No instance found with Id {}", theInstanceId); 446 return false; 447 } 448 // convert to JobInstance for public api 449 JobInstance jobInstance = JobInstanceUtil.fromEntityToInstance(instanceEntity); 450 451 // run the modification callback 452 boolean wasModified = theModifier.doUpdate(jobInstance); 453 454 if (wasModified) { 455 // copy fields back for flush. 456 JobInstanceUtil.fromInstanceToEntity(jobInstance, instanceEntity); 457 } 458 459 return wasModified; 460 } 461 462 @Override 463 @Transactional(propagation = Propagation.REQUIRES_NEW) 464 public void deleteInstanceAndChunks(String theInstanceId) { 465 ourLog.info("Deleting instance and chunks: {}", theInstanceId); 466 myWorkChunkRepository.deleteAllForInstance(theInstanceId); 467 myJobInstanceRepository.deleteById(theInstanceId); 468 } 469 470 @Override 471 @Transactional(propagation = Propagation.REQUIRES_NEW) 472 public void deleteChunksAndMarkInstanceAsChunksPurged(String theInstanceId) { 473 ourLog.info("Deleting all chunks for instance ID: {}", theInstanceId); 474 int updateCount = myJobInstanceRepository.updateWorkChunksPurgedTrue(theInstanceId); 475 int deleteCount = myWorkChunkRepository.deleteAllForInstance(theInstanceId); 476 ourLog.debug("Purged {} chunks, and updated {} instance.", deleteCount, updateCount); 477 } 478 479 @Override 480 public boolean markInstanceAsStatusWhenStatusIn( 481 String theInstanceId, StatusEnum theStatusEnum, Set<StatusEnum> thePriorStates) { 482 int recordsChanged = 483 myJobInstanceRepository.updateInstanceStatusIfIn(theInstanceId, theStatusEnum, thePriorStates); 484 ourLog.debug( 485 "Update job {} to status {} if in status {}: {}", 486 theInstanceId, 487 theStatusEnum, 488 thePriorStates, 489 recordsChanged > 0); 490 return recordsChanged > 0; 491 } 492 493 @Override 494 @Transactional(propagation = Propagation.REQUIRES_NEW) 495 public JobOperationResultJson cancelInstance(String theInstanceId) { 496 int recordsChanged = myJobInstanceRepository.updateInstanceCancelled(theInstanceId, true); 497 String operationString = "Cancel job instance " + theInstanceId; 498 499 // wipmb For 6.8 - This is too detailed to be down here - this should be up at the api layer. 500 // Replace with boolean result or ResourceNotFound exception. Build the message up at the ui. 501 String messagePrefix = "Job instance <" + theInstanceId + ">"; 502 if (recordsChanged > 0) { 503 return JobOperationResultJson.newSuccess(operationString, messagePrefix + " successfully cancelled."); 504 } else { 505 Optional<JobInstance> instance = fetchInstance(theInstanceId); 506 if (instance.isPresent()) { 507 return JobOperationResultJson.newFailure( 508 operationString, messagePrefix + " was already cancelled. Nothing to do."); 509 } else { 510 return JobOperationResultJson.newFailure(operationString, messagePrefix + " not found."); 511 } 512 } 513 } 514}