001/*-
002 * #%L
003 * HAPI FHIR JPA Server
004 * %%
005 * Copyright (C) 2014 - 2023 Smile CDR, Inc.
006 * %%
007 * Licensed under the Apache License, Version 2.0 (the "License");
008 * you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 *      http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 * #L%
019 */
020package ca.uhn.fhir.jpa.bulk.export.svc;
021
022import ca.uhn.fhir.context.FhirContext;
023import ca.uhn.fhir.context.RuntimeResourceDefinition;
024import ca.uhn.fhir.context.RuntimeSearchParam;
025import ca.uhn.fhir.fhirpath.IFhirPath;
026import ca.uhn.fhir.i18n.Msg;
027import ca.uhn.fhir.interceptor.model.RequestPartitionId;
028import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
029import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
030import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
031import ca.uhn.fhir.jpa.api.model.PersistentIdToForcedIdMap;
032import ca.uhn.fhir.jpa.api.svc.IIdHelperService;
033import ca.uhn.fhir.jpa.bulk.export.api.IBulkExportProcessor;
034import ca.uhn.fhir.jpa.bulk.export.model.ExportPIDIteratorParameters;
035import ca.uhn.fhir.jpa.dao.IResultIterator;
036import ca.uhn.fhir.jpa.dao.ISearchBuilder;
037import ca.uhn.fhir.jpa.dao.SearchBuilderFactory;
038import ca.uhn.fhir.jpa.dao.mdm.MdmExpansionCacheSvc;
039import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
040import ca.uhn.fhir.jpa.model.dao.JpaPid;
041import ca.uhn.fhir.jpa.model.search.SearchBuilderLoadIncludesParameters;
042import ca.uhn.fhir.jpa.model.search.SearchRuntimeDetails;
043import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
044import ca.uhn.fhir.jpa.util.QueryChunker;
045import ca.uhn.fhir.mdm.api.MdmMatchResultEnum;
046import ca.uhn.fhir.mdm.dao.IMdmLinkDao;
047import ca.uhn.fhir.mdm.model.MdmPidTuple;
048import ca.uhn.fhir.model.api.Include;
049import ca.uhn.fhir.model.primitive.IdDt;
050import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
051import ca.uhn.fhir.rest.api.server.bulk.BulkExportJobParameters;
052import ca.uhn.fhir.rest.param.HasOrListParam;
053import ca.uhn.fhir.rest.param.HasParam;
054import ca.uhn.fhir.rest.param.ReferenceOrListParam;
055import ca.uhn.fhir.rest.param.ReferenceParam;
056import ca.uhn.fhir.util.ExtensionUtil;
057import ca.uhn.fhir.util.HapiExtensions;
058import ca.uhn.fhir.util.Logs;
059import ca.uhn.fhir.util.SearchParameterUtil;
060import org.apache.commons.lang3.StringUtils;
061import org.hl7.fhir.instance.model.api.IBaseExtension;
062import org.hl7.fhir.instance.model.api.IBaseReference;
063import org.hl7.fhir.instance.model.api.IBaseResource;
064import org.hl7.fhir.instance.model.api.IIdType;
065import org.slf4j.Logger;
066import org.slf4j.LoggerFactory;
067import org.springframework.beans.factory.annotation.Autowired;
068
069import java.io.IOException;
070import java.util.ArrayList;
071import java.util.HashMap;
072import java.util.HashSet;
073import java.util.Iterator;
074import java.util.LinkedHashSet;
075import java.util.List;
076import java.util.Map;
077import java.util.Optional;
078import java.util.Set;
079import java.util.stream.Collectors;
080import javax.annotation.Nonnull;
081import javax.persistence.EntityManager;
082
083import static ca.uhn.fhir.rest.api.Constants.PARAM_HAS;
084import static ca.uhn.fhir.rest.api.Constants.PARAM_ID;
085
086public class JpaBulkExportProcessor implements IBulkExportProcessor<JpaPid> {
087        private static final Logger ourLog = LoggerFactory.getLogger(JpaBulkExportProcessor.class);
088
089        public static final int QUERY_CHUNK_SIZE = 100;
090        public static final List<String> PATIENT_BULK_EXPORT_FORWARD_REFERENCE_RESOURCE_TYPES =
091                        List.of("Practitioner", "Organization");
092
093        @Autowired
094        private FhirContext myContext;
095
096        @Autowired
097        private BulkExportHelperService myBulkExportHelperSvc;
098
099        @Autowired
100        private JpaStorageSettings myStorageSettings;
101
102        @Autowired
103        private DaoRegistry myDaoRegistry;
104
105        @Autowired
106        protected SearchBuilderFactory<JpaPid> mySearchBuilderFactory;
107
108        @Autowired
109        private IIdHelperService<JpaPid> myIdHelperService;
110
111        @SuppressWarnings("rawtypes")
112        @Autowired
113        private IMdmLinkDao myMdmLinkDao;
114
115        @Autowired
116        private MdmExpansionCacheSvc myMdmExpansionCacheSvc;
117
118        @Autowired
119        private EntityManager myEntityManager;
120
121        @Autowired
122        private IHapiTransactionService myHapiTransactionService;
123
124        private IFhirPath myFhirPath;
125
126        @Override
127        public Iterator<JpaPid> getResourcePidIterator(ExportPIDIteratorParameters theParams) {
128                return myHapiTransactionService
129                                .withSystemRequest()
130                                .withRequestPartitionId(theParams.getPartitionIdOrAllPartitions())
131                                .readOnly()
132                                .execute(() -> {
133                                        String resourceType = theParams.getResourceType();
134                                        String jobId = theParams.getInstanceId();
135                                        String chunkId = theParams.getChunkId();
136                                        RuntimeResourceDefinition def = myContext.getResourceDefinition(resourceType);
137
138                                        LinkedHashSet<JpaPid> pids;
139                                        if (theParams.getExportStyle() == BulkExportJobParameters.ExportStyle.PATIENT) {
140                                                pids = getPidsForPatientStyleExport(theParams, resourceType, jobId, chunkId, def);
141                                        } else if (theParams.getExportStyle() == BulkExportJobParameters.ExportStyle.GROUP) {
142                                                pids = getPidsForGroupStyleExport(theParams, resourceType, def);
143                                        } else {
144                                                pids = getPidsForSystemStyleExport(theParams, jobId, chunkId, def);
145                                        }
146
147                                        ourLog.debug("Finished expanding resource pids to export, size is {}", pids.size());
148                                        return pids.iterator();
149                                });
150        }
151
152        @SuppressWarnings("unchecked")
153        private LinkedHashSet<JpaPid> getPidsForPatientStyleExport(
154                        ExportPIDIteratorParameters theParams,
155                        String resourceType,
156                        String theJobId,
157                        String theChunkId,
158                        RuntimeResourceDefinition def)
159                        throws IOException {
160                LinkedHashSet<JpaPid> pids = new LinkedHashSet<>();
161                // Patient
162                if (myStorageSettings.getIndexMissingFields() == JpaStorageSettings.IndexEnabledEnum.DISABLED) {
163                        String errorMessage =
164                                        "You attempted to start a Patient Bulk Export, but the system has `Index Missing Fields` disabled. It must be enabled for Patient Bulk Export";
165                        ourLog.error(errorMessage);
166                        throw new IllegalStateException(Msg.code(797) + errorMessage);
167                }
168
169                Set<String> patientSearchParams =
170                                SearchParameterUtil.getPatientSearchParamsForResourceType(myContext, theParams.getResourceType());
171
172                for (String patientSearchParam : patientSearchParams) {
173                        List<SearchParameterMap> maps =
174                                        myBulkExportHelperSvc.createSearchParameterMapsForResourceType(def, theParams, false);
175                        for (SearchParameterMap map : maps) {
176                                // Ensure users did not monkey with the patient compartment search parameter.
177                                validateSearchParametersForPatient(map, theParams);
178
179                                ISearchBuilder<JpaPid> searchBuilder = getSearchBuilderForResourceType(theParams.getResourceType());
180
181                                filterBySpecificPatient(theParams, resourceType, patientSearchParam, map);
182
183                                SearchRuntimeDetails searchRuntime = new SearchRuntimeDetails(null, theJobId);
184
185                                Logs.getBatchTroubleshootingLog()
186                                                .debug(
187                                                                "Executing query for bulk export job[{}] chunk[{}]: {}",
188                                                                theJobId,
189                                                                theChunkId,
190                                                                map.toNormalizedQueryString(myContext));
191
192                                try (IResultIterator<JpaPid> resultIterator = searchBuilder.createQuery(
193                                                map, searchRuntime, new SystemRequestDetails(), theParams.getPartitionIdOrAllPartitions())) {
194                                        int pidCount = 0;
195                                        while (resultIterator.hasNext()) {
196                                                if (pidCount % 10000 == 0) {
197                                                        Logs.getBatchTroubleshootingLog()
198                                                                        .debug(
199                                                                                        "Bulk export job[{}] chunk[{}] has loaded {} pids",
200                                                                                        theJobId,
201                                                                                        theChunkId,
202                                                                                        pidCount);
203                                                }
204                                                pidCount++;
205                                                pids.add(resultIterator.next());
206                                        }
207                                }
208                        }
209                }
210                return pids;
211        }
212
213        private static void filterBySpecificPatient(
214                        ExportPIDIteratorParameters theParams,
215                        String resourceType,
216                        String patientSearchParam,
217                        SearchParameterMap map) {
218                if (resourceType.equalsIgnoreCase("Patient")) {
219                        if (theParams.getPatientIds() != null) {
220                                ReferenceOrListParam referenceOrListParam = getReferenceOrListParam(theParams);
221                                map.add(PARAM_ID, referenceOrListParam);
222                        }
223                } else {
224                        if (theParams.getPatientIds() != null) {
225                                ReferenceOrListParam referenceOrListParam = getReferenceOrListParam(theParams);
226                                map.add(patientSearchParam, referenceOrListParam);
227                        } else {
228                                map.add(patientSearchParam, new ReferenceParam().setMissing(false));
229                        }
230                }
231        }
232
233        @Nonnull
234        private static ReferenceOrListParam getReferenceOrListParam(ExportPIDIteratorParameters theParams) {
235                ReferenceOrListParam referenceOrListParam = new ReferenceOrListParam();
236                for (String patientId : theParams.getPatientIds()) {
237                        referenceOrListParam.addOr(new ReferenceParam(patientId));
238                }
239                return referenceOrListParam;
240        }
241
242        @SuppressWarnings("unchecked")
243        private LinkedHashSet<JpaPid> getPidsForSystemStyleExport(
244                        ExportPIDIteratorParameters theParams, String theJobId, String theChunkId, RuntimeResourceDefinition theDef)
245                        throws IOException {
246                LinkedHashSet<JpaPid> pids = new LinkedHashSet<>();
247                // System
248                List<SearchParameterMap> maps =
249                                myBulkExportHelperSvc.createSearchParameterMapsForResourceType(theDef, theParams, true);
250                ISearchBuilder<JpaPid> searchBuilder = getSearchBuilderForResourceType(theParams.getResourceType());
251
252                for (SearchParameterMap map : maps) {
253                        Logs.getBatchTroubleshootingLog()
254                                        .debug(
255                                                        "Executing query for bulk export job[{}] chunk[{}]: {}",
256                                                        theJobId,
257                                                        theChunkId,
258                                                        map.toNormalizedQueryString(myContext));
259
260                        // requires a transaction
261                        try (IResultIterator<JpaPid> resultIterator = searchBuilder.createQuery(
262                                        map, new SearchRuntimeDetails(null, theJobId), null, theParams.getPartitionIdOrAllPartitions())) {
263                                int pidCount = 0;
264                                while (resultIterator.hasNext()) {
265                                        if (pidCount % 10000 == 0) {
266                                                Logs.getBatchTroubleshootingLog()
267                                                                .debug(
268                                                                                "Bulk export job[{}] chunk[{}] has loaded {} pids",
269                                                                                theJobId,
270                                                                                theChunkId,
271                                                                                pidCount);
272                                        }
273                                        pidCount++;
274                                        pids.add(resultIterator.next());
275                                }
276                        }
277                }
278                return pids;
279        }
280
281        private LinkedHashSet<JpaPid> getPidsForGroupStyleExport(
282                        ExportPIDIteratorParameters theParams, String theResourceType, RuntimeResourceDefinition theDef)
283                        throws IOException {
284                LinkedHashSet<JpaPid> pids;
285
286                if (theResourceType.equalsIgnoreCase("Patient")) {
287                        ourLog.info("Expanding Patients of a Group Bulk Export.");
288                        pids = getExpandedPatientList(theParams);
289                        ourLog.info("Obtained {} PIDs", pids.size());
290                } else if (theResourceType.equalsIgnoreCase("Group")) {
291                        pids = getSingletonGroupList(theParams);
292                } else {
293                        pids = getRelatedResourceTypePids(theParams, theDef);
294                }
295                return pids;
296        }
297
298        private LinkedHashSet<JpaPid> getRelatedResourceTypePids(
299                        ExportPIDIteratorParameters theParams, RuntimeResourceDefinition theDef) throws IOException {
300                LinkedHashSet<JpaPid> pids = new LinkedHashSet<>();
301                // expand the group pid -> list of patients in that group (list of patient pids)
302                Set<JpaPid> expandedMemberResourceIds = expandAllPatientPidsFromGroup(theParams);
303                assert !expandedMemberResourceIds.isEmpty();
304                Logs.getBatchTroubleshootingLog()
305                                .debug("{} has been expanded to members:[{}]", theParams.getGroupId(), expandedMemberResourceIds);
306
307                // for each patient pid ->
308                //      search for the target resources, with their correct patient references, chunked.
309                // The results will be jammed into myReadPids
310                QueryChunker<JpaPid> queryChunker = new QueryChunker<>();
311                queryChunker.chunk(expandedMemberResourceIds, QUERY_CHUNK_SIZE, (idChunk) -> {
312                        try {
313                                queryResourceTypeWithReferencesToPatients(pids, idChunk, theParams, theDef);
314                        } catch (IOException ex) {
315                                // we will never see this;
316                                // SearchBuilder#QueryIterator does not (nor can ever) throw
317                                // an IOException... but Java requires the check,
318                                // so we'll put a log here (just in the off chance)
319                                ourLog.error("Couldn't close query iterator ", ex);
320                                throw new RuntimeException(Msg.code(2346) + "Couldn't close query iterator", ex);
321                        }
322                });
323                return pids;
324        }
325
326        private LinkedHashSet<JpaPid> getSingletonGroupList(ExportPIDIteratorParameters theParams) {
327                RequestPartitionId partitionId = theParams.getPartitionIdOrAllPartitions();
328                IBaseResource group = myDaoRegistry
329                                .getResourceDao("Group")
330                                .read(new IdDt(theParams.getGroupId()), new SystemRequestDetails().setRequestPartitionId(partitionId));
331                JpaPid pidOrNull = myIdHelperService.getPidOrNull(partitionId, group);
332                LinkedHashSet<JpaPid> pids = new LinkedHashSet<>();
333                pids.add(pidOrNull);
334                return pids;
335        }
336
337        /**
338         * Get a ISearchBuilder for the given resource type.
339         */
340        protected ISearchBuilder<JpaPid> getSearchBuilderForResourceType(String theResourceType) {
341                IFhirResourceDao<?> dao = myDaoRegistry.getResourceDao(theResourceType);
342                RuntimeResourceDefinition def = myContext.getResourceDefinition(theResourceType);
343                Class<? extends IBaseResource> typeClass = def.getImplementingClass();
344                return mySearchBuilderFactory.newSearchBuilder(dao, theResourceType, typeClass);
345        }
346
347        protected RuntimeSearchParam getPatientSearchParamForCurrentResourceType(String theResourceType) {
348                RuntimeSearchParam searchParam = null;
349                Optional<RuntimeSearchParam> onlyPatientSearchParamForResourceType =
350                                SearchParameterUtil.getOnlyPatientSearchParamForResourceType(myContext, theResourceType);
351                if (onlyPatientSearchParamForResourceType.isPresent()) {
352                        searchParam = onlyPatientSearchParamForResourceType.get();
353                }
354                return searchParam;
355        }
356
357        @Override
358        public void expandMdmResources(List<IBaseResource> theResources) {
359                for (IBaseResource resource : theResources) {
360                        if (!PATIENT_BULK_EXPORT_FORWARD_REFERENCE_RESOURCE_TYPES.contains(resource.fhirType())) {
361                                annotateBackwardsReferences(resource);
362                        }
363                }
364        }
365
366        /**
367         * For Patient
368         **/
369        private RuntimeSearchParam validateSearchParametersForPatient(
370                        SearchParameterMap expandedSpMap, ExportPIDIteratorParameters theParams) {
371                RuntimeSearchParam runtimeSearchParam =
372                                getPatientSearchParamForCurrentResourceType(theParams.getResourceType());
373                if (expandedSpMap.get(runtimeSearchParam.getName()) != null) {
374                        throw new IllegalArgumentException(Msg.code(796)
375                                        + String.format(
376                                                        "Patient Bulk Export manually modifies the Search Parameter called [%s], so you may not include this search parameter in your _typeFilter!",
377                                                        runtimeSearchParam.getName()));
378                }
379                return runtimeSearchParam;
380        }
381
382        /**
383         * for group exports
384         **/
385        private void validateSearchParametersForGroup(SearchParameterMap expandedSpMap, String theResourceType) {
386                // we only validate for certain types
387                if (!PATIENT_BULK_EXPORT_FORWARD_REFERENCE_RESOURCE_TYPES.contains(theResourceType)) {
388                        RuntimeSearchParam runtimeSearchParam = getPatientSearchParamForCurrentResourceType(theResourceType);
389                        if (expandedSpMap.get(runtimeSearchParam.getName()) != null) {
390                                throw new IllegalArgumentException(Msg.code(792)
391                                                + String.format(
392                                                                "Group Bulk Export manually modifies the Search Parameter called [%s], so you may not include this search parameter in your _typeFilter!",
393                                                                runtimeSearchParam.getName()));
394                        }
395                }
396        }
397
398        /**
399         * In case we are doing a Group Bulk Export and resourceType `Patient` is requested, we can just return the group members,
400         * possibly expanded by MDM, and don't have to go and fetch other resource DAOs.
401         */
402        @SuppressWarnings("unchecked")
403        private LinkedHashSet<JpaPid> getExpandedPatientList(ExportPIDIteratorParameters theParameters) throws IOException {
404                List<JpaPid> members = getMembersFromGroupWithFilter(theParameters, true);
405                List<IIdType> ids =
406                                members.stream().map(member -> new IdDt("Patient/" + member)).collect(Collectors.toList());
407                ourLog.info("While extracting patients from a group, we found {} patients.", ids.size());
408                ourLog.info("Found patients: {}", ids.stream().map(id -> id.getValue()).collect(Collectors.joining(", ")));
409
410                List<JpaPid> pidsOrThrowException = members;
411                LinkedHashSet<JpaPid> patientPidsToExport = new LinkedHashSet<>(pidsOrThrowException);
412
413                if (theParameters.isExpandMdm()) {
414                        RequestPartitionId partitionId = theParameters.getPartitionIdOrAllPartitions();
415                        SystemRequestDetails srd = new SystemRequestDetails().setRequestPartitionId(partitionId);
416                        IBaseResource group = myDaoRegistry.getResourceDao("Group").read(new IdDt(theParameters.getGroupId()), srd);
417                        JpaPid pidOrNull = myIdHelperService.getPidOrNull(partitionId, group);
418                        List<MdmPidTuple<JpaPid>> goldenPidSourcePidTuple =
419                                        myMdmLinkDao.expandPidsFromGroupPidGivenMatchResult(pidOrNull, MdmMatchResultEnum.MATCH);
420                        goldenPidSourcePidTuple.forEach(tuple -> {
421                                patientPidsToExport.add(tuple.getGoldenPid());
422                                patientPidsToExport.add(tuple.getSourcePid());
423                        });
424                        populateMdmResourceCache(goldenPidSourcePidTuple);
425                }
426                return patientPidsToExport;
427        }
428
429        /**
430         * Given the parameters, find all members' patient references in the group with the typeFilter applied.
431         *
432         * @return A list of strings representing the Patient IDs of the members (e.g. ["P1", "P2", "P3"]
433         */
434        @SuppressWarnings("unchecked")
435        private List<JpaPid> getMembersFromGroupWithFilter(
436                        ExportPIDIteratorParameters theParameters, boolean theConsiderSince) throws IOException {
437                RuntimeResourceDefinition def = myContext.getResourceDefinition("Patient");
438                List<JpaPid> resPids = new ArrayList<>();
439
440                List<SearchParameterMap> maps =
441                                myBulkExportHelperSvc.createSearchParameterMapsForResourceType(def, theParameters, theConsiderSince);
442
443                maps.forEach(map -> addMembershipToGroupClause(map, theParameters.getGroupId()));
444
445                for (SearchParameterMap map : maps) {
446                        ISearchBuilder<JpaPid> searchBuilder = getSearchBuilderForResourceType("Patient");
447                        ourLog.debug(
448                                        "Searching for members of group {} with job instance {} with map {}",
449                                        theParameters.getGroupId(),
450                                        theParameters.getInstanceId(),
451                                        map);
452                        try (IResultIterator<JpaPid> resultIterator = searchBuilder.createQuery(
453                                        map,
454                                        new SearchRuntimeDetails(null, theParameters.getInstanceId()),
455                                        null,
456                                        theParameters.getPartitionIdOrAllPartitions())) {
457
458                                while (resultIterator.hasNext()) {
459                                        resPids.add(resultIterator.next());
460                                }
461                        }
462                }
463                return resPids;
464        }
465
466        /**
467         * This method takes an {@link SearchParameterMap} and adds a clause to it that will filter the search results to only
468         * return members of the defined group.
469         *
470         * @param theMap     the map to add the clause to.
471         * @param theGroupId the group ID to filter by.
472         */
473        private void addMembershipToGroupClause(SearchParameterMap theMap, String theGroupId) {
474                HasOrListParam hasOrListParam = new HasOrListParam();
475                hasOrListParam.addOr(new HasParam("Group", "member", "_id", theGroupId));
476                theMap.add(PARAM_HAS, hasOrListParam);
477        }
478
479        /**
480         * @param thePidTuples
481         */
482        @SuppressWarnings({"unchecked", "rawtypes"})
483        private void populateMdmResourceCache(List<MdmPidTuple<JpaPid>> thePidTuples) {
484                if (myMdmExpansionCacheSvc.hasBeenPopulated()) {
485                        return;
486                }
487                // First, convert this zipped set of tuples to a map of
488                // {
489                //   patient/gold-1 -> [patient/1, patient/2]
490                //   patient/gold-2 -> [patient/3, patient/4]
491                // }
492                Map<JpaPid, Set<JpaPid>> goldenResourceToSourcePidMap = new HashMap<>();
493                extract(thePidTuples, goldenResourceToSourcePidMap);
494
495                // Next, lets convert it to an inverted index for fast lookup
496                // {
497                //   patient/1 -> patient/gold-1
498                //   patient/2 -> patient/gold-1
499                //   patient/3 -> patient/gold-2
500                //   patient/4 -> patient/gold-2
501                // }
502                Map<String, String> sourceResourceIdToGoldenResourceIdMap = new HashMap<>();
503                goldenResourceToSourcePidMap.forEach((key, value) -> {
504                        String goldenResourceId =
505                                        myIdHelperService.translatePidIdToForcedIdWithCache(key).orElse(key.toString());
506                        PersistentIdToForcedIdMap pidsToForcedIds = myIdHelperService.translatePidsToForcedIds(value);
507
508                        Set<String> sourceResourceIds = pidsToForcedIds.getResolvedResourceIds();
509
510                        sourceResourceIds.forEach(
511                                        sourceResourceId -> sourceResourceIdToGoldenResourceIdMap.put(sourceResourceId, goldenResourceId));
512                });
513
514                // Now that we have built our cached expansion, store it.
515                myMdmExpansionCacheSvc.setCacheContents(sourceResourceIdToGoldenResourceIdMap);
516        }
517
518        private void extract(
519                        List<MdmPidTuple<JpaPid>> theGoldenPidTargetPidTuples,
520                        Map<JpaPid, Set<JpaPid>> theGoldenResourceToSourcePidMap) {
521                for (MdmPidTuple<JpaPid> goldenPidTargetPidTuple : theGoldenPidTargetPidTuples) {
522                        JpaPid goldenPid = goldenPidTargetPidTuple.getGoldenPid();
523                        JpaPid sourcePid = goldenPidTargetPidTuple.getSourcePid();
524                        theGoldenResourceToSourcePidMap
525                                        .computeIfAbsent(goldenPid, key -> new HashSet<>())
526                                        .add(sourcePid);
527                }
528        }
529
530        // gets all the resources related to each patient provided in the list of thePatientPids
531        @SuppressWarnings("unchecked")
532        private void queryResourceTypeWithReferencesToPatients(
533                        Set<JpaPid> theReadPids,
534                        List<JpaPid> thePatientPids,
535                        ExportPIDIteratorParameters theParams,
536                        RuntimeResourceDefinition theDef)
537                        throws IOException {
538
539                // Convert Resource Persistent IDs to actual client IDs.
540                Set<JpaPid> pidSet = new HashSet<>(thePatientPids);
541                Set<String> patientIds = myIdHelperService.translatePidsToFhirResourceIds(pidSet);
542
543                // Build SP map
544                // First, inject the _typeFilters and _since from the export job
545                List<SearchParameterMap> expandedSpMaps =
546                                myBulkExportHelperSvc.createSearchParameterMapsForResourceType(theDef, theParams, true);
547                for (SearchParameterMap expandedSpMap : expandedSpMaps) {
548
549                        // Since we are in a bulk job, we have to ensure the user didn't jam in a patient search param, since we
550                        // need to manually set that.
551                        validateSearchParametersForGroup(expandedSpMap, theParams.getResourceType());
552
553                        // Fetch and cache a search builder for this resource type
554                        // filter by ResourceType
555                        ISearchBuilder<JpaPid> searchBuilder = getSearchBuilderForResourceType(theParams.getResourceType());
556
557                        // Now, further filter the query with patient references defined by the chunk of IDs we have.
558                        // filter by PatientIds
559                        if (PATIENT_BULK_EXPORT_FORWARD_REFERENCE_RESOURCE_TYPES.contains(theParams.getResourceType())) {
560                                filterSearchByHasParam(patientIds, expandedSpMap, theParams);
561                        } else {
562                                filterSearchByResourceIds(patientIds, expandedSpMap, theParams);
563                        }
564
565                        // Execute query and all found pids to our local iterator.
566                        RequestPartitionId partitionId = theParams.getPartitionIdOrAllPartitions();
567                        try (IResultIterator<JpaPid> resultIterator = searchBuilder.createQuery(
568                                        expandedSpMap, new SearchRuntimeDetails(null, theParams.getInstanceId()), null, partitionId)) {
569                                while (resultIterator.hasNext()) {
570                                        theReadPids.add(resultIterator.next());
571                                }
572                        }
573
574                        // Construct our Includes filter
575                        // We use this to recursively fetch resources of interest
576                        // (but should only request those the user has requested/can see)
577                        Set<Include> includes = new HashSet<>();
578                        for (String resourceType : theParams.getRequestedResourceTypes()) {
579                                includes.add(new Include(resourceType + ":*", true));
580                        }
581
582                        SystemRequestDetails requestDetails = new SystemRequestDetails().setRequestPartitionId(partitionId);
583                        SearchBuilderLoadIncludesParameters<JpaPid> loadIncludesParameters =
584                                        new SearchBuilderLoadIncludesParameters<>();
585                        loadIncludesParameters.setFhirContext(myContext);
586                        loadIncludesParameters.setMatches(theReadPids);
587                        loadIncludesParameters.setEntityManager(myEntityManager);
588                        loadIncludesParameters.setRequestDetails(requestDetails);
589                        loadIncludesParameters.setIncludeFilters(includes);
590                        loadIncludesParameters.setReverseMode(false);
591                        loadIncludesParameters.setLastUpdated(expandedSpMap.getLastUpdated());
592                        loadIncludesParameters.setSearchIdOrDescription(theParams.getInstanceId());
593                        loadIncludesParameters.setDesiredResourceTypes(theParams.getRequestedResourceTypes());
594                        Set<JpaPid> includeIds = searchBuilder.loadIncludes(loadIncludesParameters);
595
596                        // gets rid of the Patient duplicates
597                        theReadPids.addAll(includeIds.stream()
598                                        .filter((id) -> !id.getResourceType().equals("Patient"))
599                                        .collect(Collectors.toSet()));
600                }
601        }
602
603        /**
604         * Must not be called for resources types listed in PATIENT_BULK_EXPORT_FORWARD_REFERENCE_RESOURCE_TYPES
605         *
606         * @param idChunk
607         * @param expandedSpMap
608         * @param theParams
609         */
610        private void filterSearchByResourceIds(
611                        Set<String> idChunk, SearchParameterMap expandedSpMap, ExportPIDIteratorParameters theParams) {
612                ReferenceOrListParam orList = new ReferenceOrListParam();
613                idChunk.forEach(id -> orList.add(new ReferenceParam(id)));
614                RuntimeSearchParam patientSearchParamForCurrentResourceType =
615                                getPatientSearchParamForCurrentResourceType(theParams.getResourceType());
616                expandedSpMap.add(patientSearchParamForCurrentResourceType.getName(), orList);
617        }
618
619        /**
620         * @param idChunk
621         * @param expandedSpMap
622         */
623        private void filterSearchByHasParam(
624                        Set<String> idChunk, SearchParameterMap expandedSpMap, ExportPIDIteratorParameters theParams) {
625                HasOrListParam hasOrListParam = new HasOrListParam();
626                idChunk.stream().forEach(id -> hasOrListParam.addOr(buildHasParam(id, theParams.getResourceType())));
627                expandedSpMap.add("_has", hasOrListParam);
628        }
629
630        private HasParam buildHasParam(String theResourceId, String theResourceType) {
631                if ("Practitioner".equalsIgnoreCase(theResourceType)) {
632                        return new HasParam("Patient", "general-practitioner", "_id", theResourceId);
633                } else if ("Organization".equalsIgnoreCase(theResourceType)) {
634                        return new HasParam("Patient", "organization", "_id", theResourceId);
635                } else {
636                        throw new IllegalArgumentException(
637                                        Msg.code(2077) + " We can't handle forward references onto type " + theResourceType);
638                }
639        }
640
641        /**
642         * Given the local myGroupId, perform an expansion to retrieve all resource IDs of member patients.
643         * if myMdmEnabled is set to true, we also reach out to the IMdmLinkDao to attempt to also expand it into matched
644         * patients.
645         *
646         * @return a Set of Strings representing the resource IDs of all members of a group.
647         */
648        private Set<JpaPid> expandAllPatientPidsFromGroup(ExportPIDIteratorParameters theParams) throws IOException {
649                Set<JpaPid> expandedIds = new HashSet<>();
650                RequestPartitionId partitionId = theParams.getPartitionIdOrAllPartitions();
651                SystemRequestDetails requestDetails = new SystemRequestDetails().setRequestPartitionId(partitionId);
652                IBaseResource group =
653                                myDaoRegistry.getResourceDao("Group").read(new IdDt(theParams.getGroupId()), requestDetails);
654                JpaPid pidOrNull = myIdHelperService.getPidOrNull(partitionId, group);
655
656                // Attempt to perform MDM Expansion of membership
657                if (theParams.isExpandMdm()) {
658                        expandedIds.addAll(performMembershipExpansionViaMdmTable(pidOrNull));
659                }
660
661                // Now manually add the members of the group (its possible even with mdm expansion that some members dont have
662                // MDM matches,
663                // so would be otherwise skipped
664                List<JpaPid> membersFromGroupWithFilter = getMembersFromGroupWithFilter(theParams, false);
665                ourLog.debug("Group with ID [{}] has been expanded to: {}", theParams.getGroupId(), membersFromGroupWithFilter);
666                expandedIds.addAll(membersFromGroupWithFilter);
667
668                return expandedIds;
669        }
670
671        @SuppressWarnings({"rawtypes", "unchecked"})
672        private Set<JpaPid> performMembershipExpansionViaMdmTable(JpaPid pidOrNull) {
673                List<MdmPidTuple<JpaPid>> goldenPidTargetPidTuples =
674                                myMdmLinkDao.expandPidsFromGroupPidGivenMatchResult(pidOrNull, MdmMatchResultEnum.MATCH);
675                // Now lets translate these pids into resource IDs
676                Set<JpaPid> uniquePids = new HashSet<>();
677                goldenPidTargetPidTuples.forEach(tuple -> {
678                        uniquePids.add(tuple.getGoldenPid());
679                        uniquePids.add(tuple.getSourcePid());
680                });
681                PersistentIdToForcedIdMap pidToForcedIdMap = myIdHelperService.translatePidsToForcedIds(uniquePids);
682
683                Map<JpaPid, Set<JpaPid>> goldenResourceToSourcePidMap = new HashMap<>();
684                extract(goldenPidTargetPidTuples, goldenResourceToSourcePidMap);
685                populateMdmResourceCache(goldenPidTargetPidTuples);
686
687                return uniquePids;
688        }
689
690        /* Mdm Expansion */
691
692        private RuntimeSearchParam getRuntimeSearchParam(IBaseResource theResource) {
693                Optional<RuntimeSearchParam> oPatientSearchParam =
694                                SearchParameterUtil.getOnlyPatientSearchParamForResourceType(myContext, theResource.fhirType());
695                if (!oPatientSearchParam.isPresent()) {
696                        String errorMessage = String.format(
697                                        "[%s] has  no search parameters that are for patients, so it is invalid for Group Bulk Export!",
698                                        theResource.fhirType());
699                        throw new IllegalArgumentException(Msg.code(2242) + errorMessage);
700                } else {
701                        return oPatientSearchParam.get();
702                }
703        }
704
705        private void annotateBackwardsReferences(IBaseResource iBaseResource) {
706                Optional<String> patientReference = getPatientReference(iBaseResource);
707                if (patientReference.isPresent()) {
708                        addGoldenResourceExtension(iBaseResource, patientReference.get());
709                } else {
710                        ourLog.error(
711                                        "Failed to find the patient reference information for resource {}. This is a bug, "
712                                                        + "as all resources which can be exported via Group Bulk Export must reference a patient.",
713                                        iBaseResource);
714                }
715        }
716
717        private Optional<String> getPatientReference(IBaseResource iBaseResource) {
718                String fhirPath;
719
720                RuntimeSearchParam runtimeSearchParam = getRuntimeSearchParam(iBaseResource);
721                fhirPath = getPatientFhirPath(runtimeSearchParam);
722
723                if (iBaseResource.fhirType().equalsIgnoreCase("Patient")) {
724                        return Optional.of(iBaseResource.getIdElement().getIdPart());
725                } else {
726                        Optional<IBaseReference> optionalReference =
727                                        getFhirParser().evaluateFirst(iBaseResource, fhirPath, IBaseReference.class);
728                        if (optionalReference.isPresent()) {
729                                return optionalReference.map(theIBaseReference ->
730                                                theIBaseReference.getReferenceElement().getIdPart());
731                        } else {
732                                return Optional.empty();
733                        }
734                }
735        }
736
737        private void addGoldenResourceExtension(IBaseResource iBaseResource, String sourceResourceId) {
738                String goldenResourceId = myMdmExpansionCacheSvc.getGoldenResourceId(sourceResourceId);
739                IBaseExtension<?, ?> extension = ExtensionUtil.getOrCreateExtension(
740                                iBaseResource, HapiExtensions.ASSOCIATED_GOLDEN_RESOURCE_EXTENSION_URL);
741                if (!StringUtils.isBlank(goldenResourceId)) {
742                        ExtensionUtil.setExtension(myContext, extension, "reference", prefixPatient(goldenResourceId));
743                }
744        }
745
746        private String prefixPatient(String theResourceId) {
747                return "Patient/" + theResourceId;
748        }
749
750        private IFhirPath getFhirParser() {
751                if (myFhirPath == null) {
752                        myFhirPath = myContext.newFhirPath();
753                }
754                return myFhirPath;
755        }
756
757        private String getPatientFhirPath(RuntimeSearchParam theRuntimeParam) {
758                String path = theRuntimeParam.getPath();
759                // GGG: Yes this is a stupid hack, but by default this runtime search param will return stuff like
760                // Observation.subject.where(resolve() is Patient) which unfortunately our FHIRpath evaluator doesn't play
761                // nicely with
762                // our FHIRPath evaluator.
763                if (path.contains(".where")) {
764                        path = path.substring(0, path.indexOf(".where"));
765                }
766                return path;
767        }
768}