001/*- 002 * #%L 003 * HAPI FHIR JPA Server - Batch2 Task Processor 004 * %% 005 * Copyright (C) 2014 - 2024 Smile CDR, Inc. 006 * %% 007 * Licensed under the Apache License, Version 2.0 (the "License"); 008 * you may not use this file except in compliance with the License. 009 * You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 * #L% 019 */ 020package ca.uhn.fhir.batch2.jobs.step; 021 022import ca.uhn.fhir.batch2.api.IJobDataSink; 023import ca.uhn.fhir.batch2.api.IJobStepWorker; 024import ca.uhn.fhir.batch2.api.JobExecutionFailedException; 025import ca.uhn.fhir.batch2.api.RunOutcome; 026import ca.uhn.fhir.batch2.api.StepExecutionDetails; 027import ca.uhn.fhir.batch2.jobs.chunk.ChunkRangeJson; 028import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson; 029import ca.uhn.fhir.batch2.jobs.chunk.TypedPidJson; 030import ca.uhn.fhir.batch2.jobs.parameters.PartitionedUrlJobParameters; 031import ca.uhn.fhir.interceptor.model.RequestPartitionId; 032import ca.uhn.fhir.jpa.api.pid.IResourcePidStream; 033import ca.uhn.fhir.util.Logs; 034import jakarta.annotation.Nonnull; 035import org.slf4j.Logger; 036 037import java.util.Collection; 038import java.util.Date; 039import java.util.concurrent.atomic.AtomicInteger; 040import java.util.stream.Stream; 041 042import static ca.uhn.fhir.util.StreamUtil.partition; 043import static org.apache.commons.lang3.ObjectUtils.defaultIfNull; 044 045public class ResourceIdListStep<PT extends PartitionedUrlJobParameters> 046 implements IJobStepWorker<PT, ChunkRangeJson, ResourceIdListWorkChunkJson> { 047 private static final Logger ourLog = Logs.getBatchTroubleshootingLog(); 048 049 protected static final int MAX_BATCH_OF_IDS = 500; 050 051 private final IIdChunkProducer<ChunkRangeJson> myIdChunkProducer; 052 053 public ResourceIdListStep(IIdChunkProducer<ChunkRangeJson> theIdChunkProducer) { 054 myIdChunkProducer = theIdChunkProducer; 055 } 056 057 @Nonnull 058 @Override 059 public RunOutcome run( 060 @Nonnull StepExecutionDetails<PT, ChunkRangeJson> theStepExecutionDetails, 061 @Nonnull IJobDataSink<ResourceIdListWorkChunkJson> theDataSink) 062 throws JobExecutionFailedException { 063 ChunkRangeJson data = theStepExecutionDetails.getData(); 064 065 Date start = data.getStart(); 066 Date end = data.getEnd(); 067 Integer batchSize = theStepExecutionDetails.getParameters().getBatchSize(); 068 069 ourLog.trace( 070 "Beginning to submit chunks in range {} to {} for url {} and partitionId {}", 071 start, 072 end, 073 data.getUrl(), 074 data.getPartitionId()); 075 076 int chunkSize = Math.min(defaultIfNull(batchSize, MAX_BATCH_OF_IDS), MAX_BATCH_OF_IDS); 077 final IResourcePidStream searchResult = 078 myIdChunkProducer.fetchResourceIdStream(theStepExecutionDetails.getData()); 079 080 searchResult.visitStreamNoResult(typedResourcePidStream -> { 081 AtomicInteger totalIdsFound = new AtomicInteger(); 082 AtomicInteger chunkCount = new AtomicInteger(); 083 084 Stream<TypedPidJson> jsonStream = typedResourcePidStream.map(TypedPidJson::new); 085 086 // chunk by size maxBatchId and submit the batches 087 partition(jsonStream, chunkSize).forEach(idBatch -> { 088 totalIdsFound.addAndGet(idBatch.size()); 089 chunkCount.getAndIncrement(); 090 submitWorkChunk(idBatch, searchResult.getRequestPartitionId(), theDataSink); 091 }); 092 ourLog.trace( 093 "Submitted {} chunks with {} resource IDs for url {} and partitionId {}", 094 chunkCount, 095 totalIdsFound, 096 data.getUrl(), 097 data.getPartitionId()); 098 }); 099 100 return RunOutcome.SUCCESS; 101 } 102 103 private void submitWorkChunk( 104 Collection<TypedPidJson> theTypedPids, 105 RequestPartitionId theRequestPartitionId, 106 IJobDataSink<ResourceIdListWorkChunkJson> theDataSink) { 107 if (theTypedPids.isEmpty()) { 108 return; 109 } 110 ourLog.trace("Submitting work chunk in partition {} with {} IDs", theRequestPartitionId, theTypedPids.size()); 111 ResourceIdListWorkChunkJson data = new ResourceIdListWorkChunkJson(theTypedPids, theRequestPartitionId); 112 ourLog.trace("IDs are: {}", data); 113 theDataSink.accept(data); 114 } 115}