package org.mule.jms.commons.internal.consume;

import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.CompletionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import org.apache.commons.lang3.time.StopWatch;
import org.mule.jms.commons.api.exception.JmsConsumeException;
import org.mule.jms.commons.api.exception.JmsTimeoutException;
import org.mule.jms.commons.internal.message.JmsResultFactory;
import org.mule.runtime.api.scheduler.Scheduler;
import org.mule.runtime.api.util.Preconditions;
import org.mule.runtime.api.util.Reference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/mule/jms/commons/internal/consume/JmsMessageConsumer.class */
/**
 * Wrapper around a JMS {@link MessageConsumer} that exposes blocking and callback-based
 * ways of obtaining a single {@link Message}, plus a plain listener registration.
 *
 * <p>Timeout arguments are expressed in milliseconds and use two sentinel values:
 * {@code -1} means "wait until a message arrives" and {@code 0} means "do not wait".
 */
public final class JmsMessageConsumer implements AutoCloseable {
    private static final Logger LOGGER = LoggerFactory.getLogger(JmsMessageConsumer.class);

    /** Timeout sentinel: wait for a message with no time limit. */
    private static final long WAIT_FOREVER = -1L;

    /** Timeout sentinel: only take a message that is immediately available. */
    private static final long NO_WAIT = 0L;

    private final MessageConsumer consumer;
    private final JmsResultFactory resultFactory = JmsResultFactory.getInstance();

    /**
     * Creates a wrapper delegating to the given consumer.
     *
     * @param messageConsumer the delegate; must not be {@code null}
     *        (rejected through {@link Preconditions#checkArgument})
     */
    public JmsMessageConsumer(MessageConsumer messageConsumer) {
        Preconditions.checkArgument(messageConsumer != null, "A non null MessageConsumer is required to use as delegate");
        this.consumer = messageConsumer;
    }

    /**
     * Registers {@code messageListener} on the underlying consumer.
     *
     * @param messageListener the listener to install
     * @throws JMSException if the underlying consumer rejects the listener
     */
    public void listen(MessageListener messageListener) throws JMSException {
        this.consumer.setMessageListener(messageListener);
    }

    /**
     * Synchronously obtains a message from the underlying consumer.
     *
     * @param l maximum wait in milliseconds; {@code -1} blocks until a message arrives,
     *        {@code 0} returns only an immediately available message
     * @return the received message; may be {@code null} (see the individual receive variants)
     * @throws JMSException if the underlying consumer fails
     * @throws JmsTimeoutException if a positive timeout fully elapsed without a message
     */
    public Message consume(Long l) throws JMSException, JmsTimeoutException {
        long timeout = l.longValue();
        if (timeout == WAIT_FOREVER) {
            return receive();
        }
        if (timeout == NO_WAIT) {
            return receiveNoWait();
        }
        return receiveWithTimeout(l);
    }

    /**
     * Closes the underlying consumer.
     *
     * @throws JMSException if the underlying consumer fails to close
     */
    @Override // java.lang.AutoCloseable
    public void close() throws JMSException {
        LOGGER.debug("Closing consumer {}", this.consumer);
        this.consumer.close();
    }

    /**
     * Blocks on the consumer for at most {@code timeoutMillis} milliseconds.
     *
     * @return the received message, or {@code null} when the consumer returned nothing
     *         before the timeout had elapsed
     * @throws JmsTimeoutException if no message was received and at least
     *         {@code timeoutMillis} milliseconds went by
     */
    private Message receiveWithTimeout(Long timeoutMillis) throws JMSException, JmsTimeoutException {
        LOGGER.debug("Waiting for a message, timeout will be in [{}] millis", timeoutMillis);
        StopWatch elapsed = new StopWatch();
        elapsed.start();
        Message received = this.consumer.receive(timeoutMillis.longValue());
        elapsed.stop();
        // A null result that came back early is not treated as a timeout and is handed to the caller.
        // NOTE(review): presumably this covers the consumer being closed while waiting - confirm.
        if (received != null || elapsed.getTime() < timeoutMillis.longValue()) {
            return received;
        }
        throw new JmsTimeoutException("Failed to retrieve a Message, operation timed out");
    }

    /** Returns whatever {@link MessageConsumer#receiveNoWait()} yields, without blocking here. */
    private Message receiveNoWait() throws JMSException {
        LOGGER.debug("Trying to consume an immediately available message");
        return this.consumer.receiveNoWait();
    }

    /** Delegates to {@link MessageConsumer#receive()} with no timeout. */
    private Message receive() throws JMSException {
        LOGGER.debug("No Timeout set, waiting for a message until one arrives");
        return this.consumer.receive();
    }

    /**
     * @return the wrapped JMS consumer
     */
    public MessageConsumer get() {
        return this.consumer;
    }

    /**
     * Callback-based variant of {@link #consume(Long)}: the outcome is reported through
     * {@code completionListener} instead of being returned.
     *
     * @param l maximum wait in milliseconds; {@code -1} waits with no time limit,
     *        {@code 0} polls once without waiting
     * @param scheduler used to schedule the timeout and to detach the temporary listener
     * @param completionListener receives the message, or the failure
     */
    public void consume(Long l, Scheduler scheduler, CompletionListener completionListener) {
        long timeout = l.longValue();
        if (timeout == WAIT_FOREVER) {
            listenForMessage(completionListener, scheduler);
            return;
        }
        if (timeout != NO_WAIT) {
            listenForMessage(l, scheduler, completionListener);
            return;
        }
        try {
            completionListener.onCompletion(this.consumer.receiveNoWait());
        } catch (Exception e) {
            completionListener.onException(null, e);
        }
    }

    /**
     * Installs a temporary listener that reports the first message to {@code completionListener}
     * and, for positive timeouts, schedules a task that reports a {@link JmsTimeoutException}.
     * Whichever of the two claims the {@code completed} flag first is the only one that notifies
     * the listener; the other becomes a no-op. A message handed to the temporary listener after
     * the flag was claimed is ignored.
     */
    private void listenForMessage(Long timeoutMillis, Scheduler scheduler, CompletionListener completionListener) {
        AtomicBoolean completed = new AtomicBoolean(false);
        AtomicBoolean listenerRemoved = new AtomicBoolean(false);
        Reference<ScheduledFuture<?>> timeoutTask = new Reference<>();
        try {
            this.consumer.setMessageListener(message -> {
                // Atomically claim the right to complete; lose the race and there is nothing to do.
                if (!completed.compareAndSet(false, true)) {
                    return;
                }
                ScheduledFuture<?> pendingTimeout = timeoutTask.get();
                if (pendingTimeout != null) {
                    pendingTimeout.cancel(true);
                }
                try {
                    synchronized (this.consumer) {
                        if (!listenerRemoved.getAndSet(true) && this.consumer.getMessageListener() != null) {
                            // The listener is detached from a scheduler thread rather than from this callback.
                            // NOTE(review): presumably because some JMS clients block when a listener
                            // is unset from within its own onMessage - confirm.
                            scheduler.submit(() -> {
                                try {
                                    this.consumer.setMessageListener(null);
                                } catch (Throwable t) {
                                    LOGGER.warn("An unknown error occurred trying to shutdown a listener.", t);
                                }
                            });
                        }
                    }
                } catch (Throwable th) {
                    LOGGER.warn("An unknown error occurred trying to shutdown a listener.", th);
                }
                completionListener.onCompletion(message);
            });
        } catch (JMSException e) {
            completionListener.onException(null, e);
            // No listener was installed, so there is nothing to time out; scheduling the
            // timeout would only report a second failure later.
            return;
        }
        if (timeoutMillis.longValue() > 0) {
            timeoutTask.set(scheduler.schedule(() -> {
                // Same claim as in the listener: only one side may notify.
                if (!completed.compareAndSet(false, true)) {
                    return;
                }
                try {
                    synchronized (this.consumer) {
                        listenerRemoved.set(true);
                        if (this.consumer.getMessageListener() != null) {
                            this.consumer.setMessageListener(null);
                        }
                    }
                    completionListener.onException(null, new JmsTimeoutException(String.format("Failed to retrieve a Message. Operation timed out after %s milliseconds", timeoutMillis)));
                } catch (JMSException e) {
                    completionListener.onException(null, new JmsConsumeException("Unable to listen for message.", (Exception) e));
                }
            }, timeoutMillis.longValue(), TimeUnit.MILLISECONDS));
        }
    }

    /** Waits for a message with no time limit; no timeout task is scheduled. */
    private void listenForMessage(CompletionListener completionListener, Scheduler scheduler) {
        listenForMessage(WAIT_FOREVER, scheduler, completionListener);
    }
}
