001/*-
002 * #%L
003 * HAPI FHIR JPA Server - Batch2 Task Processor
004 * %%
005 * Copyright (C) 2014 - 2024 Smile CDR, Inc.
006 * %%
007 * Licensed under the Apache License, Version 2.0 (the "License");
008 * you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 *      http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 * #L%
019 */
020package ca.uhn.fhir.batch2.config;
021
022import ca.uhn.fhir.batch2.api.IJobCoordinator;
023import ca.uhn.fhir.batch2.api.IJobMaintenanceService;
024import ca.uhn.fhir.batch2.api.IJobPartitionProvider;
025import ca.uhn.fhir.batch2.api.IJobPersistence;
026import ca.uhn.fhir.batch2.api.IReductionStepExecutorService;
027import ca.uhn.fhir.batch2.channel.BatchJobSender;
028import ca.uhn.fhir.batch2.coordinator.DefaultJobPartitionProvider;
029import ca.uhn.fhir.batch2.coordinator.JobCoordinatorImpl;
030import ca.uhn.fhir.batch2.coordinator.JobDefinitionRegistry;
031import ca.uhn.fhir.batch2.coordinator.ReductionStepExecutorServiceImpl;
032import ca.uhn.fhir.batch2.coordinator.WorkChunkProcessor;
033import ca.uhn.fhir.batch2.maintenance.JobMaintenanceServiceImpl;
034import ca.uhn.fhir.batch2.model.JobWorkNotificationJsonMessage;
035import ca.uhn.fhir.context.FhirContext;
036import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
037import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
038import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
039import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc;
040import ca.uhn.fhir.jpa.searchparam.MatchUrlService;
041import ca.uhn.fhir.jpa.subscription.channel.api.ChannelConsumerSettings;
042import ca.uhn.fhir.jpa.subscription.channel.api.ChannelProducerSettings;
043import ca.uhn.fhir.jpa.subscription.channel.api.IChannelFactory;
044import ca.uhn.fhir.jpa.subscription.channel.api.IChannelProducer;
045import ca.uhn.fhir.jpa.subscription.channel.api.IChannelReceiver;
046import org.springframework.beans.factory.annotation.Autowired;
047import org.springframework.context.annotation.Bean;
048import org.springframework.context.annotation.Configuration;
049
050@Configuration
051public abstract class BaseBatch2Config {
052
053        public static final String CHANNEL_NAME = "batch2-work-notification";
054
055        @Autowired
056        IJobPersistence myPersistence;
057
058        @Autowired
059        IChannelFactory myChannelFactory;
060
061        @Autowired
062        IHapiTransactionService myHapiTransactionService;
063
064        @Bean
065        public JobDefinitionRegistry batch2JobDefinitionRegistry() {
066                return new JobDefinitionRegistry();
067        }
068
069        @Bean
070        public WorkChunkProcessor jobStepExecutorService(BatchJobSender theBatchJobSender) {
071                return new WorkChunkProcessor(myPersistence, theBatchJobSender, myHapiTransactionService);
072        }
073
074        @Bean
075        public BatchJobSender batchJobSender() {
076                return new BatchJobSender(batch2ProcessingChannelProducer(myChannelFactory));
077        }
078
079        @Bean
080        public IJobCoordinator batch2JobCoordinator(
081                        JobDefinitionRegistry theJobDefinitionRegistry,
082                        BatchJobSender theBatchJobSender,
083                        WorkChunkProcessor theExecutor,
084                        IJobMaintenanceService theJobMaintenanceService,
085                        IHapiTransactionService theTransactionService) {
086                return new JobCoordinatorImpl(
087                                theBatchJobSender,
088                                batch2ProcessingChannelReceiver(myChannelFactory),
089                                myPersistence,
090                                theJobDefinitionRegistry,
091                                theExecutor,
092                                theJobMaintenanceService,
093                                theTransactionService);
094        }
095
096        @Bean
097        public IReductionStepExecutorService reductionStepExecutorService(
098                        IJobPersistence theJobPersistence,
099                        IHapiTransactionService theTransactionService,
100                        JobDefinitionRegistry theJobDefinitionRegistry) {
101                return new ReductionStepExecutorServiceImpl(theJobPersistence, theTransactionService, theJobDefinitionRegistry);
102        }
103
104        @Bean
105        public IJobMaintenanceService batch2JobMaintenanceService(
106                        ISchedulerService theSchedulerService,
107                        JobDefinitionRegistry theJobDefinitionRegistry,
108                        JpaStorageSettings theStorageSettings,
109                        BatchJobSender theBatchJobSender,
110                        WorkChunkProcessor theExecutor,
111                        IReductionStepExecutorService theReductionStepExecutorService) {
112                return new JobMaintenanceServiceImpl(
113                                theSchedulerService,
114                                myPersistence,
115                                theStorageSettings,
116                                theJobDefinitionRegistry,
117                                theBatchJobSender,
118                                theExecutor,
119                                theReductionStepExecutorService);
120        }
121
122        @Bean
123        public IChannelProducer batch2ProcessingChannelProducer(IChannelFactory theChannelFactory) {
124                ChannelProducerSettings settings =
125                                new ChannelProducerSettings().setConcurrentConsumers(getConcurrentConsumers());
126                return theChannelFactory.getOrCreateProducer(CHANNEL_NAME, JobWorkNotificationJsonMessage.class, settings);
127        }
128
129        @Bean
130        public IChannelReceiver batch2ProcessingChannelReceiver(IChannelFactory theChannelFactory) {
131                ChannelConsumerSettings settings =
132                                new ChannelConsumerSettings().setConcurrentConsumers(getConcurrentConsumers());
133                return theChannelFactory.getOrCreateReceiver(CHANNEL_NAME, JobWorkNotificationJsonMessage.class, settings);
134        }
135
136        @Bean
137        public Batch2JobRegisterer batch2JobRegisterer() {
138                return new Batch2JobRegisterer();
139        }
140
141        /**
142         * Can be overridden
143         */
144        protected int getConcurrentConsumers() {
145                return 4;
146        }
147
148        @Bean
149        public IJobPartitionProvider jobPartitionProvider(
150                        FhirContext theFhirContext,
151                        IRequestPartitionHelperSvc theRequestPartitionHelperSvc,
152                        MatchUrlService theMatchUrlService) {
153                return new DefaultJobPartitionProvider(theFhirContext, theRequestPartitionHelperSvc, theMatchUrlService);
154        }
155}