package com.mulesoft.extension.mq.internal;

import com.google.common.collect.ImmutableList;
import com.mulesoft.extension.mq.api.modes.SubscriberAckMode;
import com.mulesoft.extension.mq.api.source.MQSubscriber;
import com.mulesoft.extension.mq.internal.server.MessageListener;
import com.mulesoft.mq.restclient.circuit.MQCircuitBreaker;
import com.mulesoft.mq.restclient.client.mq.domain.AnypointMQMessage;
import com.mulesoft.mq.restclient.client.mq.domain.Lock;
import com.mulesoft.mq.restclient.exception.ResourceNotFoundException;
import com.mulesoft.mq.restclient.internal.Destination;
import com.mulesoft.mq.restclient.utils.ClientUtils;
import com.mulesoft.mq.restclient.utils.MessageUtils;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/mulesoft/extension/mq/internal/AbstractSubscriber.class */
/**
 * Base class for {@link MQSubscriber} implementations.
 *
 * <p>It keeps the running flag, routes errors either to the log or to the
 * {@link MessageListener}, dispatches received messages to that listener, and
 * nacks messages that are still held locally once the subscriber has been stopped.
 * Subclasses supply the actual work submission through {@link #doSubmitWork()}.
 */
public abstract class AbstractSubscriber implements MQSubscriber {
    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractSubscriber.class);

    /** Receives every dispatched message and every error that is not handled locally. */
    private final MessageListener messageListener;
    protected final MQCircuitBreaker circuitBreaker;
    protected final Destination destination;
    protected final SubscriberAckMode acknowledgementMode;
    protected final long acknowledgementTimeout;

    /** {@code true} between a successful {@link #start()} and the next {@link #stop()}. */
    protected AtomicBoolean isRunning = new AtomicBoolean(false);

    /**
     * Creates a subscriber that is initially stopped.
     *
     * @param subscriberAckMode      acknowledgement mode stored for use by subclasses
     * @param acknowledgementTimeout acknowledgement timeout stored for use by subclasses
     * @param destination            destination whose name is logged and on which messages are nacked
     * @param messageListener        listener notified of received messages and of unhandled errors
     * @param circuitBreaker         circuit breaker stored for use by subclasses
     */
    public AbstractSubscriber(SubscriberAckMode subscriberAckMode, long acknowledgementTimeout, Destination destination,
                              MessageListener messageListener, MQCircuitBreaker circuitBreaker) {
        this.acknowledgementMode = subscriberAckMode;
        this.acknowledgementTimeout = acknowledgementTimeout;
        this.destination = destination;
        this.messageListener = messageListener;
        this.circuitBreaker = circuitBreaker;
    }

    /**
     * Marks the subscriber as running and submits work. Does nothing when the
     * subscriber is already running.
     */
    @Override // com.mulesoft.extension.mq.api.source.MQSubscriber
    public synchronized void start() {
        if (this.isRunning.compareAndSet(false, true)) {
            LOGGER.debug("Starting Subscriber on destination '{}'", this.destination.getName());
            submitWork();
        }
    }

    /**
     * Marks the subscriber as stopped. Does nothing when the subscriber is not running.
     */
    @Override // com.mulesoft.extension.mq.api.source.MQSubscriber
    public void stop() {
        if (this.isRunning.compareAndSet(true, false)) {
            LOGGER.debug("Stopping Subscriber on destination '{}'", this.destination.getName());
        }
    }

    /**
     * Delegates to {@link #doSubmitWork()} only while the subscriber is running.
     */
    public void submitWork() {
        if (this.isRunning.get()) {
            doSubmitWork();
        }
    }

    /**
     * Submits the subscriber's work. Only invoked by {@link #submitWork()} while the
     * subscriber is running.
     */
    protected abstract void doSubmitWork();

    /**
     * Handles an error raised while the subscriber was working.
     *
     * <ul>
     *   <li>When the subscriber is not running, the error is only logged at debug level.</li>
     *   <li>A {@link ResourceNotFoundException} is logged and the subscriber is stopped.</li>
     *   <li>An error for which {@link ClientUtils#isTimeout} returns {@code true} is logged
     *       and not propagated.</li>
     *   <li>Any other error is forwarded to the {@link MessageListener}.</li>
     * </ul>
     *
     * @param th the error to handle
     */
    public void handleError(Throwable th) {
        if (!this.isRunning.get()) {
            // The trailing Throwable is treated by SLF4J as the exception to log, not as a format argument.
            LOGGER.debug("An error occurred while subscriber was shutting down: {}", th.getMessage(), th);
            return;
        }
        if (th instanceof ResourceNotFoundException) {
            LOGGER.error("Destination '{}' not found. Shutting down subscriber...", this.destination.getName());
            stop();
        } else if (ClientUtils.isTimeout(th)) {
            LOGGER.error("Can not retrieve messages: {}.", MessageUtils.getCompleteMessage(th), th);
        } else {
            LOGGER.debug("Error on destination '{}' is neither a missing destination nor a timeout; "
                    + "notifying the message listener", this.destination.getName());
            this.messageListener.onError(th);
        }
    }

    /**
     * Logs the size of the batch and processes it via {@link #processMessages(Iterator)}.
     *
     * @param list the received messages
     */
    public void processMessages(List<AnypointMQMessage> list) {
        LOGGER.debug("Received messages count: {}", list.size());
        processMessages(list.iterator());
    }

    /**
     * Dispatches each message through {@link #processMessage(AnypointMQMessage)} while the
     * subscriber is running.
     *
     * <p>The running flag is checked before every message, so a {@link #stop()} issued in the
     * middle of a batch causes the remaining messages to be nacked instead of dispatched.
     *
     * @param it iterator over the received messages; it is consumed by this call
     */
    public void processMessages(Iterator<AnypointMQMessage> it) {
        while (it.hasNext()) {
            if (!this.isRunning.get()) {
                LOGGER.debug("Subscriber stopped, returning all local messages to the queue");
                nackAll(it);
                return;
            }
            processMessage(it.next());
        }
    }

    /**
     * Hands a single message to the {@link MessageListener}.
     *
     * @param anypointMQMessage the message to dispatch
     */
    public void processMessage(AnypointMQMessage anypointMQMessage) {
        this.messageListener.onReceive(anypointMQMessage);
    }

    /**
     * Nacks every message remaining in the iterator.
     *
     * @param it iterator over the messages to nack; it is fully consumed
     */
    public void nackAll(Iterator<AnypointMQMessage> it) {
        nackAll(ImmutableList.copyOf(it));
    }

    /**
     * Nacks the given messages on the destination using each message's id and lock id.
     * The result of the nack call is fired and forgotten. An empty list is a no-op.
     *
     * @param list the messages to nack
     */
    public void nackAll(List<AnypointMQMessage> list) {
        if (list.isEmpty()) {
            return;
        }
        // NOTE(review): assumes Destination.nack accepts a List<Lock> — confirm against the client API.
        List<Lock> locks = list.stream()
                .map(message -> new Lock(message.getMessageId(), message.getLockId()))
                .collect(Collectors.toList());
        this.destination.nack(locks).fireAndForget();
    }
}
