/*-
 * #%L
 * HAPI FHIR JPA Server - Batch2 Task Processor
 * %%
 * Copyright (C) 2014 - 2024 Smile CDR, Inc.
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */
package ca.uhn.fhir.batch2.maintenance;

import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
import ca.uhn.fhir.util.Logs;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import jakarta.annotation.Nonnull;
import org.apache.commons.lang3.ArrayUtils;
import org.slf4j.Logger;

import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

import static java.util.Collections.emptyList;
import static org.apache.commons.lang3.ObjectUtils.defaultIfNull;

/**
 * While performing cleanup, the cleanup job loads all work chunks
 * to examine their status. This bean records the step and status of every
 * chunk it is handed, so that the same information can be reused by
 * maintenance jobs without needing to hit the database a second time.
 * <p>
 * Chunks are grouped by job instance ID. A chunk that is offered more than
 * once (same instance ID and chunk ID) is only recorded the first time.
 * </p>
 * <p>
 * This class performs no synchronization of its own.
 * </p>
 */
public class JobChunkProgressAccumulator {
	private static final Logger ourLog = Logs.getBatchTroubleshootingLog();

	/** Keys of the (instance, chunk) pairs that have already been recorded; used to drop duplicates. */
	private final Set<String> myConsumedInstanceAndChunkIds = new HashSet<>();

	/** Recorded chunk entries, grouped by the ID of the job instance they belong to. */
	private final Multimap<String, ChunkStatusCountValue> myInstanceIdToChunkStatuses = ArrayListMultimap.create();

	/**
	 * Counts the recorded chunks of an instance that target a given step, regardless of status.
	 *
	 * @param theInstanceId the job instance whose chunks are counted
	 * @param theStepId the step the chunks must target
	 * @return the number of recorded chunks for that instance and step; {@code 0} if there are none
	 */
	int getTotalChunkCountForInstanceAndStep(String theInstanceId, String theStepId) {
		// Count directly rather than materializing a list only to ask for its size.
		long matching = getChunkStatuses(theInstanceId).stream()
				.filter(entry -> entry.myStepId.equals(theStepId))
				.count();
		// The backing collection is an in-memory list, so the count always fits in an int.
		return (int) matching;
	}

	/**
	 * Finds the IDs of the recorded chunks of an instance that target a given step and
	 * are in one of the supplied statuses.
	 *
	 * @param theInstanceId the job instance whose chunks are searched
	 * @param theStepId the step the chunks must target
	 * @param theStatuses the statuses to accept; a chunk matches if its status is any one of these
	 * @return the IDs of the matching chunks; empty if nothing matches
	 */
	public List<String> getChunkIdsWithStatus(
			String theInstanceId, String theStepId, WorkChunkStatusEnum... theStatuses) {
		return getChunkStatuses(theInstanceId).stream()
				.filter(entry -> entry.myStepId.equals(theStepId))
				.filter(entry -> ArrayUtils.contains(theStatuses, entry.myStatus))
				.map(entry -> entry.myChunkId)
				.collect(Collectors.toList());
	}

	/**
	 * Looks up everything recorded for one job instance.
	 *
	 * @param theInstanceId the job instance to look up
	 * @return the recorded entries for that instance, never {@code null}
	 */
	@Nonnull
	private Collection<ChunkStatusCountValue> getChunkStatuses(String theInstanceId) {
		// Guava documents Multimap#get as returning an empty collection (not null) for an
		// unknown key; the fallback below is retained only as a cheap safeguard.
		Collection<ChunkStatusCountValue> recorded = myInstanceIdToChunkStatuses.get(theInstanceId);
		return defaultIfNull(recorded, emptyList());
	}

	/**
	 * Records the target step and status of a work chunk under its job instance.
	 * If a chunk with the same instance ID and chunk ID has already been recorded,
	 * the call is a no-op.
	 *
	 * @param theChunk the chunk to record
	 */
	public void addChunk(WorkChunk theChunk) {
		String instanceId = theChunk.getInstanceId();
		String chunkId = theChunk.getId();

		// Note: If chunks are being written while we're executing, we may see the same chunk twice.
		// Set#add reports whether the key was new, which lets us skip the repeat.
		boolean firstSighting = myConsumedInstanceAndChunkIds.add(toDedupKey(instanceId, chunkId));
		if (!firstSighting) {
			return;
		}

		ourLog.debug(
				"Adding chunk to accumulator. [chunkId={}, instanceId={}, status={}, step={}]",
				chunkId,
				instanceId,
				theChunk.getStatus(),
				theChunk.getTargetStepId());
		myInstanceIdToChunkStatuses.put(
				instanceId, new ChunkStatusCountValue(chunkId, theChunk.getTargetStepId(), theChunk.getStatus()));
	}

	/** Builds the key under which an (instance, chunk) pair is remembered for de-duplication. */
	private static String toDedupKey(String theInstanceId, String theChunkId) {
		return theInstanceId + " " + theChunkId;
	}

	/** Immutable record of one chunk: its ID, the step it targets and the status it was seen in. */
	private static final class ChunkStatusCountValue {
		private final String myChunkId;
		private final String myStepId;
		private final WorkChunkStatusEnum myStatus;

		private ChunkStatusCountValue(String theChunkId, String theStepId, WorkChunkStatusEnum theStatus) {
			myChunkId = theChunkId;
			myStepId = theStepId;
			myStatus = theStatus;
		}
	}
}