001/*-
002 * #%L
003 * HAPI FHIR JPA Server
004 * %%
005 * Copyright (C) 2014 - 2023 Smile CDR, Inc.
006 * %%
007 * Licensed under the Apache License, Version 2.0 (the "License");
008 * you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 *      http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 * #L%
019 */
020package ca.uhn.fhir.jpa.search.builder.tasks;
021
022import ca.uhn.fhir.context.FhirContext;
023import ca.uhn.fhir.i18n.Msg;
024import ca.uhn.fhir.interceptor.api.HookParams;
025import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
026import ca.uhn.fhir.interceptor.api.Pointcut;
027import ca.uhn.fhir.interceptor.model.RequestPartitionId;
028import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
029import ca.uhn.fhir.jpa.api.dao.IDao;
030import ca.uhn.fhir.jpa.dao.IResultIterator;
031import ca.uhn.fhir.jpa.dao.ISearchBuilder;
032import ca.uhn.fhir.jpa.dao.SearchBuilderFactory;
033import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService;
034import ca.uhn.fhir.jpa.entity.Search;
035import ca.uhn.fhir.jpa.interceptor.JpaPreResourceAccessDetails;
036import ca.uhn.fhir.jpa.model.dao.JpaPid;
037import ca.uhn.fhir.jpa.model.search.SearchRuntimeDetails;
038import ca.uhn.fhir.jpa.model.search.SearchStatusEnum;
039import ca.uhn.fhir.jpa.search.cache.ISearchCacheSvc;
040import ca.uhn.fhir.jpa.search.cache.ISearchResultCacheSvc;
041import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
042import ca.uhn.fhir.jpa.util.QueryParameterUtils;
043import ca.uhn.fhir.jpa.util.SearchParameterMapCalculator;
044import ca.uhn.fhir.rest.api.server.IPreResourceAccessDetails;
045import ca.uhn.fhir.rest.api.server.RequestDetails;
046import ca.uhn.fhir.rest.server.IPagingProvider;
047import ca.uhn.fhir.rest.server.exceptions.BaseServerResponseException;
048import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
049import ca.uhn.fhir.rest.server.servlet.ServletRequestDetails;
050import ca.uhn.fhir.rest.server.util.CompositeInterceptorBroadcaster;
051import ca.uhn.fhir.system.HapiSystemProperties;
052import ca.uhn.fhir.util.AsyncUtil;
053import ca.uhn.fhir.util.StopWatch;
054import co.elastic.apm.api.ElasticApm;
055import co.elastic.apm.api.Span;
056import co.elastic.apm.api.Transaction;
057import org.apache.commons.lang3.Validate;
058import org.apache.commons.lang3.exception.ExceptionUtils;
059import org.hl7.fhir.instance.model.api.IBaseResource;
060import org.springframework.transaction.annotation.Isolation;
061import org.springframework.transaction.annotation.Propagation;
062
063import java.io.IOException;
064import java.util.ArrayList;
065import java.util.Iterator;
066import java.util.List;
067import java.util.concurrent.Callable;
068import java.util.concurrent.CountDownLatch;
069import java.util.concurrent.TimeUnit;
070import java.util.function.Consumer;
071import javax.annotation.Nonnull;
072
073import static ca.uhn.fhir.jpa.util.SearchParameterMapCalculator.isWantCount;
074import static ca.uhn.fhir.jpa.util.SearchParameterMapCalculator.isWantOnlyCount;
075import static java.util.Objects.nonNull;
076import static org.apache.commons.lang3.ObjectUtils.defaultIfNull;
077
078/**
079 * A search task is a Callable task that runs in
080 * a thread pool to handle an individual search. One instance
081 * is created for any requested search and runs from the
082 * beginning to the end of the search.
083 * <p>
084 * Understand:
085 * This class executes in its own thread separate from the
086 * web server client thread that made the request. We do that
087 * so that we can return to the client as soon as possible,
088 * but keep the search going in the background (and have
089 * the next page of results ready to go when the client asks).
090 */
091public class SearchTask implements Callable<Void> {
092
093        private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(SearchTask.class);
094        // injected beans
095        protected final HapiTransactionService myTxService;
096        protected final FhirContext myContext;
097        protected final ISearchResultCacheSvc mySearchResultCacheSvc;
098        private final SearchParameterMap myParams;
099        private final IDao myCallingDao;
100        private final String myResourceType;
101        private final ArrayList<JpaPid> mySyncedPids = new ArrayList<>();
102        private final CountDownLatch myInitialCollectionLatch = new CountDownLatch(1);
103        private final CountDownLatch myCompletionLatch;
104        private final ArrayList<JpaPid> myUnsyncedPids = new ArrayList<>();
105        private final RequestDetails myRequest;
106        private final RequestPartitionId myRequestPartitionId;
107        private final SearchRuntimeDetails mySearchRuntimeDetails;
108        private final Transaction myParentTransaction;
109        private final Consumer<String> myOnRemove;
110        private final int mySyncSize;
111        private final Integer myLoadingThrottleForUnitTests;
112        private final IInterceptorBroadcaster myInterceptorBroadcaster;
113        private final SearchBuilderFactory<JpaPid> mySearchBuilderFactory;
114        private final JpaStorageSettings myStorageSettings;
115        private final ISearchCacheSvc mySearchCacheSvc;
116        private final IPagingProvider myPagingProvider;
117        private Search mySearch;
118        private boolean myAbortRequested;
119        private int myCountSavedTotal = 0;
120        private int myCountSavedThisPass = 0;
121        private int myCountBlockedThisPass = 0;
122        private boolean myAdditionalPrefetchThresholdsRemaining;
123        private List<JpaPid> myPreviouslyAddedResourcePids;
124        private Integer myMaxResultsToFetch;
125        /**
126         * Constructor
127         */
128        public SearchTask(
129                        SearchTaskParameters theCreationParams,
130                        HapiTransactionService theManagedTxManager,
131                        FhirContext theContext,
132                        IInterceptorBroadcaster theInterceptorBroadcaster,
133                        SearchBuilderFactory theSearchBuilderFactory,
134                        ISearchResultCacheSvc theSearchResultCacheSvc,
135                        JpaStorageSettings theStorageSettings,
136                        ISearchCacheSvc theSearchCacheSvc,
137                        IPagingProvider thePagingProvider) {
138                // beans
139                myTxService = theManagedTxManager;
140                myContext = theContext;
141                myInterceptorBroadcaster = theInterceptorBroadcaster;
142                mySearchBuilderFactory = theSearchBuilderFactory;
143                mySearchResultCacheSvc = theSearchResultCacheSvc;
144                myStorageSettings = theStorageSettings;
145                mySearchCacheSvc = theSearchCacheSvc;
146                myPagingProvider = thePagingProvider;
147
148                // values
149                myOnRemove = theCreationParams.OnRemove;
150                mySearch = theCreationParams.Search;
151                myCallingDao = theCreationParams.CallingDao;
152                myParams = theCreationParams.Params;
153                myResourceType = theCreationParams.ResourceType;
154                myRequest = theCreationParams.Request;
155                myCompletionLatch = new CountDownLatch(1);
156                mySyncSize = theCreationParams.SyncSize;
157                myLoadingThrottleForUnitTests = theCreationParams.getLoadingThrottleForUnitTests();
158
159                mySearchRuntimeDetails = new SearchRuntimeDetails(myRequest, mySearch.getUuid());
160                mySearchRuntimeDetails.setQueryString(myParams.toNormalizedQueryString(myCallingDao.getContext()));
161                myRequestPartitionId = theCreationParams.RequestPartitionId;
162                myParentTransaction = ElasticApm.currentTransaction();
163        }
164
165        protected RequestPartitionId getRequestPartitionId() {
166                return myRequestPartitionId;
167        }
168
169        /**
170         * This method is called by the server HTTP thread, and
171         * will block until at least one page of results have been
172         * fetched from the DB, and will never block after that.
173         */
174        public Integer awaitInitialSync() {
175                ourLog.trace("Awaiting initial sync");
176                do {
177                        ourLog.trace("Search {} aborted: {}", getSearch().getUuid(), !isNotAborted());
178                        if (AsyncUtil.awaitLatchAndThrowInternalErrorExceptionOnInterrupt(
179                                        getInitialCollectionLatch(), 250L, TimeUnit.MILLISECONDS)) {
180                                break;
181                        }
182                } while (getSearch().getStatus() == SearchStatusEnum.LOADING);
183                ourLog.trace("Initial sync completed");
184
185                return getSearch().getTotalCount();
186        }
187
188        public Search getSearch() {
189                return mySearch;
190        }
191
192        public CountDownLatch getInitialCollectionLatch() {
193                return myInitialCollectionLatch;
194        }
195
196        public void setPreviouslyAddedResourcePids(List<JpaPid> thePreviouslyAddedResourcePids) {
197                myPreviouslyAddedResourcePids = thePreviouslyAddedResourcePids;
198                myCountSavedTotal = myPreviouslyAddedResourcePids.size();
199        }
200
201        private ISearchBuilder newSearchBuilder() {
202                Class<? extends IBaseResource> resourceTypeClass =
203                                myContext.getResourceDefinition(myResourceType).getImplementingClass();
204                return mySearchBuilderFactory.newSearchBuilder(myCallingDao, myResourceType, resourceTypeClass);
205        }
206
207        @Nonnull
208        public List<JpaPid> getResourcePids(int theFromIndex, int theToIndex) {
209                ourLog.debug("Requesting search PIDs from {}-{}", theFromIndex, theToIndex);
210
211                boolean keepWaiting;
212                do {
213                        synchronized (mySyncedPids) {
214                                ourLog.trace("Search status is {}", mySearch.getStatus());
215                                boolean haveEnoughResults = mySyncedPids.size() >= theToIndex;
216                                if (!haveEnoughResults) {
217                                        switch (mySearch.getStatus()) {
218                                                case LOADING:
219                                                        keepWaiting = true;
220                                                        break;
221                                                case PASSCMPLET:
222                                                        /*
223                                                         * If we get here, it means that the user requested resources that crossed the
224                                                         * current pre-fetch boundary. For example, if the prefetch threshold is 50 and the
225                                                         * user has requested resources 0-60, then they would get 0-50 back but the search
226                                                         * coordinator would then stop searching.SearchCoordinatorSvcImplTest
227                                                         */
228                                                        keepWaiting = false;
229                                                        break;
230                                                case FAILED:
231                                                case FINISHED:
232                                                case GONE:
233                                                default:
234                                                        keepWaiting = false;
235                                                        break;
236                                        }
237                                } else {
238                                        keepWaiting = false;
239                                }
240                        }
241
242                        if (keepWaiting) {
243                                ourLog.info(
244                                                "Waiting as we only have {} results - Search status: {}",
245                                                mySyncedPids.size(),
246                                                mySearch.getStatus());
247                                AsyncUtil.sleep(500L);
248                        }
249                } while (keepWaiting);
250
251                ourLog.debug("Proceeding, as we have {} results", mySyncedPids.size());
252
253                ArrayList<JpaPid> retVal = new ArrayList<>();
254                synchronized (mySyncedPids) {
255                        QueryParameterUtils.verifySearchHasntFailedOrThrowInternalErrorException(mySearch);
256
257                        int toIndex = theToIndex;
258                        if (mySyncedPids.size() < toIndex) {
259                                toIndex = mySyncedPids.size();
260                        }
261                        for (int i = theFromIndex; i < toIndex; i++) {
262                                retVal.add(mySyncedPids.get(i));
263                        }
264                }
265
266                ourLog.trace(
267                                "Done syncing results - Wanted {}-{} and returning {} of {}",
268                                theFromIndex,
269                                theToIndex,
270                                retVal.size(),
271                                mySyncedPids.size());
272
273                return retVal;
274        }
275
276        public void saveSearch() {
277                myTxService
278                                .withRequest(myRequest)
279                                .withRequestPartitionId(myRequestPartitionId)
280                                .withPropagation(Propagation.REQUIRES_NEW)
281                                .execute(() -> doSaveSearch());
282        }
283
        /**
         * Moves the PIDs accumulated in {@code myUnsyncedPids} into the result cache and into
         * {@code mySyncedPids}, and updates the search's counters and status. All of this runs
         * in a single {@code myTxService} execution scoped to this request and partition.
         * <p>
         * Steps:
         * <ol>
         * <li>Save the search first if it has no ID yet.</li>
         * <li>If request details are present and there is something to sync, invoke the
         * {@link Pointcut#STORAGE_PREACCESS_RESOURCES} hooks and drop any PIDs they flag as
         * not-to-return.</li>
         * <li>Store the remaining PIDs via {@code mySearchResultCacheSvc.storeResults(...)}.</li>
         * <li>Under the {@code mySyncedPids} lock, append them to the synced list. If the
         * iterator is exhausted, set the status to FINISHED (with a total count) or
         * PASSCMPLET.</li>
         * <li>Update numFound/numBlocked, release the initial-collection latch once enough
         * results are synced, and save the search.</li>
         * </ol>
         *
         * @param theResultIter iterator producing PIDs for this pass; only {@code hasNext()},
         *                      {@code getSkippedCount()} and {@code getNonSkippedCount()} are used here
         */
        private void saveUnsynced(final IResultIterator theResultIter) {
                myTxService
                                .withRequest(myRequest)
                                .withRequestPartitionId(myRequestPartitionId)
                                .execute(() -> {
                                        // Ensure the search row exists (has an ID) before results are stored for it
                                        if (mySearch.getId() == null) {
                                                doSaveSearch();
                                        }

                                        // NB: an alias of the field, not a copy - the clear() below also empties myUnsyncedPids
                                        ArrayList<JpaPid> unsyncedPids = myUnsyncedPids;
                                        // PIDs blocked by interceptors during this invocation only
                                        int countBlocked = 0;

                                        // Interceptor call: STORAGE_PREACCESS_RESOURCES
                                        // This can be used to remove results from the search result details before
                                        // the user has a chance to know that they were in the results
                                        if (mySearchRuntimeDetails.getRequestDetails() != null && unsyncedPids.isEmpty() == false) {
                                                JpaPreResourceAccessDetails accessDetails =
                                                                new JpaPreResourceAccessDetails(unsyncedPids, () -> newSearchBuilder());
                                                HookParams params = new HookParams()
                                                                .add(IPreResourceAccessDetails.class, accessDetails)
                                                                .add(RequestDetails.class, mySearchRuntimeDetails.getRequestDetails())
                                                                .addIfMatchesType(
                                                                                ServletRequestDetails.class, mySearchRuntimeDetails.getRequestDetails());
                                                CompositeInterceptorBroadcaster.doCallHooks(
                                                                myInterceptorBroadcaster, myRequest, Pointcut.STORAGE_PREACCESS_RESOURCES, params);

                                                // Walk backwards so removing index i does not shift the indices 0..i-1
                                                // that still have to be checked
                                                for (int i = unsyncedPids.size() - 1; i >= 0; i--) {
                                                        if (accessDetails.isDontReturnResourceAtIndex(i)) {
                                                                unsyncedPids.remove(i);
                                                                // Blocked PIDs still count toward myCountSavedTotal; this invocation's
                                                                // blocked count is subtracted from the FINISHED total further down
                                                                myCountBlockedThisPass++;
                                                                myCountSavedTotal++;
                                                                countBlocked++;
                                                        }
                                                }
                                        }

                                        // Actually store the results in the query cache storage
                                        myCountSavedTotal += unsyncedPids.size();
                                        myCountSavedThisPass += unsyncedPids.size();
                                        mySearchResultCacheSvc.storeResults(
                                                        mySearch, mySyncedPids, unsyncedPids, myRequest, getRequestPartitionId());

                                        // Publish the newly stored PIDs to readers that synchronize on mySyncedPids
                                        synchronized (mySyncedPids) {
                                                int numSyncedThisPass = unsyncedPids.size();
                                                ourLog.trace(
                                                                "Syncing {} search results - Have more: {}",
                                                                numSyncedThisPass,
                                                                theResultIter.hasNext());
                                                mySyncedPids.addAll(unsyncedPids);
                                                unsyncedPids.clear();

                                                // Iterator exhausted: decide whether this pass completes the whole search
                                                if (theResultIter.hasNext() == false) {
                                                        int skippedCount = theResultIter.getSkippedCount();
                                                        int nonSkippedCount = theResultIter.getNonSkippedCount();
                                                        // skipped + saved this pass + blocked this pass
                                                        int totalFetched = skippedCount + myCountSavedThisPass + myCountBlockedThisPass;
                                                        ourLog.trace(
                                                                        "MaxToFetch[{}] SkippedCount[{}] CountSavedThisPass[{}] CountSavedThisTotal[{}] AdditionalPrefetchRemaining[{}]",
                                                                        myMaxResultsToFetch,
                                                                        skippedCount,
                                                                        myCountSavedThisPass,
                                                                        myCountSavedTotal,
                                                                        myAdditionalPrefetchThresholdsRemaining);

                                                        // FINISHED when nothing non-skipped came back, or when this pass came up short
                                                        // of its fetch cap (presumably meaning no further matches exist). Otherwise
                                                        // PASSCMPLET if more prefetch thresholds remain, else FINISHED.
                                                        // NOTE(review): countBlocked covers this invocation only; PIDs blocked in
                                                        // earlier invocations remain included in myCountSavedTotal. Confirm whether
                                                        // the total count should exclude those too.
                                                        if (nonSkippedCount == 0
                                                                        || (myMaxResultsToFetch != null && totalFetched < myMaxResultsToFetch)) {
                                                                ourLog.trace("Setting search status to FINISHED");
                                                                mySearch.setStatus(SearchStatusEnum.FINISHED);
                                                                mySearch.setTotalCount(myCountSavedTotal - countBlocked);
                                                        } else if (myAdditionalPrefetchThresholdsRemaining) {
                                                                ourLog.trace("Setting search status to PASSCMPLET");
                                                                mySearch.setStatus(SearchStatusEnum.PASSCMPLET);
                                                                mySearch.setSearchParameterMap(myParams);
                                                        } else {
                                                                ourLog.trace("Setting search status to FINISHED");
                                                                mySearch.setStatus(SearchStatusEnum.FINISHED);
                                                                mySearch.setTotalCount(myCountSavedTotal - countBlocked);
                                                        }
                                                }
                                        }

                                        mySearch.setNumFound(myCountSavedTotal);
                                        mySearch.setNumBlocked(mySearch.getNumBlocked() + countBlocked);

                                        int numSynced;
                                        synchronized (mySyncedPids) {
                                                numSynced = mySyncedPids.size();
                                        }

                                        // Release awaitInitialSync() unless a positive getCountSearchResultsUpTo()
                                        // is configured and has not yet been reached
                                        if (myStorageSettings.getCountSearchResultsUpTo() == null
                                                        || myStorageSettings.getCountSearchResultsUpTo() <= 0
                                                        || myStorageSettings.getCountSearchResultsUpTo() <= numSynced) {
                                                myInitialCollectionLatch.countDown();
                                        }

                                        doSaveSearch();

                                        ourLog.trace("saveUnsynced() - pre-commit");
                                });
                ourLog.trace("saveUnsynced() - post-commit");
        }
384
385        public boolean isNotAborted() {
386                return myAbortRequested == false;
387        }
388
389        public void markComplete() {
390                myCompletionLatch.countDown();
391        }
392
393        public CountDownLatch getCompletionLatch() {
394                return myCompletionLatch;
395        }
396
397        /**
398         * Request that the task abort as soon as possible
399         */
400        public void requestImmediateAbort() {
401                myAbortRequested = true;
402        }
403
404        /**
405         * This is the method which actually performs the search.
406         * It is called automatically by the thread pool.
407         */
408        @Override
409        public Void call() {
410                StopWatch sw = new StopWatch();
411                Span span = myParentTransaction.startSpan("db", "query", "search");
412                span.setName("FHIR Database Search");
413                try {
414                        // Create an initial search in the DB and give it an ID
415                        saveSearch();
416
417                        myTxService
418                                        .withRequest(myRequest)
419                                        .withRequestPartitionId(myRequestPartitionId)
420                                        .withIsolation(Isolation.READ_COMMITTED)
421                                        .execute(() -> doSearch());
422
423                        mySearchRuntimeDetails.setSearchStatus(mySearch.getStatus());
424                        if (mySearch.getStatus() == SearchStatusEnum.FINISHED) {
425                                HookParams params = new HookParams()
426                                                .add(RequestDetails.class, myRequest)
427                                                .addIfMatchesType(ServletRequestDetails.class, myRequest)
428                                                .add(SearchRuntimeDetails.class, mySearchRuntimeDetails);
429                                CompositeInterceptorBroadcaster.doCallHooks(
430                                                myInterceptorBroadcaster, myRequest, Pointcut.JPA_PERFTRACE_SEARCH_COMPLETE, params);
431                        } else {
432                                HookParams params = new HookParams()
433                                                .add(RequestDetails.class, myRequest)
434                                                .addIfMatchesType(ServletRequestDetails.class, myRequest)
435                                                .add(SearchRuntimeDetails.class, mySearchRuntimeDetails);
436                                CompositeInterceptorBroadcaster.doCallHooks(
437                                                myInterceptorBroadcaster, myRequest, Pointcut.JPA_PERFTRACE_SEARCH_PASS_COMPLETE, params);
438                        }
439
440                        ourLog.trace(
441                                        "Have completed search for [{}{}] and found {} resources in {}ms - Status is {}",
442                                        mySearch.getResourceType(),
443                                        mySearch.getSearchQueryString(),
444                                        mySyncedPids.size(),
445                                        sw.getMillis(),
446                                        mySearch.getStatus());
447
448                } catch (Throwable t) {
449
450                        /*
451                         * Don't print a stack trace for client errors (i.e. requests that
452                         * aren't valid because the client screwed up).. that's just noise
453                         * in the logs and who needs that.
454                         */
455                        boolean logged = false;
456                        if (t instanceof BaseServerResponseException) {
457                                BaseServerResponseException exception = (BaseServerResponseException) t;
458                                if (exception.getStatusCode() >= 400 && exception.getStatusCode() < 500) {
459                                        logged = true;
460                                        ourLog.warn("Failed during search due to invalid request: {}", t.toString());
461                                }
462                        }
463
464                        if (!logged) {
465                                ourLog.error("Failed during search loading after {}ms", sw.getMillis(), t);
466                        }
467                        myUnsyncedPids.clear();
468                        Throwable rootCause = ExceptionUtils.getRootCause(t);
469                        rootCause = defaultIfNull(rootCause, t);
470
471                        String failureMessage = rootCause.getMessage();
472
473                        int failureCode = InternalErrorException.STATUS_CODE;
474                        if (t instanceof BaseServerResponseException) {
475                                failureCode = ((BaseServerResponseException) t).getStatusCode();
476                        }
477
478                        if (HapiSystemProperties.isUnitTestCaptureStackEnabled()) {
479                                failureMessage += "\nStack\n" + ExceptionUtils.getStackTrace(rootCause);
480                        }
481
482                        mySearch.setFailureMessage(failureMessage);
483                        mySearch.setFailureCode(failureCode);
484                        mySearch.setStatus(SearchStatusEnum.FAILED);
485
486                        mySearchRuntimeDetails.setSearchStatus(mySearch.getStatus());
487                        HookParams params = new HookParams()
488                                        .add(RequestDetails.class, myRequest)
489                                        .addIfMatchesType(ServletRequestDetails.class, myRequest)
490                                        .add(SearchRuntimeDetails.class, mySearchRuntimeDetails);
491                        CompositeInterceptorBroadcaster.doCallHooks(
492                                        myInterceptorBroadcaster, myRequest, Pointcut.JPA_PERFTRACE_SEARCH_FAILED, params);
493
494                        saveSearch();
495                        span.captureException(t);
496                } finally {
497                        myOnRemove.accept(mySearch.getUuid());
498
499                        myInitialCollectionLatch.countDown();
500                        markComplete();
501                        span.end();
502                }
503                return null;
504        }
505
506        private void doSaveSearch() {
507                Search newSearch = mySearchCacheSvc.save(mySearch, myRequestPartitionId);
508
509                // mySearchDao.save is not supposed to return null, but in unit tests
510                // it can if the mock search dao isn't set up to handle that
511                if (newSearch != null) {
512                        mySearch = newSearch;
513                }
514        }
515
516        /**
517         * This method actually creates the database query to perform the
518         * search, and starts it.
519         */
520        private void doSearch() {
521                /*
522                 * If the user has explicitly requested a _count, perform a
523                 *
524                 * SELECT COUNT(*) ....
525                 *
526                 * before doing anything else.
527                 */
528                boolean myParamWantOnlyCount = isWantOnlyCount(myParams);
529                boolean myParamOrDefaultWantCount = nonNull(myParams.getSearchTotalMode())
530                                ? isWantCount(myParams)
531                                : SearchParameterMapCalculator.isWantCount(myStorageSettings.getDefaultTotalMode());
532
533                if (myParamWantOnlyCount || myParamOrDefaultWantCount) {
534                        ourLog.trace("Performing count");
535                        ISearchBuilder sb = newSearchBuilder();
536
537                        /*
538                         * createCountQuery
539                         * NB: (see createQuery below)
540                         * Because FulltextSearchSvcImpl will (internally)
541                         * mutate the myParams (searchmap),
542                         * (specifically removing the _content and _text filters)
543                         * we will have to clone those parameters here so that
544                         * the "correct" params are used in createQuery below
545                         */
546                        Long count = sb.createCountQuery(myParams.clone(), mySearch.getUuid(), myRequest, myRequestPartitionId);
547
548                        ourLog.trace("Got count {}", count);
549
550                        myTxService
551                                        .withRequest(myRequest)
552                                        .withRequestPartitionId(myRequestPartitionId)
553                                        .execute(() -> {
554                                                mySearch.setTotalCount(count.intValue());
555                                                if (myParamWantOnlyCount) {
556                                                        mySearch.setStatus(SearchStatusEnum.FINISHED);
557                                                }
558                                                doSaveSearch();
559                                        });
560                        if (myParamWantOnlyCount) {
561                                return;
562                        }
563                }
564
565                ourLog.trace("Done count");
566                ISearchBuilder sb = newSearchBuilder();
567
568                /*
569                 * Figure out how many results we're actually going to fetch from the
570                 * database in this pass. This calculation takes into consideration the
571                 * "pre-fetch thresholds" specified in StorageSettings#getSearchPreFetchThresholds()
572                 * as well as the value of the _count parameter.
573                 */
574                int currentlyLoaded = defaultIfNull(mySearch.getNumFound(), 0);
575                int minWanted = 0;
576                if (myParams.getCount() != null) {
577                        minWanted = myParams.getCount() + 1; // Always fetch one past this page, so we know if there is a next page.
578                        minWanted = Math.min(minWanted, myPagingProvider.getMaximumPageSize());
579                        minWanted += currentlyLoaded;
580                }
581
582                for (Iterator<Integer> iter =
583                                                myStorageSettings.getSearchPreFetchThresholds().iterator();
584                                iter.hasNext(); ) {
585                        int next = iter.next();
586                        if (next != -1 && next <= currentlyLoaded) {
587                                continue;
588                        }
589
590                        if (next == -1) {
591                                sb.setMaxResultsToFetch(null);
592                        } else {
593                                myMaxResultsToFetch = Math.max(next, minWanted);
594                                sb.setMaxResultsToFetch(myMaxResultsToFetch);
595                        }
596
597                        if (iter.hasNext()) {
598                                myAdditionalPrefetchThresholdsRemaining = true;
599                        }
600
601                        // If we get here's we've found an appropriate threshold
602                        break;
603                }
604
605                /*
606                 * Provide any PID we loaded in previous search passes to the
607                 * SearchBuilder so that we don't get duplicates coming from running
608                 * the same query again.
609                 *
610                 * We could possibly accomplish this in a different way by using sorted
611                 * results in our SQL query and specifying an offset. I don't actually
612                 * know if that would be faster or not. At some point should test this
613                 * idea.
614                 */
615                if (myPreviouslyAddedResourcePids != null) {
616                        sb.setPreviouslyAddedResourcePids(myPreviouslyAddedResourcePids);
617                        mySyncedPids.addAll(myPreviouslyAddedResourcePids);
618                }
619
620                /*
621                 * createQuery
622                 * Construct the SQL query we'll be sending to the database
623                 *
624                 * NB: (See createCountQuery above)
625                 * We will pass the original myParams here (not a copy)
626                 * because we actually _want_ the mutation of the myParams to happen.
627                 * Specifically because SearchBuilder itself will _expect_
628                 * not to have these parameters when dumping back
629                 * to our DB.
630                 *
631                 * This is an odd implementation behaviour, but the change
632                 * for this will require a lot more handling at higher levels
633                 */
634                try (IResultIterator<JpaPid> resultIterator =
635                                sb.createQuery(myParams, mySearchRuntimeDetails, myRequest, myRequestPartitionId)) {
636                        assert (resultIterator != null);
637
638                        /*
639                         * The following loop actually loads the PIDs of the resources
640                         * matching the search off of the disk and into memory. After
641                         * every X results, we commit to the HFJ_SEARCH table.
642                         */
643                        int syncSize = mySyncSize;
644                        while (resultIterator.hasNext()) {
645                                myUnsyncedPids.add(resultIterator.next());
646
647                                boolean shouldSync = myUnsyncedPids.size() >= syncSize;
648
649                                if (myStorageSettings.getCountSearchResultsUpTo() != null
650                                                && myStorageSettings.getCountSearchResultsUpTo() > 0
651                                                && myStorageSettings.getCountSearchResultsUpTo() < myUnsyncedPids.size()) {
652                                        shouldSync = false;
653                                }
654
655                                if (myUnsyncedPids.size() > 50000) {
656                                        shouldSync = true;
657                                }
658
659                                // If no abort was requested, bail out
660                                Validate.isTrue(isNotAborted(), "Abort has been requested");
661
662                                if (shouldSync) {
663                                        saveUnsynced(resultIterator);
664                                }
665
666                                if (myLoadingThrottleForUnitTests != null) {
667                                        AsyncUtil.sleep(myLoadingThrottleForUnitTests);
668                                }
669                        }
670
671                        // If no abort was requested, bail out
672                        Validate.isTrue(isNotAborted(), "Abort has been requested");
673
674                        saveUnsynced(resultIterator);
675
676                } catch (IOException e) {
677                        ourLog.error("IO failure during database access", e);
678                        throw new InternalErrorException(Msg.code(1166) + e);
679                }
680        }
681}