001/*- 002 * #%L 003 * HAPI FHIR JPA Server - Batch2 Task Processor 004 * %% 005 * Copyright (C) 2014 - 2024 Smile CDR, Inc. 006 * %% 007 * Licensed under the Apache License, Version 2.0 (the "License"); 008 * you may not use this file except in compliance with the License. 009 * You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 * #L% 019 */ 020package ca.uhn.fhir.batch2.coordinator; 021 022import ca.uhn.fhir.batch2.api.IJobCoordinator; 023import ca.uhn.fhir.batch2.api.IJobMaintenanceService; 024import ca.uhn.fhir.batch2.api.IJobPersistence; 025import ca.uhn.fhir.batch2.api.JobOperationResultJson; 026import ca.uhn.fhir.batch2.channel.BatchJobSender; 027import ca.uhn.fhir.batch2.model.FetchJobInstancesRequest; 028import ca.uhn.fhir.batch2.model.JobDefinition; 029import ca.uhn.fhir.batch2.model.JobInstance; 030import ca.uhn.fhir.batch2.model.JobInstanceStartRequest; 031import ca.uhn.fhir.batch2.model.StatusEnum; 032import ca.uhn.fhir.batch2.models.JobInstanceFetchRequest; 033import ca.uhn.fhir.i18n.Msg; 034import ca.uhn.fhir.jpa.batch.models.Batch2JobStartResponse; 035import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService; 036import ca.uhn.fhir.jpa.subscription.channel.api.IChannelReceiver; 037import ca.uhn.fhir.rest.api.server.RequestDetails; 038import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException; 039import ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException; 040import ca.uhn.fhir.util.Logs; 041import jakarta.annotation.Nonnull; 042import jakarta.annotation.Nullable; 043import jakarta.annotation.PostConstruct; 044import jakarta.annotation.PreDestroy; 045import 
org.apache.commons.lang3.Validate;
import org.slf4j.Logger;
import org.springframework.data.domain.Page;
import org.springframework.messaging.MessageHandler;
import org.springframework.transaction.annotation.Propagation;

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import static org.apache.commons.lang3.StringUtils.isBlank;

/**
 * {@link IJobCoordinator} implementation that starts job instances, delegates instance
 * queries to a {@link JobQuerySvc}, forwards cancellation to {@link IJobPersistence}, and
 * subscribes a {@link WorkChannelMessageHandler} to the work channel receiver on startup.
 */
public class JobCoordinatorImpl implements IJobCoordinator {
    private static final Logger ourLog = Logs.getBatchTroubleshootingLog();

    private final IJobPersistence myJobPersistence;
    private final BatchJobSender myBatchJobSender;
    private final IChannelReceiver myWorkChannelReceiver;
    private final JobDefinitionRegistry myJobDefinitionRegistry;
    private final MessageHandler myReceiverHandler;
    private final JobQuerySvc myJobQuerySvc;
    private final JobParameterJsonValidator myJobParameterJsonValidator;
    private final IHapiTransactionService myTransactionService;

    /**
     * Constructor
     *
     * @param theBatchJobSender        sender handed to the work channel message handler
     * @param theWorkChannelReceiver   receiver the message handler is subscribed to in {@link #start()}
     * @param theJobPersistence        persistence layer; must not be {@code null}
     * @param theJobDefinitionRegistry registry used to resolve job definitions by ID
     * @param theExecutorSvc           work chunk processor handed to the message handler
     * @param theJobMaintenanceService maintenance service handed to the message handler
     * @param theTransactionService    transaction service used when creating new instances
     */
    public JobCoordinatorImpl(
            @Nonnull BatchJobSender theBatchJobSender,
            @Nonnull IChannelReceiver theWorkChannelReceiver,
            @Nonnull IJobPersistence theJobPersistence,
            @Nonnull JobDefinitionRegistry theJobDefinitionRegistry,
            @Nonnull WorkChunkProcessor theExecutorSvc,
            @Nonnull IJobMaintenanceService theJobMaintenanceService,
            @Nonnull IHapiTransactionService theTransactionService) {
        Validate.notNull(theJobPersistence);

        myJobPersistence = theJobPersistence;
        myBatchJobSender = theBatchJobSender;
        myWorkChannelReceiver = theWorkChannelReceiver;
        myJobDefinitionRegistry = theJobDefinitionRegistry;

        myReceiverHandler = new WorkChannelMessageHandler(
                theJobPersistence,
                theJobDefinitionRegistry,
                theBatchJobSender,
                theExecutorSvc,
                theJobMaintenanceService,
                theTransactionService);
        myJobQuerySvc = new JobQuerySvc(theJobPersistence, theJobDefinitionRegistry);
        myJobParameterJsonValidator = new JobParameterJsonValidator();
        myTransactionService = theTransactionService;
    }

    /**
     * Starts a job instance, or returns an already-existing one when the request allows
     * cached results and a matching instance is found.
     *
     * @param theRequestDetails request details passed through to parameter validation
     * @param theStartRequest   the start request; parameters and job definition ID must not be blank
     * @return a response carrying the ID of the new instance, or of the reused instance
     *         (in which case {@code usesCachedResult} is set)
     * @throws InvalidRequestException  if no parameters were supplied
     * @throws IllegalArgumentException if the job definition ID is blank or unknown
     */
    @Override
    public Batch2JobStartResponse startInstance(
            RequestDetails theRequestDetails, JobInstanceStartRequest theStartRequest) {
        String paramsString = theStartRequest.getParameters();
        if (isBlank(paramsString)) {
            throw new InvalidRequestException(Msg.code(2065) + "No parameters supplied");
        }
        Validate.notBlank(theStartRequest.getJobDefinitionId(), "No job definition ID supplied in start request");

        // if cache - use that first
        if (theStartRequest.isUseCache()) {
            FetchJobInstancesRequest request = new FetchJobInstancesRequest(
                    theStartRequest.getJobDefinitionId(),
                    theStartRequest.getParameters(),
                    getStatesThatTriggerCache());

            List<JobInstance> existing = myJobPersistence.fetchInstances(request, 0, 1000);
            if (!existing.isEmpty()) {
                // Only instances in the cache-triggering states were requested; among them,
                // prefer the one whose status is declared latest in StatusEnum.
                JobInstance first = selectHighestStatusOrdinal(existing);

                Batch2JobStartResponse response = new Batch2JobStartResponse();
                response.setInstanceId(first.getInstanceId());
                response.setUsesCachedResult(true);

                ourLog.info(
                        "Reusing cached {} job with status {} and id {}",
                        first.getJobDefinitionId(),
                        first.getStatus(),
                        first.getInstanceId());

                return response;
            }
        }

        JobDefinition<?> jobDefinition = myJobDefinitionRegistry
                .getLatestJobDefinition(theStartRequest.getJobDefinitionId())
                .orElseThrow(() -> new IllegalArgumentException(
                        Msg.code(2063) + "Unknown job definition ID: " + theStartRequest.getJobDefinitionId()));

        myJobParameterJsonValidator.validateJobParameters(theRequestDetails, theStartRequest, jobDefinition);

        // we only create the first chunk and job here
        // JobMaintenanceServiceImpl.doMaintenancePass will handle the rest
        IJobPersistence.CreateResult instanceAndFirstChunk = myTransactionService
                .withSystemRequestOnDefaultPartition()
                .withPropagation(Propagation.REQUIRES_NEW)
                .execute(() ->
                        myJobPersistence.onCreateWithFirstChunk(jobDefinition, theStartRequest.getParameters()));

        Batch2JobStartResponse response = new Batch2JobStartResponse();
        response.setInstanceId(instanceAndFirstChunk.jobInstanceId);
        return response;
    }

    /**
     * Picks the instance with the highest {@link StatusEnum} ordinal without modifying the
     * supplied list (which is owned by the persistence layer and may be unmodifiable).
     * When several instances share the highest ordinal, the earliest one in the list wins,
     * which matches the result of a stable descending sort followed by taking the head.
     *
     * @param theCandidates a non-empty list of instances
     * @return the selected instance
     */
    private static JobInstance selectHighestStatusOrdinal(List<JobInstance> theCandidates) {
        JobInstance selected = theCandidates.get(0);
        for (JobInstance candidate : theCandidates.subList(1, theCandidates.size())) {
            if (candidate.getStatus().ordinal() > selected.getStatus().ordinal()) {
                selected = candidate;
            }
        }
        return selected;
    }

    /**
     * Cache will be used if an identical job is QUEUED or IN_PROGRESS. Otherwise a new one will kickoff.
     */
    private StatusEnum[] getStatesThatTriggerCache() {
        return new StatusEnum[] {StatusEnum.QUEUED, StatusEnum.IN_PROGRESS};
    }

    /**
     * Fetches a single job instance by ID via the query service.
     */
    @Override
    @Nonnull
    public JobInstance getInstance(String theInstanceId) {
        return myJobQuerySvc.fetchInstance(theInstanceId);
    }

    /**
     * Fetches one page of job instances via the query service.
     */
    @Override
    public List<JobInstance> getInstances(int thePageSize, int thePageIndex) {
        return myJobQuerySvc.fetchInstances(thePageSize, thePageIndex);
    }

    /**
     * Fetches recent job instances via the query service.
     */
    @Override
    public List<JobInstance> getRecentInstances(int theCount, int theStart) {
        return myJobQuerySvc.fetchRecentInstances(theCount, theStart);
    }

    /**
     * Fetches instances of the given job definition, optionally filtered by ended status,
     * via the query service.
     */
    @Override
    public List<JobInstance> getInstancesbyJobDefinitionIdAndEndedStatus(
            String theJobDefinitionId, @Nullable Boolean theEnded, int theCount, int theStart) {
        return myJobQuerySvc.getInstancesByJobDefinitionIdAndEndedStatus(
                theJobDefinitionId, theEnded, theCount, theStart);
    }

    /**
     * Fetches instances of the given job definition that are in one of the given statuses,
     * via the query service.
     */
    @Override
    public List<JobInstance> getJobInstancesByJobDefinitionIdAndStatuses(
            String theJobDefinitionId, Set<StatusEnum> theStatuses, int theCount, int theStart) {
        return myJobQuerySvc.getInstancesByJobDefinitionAndStatuses(
                theJobDefinitionId, theStatuses, theCount, theStart);
    }

    /**
     * Fetches instances of the given job definition in any status, by delegating to
     * {@link #getJobInstancesByJobDefinitionIdAndStatuses} with every {@link StatusEnum} value.
     */
    @Override
    public List<JobInstance> getJobInstancesByJobDefinitionId(String theJobDefinitionId, int theCount, int theStart) {
        return getJobInstancesByJobDefinitionIdAndStatuses(
                theJobDefinitionId, new HashSet<>(Arrays.asList(StatusEnum.values())), theCount, theStart);
    }

    /**
     * Fetches a page of job instances matching the given fetch request via the query service.
     */
    @Override
    public Page<JobInstance> fetchAllJobInstances(JobInstanceFetchRequest theFetchRequest) {
        return myJobQuerySvc.fetchAllInstances(theFetchRequest);
    }

    // wipmb For 6.8 - Clarify this interface. We currently return a JobOperationResultJson, and don't throw
    // ResourceNotFoundException
    /**
     * Requests cancellation of the given instance by delegating to the persistence layer.
     */
    @Override
    public JobOperationResultJson cancelInstance(String theInstanceId) throws ResourceNotFoundException {
        return myJobPersistence.cancelInstance(theInstanceId);
    }

    /**
     * Subscribes the work channel message handler to the work channel receiver.
     */
    @PostConstruct
    public void start() {
        myWorkChannelReceiver.subscribe(myReceiverHandler);
    }

    /**
     * Unsubscribes the work channel message handler from the work channel receiver.
     */
    @PreDestroy
    public void stop() {
        myWorkChannelReceiver.unsubscribe(myReceiverHandler);
    }
}