package com.mulesoft.extension.mq.internal.source;

import com.mulesoft.extension.mq.api.attributes.AnypointMQMessageAttributes;
import com.mulesoft.extension.mq.api.circuit.CircuitBreakerConfiguration;
import com.mulesoft.extension.mq.api.exception.MQAckException;
import com.mulesoft.extension.mq.api.exception.MQDestinationNotFoundException;
import com.mulesoft.extension.mq.api.exception.MQNackException;
import com.mulesoft.extension.mq.api.modes.SubscriberAckMode;
import com.mulesoft.extension.mq.api.source.PrefetchTypeSubscriberFactory;
import com.mulesoft.extension.mq.api.source.SubscriberFactory;
import com.mulesoft.extension.mq.internal.config.DefaultValues;
import com.mulesoft.extension.mq.internal.config.SubscriberConfig;
import com.mulesoft.extension.mq.internal.connection.AnypointMQConnection;
import com.mulesoft.extension.mq.internal.error.AnypointMQSubscriberErrorTypeProvider;
import com.mulesoft.extension.mq.internal.server.AnypointMQServer;
import com.mulesoft.mq.restclient.circuit.MQCircuitBreaker;
import com.mulesoft.mq.restclient.circuit.impl.NoOpCircuitBreaker;
import com.mulesoft.mq.restclient.client.mq.domain.AnypointMQMessage;
import com.mulesoft.mq.restclient.client.mq.domain.Lock;
import com.mulesoft.mq.restclient.client.mq.domain.MessageIdResult;
import com.mulesoft.mq.restclient.exception.ResourceNotFoundException;
import com.mulesoft.mq.restclient.internal.CourierObserver;
import com.mulesoft.mq.restclient.internal.Destination;
import java.io.InputStream;
import java.util.concurrent.TimeUnit;
import javax.inject.Inject;
import org.mule.runtime.api.component.location.ComponentLocation;
import org.mule.runtime.api.connection.ConnectionException;
import org.mule.runtime.api.connection.ConnectionProvider;
import org.mule.runtime.api.message.Error;
import org.mule.runtime.api.meta.ExpressionSupport;
import org.mule.runtime.api.scheduler.SchedulerService;
import org.mule.runtime.extension.api.annotation.Alias;
import org.mule.runtime.extension.api.annotation.Expression;
import org.mule.runtime.extension.api.annotation.dsl.xml.ParameterDsl;
import org.mule.runtime.extension.api.annotation.error.Throws;
import org.mule.runtime.extension.api.annotation.execution.OnError;
import org.mule.runtime.extension.api.annotation.execution.OnSuccess;
import org.mule.runtime.extension.api.annotation.param.Connection;
import org.mule.runtime.extension.api.annotation.param.MediaType;
import org.mule.runtime.extension.api.annotation.param.NullSafe;
import org.mule.runtime.extension.api.annotation.param.Optional;
import org.mule.runtime.extension.api.annotation.param.Parameter;
import org.mule.runtime.extension.api.annotation.param.display.DisplayName;
import org.mule.runtime.extension.api.annotation.param.display.Placement;
import org.mule.runtime.extension.api.annotation.param.display.Summary;
import org.mule.runtime.extension.api.runtime.source.Source;
import org.mule.runtime.extension.api.runtime.source.SourceCallback;
import org.mule.runtime.extension.api.runtime.source.SourceCallbackContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Message source (alias {@code subscriber}) that consumes messages from an Anypoint MQ queue.
 *
 * <p>On start it resolves the configured {@link Destination}, optionally obtains a circuit breaker
 * from the {@link MQCircuitsManager}, and starts an {@link AnypointMQServer} that pushes messages
 * through the supplied {@link SourceCallback}. When the flow finishes, {@link #onSuccess} and
 * {@link #onError} notify the circuit breaker and, if the message was received in
 * {@link SubscriberAckMode#AUTO} mode, ACK or NACK the message respectively.
 */
@MediaType(value = "*/*", strict = false)
@Throws({AnypointMQSubscriberErrorTypeProvider.class})
@Alias("subscriber")
public class AnypointMQSource extends Source<InputStream, AnypointMQMessageAttributes> {

    /** Callback-context variable holding the {@link SubscriberAckMode} the message was received with. */
    public static final String ACK_MODE_CTX_VAR = "ACKNOWLEDGEMENT_MODE";

    /** Callback-context variable holding the {@link Destination} the message was received from. */
    public static final String DESTINATION_CTX_VAR = "DESTINATION";

    /** Callback-context variable holding the received {@link AnypointMQMessage}. */
    public static final String MESSAGE_CTX_VAR = "MESSAGE";

    /** Format of the name given to the server and circuit; {@code %s} is the root container name. */
    private static final String SOURCE_NAME_MASK = "_MQ_Subscriber_%s";

    private static final Logger LOGGER = LoggerFactory.getLogger(AnypointMQSource.class);

    /** Circuit breaker used when no circuit breaker configuration was provided. */
    private static final MQCircuitBreaker NO_OP_CIRCUIT = new NoOpCircuitBreaker();

    @Parameter
    @Summary("The name of the Queue from which messages will be retrieved")
    @Placement(order = 1)
    @DisplayName("Queue")
    @Expression(ExpressionSupport.NOT_SUPPORTED)
    private String destination;

    @Placement(order = 2)
    @Expression(ExpressionSupport.NOT_SUPPORTED)
    @ParameterDsl(allowReferences = false)
    @Optional
    @Parameter
    @Summary("The strategy to be used when subscribing for messages in the Queue")
    @NullSafe(defaultImplementingType = PrefetchTypeSubscriberFactory.class)
    private SubscriberFactory subscriberType;

    @Optional(defaultValue = "AUTO")
    @Parameter
    @Summary("Acknowledgement mode to use for the messages retrieved")
    @Placement(order = 3)
    @Expression(ExpressionSupport.NOT_SUPPORTED)
    private SubscriberAckMode acknowledgementMode;

    @Optional(defaultValue = DefaultValues.DEFAULT_CONSUMER_ACK_TIMEOUT)
    @Parameter
    @Summary("Duration that a message is held by a consumer waiting for an ACK or NACK, before returning to the Queue for redelivery")
    @Placement(order = 4)
    @Expression(ExpressionSupport.NOT_SUPPORTED)
    private long acknowledgementTimeout;

    @Optional(defaultValue = "MILLISECONDS")
    @Parameter
    @Summary("Time unit to be used in the acknowledgementTimeout configuration")
    @Placement(order = 5)
    @Expression(ExpressionSupport.NOT_SUPPORTED)
    private TimeUnit acknowledgementTimeoutUnit;

    @Optional
    @ParameterDsl(allowReferences = true)
    @Parameter
    @Placement(order = 6, tab = "Advanced")
    @Alias("circuitBreaker")
    @Expression(ExpressionSupport.NOT_SUPPORTED)
    private CircuitBreakerConfiguration circuitBreakerConfig;

    @Connection
    private ConnectionProvider<AnypointMQConnection> connectionProvider;

    @Inject
    private MQCircuitsManager circuitsManager;

    @Inject
    private SchedulerService schedulerService;

    // NOTE(review): never assigned in this class; presumably populated by the runtime — confirm.
    private ComponentLocation componentLocation;

    /** Active circuit breaker; stays a no-op unless a circuit breaker configuration is present. */
    private MQCircuitBreaker circuitBreaker = NO_OP_CIRCUIT;

    private AnypointMQServer server;

    /**
     * Whether the server is considered running. Read from the ACK/NACK observer callbacks to decide
     * between raising an error and merely logging at debug level. Declared {@code volatile} because
     * those callbacks are presumably invoked on a different thread than {@link #onStop()} —
     * NOTE(review): confirm against the REST client's threading model.
     */
    private volatile boolean isRunning;

    /**
     * Creates and starts the {@link AnypointMQServer} for the configured queue.
     *
     * @param sourceCallback callback handed to the server for dispatching received messages
     * @throws ConnectionException if the connection needed to resolve the destination cannot be obtained
     * @throws IllegalArgumentException if the circuit breaker configuration has a non-positive
     *         errors threshold or trip timeout
     */
    public void onStart(SourceCallback<InputStream, AnypointMQMessageAttributes> sourceCallback) throws ConnectionException {
        String sourceName = String.format(SOURCE_NAME_MASK, this.componentLocation.getRootContainerName());
        initialiseCircuitBreaker(sourceName);

        SubscriberConfig subscriberConfig = new SubscriberConfig(
                this.acknowledgementMode,
                this.acknowledgementTimeout,
                this.acknowledgementTimeoutUnit,
                this.subscriberType);

        this.server = new AnypointMQServer(
                subscriberConfig,
                getDestination(),
                sourceCallback,
                this.circuitBreaker,
                sourceName,
                this.schedulerService);
        this.server.start();
        this.isRunning = true;
    }

    /**
     * Resolves the configured queue name into a {@link Destination} using a new connection.
     *
     * @return the destination for the configured queue
     * @throws ConnectionException if the connection provider fails to connect
     * @throws MQDestinationNotFoundException if the queue does not exist
     */
    private Destination getDestination() throws ConnectionException {
        try {
            // NOTE(review): the connection obtained here is never handed back to the provider in
            // this class — confirm that its lifecycle is owned elsewhere.
            return this.connectionProvider.connect().getDestination(this.destination);
        } catch (ResourceNotFoundException e) {
            // The thrown exception carries only a message, so keep the original cause in the log.
            LOGGER.debug("Lookup of destination '{}' failed", this.destination, e);
            throw new MQDestinationNotFoundException(String.format("Failed to subscribe on destination '%s'. Such destination does not exist in the configured environment", this.destination));
        }
    }

    /**
     * Validates the optional circuit breaker configuration and, when present, replaces the no-op
     * circuit breaker with the one provided by the {@link MQCircuitsManager}.
     *
     * @param str name passed to the circuits manager when obtaining the circuit
     * @throws IllegalArgumentException if the errors threshold or the trip timeout is 0 or less
     */
    private void initialiseCircuitBreaker(String str) {
        if (this.circuitBreakerConfig == null) {
            return;
        }
        if (this.circuitBreakerConfig.getErrorsThreshold() <= 0) {
            throw new IllegalArgumentException(String.format("Circuit's errors threshold `%s` cannot be 0 or less", this.circuitBreakerConfig.getErrorsThreshold()));
        }
        if (this.circuitBreakerConfig.getTripTimeout() <= 0) {
            throw new IllegalArgumentException(String.format("Circuit's trip timeout `%s` cannot be 0 or less", this.circuitBreakerConfig.getTripTimeout()));
        }
        this.circuitBreaker = this.circuitsManager.getOrCreateCircuit(this.circuitBreakerConfig, str);
    }

    /**
     * Stops the server, if one was created.
     */
    public void onStop() {
        LOGGER.debug("Source requested to stop");
        if (this.server != null) {
            // Clear the flag before stopping so that ACK/NACK failures reported while the server is
            // shutting down are logged at debug level instead of being raised, and so that the flag
            // is not left set if stop() throws.
            this.isRunning = false;
            this.server.stop();
        }
    }

    /**
     * Notifies the circuit breaker of a success and, in AUTO acknowledgement mode, ACKs the message.
     * If the ACK attempt throws, the failure is logged and a NACK is attempted instead.
     *
     * @param sourceCallbackContext context carrying the ack mode, destination and message
     * @throws IllegalStateException if a required context variable is missing
     */
    @OnSuccess
    public void onSuccess(SourceCallbackContext sourceCallbackContext) throws Exception {
        notifyCircuitSuccess();
        if (!SubscriberAckMode.AUTO.equals(getAckMode(sourceCallbackContext))) {
            return;
        }
        Destination target = getDestination(sourceCallbackContext);
        Lock lock = new Lock(getMessage(sourceCallbackContext));
        try {
            doAck(target, lock);
        } catch (Exception e) {
            // Do not swallow the ACK failure silently: record why we are falling back to a NACK.
            LOGGER.warn("AUTO 'ACK' failed for message with id '{}' on destination '{}'; attempting 'NACK'", lock.getMessageId(), target.getName(), e);
            doNack(target, lock);
        }
    }

    /**
     * Notifies the circuit breaker of a failure and, in AUTO acknowledgement mode, NACKs the message.
     *
     * @param error the error produced by the flow; may be {@code null}
     * @param sourceCallbackContext context carrying the ack mode, destination and message
     * @throws IllegalStateException if a required context variable is missing
     */
    @OnError
    public void onError(Error error, SourceCallbackContext sourceCallbackContext) {
        notifyCircuitError(error);
        if (SubscriberAckMode.AUTO.equals(getAckMode(sourceCallbackContext))) {
            Destination target = getDestination(sourceCallbackContext);
            Lock lock = new Lock(getMessage(sourceCallbackContext));
            doNack(target, lock);
        }
    }

    /** Reports a successful execution to the circuit breaker. */
    private void notifyCircuitSuccess() {
        LOGGER.debug("Notify Circuit SUCCESS");
        this.circuitBreaker.onSuccess();
    }

    /**
     * Reports a failed execution to the circuit breaker, identified by the error's qualified name.
     *
     * @param error the error produced by the flow; may be {@code null}
     */
    private void notifyCircuitError(Error error) {
        String errorName = resolveErrorName(error);
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Notify Circuit ERROR: {}", errorName);
            if (error != null) {
                LOGGER.debug("Error Cause: ", error.getCause());
            } else {
                LOGGER.debug("Error Cause: 'UNKNOWN' - Error was 'null'");
            }
        }
        this.circuitBreaker.onFailure(errorName);
    }

    /**
     * Builds the {@code NAMESPACE:IDENTIFIER} name of the given error.
     *
     * @param error the error to describe; may be {@code null}
     * @return the qualified error type name, or {@code "UNKNOWN"} when {@code error} is {@code null}
     */
    private String resolveErrorName(Error error) {
        if (error == null) {
            return "UNKNOWN";
        }
        return error.getErrorType().getNamespace() + ":" + error.getErrorType().getIdentifier();
    }

    /**
     * ACKs the message identified by {@code lock} on {@code destination}.
     *
     * <p>When the ACK is reported as failed and the source is still running, the failure is logged
     * at error level and an {@link MQAckException} is thrown from the observer; otherwise it is
     * only logged at debug level.
     *
     * @param destination destination the message was received from
     * @param lock lock of the message to acknowledge
     */
    private void doAck(Destination destination, Lock lock) {
        LOGGER.debug("Doing a ACK on destination '{}' with lockId '{}'", destination.getName(), lock.getLockId());
        destination.ack(lock).subscribe(new CourierObserver<MessageIdResult>() {
            public void onSuccess(MessageIdResult messageIdResult) {
                // Nothing to do once the ACK succeeded.
            }

            public void onError(Throwable th) {
                String failure = String.format("Failed to do AUTO 'ACK' on the message with id '%s' on destination '%s': %s", lock.getMessageId(), destination.getName(), th.getMessage());
                if (isRunning) {
                    LOGGER.error(failure, th);
                    throw new MQAckException(failure, th);
                }
                LOGGER.debug(failure);
            }
        });
    }

    /**
     * NACKs the message identified by {@code lock} on {@code destination}.
     *
     * <p>When the NACK is reported as failed and the source is still running, the failure is logged
     * at error level and an {@link MQNackException} is thrown from the observer; otherwise it is
     * only logged at debug level. Any exception thrown while issuing the NACK is caught and logged,
     * never propagated to the caller.
     *
     * @param destination destination the message was received from
     * @param lock lock of the message to reject
     */
    private void doNack(Destination destination, Lock lock) {
        LOGGER.debug("Doing a NACK on destination '{}' with lockId '{}'", destination.getName(), lock.getLockId());
        try {
            destination.nack(lock).subscribe(new CourierObserver<MessageIdResult>() {
                public void onSuccess(MessageIdResult messageIdResult) {
                    // Nothing to do once the NACK succeeded.
                }

                public void onError(Throwable th) {
                    String failure = String.format("Failed to do AUTO 'NACK' on the message with id '%s' on destination '%s': %s", lock.getMessageId(), destination.getName(), th.getMessage());
                    if (isRunning) {
                        LOGGER.error(failure, th);
                        throw new MQNackException(failure, th);
                    }
                    LOGGER.debug(failure);
                }
            });
        } catch (Exception e) {
            // The trailing throwable is passed in addition to the three placeholder arguments so
            // that the stack trace is logged as well.
            LOGGER.error("An error occurred while trying to perform an AUTO 'NACK' on message with id '{}' on destination '{}': {}", lock.getMessageId(), destination.getName(), e.getMessage(), e);
        }
    }

    /**
     * Reads the received message from the callback context.
     *
     * @param sourceCallbackContext context of the callback being completed
     * @return the message stored under {@link #MESSAGE_CTX_VAR}
     * @throws IllegalStateException if the variable is absent
     */
    private AnypointMQMessage getMessage(SourceCallbackContext sourceCallbackContext) {
        return (AnypointMQMessage) sourceCallbackContext.getVariable(MESSAGE_CTX_VAR)
                .orElseThrow(() -> new IllegalStateException("Missing 'Message' information on callback"));
    }

    /**
     * Reads the originating destination from the callback context.
     *
     * @param sourceCallbackContext context of the callback being completed
     * @return the destination stored under {@link #DESTINATION_CTX_VAR}
     * @throws IllegalStateException if the variable is absent
     */
    private Destination getDestination(SourceCallbackContext sourceCallbackContext) {
        return (Destination) sourceCallbackContext.getVariable(DESTINATION_CTX_VAR)
                .orElseThrow(() -> new IllegalStateException("Missing 'Destination' information on callback"));
    }

    /**
     * Reads the acknowledgement mode from the callback context.
     *
     * @param sourceCallbackContext context of the callback being completed
     * @return the mode stored under {@link #ACK_MODE_CTX_VAR}
     * @throws IllegalStateException if the variable is absent
     */
    private SubscriberAckMode getAckMode(SourceCallbackContext sourceCallbackContext) {
        return (SubscriberAckMode) sourceCallbackContext.getVariable(ACK_MODE_CTX_VAR)
                .orElseThrow(() -> new IllegalStateException("Missing 'Acknowledgment Mode' information on callback"));
    }
}
