package com.mulesoft.extension.mq.internal.messagelistener;

import com.mulesoft.extension.mq.api.attributes.AnypointMQMessageAttributes;
import com.mulesoft.extension.mq.api.modes.SubscriberAckMode;
import com.mulesoft.extension.mq.internal.config.SubscriberConfiguration;
import com.mulesoft.extension.mq.internal.source.AnypointMQSource;
import com.mulesoft.mq.restclient.api.AnypointMQMessage;
import com.mulesoft.mq.restclient.api.Destination;
import com.mulesoft.mq.restclient.api.Lock;
import com.mulesoft.mq.restclient.api.exception.ResourceNotFoundException;
import java.util.Optional;
import org.mule.runtime.api.connection.ConnectionException;
import org.mule.runtime.api.metadata.MediaType;
import org.mule.runtime.extension.api.runtime.operation.Result;
import org.mule.runtime.extension.api.runtime.source.SourceCallback;
import org.mule.runtime.extension.api.runtime.source.SourceCallbackContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/mulesoft/extension/mq/internal/messagelistener/MessageListenerImpl.class */
/**
 * {@link MessageListener} that hands every received Anypoint MQ message to a Mule
 * {@link SourceCallback}.
 *
 * <p>Each message is converted into a {@link Result} whose attributes are an
 * {@link AnypointMQMessageAttributes} instance and is dispatched together with a callback context
 * carrying the acknowledgement mode, the destination and the message itself. When the
 * acknowledgement mode is {@link SubscriberAckMode#IMMEDIATE} the message is acknowledged right
 * after the dispatch call returns; when dispatching throws, the message is negatively acknowledged
 * and the exception is rethrown.
 */
public class MessageListenerImpl implements MessageListener {
    private static final Logger logger = LoggerFactory.getLogger(MessageListenerImpl.class);

    private final SubscriberConfiguration subscriberConfiguration;
    private final SubscriberAckMode acknowledgementMode;
    private final Destination destination;

    // Cleared by onStop(). Declared volatile because onStop() is presumably invoked from a different
    // thread than the ones delivering messages — TODO confirm against the subscriber implementation.
    private volatile SourceCallback<byte[], AnypointMQMessageAttributes> callback;

    /**
     * Creates a listener bound to one destination and one source callback.
     *
     * @param subscriberConfiguration configuration supplying the acknowledgement mode and the
     *        maximum redelivery count; an absent acknowledgement mode falls back to
     *        {@link SubscriberAckMode#AUTO}
     * @param destination destination used to ack/nack the received messages
     * @param sourceCallback callback that receives the converted messages
     */
    public MessageListenerImpl(SubscriberConfiguration subscriberConfiguration, Destination destination, SourceCallback<byte[], AnypointMQMessageAttributes> sourceCallback) {
        this.subscriberConfiguration = subscriberConfiguration;
        SubscriberAckMode configuredMode = subscriberConfiguration.getAcknowledgementMode();
        this.acknowledgementMode = configuredMode != null ? configuredMode : SubscriberAckMode.AUTO;
        this.destination = destination;
        this.callback = sourceCallback;
    }

    /**
     * Dispatches the message unless it has exceeded the configured maximum redelivery count.
     *
     * @param anypointMQMessage the message that was received
     */
    @Override
    public void onReceive(AnypointMQMessage anypointMQMessage) {
        // The previous Optional.ofNullable(Integer.valueOf(..)).orElse(-1) wrapper could never yield
        // its fallback because Integer.valueOf never returns null, so the value is read directly.
        int maxRedelivery = this.subscriberConfiguration.getMaxRedelivery();
        if (!shouldHandleRedelivery(anypointMQMessage, maxRedelivery)) {
            logger.debug("Skipping message: delivery count {} exceeds the configured max redelivery {}",
                    anypointMQMessage.getDeliveryCount(), maxRedelivery);
            return;
        }
        handleMessage(anypointMQMessage);
    }

    /**
     * Dispatches the message through the callback and applies the ack/nack policy.
     *
     * <p>If the listener has already been stopped the message is negatively acknowledged and the
     * method returns without dispatching.
     */
    private void handleMessage(AnypointMQMessage anypointMQMessage) {
        // Read the field once so a concurrent onStop() cannot null it between the two uses below.
        SourceCallback<byte[], AnypointMQMessageAttributes> activeCallback = this.callback;
        if (activeCallback == null) {
            logger.warn("Message received after the listener was stopped; it will not be dispatched");
            this.destination.nack(new Lock(anypointMQMessage)).fireAndForget();
            return;
        }
        try {
            activeCallback.handle(toResult(anypointMQMessage), createCallbackContext(activeCallback, anypointMQMessage));
            if (this.acknowledgementMode == SubscriberAckMode.IMMEDIATE) {
                this.destination.ack(new Lock(anypointMQMessage)).fireAndForget();
            }
        } catch (Exception e) {
            logger.error("An error occurred while dispatching a message to the Mule flow", e);
            try {
                this.destination.nack(new Lock(anypointMQMessage)).fireAndForget();
            } catch (RuntimeException nackFailure) {
                // Keep the dispatch failure as the primary exception; attach the nack failure to it.
                e.addSuppressed(nackFailure);
            }
            throw e;
        }
    }

    /**
     * Builds the {@link Result} handed to the callback: the message body as output, attributes
     * built from the destination name, the message and whether the mode is
     * {@link SubscriberAckMode#MANUAL}, and the media type when the message declares a content type.
     */
    private Result<byte[], AnypointMQMessageAttributes> toResult(AnypointMQMessage anypointMQMessage) {
        boolean manualAck = this.acknowledgementMode == SubscriberAckMode.MANUAL;
        AnypointMQMessageAttributes attributes = new AnypointMQMessageAttributes(this.destination.getName(), anypointMQMessage, manualAck);
        // NOTE(review): the builder is left raw because the declared type of getBody() is not
        // visible from this file — parameterize it once that is confirmed.
        Result.Builder builder = Result.builder();
        builder.output(anypointMQMessage.getBody()).attributes(attributes);
        String contentType = anypointMQMessage.getContentType();
        if (contentType != null) {
            builder.mediaType(MediaType.parse(contentType));
        }
        return builder.build();
    }

    /**
     * Creates a callback context carrying the acknowledgement mode, the destination and the message.
     */
    private SourceCallbackContext createCallbackContext(SourceCallback<byte[], AnypointMQMessageAttributes> activeCallback, AnypointMQMessage anypointMQMessage) {
        SourceCallbackContext context = activeCallback.createContext();
        context.addVariable(AnypointMQSource.SourceCallbackContextKeys.ACKNOWLEDGEMENT_MODE, this.acknowledgementMode);
        context.addVariable(AnypointMQSource.SourceCallbackContextKeys.DESTINATION, this.destination);
        context.addVariable(AnypointMQSource.SourceCallbackContextKeys.MESSAGE, anypointMQMessage);
        return context;
    }

    /**
     * Logs the error; a {@link ResourceNotFoundException} is additionally reported to the callback
     * as a {@link ConnectionException}, provided the listener has not been stopped.
     *
     * @param th the failure reported to this listener
     */
    @Override
    public void onError(Throwable th) {
        if (th instanceof ResourceNotFoundException) {
            logger.error("Destination not found: {}. Shutting down subscriber...", this.destination);
            SourceCallback<byte[], AnypointMQMessageAttributes> activeCallback = this.callback;
            if (activeCallback != null) {
                activeCallback.onConnectionException(new ConnectionException(th));
            }
            return;
        }
        logger.error("Can not process received message.", th);
    }

    /** Drops the reference to the callback; later messages are no longer dispatched. */
    @Override
    public void onStop() {
        this.callback = null;
    }

    /**
     * Tells whether a message may still be dispatched.
     *
     * @param maxRedelivery configured limit; a negative value disables the check
     * @return {@code false} only when the limit is non-negative and the delivery count minus one
     *         is greater than it
     */
    private boolean shouldHandleRedelivery(AnypointMQMessage anypointMQMessage, int maxRedelivery) {
        return maxRedelivery < 0 || anypointMQMessage.getDeliveryCount() - 1 <= maxRedelivery;
    }
}
