/*-
 * #%L
 * HAPI FHIR JPA Server
 * %%
 * Copyright (C) 2014 - 2023 Smile CDR, Inc.
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */
package ca.uhn.fhir.jpa.bulk.export.svc;

import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.context.RuntimeResourceDefinition;
import ca.uhn.fhir.context.RuntimeSearchParam;
import ca.uhn.fhir.fhirpath.IFhirPath;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.api.model.PersistentIdToForcedIdMap;
import ca.uhn.fhir.jpa.api.svc.IIdHelperService;
import ca.uhn.fhir.jpa.bulk.export.api.IBulkExportProcessor;
import ca.uhn.fhir.jpa.bulk.export.model.ExportPIDIteratorParameters;
import ca.uhn.fhir.jpa.dao.IResultIterator;
import ca.uhn.fhir.jpa.dao.ISearchBuilder;
import ca.uhn.fhir.jpa.dao.SearchBuilderFactory;
import ca.uhn.fhir.jpa.dao.mdm.MdmExpansionCacheSvc;
import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
import ca.uhn.fhir.jpa.model.dao.JpaPid;
import ca.uhn.fhir.jpa.model.search.SearchBuilderLoadIncludesParameters;
import ca.uhn.fhir.jpa.model.search.SearchRuntimeDetails;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.jpa.util.QueryChunker;
045import ca.uhn.fhir.mdm.api.MdmMatchResultEnum; 046import ca.uhn.fhir.mdm.dao.IMdmLinkDao; 047import ca.uhn.fhir.mdm.model.MdmPidTuple; 048import ca.uhn.fhir.model.api.Include; 049import ca.uhn.fhir.model.primitive.IdDt; 050import ca.uhn.fhir.rest.api.server.SystemRequestDetails; 051import ca.uhn.fhir.rest.api.server.bulk.BulkExportJobParameters; 052import ca.uhn.fhir.rest.param.HasOrListParam; 053import ca.uhn.fhir.rest.param.HasParam; 054import ca.uhn.fhir.rest.param.ReferenceOrListParam; 055import ca.uhn.fhir.rest.param.ReferenceParam; 056import ca.uhn.fhir.util.ExtensionUtil; 057import ca.uhn.fhir.util.HapiExtensions; 058import ca.uhn.fhir.util.Logs; 059import ca.uhn.fhir.util.SearchParameterUtil; 060import org.apache.commons.lang3.StringUtils; 061import org.hl7.fhir.instance.model.api.IBaseExtension; 062import org.hl7.fhir.instance.model.api.IBaseReference; 063import org.hl7.fhir.instance.model.api.IBaseResource; 064import org.hl7.fhir.instance.model.api.IIdType; 065import org.slf4j.Logger; 066import org.slf4j.LoggerFactory; 067import org.springframework.beans.factory.annotation.Autowired; 068 069import java.io.IOException; 070import java.util.ArrayList; 071import java.util.HashMap; 072import java.util.HashSet; 073import java.util.Iterator; 074import java.util.LinkedHashSet; 075import java.util.List; 076import java.util.Map; 077import java.util.Optional; 078import java.util.Set; 079import java.util.stream.Collectors; 080import javax.annotation.Nonnull; 081import javax.persistence.EntityManager; 082 083import static ca.uhn.fhir.rest.api.Constants.PARAM_HAS; 084import static ca.uhn.fhir.rest.api.Constants.PARAM_ID; 085 086public class JpaBulkExportProcessor implements IBulkExportProcessor<JpaPid> { 087 private static final Logger ourLog = LoggerFactory.getLogger(JpaBulkExportProcessor.class); 088 089 public static final int QUERY_CHUNK_SIZE = 100; 090 public static final List<String> PATIENT_BULK_EXPORT_FORWARD_REFERENCE_RESOURCE_TYPES = 091 
List.of("Practitioner", "Organization"); 092 093 @Autowired 094 private FhirContext myContext; 095 096 @Autowired 097 private BulkExportHelperService myBulkExportHelperSvc; 098 099 @Autowired 100 private JpaStorageSettings myStorageSettings; 101 102 @Autowired 103 private DaoRegistry myDaoRegistry; 104 105 @Autowired 106 protected SearchBuilderFactory<JpaPid> mySearchBuilderFactory; 107 108 @Autowired 109 private IIdHelperService<JpaPid> myIdHelperService; 110 111 @SuppressWarnings("rawtypes") 112 @Autowired 113 private IMdmLinkDao myMdmLinkDao; 114 115 @Autowired 116 private MdmExpansionCacheSvc myMdmExpansionCacheSvc; 117 118 @Autowired 119 private EntityManager myEntityManager; 120 121 @Autowired 122 private IHapiTransactionService myHapiTransactionService; 123 124 private IFhirPath myFhirPath; 125 126 @Override 127 public Iterator<JpaPid> getResourcePidIterator(ExportPIDIteratorParameters theParams) { 128 return myHapiTransactionService 129 .withSystemRequest() 130 .withRequestPartitionId(theParams.getPartitionIdOrAllPartitions()) 131 .readOnly() 132 .execute(() -> { 133 String resourceType = theParams.getResourceType(); 134 String jobId = theParams.getInstanceId(); 135 String chunkId = theParams.getChunkId(); 136 RuntimeResourceDefinition def = myContext.getResourceDefinition(resourceType); 137 138 LinkedHashSet<JpaPid> pids; 139 if (theParams.getExportStyle() == BulkExportJobParameters.ExportStyle.PATIENT) { 140 pids = getPidsForPatientStyleExport(theParams, resourceType, jobId, chunkId, def); 141 } else if (theParams.getExportStyle() == BulkExportJobParameters.ExportStyle.GROUP) { 142 pids = getPidsForGroupStyleExport(theParams, resourceType, def); 143 } else { 144 pids = getPidsForSystemStyleExport(theParams, jobId, chunkId, def); 145 } 146 147 ourLog.debug("Finished expanding resource pids to export, size is {}", pids.size()); 148 return pids.iterator(); 149 }); 150 } 151 152 @SuppressWarnings("unchecked") 153 private LinkedHashSet<JpaPid> 
getPidsForPatientStyleExport( 154 ExportPIDIteratorParameters theParams, 155 String resourceType, 156 String theJobId, 157 String theChunkId, 158 RuntimeResourceDefinition def) 159 throws IOException { 160 LinkedHashSet<JpaPid> pids = new LinkedHashSet<>(); 161 // Patient 162 if (myStorageSettings.getIndexMissingFields() == JpaStorageSettings.IndexEnabledEnum.DISABLED) { 163 String errorMessage = 164 "You attempted to start a Patient Bulk Export, but the system has `Index Missing Fields` disabled. It must be enabled for Patient Bulk Export"; 165 ourLog.error(errorMessage); 166 throw new IllegalStateException(Msg.code(797) + errorMessage); 167 } 168 169 Set<String> patientSearchParams = 170 SearchParameterUtil.getPatientSearchParamsForResourceType(myContext, theParams.getResourceType()); 171 172 for (String patientSearchParam : patientSearchParams) { 173 List<SearchParameterMap> maps = 174 myBulkExportHelperSvc.createSearchParameterMapsForResourceType(def, theParams, false); 175 for (SearchParameterMap map : maps) { 176 // Ensure users did not monkey with the patient compartment search parameter. 
177 validateSearchParametersForPatient(map, theParams); 178 179 ISearchBuilder<JpaPid> searchBuilder = getSearchBuilderForResourceType(theParams.getResourceType()); 180 181 filterBySpecificPatient(theParams, resourceType, patientSearchParam, map); 182 183 SearchRuntimeDetails searchRuntime = new SearchRuntimeDetails(null, theJobId); 184 185 Logs.getBatchTroubleshootingLog() 186 .debug( 187 "Executing query for bulk export job[{}] chunk[{}]: {}", 188 theJobId, 189 theChunkId, 190 map.toNormalizedQueryString(myContext)); 191 192 try (IResultIterator<JpaPid> resultIterator = searchBuilder.createQuery( 193 map, searchRuntime, new SystemRequestDetails(), theParams.getPartitionIdOrAllPartitions())) { 194 int pidCount = 0; 195 while (resultIterator.hasNext()) { 196 if (pidCount % 10000 == 0) { 197 Logs.getBatchTroubleshootingLog() 198 .debug( 199 "Bulk export job[{}] chunk[{}] has loaded {} pids", 200 theJobId, 201 theChunkId, 202 pidCount); 203 } 204 pidCount++; 205 pids.add(resultIterator.next()); 206 } 207 } 208 } 209 } 210 return pids; 211 } 212 213 private static void filterBySpecificPatient( 214 ExportPIDIteratorParameters theParams, 215 String resourceType, 216 String patientSearchParam, 217 SearchParameterMap map) { 218 if (resourceType.equalsIgnoreCase("Patient")) { 219 if (theParams.getPatientIds() != null) { 220 ReferenceOrListParam referenceOrListParam = getReferenceOrListParam(theParams); 221 map.add(PARAM_ID, referenceOrListParam); 222 } 223 } else { 224 if (theParams.getPatientIds() != null) { 225 ReferenceOrListParam referenceOrListParam = getReferenceOrListParam(theParams); 226 map.add(patientSearchParam, referenceOrListParam); 227 } else { 228 map.add(patientSearchParam, new ReferenceParam().setMissing(false)); 229 } 230 } 231 } 232 233 @Nonnull 234 private static ReferenceOrListParam getReferenceOrListParam(ExportPIDIteratorParameters theParams) { 235 ReferenceOrListParam referenceOrListParam = new ReferenceOrListParam(); 236 for (String patientId : 
theParams.getPatientIds()) { 237 referenceOrListParam.addOr(new ReferenceParam(patientId)); 238 } 239 return referenceOrListParam; 240 } 241 242 @SuppressWarnings("unchecked") 243 private LinkedHashSet<JpaPid> getPidsForSystemStyleExport( 244 ExportPIDIteratorParameters theParams, String theJobId, String theChunkId, RuntimeResourceDefinition theDef) 245 throws IOException { 246 LinkedHashSet<JpaPid> pids = new LinkedHashSet<>(); 247 // System 248 List<SearchParameterMap> maps = 249 myBulkExportHelperSvc.createSearchParameterMapsForResourceType(theDef, theParams, true); 250 ISearchBuilder<JpaPid> searchBuilder = getSearchBuilderForResourceType(theParams.getResourceType()); 251 252 for (SearchParameterMap map : maps) { 253 Logs.getBatchTroubleshootingLog() 254 .debug( 255 "Executing query for bulk export job[{}] chunk[{}]: {}", 256 theJobId, 257 theChunkId, 258 map.toNormalizedQueryString(myContext)); 259 260 // requires a transaction 261 try (IResultIterator<JpaPid> resultIterator = searchBuilder.createQuery( 262 map, new SearchRuntimeDetails(null, theJobId), null, theParams.getPartitionIdOrAllPartitions())) { 263 int pidCount = 0; 264 while (resultIterator.hasNext()) { 265 if (pidCount % 10000 == 0) { 266 Logs.getBatchTroubleshootingLog() 267 .debug( 268 "Bulk export job[{}] chunk[{}] has loaded {} pids", 269 theJobId, 270 theChunkId, 271 pidCount); 272 } 273 pidCount++; 274 pids.add(resultIterator.next()); 275 } 276 } 277 } 278 return pids; 279 } 280 281 private LinkedHashSet<JpaPid> getPidsForGroupStyleExport( 282 ExportPIDIteratorParameters theParams, String theResourceType, RuntimeResourceDefinition theDef) 283 throws IOException { 284 LinkedHashSet<JpaPid> pids; 285 286 if (theResourceType.equalsIgnoreCase("Patient")) { 287 ourLog.info("Expanding Patients of a Group Bulk Export."); 288 pids = getExpandedPatientList(theParams); 289 ourLog.info("Obtained {} PIDs", pids.size()); 290 } else if (theResourceType.equalsIgnoreCase("Group")) { 291 pids = 
getSingletonGroupList(theParams); 292 } else { 293 pids = getRelatedResourceTypePids(theParams, theDef); 294 } 295 return pids; 296 } 297 298 private LinkedHashSet<JpaPid> getRelatedResourceTypePids( 299 ExportPIDIteratorParameters theParams, RuntimeResourceDefinition theDef) throws IOException { 300 LinkedHashSet<JpaPid> pids = new LinkedHashSet<>(); 301 // expand the group pid -> list of patients in that group (list of patient pids) 302 Set<JpaPid> expandedMemberResourceIds = expandAllPatientPidsFromGroup(theParams); 303 assert !expandedMemberResourceIds.isEmpty(); 304 Logs.getBatchTroubleshootingLog() 305 .debug("{} has been expanded to members:[{}]", theParams.getGroupId(), expandedMemberResourceIds); 306 307 // for each patient pid -> 308 // search for the target resources, with their correct patient references, chunked. 309 // The results will be jammed into myReadPids 310 QueryChunker<JpaPid> queryChunker = new QueryChunker<>(); 311 queryChunker.chunk(expandedMemberResourceIds, QUERY_CHUNK_SIZE, (idChunk) -> { 312 try { 313 queryResourceTypeWithReferencesToPatients(pids, idChunk, theParams, theDef); 314 } catch (IOException ex) { 315 // we will never see this; 316 // SearchBuilder#QueryIterator does not (nor can ever) throw 317 // an IOException... 
but Java requires the check, 318 // so we'll put a log here (just in the off chance) 319 ourLog.error("Couldn't close query iterator ", ex); 320 throw new RuntimeException(Msg.code(2346) + "Couldn't close query iterator", ex); 321 } 322 }); 323 return pids; 324 } 325 326 private LinkedHashSet<JpaPid> getSingletonGroupList(ExportPIDIteratorParameters theParams) { 327 RequestPartitionId partitionId = theParams.getPartitionIdOrAllPartitions(); 328 IBaseResource group = myDaoRegistry 329 .getResourceDao("Group") 330 .read(new IdDt(theParams.getGroupId()), new SystemRequestDetails().setRequestPartitionId(partitionId)); 331 JpaPid pidOrNull = myIdHelperService.getPidOrNull(partitionId, group); 332 LinkedHashSet<JpaPid> pids = new LinkedHashSet<>(); 333 pids.add(pidOrNull); 334 return pids; 335 } 336 337 /** 338 * Get a ISearchBuilder for the given resource type. 339 */ 340 protected ISearchBuilder<JpaPid> getSearchBuilderForResourceType(String theResourceType) { 341 IFhirResourceDao<?> dao = myDaoRegistry.getResourceDao(theResourceType); 342 RuntimeResourceDefinition def = myContext.getResourceDefinition(theResourceType); 343 Class<? 
extends IBaseResource> typeClass = def.getImplementingClass(); 344 return mySearchBuilderFactory.newSearchBuilder(dao, theResourceType, typeClass); 345 } 346 347 protected RuntimeSearchParam getPatientSearchParamForCurrentResourceType(String theResourceType) { 348 RuntimeSearchParam searchParam = null; 349 Optional<RuntimeSearchParam> onlyPatientSearchParamForResourceType = 350 SearchParameterUtil.getOnlyPatientSearchParamForResourceType(myContext, theResourceType); 351 if (onlyPatientSearchParamForResourceType.isPresent()) { 352 searchParam = onlyPatientSearchParamForResourceType.get(); 353 } 354 return searchParam; 355 } 356 357 @Override 358 public void expandMdmResources(List<IBaseResource> theResources) { 359 for (IBaseResource resource : theResources) { 360 if (!PATIENT_BULK_EXPORT_FORWARD_REFERENCE_RESOURCE_TYPES.contains(resource.fhirType())) { 361 annotateBackwardsReferences(resource); 362 } 363 } 364 } 365 366 /** 367 * For Patient 368 **/ 369 private RuntimeSearchParam validateSearchParametersForPatient( 370 SearchParameterMap expandedSpMap, ExportPIDIteratorParameters theParams) { 371 RuntimeSearchParam runtimeSearchParam = 372 getPatientSearchParamForCurrentResourceType(theParams.getResourceType()); 373 if (expandedSpMap.get(runtimeSearchParam.getName()) != null) { 374 throw new IllegalArgumentException(Msg.code(796) 375 + String.format( 376 "Patient Bulk Export manually modifies the Search Parameter called [%s], so you may not include this search parameter in your _typeFilter!", 377 runtimeSearchParam.getName())); 378 } 379 return runtimeSearchParam; 380 } 381 382 /** 383 * for group exports 384 **/ 385 private void validateSearchParametersForGroup(SearchParameterMap expandedSpMap, String theResourceType) { 386 // we only validate for certain types 387 if (!PATIENT_BULK_EXPORT_FORWARD_REFERENCE_RESOURCE_TYPES.contains(theResourceType)) { 388 RuntimeSearchParam runtimeSearchParam = getPatientSearchParamForCurrentResourceType(theResourceType); 389 if 
(expandedSpMap.get(runtimeSearchParam.getName()) != null) { 390 throw new IllegalArgumentException(Msg.code(792) 391 + String.format( 392 "Group Bulk Export manually modifies the Search Parameter called [%s], so you may not include this search parameter in your _typeFilter!", 393 runtimeSearchParam.getName())); 394 } 395 } 396 } 397 398 /** 399 * In case we are doing a Group Bulk Export and resourceType `Patient` is requested, we can just return the group members, 400 * possibly expanded by MDM, and don't have to go and fetch other resource DAOs. 401 */ 402 @SuppressWarnings("unchecked") 403 private LinkedHashSet<JpaPid> getExpandedPatientList(ExportPIDIteratorParameters theParameters) throws IOException { 404 List<JpaPid> members = getMembersFromGroupWithFilter(theParameters, true); 405 List<IIdType> ids = 406 members.stream().map(member -> new IdDt("Patient/" + member)).collect(Collectors.toList()); 407 ourLog.info("While extracting patients from a group, we found {} patients.", ids.size()); 408 ourLog.info("Found patients: {}", ids.stream().map(id -> id.getValue()).collect(Collectors.joining(", "))); 409 410 List<JpaPid> pidsOrThrowException = members; 411 LinkedHashSet<JpaPid> patientPidsToExport = new LinkedHashSet<>(pidsOrThrowException); 412 413 if (theParameters.isExpandMdm()) { 414 RequestPartitionId partitionId = theParameters.getPartitionIdOrAllPartitions(); 415 SystemRequestDetails srd = new SystemRequestDetails().setRequestPartitionId(partitionId); 416 IBaseResource group = myDaoRegistry.getResourceDao("Group").read(new IdDt(theParameters.getGroupId()), srd); 417 JpaPid pidOrNull = myIdHelperService.getPidOrNull(partitionId, group); 418 List<MdmPidTuple<JpaPid>> goldenPidSourcePidTuple = 419 myMdmLinkDao.expandPidsFromGroupPidGivenMatchResult(pidOrNull, MdmMatchResultEnum.MATCH); 420 goldenPidSourcePidTuple.forEach(tuple -> { 421 patientPidsToExport.add(tuple.getGoldenPid()); 422 patientPidsToExport.add(tuple.getSourcePid()); 423 }); 424 
populateMdmResourceCache(goldenPidSourcePidTuple); 425 } 426 return patientPidsToExport; 427 } 428 429 /** 430 * Given the parameters, find all members' patient references in the group with the typeFilter applied. 431 * 432 * @return A list of strings representing the Patient IDs of the members (e.g. ["P1", "P2", "P3"] 433 */ 434 @SuppressWarnings("unchecked") 435 private List<JpaPid> getMembersFromGroupWithFilter( 436 ExportPIDIteratorParameters theParameters, boolean theConsiderSince) throws IOException { 437 RuntimeResourceDefinition def = myContext.getResourceDefinition("Patient"); 438 List<JpaPid> resPids = new ArrayList<>(); 439 440 List<SearchParameterMap> maps = 441 myBulkExportHelperSvc.createSearchParameterMapsForResourceType(def, theParameters, theConsiderSince); 442 443 maps.forEach(map -> addMembershipToGroupClause(map, theParameters.getGroupId())); 444 445 for (SearchParameterMap map : maps) { 446 ISearchBuilder<JpaPid> searchBuilder = getSearchBuilderForResourceType("Patient"); 447 ourLog.debug( 448 "Searching for members of group {} with job instance {} with map {}", 449 theParameters.getGroupId(), 450 theParameters.getInstanceId(), 451 map); 452 try (IResultIterator<JpaPid> resultIterator = searchBuilder.createQuery( 453 map, 454 new SearchRuntimeDetails(null, theParameters.getInstanceId()), 455 null, 456 theParameters.getPartitionIdOrAllPartitions())) { 457 458 while (resultIterator.hasNext()) { 459 resPids.add(resultIterator.next()); 460 } 461 } 462 } 463 return resPids; 464 } 465 466 /** 467 * This method takes an {@link SearchParameterMap} and adds a clause to it that will filter the search results to only 468 * return members of the defined group. 469 * 470 * @param theMap the map to add the clause to. 471 * @param theGroupId the group ID to filter by. 
472 */ 473 private void addMembershipToGroupClause(SearchParameterMap theMap, String theGroupId) { 474 HasOrListParam hasOrListParam = new HasOrListParam(); 475 hasOrListParam.addOr(new HasParam("Group", "member", "_id", theGroupId)); 476 theMap.add(PARAM_HAS, hasOrListParam); 477 } 478 479 /** 480 * @param thePidTuples 481 */ 482 @SuppressWarnings({"unchecked", "rawtypes"}) 483 private void populateMdmResourceCache(List<MdmPidTuple<JpaPid>> thePidTuples) { 484 if (myMdmExpansionCacheSvc.hasBeenPopulated()) { 485 return; 486 } 487 // First, convert this zipped set of tuples to a map of 488 // { 489 // patient/gold-1 -> [patient/1, patient/2] 490 // patient/gold-2 -> [patient/3, patient/4] 491 // } 492 Map<JpaPid, Set<JpaPid>> goldenResourceToSourcePidMap = new HashMap<>(); 493 extract(thePidTuples, goldenResourceToSourcePidMap); 494 495 // Next, lets convert it to an inverted index for fast lookup 496 // { 497 // patient/1 -> patient/gold-1 498 // patient/2 -> patient/gold-1 499 // patient/3 -> patient/gold-2 500 // patient/4 -> patient/gold-2 501 // } 502 Map<String, String> sourceResourceIdToGoldenResourceIdMap = new HashMap<>(); 503 goldenResourceToSourcePidMap.forEach((key, value) -> { 504 String goldenResourceId = 505 myIdHelperService.translatePidIdToForcedIdWithCache(key).orElse(key.toString()); 506 PersistentIdToForcedIdMap pidsToForcedIds = myIdHelperService.translatePidsToForcedIds(value); 507 508 Set<String> sourceResourceIds = pidsToForcedIds.getResolvedResourceIds(); 509 510 sourceResourceIds.forEach( 511 sourceResourceId -> sourceResourceIdToGoldenResourceIdMap.put(sourceResourceId, goldenResourceId)); 512 }); 513 514 // Now that we have built our cached expansion, store it. 
515 myMdmExpansionCacheSvc.setCacheContents(sourceResourceIdToGoldenResourceIdMap); 516 } 517 518 private void extract( 519 List<MdmPidTuple<JpaPid>> theGoldenPidTargetPidTuples, 520 Map<JpaPid, Set<JpaPid>> theGoldenResourceToSourcePidMap) { 521 for (MdmPidTuple<JpaPid> goldenPidTargetPidTuple : theGoldenPidTargetPidTuples) { 522 JpaPid goldenPid = goldenPidTargetPidTuple.getGoldenPid(); 523 JpaPid sourcePid = goldenPidTargetPidTuple.getSourcePid(); 524 theGoldenResourceToSourcePidMap 525 .computeIfAbsent(goldenPid, key -> new HashSet<>()) 526 .add(sourcePid); 527 } 528 } 529 530 // gets all the resources related to each patient provided in the list of thePatientPids 531 @SuppressWarnings("unchecked") 532 private void queryResourceTypeWithReferencesToPatients( 533 Set<JpaPid> theReadPids, 534 List<JpaPid> thePatientPids, 535 ExportPIDIteratorParameters theParams, 536 RuntimeResourceDefinition theDef) 537 throws IOException { 538 539 // Convert Resource Persistent IDs to actual client IDs. 540 Set<JpaPid> pidSet = new HashSet<>(thePatientPids); 541 Set<String> patientIds = myIdHelperService.translatePidsToFhirResourceIds(pidSet); 542 543 // Build SP map 544 // First, inject the _typeFilters and _since from the export job 545 List<SearchParameterMap> expandedSpMaps = 546 myBulkExportHelperSvc.createSearchParameterMapsForResourceType(theDef, theParams, true); 547 for (SearchParameterMap expandedSpMap : expandedSpMaps) { 548 549 // Since we are in a bulk job, we have to ensure the user didn't jam in a patient search param, since we 550 // need to manually set that. 551 validateSearchParametersForGroup(expandedSpMap, theParams.getResourceType()); 552 553 // Fetch and cache a search builder for this resource type 554 // filter by ResourceType 555 ISearchBuilder<JpaPid> searchBuilder = getSearchBuilderForResourceType(theParams.getResourceType()); 556 557 // Now, further filter the query with patient references defined by the chunk of IDs we have. 
558 // filter by PatientIds 559 if (PATIENT_BULK_EXPORT_FORWARD_REFERENCE_RESOURCE_TYPES.contains(theParams.getResourceType())) { 560 filterSearchByHasParam(patientIds, expandedSpMap, theParams); 561 } else { 562 filterSearchByResourceIds(patientIds, expandedSpMap, theParams); 563 } 564 565 // Execute query and all found pids to our local iterator. 566 RequestPartitionId partitionId = theParams.getPartitionIdOrAllPartitions(); 567 try (IResultIterator<JpaPid> resultIterator = searchBuilder.createQuery( 568 expandedSpMap, new SearchRuntimeDetails(null, theParams.getInstanceId()), null, partitionId)) { 569 while (resultIterator.hasNext()) { 570 theReadPids.add(resultIterator.next()); 571 } 572 } 573 574 // Construct our Includes filter 575 // We use this to recursively fetch resources of interest 576 // (but should only request those the user has requested/can see) 577 Set<Include> includes = new HashSet<>(); 578 for (String resourceType : theParams.getRequestedResourceTypes()) { 579 includes.add(new Include(resourceType + ":*", true)); 580 } 581 582 SystemRequestDetails requestDetails = new SystemRequestDetails().setRequestPartitionId(partitionId); 583 SearchBuilderLoadIncludesParameters<JpaPid> loadIncludesParameters = 584 new SearchBuilderLoadIncludesParameters<>(); 585 loadIncludesParameters.setFhirContext(myContext); 586 loadIncludesParameters.setMatches(theReadPids); 587 loadIncludesParameters.setEntityManager(myEntityManager); 588 loadIncludesParameters.setRequestDetails(requestDetails); 589 loadIncludesParameters.setIncludeFilters(includes); 590 loadIncludesParameters.setReverseMode(false); 591 loadIncludesParameters.setLastUpdated(expandedSpMap.getLastUpdated()); 592 loadIncludesParameters.setSearchIdOrDescription(theParams.getInstanceId()); 593 loadIncludesParameters.setDesiredResourceTypes(theParams.getRequestedResourceTypes()); 594 Set<JpaPid> includeIds = searchBuilder.loadIncludes(loadIncludesParameters); 595 596 // gets rid of the Patient duplicates 
597 theReadPids.addAll(includeIds.stream() 598 .filter((id) -> !id.getResourceType().equals("Patient")) 599 .collect(Collectors.toSet())); 600 } 601 } 602 603 /** 604 * Must not be called for resources types listed in PATIENT_BULK_EXPORT_FORWARD_REFERENCE_RESOURCE_TYPES 605 * 606 * @param idChunk 607 * @param expandedSpMap 608 * @param theParams 609 */ 610 private void filterSearchByResourceIds( 611 Set<String> idChunk, SearchParameterMap expandedSpMap, ExportPIDIteratorParameters theParams) { 612 ReferenceOrListParam orList = new ReferenceOrListParam(); 613 idChunk.forEach(id -> orList.add(new ReferenceParam(id))); 614 RuntimeSearchParam patientSearchParamForCurrentResourceType = 615 getPatientSearchParamForCurrentResourceType(theParams.getResourceType()); 616 expandedSpMap.add(patientSearchParamForCurrentResourceType.getName(), orList); 617 } 618 619 /** 620 * @param idChunk 621 * @param expandedSpMap 622 */ 623 private void filterSearchByHasParam( 624 Set<String> idChunk, SearchParameterMap expandedSpMap, ExportPIDIteratorParameters theParams) { 625 HasOrListParam hasOrListParam = new HasOrListParam(); 626 idChunk.stream().forEach(id -> hasOrListParam.addOr(buildHasParam(id, theParams.getResourceType()))); 627 expandedSpMap.add("_has", hasOrListParam); 628 } 629 630 private HasParam buildHasParam(String theResourceId, String theResourceType) { 631 if ("Practitioner".equalsIgnoreCase(theResourceType)) { 632 return new HasParam("Patient", "general-practitioner", "_id", theResourceId); 633 } else if ("Organization".equalsIgnoreCase(theResourceType)) { 634 return new HasParam("Patient", "organization", "_id", theResourceId); 635 } else { 636 throw new IllegalArgumentException( 637 Msg.code(2077) + " We can't handle forward references onto type " + theResourceType); 638 } 639 } 640 641 /** 642 * Given the local myGroupId, perform an expansion to retrieve all resource IDs of member patients. 
643 * if myMdmEnabled is set to true, we also reach out to the IMdmLinkDao to attempt to also expand it into matched 644 * patients. 645 * 646 * @return a Set of Strings representing the resource IDs of all members of a group. 647 */ 648 private Set<JpaPid> expandAllPatientPidsFromGroup(ExportPIDIteratorParameters theParams) throws IOException { 649 Set<JpaPid> expandedIds = new HashSet<>(); 650 RequestPartitionId partitionId = theParams.getPartitionIdOrAllPartitions(); 651 SystemRequestDetails requestDetails = new SystemRequestDetails().setRequestPartitionId(partitionId); 652 IBaseResource group = 653 myDaoRegistry.getResourceDao("Group").read(new IdDt(theParams.getGroupId()), requestDetails); 654 JpaPid pidOrNull = myIdHelperService.getPidOrNull(partitionId, group); 655 656 // Attempt to perform MDM Expansion of membership 657 if (theParams.isExpandMdm()) { 658 expandedIds.addAll(performMembershipExpansionViaMdmTable(pidOrNull)); 659 } 660 661 // Now manually add the members of the group (its possible even with mdm expansion that some members dont have 662 // MDM matches, 663 // so would be otherwise skipped 664 List<JpaPid> membersFromGroupWithFilter = getMembersFromGroupWithFilter(theParams, false); 665 ourLog.debug("Group with ID [{}] has been expanded to: {}", theParams.getGroupId(), membersFromGroupWithFilter); 666 expandedIds.addAll(membersFromGroupWithFilter); 667 668 return expandedIds; 669 } 670 671 @SuppressWarnings({"rawtypes", "unchecked"}) 672 private Set<JpaPid> performMembershipExpansionViaMdmTable(JpaPid pidOrNull) { 673 List<MdmPidTuple<JpaPid>> goldenPidTargetPidTuples = 674 myMdmLinkDao.expandPidsFromGroupPidGivenMatchResult(pidOrNull, MdmMatchResultEnum.MATCH); 675 // Now lets translate these pids into resource IDs 676 Set<JpaPid> uniquePids = new HashSet<>(); 677 goldenPidTargetPidTuples.forEach(tuple -> { 678 uniquePids.add(tuple.getGoldenPid()); 679 uniquePids.add(tuple.getSourcePid()); 680 }); 681 PersistentIdToForcedIdMap pidToForcedIdMap 
= myIdHelperService.translatePidsToForcedIds(uniquePids); 682 683 Map<JpaPid, Set<JpaPid>> goldenResourceToSourcePidMap = new HashMap<>(); 684 extract(goldenPidTargetPidTuples, goldenResourceToSourcePidMap); 685 populateMdmResourceCache(goldenPidTargetPidTuples); 686 687 return uniquePids; 688 } 689 690 /* Mdm Expansion */ 691 692 private RuntimeSearchParam getRuntimeSearchParam(IBaseResource theResource) { 693 Optional<RuntimeSearchParam> oPatientSearchParam = 694 SearchParameterUtil.getOnlyPatientSearchParamForResourceType(myContext, theResource.fhirType()); 695 if (!oPatientSearchParam.isPresent()) { 696 String errorMessage = String.format( 697 "[%s] has no search parameters that are for patients, so it is invalid for Group Bulk Export!", 698 theResource.fhirType()); 699 throw new IllegalArgumentException(Msg.code(2242) + errorMessage); 700 } else { 701 return oPatientSearchParam.get(); 702 } 703 } 704 705 private void annotateBackwardsReferences(IBaseResource iBaseResource) { 706 Optional<String> patientReference = getPatientReference(iBaseResource); 707 if (patientReference.isPresent()) { 708 addGoldenResourceExtension(iBaseResource, patientReference.get()); 709 } else { 710 ourLog.error( 711 "Failed to find the patient reference information for resource {}. 
This is a bug, " 712 + "as all resources which can be exported via Group Bulk Export must reference a patient.", 713 iBaseResource); 714 } 715 } 716 717 private Optional<String> getPatientReference(IBaseResource iBaseResource) { 718 String fhirPath; 719 720 RuntimeSearchParam runtimeSearchParam = getRuntimeSearchParam(iBaseResource); 721 fhirPath = getPatientFhirPath(runtimeSearchParam); 722 723 if (iBaseResource.fhirType().equalsIgnoreCase("Patient")) { 724 return Optional.of(iBaseResource.getIdElement().getIdPart()); 725 } else { 726 Optional<IBaseReference> optionalReference = 727 getFhirParser().evaluateFirst(iBaseResource, fhirPath, IBaseReference.class); 728 if (optionalReference.isPresent()) { 729 return optionalReference.map(theIBaseReference -> 730 theIBaseReference.getReferenceElement().getIdPart()); 731 } else { 732 return Optional.empty(); 733 } 734 } 735 } 736 737 private void addGoldenResourceExtension(IBaseResource iBaseResource, String sourceResourceId) { 738 String goldenResourceId = myMdmExpansionCacheSvc.getGoldenResourceId(sourceResourceId); 739 IBaseExtension<?, ?> extension = ExtensionUtil.getOrCreateExtension( 740 iBaseResource, HapiExtensions.ASSOCIATED_GOLDEN_RESOURCE_EXTENSION_URL); 741 if (!StringUtils.isBlank(goldenResourceId)) { 742 ExtensionUtil.setExtension(myContext, extension, "reference", prefixPatient(goldenResourceId)); 743 } 744 } 745 746 private String prefixPatient(String theResourceId) { 747 return "Patient/" + theResourceId; 748 } 749 750 private IFhirPath getFhirParser() { 751 if (myFhirPath == null) { 752 myFhirPath = myContext.newFhirPath(); 753 } 754 return myFhirPath; 755 } 756 757 private String getPatientFhirPath(RuntimeSearchParam theRuntimeParam) { 758 String path = theRuntimeParam.getPath(); 759 // GGG: Yes this is a stupid hack, but by default this runtime search param will return stuff like 760 // Observation.subject.where(resolve() is Patient) which unfortunately our FHIRpath evaluator doesn't play 761 // nicely 
with 762 // our FHIRPath evaluator. 763 if (path.contains(".where")) { 764 path = path.substring(0, path.indexOf(".where")); 765 } 766 return path; 767 } 768}