001/*-
002 * #%L
003 * HAPI FHIR JPA Server
004 * %%
005 * Copyright (C) 2014 - 2023 Smile CDR, Inc.
006 * %%
007 * Licensed under the Apache License, Version 2.0 (the "License");
008 * you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 *      http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 * #L%
019 */
020package ca.uhn.fhir.jpa.search.reindex;
021
022import ca.uhn.fhir.batch2.model.JobInstanceStartRequest;
023import ca.uhn.fhir.context.FhirContext;
024import ca.uhn.fhir.i18n.Msg;
025import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
026import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
027import ca.uhn.fhir.jpa.dao.BaseHapiFhirDao;
028import ca.uhn.fhir.jpa.dao.data.IForcedIdDao;
029import ca.uhn.fhir.jpa.dao.data.IResourceReindexJobDao;
030import ca.uhn.fhir.jpa.dao.data.IResourceTableDao;
031import ca.uhn.fhir.jpa.entity.ResourceReindexJobEntity;
032import ca.uhn.fhir.jpa.model.entity.ResourceTable;
033import ca.uhn.fhir.jpa.model.sched.HapiJob;
034import ca.uhn.fhir.jpa.model.sched.IHasScheduledJobs;
035import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
036import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition;
037import ca.uhn.fhir.parser.DataFormatException;
038import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
039import ca.uhn.fhir.rest.server.exceptions.ResourceVersionConflictException;
040import ca.uhn.fhir.rest.server.util.ISearchParamRegistry;
041import ca.uhn.fhir.util.StopWatch;
042import com.google.common.annotations.VisibleForTesting;
043import org.apache.commons.lang3.Validate;
044import org.apache.commons.lang3.concurrent.BasicThreadFactory;
045import org.apache.commons.lang3.time.DateUtils;
046import org.hl7.fhir.r4.model.InstantType;
047import org.quartz.JobExecutionContext;
048import org.slf4j.Logger;
049import org.slf4j.LoggerFactory;
050import org.springframework.beans.factory.annotation.Autowired;
051import org.springframework.data.domain.PageRequest;
052import org.springframework.data.domain.Slice;
053import org.springframework.transaction.PlatformTransactionManager;
054import org.springframework.transaction.TransactionDefinition;
055import org.springframework.transaction.annotation.Propagation;
056import org.springframework.transaction.annotation.Transactional;
057import org.springframework.transaction.support.TransactionCallback;
058import org.springframework.transaction.support.TransactionTemplate;
059
060import java.util.Collection;
061import java.util.Date;
062import java.util.List;
063import java.util.concurrent.Callable;
064import java.util.concurrent.Future;
065import java.util.concurrent.LinkedBlockingQueue;
066import java.util.concurrent.RejectedExecutionHandler;
067import java.util.concurrent.ThreadFactory;
068import java.util.concurrent.ThreadPoolExecutor;
069import java.util.concurrent.TimeUnit;
070import java.util.concurrent.atomic.AtomicInteger;
071import java.util.concurrent.locks.ReentrantLock;
072import java.util.stream.Collectors;
073import javax.annotation.Nullable;
074import javax.annotation.PostConstruct;
075import javax.persistence.EntityManager;
076import javax.persistence.PersistenceContext;
077import javax.persistence.PersistenceContextType;
078import javax.persistence.Query;
079
080import static org.apache.commons.lang3.StringUtils.isNotBlank;
081
082/**
083 * @see ca.uhn.fhir.jpa.reindex.job.ReindexJobConfig
084 * @deprecated Use the Batch2 {@link ca.uhn.fhir.batch2.api.IJobCoordinator#startInstance(JobInstanceStartRequest)} instead.
085 */
086@Deprecated
087public class ResourceReindexingSvcImpl implements IResourceReindexingSvc, IHasScheduledJobs {
088
089        private static final Date BEGINNING_OF_TIME = new Date(0);
090        private static final Logger ourLog = LoggerFactory.getLogger(ResourceReindexingSvcImpl.class);
091        private static final int PASS_SIZE = 25000;
092        private final ReentrantLock myIndexingLock = new ReentrantLock();
093
094        @Autowired
095        private IResourceReindexJobDao myReindexJobDao;
096
097        @Autowired
098        private JpaStorageSettings myStorageSettings;
099
100        @Autowired
101        private PlatformTransactionManager myTxManager;
102
103        private TransactionTemplate myTxTemplate;
104        private final ThreadFactory myReindexingThreadFactory =
105                        new BasicThreadFactory.Builder().namingPattern("ResourceReindex-%d").build();
106        private ThreadPoolExecutor myTaskExecutor;
107
108        @Autowired
109        private IResourceTableDao myResourceTableDao;
110
111        @Autowired
112        private DaoRegistry myDaoRegistry;
113
114        @Autowired
115        private IForcedIdDao myForcedIdDao;
116
117        @Autowired
118        private FhirContext myContext;
119
120        @PersistenceContext(type = PersistenceContextType.TRANSACTION)
121        private EntityManager myEntityManager;
122
123        @Autowired
124        private ISearchParamRegistry mySearchParamRegistry;
125
126        @Autowired
127        private ResourceReindexer myResourceReindexer;
128
129        @VisibleForTesting
130        void setStorageSettingsForUnitTest(JpaStorageSettings theStorageSettings) {
131                myStorageSettings = theStorageSettings;
132        }
133
134        @VisibleForTesting
135        void setContextForUnitTest(FhirContext theContext) {
136                myContext = theContext;
137        }
138
139        @PostConstruct
140        public void start() {
141                myTxTemplate = new TransactionTemplate(myTxManager);
142                initExecutor();
143        }
144
145        public void initExecutor() {
146                // Create the threadpool executor used for reindex jobs
147                int reindexThreadCount = myStorageSettings.getReindexThreadCount();
148                RejectedExecutionHandler rejectHandler = new BlockPolicy();
149                myTaskExecutor = new ThreadPoolExecutor(
150                                0,
151                                reindexThreadCount,
152                                0L,
153                                TimeUnit.MILLISECONDS,
154                                new LinkedBlockingQueue<>(100),
155                                myReindexingThreadFactory,
156                                rejectHandler);
157        }
158
159        @Override
160        public void scheduleJobs(ISchedulerService theSchedulerService) {
161                ScheduledJobDefinition jobDetail = new ScheduledJobDefinition();
162                jobDetail.setId(getClass().getName());
163                jobDetail.setJobClass(Job.class);
164                theSchedulerService.scheduleClusteredJob(10 * DateUtils.MILLIS_PER_SECOND, jobDetail);
165        }
166
167        @Override
168        @Transactional(propagation = Propagation.REQUIRED)
169        public Long markAllResourcesForReindexing() {
170                return markAllResourcesForReindexing(null);
171        }
172
173        @Override
174        @Transactional(propagation = Propagation.REQUIRED)
175        public Long markAllResourcesForReindexing(String theType) {
176
177                String typeDesc;
178                if (isNotBlank(theType)) {
179                        try {
180                                myContext.getResourceType(theType);
181                        } catch (DataFormatException e) {
182                                throw new InvalidRequestException(Msg.code(1170) + "Unknown resource type: " + theType);
183                        }
184                        myReindexJobDao.markAllOfTypeAsDeleted(theType);
185                        typeDesc = theType;
186                } else {
187                        myReindexJobDao.markAllOfTypeAsDeleted();
188                        typeDesc = "(any)";
189                }
190
191                ResourceReindexJobEntity job = new ResourceReindexJobEntity();
192                job.setResourceType(theType);
193                job.setThresholdHigh(DateUtils.addMinutes(new Date(), 5));
194                job = myReindexJobDao.saveAndFlush(job);
195
196                ourLog.info("Marking all resources of type {} for reindexing - Got job ID[{}]", typeDesc, job.getId());
197                return job.getId();
198        }
199
200        public static class Job implements HapiJob {
201                @Autowired
202                private IResourceReindexingSvc myTarget;
203
204                @Override
205                public void execute(JobExecutionContext theContext) {
206                        myTarget.runReindexingPass();
207                }
208        }
209
210        @VisibleForTesting
211        ReentrantLock getIndexingLockForUnitTest() {
212                return myIndexingLock;
213        }
214
215        @Override
216        @Transactional(propagation = Propagation.NEVER)
217        public Integer runReindexingPass() {
218                if (myStorageSettings.isSchedulingDisabled() || !myStorageSettings.isEnableTaskPreExpandValueSets()) {
219                        return null;
220                }
221                if (myIndexingLock.tryLock()) {
222                        try {
223                                return doReindexingPassInsideLock();
224                        } finally {
225                                myIndexingLock.unlock();
226                        }
227                }
228                return null;
229        }
230
231        private int doReindexingPassInsideLock() {
232                expungeJobsMarkedAsDeleted();
233                return runReindexJobs();
234        }
235
236        @Override
237        public int forceReindexingPass() {
238                myIndexingLock.lock();
239                try {
240                        return doReindexingPassInsideLock();
241                } finally {
242                        myIndexingLock.unlock();
243                }
244        }
245
246        @Override
247        public void cancelAndPurgeAllJobs() {
248                ourLog.info("Cancelling and purging all resource reindexing jobs");
249                myIndexingLock.lock();
250                try {
251                        myTxTemplate.execute(t -> {
252                                myReindexJobDao.markAllOfTypeAsDeleted();
253                                return null;
254                        });
255
256                        myTaskExecutor.shutdown();
257                        initExecutor();
258
259                        expungeJobsMarkedAsDeleted();
260                } finally {
261                        myIndexingLock.unlock();
262                }
263        }
264
265        private int runReindexJobs() {
266                Collection<ResourceReindexJobEntity> jobs = getResourceReindexJobEntities();
267
268                if (jobs.size() > 0) {
269                        ourLog.info("Running {} reindex jobs: {}", jobs.size(), jobs);
270                } else {
271                        ourLog.debug("Running {} reindex jobs: {}", jobs.size(), jobs);
272                        return 0;
273                }
274
275                int count = 0;
276                for (ResourceReindexJobEntity next : jobs) {
277
278                        if (next.getThresholdLow() != null
279                                        && next.getThresholdLow().getTime()
280                                                        >= next.getThresholdHigh().getTime()) {
281                                markJobAsDeleted(next);
282                                continue;
283                        }
284
285                        count += runReindexJob(next);
286                }
287                return count;
288        }
289
290        @Override
291        public int countReindexJobs() {
292                return getResourceReindexJobEntities().size();
293        }
294
295        private Collection<ResourceReindexJobEntity> getResourceReindexJobEntities() {
296                Collection<ResourceReindexJobEntity> jobs =
297                                myTxTemplate.execute(t -> myReindexJobDao.findAll(PageRequest.of(0, 10), false));
298                assert jobs != null;
299                return jobs;
300        }
301
302        private void markJobAsDeleted(ResourceReindexJobEntity theJob) {
303                ourLog.info("Marking reindexing job ID[{}] as deleted", theJob.getId());
304                myTxTemplate.execute(t -> {
305                        myReindexJobDao.markAsDeletedById(theJob.getId());
306                        return null;
307                });
308        }
309
310        @VisibleForTesting
311        public void setResourceReindexerForUnitTest(ResourceReindexer theResourceReindexer) {
312                myResourceReindexer = theResourceReindexer;
313        }
314
315        private int runReindexJob(ResourceReindexJobEntity theJob) {
316                if (theJob.getSuspendedUntil() != null) {
317                        if (theJob.getSuspendedUntil().getTime() > System.currentTimeMillis()) {
318                                return 0;
319                        }
320                }
321
322                ourLog.info("Performing reindex pass for JOB[{}]", theJob.getId());
323                StopWatch sw = new StopWatch();
324                AtomicInteger counter = new AtomicInteger();
325
326                /*
327                 * On the first time we run a particular reindex job, let's make sure we
328                 * have the latest search parameters loaded. A common reason to
329                 * be reindexing is that the search parameters have changed in some way, so
330                 * this makes sure we're on the latest versions
331                 */
332                if (theJob.getThresholdLow() == null) {
333                        mySearchParamRegistry.forceRefresh();
334                }
335
336                // Calculate range
337                Date low = theJob.getThresholdLow() != null ? theJob.getThresholdLow() : BEGINNING_OF_TIME;
338                Date high = theJob.getThresholdHigh();
339
340                // Query for resources within threshold
341                StopWatch pageSw = new StopWatch();
342                Slice<Long> range = myTxTemplate.execute(t -> {
343                        PageRequest page = PageRequest.of(0, PASS_SIZE);
344                        if (isNotBlank(theJob.getResourceType())) {
345                                return myResourceTableDao.findIdsOfResourcesWithinUpdatedRangeOrderedFromOldest(
346                                                page, theJob.getResourceType(), low, high);
347                        } else {
348                                return myResourceTableDao.findIdsOfResourcesWithinUpdatedRangeOrderedFromOldest(page, low, high);
349                        }
350                });
351                Validate.notNull(range);
352                int count = range.getNumberOfElements();
353                ourLog.info("Loaded {} resources for reindexing in {}", count, pageSw);
354
355                // If we didn't find any results at all, mark as deleted
356                if (count == 0) {
357                        markJobAsDeleted(theJob);
358                        return 0;
359                }
360
361                // Submit each resource requiring reindexing
362                List<Future<Date>> futures = range.stream()
363                                .map(t -> myTaskExecutor.submit(new ResourceReindexingTask(t, counter)))
364                                .collect(Collectors.toList());
365
366                Date latestDate = null;
367                for (Future<Date> next : futures) {
368                        Date nextDate;
369                        try {
370                                nextDate = next.get();
371                        } catch (Exception e) {
372                                ourLog.error("Failure reindexing", e);
373                                Date suspendedUntil = DateUtils.addMinutes(new Date(), 1);
374                                myTxTemplate.execute(t -> {
375                                        myReindexJobDao.setSuspendedUntil(suspendedUntil);
376                                        return null;
377                                });
378                                return counter.get();
379                        }
380
381                        if (nextDate != null) {
382                                if (latestDate == null || latestDate.getTime() < nextDate.getTime()) {
383                                        latestDate = new Date(nextDate.getTime());
384                                }
385                        }
386                }
387
388                Validate.notNull(latestDate);
389                Date newLow;
390                if (latestDate.getTime() == low.getTime()) {
391                        if (count == PASS_SIZE) {
392                                // Just in case we end up in some sort of infinite loop. This shouldn't happen, and couldn't really
393                                // happen unless there were 10000 resources with the exact same update time down to the
394                                // millisecond.
395                                ourLog.error(
396                                                "Final pass time for reindex JOB[{}] has same ending low value: {}",
397                                                theJob.getId(),
398                                                latestDate);
399                        }
400
401                        newLow = new Date(latestDate.getTime() + 1);
402                } else {
403                        newLow = latestDate;
404                }
405
406                myTxTemplate.execute(t -> {
407                        myReindexJobDao.setThresholdLow(theJob.getId(), newLow);
408                        Integer existingCount =
409                                        myReindexJobDao.getReindexCount(theJob.getId()).orElse(0);
410                        int newCount = existingCount + counter.get();
411                        myReindexJobDao.setReindexCount(theJob.getId(), newCount);
412                        return null;
413                });
414
415                ourLog.info(
416                                "Completed pass of reindex JOB[{}] - Indexed {} resources in {} ({} / sec) - Have indexed until: {}",
417                                theJob.getId(),
418                                count,
419                                sw,
420                                sw.formatThroughput(count, TimeUnit.SECONDS),
421                                new InstantType(newLow));
422                return counter.get();
423        }
424
425        private void expungeJobsMarkedAsDeleted() {
426                myTxTemplate.execute(t -> {
427                        Collection<ResourceReindexJobEntity> toDelete = myReindexJobDao.findAll(PageRequest.of(0, 10), true);
428                        toDelete.forEach(job -> {
429                                ourLog.info("Purging deleted job[{}]", job.getId());
430                                myReindexJobDao.deleteById(job.getId());
431                        });
432                        return null;
433                });
434        }
435
436        private void markResourceAsIndexingFailed(final long theId) {
437                TransactionTemplate txTemplate = new TransactionTemplate(myTxManager);
438                txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
439                txTemplate.execute((TransactionCallback<Void>) theStatus -> {
440                        ourLog.info("Marking resource with PID {} as indexing_failed", theId);
441
442                        myResourceTableDao.updateIndexStatus(theId, BaseHapiFhirDao.INDEX_STATUS_INDEXING_FAILED);
443
444                        Query q = myEntityManager.createQuery("DELETE FROM ResourceTag t WHERE t.myResourceId = :id");
445                        q.setParameter("id", theId);
446                        q.executeUpdate();
447
448                        q = myEntityManager.createQuery(
449                                        "DELETE FROM ResourceIndexedSearchParamCoords t WHERE t.myResourcePid = :id");
450                        q.setParameter("id", theId);
451                        q.executeUpdate();
452
453                        q = myEntityManager.createQuery("DELETE FROM ResourceIndexedSearchParamDate t WHERE t.myResourcePid = :id");
454                        q.setParameter("id", theId);
455                        q.executeUpdate();
456
457                        q = myEntityManager.createQuery(
458                                        "DELETE FROM ResourceIndexedSearchParamNumber t WHERE t.myResourcePid = :id");
459                        q.setParameter("id", theId);
460                        q.executeUpdate();
461
462                        q = myEntityManager.createQuery(
463                                        "DELETE FROM ResourceIndexedSearchParamQuantity t WHERE t.myResourcePid = :id");
464                        q.setParameter("id", theId);
465                        q.executeUpdate();
466
467                        q = myEntityManager.createQuery(
468                                        "DELETE FROM ResourceIndexedSearchParamQuantityNormalized t WHERE t.myResourcePid = :id");
469                        q.setParameter("id", theId);
470                        q.executeUpdate();
471
472                        q = myEntityManager.createQuery(
473                                        "DELETE FROM ResourceIndexedSearchParamString t WHERE t.myResourcePid = :id");
474                        q.setParameter("id", theId);
475                        q.executeUpdate();
476
477                        q = myEntityManager.createQuery(
478                                        "DELETE FROM ResourceIndexedSearchParamToken t WHERE t.myResourcePid = :id");
479                        q.setParameter("id", theId);
480                        q.executeUpdate();
481
482                        q = myEntityManager.createQuery("DELETE FROM ResourceIndexedSearchParamUri t WHERE t.myResourcePid = :id");
483                        q.setParameter("id", theId);
484                        q.executeUpdate();
485
486                        q = myEntityManager.createQuery("DELETE FROM ResourceLink t WHERE t.mySourceResourcePid = :id");
487                        q.setParameter("id", theId);
488                        q.executeUpdate();
489
490                        q = myEntityManager.createQuery("DELETE FROM ResourceLink t WHERE t.myTargetResourcePid = :id");
491                        q.setParameter("id", theId);
492                        q.executeUpdate();
493
494                        return null;
495                });
496        }
497
498        private class ResourceReindexingTask implements Callable<Date> {
499                private final Long myNextId;
500                private final AtomicInteger myCounter;
501                private Date myUpdated;
502
503                ResourceReindexingTask(Long theNextId, AtomicInteger theCounter) {
504                        myNextId = theNextId;
505                        myCounter = theCounter;
506                }
507
508                @Override
509                public Date call() {
510                        Throwable reindexFailure;
511
512                        try {
513                                reindexFailure = readResourceAndReindex();
514                        } catch (ResourceVersionConflictException e) {
515                                /*
516                                 * We reindex in multiple threads, so it's technically possible that two threads try
517                                 * to index resources that cause a constraint error now (i.e. because a unique index has been
518                                 * added that didn't previously exist). In this case, one of the threads would succeed and
519                                 * not get this error, so we'll let the other one fail and try
520                                 * again later.
521                                 */
522                                ourLog.info(
523                                                "Failed to reindex because of a version conflict. Leaving in unindexed state: {}",
524                                                e.getMessage());
525                                reindexFailure = null;
526                        }
527
528                        if (reindexFailure != null) {
529                                ourLog.info("Setting resource PID[{}] status to ERRORED", myNextId);
530                                markResourceAsIndexingFailed(myNextId);
531                        }
532
533                        return myUpdated;
534                }
535
536                @Nullable
537                private Throwable readResourceAndReindex() {
538                        Throwable reindexFailure;
539                        reindexFailure = myTxTemplate.execute(t -> {
540                                ResourceTable resourceTable =
541                                                myResourceTableDao.findById(myNextId).orElseThrow(IllegalStateException::new);
542                                myUpdated = resourceTable.getUpdatedDate();
543
544                                try {
545                                        myResourceReindexer.reindexResourceEntity(resourceTable);
546                                        myCounter.incrementAndGet();
547                                        return null;
548
549                                } catch (Exception e) {
550                                        ourLog.error("Failed to index resource {}: {}", resourceTable.getIdDt(), e, e);
551                                        t.setRollbackOnly();
552                                        return e;
553                                }
554                        });
555                        return reindexFailure;
556                }
557        }
558}