001/*-
002 * #%L
003 * HAPI FHIR JPA Server - Batch2 Task Processor
004 * %%
005 * Copyright (C) 2014 - 2024 Smile CDR, Inc.
006 * %%
007 * Licensed under the Apache License, Version 2.0 (the "License");
008 * you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 *      http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 * #L%
019 */
020package ca.uhn.fhir.batch2.coordinator;
021
022import ca.uhn.fhir.batch2.api.IJobPersistence;
023import ca.uhn.fhir.batch2.api.JobExecutionFailedException;
024import ca.uhn.fhir.batch2.maintenance.JobChunkProgressAccumulator;
025import ca.uhn.fhir.batch2.model.JobWorkCursor;
026import ca.uhn.fhir.batch2.model.StatusEnum;
027import ca.uhn.fhir.batch2.model.WorkChunkData;
028import ca.uhn.fhir.batch2.progress.InstanceProgress;
029import ca.uhn.fhir.batch2.progress.JobInstanceProgressCalculator;
030import ca.uhn.fhir.i18n.Msg;
031import ca.uhn.fhir.model.api.IModelJson;
032import ca.uhn.fhir.util.JsonUtil;
033import ca.uhn.fhir.util.Logs;
034import org.apache.commons.lang3.Validate;
035import org.slf4j.Logger;
036
037import java.util.Date;
038
039public class ReductionStepDataSink<PT extends IModelJson, IT extends IModelJson, OT extends IModelJson>
040                extends BaseDataSink<PT, IT, OT> {
041        private static final Logger ourLog = Logs.getBatchTroubleshootingLog();
042
043        private final IJobPersistence myJobPersistence;
044        private final JobDefinitionRegistry myJobDefinitionRegistry;
045
046        public ReductionStepDataSink(
047                        String theInstanceId,
048                        JobWorkCursor<PT, IT, OT> theJobWorkCursor,
049                        IJobPersistence thePersistence,
050                        JobDefinitionRegistry theJobDefinitionRegistry) {
051                super(theInstanceId, theJobWorkCursor);
052                myJobPersistence = thePersistence;
053                myJobDefinitionRegistry = theJobDefinitionRegistry;
054        }
055
056        @Override
057        public void accept(WorkChunkData<OT> theData) {
058                String instanceId = getInstanceId();
059                OT data = theData.getData();
060                String dataString = JsonUtil.serialize(data, false);
061                JobChunkProgressAccumulator progressAccumulator = new JobChunkProgressAccumulator();
062                JobInstanceProgressCalculator myJobInstanceProgressCalculator =
063                                new JobInstanceProgressCalculator(myJobPersistence, progressAccumulator, myJobDefinitionRegistry);
064
065                InstanceProgress progress = myJobInstanceProgressCalculator.calculateInstanceProgress(instanceId);
066                boolean changed = myJobPersistence.updateInstance(instanceId, instance -> {
067                        Validate.validState(
068                                        StatusEnum.FINALIZE.equals(instance.getStatus()),
069                                        "Job %s must be in FINALIZE state.  In %s",
070                                        instanceId,
071                                        instance.getStatus());
072
073                        if (instance.getReport() != null) {
074                                // last in wins - so we won't throw
075                                ourLog.error("Report has already been set. Now it is being overwritten. Last in will win!");
076                        }
077
078                        /*
079                         * For jobs without a reduction step at the end, the maintenance service marks the job instance
080                         * as COMPLETE when all chunks are complete, and calculates the final counts and progress.
081                         * However, for jobs with a reduction step at the end the maintenance service stops working
082                         * on the job while the job is in FINALIZE state, and this sink is ultimately responsible
083                         * for marking the instance as COMPLETE at the end of the reduction.
084                         *
085                         * So, make sure we update the stats and counts before marking as complete here.
086                         *
087                         * I could envision a better setup where the stuff that the maintenance service touches
088                         * is moved into separate DB tables or transactions away from the stuff that the
089                         * reducer touches. If the two could never collide we wouldn't need this duplication
090                         * here. Until then though, this is safer.
091                         */
092
093                        progress.updateInstanceForReductionStep(instance);
094
095                        instance.setReport(dataString);
096                        instance.setStatus(StatusEnum.COMPLETED);
097                        instance.setEndTime(new Date());
098
099                        ourLog.info(
100                                        "Finalizing job instance {} with report length {} chars",
101                                        instance.getInstanceId(),
102                                        dataString.length());
103                        if (ourLog.isTraceEnabled()) {
104                                ourLog.trace("New instance state: {}", JsonUtil.serialize(instance));
105                        }
106
107                        return true;
108                });
109
110                if (!changed) {
111                        ourLog.error("No instance found with Id {} in FINALIZE state", instanceId);
112
113                        throw new JobExecutionFailedException(Msg.code(2097) + ("No instance found with Id " + instanceId));
114                }
115        }
116
117        @Override
118        public int getWorkChunkCount() {
119                return 0;
120        }
121}