001/*- 002 * #%L 003 * HAPI FHIR JPA Server 004 * %% 005 * Copyright (C) 2014 - 2023 Smile CDR, Inc. 006 * %% 007 * Licensed under the Apache License, Version 2.0 (the "License"); 008 * you may not use this file except in compliance with the License. 009 * You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 * #L% 019 */ 020package ca.uhn.fhir.jpa.search.builder.sql; 021 022import ca.uhn.fhir.i18n.Msg; 023import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService; 024import ca.uhn.fhir.jpa.search.builder.ISearchQueryExecutor; 025import ca.uhn.fhir.jpa.util.ScrollableResultsIterator; 026import ca.uhn.fhir.rest.server.exceptions.InternalErrorException; 027import ca.uhn.fhir.util.IoUtil; 028import org.apache.commons.lang3.Validate; 029import org.hibernate.CacheMode; 030import org.hibernate.ScrollMode; 031import org.hibernate.ScrollableResults; 032import org.slf4j.Logger; 033import org.slf4j.LoggerFactory; 034 035import java.sql.Connection; 036import java.sql.PreparedStatement; 037import java.util.Arrays; 038import javax.persistence.EntityManager; 039import javax.persistence.FlushModeType; 040import javax.persistence.PersistenceContext; 041import javax.persistence.PersistenceContextType; 042import javax.persistence.Query; 043 044public class SearchQueryExecutor implements ISearchQueryExecutor { 045 046 private static final Long NO_MORE = -1L; 047 private static final SearchQueryExecutor NO_VALUE_EXECUTOR = new SearchQueryExecutor(); 048 private static final Object[] EMPTY_OBJECT_ARRAY = new Object[0]; 049 private static final Logger ourLog = LoggerFactory.getLogger(SearchQueryExecutor.class); 050 private final GeneratedSql myGeneratedSql; 051 private final Integer myMaxResultsToFetch; 052 053 @PersistenceContext(type = PersistenceContextType.TRANSACTION) 054 private EntityManager myEntityManager; 055 056 private boolean myQueryInitialized; 057 private Connection myConnection; 058 private PreparedStatement myStatement; 059 private ScrollableResultsIterator<Number> myResultSet; 060 private Long myNext; 061 062 /** 063 * Constructor 064 */ 065 public SearchQueryExecutor(GeneratedSql theGeneratedSql, Integer theMaxResultsToFetch) { 066 Validate.notNull(theGeneratedSql, "theGeneratedSql must not be null"); 067 myGeneratedSql = theGeneratedSql; 068 myQueryInitialized = false; 069 myMaxResultsToFetch = theMaxResultsToFetch; 070 } 071 072 /** 073 * Internal constructor for empty executor 074 */ 075 private SearchQueryExecutor() { 076 assert NO_MORE != null; 077 078 myGeneratedSql = null; 079 myMaxResultsToFetch = null; 080 myNext = NO_MORE; 081 } 082 083 @Override 084 public void close() { 085 IoUtil.closeQuietly(myResultSet); 086 } 087 088 @Override 089 public boolean hasNext() { 090 fetchNext(); 091 return !NO_MORE.equals(myNext); 092 } 093 094 @Override 095 public Long next() { 096 fetchNext(); 097 Validate.isTrue(hasNext(), "Can not call next() right now, no data remains"); 098 Long next = myNext; 099 myNext = null; 100 return next; 101 } 102 103 private void fetchNext() { 104 if (myNext == null) { 105 String sql = myGeneratedSql.getSql(); 106 Object[] args = myGeneratedSql.getBindVariables().toArray(EMPTY_OBJECT_ARRAY); 107 108 try { 109 if (!myQueryInitialized) { 110 111 /* 112 * Note that we use the spring managed connection, and the expectation is that a transaction that 113 * is managed by Spring has been started before this method is called. 114 */ 115 HapiTransactionService.requireTransaction(); 116 117 Query nativeQuery = myEntityManager.createNativeQuery(sql); 118 org.hibernate.query.Query<?> hibernateQuery = (org.hibernate.query.Query<?>) nativeQuery; 119 for (int i = 1; i <= args.length; i++) { 120 hibernateQuery.setParameter(i, args[i - 1]); 121 } 122 123 ourLog.trace("About to execute SQL: {}. Parameters: {}", sql, Arrays.toString(args)); 124 125 /* 126 * These settings help to ensure that we use a search cursor 127 * as opposed to loading all search results into memory 128 */ 129 hibernateQuery.setFetchSize(500000); 130 hibernateQuery.setCacheable(false); 131 hibernateQuery.setCacheMode(CacheMode.IGNORE); 132 hibernateQuery.setReadOnly(true); 133 134 // This tells hibernate not to flush when we call scroll(), but rather to wait until the transaction 135 // commits and 136 // only flush then. We need to do this so that any exceptions that happen in the transaction happen 137 // when 138 // we try to commit the transaction, and not here. 139 // See the test called testTransaction_multiThreaded (in FhirResourceDaoR4ConcurrentWriteTest) which 140 // triggers 141 // the following exception if we don't set this flush mode: 142 // java.util.concurrent.ExecutionException: 143 // org.springframework.transaction.UnexpectedRollbackException: Transaction silently rolled back 144 // because it has been marked as rollback-only 145 hibernateQuery.setFlushMode(FlushModeType.COMMIT); 146 ScrollableResults scrollableResults = hibernateQuery.scroll(ScrollMode.FORWARD_ONLY); 147 myResultSet = new ScrollableResultsIterator<>(scrollableResults); 148 myQueryInitialized = true; 149 } 150 151 if (myResultSet == null || !myResultSet.hasNext()) { 152 myNext = NO_MORE; 153 } else { 154 Number next = myResultSet.next(); 155 myNext = next.longValue(); 156 } 157 158 } catch (Exception e) { 159 ourLog.error("Failed to create or execute SQL query", e); 160 close(); 161 throw new InternalErrorException(Msg.code(1262) + e, e); 162 } 163 } 164 } 165 166 public static SearchQueryExecutor emptyExecutor() { 167 return NO_VALUE_EXECUTOR; 168 } 169}