001/*- 002 * #%L 003 * HAPI FHIR JPA Server 004 * %% 005 * Copyright (C) 2014 - 2023 Smile CDR, Inc. 006 * %% 007 * Licensed under the Apache License, Version 2.0 (the "License"); 008 * you may not use this file except in compliance with the License. 009 * You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 * #L% 019 */ 020package ca.uhn.fhir.jpa.search.builder.tasks; 021 022import ca.uhn.fhir.context.FhirContext; 023import ca.uhn.fhir.i18n.Msg; 024import ca.uhn.fhir.interceptor.api.HookParams; 025import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster; 026import ca.uhn.fhir.interceptor.api.Pointcut; 027import ca.uhn.fhir.interceptor.model.RequestPartitionId; 028import ca.uhn.fhir.jpa.api.config.JpaStorageSettings; 029import ca.uhn.fhir.jpa.api.dao.IDao; 030import ca.uhn.fhir.jpa.dao.IResultIterator; 031import ca.uhn.fhir.jpa.dao.ISearchBuilder; 032import ca.uhn.fhir.jpa.dao.SearchBuilderFactory; 033import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService; 034import ca.uhn.fhir.jpa.entity.Search; 035import ca.uhn.fhir.jpa.interceptor.JpaPreResourceAccessDetails; 036import ca.uhn.fhir.jpa.model.dao.JpaPid; 037import ca.uhn.fhir.jpa.model.search.SearchRuntimeDetails; 038import ca.uhn.fhir.jpa.model.search.SearchStatusEnum; 039import ca.uhn.fhir.jpa.search.cache.ISearchCacheSvc; 040import ca.uhn.fhir.jpa.search.cache.ISearchResultCacheSvc; 041import ca.uhn.fhir.jpa.searchparam.SearchParameterMap; 042import ca.uhn.fhir.jpa.util.QueryParameterUtils; 043import ca.uhn.fhir.jpa.util.SearchParameterMapCalculator; 044import ca.uhn.fhir.rest.api.server.IPreResourceAccessDetails; 045import ca.uhn.fhir.rest.api.server.RequestDetails; 046import ca.uhn.fhir.rest.server.IPagingProvider; 047import ca.uhn.fhir.rest.server.exceptions.BaseServerResponseException; 048import ca.uhn.fhir.rest.server.exceptions.InternalErrorException; 049import ca.uhn.fhir.rest.server.servlet.ServletRequestDetails; 050import ca.uhn.fhir.rest.server.util.CompositeInterceptorBroadcaster; 051import ca.uhn.fhir.system.HapiSystemProperties; 052import ca.uhn.fhir.util.AsyncUtil; 053import ca.uhn.fhir.util.StopWatch; 054import co.elastic.apm.api.ElasticApm; 055import co.elastic.apm.api.Span; 056import co.elastic.apm.api.Transaction; 057import org.apache.commons.lang3.Validate; 058import org.apache.commons.lang3.exception.ExceptionUtils; 059import org.hl7.fhir.instance.model.api.IBaseResource; 060import org.springframework.transaction.annotation.Isolation; 061import org.springframework.transaction.annotation.Propagation; 062 063import java.io.IOException; 064import java.util.ArrayList; 065import java.util.Iterator; 066import java.util.List; 067import java.util.concurrent.Callable; 068import java.util.concurrent.CountDownLatch; 069import java.util.concurrent.TimeUnit; 070import java.util.function.Consumer; 071import javax.annotation.Nonnull; 072 073import static ca.uhn.fhir.jpa.util.SearchParameterMapCalculator.isWantCount; 074import static ca.uhn.fhir.jpa.util.SearchParameterMapCalculator.isWantOnlyCount; 075import static java.util.Objects.nonNull; 076import static org.apache.commons.lang3.ObjectUtils.defaultIfNull; 077 078/** 079 * A search task is a Callable task that runs in 080 * a thread pool to handle an individual search. One instance 081 * is created for any requested search and runs from the 082 * beginning to the end of the search. 083 * <p> 084 * Understand: 085 * This class executes in its own thread separate from the 086 * web server client thread that made the request. We do that 087 * so that we can return to the client as soon as possible, 088 * but keep the search going in the background (and have 089 * the next page of results ready to go when the client asks). 090 */ 091public class SearchTask implements Callable<Void> { 092 093 private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(SearchTask.class); 094 // injected beans 095 protected final HapiTransactionService myTxService; 096 protected final FhirContext myContext; 097 protected final ISearchResultCacheSvc mySearchResultCacheSvc; 098 private final SearchParameterMap myParams; 099 private final IDao myCallingDao; 100 private final String myResourceType; 101 private final ArrayList<JpaPid> mySyncedPids = new ArrayList<>(); 102 private final CountDownLatch myInitialCollectionLatch = new CountDownLatch(1); 103 private final CountDownLatch myCompletionLatch; 104 private final ArrayList<JpaPid> myUnsyncedPids = new ArrayList<>(); 105 private final RequestDetails myRequest; 106 private final RequestPartitionId myRequestPartitionId; 107 private final SearchRuntimeDetails mySearchRuntimeDetails; 108 private final Transaction myParentTransaction; 109 private final Consumer<String> myOnRemove; 110 private final int mySyncSize; 111 private final Integer myLoadingThrottleForUnitTests; 112 private final IInterceptorBroadcaster myInterceptorBroadcaster; 113 private final SearchBuilderFactory<JpaPid> mySearchBuilderFactory; 114 private final JpaStorageSettings myStorageSettings; 115 private final ISearchCacheSvc mySearchCacheSvc; 116 private final IPagingProvider myPagingProvider; 117 private Search mySearch; 118 private boolean myAbortRequested; 119 private int myCountSavedTotal = 0; 120 private int myCountSavedThisPass = 0; 121 private int myCountBlockedThisPass = 0; 122 private boolean myAdditionalPrefetchThresholdsRemaining; 123 private List<JpaPid> myPreviouslyAddedResourcePids; 124 private Integer myMaxResultsToFetch; 125 /** 126 * Constructor 127 */ 128 public SearchTask( 129 SearchTaskParameters theCreationParams, 130 HapiTransactionService theManagedTxManager, 131 FhirContext theContext, 132 IInterceptorBroadcaster theInterceptorBroadcaster, 133 SearchBuilderFactory theSearchBuilderFactory, 134 ISearchResultCacheSvc theSearchResultCacheSvc, 135 JpaStorageSettings theStorageSettings, 136 ISearchCacheSvc theSearchCacheSvc, 137 IPagingProvider thePagingProvider) { 138 // beans 139 myTxService = theManagedTxManager; 140 myContext = theContext; 141 myInterceptorBroadcaster = theInterceptorBroadcaster; 142 mySearchBuilderFactory = theSearchBuilderFactory; 143 mySearchResultCacheSvc = theSearchResultCacheSvc; 144 myStorageSettings = theStorageSettings; 145 mySearchCacheSvc = theSearchCacheSvc; 146 myPagingProvider = thePagingProvider; 147 148 // values 149 myOnRemove = theCreationParams.OnRemove; 150 mySearch = theCreationParams.Search; 151 myCallingDao = theCreationParams.CallingDao; 152 myParams = theCreationParams.Params; 153 myResourceType = theCreationParams.ResourceType; 154 myRequest = theCreationParams.Request; 155 myCompletionLatch = new CountDownLatch(1); 156 mySyncSize = theCreationParams.SyncSize; 157 myLoadingThrottleForUnitTests = theCreationParams.getLoadingThrottleForUnitTests(); 158 159 mySearchRuntimeDetails = new SearchRuntimeDetails(myRequest, mySearch.getUuid()); 160 mySearchRuntimeDetails.setQueryString(myParams.toNormalizedQueryString(myCallingDao.getContext())); 161 myRequestPartitionId = theCreationParams.RequestPartitionId; 162 myParentTransaction = ElasticApm.currentTransaction(); 163 } 164 165 protected RequestPartitionId getRequestPartitionId() { 166 return myRequestPartitionId; 167 } 168 169 /** 170 * This method is called by the server HTTP thread, and 171 * will block until at least one page of results have been 172 * fetched from the DB, and will never block after that. 173 */ 174 public Integer awaitInitialSync() { 175 ourLog.trace("Awaiting initial sync"); 176 do { 177 ourLog.trace("Search {} aborted: {}", getSearch().getUuid(), !isNotAborted()); 178 if (AsyncUtil.awaitLatchAndThrowInternalErrorExceptionOnInterrupt( 179 getInitialCollectionLatch(), 250L, TimeUnit.MILLISECONDS)) { 180 break; 181 } 182 } while (getSearch().getStatus() == SearchStatusEnum.LOADING); 183 ourLog.trace("Initial sync completed"); 184 185 return getSearch().getTotalCount(); 186 } 187 188 public Search getSearch() { 189 return mySearch; 190 } 191 192 public CountDownLatch getInitialCollectionLatch() { 193 return myInitialCollectionLatch; 194 } 195 196 public void setPreviouslyAddedResourcePids(List<JpaPid> thePreviouslyAddedResourcePids) { 197 myPreviouslyAddedResourcePids = thePreviouslyAddedResourcePids; 198 myCountSavedTotal = myPreviouslyAddedResourcePids.size(); 199 } 200 201 private ISearchBuilder newSearchBuilder() { 202 Class<? extends IBaseResource> resourceTypeClass = 203 myContext.getResourceDefinition(myResourceType).getImplementingClass(); 204 return mySearchBuilderFactory.newSearchBuilder(myCallingDao, myResourceType, resourceTypeClass); 205 } 206 207 @Nonnull 208 public List<JpaPid> getResourcePids(int theFromIndex, int theToIndex) { 209 ourLog.debug("Requesting search PIDs from {}-{}", theFromIndex, theToIndex); 210 211 boolean keepWaiting; 212 do { 213 synchronized (mySyncedPids) { 214 ourLog.trace("Search status is {}", mySearch.getStatus()); 215 boolean haveEnoughResults = mySyncedPids.size() >= theToIndex; 216 if (!haveEnoughResults) { 217 switch (mySearch.getStatus()) { 218 case LOADING: 219 keepWaiting = true; 220 break; 221 case PASSCMPLET: 222 /* 223 * If we get here, it means that the user requested resources that crossed the 224 * current pre-fetch boundary. For example, if the prefetch threshold is 50 and the 225 * user has requested resources 0-60, then they would get 0-50 back but the search 226 * coordinator would then stop searching.SearchCoordinatorSvcImplTest 227 */ 228 keepWaiting = false; 229 break; 230 case FAILED: 231 case FINISHED: 232 case GONE: 233 default: 234 keepWaiting = false; 235 break; 236 } 237 } else { 238 keepWaiting = false; 239 } 240 } 241 242 if (keepWaiting) { 243 ourLog.info( 244 "Waiting as we only have {} results - Search status: {}", 245 mySyncedPids.size(), 246 mySearch.getStatus()); 247 AsyncUtil.sleep(500L); 248 } 249 } while (keepWaiting); 250 251 ourLog.debug("Proceeding, as we have {} results", mySyncedPids.size()); 252 253 ArrayList<JpaPid> retVal = new ArrayList<>(); 254 synchronized (mySyncedPids) { 255 QueryParameterUtils.verifySearchHasntFailedOrThrowInternalErrorException(mySearch); 256 257 int toIndex = theToIndex; 258 if (mySyncedPids.size() < toIndex) { 259 toIndex = mySyncedPids.size(); 260 } 261 for (int i = theFromIndex; i < toIndex; i++) { 262 retVal.add(mySyncedPids.get(i)); 263 } 264 } 265 266 ourLog.trace( 267 "Done syncing results - Wanted {}-{} and returning {} of {}", 268 theFromIndex, 269 theToIndex, 270 retVal.size(), 271 mySyncedPids.size()); 272 273 return retVal; 274 } 275 276 public void saveSearch() { 277 myTxService 278 .withRequest(myRequest) 279 .withRequestPartitionId(myRequestPartitionId) 280 .withPropagation(Propagation.REQUIRES_NEW) 281 .execute(() -> doSaveSearch()); 282 } 283 284 private void saveUnsynced(final IResultIterator theResultIter) { 285 myTxService 286 .withRequest(myRequest) 287 .withRequestPartitionId(myRequestPartitionId) 288 .execute(() -> { 289 if (mySearch.getId() == null) { 290 doSaveSearch(); 291 } 292 293 ArrayList<JpaPid> unsyncedPids = myUnsyncedPids; 294 int countBlocked = 0; 295 296 // Interceptor call: STORAGE_PREACCESS_RESOURCES 297 // This can be used to remove results from the search result details before 298 // the user has a chance to know that they were in the results 299 if (mySearchRuntimeDetails.getRequestDetails() != null && unsyncedPids.isEmpty() == false) { 300 JpaPreResourceAccessDetails accessDetails = 301 new JpaPreResourceAccessDetails(unsyncedPids, () -> newSearchBuilder()); 302 HookParams params = new HookParams() 303 .add(IPreResourceAccessDetails.class, accessDetails) 304 .add(RequestDetails.class, mySearchRuntimeDetails.getRequestDetails()) 305 .addIfMatchesType( 306 ServletRequestDetails.class, mySearchRuntimeDetails.getRequestDetails()); 307 CompositeInterceptorBroadcaster.doCallHooks( 308 myInterceptorBroadcaster, myRequest, Pointcut.STORAGE_PREACCESS_RESOURCES, params); 309 310 for (int i = unsyncedPids.size() - 1; i >= 0; i--) { 311 if (accessDetails.isDontReturnResourceAtIndex(i)) { 312 unsyncedPids.remove(i); 313 myCountBlockedThisPass++; 314 myCountSavedTotal++; 315 countBlocked++; 316 } 317 } 318 } 319 320 // Actually store the results in the query cache storage 321 myCountSavedTotal += unsyncedPids.size(); 322 myCountSavedThisPass += unsyncedPids.size(); 323 mySearchResultCacheSvc.storeResults( 324 mySearch, mySyncedPids, unsyncedPids, myRequest, getRequestPartitionId()); 325 326 synchronized (mySyncedPids) { 327 int numSyncedThisPass = unsyncedPids.size(); 328 ourLog.trace( 329 "Syncing {} search results - Have more: {}", 330 numSyncedThisPass, 331 theResultIter.hasNext()); 332 mySyncedPids.addAll(unsyncedPids); 333 unsyncedPids.clear(); 334 335 if (theResultIter.hasNext() == false) { 336 int skippedCount = theResultIter.getSkippedCount(); 337 int nonSkippedCount = theResultIter.getNonSkippedCount(); 338 int totalFetched = skippedCount + myCountSavedThisPass + myCountBlockedThisPass; 339 ourLog.trace( 340 "MaxToFetch[{}] SkippedCount[{}] CountSavedThisPass[{}] CountSavedThisTotal[{}] AdditionalPrefetchRemaining[{}]", 341 myMaxResultsToFetch, 342 skippedCount, 343 myCountSavedThisPass, 344 myCountSavedTotal, 345 myAdditionalPrefetchThresholdsRemaining); 346 347 if (nonSkippedCount == 0 348 || (myMaxResultsToFetch != null && totalFetched < myMaxResultsToFetch)) { 349 ourLog.trace("Setting search status to FINISHED"); 350 mySearch.setStatus(SearchStatusEnum.FINISHED); 351 mySearch.setTotalCount(myCountSavedTotal - countBlocked); 352 } else if (myAdditionalPrefetchThresholdsRemaining) { 353 ourLog.trace("Setting search status to PASSCMPLET"); 354 mySearch.setStatus(SearchStatusEnum.PASSCMPLET); 355 mySearch.setSearchParameterMap(myParams); 356 } else { 357 ourLog.trace("Setting search status to FINISHED"); 358 mySearch.setStatus(SearchStatusEnum.FINISHED); 359 mySearch.setTotalCount(myCountSavedTotal - countBlocked); 360 } 361 } 362 } 363 364 mySearch.setNumFound(myCountSavedTotal); 365 mySearch.setNumBlocked(mySearch.getNumBlocked() + countBlocked); 366 367 int numSynced; 368 synchronized (mySyncedPids) { 369 numSynced = mySyncedPids.size(); 370 } 371 372 if (myStorageSettings.getCountSearchResultsUpTo() == null 373 || myStorageSettings.getCountSearchResultsUpTo() <= 0 374 || myStorageSettings.getCountSearchResultsUpTo() <= numSynced) { 375 myInitialCollectionLatch.countDown(); 376 } 377 378 doSaveSearch(); 379 380 ourLog.trace("saveUnsynced() - pre-commit"); 381 }); 382 ourLog.trace("saveUnsynced() - post-commit"); 383 } 384 385 public boolean isNotAborted() { 386 return myAbortRequested == false; 387 } 388 389 public void markComplete() { 390 myCompletionLatch.countDown(); 391 } 392 393 public CountDownLatch getCompletionLatch() { 394 return myCompletionLatch; 395 } 396 397 /** 398 * Request that the task abort as soon as possible 399 */ 400 public void requestImmediateAbort() { 401 myAbortRequested = true; 402 } 403 404 /** 405 * This is the method which actually performs the search. 406 * It is called automatically by the thread pool. 407 */ 408 @Override 409 public Void call() { 410 StopWatch sw = new StopWatch(); 411 Span span = myParentTransaction.startSpan("db", "query", "search"); 412 span.setName("FHIR Database Search"); 413 try { 414 // Create an initial search in the DB and give it an ID 415 saveSearch(); 416 417 myTxService 418 .withRequest(myRequest) 419 .withRequestPartitionId(myRequestPartitionId) 420 .withIsolation(Isolation.READ_COMMITTED) 421 .execute(() -> doSearch()); 422 423 mySearchRuntimeDetails.setSearchStatus(mySearch.getStatus()); 424 if (mySearch.getStatus() == SearchStatusEnum.FINISHED) { 425 HookParams params = new HookParams() 426 .add(RequestDetails.class, myRequest) 427 .addIfMatchesType(ServletRequestDetails.class, myRequest) 428 .add(SearchRuntimeDetails.class, mySearchRuntimeDetails); 429 CompositeInterceptorBroadcaster.doCallHooks( 430 myInterceptorBroadcaster, myRequest, Pointcut.JPA_PERFTRACE_SEARCH_COMPLETE, params); 431 } else { 432 HookParams params = new HookParams() 433 .add(RequestDetails.class, myRequest) 434 .addIfMatchesType(ServletRequestDetails.class, myRequest) 435 .add(SearchRuntimeDetails.class, mySearchRuntimeDetails); 436 CompositeInterceptorBroadcaster.doCallHooks( 437 myInterceptorBroadcaster, myRequest, Pointcut.JPA_PERFTRACE_SEARCH_PASS_COMPLETE, params); 438 } 439 440 ourLog.trace( 441 "Have completed search for [{}{}] and found {} resources in {}ms - Status is {}", 442 mySearch.getResourceType(), 443 mySearch.getSearchQueryString(), 444 mySyncedPids.size(), 445 sw.getMillis(), 446 mySearch.getStatus()); 447 448 } catch (Throwable t) { 449 450 /* 451 * Don't print a stack trace for client errors (i.e. requests that 452 * aren't valid because the client screwed up).. that's just noise 453 * in the logs and who needs that. 454 */ 455 boolean logged = false; 456 if (t instanceof BaseServerResponseException) { 457 BaseServerResponseException exception = (BaseServerResponseException) t; 458 if (exception.getStatusCode() >= 400 && exception.getStatusCode() < 500) { 459 logged = true; 460 ourLog.warn("Failed during search due to invalid request: {}", t.toString()); 461 } 462 } 463 464 if (!logged) { 465 ourLog.error("Failed during search loading after {}ms", sw.getMillis(), t); 466 } 467 myUnsyncedPids.clear(); 468 Throwable rootCause = ExceptionUtils.getRootCause(t); 469 rootCause = defaultIfNull(rootCause, t); 470 471 String failureMessage = rootCause.getMessage(); 472 473 int failureCode = InternalErrorException.STATUS_CODE; 474 if (t instanceof BaseServerResponseException) { 475 failureCode = ((BaseServerResponseException) t).getStatusCode(); 476 } 477 478 if (HapiSystemProperties.isUnitTestCaptureStackEnabled()) { 479 failureMessage += "\nStack\n" + ExceptionUtils.getStackTrace(rootCause); 480 } 481 482 mySearch.setFailureMessage(failureMessage); 483 mySearch.setFailureCode(failureCode); 484 mySearch.setStatus(SearchStatusEnum.FAILED); 485 486 mySearchRuntimeDetails.setSearchStatus(mySearch.getStatus()); 487 HookParams params = new HookParams() 488 .add(RequestDetails.class, myRequest) 489 .addIfMatchesType(ServletRequestDetails.class, myRequest) 490 .add(SearchRuntimeDetails.class, mySearchRuntimeDetails); 491 CompositeInterceptorBroadcaster.doCallHooks( 492 myInterceptorBroadcaster, myRequest, Pointcut.JPA_PERFTRACE_SEARCH_FAILED, params); 493 494 saveSearch(); 495 span.captureException(t); 496 } finally { 497 myOnRemove.accept(mySearch.getUuid()); 498 499 myInitialCollectionLatch.countDown(); 500 markComplete(); 501 span.end(); 502 } 503 return null; 504 } 505 506 private void doSaveSearch() { 507 Search newSearch = mySearchCacheSvc.save(mySearch, myRequestPartitionId); 508 509 // mySearchDao.save is not supposed to return null, but in unit tests 510 // it can if the mock search dao isn't set up to handle that 511 if (newSearch != null) { 512 mySearch = newSearch; 513 } 514 } 515 516 /** 517 * This method actually creates the database query to perform the 518 * search, and starts it. 519 */ 520 private void doSearch() { 521 /* 522 * If the user has explicitly requested a _count, perform a 523 * 524 * SELECT COUNT(*) .... 525 * 526 * before doing anything else. 527 */ 528 boolean myParamWantOnlyCount = isWantOnlyCount(myParams); 529 boolean myParamOrDefaultWantCount = nonNull(myParams.getSearchTotalMode()) 530 ? isWantCount(myParams) 531 : SearchParameterMapCalculator.isWantCount(myStorageSettings.getDefaultTotalMode()); 532 533 if (myParamWantOnlyCount || myParamOrDefaultWantCount) { 534 ourLog.trace("Performing count"); 535 ISearchBuilder sb = newSearchBuilder(); 536 537 /* 538 * createCountQuery 539 * NB: (see createQuery below) 540 * Because FulltextSearchSvcImpl will (internally) 541 * mutate the myParams (searchmap), 542 * (specifically removing the _content and _text filters) 543 * we will have to clone those parameters here so that 544 * the "correct" params are used in createQuery below 545 */ 546 Long count = sb.createCountQuery(myParams.clone(), mySearch.getUuid(), myRequest, myRequestPartitionId); 547 548 ourLog.trace("Got count {}", count); 549 550 myTxService 551 .withRequest(myRequest) 552 .withRequestPartitionId(myRequestPartitionId) 553 .execute(() -> { 554 mySearch.setTotalCount(count.intValue()); 555 if (myParamWantOnlyCount) { 556 mySearch.setStatus(SearchStatusEnum.FINISHED); 557 } 558 doSaveSearch(); 559 }); 560 if (myParamWantOnlyCount) { 561 return; 562 } 563 } 564 565 ourLog.trace("Done count"); 566 ISearchBuilder sb = newSearchBuilder(); 567 568 /* 569 * Figure out how many results we're actually going to fetch from the 570 * database in this pass. This calculation takes into consideration the 571 * "pre-fetch thresholds" specified in StorageSettings#getSearchPreFetchThresholds() 572 * as well as the value of the _count parameter. 573 */ 574 int currentlyLoaded = defaultIfNull(mySearch.getNumFound(), 0); 575 int minWanted = 0; 576 if (myParams.getCount() != null) { 577 minWanted = myParams.getCount() + 1; // Always fetch one past this page, so we know if there is a next page. 578 minWanted = Math.min(minWanted, myPagingProvider.getMaximumPageSize()); 579 minWanted += currentlyLoaded; 580 } 581 582 for (Iterator<Integer> iter = 583 myStorageSettings.getSearchPreFetchThresholds().iterator(); 584 iter.hasNext(); ) { 585 int next = iter.next(); 586 if (next != -1 && next <= currentlyLoaded) { 587 continue; 588 } 589 590 if (next == -1) { 591 sb.setMaxResultsToFetch(null); 592 } else { 593 myMaxResultsToFetch = Math.max(next, minWanted); 594 sb.setMaxResultsToFetch(myMaxResultsToFetch); 595 } 596 597 if (iter.hasNext()) { 598 myAdditionalPrefetchThresholdsRemaining = true; 599 } 600 601 // If we get here's we've found an appropriate threshold 602 break; 603 } 604 605 /* 606 * Provide any PID we loaded in previous search passes to the 607 * SearchBuilder so that we don't get duplicates coming from running 608 * the same query again. 609 * 610 * We could possibly accomplish this in a different way by using sorted 611 * results in our SQL query and specifying an offset. I don't actually 612 * know if that would be faster or not. At some point should test this 613 * idea. 614 */ 615 if (myPreviouslyAddedResourcePids != null) { 616 sb.setPreviouslyAddedResourcePids(myPreviouslyAddedResourcePids); 617 mySyncedPids.addAll(myPreviouslyAddedResourcePids); 618 } 619 620 /* 621 * createQuery 622 * Construct the SQL query we'll be sending to the database 623 * 624 * NB: (See createCountQuery above) 625 * We will pass the original myParams here (not a copy) 626 * because we actually _want_ the mutation of the myParams to happen. 627 * Specifically because SearchBuilder itself will _expect_ 628 * not to have these parameters when dumping back 629 * to our DB. 630 * 631 * This is an odd implementation behaviour, but the change 632 * for this will require a lot more handling at higher levels 633 */ 634 try (IResultIterator<JpaPid> resultIterator = 635 sb.createQuery(myParams, mySearchRuntimeDetails, myRequest, myRequestPartitionId)) { 636 assert (resultIterator != null); 637 638 /* 639 * The following loop actually loads the PIDs of the resources 640 * matching the search off of the disk and into memory. After 641 * every X results, we commit to the HFJ_SEARCH table. 642 */ 643 int syncSize = mySyncSize; 644 while (resultIterator.hasNext()) { 645 myUnsyncedPids.add(resultIterator.next()); 646 647 boolean shouldSync = myUnsyncedPids.size() >= syncSize; 648 649 if (myStorageSettings.getCountSearchResultsUpTo() != null 650 && myStorageSettings.getCountSearchResultsUpTo() > 0 651 && myStorageSettings.getCountSearchResultsUpTo() < myUnsyncedPids.size()) { 652 shouldSync = false; 653 } 654 655 if (myUnsyncedPids.size() > 50000) { 656 shouldSync = true; 657 } 658 659 // If no abort was requested, bail out 660 Validate.isTrue(isNotAborted(), "Abort has been requested"); 661 662 if (shouldSync) { 663 saveUnsynced(resultIterator); 664 } 665 666 if (myLoadingThrottleForUnitTests != null) { 667 AsyncUtil.sleep(myLoadingThrottleForUnitTests); 668 } 669 } 670 671 // If no abort was requested, bail out 672 Validate.isTrue(isNotAborted(), "Abort has been requested"); 673 674 saveUnsynced(resultIterator); 675 676 } catch (IOException e) { 677 ourLog.error("IO failure during database access", e); 678 throw new InternalErrorException(Msg.code(1166) + e); 679 } 680 } 681}