package com.mulesoft.connector.azure.messaging.internal.source;

import com.microsoft.azure.servicebus.ClientFactory;
import com.microsoft.azure.servicebus.IMessageReceiver;
import com.microsoft.azure.servicebus.ReceiveMode;
import com.mulesoft.connector.azure.messaging.api.AckMode;
import com.mulesoft.connector.azure.messaging.api.AzureMessageAttributes;
import com.mulesoft.connector.azure.messaging.internal.config.AzureServiceBusConfiguration;
import com.mulesoft.connector.azure.messaging.internal.connection.AzureServiceBusConnection;
import com.mulesoft.connector.azure.messaging.internal.connection.PeekLockReceiver;
import com.mulesoft.connector.azure.messaging.internal.metadata.DestinationValueProvider;
import com.mulesoft.connector.azure.messaging.internal.model.Destination;
import com.mulesoft.connector.azure.messaging.internal.model.ResponseMessage;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import javax.inject.Inject;
import org.mule.runtime.api.connection.ConnectionProvider;
import org.mule.runtime.api.exception.MuleRuntimeException;
import org.mule.runtime.api.meta.ExpressionSupport;
import org.mule.runtime.api.scheduler.Scheduler;
import org.mule.runtime.api.scheduler.SchedulerConfig;
import org.mule.runtime.api.scheduler.SchedulerService;
import org.mule.runtime.core.api.util.IOUtils;
import org.mule.runtime.extension.api.annotation.Alias;
import org.mule.runtime.extension.api.annotation.Expression;
import org.mule.runtime.extension.api.annotation.execution.OnError;
import org.mule.runtime.extension.api.annotation.execution.OnSuccess;
import org.mule.runtime.extension.api.annotation.execution.OnTerminate;
import org.mule.runtime.extension.api.annotation.param.Config;
import org.mule.runtime.extension.api.annotation.param.Connection;
import org.mule.runtime.extension.api.annotation.param.MediaType;
import org.mule.runtime.extension.api.annotation.param.Optional;
import org.mule.runtime.extension.api.annotation.param.Parameter;
import org.mule.runtime.extension.api.annotation.param.ParameterGroup;
import org.mule.runtime.extension.api.annotation.param.display.DisplayName;
import org.mule.runtime.extension.api.annotation.param.display.Placement;
import org.mule.runtime.extension.api.annotation.param.display.Summary;
import org.mule.runtime.extension.api.annotation.source.BackPressure;
import org.mule.runtime.extension.api.annotation.source.ClusterSupport;
import org.mule.runtime.extension.api.annotation.source.EmitsResponse;
import org.mule.runtime.extension.api.annotation.source.SourceClusterSupport;
import org.mule.runtime.extension.api.annotation.values.OfValues;
import org.mule.runtime.extension.api.runtime.parameter.CorrelationInfo;
import org.mule.runtime.extension.api.runtime.source.BackPressureMode;
import org.mule.runtime.extension.api.runtime.source.Source;
import org.mule.runtime.extension.api.runtime.source.SourceCallback;
import org.mule.runtime.extension.api.runtime.source.SourceCallbackContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Mule message source (alias {@code message-listener}) that receives messages from an Azure
 * Service Bus entity and pushes them into the flow.
 *
 * <p>On start it opens a connection, creates one {@link IMessageReceiver} per configured consumer
 * and submits a {@code ReceiverTask} for each of them to a dedicated IO scheduler. Once the flow
 * finishes, the message is completed or abandoned according to the configured {@link AckMode},
 * and an optional reply is sent when the callback context carries a reply-to destination.
 */
@BackPressure(defaultMode = BackPressureMode.WAIT, supportedModes = {BackPressureMode.WAIT})
@ClusterSupport(SourceClusterSupport.DEFAULT_PRIMARY_NODE_ONLY)
@MediaType(value = "*/*", strict = false)
@Alias("message-listener")
@EmitsResponse
public class MessageListenerSource extends Source<byte[], AzureMessageAttributes> {
    private static final Logger logger = LoggerFactory.getLogger(MessageListenerSource.class);

    /** Suffix inserted between the destination name and the subscription name in an entity path. */
    private static final String SUBSCRIPTIONS_SEGMENT = "/Subscriptions/";

    /** Subscription name value that is treated the same as "no subscription". */
    private static final String NO_SUBSCRIPTION = "NONE";

    @Connection
    private ConnectionProvider<AzureServiceBusConnection> connectionProvider;

    @Config
    private AzureServiceBusConfiguration configuration;

    /** Connection obtained in {@link #onStart} and released in {@link #onStop}. */
    private AzureServiceBusConnection connection;

    @Inject
    private SchedulerService schedulerService;

    @Inject
    private SchedulerConfig schedulerConfig;

    @OfValues(DestinationValueProvider.class)
    @ParameterGroup(name = "Destination")
    Destination destination;

    @DisplayName("Acknowledgement mode")
    @Parameter
    @Expression(ExpressionSupport.NOT_SUPPORTED)
    private AckMode ackMode;

    @Optional(defaultValue = "0")
    @Parameter
    @Summary("When Prefetch is enabled, the receiver quietly acquires more messages, up to the PrefetchCount limit, beyond what the application initially asked for. Setting this to 0 will disable prefetching")
    @Placement(tab = "Advanced")
    private Integer prefetchCount;

    @Optional
    @Parameter
    @Summary("The number of concurrent consumers that will be used to receive messages")
    @Placement(tab = "Advanced")
    private Integer numberOfConsumers;

    @Optional(defaultValue = "true")
    @Parameter
    @Summary("Buffered is the default mode: the consumer(s) send the message to the flow and immediately receive the next one. If it is disabled, the consumer(s) will not receive the next message until the current one exits the flow.")
    @Placement(tab = "Advanced")
    private boolean bufferedMode;

    /** Receiver tasks created by the last {@link #onStart}; closed in {@link #onStop}. */
    private List<ReceiverTask> receiverTasks;

    /** Scheduler running the receiver tasks; stopped in {@link #onStop}. */
    private Scheduler listenerScheduler;

    /**
     * Connects to Service Bus and starts one receiver task per consumer.
     *
     * <p>When {@code numberOfConsumers} is unset or lower than 1 it is replaced by the number of
     * available processors.
     *
     * @param sourceCallback callback handed to every receiver task to dispatch messages
     * @throws MuleRuntimeException wrapping any failure raised while connecting or creating
     *         receivers; the interrupt flag is restored when the cause is an interruption
     */
    public void onStart(SourceCallback<byte[], AzureMessageAttributes> sourceCallback) {
        try {
            String entityPath = resolveEntityPath();
            logger.info("Starting listener for entity {}", entityPath);
            this.connection = this.connectionProvider.connect();
            if (this.numberOfConsumers == null || this.numberOfConsumers < 1) {
                this.numberOfConsumers = Runtime.getRuntime().availableProcessors();
            }
            int consumers = this.numberOfConsumers;
            this.listenerScheduler = this.schedulerService.ioScheduler(
                    this.schedulerConfig
                            .withMaxConcurrentTasks(consumers)
                            .withName("Mule Azure Service Bus Messaging Connector - Message Listener Scheduler"));
            this.receiverTasks = new ArrayList<>();
            ReceiveMode receiveMode = ReceiveMode.valueOf(this.ackMode.getValue());
            for (int consumerIndex = 0; consumerIndex < consumers; consumerIndex++) {
                IMessageReceiver receiver = ClientFactory.createMessageReceiverFromEntityPath(
                        this.connection.getMessagingFactory(), entityPath, receiveMode);
                receiver.setPrefetchCount(this.prefetchCount);
                ReceiverTask task = new ReceiverTask(this.connection, receiver, sourceCallback, this.ackMode,
                        consumerIndex, this.configuration.getZoneId(), this.bufferedMode);
                // Register the task before submitting it so onStop can always close it.
                this.receiverTasks.add(task);
                this.listenerScheduler.submit(task);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new MuleRuntimeException(e);
        } catch (Exception e) {
            throw new MuleRuntimeException(e);
        }
    }

    /**
     * Invoked when the flow finishes successfully: settles the message according to the
     * acknowledgement mode and then sends the reply, if a reply-to destination was recorded in
     * the callback context.
     *
     * @param responseMessage      reply content configured on the source
     * @param correlationInfo      correlation data of the processed event
     * @param sourceCallbackContext context created by the receiver task for this message
     */
    @OnSuccess
    public void onSuccess(@Placement(tab = "Advanced") @DisplayName("Reply-To Response") @ParameterGroup(name = "Response", showInDsl = true) ResponseMessage responseMessage, CorrelationInfo correlationInfo, SourceCallbackContext sourceCallbackContext) {
        logger.debug("Finished the flow successfully.");
        if (AckMode.AUTO.equals(this.ackMode)) {
            complete(sourceCallbackContext);
        } else if (AckMode.MANUAL.equals(this.ackMode)) {
            abandonIfNotAcknowledged(sourceCallbackContext);
        }
        // Explicit type witness: without it the variable is inferred as Object and cannot be
        // passed to doReply(String).
        sourceCallbackContext.<String>getVariable(ReceiverTask.REPLY_TO_DESTINATION_VAR)
                .ifPresent(replyTo -> doReply(responseMessage, sourceCallbackContext, replyTo, correlationInfo));
    }

    /**
     * Invoked when the flow finishes with an error: in AUTO mode the message is always abandoned,
     * in MANUAL mode it is abandoned only if it was neither acknowledged nor abandoned already.
     *
     * @param sourceCallbackContext context created by the receiver task for this message
     */
    @OnError
    public void onError(SourceCallbackContext sourceCallbackContext) {
        logger.debug("Finished the flow with error.");
        if (AckMode.AUTO.equals(this.ackMode)) {
            abandonIfNotAcknowledged(sourceCallbackContext, true);
        } else if (AckMode.MANUAL.equals(this.ackMode)) {
            abandonIfNotAcknowledged(sourceCallbackContext);
        }
    }

    /**
     * Completes the message identified by the lock token stored in the context, unless it has
     * already been acknowledged.
     *
     * @param sourceCallbackContext context carrying the lock token variable
     * @throws MuleRuntimeException wrapping any failure; the interrupt flag is restored when the
     *         cause is an interruption
     */
    protected void complete(SourceCallbackContext sourceCallbackContext) {
        try {
            String lockToken = lockTokenOf(sourceCallbackContext);
            PeekLockReceiver lockedReceiver = this.connection.getReceiverFromLock(lockToken);
            if (!lockedReceiver.isAcknowledged()) {
                lockedReceiver.getReceiver().complete(UUID.fromString(lockToken));
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new MuleRuntimeException(e);
        } catch (Exception e) {
            throw new MuleRuntimeException(e);
        }
    }

    /**
     * Abandons the message only when it was neither acknowledged nor abandoned before.
     *
     * @param sourceCallbackContext context carrying the lock token variable
     */
    protected void abandonIfNotAcknowledged(SourceCallbackContext sourceCallbackContext) {
        abandonIfNotAcknowledged(sourceCallbackContext, false);
    }

    /**
     * Abandons the message identified by the lock token stored in the context.
     *
     * @param sourceCallbackContext context carrying the lock token variable
     * @param z                     when {@code true} the message is abandoned unconditionally;
     *                              when {@code false} only if it is neither acknowledged nor
     *                              already abandoned
     * @throws MuleRuntimeException wrapping any failure; the interrupt flag is restored when the
     *         cause is an interruption
     */
    protected void abandonIfNotAcknowledged(SourceCallbackContext sourceCallbackContext, boolean z) {
        try {
            String lockToken = lockTokenOf(sourceCallbackContext);
            PeekLockReceiver lockedReceiver = this.connection.getReceiverFromLock(lockToken);
            boolean stillPending = !lockedReceiver.isAcknowledged() && !lockedReceiver.isAbandoned();
            if (z || stillPending) {
                lockedReceiver.getReceiver().abandon(UUID.fromString(lockToken));
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new MuleRuntimeException(e);
        } catch (Exception e) {
            throw new MuleRuntimeException(e);
        }
    }

    /**
     * Invoked after the success/error callbacks: unregisters the lock entry (for every mode
     * except IMMEDIATE) and, in non-buffered mode, releases the permit stored in the context.
     *
     * @param sourceCallbackContext context created by the receiver task for this message
     */
    @OnTerminate
    public void onTerminate(SourceCallbackContext sourceCallbackContext) {
        // Snapshot the field: onStop clears it, and a late termination must still release the
        // permit below instead of failing on a null connection.
        AzureServiceBusConnection activeConnection = this.connection;
        if (activeConnection != null && !AckMode.IMMEDIATE.equals(this.ackMode)) {
            activeConnection.unRegisterReceiverToLock(lockTokenOf(sourceCallbackContext));
        }
        if (!this.bufferedMode) {
            // NOTE(review): the static type of this variable is not visible here (presumably a
            // java.util.concurrent.Semaphore stored by ReceiverTask) - confirm and add a type witness.
            sourceCallbackContext.getVariable(ReceiverTask.SEMAPHORE_VAR).ifPresent((v0) -> {
                v0.release();
            });
        }
        logger.debug("Terminated flow with context {}", sourceCallbackContext);
    }

    /**
     * Closes every receiver task, stops the listener scheduler and releases the connection that
     * was obtained in {@link #onStart}.
     */
    public void onStop() {
        logger.info("Stopping listener {}", getClass().getSimpleName());
        if (this.receiverTasks != null) {
            for (ReceiverTask task : this.receiverTasks) {
                IOUtils.closeQuietly(task);
            }
        }
        if (this.listenerScheduler != null) {
            this.listenerScheduler.stop();
        }
        if (this.connection != null) {
            // The connection came from connectionProvider.connect(); hand it back so it is not
            // leaked on every stop/restart cycle. Clearing the field prevents a double disconnect.
            this.connectionProvider.disconnect(this.connection);
            this.connection = null;
        }
        logger.info("Stopped listener {}", getClass().getSimpleName());
    }

    /**
     * Builds the entity path to listen on: the destination name alone, or
     * {@code <destination>/Subscriptions/<subscription>} when a subscription name other than
     * "NONE" (case-insensitive) is configured.
     */
    private String resolveEntityPath() {
        String subscription = this.destination.getSubscriptionName();
        boolean hasSubscription = subscription != null
                && !subscription.isEmpty()
                && !subscription.equalsIgnoreCase(NO_SUBSCRIPTION);
        if (!hasSubscription) {
            return this.destination.getDestinationName();
        }
        return this.destination.getDestinationName() + SUBSCRIPTIONS_SEGMENT + subscription;
    }

    /** Reads the lock token variable from the context, defaulting to an empty string. */
    private static String lockTokenOf(SourceCallbackContext sourceCallbackContext) {
        return sourceCallbackContext.<String>getVariable(ReceiverTask.UUID_VAR).orElse("");
    }

    /**
     * Sends the reply message to the given destination. The session id comes from the context
     * when present, otherwise from the response message. I/O failures are logged, not rethrown.
     */
    private void doReply(ResponseMessage responseMessage, SourceCallbackContext sourceCallbackContext, String replyTo, CorrelationInfo correlationInfo) {
        logger.debug("Begin reply to destination {}", replyTo);
        try {
            String sessionId = sourceCallbackContext.<String>getVariable(ReceiverTask.REPLY_TO_SESSION_ID_VAR)
                    .orElse(responseMessage.getSessionId());
            this.connection.send(replyTo,
                    new ResponseMessageBuilder().build(responseMessage, correlationInfo, this.configuration.getZoneId(), sessionId));
        } catch (IOException e) {
            // Pass the exception itself so the stack trace and cause chain are not lost.
            logger.error("Failed to send reply to destination {}", replyTo, e);
        }
    }
}
