001/*-
002 * #%L
003 * HAPI FHIR JPA Server - Batch2 Task Processor
004 * %%
005 * Copyright (C) 2014 - 2024 Smile CDR, Inc.
006 * %%
007 * Licensed under the Apache License, Version 2.0 (the "License");
008 * you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 *      http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 * #L%
019 */
020package ca.uhn.fhir.batch2.jobs.step;
021
022import ca.uhn.fhir.batch2.api.IFirstJobStepWorker;
023import ca.uhn.fhir.batch2.api.IJobDataSink;
024import ca.uhn.fhir.batch2.api.JobExecutionFailedException;
025import ca.uhn.fhir.batch2.api.RunOutcome;
026import ca.uhn.fhir.batch2.api.StepExecutionDetails;
027import ca.uhn.fhir.batch2.api.VoidModel;
028import ca.uhn.fhir.batch2.jobs.chunk.ChunkRangeJson;
029import ca.uhn.fhir.batch2.jobs.parameters.PartitionedUrl;
030import ca.uhn.fhir.batch2.jobs.parameters.PartitionedUrlJobParameters;
031import ca.uhn.fhir.util.Logs;
032import jakarta.annotation.Nonnull;
033import org.slf4j.Logger;
034import org.thymeleaf.util.StringUtils;
035
036import java.util.Date;
037import java.util.List;
038
039import static ca.uhn.fhir.batch2.util.Batch2Utils.BATCH_START_DATE;
040
041public class GenerateRangeChunksStep<PT extends PartitionedUrlJobParameters>
042                implements IFirstJobStepWorker<PT, ChunkRangeJson> {
043        private static final Logger ourLog = Logs.getBatchTroubleshootingLog();
044
045        @Nonnull
046        @Override
047        public RunOutcome run(
048                        @Nonnull StepExecutionDetails<PT, VoidModel> theStepExecutionDetails,
049                        @Nonnull IJobDataSink<ChunkRangeJson> theDataSink)
050                        throws JobExecutionFailedException {
051                PT params = theStepExecutionDetails.getParameters();
052
053                Date start = BATCH_START_DATE;
054                Date end = new Date();
055
056                List<PartitionedUrl> partitionedUrls = params.getPartitionedUrls();
057
058                if (!partitionedUrls.isEmpty()) {
059                        partitionedUrls.forEach(partitionedUrl -> {
060                                ChunkRangeJson chunkRangeJson = new ChunkRangeJson(start, end)
061                                                .setUrl(partitionedUrl.getUrl())
062                                                .setPartitionId(partitionedUrl.getRequestPartitionId());
063                                sendChunk(chunkRangeJson, theDataSink);
064                        });
065                        return RunOutcome.SUCCESS;
066                }
067
068                ChunkRangeJson chunkRangeJson = new ChunkRangeJson(start, end);
069                sendChunk(chunkRangeJson, theDataSink);
070                return RunOutcome.SUCCESS;
071        }
072
073        private void sendChunk(ChunkRangeJson theData, IJobDataSink<ChunkRangeJson> theDataSink) {
074                String url = theData.getUrl();
075                ourLog.trace(
076                                "Creating chunks for [{}] from {} to {} for partition {}",
077                                !StringUtils.isEmpty(url) ? url : "everything",
078                                theData.getStart(),
079                                theData.getEnd(),
080                                theData.getPartitionId());
081                theDataSink.accept(theData);
082        }
083}