001/*-
002 * #%L
003 * HAPI FHIR JPA Server - Batch2 Task Processor
004 * %%
005 * Copyright (C) 2014 - 2024 Smile CDR, Inc.
006 * %%
007 * Licensed under the Apache License, Version 2.0 (the "License");
008 * you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 *      http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 * #L%
019 */
020package ca.uhn.fhir.batch2.maintenance;
021
022import ca.uhn.fhir.batch2.model.WorkChunk;
023import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
024import ca.uhn.fhir.util.Logs;
025import com.google.common.collect.ArrayListMultimap;
026import com.google.common.collect.Multimap;
027import jakarta.annotation.Nonnull;
028import org.apache.commons.lang3.ArrayUtils;
029import org.slf4j.Logger;
030
031import java.util.Collection;
032import java.util.HashSet;
033import java.util.List;
034import java.util.Set;
035import java.util.stream.Collectors;
036
037import static java.util.Collections.emptyList;
038import static org.apache.commons.lang3.ObjectUtils.defaultIfNull;
039
040/**
041 * While performing cleanup, the cleanup job loads all work chunks
042 * to examine their status. This bean collects the counts that
043 * are found, so that they can be reused for maintenance jobs without
044 * needing to hit the database a second time.
045 */
046public class JobChunkProgressAccumulator {
047        private static final Logger ourLog = Logs.getBatchTroubleshootingLog();
048
049        private final Set<String> myConsumedInstanceAndChunkIds = new HashSet<>();
050        private final Multimap<String, ChunkStatusCountValue> myInstanceIdToChunkStatuses = ArrayListMultimap.create();
051
052        int getTotalChunkCountForInstanceAndStep(String theInstanceId, String theStepId) {
053                return myInstanceIdToChunkStatuses.get(theInstanceId).stream()
054                                .filter(chunkCount -> chunkCount.myStepId.equals(theStepId))
055                                .collect(Collectors.toList())
056                                .size();
057        }
058
059        public List<String> getChunkIdsWithStatus(
060                        String theInstanceId, String theStepId, WorkChunkStatusEnum... theStatuses) {
061                return getChunkStatuses(theInstanceId).stream()
062                                .filter(t -> t.myStepId.equals(theStepId))
063                                .filter(t -> ArrayUtils.contains(theStatuses, t.myStatus))
064                                .map(t -> t.myChunkId)
065                                .collect(Collectors.toList());
066        }
067
068        @Nonnull
069        private Collection<ChunkStatusCountValue> getChunkStatuses(String theInstanceId) {
070                Collection<ChunkStatusCountValue> chunkStatuses = myInstanceIdToChunkStatuses.get(theInstanceId);
071                chunkStatuses = defaultIfNull(chunkStatuses, emptyList());
072                return chunkStatuses;
073        }
074
075        public void addChunk(WorkChunk theChunk) {
076                String instanceId = theChunk.getInstanceId();
077                String chunkId = theChunk.getId();
078                // Note: If chunks are being written while we're executing, we may see the same chunk twice. This
079                // check avoids adding it twice.
080                if (myConsumedInstanceAndChunkIds.add(instanceId + " " + chunkId)) {
081                        ourLog.debug(
082                                        "Adding chunk to accumulator. [chunkId={}, instanceId={}, status={}, step={}]",
083                                        chunkId,
084                                        instanceId,
085                                        theChunk.getStatus(),
086                                        theChunk.getTargetStepId());
087                        myInstanceIdToChunkStatuses.put(
088                                        instanceId, new ChunkStatusCountValue(chunkId, theChunk.getTargetStepId(), theChunk.getStatus()));
089                }
090        }
091
092        private static class ChunkStatusCountValue {
093                public final String myChunkId;
094                public final String myStepId;
095                public final WorkChunkStatusEnum myStatus;
096
097                private ChunkStatusCountValue(String theChunkId, String theStepId, WorkChunkStatusEnum theStatus) {
098                        myChunkId = theChunkId;
099                        myStepId = theStepId;
100                        myStatus = theStatus;
101                }
102        }
103}