001/*- 002 * #%L 003 * HAPI FHIR JPA Server - Batch2 Task Processor 004 * %% 005 * Copyright (C) 2014 - 2024 Smile CDR, Inc. 006 * %% 007 * Licensed under the Apache License, Version 2.0 (the "License"); 008 * you may not use this file except in compliance with the License. 009 * You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 * #L% 019 */ 020package ca.uhn.fhir.batch2.config; 021 022import ca.uhn.fhir.batch2.api.IJobCoordinator; 023import ca.uhn.fhir.batch2.api.IJobMaintenanceService; 024import ca.uhn.fhir.batch2.api.IJobPartitionProvider; 025import ca.uhn.fhir.batch2.api.IJobPersistence; 026import ca.uhn.fhir.batch2.api.IReductionStepExecutorService; 027import ca.uhn.fhir.batch2.channel.BatchJobSender; 028import ca.uhn.fhir.batch2.coordinator.DefaultJobPartitionProvider; 029import ca.uhn.fhir.batch2.coordinator.JobCoordinatorImpl; 030import ca.uhn.fhir.batch2.coordinator.JobDefinitionRegistry; 031import ca.uhn.fhir.batch2.coordinator.ReductionStepExecutorServiceImpl; 032import ca.uhn.fhir.batch2.coordinator.WorkChunkProcessor; 033import ca.uhn.fhir.batch2.maintenance.JobMaintenanceServiceImpl; 034import ca.uhn.fhir.batch2.model.JobWorkNotificationJsonMessage; 035import ca.uhn.fhir.context.FhirContext; 036import ca.uhn.fhir.jpa.api.config.JpaStorageSettings; 037import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService; 038import ca.uhn.fhir.jpa.model.sched.ISchedulerService; 039import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc; 040import ca.uhn.fhir.jpa.searchparam.MatchUrlService; 041import ca.uhn.fhir.jpa.subscription.channel.api.ChannelConsumerSettings; 042import ca.uhn.fhir.jpa.subscription.channel.api.ChannelProducerSettings; 043import ca.uhn.fhir.jpa.subscription.channel.api.IChannelFactory; 044import ca.uhn.fhir.jpa.subscription.channel.api.IChannelProducer; 045import ca.uhn.fhir.jpa.subscription.channel.api.IChannelReceiver; 046import org.springframework.beans.factory.annotation.Autowired; 047import org.springframework.context.annotation.Bean; 048import org.springframework.context.annotation.Configuration; 049 050@Configuration 051public abstract class BaseBatch2Config { 052 053 public static final String CHANNEL_NAME = "batch2-work-notification"; 054 055 @Autowired 056 IJobPersistence myPersistence; 057 058 @Autowired 059 IChannelFactory myChannelFactory; 060 061 @Autowired 062 IHapiTransactionService myHapiTransactionService; 063 064 @Bean 065 public JobDefinitionRegistry batch2JobDefinitionRegistry() { 066 return new JobDefinitionRegistry(); 067 } 068 069 @Bean 070 public WorkChunkProcessor jobStepExecutorService(BatchJobSender theBatchJobSender) { 071 return new WorkChunkProcessor(myPersistence, theBatchJobSender, myHapiTransactionService); 072 } 073 074 @Bean 075 public BatchJobSender batchJobSender() { 076 return new BatchJobSender(batch2ProcessingChannelProducer(myChannelFactory)); 077 } 078 079 @Bean 080 public IJobCoordinator batch2JobCoordinator( 081 JobDefinitionRegistry theJobDefinitionRegistry, 082 BatchJobSender theBatchJobSender, 083 WorkChunkProcessor theExecutor, 084 IJobMaintenanceService theJobMaintenanceService, 085 IHapiTransactionService theTransactionService) { 086 return new JobCoordinatorImpl( 087 theBatchJobSender, 088 batch2ProcessingChannelReceiver(myChannelFactory), 089 myPersistence, 090 theJobDefinitionRegistry, 091 theExecutor, 092 theJobMaintenanceService, 093 theTransactionService); 094 } 095 096 @Bean 097 public IReductionStepExecutorService reductionStepExecutorService( 098 IJobPersistence theJobPersistence, 099 IHapiTransactionService theTransactionService, 100 JobDefinitionRegistry theJobDefinitionRegistry) { 101 return new ReductionStepExecutorServiceImpl(theJobPersistence, theTransactionService, theJobDefinitionRegistry); 102 } 103 104 @Bean 105 public IJobMaintenanceService batch2JobMaintenanceService( 106 ISchedulerService theSchedulerService, 107 JobDefinitionRegistry theJobDefinitionRegistry, 108 JpaStorageSettings theStorageSettings, 109 BatchJobSender theBatchJobSender, 110 WorkChunkProcessor theExecutor, 111 IReductionStepExecutorService theReductionStepExecutorService) { 112 return new JobMaintenanceServiceImpl( 113 theSchedulerService, 114 myPersistence, 115 theStorageSettings, 116 theJobDefinitionRegistry, 117 theBatchJobSender, 118 theExecutor, 119 theReductionStepExecutorService); 120 } 121 122 @Bean 123 public IChannelProducer batch2ProcessingChannelProducer(IChannelFactory theChannelFactory) { 124 ChannelProducerSettings settings = 125 new ChannelProducerSettings().setConcurrentConsumers(getConcurrentConsumers()); 126 return theChannelFactory.getOrCreateProducer(CHANNEL_NAME, JobWorkNotificationJsonMessage.class, settings); 127 } 128 129 @Bean 130 public IChannelReceiver batch2ProcessingChannelReceiver(IChannelFactory theChannelFactory) { 131 ChannelConsumerSettings settings = 132 new ChannelConsumerSettings().setConcurrentConsumers(getConcurrentConsumers()); 133 return theChannelFactory.getOrCreateReceiver(CHANNEL_NAME, JobWorkNotificationJsonMessage.class, settings); 134 } 135 136 @Bean 137 public Batch2JobRegisterer batch2JobRegisterer() { 138 return new Batch2JobRegisterer(); 139 } 140 141 /** 142 * Can be overridden 143 */ 144 protected int getConcurrentConsumers() { 145 return 4; 146 } 147 148 @Bean 149 public IJobPartitionProvider jobPartitionProvider( 150 FhirContext theFhirContext, 151 IRequestPartitionHelperSvc theRequestPartitionHelperSvc, 152 MatchUrlService theMatchUrlService) { 153 return new DefaultJobPartitionProvider(theFhirContext, theRequestPartitionHelperSvc, theMatchUrlService); 154 } 155}