001/*- 002 * #%L 003 * HAPI FHIR JPA Server - Batch2 Task Processor 004 * %% 005 * Copyright (C) 2014 - 2024 Smile CDR, Inc. 006 * %% 007 * Licensed under the Apache License, Version 2.0 (the "License"); 008 * you may not use this file except in compliance with the License. 009 * You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 * #L% 019 */ 020package ca.uhn.fhir.batch2.coordinator; 021 022import ca.uhn.fhir.batch2.api.IJobPersistence; 023import ca.uhn.fhir.batch2.api.JobExecutionFailedException; 024import ca.uhn.fhir.batch2.maintenance.JobChunkProgressAccumulator; 025import ca.uhn.fhir.batch2.model.JobWorkCursor; 026import ca.uhn.fhir.batch2.model.StatusEnum; 027import ca.uhn.fhir.batch2.model.WorkChunkData; 028import ca.uhn.fhir.batch2.progress.InstanceProgress; 029import ca.uhn.fhir.batch2.progress.JobInstanceProgressCalculator; 030import ca.uhn.fhir.i18n.Msg; 031import ca.uhn.fhir.model.api.IModelJson; 032import ca.uhn.fhir.util.JsonUtil; 033import ca.uhn.fhir.util.Logs; 034import org.apache.commons.lang3.Validate; 035import org.slf4j.Logger; 036 037import java.util.Date; 038 039public class ReductionStepDataSink<PT extends IModelJson, IT extends IModelJson, OT extends IModelJson> 040 extends BaseDataSink<PT, IT, OT> { 041 private static final Logger ourLog = Logs.getBatchTroubleshootingLog(); 042 043 private final IJobPersistence myJobPersistence; 044 private final JobDefinitionRegistry myJobDefinitionRegistry; 045 046 public ReductionStepDataSink( 047 String theInstanceId, 048 JobWorkCursor<PT, IT, OT> theJobWorkCursor, 049 IJobPersistence thePersistence, 050 JobDefinitionRegistry theJobDefinitionRegistry) { 051 super(theInstanceId, theJobWorkCursor); 052 myJobPersistence = thePersistence; 053 myJobDefinitionRegistry = theJobDefinitionRegistry; 054 } 055 056 @Override 057 public void accept(WorkChunkData<OT> theData) { 058 String instanceId = getInstanceId(); 059 OT data = theData.getData(); 060 String dataString = JsonUtil.serialize(data, false); 061 JobChunkProgressAccumulator progressAccumulator = new JobChunkProgressAccumulator(); 062 JobInstanceProgressCalculator myJobInstanceProgressCalculator = 063 new JobInstanceProgressCalculator(myJobPersistence, progressAccumulator, myJobDefinitionRegistry); 064 065 InstanceProgress progress = myJobInstanceProgressCalculator.calculateInstanceProgress(instanceId); 066 boolean changed = myJobPersistence.updateInstance(instanceId, instance -> { 067 Validate.validState( 068 StatusEnum.FINALIZE.equals(instance.getStatus()), 069 "Job %s must be in FINALIZE state. In %s", 070 instanceId, 071 instance.getStatus()); 072 073 if (instance.getReport() != null) { 074 // last in wins - so we won't throw 075 ourLog.error("Report has already been set. Now it is being overwritten. Last in will win!"); 076 } 077 078 /* 079 * For jobs without a reduction step at the end, the maintenance service marks the job instance 080 * as COMPLETE when all chunks are complete, and calculates the final counts and progress. 081 * However, for jobs with a reduction step at the end the maintenance service stops working 082 * on the job while the job is in FINALIZE state, and this sink is ultimately responsible 083 * for marking the instance as COMPLETE at the end of the reduction. 084 * 085 * So, make sure we update the stats and counts before marking as complete here. 086 * 087 * I could envision a better setup where the stuff that the maintenance service touches 088 * is moved into separate DB tables or transactions away from the stuff that the 089 * reducer touches. If the two could never collide we wouldn't need this duplication 090 * here. Until then though, this is safer. 091 */ 092 093 progress.updateInstanceForReductionStep(instance); 094 095 instance.setReport(dataString); 096 instance.setStatus(StatusEnum.COMPLETED); 097 instance.setEndTime(new Date()); 098 099 ourLog.info( 100 "Finalizing job instance {} with report length {} chars", 101 instance.getInstanceId(), 102 dataString.length()); 103 if (ourLog.isTraceEnabled()) { 104 ourLog.trace("New instance state: {}", JsonUtil.serialize(instance)); 105 } 106 107 return true; 108 }); 109 110 if (!changed) { 111 ourLog.error("No instance found with Id {} in FINALIZE state", instanceId); 112 113 throw new JobExecutionFailedException(Msg.code(2097) + ("No instance found with Id " + instanceId)); 114 } 115 } 116 117 @Override 118 public int getWorkChunkCount() { 119 return 0; 120 } 121}