001/*- 002 * #%L 003 * HAPI FHIR JPA Server - Batch2 Task Processor 004 * %% 005 * Copyright (C) 2014 - 2024 Smile CDR, Inc. 006 * %% 007 * Licensed under the Apache License, Version 2.0 (the "License"); 008 * you may not use this file except in compliance with the License. 009 * You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 * #L% 019 */ 020package ca.uhn.fhir.batch2.jobs.step; 021 022import ca.uhn.fhir.batch2.api.IFirstJobStepWorker; 023import ca.uhn.fhir.batch2.api.IJobDataSink; 024import ca.uhn.fhir.batch2.api.JobExecutionFailedException; 025import ca.uhn.fhir.batch2.api.RunOutcome; 026import ca.uhn.fhir.batch2.api.StepExecutionDetails; 027import ca.uhn.fhir.batch2.api.VoidModel; 028import ca.uhn.fhir.batch2.jobs.chunk.ChunkRangeJson; 029import ca.uhn.fhir.batch2.jobs.parameters.PartitionedUrl; 030import ca.uhn.fhir.batch2.jobs.parameters.PartitionedUrlJobParameters; 031import ca.uhn.fhir.util.Logs; 032import jakarta.annotation.Nonnull; 033import org.slf4j.Logger; 034import org.thymeleaf.util.StringUtils; 035 036import java.util.Date; 037import java.util.List; 038 039import static ca.uhn.fhir.batch2.util.Batch2Utils.BATCH_START_DATE; 040 041public class GenerateRangeChunksStep<PT extends PartitionedUrlJobParameters> 042 implements IFirstJobStepWorker<PT, ChunkRangeJson> { 043 private static final Logger ourLog = Logs.getBatchTroubleshootingLog(); 044 045 @Nonnull 046 @Override 047 public RunOutcome run( 048 @Nonnull StepExecutionDetails<PT, VoidModel> theStepExecutionDetails, 049 @Nonnull IJobDataSink<ChunkRangeJson> theDataSink) 050 throws JobExecutionFailedException { 051 PT params = theStepExecutionDetails.getParameters(); 052 053 Date start = BATCH_START_DATE; 054 Date end = new Date(); 055 056 List<PartitionedUrl> partitionedUrls = params.getPartitionedUrls(); 057 058 if (!partitionedUrls.isEmpty()) { 059 partitionedUrls.forEach(partitionedUrl -> { 060 ChunkRangeJson chunkRangeJson = new ChunkRangeJson(start, end) 061 .setUrl(partitionedUrl.getUrl()) 062 .setPartitionId(partitionedUrl.getRequestPartitionId()); 063 sendChunk(chunkRangeJson, theDataSink); 064 }); 065 return RunOutcome.SUCCESS; 066 } 067 068 ChunkRangeJson chunkRangeJson = new ChunkRangeJson(start, end); 069 sendChunk(chunkRangeJson, theDataSink); 070 return RunOutcome.SUCCESS; 071 } 072 073 private void sendChunk(ChunkRangeJson theData, IJobDataSink<ChunkRangeJson> theDataSink) { 074 String url = theData.getUrl(); 075 ourLog.trace( 076 "Creating chunks for [{}] from {} to {} for partition {}", 077 !StringUtils.isEmpty(url) ? url : "everything", 078 theData.getStart(), 079 theData.getEnd(), 080 theData.getPartitionId()); 081 theDataSink.accept(theData); 082 } 083}