package com.mulesoft.mq.restclient.internal.impl;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.mulesoft.mq.restclient.circuit.MQCircuitBreaker;
import com.mulesoft.mq.restclient.client.mq.domain.AnypointMQMessage;
import com.mulesoft.mq.restclient.client.mq.domain.Lock;
import com.mulesoft.mq.restclient.exception.ResourceNotFoundException;
import com.mulesoft.mq.restclient.exception.RestException;
import com.mulesoft.mq.restclient.internal.BufferedQueue;
import com.mulesoft.mq.restclient.internal.CourierObserver;
import com.mulesoft.mq.restclient.internal.Destination;
import com.mulesoft.mq.restclient.internal.MessagePreserver;
import com.mulesoft.mq.restclient.internal.Prefetcher;
import com.mulesoft.mq.restclient.utils.ClientUtils;
import com.mulesoft.mq.restclient.utils.ExecutorUtils;
import com.mulesoft.mq.restclient.utils.MessageUtils;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Subscriber;

/* loaded from: input_file:com/mulesoft/mq/restclient/internal/impl/ScheduledPrefetcher.class */
/**
 * {@link Prefetcher} that retrieves messages from a {@link Destination} on a single-threaded
 * scheduled executor, keeps them in a {@link BufferedQueue}, and hands them to subscribers of the
 * observable returned by {@link #get()}.
 *
 * <p>Both fetching and dispatching consult the state of the supplied {@link MQCircuitBreaker}
 * before acting.
 */
public class ScheduledPrefetcher implements Prefetcher {
    /**
     * Value 3. Presumably the cap on concurrent receive requests: getMaxConcurrentRequests caps
     * its result at the same value - TODO confirm against the original sources.
     */
    public static final int MAX_CONCURRENT_REQUESTS = 3;
    private static final Logger LOGGER = LoggerFactory.getLogger(ScheduledPrefetcher.class);
    /** Single-threaded scheduler on which retrieveMessages is scheduled and submitted. */
    private final ScheduledExecutorService retriever;
    /** Destination that messages are received from and nacked to. */
    private final Destination destination;
    /** Number of messages requested per fetch while the circuit is CLOSED: min(bufferLimit, 10). */
    private final int batchSize;
    /** Lock time-to-live passed to receive() and to the preserver. */
    private final long lockTimeToLive;
    /** A fetch is skipped once the buffer holds at least this many messages. */
    private final int bufferLimit;
    /** Circuit breaker whose state gates fetching and dispatching; never null. */
    private final MQCircuitBreaker circuitBreaker;
    /** A fetch is skipped once this many receive requests are outstanding. */
    private final int maxConcurrentRequests;
    /** Timeout handed to circuitBreaker.awaitCircuitLock; the unit is not visible in this class. */
    private final int circuitTtl;
    /** True between a successful start() and the following stop(). */
    private AtomicBoolean running;
    /** Period, in milliseconds, of the scheduled fetch. */
    private long retrievePeriod;
    /** Tracks buffered messages; a no-op implementation is used when none is supplied. */
    private MessagePreserver preserver;
    /** Messages that were fetched but not yet handed to a subscriber. */
    private BufferedQueue bufferedQueue;
    /**
     * Set when the destination was reported as not found; get() then fails every new subscriber
     * with it. NOTE(review): the field is not volatile although it appears to be written and read
     * from different threads - confirm visibility requirements.
     */
    private RestException invalidPrefetcherException;
    /** Subscribers parked until a message becomes available. */
    private final BlockingQueue<Subscriber<? super AnypointMQMessage>> waitingSubscribers = new LinkedBlockingQueue();
    /** Number of receive requests that were issued and have not completed yet. */
    private final AtomicInteger inFlightRequests = new AtomicInteger(0);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: com.mulesoft.mq.restclient.internal.impl.ScheduledPrefetcher$2, reason: invalid class name */
    /* loaded from: input_file:com/mulesoft/mq/restclient/internal/impl/ScheduledPrefetcher$2.class */
    /**
     * Compiler-generated helper (recovered by the decompiler) that backs the
     * {@code switch} statements over {@link MQCircuitBreaker.CircuitState} in this class.
     *
     * <p>The array is indexed by {@code CircuitState.ordinal()} and yields the case label used
     * by those switches: 1 for OPEN, 2 for CLOSED, 3 for HALF_OPEN. Entries that are never
     * assigned keep the array default of 0, which falls through to the {@code default} branch.
     */
    public static /* synthetic */ class AnonymousClass2 {
        // One slot per enum constant known at run time.
        static final /* synthetic */ int[] $SwitchMap$com$mulesoft$mq$restclient$circuit$MQCircuitBreaker$CircuitState = new int[MQCircuitBreaker.CircuitState.values().length];

        static {
            // Each constant is mapped in its own try block so that a constant missing from the
            // enum at run time (NoSuchFieldError) only skips its own mapping.
            try {
                $SwitchMap$com$mulesoft$mq$restclient$circuit$MQCircuitBreaker$CircuitState[MQCircuitBreaker.CircuitState.OPEN.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
                // Deliberately ignored: OPEN keeps the default label 0.
            }
            try {
                $SwitchMap$com$mulesoft$mq$restclient$circuit$MQCircuitBreaker$CircuitState[MQCircuitBreaker.CircuitState.CLOSED.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
                // Deliberately ignored: CLOSED keeps the default label 0.
            }
            try {
                $SwitchMap$com$mulesoft$mq$restclient$circuit$MQCircuitBreaker$CircuitState[MQCircuitBreaker.CircuitState.HALF_OPEN.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
                // Deliberately ignored: HALF_OPEN keeps the default label 0.
            }
        }
    }

    /* loaded from: input_file:com/mulesoft/mq/restclient/internal/impl/ScheduledPrefetcher$NoOpMessagePreserver.class */
    /**
     * {@link MessagePreserver} whose operations all do nothing. It is installed by the
     * constructor when the caller passes no preserver, so the rest of the class never has to
     * null-check the preserver.
     */
    private static final class NoOpMessagePreserver implements MessagePreserver {
        private NoOpMessagePreserver() {
            // No state to initialise.
        }

        /** Ignores the message. */
        @Override
        public void add(AnypointMQMessage message, long lockTtl) {
            // Intentionally empty.
        }

        /** Ignores the messages. */
        @Override
        public void add(List<AnypointMQMessage> messages, long lockTtl) {
            // Intentionally empty.
        }

        /** Always reports that nothing was removed. */
        @Override
        public boolean remove(String messageId) {
            return false;
        }

        /** Always reports the message as not expired. */
        @Override
        public boolean isExpired(String messageId) {
            return false;
        }

        /** Nothing to stop. */
        @Override
        public void stop() {
            // Intentionally empty.
        }
    }

    /**
     * Creates a prefetcher in the stopped state; nothing is fetched until {@link #start()} is
     * called.
     *
     * @param destination destination that messages are received from and nacked to
     * @param i buffer limit; the batch size is derived from it as {@code min(i, 10)} and the
     *     maximum number of concurrent requests via {@code getMaxConcurrentRequests(i)}
     * @param j lock time-to-live; a non-positive value selects
     *     {@code Destination.DEFAULT_LOCK_TTL}
     * @param j2 period, in milliseconds, between scheduled fetches
     * @param messagePreserver preserver for buffered messages; {@code null} selects a no-op
     *     implementation
     * @param mQCircuitBreaker circuit breaker gating fetch and dispatch; must not be {@code null}
     * @param i2 timeout passed to the circuit breaker when waiting for a circuit test
     * @throws IllegalArgumentException if {@code mQCircuitBreaker} is {@code null}
     */
    public ScheduledPrefetcher(Destination destination, int i, long j, long j2, MessagePreserver messagePreserver, MQCircuitBreaker mQCircuitBreaker, int i2) {
        Preconditions.checkArgument(mQCircuitBreaker != null, "A valid MQ CircuitBreaker implementation should be provided but 'null' was found");

        // Collaborators and lifecycle state.
        this.destination = destination;
        this.circuitBreaker = mQCircuitBreaker;
        this.circuitTtl = i2;
        this.running = new AtomicBoolean(false);

        // Buffer sizing.
        this.bufferLimit = i;
        this.batchSize = Math.min(i, 10);
        this.maxConcurrentRequests = getMaxConcurrentRequests(i);
        this.bufferedQueue = new SimpleBufferedQueue();

        // Timing.
        this.retrievePeriod = j2;
        if (j > 0) {
            this.lockTimeToLive = j;
        } else {
            this.lockTimeToLive = Destination.DEFAULT_LOCK_TTL;
        }

        if (messagePreserver != null) {
            this.preserver = messagePreserver;
        } else {
            this.preserver = new NoOpMessagePreserver();
        }

        this.retriever = Executors.newSingleThreadScheduledExecutor(
                new ThreadFactoryBuilder().setNameFormat("scheduled-prefetcher-retriever-%d").build());
    }

    /**
     * Starts the periodic fetch if the prefetcher is not already running. The first fetch is
     * scheduled immediately and then repeats every {@code retrievePeriod} milliseconds.
     *
     * <p>Calling this method while already running has no effect.
     */
    public void start() {
        if (!this.running.compareAndSet(false, true)) {
            return;
        }
        this.retriever.scheduleAtFixedRate(() -> {
            try {
                retrieveMessages();
            } catch (RuntimeException e) {
                // scheduleAtFixedRate suppresses every subsequent execution once a run throws,
                // which would silently stop prefetching; log and keep the schedule alive.
                LOGGER.error("Unexpected error while retrieving messages", e);
            }
        }, 0L, this.retrievePeriod, TimeUnit.MILLISECONDS);
    }

    /**
     * Stops the prefetcher if it is running: drains the buffer (removing each drained message
     * from the preserver), stops the preserver, clears the buffer, resets the in-flight counter
     * and shuts the retriever executor down.
     *
     * <p>Calling this method while not running has no effect.
     */
    public void stop() {
        if (this.running.compareAndSet(true, false)) {
            // take() yields null once the buffer is empty.
            for (AnypointMQMessage buffered = this.bufferedQueue.take();
                    buffered != null;
                    buffered = this.bufferedQueue.take()) {
                this.preserver.remove(buffered.getMessageId());
            }
            this.preserver.stop();
            this.bufferedQueue.clear();
            this.inFlightRequests.set(0);
            ExecutorUtils.stopExecutorService(this.retriever);
        }
    }

    /**
     * Entry point of every fetch attempt. Does nothing when the prefetcher is stopped;
     * otherwise the action depends on the circuit state:
     * <ul>
     *   <li>OPEN - skip the fetch;</li>
     *   <li>CLOSED - fetch a full batch;</li>
     *   <li>HALF_OPEN - fetch a single message, but only when no request is in flight.</li>
     * </ul>
     */
    private void retrieveMessages() {
        if (!this.running.get()) {
            LOGGER.debug("Prefetcher already stopped, skipping message fetch");
            return;
        }
        // Switch on the enum itself rather than on numeric labels tied to unrelated constants.
        final MQCircuitBreaker.CircuitState state = this.circuitBreaker.getState();
        switch (state) {
            case OPEN:
                LOGGER.debug("Circuit is OPEN, skipping message fetch");
                return;
            case CLOSED:
                doFetch(this.batchSize);
                return;
            case HALF_OPEN: {
                // Read the counter once so the decision and the logged value agree.
                final int pending = this.inFlightRequests.get();
                if (pending > 0) {
                    LOGGER.debug("Already {} request in progress, skipping message fetch for test", pending);
                    return;
                }
                doFetch(1);
                return;
            }
            default:
                LOGGER.error("Invalid circuit breaker value: {}", state);
                return;
        }
    }

    /**
     * Issues one asynchronous receive request for up to {@code i} messages, unless the buffer
     * already holds {@code bufferLimit} messages or {@code maxConcurrentRequests} requests are
     * outstanding.
     *
     * <p>The in-flight counter is incremented before the request and decremented when the
     * request completes, whether or not the completion handler itself fails.
     *
     * @param i maximum number of messages to request
     */
    private void doFetch(int i) {
        final int size = this.bufferedQueue.size();
        if (size >= this.bufferLimit) {
            LOGGER.debug("Buffer is full, skipping message fetch");
            return;
        }
        if (this.inFlightRequests.get() >= this.maxConcurrentRequests) {
            LOGGER.debug("Already {} request in progress, skipping message fetch", this.maxConcurrentRequests);
            return;
        }
        LOGGER.debug("Retrieving messages...");
        LOGGER.debug("Buffer contains '{}' messages", size);
        this.inFlightRequests.incrementAndGet();
        this.destination.receive(i, 20000L, this.lockTimeToLive).subscribe(new CourierObserver<List<AnypointMQMessage>>() {
            @Override
            public void onSuccess(List<AnypointMQMessage> list) {
                try {
                    if (list == null || list.isEmpty()) {
                        ScheduledPrefetcher.LOGGER.debug("No messages received");
                        ScheduledPrefetcher.this.circuitBreaker.releaseCircuitLock();
                    } else {
                        ScheduledPrefetcher.this.handleRetrieveSuccess(list);
                    }
                } finally {
                    // Always give the slot back; a leaked count would eventually block all fetches.
                    ScheduledPrefetcher.this.inFlightRequests.decrementAndGet();
                }
            }

            @Override
            public void onError(Throwable th) {
                try {
                    ScheduledPrefetcher.this.handleRetrieveError(th);
                } finally {
                    // Release the circuit lock and the in-flight slot even if error handling failed.
                    try {
                        ScheduledPrefetcher.this.circuitBreaker.releaseCircuitLock();
                    } finally {
                        ScheduledPrefetcher.this.inFlightRequests.decrementAndGet();
                    }
                }
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Reacts to a failed receive request.
     * <ul>
     *   <li>Timeouts are only logged at debug level.</li>
     *   <li>Errors arriving after a stop was requested are only logged at debug level.</li>
     *   <li>A {@link ResourceNotFoundException} stops the prefetcher and is propagated to every
     *       waiting subscriber.</li>
     *   <li>Any other error is logged at error level.</li>
     * </ul>
     *
     * @param th the failure reported by the receive request
     */
    public void handleRetrieveError(Throwable th) {
        if (ClientUtils.isTimeout(th)) {
            LOGGER.debug("Timeout while retrieving messages.");
            return;
        }
        if (requestedToStop()) {
            LOGGER.debug("Prefetcher already disposed, ignoring exception.", th);
            return;
        }
        if (th instanceof ResourceNotFoundException) {
            LOGGER.error("Destination not found: {}. Shutting down scheduler prefetcher...", this.destination.getName());
            stop();
            raiseErrorOnSubscribers((ResourceNotFoundException) th);
            return;
        }
        LOGGER.error("Can not retrieve messages: {}.", MessageUtils.getCompleteMessage(th), th);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Routes a non-empty batch of freshly received messages according to the circuit state:
     * <ul>
     *   <li>OPEN - return every message to the queue and release the circuit lock;</li>
     *   <li>CLOSED - dispatch to waiting subscribers, buffering the rest;</li>
     *   <li>HALF_OPEN - run a circuit test with the next waiting subscriber, if any.</li>
     * </ul>
     *
     * @param list the messages received; callers in this class only pass non-empty lists
     */
    public void handleRetrieveSuccess(List<AnypointMQMessage> list) {
        LOGGER.debug("Received '{}' messages: ", list.size());
        // Switch on the enum itself rather than on numeric labels tied to unrelated constants.
        final MQCircuitBreaker.CircuitState state = this.circuitBreaker.getState();
        switch (state) {
            case OPEN:
                nackAll(list);
                this.circuitBreaker.releaseCircuitLock();
                return;
            case CLOSED:
                dispatchAll(list);
                return;
            case HALF_OPEN:
                // poll() yields null when nobody is waiting; tryCircuitTest handles that case.
                tryCircuitTest(list, this.waitingSubscribers.poll());
                return;
            default:
                LOGGER.error("Invalid circuit breaker value: {}", state);
                return;
        }
    }

    /**
     * Handles messages while the circuit is HALF_OPEN.
     *
     * <p>Without a subscriber the messages are simply buffered. Otherwise, if the circuit lock
     * can be acquired, the first message is dispatched to the subscriber as the test; if not,
     * the subscriber is put back in the waiting queue. The method then waits for the circuit
     * test and either dispatches the remaining messages (circuit closed) or returns them to the
     * queue.
     *
     * @param list messages to handle; must be non-empty when a subscriber is supplied and the
     *     lock is acquired
     * @param subscriber subscriber to use for the test, or {@code null} if none is waiting
     */
    private void tryCircuitTest(List<AnypointMQMessage> list, Subscriber<? super AnypointMQMessage> subscriber) {
        if (subscriber == null) {
            for (AnypointMQMessage message : list) {
                addToBuffer(message);
            }
            return;
        }

        final List<AnypointMQMessage> remaining;
        if (!this.circuitBreaker.acquireCircuitLock()) {
            // Someone else is running the test; park the subscriber again.
            this.waitingSubscribers.offer(subscriber);
            remaining = list;
        } else {
            LOGGER.debug("Circuit HALF_OPEN, dispatching single message for testing");
            final Iterator<AnypointMQMessage> messages = list.iterator();
            dispatchMessageToSubscriber(subscriber, messages.next());
            // Everything after the test message is held back until the test completes.
            remaining = ImmutableList.copyOf(messages);
        }

        waitCircuitTest();

        if (!this.circuitBreaker.isClosed()) {
            LOGGER.debug("Circuit testing failed, returning remaining messages to the Queue and waiting for next poll");
            nackAll(remaining);
            return;
        }
        LOGGER.debug("Circuit recovered, submitting next poll");
        dispatchAll(remaining);
    }

    /**
     * Waits on the circuit breaker, for at most {@code circuitTtl}, until the running circuit
     * test completes.
     *
     * <p>If the wait is interrupted the thread's interrupt flag is restored so callers further
     * up the stack can still observe the interruption; the event is logged only while the
     * prefetcher is running.
     */
    private void waitCircuitTest() {
        try {
            LOGGER.debug("Circuit testing in progress, waiting for circuit test to complete");
            this.circuitBreaker.awaitCircuitLock(this.circuitTtl);
        } catch (InterruptedException e) {
            // Catching InterruptedException clears the flag; set it again instead of swallowing it.
            Thread.currentThread().interrupt();
            if (this.running.get()) {
                LOGGER.debug("Thread interrupted while waiting for the circuit test lock");
            }
        }
    }

    /**
     * Delivers each message to a waiting subscriber while the circuit is closed and a
     * subscriber is available; every other message goes to the buffer. Afterwards another
     * fetch is requested.
     *
     * @param list messages to deliver or buffer
     */
    private void dispatchAll(List<AnypointMQMessage> list) {
        for (AnypointMQMessage message : list) {
            // Only take a subscriber off the queue when the circuit allows dispatching.
            final Subscriber<? super AnypointMQMessage> target =
                    this.circuitBreaker.isClosed() ? this.waitingSubscribers.poll() : null;
            if (target != null) {
                dispatchMessageToSubscriber(target, message);
            } else {
                addToBuffer(message);
            }
        }
        submitRetrieve();
    }

    /**
     * Registers the message with the preserver (using the configured lock time-to-live) and
     * then appends it to the buffer.
     *
     * @param anypointMQMessage message to buffer
     */
    private void addToBuffer(AnypointMQMessage anypointMQMessage) {
        final long lockTtl = this.lockTimeToLive;
        this.preserver.add(anypointMQMessage, lockTtl);
        this.bufferedQueue.add(anypointMQMessage);
    }

    /**
     * Queues an extra fetch attempt on the retriever executor, unless a stop has been requested.
     */
    private void submitRetrieve() {
        if (!requestedToStop()) {
            this.retriever.submit(() -> retrieveMessages());
        }
    }

    /**
     * Tells whether the prefetcher is shutting down or already stopped.
     *
     * @return {@code true} if the running flag is cleared or the retriever executor has been
     *     shut down
     */
    private boolean requestedToStop() {
        if (!this.running.get()) {
            return true;
        }
        return this.retriever.isShutdown();
    }

    /**
     * Records the exception so later calls to {@link #get()} fail immediately, then signals it
     * to every subscriber currently waiting, emptying the waiting queue.
     *
     * @param resourceNotFoundException the failure to propagate
     */
    private void raiseErrorOnSubscribers(ResourceNotFoundException resourceNotFoundException) {
        this.invalidPrefetcherException = resourceNotFoundException;
        Subscriber<? super AnypointMQMessage> waiting;
        while ((waiting = this.waitingSubscribers.poll()) != null) {
            waiting.onError(resourceNotFoundException);
        }
    }

    /**
     * Returns an observable that, on subscription, tries to serve one message to the subscriber.
     *
     * <p>On subscription:
     * <ol>
     *   <li>if the prefetcher was invalidated, the subscriber receives the recorded error;</li>
     *   <li>expired messages at the head of the buffer are discarded;</li>
     *   <li>if the buffer is empty and requests are in flight while running, the buffer is
     *       polled (20 seconds per attempt) until a message shows up or those conditions no
     *       longer hold;</li>
     *   <li>a message found this way is handled according to the circuit state; otherwise the
     *       subscriber is parked and another fetch is requested.</li>
     * </ol>
     *
     * @return the observable described above
     */
    @Override
    public Observable<AnypointMQMessage> get() {
        return Observable.create(subscriber -> {
            if (this.invalidPrefetcherException != null) {
                subscriber.onError(this.invalidPrefetcherException);
                return;
            }

            // Skip over messages the preserver reports as expired.
            AnypointMQMessage next = this.bufferedQueue.take();
            while (next != null && isExpired(next)) {
                LOGGER.debug("Discarding expired message with ID '{}'", next.getMessageId());
                this.preserver.remove(next.getMessageId());
                next = this.bufferedQueue.take();
            }

            if (next == null) {
                LOGGER.debug("No message available in the buffer");
                // A request is on its way: wait for its result instead of parking right away.
                while (next == null && this.inFlightRequests.get() > 0 && this.running.get()) {
                    next = this.bufferedQueue.poll(20000L, TimeUnit.MILLISECONDS);
                }
            }

            if (next != null) {
                handleMessageFromBuffer(subscriber, next);
            } else {
                this.waitingSubscribers.offer(subscriber);
                submitRetrieve();
            }
        });
    }

    /**
     * Serves a message taken from the buffer to a subscriber, according to the circuit state:
     * <ul>
     *   <li>OPEN - return the message to the queue and park the subscriber;</li>
     *   <li>CLOSED - dispatch the message to the subscriber;</li>
     *   <li>HALF_OPEN - use the message and subscriber for a circuit test.</li>
     * </ul>
     *
     * @param subscriber subscriber asking for a message
     * @param anypointMQMessage message taken from the buffer
     */
    private void handleMessageFromBuffer(Subscriber<? super AnypointMQMessage> subscriber, AnypointMQMessage anypointMQMessage) {
        // Switch on the enum itself rather than on numeric labels tied to unrelated constants.
        final MQCircuitBreaker.CircuitState state = this.circuitBreaker.getState();
        switch (state) {
            case OPEN:
                nackAll(Collections.singletonList(anypointMQMessage));
                this.waitingSubscribers.offer(subscriber);
                return;
            case CLOSED:
                dispatchMessageToSubscriber(subscriber, anypointMQMessage);
                return;
            case HALF_OPEN:
                tryCircuitTest(Collections.singletonList(anypointMQMessage), subscriber);
                return;
            default:
                LOGGER.error("Invalid circuit breaker value: {}", state);
                return;
        }
    }

    /**
     * Returns messages to the queue: the given messages plus everything currently in the
     * buffer. All of them are removed from the preserver, and a single fire-and-forget nack is
     * sent for their locks; nothing is sent when there is nothing to return.
     *
     * @param list messages to return in addition to the buffered ones
     */
    private void nackAll(List<AnypointMQMessage> list) {
        LOGGER.debug("Returning all messages to the Queue");
        final ImmutableList.Builder<AnypointMQMessage> toReturn = ImmutableList.builder();
        toReturn.addAll(list);

        // Drain the buffer; take() yields null once it is empty.
        AnypointMQMessage buffered;
        while ((buffered = this.bufferedQueue.take()) != null) {
            this.preserver.remove(buffered.getMessageId());
            toReturn.add(buffered);
        }

        for (AnypointMQMessage message : list) {
            this.preserver.remove(message.getMessageId());
        }

        final List<Lock> locks = toReturn.build().stream().map(Lock::new).collect(Collectors.toList());
        if (!locks.isEmpty()) {
            this.destination.nack(locks).fireAndForget();
        }
    }

    /**
     * Asks the preserver whether the message, identified by its message ID, has expired.
     *
     * @param anypointMQMessage message to check
     * @return the preserver's answer for the message ID
     */
    private boolean isExpired(AnypointMQMessage anypointMQMessage) {
        final String messageId = anypointMQMessage.getMessageId();
        return this.preserver.isExpired(messageId);
    }

    /**
     * Delivers one message to a subscriber as a complete start/next/completed sequence and then
     * removes the message from the preserver.
     *
     * <p>NOTE(review): {@code onCompleted()} sits in a {@code finally} block, so it is invoked
     * even when an earlier step throws - confirm this is the intended signalling on failure.
     *
     * @param subscriber subscriber receiving the message
     * @param anypointMQMessage message to deliver
     */
    private void dispatchMessageToSubscriber(Subscriber<? super AnypointMQMessage> subscriber, AnypointMQMessage anypointMQMessage) {
        try {
            final String messageId = anypointMQMessage.getMessageId();
            LOGGER.debug("Dispatch to subscriber message with ID '{}'", messageId);
            subscriber.onStart();
            subscriber.onNext(anypointMQMessage);
            // Once delivered, the message no longer needs to be preserved.
            this.preserver.remove(messageId);
        } finally {
            subscriber.onCompleted();
        }
    }

    /**
     * Derives how many receive requests may be outstanding at once from the buffer limit: one
     * request for limits up to 10, otherwise one per 10 buffer slots, capped at
     * {@link #MAX_CONCURRENT_REQUESTS}.
     *
     * @param i the buffer limit
     * @return the maximum number of concurrent receive requests
     */
    private int getMaxConcurrentRequests(int i) {
        if (i <= 10) {
            return 1;
        }
        // Use the class constant instead of repeating its value as a magic number.
        return Math.min(i / 10, MAX_CONCURRENT_REQUESTS);
    }
}
