/*
 * Decompiled with CFR 0.152.
 */
package org.mule.extensions.jms.api.source;

import javax.inject.Inject;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.Topic;
import org.mule.extensions.jms.api.config.AckMode;
import org.mule.extensions.jms.api.config.JmsConfig;
import org.mule.extensions.jms.api.config.JmsConsumerConfig;
import org.mule.extensions.jms.api.connection.JmsConnection;
import org.mule.extensions.jms.api.connection.JmsSession;
import org.mule.extensions.jms.api.destination.ConsumerType;
import org.mule.extensions.jms.api.exception.JmsExtensionException;
import org.mule.extensions.jms.api.message.JmsAttributes;
import org.mule.extensions.jms.api.message.MessageBuilder;
import org.mule.extensions.jms.api.source.JmsListenerResponseBuilder;
import org.mule.extensions.jms.internal.common.JmsOperationCommons;
import org.mule.extensions.jms.internal.consume.JmsMessageConsumer;
import org.mule.extensions.jms.internal.message.JmsResultFactory;
import org.mule.extensions.jms.internal.metadata.JmsOutputResolver;
import org.mule.extensions.jms.internal.publish.JmsPublishParameters;
import org.mule.extensions.jms.internal.support.Jms102bSupport;
import org.mule.extensions.jms.internal.support.JmsSupport;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.message.Error;
import org.mule.runtime.core.api.MuleContext;
import org.mule.runtime.core.api.scheduler.SchedulerService;
import org.mule.runtime.core.util.StringMessageUtils;
import org.mule.runtime.extension.api.annotation.Alias;
import org.mule.runtime.extension.api.annotation.dsl.xml.XmlHints;
import org.mule.runtime.extension.api.annotation.execution.OnError;
import org.mule.runtime.extension.api.annotation.execution.OnSuccess;
import org.mule.runtime.extension.api.annotation.metadata.MetadataScope;
import org.mule.runtime.extension.api.annotation.param.Connection;
import org.mule.runtime.extension.api.annotation.param.NullSafe;
import org.mule.runtime.extension.api.annotation.param.Optional;
import org.mule.runtime.extension.api.annotation.param.Parameter;
import org.mule.runtime.extension.api.annotation.param.UseConfig;
import org.mule.runtime.extension.api.annotation.param.display.Summary;
import org.mule.runtime.extension.api.annotation.source.EmitsResponse;
import org.mule.runtime.extension.api.runtime.operation.Result;
import org.mule.runtime.extension.api.runtime.source.Source;
import org.mule.runtime.extension.api.runtime.source.SourceCallback;
import org.mule.runtime.extension.api.runtime.source.SourceCallbackContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Alias(value="listener")
@EmitsResponse
@MetadataScope(outputResolver=JmsOutputResolver.class)
public class JmsListener
extends Source<Object, JmsAttributes> {
    private static final Logger LOGGER = LoggerFactory.getLogger(JmsListener.class);
    private static final String MESSAGE_TO_ACK = "MESSAGE_TO_ACK";
    private static final String REPLY_TO_DESTINATION = "REPLY_TO_DESTINATION";
    private final JmsResultFactory resultFactory = new JmsResultFactory();
    @Inject
    private MuleContext muleContext;
    @Inject
    private SchedulerService schedulerService;
    @UseConfig
    private JmsConfig config;
    private JmsConsumerConfig consumerConfig;
    @Connection
    private JmsConnection connection;
    private JmsSession session;
    private JmsSupport jmsSupport;
    @Parameter
    @XmlHints(allowReferences=false)
    @Summary(value="The name of the Destination from where the Message should be consumed")
    private String destination;
    @Parameter
    @Optional
    @Summary(value="The Type of the Consumer that should be used for the provided destination")
    private ConsumerType consumerType;
    @Parameter
    @Optional
    @Summary(value="The Session ACK mode to use when consuming a message")
    private AckMode ackMode;
    @Parameter
    @Optional
    @Summary(value="JMS selector to be used for filtering incoming messages")
    private String selector;
    @Parameter
    @Optional
    @Summary(value="The content type of the message body")
    private String contentType;
    @Parameter
    @Optional
    @Summary(value="The encoding of the message body")
    private String encoding;

    public void onStart(SourceCallback<Object, JmsAttributes> sourceCallback) throws MuleException {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Starting subscriber source");
        }
        this.consumerConfig = this.config.getConsumerConfig();
        this.ackMode = JmsOperationCommons.resolveOverride(this.consumerConfig.getAckMode(), this.ackMode);
        this.selector = JmsOperationCommons.resolveOverride(this.consumerConfig.getSelector(), this.selector);
        this.encoding = JmsOperationCommons.resolveOverride(this.config.getEncoding(), this.encoding);
        try {
            this.session = this.connection.createSession(this.ackMode, this.consumerType.isTopic());
            this.jmsSupport = this.connection.getJmsSupport();
            Destination jmsDestination = this.jmsSupport.createDestination(this.session.get(), this.destination, this.consumerType.isTopic());
            JmsMessageConsumer consumer = this.connection.createConsumer(this.session.get(), jmsDestination, this.selector, this.consumerType);
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug(String.format("Starting Message listener on destination [%s] of type [%s]", this.destination, this.consumerType.isTopic() ? "TOPIC" : "QUEUE"));
            }
            consumer.listen(message -> {
                SourceCallbackContext context = sourceCallback.createContext();
                if (message != null) {
                    this.evaluateAckAction(sourceCallback, this.session, message);
                    this.contentType = this.resolveContentType(message);
                    this.saveMessageForAck(message, context);
                    this.saveReplyToDestination(sourceCallback, message, context);
                }
                this.produceMessageResult(sourceCallback, this.jmsSupport, this.session, message, context);
            });
        }
        catch (Exception e) {
            LOGGER.error("An error occurred while consuming a message: ", (Throwable)e);
            sourceCallback.onSourceException((Throwable)((Object)new JmsExtensionException(e, "An error occurred while consuming a message: ")));
        }
    }

    public void onStop() {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Stopping JMSSubscriber source");
        }
    }

    @OnSuccess
    public void onSuccess(@Optional @NullSafe JmsListenerResponseBuilder response, SourceCallbackContext callbackContext) {
        this.ackOriginalMessage(callbackContext);
        Destination replyTo = (Destination)callbackContext.getVariable(REPLY_TO_DESTINATION);
        if (replyTo != null) {
            this.doReply(response.getMessageBuilder(), response.getOverrides(), callbackContext, replyTo);
        }
    }

    @OnError
    public void onError(Error error, SourceCallbackContext context) {
        LOGGER.error(error.getDescription(), error.getCause());
    }

    private void doReply(MessageBuilder messageBuilder, JmsPublishParameters overrides, SourceCallbackContext callbackContext, Destination replyTo) {
        try {
            String destinationName;
            boolean replyToTopic = this.replyDestinationIsTopic(replyTo);
            String string = destinationName = replyToTopic ? ((Topic)replyTo).getTopicName() : ((Queue)replyTo).getQueueName();
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug(String.format("Begin reply to destination [%s] of type [%s]", destinationName, replyToTopic ? "TOPIC" : "QUEUE"));
            }
            Message message = messageBuilder.build(this.connection.getJmsSupport(), this.session.get(), this.config);
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug(String.format("Message built, sending message to %s", destinationName));
            }
            JmsSession replySession = this.connection.createSession(AckMode.AUTO, replyToTopic);
            this.connection.createProducer(replySession.get(), replyTo, replyToTopic).publish(message, this.config.getProducerConfig(), overrides);
        }
        catch (Exception e) {
            LOGGER.error("An error occurred during reply: ", (Throwable)e);
            callbackContext.getSourceCallback().onSourceException((Throwable)e);
        }
    }

    private void produceMessageResult(SourceCallback<Object, JmsAttributes> sourceCallback, JmsSupport jmsSupport, JmsSession session, Message message, SourceCallbackContext context) {
        try {
            Result<Object, JmsAttributes> result = this.resultFactory.createResult(message, jmsSupport.getSpecification(), this.contentType, this.encoding, session.getAckId());
            sourceCallback.handle(result, context);
        }
        catch (Exception e) {
            LOGGER.error("An error occurred while creating the initial message", (Throwable)e);
            sourceCallback.onSourceException((Throwable)e);
        }
    }

    private void evaluateAckAction(SourceCallback<Object, JmsAttributes> sourceCallback, JmsSession session, Message message) {
        try {
            JmsOperationCommons.evaluateMessageAck(this.connection, this.ackMode, session, message);
        }
        catch (JMSException e) {
            LOGGER.error("An error occurred while processing an incoming message: ", (Throwable)e);
            sourceCallback.onSourceException((Throwable)e);
        }
    }

    private String resolveContentType(Message message) {
        return JmsOperationCommons.resolveOverride(JmsOperationCommons.resolveMessageContentType(message, this.config.getContentType()), this.contentType);
    }

    private void saveReplyToDestination(SourceCallback<Object, JmsAttributes> sourceCallback, Message message, SourceCallbackContext context) {
        try {
            Destination replyTo = message.getJMSReplyTo();
            if (replyTo != null) {
                context.addVariable(REPLY_TO_DESTINATION, (Object)replyTo);
            }
        }
        catch (JMSException e) {
            LOGGER.error("An error occurred while obtaining the ReplyTo destination: ", (Throwable)e);
            sourceCallback.onSourceException((Throwable)((Object)new JmsExtensionException((Exception)((Object)e), "An error occurred while obtaining the ReplyTo destination: ")));
        }
    }

    private void saveMessageForAck(Message message, SourceCallbackContext context) {
        if (this.ackMode.equals((Object)AckMode.AUTO)) {
            context.addVariable(MESSAGE_TO_ACK, (Object)message);
        }
    }

    private void ackOriginalMessage(SourceCallbackContext callbackContext) {
        Message message = (Message)callbackContext.getVariable(MESSAGE_TO_ACK);
        if (message != null) {
            try {
                message.acknowledge();
            }
            catch (JMSException e) {
                LOGGER.error("Failed to ACK the pending message: ", (Throwable)e);
                callbackContext.getSourceCallback().onSourceException((Throwable)e);
            }
        }
    }

    private boolean replyDestinationIsTopic(Destination destination) {
        if (destination instanceof Topic && destination instanceof Queue && this.jmsSupport instanceof Jms102bSupport) {
            LOGGER.error(StringMessageUtils.getBoilerPlate((String)"Destination implements both Queue and Topic while complying with JMS 1.0.2b specification. Please report your application server or JMS vendor name and version to http://www.mulesoft.org/jira"));
        }
        return destination instanceof Topic;
    }
}

