package org.mule.extensions.jms.api.source;

import javax.inject.Inject;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.Topic;
import org.mule.extensions.jms.api.config.AckMode;
import org.mule.extensions.jms.api.config.JmsConfig;
import org.mule.extensions.jms.api.config.JmsConsumerConfig;
import org.mule.extensions.jms.api.connection.JmsConnection;
import org.mule.extensions.jms.api.connection.JmsSession;
import org.mule.extensions.jms.api.destination.ConsumerType;
import org.mule.extensions.jms.api.exception.JmsExtensionException;
import org.mule.extensions.jms.api.message.JmsAttributes;
import org.mule.extensions.jms.api.message.MessageBuilder;
import org.mule.extensions.jms.internal.common.JmsOperationCommons;
import org.mule.extensions.jms.internal.consume.JmsMessageConsumer;
import org.mule.extensions.jms.internal.message.JmsResultFactory;
import org.mule.extensions.jms.internal.metadata.JmsOutputResolver;
import org.mule.extensions.jms.internal.publish.JmsPublishParameters;
import org.mule.extensions.jms.internal.support.Jms102bSupport;
import org.mule.extensions.jms.internal.support.JmsSupport;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.message.Error;
import org.mule.runtime.core.api.MuleContext;
import org.mule.runtime.core.api.scheduler.SchedulerService;
import org.mule.runtime.core.util.StringMessageUtils;
import org.mule.runtime.extension.api.annotation.Alias;
import org.mule.runtime.extension.api.annotation.dsl.xml.XmlHints;
import org.mule.runtime.extension.api.annotation.execution.OnError;
import org.mule.runtime.extension.api.annotation.execution.OnSuccess;
import org.mule.runtime.extension.api.annotation.metadata.MetadataScope;
import org.mule.runtime.extension.api.annotation.param.Connection;
import org.mule.runtime.extension.api.annotation.param.NullSafe;
import org.mule.runtime.extension.api.annotation.param.Optional;
import org.mule.runtime.extension.api.annotation.param.Parameter;
import org.mule.runtime.extension.api.annotation.param.UseConfig;
import org.mule.runtime.extension.api.annotation.param.display.Summary;
import org.mule.runtime.extension.api.annotation.source.EmitsResponse;
import org.mule.runtime.extension.api.runtime.source.Source;
import org.mule.runtime.extension.api.runtime.source.SourceCallback;
import org.mule.runtime.extension.api.runtime.source.SourceCallbackContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@MetadataScope(outputResolver = JmsOutputResolver.class)
@Alias("listener")
@EmitsResponse
/* loaded from: input_file:org/mule/extensions/jms/api/source/JmsListener.class */
public class JmsListener extends Source<Object, JmsAttributes> {
    private static final Logger LOGGER = LoggerFactory.getLogger(JmsListener.class);
    private static final String MESSAGE_TO_ACK = "MESSAGE_TO_ACK";
    private static final String REPLY_TO_DESTINATION = "REPLY_TO_DESTINATION";
    private final JmsResultFactory resultFactory = new JmsResultFactory();

    @Inject
    private MuleContext muleContext;

    @Inject
    private SchedulerService schedulerService;

    @UseConfig
    private JmsConfig config;
    private JmsConsumerConfig consumerConfig;

    @Connection
    private JmsConnection connection;
    private JmsSession session;
    private JmsSupport jmsSupport;

    @XmlHints(allowReferences = false)
    @Parameter
    @Summary("The name of the Destination from where the Message should be consumed")
    private String destination;

    @Optional
    @Parameter
    @Summary("The Type of the Consumer that should be used for the provided destination")
    private ConsumerType consumerType;

    @Optional
    @Parameter
    @Summary("The Session ACK mode to use when consuming a message")
    private AckMode ackMode;

    @Optional
    @Parameter
    @Summary("JMS selector to be used for filtering incoming messages")
    private String selector;

    @Optional
    @Parameter
    @Summary("The content type of the message body")
    private String contentType;

    @Optional
    @Parameter
    @Summary("The encoding of the message body")
    private String encoding;

    public void onStart(SourceCallback<Object, JmsAttributes> sourceCallback) throws MuleException {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Starting subscriber source");
        }
        this.consumerConfig = this.config.getConsumerConfig();
        this.ackMode = (AckMode) JmsOperationCommons.resolveOverride(this.consumerConfig.getAckMode(), this.ackMode);
        this.selector = (String) JmsOperationCommons.resolveOverride(this.consumerConfig.getSelector(), this.selector);
        this.encoding = (String) JmsOperationCommons.resolveOverride(this.config.getEncoding(), this.encoding);
        try {
            this.session = this.connection.createSession(this.ackMode, this.consumerType.isTopic());
            this.jmsSupport = this.connection.getJmsSupport();
            JmsMessageConsumer createConsumer = this.connection.createConsumer(this.session.get(), this.jmsSupport.createDestination(this.session.get(), this.destination, this.consumerType.isTopic()), this.selector, this.consumerType);
            if (LOGGER.isDebugEnabled()) {
                Logger logger = LOGGER;
                Object[] objArr = new Object[2];
                objArr[0] = this.destination;
                objArr[1] = this.consumerType.isTopic() ? "TOPIC" : "QUEUE";
                logger.debug(String.format("Starting Message listener on destination [%s] of type [%s]", objArr));
            }
            createConsumer.listen(message -> {
                SourceCallbackContext createContext = sourceCallback.createContext();
                if (message != null) {
                    evaluateAckAction(sourceCallback, this.session, message);
                    this.contentType = resolveContentType(message);
                    saveMessageForAck(message, createContext);
                    saveReplyToDestination(sourceCallback, message, createContext);
                }
                produceMessageResult(sourceCallback, this.jmsSupport, this.session, message, createContext);
            });
        } catch (Exception e) {
            LOGGER.error("An error occurred while consuming a message: ", (Throwable) e);
            sourceCallback.onSourceException(new JmsExtensionException(e, "An error occurred while consuming a message: "));
        }
    }

    public void onStop() {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Stopping JMSSubscriber source");
        }
    }

    @OnSuccess
    public void onSuccess(@NullSafe @Optional JmsListenerResponseBuilder jmsListenerResponseBuilder, SourceCallbackContext sourceCallbackContext) {
        ackOriginalMessage(sourceCallbackContext);
        Destination destination = (Destination) sourceCallbackContext.getVariable(REPLY_TO_DESTINATION);
        if (destination != null) {
            doReply(jmsListenerResponseBuilder.getMessageBuilder(), jmsListenerResponseBuilder.getOverrides(), sourceCallbackContext, destination);
        }
    }

    @OnError
    public void onError(Error error, SourceCallbackContext sourceCallbackContext) {
        LOGGER.error(error.getDescription(), error.getCause());
    }

    private void doReply(MessageBuilder messageBuilder, JmsPublishParameters jmsPublishParameters, SourceCallbackContext sourceCallbackContext, Destination destination) {
        try {
            boolean replyDestinationIsTopic = replyDestinationIsTopic(destination);
            String topicName = replyDestinationIsTopic ? ((Topic) destination).getTopicName() : ((Queue) destination).getQueueName();
            if (LOGGER.isDebugEnabled()) {
                Logger logger = LOGGER;
                Object[] objArr = new Object[2];
                objArr[0] = topicName;
                objArr[1] = replyDestinationIsTopic ? "TOPIC" : "QUEUE";
                logger.debug(String.format("Begin reply to destination [%s] of type [%s]", objArr));
            }
            Message build = messageBuilder.build(this.connection.getJmsSupport(), this.session.get(), this.config);
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug(String.format("Message built, sending message to %s", topicName));
            }
            this.connection.createProducer(this.connection.createSession(AckMode.AUTO, replyDestinationIsTopic).get(), destination, replyDestinationIsTopic).publish(build, this.config.getProducerConfig(), jmsPublishParameters);
        } catch (Exception e) {
            LOGGER.error("An error occurred during reply: ", (Throwable) e);
            sourceCallbackContext.getSourceCallback().onSourceException(e);
        }
    }

    private void produceMessageResult(SourceCallback<Object, JmsAttributes> sourceCallback, JmsSupport jmsSupport, JmsSession jmsSession, Message message, SourceCallbackContext sourceCallbackContext) {
        try {
            sourceCallback.handle(this.resultFactory.createResult(message, jmsSupport.getSpecification(), this.contentType, this.encoding, jmsSession.getAckId()), sourceCallbackContext);
        } catch (Exception e) {
            LOGGER.error("An error occurred while creating the initial message", (Throwable) e);
            sourceCallback.onSourceException(e);
        }
    }

    private void evaluateAckAction(SourceCallback<Object, JmsAttributes> sourceCallback, JmsSession jmsSession, Message message) {
        try {
            JmsOperationCommons.evaluateMessageAck(this.connection, this.ackMode, jmsSession, message);
        } catch (JMSException e) {
            LOGGER.error("An error occurred while processing an incoming message: ", (Throwable) e);
            sourceCallback.onSourceException(e);
        }
    }

    private String resolveContentType(Message message) {
        return (String) JmsOperationCommons.resolveOverride(JmsOperationCommons.resolveMessageContentType(message, this.config.getContentType()), this.contentType);
    }

    private void saveReplyToDestination(SourceCallback<Object, JmsAttributes> sourceCallback, Message message, SourceCallbackContext sourceCallbackContext) {
        try {
            Destination jMSReplyTo = message.getJMSReplyTo();
            if (jMSReplyTo != null) {
                sourceCallbackContext.addVariable(REPLY_TO_DESTINATION, jMSReplyTo);
            }
        } catch (JMSException e) {
            LOGGER.error("An error occurred while obtaining the ReplyTo destination: ", (Throwable) e);
            sourceCallback.onSourceException(new JmsExtensionException(e, "An error occurred while obtaining the ReplyTo destination: "));
        }
    }

    private void saveMessageForAck(Message message, SourceCallbackContext sourceCallbackContext) {
        if (this.ackMode.equals(AckMode.AUTO)) {
            sourceCallbackContext.addVariable(MESSAGE_TO_ACK, message);
        }
    }

    private void ackOriginalMessage(SourceCallbackContext sourceCallbackContext) {
        Message message = (Message) sourceCallbackContext.getVariable(MESSAGE_TO_ACK);
        if (message != null) {
            try {
                message.acknowledge();
            } catch (JMSException e) {
                LOGGER.error("Failed to ACK the pending message: ", (Throwable) e);
                sourceCallbackContext.getSourceCallback().onSourceException(e);
            }
        }
    }

    private boolean replyDestinationIsTopic(Destination destination) {
        if ((destination instanceof Topic) && (destination instanceof Queue) && (this.jmsSupport instanceof Jms102bSupport)) {
            LOGGER.error(StringMessageUtils.getBoilerPlate("Destination implements both Queue and Topic while complying with JMS 1.0.2b specification. Please report your application server or JMS vendor name and version to http://www.mulesoft.org/jira"));
        }
        return destination instanceof Topic;
    }
}
