001/*-
002 * #%L
003 * HAPI FHIR JPA Server
004 * %%
005 * Copyright (C) 2014 - 2023 Smile CDR, Inc.
006 * %%
007 * Licensed under the Apache License, Version 2.0 (the "License");
008 * you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 *      http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
 * #L%
019 */
020package ca.uhn.fhir.jpa.search;
021
022import ca.uhn.fhir.context.FhirContext;
023import ca.uhn.fhir.i18n.Msg;
024import ca.uhn.fhir.interceptor.api.HookParams;
025import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
026import ca.uhn.fhir.interceptor.api.Pointcut;
027import ca.uhn.fhir.interceptor.model.RequestPartitionId;
028import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
029import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
030import ca.uhn.fhir.jpa.api.dao.IDao;
031import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
032import ca.uhn.fhir.jpa.api.svc.ISearchCoordinatorSvc;
033import ca.uhn.fhir.jpa.config.SearchConfig;
034import ca.uhn.fhir.jpa.dao.BaseStorageDao;
035import ca.uhn.fhir.jpa.dao.ISearchBuilder;
036import ca.uhn.fhir.jpa.dao.SearchBuilderFactory;
037import ca.uhn.fhir.jpa.dao.search.ResourceNotFoundInIndexException;
038import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService;
039import ca.uhn.fhir.jpa.entity.Search;
040import ca.uhn.fhir.jpa.model.dao.JpaPid;
041import ca.uhn.fhir.jpa.model.search.SearchStatusEnum;
042import ca.uhn.fhir.jpa.search.builder.StorageInterceptorHooksFacade;
043import ca.uhn.fhir.jpa.search.builder.tasks.SearchContinuationTask;
044import ca.uhn.fhir.jpa.search.builder.tasks.SearchTask;
045import ca.uhn.fhir.jpa.search.builder.tasks.SearchTaskParameters;
046import ca.uhn.fhir.jpa.search.cache.ISearchCacheSvc;
047import ca.uhn.fhir.jpa.search.cache.ISearchResultCacheSvc;
048import ca.uhn.fhir.jpa.search.cache.SearchCacheStatusEnum;
049import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
050import ca.uhn.fhir.jpa.util.QueryParameterUtils;
051import ca.uhn.fhir.model.api.Include;
052import ca.uhn.fhir.rest.api.CacheControlDirective;
053import ca.uhn.fhir.rest.api.Constants;
054import ca.uhn.fhir.rest.api.RestSearchParameterTypeEnum;
055import ca.uhn.fhir.rest.api.SearchTotalModeEnum;
056import ca.uhn.fhir.rest.api.server.IBundleProvider;
057import ca.uhn.fhir.rest.api.server.RequestDetails;
058import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
059import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
060import ca.uhn.fhir.rest.server.servlet.ServletRequestDetails;
061import ca.uhn.fhir.rest.server.util.CompositeInterceptorBroadcaster;
062import ca.uhn.fhir.rest.server.util.ISearchParamRegistry;
063import ca.uhn.fhir.util.AsyncUtil;
064import ca.uhn.fhir.util.StopWatch;
065import ca.uhn.fhir.util.UrlUtil;
066import com.google.common.annotations.VisibleForTesting;
067import org.apache.commons.lang3.time.DateUtils;
068import org.hl7.fhir.instance.model.api.IBaseResource;
069import org.springframework.beans.factory.BeanFactory;
070import org.springframework.data.domain.PageRequest;
071import org.springframework.data.domain.Pageable;
072import org.springframework.data.domain.Sort;
073import org.springframework.stereotype.Component;
074import org.springframework.transaction.support.TransactionSynchronizationManager;
075
076import java.time.Instant;
077import java.time.temporal.ChronoUnit;
078import java.util.List;
079import java.util.Optional;
080import java.util.Set;
081import java.util.UUID;
082import java.util.concurrent.Callable;
083import java.util.concurrent.ConcurrentHashMap;
084import java.util.concurrent.TimeUnit;
085import java.util.function.Consumer;
086import java.util.stream.Collectors;
087import javax.annotation.Nonnull;
088import javax.annotation.Nullable;
089
090import static ca.uhn.fhir.jpa.util.QueryParameterUtils.DEFAULT_SYNC_SIZE;
091import static org.apache.commons.lang3.StringUtils.isBlank;
092import static org.apache.commons.lang3.StringUtils.isNotBlank;
093
094@Component("mySearchCoordinatorSvc")
095public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc<JpaPid> {
096
        private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(SearchCoordinatorSvcImpl.class);

        // Collaborators: each of the following fields is assigned exactly once, in the constructor
        private final FhirContext myContext;
        private final JpaStorageSettings myStorageSettings;
        private final IInterceptorBroadcaster myInterceptorBroadcaster;
        private final HapiTransactionService myTxService;
        private final ISearchCacheSvc mySearchCacheSvc;
        private final ISearchResultCacheSvc mySearchResultCacheSvc;
        private final DaoRegistry myDaoRegistry;
        private final SearchBuilderFactory<JpaPid> mySearchBuilderFactory;
        private final ISynchronousSearchSvc mySynchronousSearchSvc;
        private final PersistedJpaBundleProviderFactory myPersistedJpaBundleProviderFactory;
        private final ISearchParamRegistry mySearchParamRegistry;
        private final SearchStrategyFactory mySearchStrategyFactory;
        private final ExceptionService myExceptionSvc;
        private final BeanFactory myBeanFactory;
        // Search tasks registered by this instance, keyed by the search UUID
        private final ConcurrentHashMap<String, SearchTask> myIdToSearchTask = new ConcurrentHashMap<>();

        // Removes a UUID from myIdToSearchTask; handed to the SearchTaskParameters built by
        // getResources() and submitSearch() (presumably invoked by the task when it ends - verify in SearchTask)
        private final Consumer<String> myOnRemoveSearchTask = myIdToSearchTask::remove;

        // Created in the constructor around myInterceptorBroadcaster
        private final StorageInterceptorHooksFacade myStorageInterceptorHooks;
        // Unit-test knob forwarded to SearchTaskParameters; null unless a test sets it
        private Integer myLoadingThrottleForUnitTests = null;
        // Upper bound (ms) on how long getResources() keeps polling before it fails with a timeout
        private long myMaxMillisToWaitForRemoteResults = DateUtils.MILLIS_PER_MINUTE;
        // Unit-test knob: when true, getResources() skips the shortcut through a locally registered SearchTask
        private boolean myNeverUseLocalSearchForUnitTests;
        // Used as the search builder fetch size and passed into SearchTaskParameters
        private int mySyncSize = DEFAULT_SYNC_SIZE;
122
123        /**
124         * Constructor
125         */
126        public SearchCoordinatorSvcImpl(
127                        FhirContext theContext,
128                        JpaStorageSettings theStorageSettings,
129                        IInterceptorBroadcaster theInterceptorBroadcaster,
130                        HapiTransactionService theTxService,
131                        ISearchCacheSvc theSearchCacheSvc,
132                        ISearchResultCacheSvc theSearchResultCacheSvc,
133                        DaoRegistry theDaoRegistry,
134                        SearchBuilderFactory<JpaPid> theSearchBuilderFactory,
135                        ISynchronousSearchSvc theSynchronousSearchSvc,
136                        PersistedJpaBundleProviderFactory thePersistedJpaBundleProviderFactory,
137                        ISearchParamRegistry theSearchParamRegistry,
138                        SearchStrategyFactory theSearchStrategyFactory,
139                        ExceptionService theExceptionSvc,
140                        BeanFactory theBeanFactory) {
141                super();
142                myContext = theContext;
143                myStorageSettings = theStorageSettings;
144                myInterceptorBroadcaster = theInterceptorBroadcaster;
145                myTxService = theTxService;
146                mySearchCacheSvc = theSearchCacheSvc;
147                mySearchResultCacheSvc = theSearchResultCacheSvc;
148                myDaoRegistry = theDaoRegistry;
149                mySearchBuilderFactory = theSearchBuilderFactory;
150                mySynchronousSearchSvc = theSynchronousSearchSvc;
151                myPersistedJpaBundleProviderFactory = thePersistedJpaBundleProviderFactory;
152                mySearchParamRegistry = theSearchParamRegistry;
153                mySearchStrategyFactory = theSearchStrategyFactory;
154                myExceptionSvc = theExceptionSvc;
155                myBeanFactory = theBeanFactory;
156
157                myStorageInterceptorHooks = new StorageInterceptorHooksFacade(myInterceptorBroadcaster);
158        }
159
160        @VisibleForTesting
161        Set<String> getActiveSearchIds() {
162                return myIdToSearchTask.keySet();
163        }
164
165        @VisibleForTesting
166        public void setLoadingThrottleForUnitTests(Integer theLoadingThrottleForUnitTests) {
167                myLoadingThrottleForUnitTests = theLoadingThrottleForUnitTests;
168        }
169
170        @VisibleForTesting
171        public void setNeverUseLocalSearchForUnitTests(boolean theNeverUseLocalSearchForUnitTests) {
172                myNeverUseLocalSearchForUnitTests = theNeverUseLocalSearchForUnitTests;
173        }
174
175        @VisibleForTesting
176        public void setSyncSizeForUnitTests(int theSyncSize) {
177                mySyncSize = theSyncSize;
178        }
179
180        @Override
181        public void cancelAllActiveSearches() {
182                for (SearchTask next : myIdToSearchTask.values()) {
183                        ourLog.info(
184                                        "Requesting immediate abort of search: {}", next.getSearch().getUuid());
185                        next.requestImmediateAbort();
186                        AsyncUtil.awaitLatchAndIgnoreInterrupt(next.getCompletionLatch(), 30, TimeUnit.SECONDS);
187                }
188        }
189
190        @SuppressWarnings("SameParameterValue")
191        @VisibleForTesting
192        void setMaxMillisToWaitForRemoteResultsForUnitTest(long theMaxMillisToWaitForRemoteResults) {
193                myMaxMillisToWaitForRemoteResults = theMaxMillisToWaitForRemoteResults;
194        }
195
196        /**
197         * This method is called by the HTTP client processing thread in order to
198         * fetch resources.
199         * <p>
200         * This method must not be called from inside a transaction. The rationale is that
201         * the {@link Search} entity is treated as a piece of shared state across client threads
202         * accessing the same thread, so we need to be able to update that table in a transaction
203         * and commit it right away in order for that to work. Examples of needing to do this
204         * include if two different clients request the same search and are both paging at the
205         * same time, but also includes clients that are hacking the paging links to
206         * fetch multiple pages of a search result in parallel. In both cases we need to only
207         * let one of them actually activate the search, or we will have conficts. The other thread
208         * just needs to wait until the first one actually fetches more results.
209         */
210        @Override
211        public List<JpaPid> getResources(
212                        final String theUuid,
213                        int theFrom,
214                        int theTo,
215                        @Nullable RequestDetails theRequestDetails,
216                        RequestPartitionId theRequestPartitionId) {
217                assert !TransactionSynchronizationManager.isActualTransactionActive();
218
219                // If we're actively searching right now, don't try to do anything until at least one batch has been
220                // persisted in the DB
221                SearchTask searchTask = myIdToSearchTask.get(theUuid);
222                if (searchTask != null) {
223                        searchTask.awaitInitialSync();
224                }
225
226                ourLog.trace("About to start looking for resources {}-{}", theFrom, theTo);
227
228                Search search;
229                StopWatch sw = new StopWatch();
230                while (true) {
231
232                        if (myNeverUseLocalSearchForUnitTests == false) {
233                                if (searchTask != null) {
234                                        ourLog.trace("Local search found");
235                                        List<JpaPid> resourcePids = searchTask.getResourcePids(theFrom, theTo);
236                                        ourLog.trace(
237                                                        "Local search returned {} pids, wanted {}-{} - Search: {}",
238                                                        resourcePids.size(),
239                                                        theFrom,
240                                                        theTo,
241                                                        searchTask.getSearch());
242
243                                        /*
244                                         * Generally, if a search task is open, the fastest possible thing is to just return its results. This
245                                         * will work most of the time, but can fail if the task hit a search threshold and the client is requesting
246                                         * results beyond that threashold. In that case, we'll keep going below, since that will trigger another
247                                         * task.
248                                         */
249                                        if ((searchTask.getSearch().getNumFound()
250                                                                                        - searchTask.getSearch().getNumBlocked())
251                                                                        >= theTo
252                                                        || resourcePids.size() == (theTo - theFrom)) {
253                                                return resourcePids;
254                                        }
255                                }
256                        }
257
258                        Callable<Search> searchCallback = () -> mySearchCacheSvc
259                                        .fetchByUuid(theUuid, theRequestPartitionId)
260                                        .orElseThrow(() -> myExceptionSvc.newUnknownSearchException(theUuid));
261                        search = myTxService
262                                        .withRequest(theRequestDetails)
263                                        .withRequestPartitionId(theRequestPartitionId)
264                                        .execute(searchCallback);
265                        QueryParameterUtils.verifySearchHasntFailedOrThrowInternalErrorException(search);
266
267                        if (search.getStatus() == SearchStatusEnum.FINISHED) {
268                                ourLog.trace("Search entity marked as finished with {} results", search.getNumFound());
269                                break;
270                        }
271                        if ((search.getNumFound() - search.getNumBlocked()) >= theTo) {
272                                ourLog.trace("Search entity has {} results so far", search.getNumFound());
273                                break;
274                        }
275
276                        if (sw.getMillis() > myMaxMillisToWaitForRemoteResults) {
277                                ourLog.error(
278                                                "Search {} of type {} for {}{} timed out after {}ms",
279                                                search.getId(),
280                                                search.getSearchType(),
281                                                search.getResourceType(),
282                                                search.getSearchQueryString(),
283                                                sw.getMillis());
284                                throw new InternalErrorException(Msg.code(1163) + "Request timed out after " + sw.getMillis() + "ms");
285                        }
286
287                        // If the search was saved in "pass complete mode" it's probably time to
288                        // start a new pass
289                        if (search.getStatus() == SearchStatusEnum.PASSCMPLET) {
290                                ourLog.trace("Going to try to start next search");
291                                Optional<Search> newSearch =
292                                                mySearchCacheSvc.tryToMarkSearchAsInProgress(search, theRequestPartitionId);
293                                if (newSearch.isPresent()) {
294                                        ourLog.trace("Launching new search");
295                                        search = newSearch.get();
296                                        String resourceType = search.getResourceType();
297                                        SearchParameterMap params = search.getSearchParameterMap()
298                                                        .orElseThrow(() -> new IllegalStateException("No map in PASSCOMPLET search"));
299                                        IFhirResourceDao<?> resourceDao = myDaoRegistry.getResourceDao(resourceType);
300
301                                        SearchTaskParameters parameters = new SearchTaskParameters(
302                                                        search,
303                                                        resourceDao,
304                                                        params,
305                                                        resourceType,
306                                                        theRequestDetails,
307                                                        theRequestPartitionId,
308                                                        myOnRemoveSearchTask,
309                                                        mySyncSize);
310                                        parameters.setLoadingThrottleForUnitTests(myLoadingThrottleForUnitTests);
311                                        SearchContinuationTask task =
312                                                        (SearchContinuationTask) myBeanFactory.getBean(SearchConfig.CONTINUE_TASK, parameters);
313                                        myIdToSearchTask.put(search.getUuid(), task);
314                                        task.call();
315                                }
316                        }
317
318                        if (!search.getStatus().isDone()) {
319                                AsyncUtil.sleep(500);
320                        }
321                }
322
323                ourLog.trace("Finished looping");
324
325                List<JpaPid> pids = fetchResultPids(theUuid, theFrom, theTo, theRequestDetails, search, theRequestPartitionId);
326
327                ourLog.trace("Fetched {} results", pids.size());
328
329                return pids;
330        }
331
332        @Nonnull
333        private List<JpaPid> fetchResultPids(
334                        String theUuid,
335                        int theFrom,
336                        int theTo,
337                        @Nullable RequestDetails theRequestDetails,
338                        Search theSearch,
339                        RequestPartitionId theRequestPartitionId) {
340                List<JpaPid> pids = mySearchResultCacheSvc.fetchResultPids(
341                                theSearch, theFrom, theTo, theRequestDetails, theRequestPartitionId);
342                if (pids == null) {
343                        throw myExceptionSvc.newUnknownSearchException(theUuid);
344                }
345                return pids;
346        }
347
348        @Override
349        public IBundleProvider registerSearch(
350                        final IFhirResourceDao<?> theCallingDao,
351                        final SearchParameterMap theParams,
352                        String theResourceType,
353                        CacheControlDirective theCacheControlDirective,
354                        RequestDetails theRequestDetails,
355                        RequestPartitionId theRequestPartitionId) {
356                final String searchUuid = UUID.randomUUID().toString();
357
358                final String queryString = theParams.toNormalizedQueryString(myContext);
359                ourLog.debug("Registering new search {}", searchUuid);
360
361                Search search = new Search();
362                QueryParameterUtils.populateSearchEntity(
363                                theParams, theResourceType, searchUuid, queryString, search, theRequestPartitionId);
364
365                myStorageInterceptorHooks.callStoragePresearchRegistered(
366                                theRequestDetails, theParams, search, theRequestPartitionId);
367
368                validateSearch(theParams);
369
370                Class<? extends IBaseResource> resourceTypeClass =
371                                myContext.getResourceDefinition(theResourceType).getImplementingClass();
372                final ISearchBuilder<JpaPid> sb =
373                                mySearchBuilderFactory.newSearchBuilder(theCallingDao, theResourceType, resourceTypeClass);
374                sb.setFetchSize(mySyncSize);
375
376                final Integer loadSynchronousUpTo = getLoadSynchronousUpToOrNull(theCacheControlDirective);
377                boolean isOffsetQuery = theParams.isOffsetQuery();
378
379                // todo someday - not today.
380                //              SearchStrategyFactory.ISearchStrategy searchStrategy = mySearchStrategyFactory.pickStrategy(theResourceType,
381                // theParams, theRequestDetails);
382                //              return searchStrategy.get();
383
384                if (theParams.isLoadSynchronous() || loadSynchronousUpTo != null || isOffsetQuery) {
385                        if (mySearchStrategyFactory.isSupportsHSearchDirect(theResourceType, theParams, theRequestDetails)) {
386                                ourLog.info("Search {} is using direct load strategy", searchUuid);
387                                SearchStrategyFactory.ISearchStrategy direct = mySearchStrategyFactory.makeDirectStrategy(
388                                                searchUuid, theResourceType, theParams, theRequestDetails);
389
390                                try {
391                                        return direct.get();
392
393                                } catch (ResourceNotFoundInIndexException theE) {
394                                        // some resources were not found in index, so we will inform this and resort to JPA search
395                                        ourLog.warn(
396                                                        "Some resources were not found in index. Make sure all resources were indexed. Resorting to database search.");
397                                }
398                        }
399
400                        ourLog.debug("Search {} is loading in synchronous mode", searchUuid);
401                        return mySynchronousSearchSvc.executeQuery(
402                                        theParams, theRequestDetails, searchUuid, sb, loadSynchronousUpTo, theRequestPartitionId);
403                }
404
405                /*
406                 * See if there are any cached searches whose results we can return
407                 * instead
408                 */
409                SearchCacheStatusEnum cacheStatus = SearchCacheStatusEnum.MISS;
410                if (theCacheControlDirective != null && theCacheControlDirective.isNoCache()) {
411                        cacheStatus = SearchCacheStatusEnum.NOT_TRIED;
412                }
413
414                if (cacheStatus != SearchCacheStatusEnum.NOT_TRIED) {
415                        if (theParams.getEverythingMode() == null) {
416                                if (myStorageSettings.getReuseCachedSearchResultsForMillis() != null) {
417                                        PersistedJpaBundleProvider foundSearchProvider = findCachedQuery(
418                                                        theParams, theResourceType, theRequestDetails, queryString, theRequestPartitionId);
419                                        if (foundSearchProvider != null) {
420                                                foundSearchProvider.setCacheStatus(SearchCacheStatusEnum.HIT);
421                                                return foundSearchProvider;
422                                        }
423                                }
424                        }
425                }
426
427                PersistedJpaSearchFirstPageBundleProvider retVal = submitSearch(
428                                theCallingDao, theParams, theResourceType, theRequestDetails, sb, theRequestPartitionId, search);
429                retVal.setCacheStatus(cacheStatus);
430                return retVal;
431        }
432
433        private void validateSearch(SearchParameterMap theParams) {
434                validateIncludes(theParams.getIncludes(), Constants.PARAM_INCLUDE);
435                validateIncludes(theParams.getRevIncludes(), Constants.PARAM_REVINCLUDE);
436        }
437
438        private void validateIncludes(Set<Include> includes, String name) {
439                for (Include next : includes) {
440                        String value = next.getValue();
441                        if (value.equals(Constants.INCLUDE_STAR) || isBlank(value)) {
442                                continue;
443                        }
444
445                        String paramType = next.getParamType();
446                        String paramName = next.getParamName();
447                        String paramTargetType = next.getParamTargetType();
448
449                        if (isBlank(paramType) || isBlank(paramName)) {
450                                String msg = myContext
451                                                .getLocalizer()
452                                                .getMessageSanitized(SearchCoordinatorSvcImpl.class, "invalidInclude", name, value, "");
453                                throw new InvalidRequestException(Msg.code(2018) + msg);
454                        }
455
456                        if (!myDaoRegistry.isResourceTypeSupported(paramType)) {
457                                String resourceTypeMsg = myContext
458                                                .getLocalizer()
459                                                .getMessageSanitized(SearchCoordinatorSvcImpl.class, "invalidResourceType", paramType);
460                                String msg = myContext
461                                                .getLocalizer()
462                                                .getMessage(
463                                                                SearchCoordinatorSvcImpl.class,
464                                                                "invalidInclude",
465                                                                UrlUtil.sanitizeUrlPart(name),
466                                                                UrlUtil.sanitizeUrlPart(value),
467                                                                resourceTypeMsg); // last param is pre-sanitized
468                                throw new InvalidRequestException(Msg.code(2017) + msg);
469                        }
470
471                        if (isNotBlank(paramTargetType) && !myDaoRegistry.isResourceTypeSupported(paramTargetType)) {
472                                String resourceTypeMsg = myContext
473                                                .getLocalizer()
474                                                .getMessageSanitized(SearchCoordinatorSvcImpl.class, "invalidResourceType", paramTargetType);
475                                String msg = myContext
476                                                .getLocalizer()
477                                                .getMessage(
478                                                                SearchCoordinatorSvcImpl.class,
479                                                                "invalidInclude",
480                                                                UrlUtil.sanitizeUrlPart(name),
481                                                                UrlUtil.sanitizeUrlPart(value),
482                                                                resourceTypeMsg); // last param is pre-sanitized
483                                throw new InvalidRequestException(Msg.code(2016) + msg);
484                        }
485
486                        if (!Constants.INCLUDE_STAR.equals(paramName)
487                                        && mySearchParamRegistry.getActiveSearchParam(paramType, paramName) == null) {
488                                List<String> validNames = mySearchParamRegistry.getActiveSearchParams(paramType).values().stream()
489                                                .filter(t -> t.getParamType() == RestSearchParameterTypeEnum.REFERENCE)
490                                                .map(t -> UrlUtil.sanitizeUrlPart(t.getName()))
491                                                .sorted()
492                                                .collect(Collectors.toList());
493                                String searchParamMessage = myContext
494                                                .getLocalizer()
495                                                .getMessage(
496                                                                BaseStorageDao.class,
497                                                                "invalidSearchParameter",
498                                                                UrlUtil.sanitizeUrlPart(paramName),
499                                                                UrlUtil.sanitizeUrlPart(paramType),
500                                                                validNames);
501                                String msg = myContext
502                                                .getLocalizer()
503                                                .getMessage(
504                                                                SearchCoordinatorSvcImpl.class,
505                                                                "invalidInclude",
506                                                                UrlUtil.sanitizeUrlPart(name),
507                                                                UrlUtil.sanitizeUrlPart(value),
508                                                                searchParamMessage); // last param is pre-sanitized
509                                throw new InvalidRequestException(Msg.code(2015) + msg);
510                        }
511                }
512        }
513
514        @Override
515        public Optional<Integer> getSearchTotal(
516                        String theUuid, @Nullable RequestDetails theRequestDetails, RequestPartitionId theRequestPartitionId) {
517                SearchTask task = myIdToSearchTask.get(theUuid);
518                if (task != null) {
519                        return Optional.ofNullable(task.awaitInitialSync());
520                }
521
522                /*
523                 * In case there is no running search, if the total is listed as accurate we know one is coming
524                 * so let's wait a bit for it to show up
525                 */
526                Optional<Search> search = myTxService
527                                .withRequest(theRequestDetails)
528                                .execute(() -> mySearchCacheSvc.fetchByUuid(theUuid, theRequestPartitionId));
529                if (search.isPresent()) {
530                        Optional<SearchParameterMap> searchParameterMap = search.get().getSearchParameterMap();
531                        if (searchParameterMap.isPresent()
532                                        && searchParameterMap.get().getSearchTotalMode() == SearchTotalModeEnum.ACCURATE) {
533                                for (int i = 0; i < 10; i++) {
534                                        if (search.isPresent()) {
535                                                QueryParameterUtils.verifySearchHasntFailedOrThrowInternalErrorException(search.get());
536                                                if (search.get().getTotalCount() != null) {
537                                                        return Optional.of(search.get().getTotalCount());
538                                                }
539                                        }
540                                        search = mySearchCacheSvc.fetchByUuid(theUuid, theRequestPartitionId);
541                                }
542                        }
543                }
544
545                return Optional.empty();
546        }
547
548        @Nonnull
549        private PersistedJpaSearchFirstPageBundleProvider submitSearch(
550                        IDao theCallingDao,
551                        SearchParameterMap theParams,
552                        String theResourceType,
553                        RequestDetails theRequestDetails,
554                        ISearchBuilder<JpaPid> theSb,
555                        RequestPartitionId theRequestPartitionId,
556                        Search theSearch) {
557                StopWatch w = new StopWatch();
558
559                SearchTaskParameters stp = new SearchTaskParameters(
560                                theSearch,
561                                theCallingDao,
562                                theParams,
563                                theResourceType,
564                                theRequestDetails,
565                                theRequestPartitionId,
566                                myOnRemoveSearchTask,
567                                mySyncSize);
568                stp.setLoadingThrottleForUnitTests(myLoadingThrottleForUnitTests);
569                SearchTask task = (SearchTask) myBeanFactory.getBean(SearchConfig.SEARCH_TASK, stp);
570                myIdToSearchTask.put(theSearch.getUuid(), task);
571                task.call();
572
573                PersistedJpaSearchFirstPageBundleProvider retVal = myPersistedJpaBundleProviderFactory.newInstanceFirstPage(
574                                theRequestDetails, theSearch, task, theSb, theRequestPartitionId);
575
576                ourLog.debug("Search initial phase completed in {}ms", w.getMillis());
577                return retVal;
578        }
579
580        @Nullable
581        private PersistedJpaBundleProvider findCachedQuery(
582                        SearchParameterMap theParams,
583                        String theResourceType,
584                        RequestDetails theRequestDetails,
585                        String theQueryString,
586                        RequestPartitionId theRequestPartitionId) {
587                // May be null
588                return myTxService
589                                .withRequest(theRequestDetails)
590                                .withRequestPartitionId(theRequestPartitionId)
591                                .execute(() -> {
592
593                                        // Interceptor call: STORAGE_PRECHECK_FOR_CACHED_SEARCH
594                                        HookParams params = new HookParams()
595                                                        .add(SearchParameterMap.class, theParams)
596                                                        .add(RequestDetails.class, theRequestDetails)
597                                                        .addIfMatchesType(ServletRequestDetails.class, theRequestDetails);
598                                        Object outcome = CompositeInterceptorBroadcaster.doCallHooksAndReturnObject(
599                                                        myInterceptorBroadcaster,
600                                                        theRequestDetails,
601                                                        Pointcut.STORAGE_PRECHECK_FOR_CACHED_SEARCH,
602                                                        params);
603                                        if (Boolean.FALSE.equals(outcome)) {
604                                                return null;
605                                        }
606
607                                        // Check for a search matching the given hash
608                                        Search searchToUse = findSearchToUseOrNull(theQueryString, theResourceType, theRequestPartitionId);
609                                        if (searchToUse == null) {
610                                                return null;
611                                        }
612
613                                        ourLog.debug("Reusing search {} from cache", searchToUse.getUuid());
614                                        // Interceptor call: JPA_PERFTRACE_SEARCH_REUSING_CACHED
615                                        params = new HookParams()
616                                                        .add(SearchParameterMap.class, theParams)
617                                                        .add(RequestDetails.class, theRequestDetails)
618                                                        .addIfMatchesType(ServletRequestDetails.class, theRequestDetails);
619                                        CompositeInterceptorBroadcaster.doCallHooks(
620                                                        myInterceptorBroadcaster,
621                                                        theRequestDetails,
622                                                        Pointcut.JPA_PERFTRACE_SEARCH_REUSING_CACHED,
623                                                        params);
624
625                                        return myPersistedJpaBundleProviderFactory.newInstance(theRequestDetails, searchToUse.getUuid());
626                                });
627        }
628
629        @Nullable
630        private Search findSearchToUseOrNull(
631                        String theQueryString, String theResourceType, RequestPartitionId theRequestPartitionId) {
632                // createdCutoff is in recent past
633                final Instant createdCutoff =
634                                Instant.now().minus(myStorageSettings.getReuseCachedSearchResultsForMillis(), ChronoUnit.MILLIS);
635
636                Optional<Search> candidate = mySearchCacheSvc.findCandidatesForReuse(
637                                theResourceType, theQueryString, createdCutoff, theRequestPartitionId);
638                return candidate.orElse(null);
639        }
640
641        @Nullable
642        private Integer getLoadSynchronousUpToOrNull(CacheControlDirective theCacheControlDirective) {
643                final Integer loadSynchronousUpTo;
644                if (theCacheControlDirective != null && theCacheControlDirective.isNoStore()) {
645                        if (theCacheControlDirective.getMaxResults() != null) {
646                                loadSynchronousUpTo = theCacheControlDirective.getMaxResults();
647                                if (loadSynchronousUpTo > myStorageSettings.getCacheControlNoStoreMaxResultsUpperLimit()) {
648                                        throw new InvalidRequestException(Msg.code(1165) + Constants.HEADER_CACHE_CONTROL + " header "
649                                                        + Constants.CACHE_CONTROL_MAX_RESULTS + " value must not exceed "
650                                                        + myStorageSettings.getCacheControlNoStoreMaxResultsUpperLimit());
651                                }
652                        } else {
653                                loadSynchronousUpTo = 100;
654                        }
655                } else {
656                        loadSynchronousUpTo = null;
657                }
658                return loadSynchronousUpTo;
659        }
660
661        /**
662         * Creates a {@link Pageable} using a start and end index
663         */
664        @SuppressWarnings("WeakerAccess")
665        @Nullable
666        public static Pageable toPage(final int theFromIndex, int theToIndex) {
667                int pageSize = theToIndex - theFromIndex;
668                if (pageSize < 1) {
669                        return null;
670                }
671
672                int pageIndex = theFromIndex / pageSize;
673
674                return new PageRequest(pageIndex, pageSize, Sort.unsorted()) {
675                        private static final long serialVersionUID = 1L;
676
677                        @Override
678                        public long getOffset() {
679                                return theFromIndex;
680                        }
681                };
682        }
683}