package org.mule.modules.microsoftservicebus.extension.internal.source;

import java.util.Map;
import javax.jms.JMSException;
import javax.jms.Message;
import org.mule.modules.microsoftservicebus.extension.api.entity.AmqpMessage;
import org.mule.modules.microsoftservicebus.extension.internal.amqp.messaging.MessageHandler;
import org.mule.modules.microsoftservicebus.extension.internal.config.ServiceBusConfig;
import org.mule.modules.microsoftservicebus.extension.internal.connection.ServiceBusConnection;
import org.mule.runtime.api.connection.ConnectionProvider;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.extension.api.annotation.Alias;
import org.mule.runtime.extension.api.annotation.execution.OnError;
import org.mule.runtime.extension.api.annotation.execution.OnSuccess;
import org.mule.runtime.extension.api.annotation.param.Config;
import org.mule.runtime.extension.api.annotation.param.Connection;
import org.mule.runtime.extension.api.annotation.param.Parameter;
import org.mule.runtime.extension.api.annotation.source.EmitsResponse;
import org.mule.runtime.extension.api.runtime.source.Source;
import org.mule.runtime.extension.api.runtime.source.SourceCallback;
import org.mule.runtime.extension.api.runtime.source.SourceCallbackContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.listener.AbstractJmsListeningContainer;

@Alias("topic-receive")
@EmitsResponse
/* loaded from: input_file:org/mule/modules/microsoftservicebus/extension/internal/source/TopicReceive.class */
public class TopicReceive extends Source<AmqpMessage, Map<String, Object>> {
    private static final Logger logger = LoggerFactory.getLogger(TopicReceive.class);
    private AbstractJmsListeningContainer container;

    @Parameter
    private String sourceTopic;

    @Parameter
    private String subscriptionName;

    @Config
    private ServiceBusConfig config;

    @Connection
    private ConnectionProvider<ServiceBusConnection> connectionProvider;
    private ServiceBusConnection connection;

    public void onStart(SourceCallback sourceCallback) throws MuleException {
        logger.info("Preparing receive for topic: {}", this.sourceTopic);
        this.connection = (ServiceBusConnection) this.connectionProvider.connect();
        this.container = this.connection.getAmqpClient().createContainerForTopic(sourceCallback, this.sourceTopic, this.subscriptionName, this.connection.getBlobContainer());
    }

    @OnSuccess
    public void onSuccess(SourceCallbackContext sourceCallbackContext) {
        sourceCallbackContext.getVariable(MessageHandler.JMS_MESSAGE).ifPresent(message -> {
            try {
                message.acknowledge();
                logger.debug("Message acknowledged");
            } catch (JMSException e) {
                logger.debug("Could not acknowledge message", e);
            }
        });
    }

    @OnError
    public void onError(SourceCallbackContext sourceCallbackContext) {
    }

    public void onStop() {
        if (this.container != null) {
            this.container.stop(() -> {
                try {
                    this.container.shutdown();
                } finally {
                    this.container = null;
                }
            });
        }
    }
}
