001package ca.uhn.fhir.jpa.mdm.broker; 002 003import ca.uhn.fhir.jpa.subscription.channel.api.ChannelConsumerSettings; 004import ca.uhn.fhir.jpa.subscription.channel.api.IChannelFactory; 005import ca.uhn.fhir.jpa.subscription.channel.api.IChannelReceiver; 006import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage; 007import ca.uhn.fhir.mdm.api.IMdmSettings; 008import ca.uhn.fhir.mdm.log.Logs; 009import com.google.common.annotations.VisibleForTesting; 010import org.slf4j.Logger; 011import org.springframework.stereotype.Service; 012 013import javax.annotation.PreDestroy; 014 015/*- 016 * #%L 017 * HAPI FHIR JPA Server - Master Data Management 018 * %% 019 * Copyright (C) 2014 - 2023 Smile CDR, Inc. 020 * %% 021 * Licensed under the Apache License, Version 2.0 (the "License"); 022 * you may not use this file except in compliance with the License. 023 * You may obtain a copy of the License at 024 * 025 * http://www.apache.org/licenses/LICENSE-2.0 026 * 027 * Unless required by applicable law or agreed to in writing, software 028 * distributed under the License is distributed on an "AS IS" BASIS, 029 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 030 * See the License for the specific language governing permissions and 031 * limitations under the License. 032 * #L% 033 */ 034 035@Service 036public class MdmQueueConsumerLoader { 037 private static final Logger ourLog = Logs.getMdmTroubleshootingLog(); 038 039 private final IChannelFactory myChannelFactory; 040 private final IMdmSettings myMdmSettings; 041 private final MdmMessageHandler myMdmMessageHandler; 042 043 protected IChannelReceiver myMdmChannel; 044 045 public MdmQueueConsumerLoader(IChannelFactory theChannelFactory, IMdmSettings theMdmSettings, MdmMessageHandler theMdmMessageHandler) { 046 myChannelFactory = theChannelFactory; 047 myMdmSettings = theMdmSettings; 048 myMdmMessageHandler = theMdmMessageHandler; 049 050 startListeningToMdmChannel(); 051 } 052 053 054 private void startListeningToMdmChannel() { 055 if (myMdmChannel == null) { 056 ChannelConsumerSettings config = new ChannelConsumerSettings(); 057 058 config.setConcurrentConsumers(myMdmSettings.getConcurrentConsumers()); 059 060 myMdmChannel = myChannelFactory.getOrCreateReceiver(IMdmSettings.EMPI_CHANNEL_NAME, ResourceModifiedJsonMessage.class, config); 061 if (myMdmChannel == null) { 062 ourLog.error("Unable to create receiver for {}", IMdmSettings.EMPI_CHANNEL_NAME); 063 } else { 064 myMdmChannel.subscribe(myMdmMessageHandler); 065 ourLog.info("MDM Matching Consumer subscribed to Matching Channel {} with name {}", myMdmChannel.getClass().getName(), myMdmChannel.getName()); 066 } 067 } 068 } 069 070 @SuppressWarnings("unused") 071 @PreDestroy 072 public void stop() throws Exception { 073 if (myMdmChannel != null) { 074 // JMS channel needs to be destroyed to avoid dangling receivers 075 myMdmChannel.destroy(); 076 ourLog.info("MDM Matching Consumer unsubscribed from Matching Channel {} with name {}", myMdmChannel.getClass().getName(), myMdmChannel.getName()); 077 } 078 } 079 080 @VisibleForTesting 081 public IChannelReceiver getMdmChannelForUnitTest() { 082 return myMdmChannel; 083 } 084}