001/*- 002 * #%L 003 * HAPI FHIR JPA Server 004 * %% 005 * Copyright (C) 2014 - 2023 Smile CDR, Inc. 006 * %% 007 * Licensed under the Apache License, Version 2.0 (the "License"); 008 * you may not use this file except in compliance with the License. 009 * You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 * #L%family 019 */ 020package ca.uhn.fhir.jpa.search; 021 022import ca.uhn.fhir.context.FhirContext; 023import ca.uhn.fhir.i18n.Msg; 024import ca.uhn.fhir.interceptor.api.HookParams; 025import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster; 026import ca.uhn.fhir.interceptor.api.Pointcut; 027import ca.uhn.fhir.interceptor.model.RequestPartitionId; 028import ca.uhn.fhir.jpa.api.config.JpaStorageSettings; 029import ca.uhn.fhir.jpa.api.dao.DaoRegistry; 030import ca.uhn.fhir.jpa.api.dao.IDao; 031import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao; 032import ca.uhn.fhir.jpa.api.svc.ISearchCoordinatorSvc; 033import ca.uhn.fhir.jpa.config.SearchConfig; 034import ca.uhn.fhir.jpa.dao.BaseStorageDao; 035import ca.uhn.fhir.jpa.dao.ISearchBuilder; 036import ca.uhn.fhir.jpa.dao.SearchBuilderFactory; 037import ca.uhn.fhir.jpa.dao.search.ResourceNotFoundInIndexException; 038import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService; 039import ca.uhn.fhir.jpa.entity.Search; 040import ca.uhn.fhir.jpa.model.dao.JpaPid; 041import ca.uhn.fhir.jpa.model.search.SearchStatusEnum; 042import ca.uhn.fhir.jpa.search.builder.StorageInterceptorHooksFacade; 043import ca.uhn.fhir.jpa.search.builder.tasks.SearchContinuationTask; 044import ca.uhn.fhir.jpa.search.builder.tasks.SearchTask; 045import ca.uhn.fhir.jpa.search.builder.tasks.SearchTaskParameters; 046import ca.uhn.fhir.jpa.search.cache.ISearchCacheSvc; 047import ca.uhn.fhir.jpa.search.cache.ISearchResultCacheSvc; 048import ca.uhn.fhir.jpa.search.cache.SearchCacheStatusEnum; 049import ca.uhn.fhir.jpa.searchparam.SearchParameterMap; 050import ca.uhn.fhir.jpa.util.QueryParameterUtils; 051import ca.uhn.fhir.model.api.Include; 052import ca.uhn.fhir.rest.api.CacheControlDirective; 053import ca.uhn.fhir.rest.api.Constants; 054import ca.uhn.fhir.rest.api.RestSearchParameterTypeEnum; 055import ca.uhn.fhir.rest.api.SearchTotalModeEnum; 056import ca.uhn.fhir.rest.api.server.IBundleProvider; 057import ca.uhn.fhir.rest.api.server.RequestDetails; 058import ca.uhn.fhir.rest.server.exceptions.InternalErrorException; 059import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException; 060import ca.uhn.fhir.rest.server.servlet.ServletRequestDetails; 061import ca.uhn.fhir.rest.server.util.CompositeInterceptorBroadcaster; 062import ca.uhn.fhir.rest.server.util.ISearchParamRegistry; 063import ca.uhn.fhir.util.AsyncUtil; 064import ca.uhn.fhir.util.StopWatch; 065import ca.uhn.fhir.util.UrlUtil; 066import com.google.common.annotations.VisibleForTesting; 067import org.apache.commons.lang3.time.DateUtils; 068import org.hl7.fhir.instance.model.api.IBaseResource; 069import org.springframework.beans.factory.BeanFactory; 070import org.springframework.data.domain.PageRequest; 071import org.springframework.data.domain.Pageable; 072import org.springframework.data.domain.Sort; 073import org.springframework.stereotype.Component; 074import org.springframework.transaction.support.TransactionSynchronizationManager; 075 076import java.time.Instant; 077import java.time.temporal.ChronoUnit; 078import java.util.List; 079import java.util.Optional; 080import java.util.Set; 081import java.util.UUID; 082import java.util.concurrent.Callable; 083import java.util.concurrent.ConcurrentHashMap; 084import java.util.concurrent.TimeUnit; 085import java.util.function.Consumer; 086import java.util.stream.Collectors; 087import javax.annotation.Nonnull; 088import javax.annotation.Nullable; 089 090import static ca.uhn.fhir.jpa.util.QueryParameterUtils.DEFAULT_SYNC_SIZE; 091import static org.apache.commons.lang3.StringUtils.isBlank; 092import static org.apache.commons.lang3.StringUtils.isNotBlank; 093 094@Component("mySearchCoordinatorSvc") 095public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc<JpaPid> { 096 097 private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(SearchCoordinatorSvcImpl.class); 098 099 private final FhirContext myContext; 100 private final JpaStorageSettings myStorageSettings; 101 private final IInterceptorBroadcaster myInterceptorBroadcaster; 102 private final HapiTransactionService myTxService; 103 private final ISearchCacheSvc mySearchCacheSvc; 104 private final ISearchResultCacheSvc mySearchResultCacheSvc; 105 private final DaoRegistry myDaoRegistry; 106 private final SearchBuilderFactory<JpaPid> mySearchBuilderFactory; 107 private final ISynchronousSearchSvc mySynchronousSearchSvc; 108 private final PersistedJpaBundleProviderFactory myPersistedJpaBundleProviderFactory; 109 private final ISearchParamRegistry mySearchParamRegistry; 110 private final SearchStrategyFactory mySearchStrategyFactory; 111 private final ExceptionService myExceptionSvc; 112 private final BeanFactory myBeanFactory; 113 private final ConcurrentHashMap<String, SearchTask> myIdToSearchTask = new ConcurrentHashMap<>(); 114 115 private final Consumer<String> myOnRemoveSearchTask = myIdToSearchTask::remove; 116 117 private final StorageInterceptorHooksFacade myStorageInterceptorHooks; 118 private Integer myLoadingThrottleForUnitTests = null; 119 private long myMaxMillisToWaitForRemoteResults = DateUtils.MILLIS_PER_MINUTE; 120 private boolean myNeverUseLocalSearchForUnitTests; 121 private int mySyncSize = DEFAULT_SYNC_SIZE; 122 123 /** 124 * Constructor 125 */ 126 public SearchCoordinatorSvcImpl( 127 FhirContext theContext, 128 JpaStorageSettings theStorageSettings, 129 IInterceptorBroadcaster theInterceptorBroadcaster, 130 HapiTransactionService theTxService, 131 ISearchCacheSvc theSearchCacheSvc, 132 ISearchResultCacheSvc theSearchResultCacheSvc, 133 DaoRegistry theDaoRegistry, 134 SearchBuilderFactory<JpaPid> theSearchBuilderFactory, 135 ISynchronousSearchSvc theSynchronousSearchSvc, 136 PersistedJpaBundleProviderFactory thePersistedJpaBundleProviderFactory, 137 ISearchParamRegistry theSearchParamRegistry, 138 SearchStrategyFactory theSearchStrategyFactory, 139 ExceptionService theExceptionSvc, 140 BeanFactory theBeanFactory) { 141 super(); 142 myContext = theContext; 143 myStorageSettings = theStorageSettings; 144 myInterceptorBroadcaster = theInterceptorBroadcaster; 145 myTxService = theTxService; 146 mySearchCacheSvc = theSearchCacheSvc; 147 mySearchResultCacheSvc = theSearchResultCacheSvc; 148 myDaoRegistry = theDaoRegistry; 149 mySearchBuilderFactory = theSearchBuilderFactory; 150 mySynchronousSearchSvc = theSynchronousSearchSvc; 151 myPersistedJpaBundleProviderFactory = thePersistedJpaBundleProviderFactory; 152 mySearchParamRegistry = theSearchParamRegistry; 153 mySearchStrategyFactory = theSearchStrategyFactory; 154 myExceptionSvc = theExceptionSvc; 155 myBeanFactory = theBeanFactory; 156 157 myStorageInterceptorHooks = new StorageInterceptorHooksFacade(myInterceptorBroadcaster); 158 } 159 160 @VisibleForTesting 161 Set<String> getActiveSearchIds() { 162 return myIdToSearchTask.keySet(); 163 } 164 165 @VisibleForTesting 166 public void setLoadingThrottleForUnitTests(Integer theLoadingThrottleForUnitTests) { 167 myLoadingThrottleForUnitTests = theLoadingThrottleForUnitTests; 168 } 169 170 @VisibleForTesting 171 public void setNeverUseLocalSearchForUnitTests(boolean theNeverUseLocalSearchForUnitTests) { 172 myNeverUseLocalSearchForUnitTests = theNeverUseLocalSearchForUnitTests; 173 } 174 175 @VisibleForTesting 176 public void setSyncSizeForUnitTests(int theSyncSize) { 177 mySyncSize = theSyncSize; 178 } 179 180 @Override 181 public void cancelAllActiveSearches() { 182 for (SearchTask next : myIdToSearchTask.values()) { 183 ourLog.info( 184 "Requesting immediate abort of search: {}", next.getSearch().getUuid()); 185 next.requestImmediateAbort(); 186 AsyncUtil.awaitLatchAndIgnoreInterrupt(next.getCompletionLatch(), 30, TimeUnit.SECONDS); 187 } 188 } 189 190 @SuppressWarnings("SameParameterValue") 191 @VisibleForTesting 192 void setMaxMillisToWaitForRemoteResultsForUnitTest(long theMaxMillisToWaitForRemoteResults) { 193 myMaxMillisToWaitForRemoteResults = theMaxMillisToWaitForRemoteResults; 194 } 195 196 /** 197 * This method is called by the HTTP client processing thread in order to 198 * fetch resources. 199 * <p> 200 * This method must not be called from inside a transaction. The rationale is that 201 * the {@link Search} entity is treated as a piece of shared state across client threads 202 * accessing the same thread, so we need to be able to update that table in a transaction 203 * and commit it right away in order for that to work. Examples of needing to do this 204 * include if two different clients request the same search and are both paging at the 205 * same time, but also includes clients that are hacking the paging links to 206 * fetch multiple pages of a search result in parallel. In both cases we need to only 207 * let one of them actually activate the search, or we will have conficts. The other thread 208 * just needs to wait until the first one actually fetches more results. 209 */ 210 @Override 211 public List<JpaPid> getResources( 212 final String theUuid, 213 int theFrom, 214 int theTo, 215 @Nullable RequestDetails theRequestDetails, 216 RequestPartitionId theRequestPartitionId) { 217 assert !TransactionSynchronizationManager.isActualTransactionActive(); 218 219 // If we're actively searching right now, don't try to do anything until at least one batch has been 220 // persisted in the DB 221 SearchTask searchTask = myIdToSearchTask.get(theUuid); 222 if (searchTask != null) { 223 searchTask.awaitInitialSync(); 224 } 225 226 ourLog.trace("About to start looking for resources {}-{}", theFrom, theTo); 227 228 Search search; 229 StopWatch sw = new StopWatch(); 230 while (true) { 231 232 if (myNeverUseLocalSearchForUnitTests == false) { 233 if (searchTask != null) { 234 ourLog.trace("Local search found"); 235 List<JpaPid> resourcePids = searchTask.getResourcePids(theFrom, theTo); 236 ourLog.trace( 237 "Local search returned {} pids, wanted {}-{} - Search: {}", 238 resourcePids.size(), 239 theFrom, 240 theTo, 241 searchTask.getSearch()); 242 243 /* 244 * Generally, if a search task is open, the fastest possible thing is to just return its results. This 245 * will work most of the time, but can fail if the task hit a search threshold and the client is requesting 246 * results beyond that threashold. In that case, we'll keep going below, since that will trigger another 247 * task. 248 */ 249 if ((searchTask.getSearch().getNumFound() 250 - searchTask.getSearch().getNumBlocked()) 251 >= theTo 252 || resourcePids.size() == (theTo - theFrom)) { 253 return resourcePids; 254 } 255 } 256 } 257 258 Callable<Search> searchCallback = () -> mySearchCacheSvc 259 .fetchByUuid(theUuid, theRequestPartitionId) 260 .orElseThrow(() -> myExceptionSvc.newUnknownSearchException(theUuid)); 261 search = myTxService 262 .withRequest(theRequestDetails) 263 .withRequestPartitionId(theRequestPartitionId) 264 .execute(searchCallback); 265 QueryParameterUtils.verifySearchHasntFailedOrThrowInternalErrorException(search); 266 267 if (search.getStatus() == SearchStatusEnum.FINISHED) { 268 ourLog.trace("Search entity marked as finished with {} results", search.getNumFound()); 269 break; 270 } 271 if ((search.getNumFound() - search.getNumBlocked()) >= theTo) { 272 ourLog.trace("Search entity has {} results so far", search.getNumFound()); 273 break; 274 } 275 276 if (sw.getMillis() > myMaxMillisToWaitForRemoteResults) { 277 ourLog.error( 278 "Search {} of type {} for {}{} timed out after {}ms", 279 search.getId(), 280 search.getSearchType(), 281 search.getResourceType(), 282 search.getSearchQueryString(), 283 sw.getMillis()); 284 throw new InternalErrorException(Msg.code(1163) + "Request timed out after " + sw.getMillis() + "ms"); 285 } 286 287 // If the search was saved in "pass complete mode" it's probably time to 288 // start a new pass 289 if (search.getStatus() == SearchStatusEnum.PASSCMPLET) { 290 ourLog.trace("Going to try to start next search"); 291 Optional<Search> newSearch = 292 mySearchCacheSvc.tryToMarkSearchAsInProgress(search, theRequestPartitionId); 293 if (newSearch.isPresent()) { 294 ourLog.trace("Launching new search"); 295 search = newSearch.get(); 296 String resourceType = search.getResourceType(); 297 SearchParameterMap params = search.getSearchParameterMap() 298 .orElseThrow(() -> new IllegalStateException("No map in PASSCOMPLET search")); 299 IFhirResourceDao<?> resourceDao = myDaoRegistry.getResourceDao(resourceType); 300 301 SearchTaskParameters parameters = new SearchTaskParameters( 302 search, 303 resourceDao, 304 params, 305 resourceType, 306 theRequestDetails, 307 theRequestPartitionId, 308 myOnRemoveSearchTask, 309 mySyncSize); 310 parameters.setLoadingThrottleForUnitTests(myLoadingThrottleForUnitTests); 311 SearchContinuationTask task = 312 (SearchContinuationTask) myBeanFactory.getBean(SearchConfig.CONTINUE_TASK, parameters); 313 myIdToSearchTask.put(search.getUuid(), task); 314 task.call(); 315 } 316 } 317 318 if (!search.getStatus().isDone()) { 319 AsyncUtil.sleep(500); 320 } 321 } 322 323 ourLog.trace("Finished looping"); 324 325 List<JpaPid> pids = fetchResultPids(theUuid, theFrom, theTo, theRequestDetails, search, theRequestPartitionId); 326 327 ourLog.trace("Fetched {} results", pids.size()); 328 329 return pids; 330 } 331 332 @Nonnull 333 private List<JpaPid> fetchResultPids( 334 String theUuid, 335 int theFrom, 336 int theTo, 337 @Nullable RequestDetails theRequestDetails, 338 Search theSearch, 339 RequestPartitionId theRequestPartitionId) { 340 List<JpaPid> pids = mySearchResultCacheSvc.fetchResultPids( 341 theSearch, theFrom, theTo, theRequestDetails, theRequestPartitionId); 342 if (pids == null) { 343 throw myExceptionSvc.newUnknownSearchException(theUuid); 344 } 345 return pids; 346 } 347 348 @Override 349 public IBundleProvider registerSearch( 350 final IFhirResourceDao<?> theCallingDao, 351 final SearchParameterMap theParams, 352 String theResourceType, 353 CacheControlDirective theCacheControlDirective, 354 RequestDetails theRequestDetails, 355 RequestPartitionId theRequestPartitionId) { 356 final String searchUuid = UUID.randomUUID().toString(); 357 358 final String queryString = theParams.toNormalizedQueryString(myContext); 359 ourLog.debug("Registering new search {}", searchUuid); 360 361 Search search = new Search(); 362 QueryParameterUtils.populateSearchEntity( 363 theParams, theResourceType, searchUuid, queryString, search, theRequestPartitionId); 364 365 myStorageInterceptorHooks.callStoragePresearchRegistered( 366 theRequestDetails, theParams, search, theRequestPartitionId); 367 368 validateSearch(theParams); 369 370 Class<? extends IBaseResource> resourceTypeClass = 371 myContext.getResourceDefinition(theResourceType).getImplementingClass(); 372 final ISearchBuilder<JpaPid> sb = 373 mySearchBuilderFactory.newSearchBuilder(theCallingDao, theResourceType, resourceTypeClass); 374 sb.setFetchSize(mySyncSize); 375 376 final Integer loadSynchronousUpTo = getLoadSynchronousUpToOrNull(theCacheControlDirective); 377 boolean isOffsetQuery = theParams.isOffsetQuery(); 378 379 // todo someday - not today. 380 // SearchStrategyFactory.ISearchStrategy searchStrategy = mySearchStrategyFactory.pickStrategy(theResourceType, 381 // theParams, theRequestDetails); 382 // return searchStrategy.get(); 383 384 if (theParams.isLoadSynchronous() || loadSynchronousUpTo != null || isOffsetQuery) { 385 if (mySearchStrategyFactory.isSupportsHSearchDirect(theResourceType, theParams, theRequestDetails)) { 386 ourLog.info("Search {} is using direct load strategy", searchUuid); 387 SearchStrategyFactory.ISearchStrategy direct = mySearchStrategyFactory.makeDirectStrategy( 388 searchUuid, theResourceType, theParams, theRequestDetails); 389 390 try { 391 return direct.get(); 392 393 } catch (ResourceNotFoundInIndexException theE) { 394 // some resources were not found in index, so we will inform this and resort to JPA search 395 ourLog.warn( 396 "Some resources were not found in index. Make sure all resources were indexed. Resorting to database search."); 397 } 398 } 399 400 ourLog.debug("Search {} is loading in synchronous mode", searchUuid); 401 return mySynchronousSearchSvc.executeQuery( 402 theParams, theRequestDetails, searchUuid, sb, loadSynchronousUpTo, theRequestPartitionId); 403 } 404 405 /* 406 * See if there are any cached searches whose results we can return 407 * instead 408 */ 409 SearchCacheStatusEnum cacheStatus = SearchCacheStatusEnum.MISS; 410 if (theCacheControlDirective != null && theCacheControlDirective.isNoCache()) { 411 cacheStatus = SearchCacheStatusEnum.NOT_TRIED; 412 } 413 414 if (cacheStatus != SearchCacheStatusEnum.NOT_TRIED) { 415 if (theParams.getEverythingMode() == null) { 416 if (myStorageSettings.getReuseCachedSearchResultsForMillis() != null) { 417 PersistedJpaBundleProvider foundSearchProvider = findCachedQuery( 418 theParams, theResourceType, theRequestDetails, queryString, theRequestPartitionId); 419 if (foundSearchProvider != null) { 420 foundSearchProvider.setCacheStatus(SearchCacheStatusEnum.HIT); 421 return foundSearchProvider; 422 } 423 } 424 } 425 } 426 427 PersistedJpaSearchFirstPageBundleProvider retVal = submitSearch( 428 theCallingDao, theParams, theResourceType, theRequestDetails, sb, theRequestPartitionId, search); 429 retVal.setCacheStatus(cacheStatus); 430 return retVal; 431 } 432 433 private void validateSearch(SearchParameterMap theParams) { 434 validateIncludes(theParams.getIncludes(), Constants.PARAM_INCLUDE); 435 validateIncludes(theParams.getRevIncludes(), Constants.PARAM_REVINCLUDE); 436 } 437 438 private void validateIncludes(Set<Include> includes, String name) { 439 for (Include next : includes) { 440 String value = next.getValue(); 441 if (value.equals(Constants.INCLUDE_STAR) || isBlank(value)) { 442 continue; 443 } 444 445 String paramType = next.getParamType(); 446 String paramName = next.getParamName(); 447 String paramTargetType = next.getParamTargetType(); 448 449 if (isBlank(paramType) || isBlank(paramName)) { 450 String msg = myContext 451 .getLocalizer() 452 .getMessageSanitized(SearchCoordinatorSvcImpl.class, "invalidInclude", name, value, ""); 453 throw new InvalidRequestException(Msg.code(2018) + msg); 454 } 455 456 if (!myDaoRegistry.isResourceTypeSupported(paramType)) { 457 String resourceTypeMsg = myContext 458 .getLocalizer() 459 .getMessageSanitized(SearchCoordinatorSvcImpl.class, "invalidResourceType", paramType); 460 String msg = myContext 461 .getLocalizer() 462 .getMessage( 463 SearchCoordinatorSvcImpl.class, 464 "invalidInclude", 465 UrlUtil.sanitizeUrlPart(name), 466 UrlUtil.sanitizeUrlPart(value), 467 resourceTypeMsg); // last param is pre-sanitized 468 throw new InvalidRequestException(Msg.code(2017) + msg); 469 } 470 471 if (isNotBlank(paramTargetType) && !myDaoRegistry.isResourceTypeSupported(paramTargetType)) { 472 String resourceTypeMsg = myContext 473 .getLocalizer() 474 .getMessageSanitized(SearchCoordinatorSvcImpl.class, "invalidResourceType", paramTargetType); 475 String msg = myContext 476 .getLocalizer() 477 .getMessage( 478 SearchCoordinatorSvcImpl.class, 479 "invalidInclude", 480 UrlUtil.sanitizeUrlPart(name), 481 UrlUtil.sanitizeUrlPart(value), 482 resourceTypeMsg); // last param is pre-sanitized 483 throw new InvalidRequestException(Msg.code(2016) + msg); 484 } 485 486 if (!Constants.INCLUDE_STAR.equals(paramName) 487 && mySearchParamRegistry.getActiveSearchParam(paramType, paramName) == null) { 488 List<String> validNames = mySearchParamRegistry.getActiveSearchParams(paramType).values().stream() 489 .filter(t -> t.getParamType() == RestSearchParameterTypeEnum.REFERENCE) 490 .map(t -> UrlUtil.sanitizeUrlPart(t.getName())) 491 .sorted() 492 .collect(Collectors.toList()); 493 String searchParamMessage = myContext 494 .getLocalizer() 495 .getMessage( 496 BaseStorageDao.class, 497 "invalidSearchParameter", 498 UrlUtil.sanitizeUrlPart(paramName), 499 UrlUtil.sanitizeUrlPart(paramType), 500 validNames); 501 String msg = myContext 502 .getLocalizer() 503 .getMessage( 504 SearchCoordinatorSvcImpl.class, 505 "invalidInclude", 506 UrlUtil.sanitizeUrlPart(name), 507 UrlUtil.sanitizeUrlPart(value), 508 searchParamMessage); // last param is pre-sanitized 509 throw new InvalidRequestException(Msg.code(2015) + msg); 510 } 511 } 512 } 513 514 @Override 515 public Optional<Integer> getSearchTotal( 516 String theUuid, @Nullable RequestDetails theRequestDetails, RequestPartitionId theRequestPartitionId) { 517 SearchTask task = myIdToSearchTask.get(theUuid); 518 if (task != null) { 519 return Optional.ofNullable(task.awaitInitialSync()); 520 } 521 522 /* 523 * In case there is no running search, if the total is listed as accurate we know one is coming 524 * so let's wait a bit for it to show up 525 */ 526 Optional<Search> search = myTxService 527 .withRequest(theRequestDetails) 528 .execute(() -> mySearchCacheSvc.fetchByUuid(theUuid, theRequestPartitionId)); 529 if (search.isPresent()) { 530 Optional<SearchParameterMap> searchParameterMap = search.get().getSearchParameterMap(); 531 if (searchParameterMap.isPresent() 532 && searchParameterMap.get().getSearchTotalMode() == SearchTotalModeEnum.ACCURATE) { 533 for (int i = 0; i < 10; i++) { 534 if (search.isPresent()) { 535 QueryParameterUtils.verifySearchHasntFailedOrThrowInternalErrorException(search.get()); 536 if (search.get().getTotalCount() != null) { 537 return Optional.of(search.get().getTotalCount()); 538 } 539 } 540 search = mySearchCacheSvc.fetchByUuid(theUuid, theRequestPartitionId); 541 } 542 } 543 } 544 545 return Optional.empty(); 546 } 547 548 @Nonnull 549 private PersistedJpaSearchFirstPageBundleProvider submitSearch( 550 IDao theCallingDao, 551 SearchParameterMap theParams, 552 String theResourceType, 553 RequestDetails theRequestDetails, 554 ISearchBuilder<JpaPid> theSb, 555 RequestPartitionId theRequestPartitionId, 556 Search theSearch) { 557 StopWatch w = new StopWatch(); 558 559 SearchTaskParameters stp = new SearchTaskParameters( 560 theSearch, 561 theCallingDao, 562 theParams, 563 theResourceType, 564 theRequestDetails, 565 theRequestPartitionId, 566 myOnRemoveSearchTask, 567 mySyncSize); 568 stp.setLoadingThrottleForUnitTests(myLoadingThrottleForUnitTests); 569 SearchTask task = (SearchTask) myBeanFactory.getBean(SearchConfig.SEARCH_TASK, stp); 570 myIdToSearchTask.put(theSearch.getUuid(), task); 571 task.call(); 572 573 PersistedJpaSearchFirstPageBundleProvider retVal = myPersistedJpaBundleProviderFactory.newInstanceFirstPage( 574 theRequestDetails, theSearch, task, theSb, theRequestPartitionId); 575 576 ourLog.debug("Search initial phase completed in {}ms", w.getMillis()); 577 return retVal; 578 } 579 580 @Nullable 581 private PersistedJpaBundleProvider findCachedQuery( 582 SearchParameterMap theParams, 583 String theResourceType, 584 RequestDetails theRequestDetails, 585 String theQueryString, 586 RequestPartitionId theRequestPartitionId) { 587 // May be null 588 return myTxService 589 .withRequest(theRequestDetails) 590 .withRequestPartitionId(theRequestPartitionId) 591 .execute(() -> { 592 593 // Interceptor call: STORAGE_PRECHECK_FOR_CACHED_SEARCH 594 HookParams params = new HookParams() 595 .add(SearchParameterMap.class, theParams) 596 .add(RequestDetails.class, theRequestDetails) 597 .addIfMatchesType(ServletRequestDetails.class, theRequestDetails); 598 Object outcome = CompositeInterceptorBroadcaster.doCallHooksAndReturnObject( 599 myInterceptorBroadcaster, 600 theRequestDetails, 601 Pointcut.STORAGE_PRECHECK_FOR_CACHED_SEARCH, 602 params); 603 if (Boolean.FALSE.equals(outcome)) { 604 return null; 605 } 606 607 // Check for a search matching the given hash 608 Search searchToUse = findSearchToUseOrNull(theQueryString, theResourceType, theRequestPartitionId); 609 if (searchToUse == null) { 610 return null; 611 } 612 613 ourLog.debug("Reusing search {} from cache", searchToUse.getUuid()); 614 // Interceptor call: JPA_PERFTRACE_SEARCH_REUSING_CACHED 615 params = new HookParams() 616 .add(SearchParameterMap.class, theParams) 617 .add(RequestDetails.class, theRequestDetails) 618 .addIfMatchesType(ServletRequestDetails.class, theRequestDetails); 619 CompositeInterceptorBroadcaster.doCallHooks( 620 myInterceptorBroadcaster, 621 theRequestDetails, 622 Pointcut.JPA_PERFTRACE_SEARCH_REUSING_CACHED, 623 params); 624 625 return myPersistedJpaBundleProviderFactory.newInstance(theRequestDetails, searchToUse.getUuid()); 626 }); 627 } 628 629 @Nullable 630 private Search findSearchToUseOrNull( 631 String theQueryString, String theResourceType, RequestPartitionId theRequestPartitionId) { 632 // createdCutoff is in recent past 633 final Instant createdCutoff = 634 Instant.now().minus(myStorageSettings.getReuseCachedSearchResultsForMillis(), ChronoUnit.MILLIS); 635 636 Optional<Search> candidate = mySearchCacheSvc.findCandidatesForReuse( 637 theResourceType, theQueryString, createdCutoff, theRequestPartitionId); 638 return candidate.orElse(null); 639 } 640 641 @Nullable 642 private Integer getLoadSynchronousUpToOrNull(CacheControlDirective theCacheControlDirective) { 643 final Integer loadSynchronousUpTo; 644 if (theCacheControlDirective != null && theCacheControlDirective.isNoStore()) { 645 if (theCacheControlDirective.getMaxResults() != null) { 646 loadSynchronousUpTo = theCacheControlDirective.getMaxResults(); 647 if (loadSynchronousUpTo > myStorageSettings.getCacheControlNoStoreMaxResultsUpperLimit()) { 648 throw new InvalidRequestException(Msg.code(1165) + Constants.HEADER_CACHE_CONTROL + " header " 649 + Constants.CACHE_CONTROL_MAX_RESULTS + " value must not exceed " 650 + myStorageSettings.getCacheControlNoStoreMaxResultsUpperLimit()); 651 } 652 } else { 653 loadSynchronousUpTo = 100; 654 } 655 } else { 656 loadSynchronousUpTo = null; 657 } 658 return loadSynchronousUpTo; 659 } 660 661 /** 662 * Creates a {@link Pageable} using a start and end index 663 */ 664 @SuppressWarnings("WeakerAccess") 665 @Nullable 666 public static Pageable toPage(final int theFromIndex, int theToIndex) { 667 int pageSize = theToIndex - theFromIndex; 668 if (pageSize < 1) { 669 return null; 670 } 671 672 int pageIndex = theFromIndex / pageSize; 673 674 return new PageRequest(pageIndex, pageSize, Sort.unsorted()) { 675 private static final long serialVersionUID = 1L; 676 677 @Override 678 public long getOffset() { 679 return theFromIndex; 680 } 681 }; 682 } 683}