001/*-
002 * #%L
003 * HAPI FHIR JPA Server
004 * %%
005 * Copyright (C) 2014 - 2023 Smile CDR, Inc.
006 * %%
007 * Licensed under the Apache License, Version 2.0 (the "License");
008 * you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 *      http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 * #L%
019 */
020package ca.uhn.fhir.jpa.batch2;
021
022import ca.uhn.fhir.batch2.api.IJobPersistence;
023import ca.uhn.fhir.batch2.api.JobOperationResultJson;
024import ca.uhn.fhir.batch2.model.FetchJobInstancesRequest;
025import ca.uhn.fhir.batch2.model.JobInstance;
026import ca.uhn.fhir.batch2.model.StatusEnum;
027import ca.uhn.fhir.batch2.model.WorkChunk;
028import ca.uhn.fhir.batch2.model.WorkChunkCompletionEvent;
029import ca.uhn.fhir.batch2.model.WorkChunkCreateEvent;
030import ca.uhn.fhir.batch2.model.WorkChunkErrorEvent;
031import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
032import ca.uhn.fhir.batch2.models.JobInstanceFetchRequest;
033import ca.uhn.fhir.jpa.dao.data.IBatch2JobInstanceRepository;
034import ca.uhn.fhir.jpa.dao.data.IBatch2WorkChunkRepository;
035import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
036import ca.uhn.fhir.jpa.entity.Batch2JobInstanceEntity;
037import ca.uhn.fhir.jpa.entity.Batch2WorkChunkEntity;
038import ca.uhn.fhir.model.api.PagingIterator;
039import ca.uhn.fhir.util.Batch2JobDefinitionConstants;
040import ca.uhn.fhir.util.Logs;
041import com.fasterxml.jackson.core.JsonParser;
042import com.fasterxml.jackson.databind.JsonNode;
043import com.fasterxml.jackson.databind.ObjectMapper;
044import com.fasterxml.jackson.databind.node.ObjectNode;
045import org.apache.commons.collections4.ListUtils;
046import org.apache.commons.lang3.Validate;
047import org.slf4j.Logger;
048import org.springframework.data.domain.Page;
049import org.springframework.data.domain.PageImpl;
050import org.springframework.data.domain.PageRequest;
051import org.springframework.data.domain.Pageable;
052import org.springframework.data.domain.Sort;
053import org.springframework.transaction.annotation.Propagation;
054import org.springframework.transaction.annotation.Transactional;
055import org.springframework.transaction.support.TransactionSynchronizationManager;
056
057import java.util.Date;
058import java.util.Iterator;
059import java.util.List;
060import java.util.Objects;
061import java.util.Optional;
062import java.util.Set;
063import java.util.UUID;
064import java.util.function.Consumer;
065import java.util.stream.Collectors;
066import java.util.stream.Stream;
067import javax.annotation.Nonnull;
068import javax.annotation.Nullable;
069import javax.persistence.EntityManager;
070import javax.persistence.LockModeType;
071import javax.persistence.Query;
072
073import static ca.uhn.fhir.batch2.coordinator.WorkChunkProcessor.MAX_CHUNK_ERROR_COUNT;
074import static ca.uhn.fhir.jpa.entity.Batch2WorkChunkEntity.ERROR_MSG_MAX_LENGTH;
075import static org.apache.commons.lang3.StringUtils.isBlank;
076
077public class JpaJobPersistenceImpl implements IJobPersistence {
078        private static final Logger ourLog = Logs.getBatchTroubleshootingLog();
079        public static final String CREATE_TIME = "myCreateTime";
080
081        private final IBatch2JobInstanceRepository myJobInstanceRepository;
082        private final IBatch2WorkChunkRepository myWorkChunkRepository;
083        private final EntityManager myEntityManager;
084        private final IHapiTransactionService myTransactionService;
085
086        /**
087         * Constructor
088         */
089        public JpaJobPersistenceImpl(
090                        IBatch2JobInstanceRepository theJobInstanceRepository,
091                        IBatch2WorkChunkRepository theWorkChunkRepository,
092                        IHapiTransactionService theTransactionService,
093                        EntityManager theEntityManager) {
094                Validate.notNull(theJobInstanceRepository);
095                Validate.notNull(theWorkChunkRepository);
096                myJobInstanceRepository = theJobInstanceRepository;
097                myWorkChunkRepository = theWorkChunkRepository;
098                myTransactionService = theTransactionService;
099                myEntityManager = theEntityManager;
100        }
101
102        @Override
103        public String onWorkChunkCreate(WorkChunkCreateEvent theBatchWorkChunk) {
104                Batch2WorkChunkEntity entity = new Batch2WorkChunkEntity();
105                entity.setId(UUID.randomUUID().toString());
106                entity.setSequence(theBatchWorkChunk.sequence);
107                entity.setJobDefinitionId(theBatchWorkChunk.jobDefinitionId);
108                entity.setJobDefinitionVersion(theBatchWorkChunk.jobDefinitionVersion);
109                entity.setTargetStepId(theBatchWorkChunk.targetStepId);
110                entity.setInstanceId(theBatchWorkChunk.instanceId);
111                entity.setSerializedData(theBatchWorkChunk.serializedData);
112                entity.setCreateTime(new Date());
113                entity.setStartTime(new Date());
114                entity.setStatus(WorkChunkStatusEnum.QUEUED);
115                ourLog.debug("Create work chunk {}/{}/{}", entity.getInstanceId(), entity.getId(), entity.getTargetStepId());
116                ourLog.trace(
117                                "Create work chunk data {}/{}: {}", entity.getInstanceId(), entity.getId(), entity.getSerializedData());
118                myWorkChunkRepository.save(entity);
119                return entity.getId();
120        }
121
122        @Override
123        @Transactional(propagation = Propagation.REQUIRED)
124        public Optional<WorkChunk> onWorkChunkDequeue(String theChunkId) {
125                // NOTE: Ideally, IN_PROGRESS wouldn't be allowed here.  On chunk failure, we probably shouldn't be allowed.
126                // But how does re-run happen if k8s kills a processor mid run?
127                List<WorkChunkStatusEnum> priorStates =
128                                List.of(WorkChunkStatusEnum.QUEUED, WorkChunkStatusEnum.ERRORED, WorkChunkStatusEnum.IN_PROGRESS);
129                int rowsModified = myWorkChunkRepository.updateChunkStatusForStart(
130                                theChunkId, new Date(), WorkChunkStatusEnum.IN_PROGRESS, priorStates);
131                if (rowsModified == 0) {
132                        ourLog.info("Attempting to start chunk {} but it was already started.", theChunkId);
133                        return Optional.empty();
134                } else {
135                        Optional<Batch2WorkChunkEntity> chunk = myWorkChunkRepository.findById(theChunkId);
136                        return chunk.map(this::toChunk);
137                }
138        }
139
140        @Override
141        @Transactional(propagation = Propagation.REQUIRED)
142        public String storeNewInstance(JobInstance theInstance) {
143                Validate.isTrue(isBlank(theInstance.getInstanceId()));
144
145                Batch2JobInstanceEntity entity = new Batch2JobInstanceEntity();
146                entity.setId(UUID.randomUUID().toString());
147                entity.setDefinitionId(theInstance.getJobDefinitionId());
148                entity.setDefinitionVersion(theInstance.getJobDefinitionVersion());
149                entity.setStatus(theInstance.getStatus());
150                entity.setParams(theInstance.getParameters());
151                entity.setCurrentGatedStepId(theInstance.getCurrentGatedStepId());
152                entity.setFastTracking(theInstance.isFastTracking());
153                entity.setCreateTime(new Date());
154                entity.setStartTime(new Date());
155                entity.setReport(theInstance.getReport());
156
157                entity = myJobInstanceRepository.save(entity);
158                return entity.getId();
159        }
160
161        @Override
162        @Transactional(propagation = Propagation.REQUIRES_NEW)
163        public List<JobInstance> fetchInstances(
164                        String theJobDefinitionId, Set<StatusEnum> theStatuses, Date theCutoff, Pageable thePageable) {
165                return toInstanceList(myJobInstanceRepository.findInstancesByJobIdAndStatusAndExpiry(
166                                theJobDefinitionId, theStatuses, theCutoff, thePageable));
167        }
168
169        @Override
170        @Transactional(propagation = Propagation.REQUIRES_NEW)
171        public List<JobInstance> fetchInstancesByJobDefinitionIdAndStatus(
172                        String theJobDefinitionId, Set<StatusEnum> theRequestedStatuses, int thePageSize, int thePageIndex) {
173                PageRequest pageRequest = PageRequest.of(thePageIndex, thePageSize, Sort.Direction.ASC, CREATE_TIME);
174                return toInstanceList(myJobInstanceRepository.fetchInstancesByJobDefinitionIdAndStatus(
175                                theJobDefinitionId, theRequestedStatuses, pageRequest));
176        }
177
178        @Override
179        @Transactional(propagation = Propagation.REQUIRES_NEW)
180        public List<JobInstance> fetchInstancesByJobDefinitionId(
181                        String theJobDefinitionId, int thePageSize, int thePageIndex) {
182                PageRequest pageRequest = PageRequest.of(thePageIndex, thePageSize, Sort.Direction.ASC, CREATE_TIME);
183                return toInstanceList(myJobInstanceRepository.findInstancesByJobDefinitionId(theJobDefinitionId, pageRequest));
184        }
185
186        @Override
187        @Transactional(propagation = Propagation.REQUIRES_NEW)
188        public Page<JobInstance> fetchJobInstances(JobInstanceFetchRequest theRequest) {
189                PageRequest pageRequest =
190                                PageRequest.of(theRequest.getPageStart(), theRequest.getBatchSize(), theRequest.getSort());
191
192                String jobStatus = theRequest.getJobStatus();
193                if (Objects.equals(jobStatus, "")) {
194                        Page<Batch2JobInstanceEntity> pageOfEntities = myJobInstanceRepository.findAll(pageRequest);
195                        return pageOfEntities.map(this::toInstance);
196                }
197
198                StatusEnum status = StatusEnum.valueOf(jobStatus);
199                List<JobInstance> jobs = toInstanceList(myJobInstanceRepository.findInstancesByJobStatus(status, pageRequest));
200                Integer jobsOfStatus = myJobInstanceRepository.findTotalJobsOfStatus(status);
201                return new PageImpl<>(jobs, pageRequest, jobsOfStatus);
202        }
203
204        private List<JobInstance> toInstanceList(List<Batch2JobInstanceEntity> theInstancesByJobDefinitionId) {
205                return theInstancesByJobDefinitionId.stream().map(this::toInstance).collect(Collectors.toList());
206        }
207
208        @Override
209        @Nonnull
210        public Optional<JobInstance> fetchInstance(String theInstanceId) {
211                return myTransactionService
212                                .withSystemRequest()
213                                .execute(() -> myJobInstanceRepository.findById(theInstanceId).map(this::toInstance));
214        }
215
216        @Override
217        @Transactional(propagation = Propagation.REQUIRES_NEW)
218        public List<JobInstance> fetchInstances(FetchJobInstancesRequest theRequest, int thePage, int theBatchSize) {
219                String definitionId = theRequest.getJobDefinition();
220                String params = theRequest.getParameters();
221                Set<StatusEnum> statuses = theRequest.getStatuses();
222
223                Pageable pageable = PageRequest.of(thePage, theBatchSize);
224
225                List<Batch2JobInstanceEntity> instanceEntities;
226
227                if (statuses != null && !statuses.isEmpty()) {
228                        if (definitionId.equals(Batch2JobDefinitionConstants.BULK_EXPORT)) {
229                                if (originalRequestUrlTruncation(params) != null) {
230                                        params = originalRequestUrlTruncation(params);
231                                }
232                        }
233                        instanceEntities = myJobInstanceRepository.findInstancesByJobIdParamsAndStatus(
234                                        definitionId, params, statuses, pageable);
235                } else {
236                        instanceEntities = myJobInstanceRepository.findInstancesByJobIdAndParams(definitionId, params, pageable);
237                }
238                return toInstanceList(instanceEntities);
239        }
240
241        private String originalRequestUrlTruncation(String theParams) {
242                try {
243                        ObjectMapper mapper = new ObjectMapper();
244                        mapper.configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, true);
245                        mapper.configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true);
246                        JsonNode rootNode = mapper.readTree(theParams);
247                        String originalUrl = "originalRequestUrl";
248
249                        if (rootNode instanceof ObjectNode) {
250                                ObjectNode objectNode = (ObjectNode) rootNode;
251
252                                if (objectNode.has(originalUrl)) {
253                                        String url = objectNode.get(originalUrl).asText();
254                                        if (url.contains("?")) {
255                                                objectNode.put(originalUrl, url.split("\\?")[0]);
256                                        }
257                                }
258                                return mapper.writeValueAsString(objectNode);
259                        }
260                } catch (Exception e) {
261                        ourLog.info("Error Truncating Original Request Url", e);
262                }
263                return null;
264        }
265
266        @Override
267        @Transactional(propagation = Propagation.REQUIRES_NEW)
268        public List<JobInstance> fetchInstances(int thePageSize, int thePageIndex) {
269                // default sort is myCreateTime Asc
270                PageRequest pageRequest = PageRequest.of(thePageIndex, thePageSize, Sort.Direction.ASC, CREATE_TIME);
271                return myJobInstanceRepository.findAll(pageRequest).stream()
272                                .map(this::toInstance)
273                                .collect(Collectors.toList());
274        }
275
276        @Override
277        @Transactional(propagation = Propagation.REQUIRES_NEW)
278        public List<JobInstance> fetchRecentInstances(int thePageSize, int thePageIndex) {
279                PageRequest pageRequest = PageRequest.of(thePageIndex, thePageSize, Sort.Direction.DESC, CREATE_TIME);
280                return myJobInstanceRepository.findAll(pageRequest).stream()
281                                .map(this::toInstance)
282                                .collect(Collectors.toList());
283        }
284
285        private WorkChunk toChunk(Batch2WorkChunkEntity theEntity) {
286                return JobInstanceUtil.fromEntityToWorkChunk(theEntity);
287        }
288
289        private JobInstance toInstance(Batch2JobInstanceEntity theEntity) {
290                return JobInstanceUtil.fromEntityToInstance(theEntity);
291        }
292
293        @Override
294        @Transactional(propagation = Propagation.REQUIRES_NEW)
295        public WorkChunkStatusEnum onWorkChunkError(WorkChunkErrorEvent theParameters) {
296                String chunkId = theParameters.getChunkId();
297                String errorMessage = truncateErrorMessage(theParameters.getErrorMsg());
298                int changeCount = myWorkChunkRepository.updateChunkStatusAndIncrementErrorCountForEndError(
299                                chunkId, new Date(), errorMessage, WorkChunkStatusEnum.ERRORED);
300                Validate.isTrue(changeCount > 0, "changed chunk matching %s", chunkId);
301
302                Query query = myEntityManager.createQuery("update Batch2WorkChunkEntity " + "set myStatus = :failed "
303                                + ",myErrorMessage = CONCAT('Too many errors: ',  myErrorCount, '. Last error msg was ', myErrorMessage) "
304                                + "where myId = :chunkId and myErrorCount > :maxCount");
305                query.setParameter("chunkId", chunkId);
306                query.setParameter("failed", WorkChunkStatusEnum.FAILED);
307                query.setParameter("maxCount", MAX_CHUNK_ERROR_COUNT);
308                int failChangeCount = query.executeUpdate();
309
310                if (failChangeCount > 0) {
311                        return WorkChunkStatusEnum.FAILED;
312                } else {
313                        return WorkChunkStatusEnum.ERRORED;
314                }
315        }
316
317        @Override
318        @Transactional
319        public void onWorkChunkFailed(String theChunkId, String theErrorMessage) {
320                ourLog.info("Marking chunk {} as failed with message: {}", theChunkId, theErrorMessage);
321                String errorMessage = truncateErrorMessage(theErrorMessage);
322                myWorkChunkRepository.updateChunkStatusAndIncrementErrorCountForEndError(
323                                theChunkId, new Date(), errorMessage, WorkChunkStatusEnum.FAILED);
324        }
325
326        @Override
327        @Transactional
328        public void onWorkChunkCompletion(WorkChunkCompletionEvent theEvent) {
329                myWorkChunkRepository.updateChunkStatusAndClearDataForEndSuccess(
330                                theEvent.getChunkId(),
331                                new Date(),
332                                theEvent.getRecordsProcessed(),
333                                theEvent.getRecoveredErrorCount(),
334                                WorkChunkStatusEnum.COMPLETED,
335                                theEvent.getRecoveredWarningMessage());
336        }
337
338        @Nullable
339        private static String truncateErrorMessage(String theErrorMessage) {
340                String errorMessage;
341                if (theErrorMessage != null && theErrorMessage.length() > ERROR_MSG_MAX_LENGTH) {
342                        ourLog.warn("Truncating error message that is too long to store in database: {}", theErrorMessage);
343                        errorMessage = theErrorMessage.substring(0, ERROR_MSG_MAX_LENGTH);
344                } else {
345                        errorMessage = theErrorMessage;
346                }
347                return errorMessage;
348        }
349
350        @Override
351        public void markWorkChunksWithStatusAndWipeData(
352                        String theInstanceId, List<String> theChunkIds, WorkChunkStatusEnum theStatus, String theErrorMessage) {
353                assert TransactionSynchronizationManager.isActualTransactionActive();
354
355                ourLog.debug("Marking all chunks for instance {} to status {}", theInstanceId, theStatus);
356                String errorMessage = truncateErrorMessage(theErrorMessage);
357                List<List<String>> listOfListOfIds = ListUtils.partition(theChunkIds, 100);
358                for (List<String> idList : listOfListOfIds) {
359                        myWorkChunkRepository.updateAllChunksForInstanceStatusClearDataAndSetError(
360                                        idList, new Date(), theStatus, errorMessage);
361                }
362        }
363
364        @Override
365        @Transactional(propagation = Propagation.REQUIRES_NEW)
366        public boolean canAdvanceInstanceToNextStep(String theInstanceId, String theCurrentStepId) {
367                Optional<Batch2JobInstanceEntity> instance = myJobInstanceRepository.findById(theInstanceId);
368                if (instance.isEmpty()) {
369                        return false;
370                }
371                if (instance.get().getStatus().isEnded()) {
372                        return false;
373                }
374                Set<WorkChunkStatusEnum> statusesForStep =
375                                myWorkChunkRepository.getDistinctStatusesForStep(theInstanceId, theCurrentStepId);
376
377                ourLog.debug(
378                                "Checking whether gated job can advanced to next step. [instanceId={}, currentStepId={}, statusesForStep={}]",
379                                theInstanceId,
380                                theCurrentStepId,
381                                statusesForStep);
382                return statusesForStep.isEmpty() || statusesForStep.equals(Set.of(WorkChunkStatusEnum.COMPLETED));
383        }
384
385        private void fetchChunks(
386                        String theInstanceId,
387                        boolean theIncludeData,
388                        int thePageSize,
389                        int thePageIndex,
390                        Consumer<WorkChunk> theConsumer) {
391                myTransactionService
392                                .withSystemRequest()
393                                .withPropagation(Propagation.REQUIRES_NEW)
394                                .execute(() -> {
395                                        List<Batch2WorkChunkEntity> chunks;
396                                        if (theIncludeData) {
397                                                chunks = myWorkChunkRepository.fetchChunks(
398                                                                PageRequest.of(thePageIndex, thePageSize), theInstanceId);
399                                        } else {
400                                                chunks = myWorkChunkRepository.fetchChunksNoData(
401                                                                PageRequest.of(thePageIndex, thePageSize), theInstanceId);
402                                        }
403                                        for (Batch2WorkChunkEntity chunk : chunks) {
404                                                theConsumer.accept(toChunk(chunk));
405                                        }
406                                });
407        }
408
409        @Override
410        public List<String> fetchAllChunkIdsForStepWithStatus(
411                        String theInstanceId, String theStepId, WorkChunkStatusEnum theStatusEnum) {
412                return myTransactionService
413                                .withSystemRequest()
414                                .withPropagation(Propagation.REQUIRES_NEW)
415                                .execute(() -> myWorkChunkRepository.fetchAllChunkIdsForStepWithStatus(
416                                                theInstanceId, theStepId, theStatusEnum));
417        }
418
419        @Override
420        public void updateInstanceUpdateTime(String theInstanceId) {
421                myJobInstanceRepository.updateInstanceUpdateTime(theInstanceId, new Date());
422        }
423
424        /**
425         * Note: Not @Transactional because the transaction happens in a lambda that's called outside of this method's scope
426         */
427        @Override
428        public Iterator<WorkChunk> fetchAllWorkChunksIterator(String theInstanceId, boolean theWithData) {
429                return new PagingIterator<>((thePageIndex, theBatchSize, theConsumer) ->
430                                fetchChunks(theInstanceId, theWithData, theBatchSize, thePageIndex, theConsumer));
431        }
432
433        @Override
434        public Stream<WorkChunk> fetchAllWorkChunksForStepStream(String theInstanceId, String theStepId) {
435                return myWorkChunkRepository
436                                .fetchChunksForStep(theInstanceId, theStepId)
437                                .map(this::toChunk);
438        }
439
440        @Override
441        public boolean updateInstance(String theInstanceId, JobInstanceUpdateCallback theModifier) {
442                Batch2JobInstanceEntity instanceEntity =
443                                myEntityManager.find(Batch2JobInstanceEntity.class, theInstanceId, LockModeType.PESSIMISTIC_WRITE);
444                if (null == instanceEntity) {
445                        ourLog.error("No instance found with Id {}", theInstanceId);
446                        return false;
447                }
448                // convert to JobInstance for public api
449                JobInstance jobInstance = JobInstanceUtil.fromEntityToInstance(instanceEntity);
450
451                // run the modification callback
452                boolean wasModified = theModifier.doUpdate(jobInstance);
453
454                if (wasModified) {
455                        // copy fields back for flush.
456                        JobInstanceUtil.fromInstanceToEntity(jobInstance, instanceEntity);
457                }
458
459                return wasModified;
460        }
461
462        @Override
463        @Transactional(propagation = Propagation.REQUIRES_NEW)
464        public void deleteInstanceAndChunks(String theInstanceId) {
465                ourLog.info("Deleting instance and chunks: {}", theInstanceId);
466                myWorkChunkRepository.deleteAllForInstance(theInstanceId);
467                myJobInstanceRepository.deleteById(theInstanceId);
468        }
469
470        @Override
471        @Transactional(propagation = Propagation.REQUIRES_NEW)
472        public void deleteChunksAndMarkInstanceAsChunksPurged(String theInstanceId) {
473                ourLog.info("Deleting all chunks for instance ID: {}", theInstanceId);
474                int updateCount = myJobInstanceRepository.updateWorkChunksPurgedTrue(theInstanceId);
475                int deleteCount = myWorkChunkRepository.deleteAllForInstance(theInstanceId);
476                ourLog.debug("Purged {} chunks, and updated {} instance.", deleteCount, updateCount);
477        }
478
479        @Override
480        public boolean markInstanceAsStatusWhenStatusIn(
481                        String theInstanceId, StatusEnum theStatusEnum, Set<StatusEnum> thePriorStates) {
482                int recordsChanged =
483                                myJobInstanceRepository.updateInstanceStatusIfIn(theInstanceId, theStatusEnum, thePriorStates);
484                ourLog.debug(
485                                "Update job {} to status {} if in status {}: {}",
486                                theInstanceId,
487                                theStatusEnum,
488                                thePriorStates,
489                                recordsChanged > 0);
490                return recordsChanged > 0;
491        }
492
493        @Override
494        @Transactional(propagation = Propagation.REQUIRES_NEW)
495        public JobOperationResultJson cancelInstance(String theInstanceId) {
496                int recordsChanged = myJobInstanceRepository.updateInstanceCancelled(theInstanceId, true);
497                String operationString = "Cancel job instance " + theInstanceId;
498
499                // wipmb For 6.8 - This is too detailed to be down here - this should be up at the api layer.
500                // Replace with boolean result or ResourceNotFound exception.  Build the message up at the ui.
501                String messagePrefix = "Job instance <" + theInstanceId + ">";
502                if (recordsChanged > 0) {
503                        return JobOperationResultJson.newSuccess(operationString, messagePrefix + " successfully cancelled.");
504                } else {
505                        Optional<JobInstance> instance = fetchInstance(theInstanceId);
506                        if (instance.isPresent()) {
507                                return JobOperationResultJson.newFailure(
508                                                operationString, messagePrefix + " was already cancelled.  Nothing to do.");
509                        } else {
510                                return JobOperationResultJson.newFailure(operationString, messagePrefix + " not found.");
511                        }
512                }
513        }
514}