001/*- 002 * #%L 003 * HAPI FHIR JPA Server 004 * %% 005 * Copyright (C) 2014 - 2023 Smile CDR, Inc. 006 * %% 007 * Licensed under the Apache License, Version 2.0 (the "License"); 008 * you may not use this file except in compliance with the License. 009 * You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 * #L% 019 */ 020package ca.uhn.fhir.jpa.search.reindex; 021 022import ca.uhn.fhir.batch2.model.JobInstanceStartRequest; 023import ca.uhn.fhir.context.FhirContext; 024import ca.uhn.fhir.i18n.Msg; 025import ca.uhn.fhir.jpa.api.config.JpaStorageSettings; 026import ca.uhn.fhir.jpa.api.dao.DaoRegistry; 027import ca.uhn.fhir.jpa.dao.BaseHapiFhirDao; 028import ca.uhn.fhir.jpa.dao.data.IForcedIdDao; 029import ca.uhn.fhir.jpa.dao.data.IResourceReindexJobDao; 030import ca.uhn.fhir.jpa.dao.data.IResourceTableDao; 031import ca.uhn.fhir.jpa.entity.ResourceReindexJobEntity; 032import ca.uhn.fhir.jpa.model.entity.ResourceTable; 033import ca.uhn.fhir.jpa.model.sched.HapiJob; 034import ca.uhn.fhir.jpa.model.sched.IHasScheduledJobs; 035import ca.uhn.fhir.jpa.model.sched.ISchedulerService; 036import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition; 037import ca.uhn.fhir.parser.DataFormatException; 038import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException; 039import ca.uhn.fhir.rest.server.exceptions.ResourceVersionConflictException; 040import ca.uhn.fhir.rest.server.util.ISearchParamRegistry; 041import ca.uhn.fhir.util.StopWatch; 042import com.google.common.annotations.VisibleForTesting; 043import org.apache.commons.lang3.Validate; 044import org.apache.commons.lang3.concurrent.BasicThreadFactory; 045import org.apache.commons.lang3.time.DateUtils; 046import org.hl7.fhir.r4.model.InstantType; 047import org.quartz.JobExecutionContext; 048import org.slf4j.Logger; 049import org.slf4j.LoggerFactory; 050import org.springframework.beans.factory.annotation.Autowired; 051import org.springframework.data.domain.PageRequest; 052import org.springframework.data.domain.Slice; 053import org.springframework.transaction.PlatformTransactionManager; 054import org.springframework.transaction.TransactionDefinition; 055import org.springframework.transaction.annotation.Propagation; 056import org.springframework.transaction.annotation.Transactional; 057import org.springframework.transaction.support.TransactionCallback; 058import org.springframework.transaction.support.TransactionTemplate; 059 060import java.util.Collection; 061import java.util.Date; 062import java.util.List; 063import java.util.concurrent.Callable; 064import java.util.concurrent.Future; 065import java.util.concurrent.LinkedBlockingQueue; 066import java.util.concurrent.RejectedExecutionHandler; 067import java.util.concurrent.ThreadFactory; 068import java.util.concurrent.ThreadPoolExecutor; 069import java.util.concurrent.TimeUnit; 070import java.util.concurrent.atomic.AtomicInteger; 071import java.util.concurrent.locks.ReentrantLock; 072import java.util.stream.Collectors; 073import javax.annotation.Nullable; 074import javax.annotation.PostConstruct; 075import javax.persistence.EntityManager; 076import javax.persistence.PersistenceContext; 077import javax.persistence.PersistenceContextType; 078import javax.persistence.Query; 079 080import static org.apache.commons.lang3.StringUtils.isNotBlank; 081 082/** 083 * @see ca.uhn.fhir.jpa.reindex.job.ReindexJobConfig 084 * @deprecated Use the Batch2 {@link ca.uhn.fhir.batch2.api.IJobCoordinator#startInstance(JobInstanceStartRequest)} instead. 085 */ 086@Deprecated 087public class ResourceReindexingSvcImpl implements IResourceReindexingSvc, IHasScheduledJobs { 088 089 private static final Date BEGINNING_OF_TIME = new Date(0); 090 private static final Logger ourLog = LoggerFactory.getLogger(ResourceReindexingSvcImpl.class); 091 private static final int PASS_SIZE = 25000; 092 private final ReentrantLock myIndexingLock = new ReentrantLock(); 093 094 @Autowired 095 private IResourceReindexJobDao myReindexJobDao; 096 097 @Autowired 098 private JpaStorageSettings myStorageSettings; 099 100 @Autowired 101 private PlatformTransactionManager myTxManager; 102 103 private TransactionTemplate myTxTemplate; 104 private final ThreadFactory myReindexingThreadFactory = 105 new BasicThreadFactory.Builder().namingPattern("ResourceReindex-%d").build(); 106 private ThreadPoolExecutor myTaskExecutor; 107 108 @Autowired 109 private IResourceTableDao myResourceTableDao; 110 111 @Autowired 112 private DaoRegistry myDaoRegistry; 113 114 @Autowired 115 private IForcedIdDao myForcedIdDao; 116 117 @Autowired 118 private FhirContext myContext; 119 120 @PersistenceContext(type = PersistenceContextType.TRANSACTION) 121 private EntityManager myEntityManager; 122 123 @Autowired 124 private ISearchParamRegistry mySearchParamRegistry; 125 126 @Autowired 127 private ResourceReindexer myResourceReindexer; 128 129 @VisibleForTesting 130 void setStorageSettingsForUnitTest(JpaStorageSettings theStorageSettings) { 131 myStorageSettings = theStorageSettings; 132 } 133 134 @VisibleForTesting 135 void setContextForUnitTest(FhirContext theContext) { 136 myContext = theContext; 137 } 138 139 @PostConstruct 140 public void start() { 141 myTxTemplate = new TransactionTemplate(myTxManager); 142 initExecutor(); 143 } 144 145 public void initExecutor() { 146 // Create the threadpool executor used for reindex jobs 147 int reindexThreadCount = myStorageSettings.getReindexThreadCount(); 148 RejectedExecutionHandler rejectHandler = new BlockPolicy(); 149 myTaskExecutor = new ThreadPoolExecutor( 150 0, 151 reindexThreadCount, 152 0L, 153 TimeUnit.MILLISECONDS, 154 new LinkedBlockingQueue<>(100), 155 myReindexingThreadFactory, 156 rejectHandler); 157 } 158 159 @Override 160 public void scheduleJobs(ISchedulerService theSchedulerService) { 161 ScheduledJobDefinition jobDetail = new ScheduledJobDefinition(); 162 jobDetail.setId(getClass().getName()); 163 jobDetail.setJobClass(Job.class); 164 theSchedulerService.scheduleClusteredJob(10 * DateUtils.MILLIS_PER_SECOND, jobDetail); 165 } 166 167 @Override 168 @Transactional(propagation = Propagation.REQUIRED) 169 public Long markAllResourcesForReindexing() { 170 return markAllResourcesForReindexing(null); 171 } 172 173 @Override 174 @Transactional(propagation = Propagation.REQUIRED) 175 public Long markAllResourcesForReindexing(String theType) { 176 177 String typeDesc; 178 if (isNotBlank(theType)) { 179 try { 180 myContext.getResourceType(theType); 181 } catch (DataFormatException e) { 182 throw new InvalidRequestException(Msg.code(1170) + "Unknown resource type: " + theType); 183 } 184 myReindexJobDao.markAllOfTypeAsDeleted(theType); 185 typeDesc = theType; 186 } else { 187 myReindexJobDao.markAllOfTypeAsDeleted(); 188 typeDesc = "(any)"; 189 } 190 191 ResourceReindexJobEntity job = new ResourceReindexJobEntity(); 192 job.setResourceType(theType); 193 job.setThresholdHigh(DateUtils.addMinutes(new Date(), 5)); 194 job = myReindexJobDao.saveAndFlush(job); 195 196 ourLog.info("Marking all resources of type {} for reindexing - Got job ID[{}]", typeDesc, job.getId()); 197 return job.getId(); 198 } 199 200 public static class Job implements HapiJob { 201 @Autowired 202 private IResourceReindexingSvc myTarget; 203 204 @Override 205 public void execute(JobExecutionContext theContext) { 206 myTarget.runReindexingPass(); 207 } 208 } 209 210 @VisibleForTesting 211 ReentrantLock getIndexingLockForUnitTest() { 212 return myIndexingLock; 213 } 214 215 @Override 216 @Transactional(propagation = Propagation.NEVER) 217 public Integer runReindexingPass() { 218 if (myStorageSettings.isSchedulingDisabled() || !myStorageSettings.isEnableTaskPreExpandValueSets()) { 219 return null; 220 } 221 if (myIndexingLock.tryLock()) { 222 try { 223 return doReindexingPassInsideLock(); 224 } finally { 225 myIndexingLock.unlock(); 226 } 227 } 228 return null; 229 } 230 231 private int doReindexingPassInsideLock() { 232 expungeJobsMarkedAsDeleted(); 233 return runReindexJobs(); 234 } 235 236 @Override 237 public int forceReindexingPass() { 238 myIndexingLock.lock(); 239 try { 240 return doReindexingPassInsideLock(); 241 } finally { 242 myIndexingLock.unlock(); 243 } 244 } 245 246 @Override 247 public void cancelAndPurgeAllJobs() { 248 ourLog.info("Cancelling and purging all resource reindexing jobs"); 249 myIndexingLock.lock(); 250 try { 251 myTxTemplate.execute(t -> { 252 myReindexJobDao.markAllOfTypeAsDeleted(); 253 return null; 254 }); 255 256 myTaskExecutor.shutdown(); 257 initExecutor(); 258 259 expungeJobsMarkedAsDeleted(); 260 } finally { 261 myIndexingLock.unlock(); 262 } 263 } 264 265 private int runReindexJobs() { 266 Collection<ResourceReindexJobEntity> jobs = getResourceReindexJobEntities(); 267 268 if (jobs.size() > 0) { 269 ourLog.info("Running {} reindex jobs: {}", jobs.size(), jobs); 270 } else { 271 ourLog.debug("Running {} reindex jobs: {}", jobs.size(), jobs); 272 return 0; 273 } 274 275 int count = 0; 276 for (ResourceReindexJobEntity next : jobs) { 277 278 if (next.getThresholdLow() != null 279 && next.getThresholdLow().getTime() 280 >= next.getThresholdHigh().getTime()) { 281 markJobAsDeleted(next); 282 continue; 283 } 284 285 count += runReindexJob(next); 286 } 287 return count; 288 } 289 290 @Override 291 public int countReindexJobs() { 292 return getResourceReindexJobEntities().size(); 293 } 294 295 private Collection<ResourceReindexJobEntity> getResourceReindexJobEntities() { 296 Collection<ResourceReindexJobEntity> jobs = 297 myTxTemplate.execute(t -> myReindexJobDao.findAll(PageRequest.of(0, 10), false)); 298 assert jobs != null; 299 return jobs; 300 } 301 302 private void markJobAsDeleted(ResourceReindexJobEntity theJob) { 303 ourLog.info("Marking reindexing job ID[{}] as deleted", theJob.getId()); 304 myTxTemplate.execute(t -> { 305 myReindexJobDao.markAsDeletedById(theJob.getId()); 306 return null; 307 }); 308 } 309 310 @VisibleForTesting 311 public void setResourceReindexerForUnitTest(ResourceReindexer theResourceReindexer) { 312 myResourceReindexer = theResourceReindexer; 313 } 314 315 private int runReindexJob(ResourceReindexJobEntity theJob) { 316 if (theJob.getSuspendedUntil() != null) { 317 if (theJob.getSuspendedUntil().getTime() > System.currentTimeMillis()) { 318 return 0; 319 } 320 } 321 322 ourLog.info("Performing reindex pass for JOB[{}]", theJob.getId()); 323 StopWatch sw = new StopWatch(); 324 AtomicInteger counter = new AtomicInteger(); 325 326 /* 327 * On the first time we run a particular reindex job, let's make sure we 328 * have the latest search parameters loaded. A common reason to 329 * be reindexing is that the search parameters have changed in some way, so 330 * this makes sure we're on the latest versions 331 */ 332 if (theJob.getThresholdLow() == null) { 333 mySearchParamRegistry.forceRefresh(); 334 } 335 336 // Calculate range 337 Date low = theJob.getThresholdLow() != null ? theJob.getThresholdLow() : BEGINNING_OF_TIME; 338 Date high = theJob.getThresholdHigh(); 339 340 // Query for resources within threshold 341 StopWatch pageSw = new StopWatch(); 342 Slice<Long> range = myTxTemplate.execute(t -> { 343 PageRequest page = PageRequest.of(0, PASS_SIZE); 344 if (isNotBlank(theJob.getResourceType())) { 345 return myResourceTableDao.findIdsOfResourcesWithinUpdatedRangeOrderedFromOldest( 346 page, theJob.getResourceType(), low, high); 347 } else { 348 return myResourceTableDao.findIdsOfResourcesWithinUpdatedRangeOrderedFromOldest(page, low, high); 349 } 350 }); 351 Validate.notNull(range); 352 int count = range.getNumberOfElements(); 353 ourLog.info("Loaded {} resources for reindexing in {}", count, pageSw); 354 355 // If we didn't find any results at all, mark as deleted 356 if (count == 0) { 357 markJobAsDeleted(theJob); 358 return 0; 359 } 360 361 // Submit each resource requiring reindexing 362 List<Future<Date>> futures = range.stream() 363 .map(t -> myTaskExecutor.submit(new ResourceReindexingTask(t, counter))) 364 .collect(Collectors.toList()); 365 366 Date latestDate = null; 367 for (Future<Date> next : futures) { 368 Date nextDate; 369 try { 370 nextDate = next.get(); 371 } catch (Exception e) { 372 ourLog.error("Failure reindexing", e); 373 Date suspendedUntil = DateUtils.addMinutes(new Date(), 1); 374 myTxTemplate.execute(t -> { 375 myReindexJobDao.setSuspendedUntil(suspendedUntil); 376 return null; 377 }); 378 return counter.get(); 379 } 380 381 if (nextDate != null) { 382 if (latestDate == null || latestDate.getTime() < nextDate.getTime()) { 383 latestDate = new Date(nextDate.getTime()); 384 } 385 } 386 } 387 388 Validate.notNull(latestDate); 389 Date newLow; 390 if (latestDate.getTime() == low.getTime()) { 391 if (count == PASS_SIZE) { 392 // Just in case we end up in some sort of infinite loop. This shouldn't happen, and couldn't really 393 // happen unless there were 10000 resources with the exact same update time down to the 394 // millisecond. 395 ourLog.error( 396 "Final pass time for reindex JOB[{}] has same ending low value: {}", 397 theJob.getId(), 398 latestDate); 399 } 400 401 newLow = new Date(latestDate.getTime() + 1); 402 } else { 403 newLow = latestDate; 404 } 405 406 myTxTemplate.execute(t -> { 407 myReindexJobDao.setThresholdLow(theJob.getId(), newLow); 408 Integer existingCount = 409 myReindexJobDao.getReindexCount(theJob.getId()).orElse(0); 410 int newCount = existingCount + counter.get(); 411 myReindexJobDao.setReindexCount(theJob.getId(), newCount); 412 return null; 413 }); 414 415 ourLog.info( 416 "Completed pass of reindex JOB[{}] - Indexed {} resources in {} ({} / sec) - Have indexed until: {}", 417 theJob.getId(), 418 count, 419 sw, 420 sw.formatThroughput(count, TimeUnit.SECONDS), 421 new InstantType(newLow)); 422 return counter.get(); 423 } 424 425 private void expungeJobsMarkedAsDeleted() { 426 myTxTemplate.execute(t -> { 427 Collection<ResourceReindexJobEntity> toDelete = myReindexJobDao.findAll(PageRequest.of(0, 10), true); 428 toDelete.forEach(job -> { 429 ourLog.info("Purging deleted job[{}]", job.getId()); 430 myReindexJobDao.deleteById(job.getId()); 431 }); 432 return null; 433 }); 434 } 435 436 private void markResourceAsIndexingFailed(final long theId) { 437 TransactionTemplate txTemplate = new TransactionTemplate(myTxManager); 438 txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW); 439 txTemplate.execute((TransactionCallback<Void>) theStatus -> { 440 ourLog.info("Marking resource with PID {} as indexing_failed", theId); 441 442 myResourceTableDao.updateIndexStatus(theId, BaseHapiFhirDao.INDEX_STATUS_INDEXING_FAILED); 443 444 Query q = myEntityManager.createQuery("DELETE FROM ResourceTag t WHERE t.myResourceId = :id"); 445 q.setParameter("id", theId); 446 q.executeUpdate(); 447 448 q = myEntityManager.createQuery( 449 "DELETE FROM ResourceIndexedSearchParamCoords t WHERE t.myResourcePid = :id"); 450 q.setParameter("id", theId); 451 q.executeUpdate(); 452 453 q = myEntityManager.createQuery("DELETE FROM ResourceIndexedSearchParamDate t WHERE t.myResourcePid = :id"); 454 q.setParameter("id", theId); 455 q.executeUpdate(); 456 457 q = myEntityManager.createQuery( 458 "DELETE FROM ResourceIndexedSearchParamNumber t WHERE t.myResourcePid = :id"); 459 q.setParameter("id", theId); 460 q.executeUpdate(); 461 462 q = myEntityManager.createQuery( 463 "DELETE FROM ResourceIndexedSearchParamQuantity t WHERE t.myResourcePid = :id"); 464 q.setParameter("id", theId); 465 q.executeUpdate(); 466 467 q = myEntityManager.createQuery( 468 "DELETE FROM ResourceIndexedSearchParamQuantityNormalized t WHERE t.myResourcePid = :id"); 469 q.setParameter("id", theId); 470 q.executeUpdate(); 471 472 q = myEntityManager.createQuery( 473 "DELETE FROM ResourceIndexedSearchParamString t WHERE t.myResourcePid = :id"); 474 q.setParameter("id", theId); 475 q.executeUpdate(); 476 477 q = myEntityManager.createQuery( 478 "DELETE FROM ResourceIndexedSearchParamToken t WHERE t.myResourcePid = :id"); 479 q.setParameter("id", theId); 480 q.executeUpdate(); 481 482 q = myEntityManager.createQuery("DELETE FROM ResourceIndexedSearchParamUri t WHERE t.myResourcePid = :id"); 483 q.setParameter("id", theId); 484 q.executeUpdate(); 485 486 q = myEntityManager.createQuery("DELETE FROM ResourceLink t WHERE t.mySourceResourcePid = :id"); 487 q.setParameter("id", theId); 488 q.executeUpdate(); 489 490 q = myEntityManager.createQuery("DELETE FROM ResourceLink t WHERE t.myTargetResourcePid = :id"); 491 q.setParameter("id", theId); 492 q.executeUpdate(); 493 494 return null; 495 }); 496 } 497 498 private class ResourceReindexingTask implements Callable<Date> { 499 private final Long myNextId; 500 private final AtomicInteger myCounter; 501 private Date myUpdated; 502 503 ResourceReindexingTask(Long theNextId, AtomicInteger theCounter) { 504 myNextId = theNextId; 505 myCounter = theCounter; 506 } 507 508 @Override 509 public Date call() { 510 Throwable reindexFailure; 511 512 try { 513 reindexFailure = readResourceAndReindex(); 514 } catch (ResourceVersionConflictException e) { 515 /* 516 * We reindex in multiple threads, so it's technically possible that two threads try 517 * to index resources that cause a constraint error now (i.e. because a unique index has been 518 * added that didn't previously exist). In this case, one of the threads would succeed and 519 * not get this error, so we'll let the other one fail and try 520 * again later. 521 */ 522 ourLog.info( 523 "Failed to reindex because of a version conflict. Leaving in unindexed state: {}", 524 e.getMessage()); 525 reindexFailure = null; 526 } 527 528 if (reindexFailure != null) { 529 ourLog.info("Setting resource PID[{}] status to ERRORED", myNextId); 530 markResourceAsIndexingFailed(myNextId); 531 } 532 533 return myUpdated; 534 } 535 536 @Nullable 537 private Throwable readResourceAndReindex() { 538 Throwable reindexFailure; 539 reindexFailure = myTxTemplate.execute(t -> { 540 ResourceTable resourceTable = 541 myResourceTableDao.findById(myNextId).orElseThrow(IllegalStateException::new); 542 myUpdated = resourceTable.getUpdatedDate(); 543 544 try { 545 myResourceReindexer.reindexResourceEntity(resourceTable); 546 myCounter.incrementAndGet(); 547 return null; 548 549 } catch (Exception e) { 550 ourLog.error("Failed to index resource {}: {}", resourceTable.getIdDt(), e, e); 551 t.setRollbackOnly(); 552 return e; 553 } 554 }); 555 return reindexFailure; 556 } 557 } 558}