package com.mulesoft.connectors.kafka.internal.model.consumer;

import com.mulesoft.connectors.kafka.internal.error.exception.NotFoundException;
import com.mulesoft.connectors.kafka.internal.error.exception.OperationInterruptedException;
import com.mulesoft.connectors.kafka.internal.error.exception.OperationTimeoutException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.mule.runtime.api.exception.MuleRuntimeException;
import org.mule.runtime.core.api.util.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/mulesoft/connectors/kafka/internal/model/consumer/DefaultConsumerPool.class */
public class DefaultConsumerPool implements ConsumerPool {
    private static final Logger logger = LoggerFactory.getLogger(DefaultConsumerPool.class);
    private Map<MuleConsumer, Semaphore> consumerMap;
    private List<Map.Entry<MuleConsumer, Semaphore>> consumerIndexes;
    private AtomicInteger currentPosition = new AtomicInteger();
    private AtomicBoolean isValid = new AtomicBoolean(true);
    private Semaphore consumerAvailabilitySemaphore;

    public DefaultConsumerPool(Set<MuleConsumer> set) {
        this.consumerMap = (Map) set.stream().peek(muleConsumer -> {
            muleConsumer.setPool(this);
        }).collect(Collectors.toMap(Function.identity(), muleConsumer2 -> {
            return new Semaphore(1, true);
        }));
        this.consumerAvailabilitySemaphore = new Semaphore(set.size(), true);
        this.consumerIndexes = new ArrayList(this.consumerMap.entrySet());
    }

    @Override // com.mulesoft.connectors.kafka.internal.model.consumer.ConsumerPool
    public MuleConsumer checkOut(Duration duration) throws ConsumerPoolClosedException {
        Optional<MuleConsumer> checkOut = checkOut(true, duration);
        if (!checkOut.isPresent()) {
            checkOut = checkOut(false, duration);
        }
        return checkOut.orElseThrow(() -> {
            return new OperationTimeoutException(String.format("Unable to checkout a consumer from the consumer using timeout of %d ms", Long.valueOf(duration.toMillis())), duration.toMillis());
        });
    }

    @Override // com.mulesoft.connectors.kafka.internal.model.consumer.ConsumerPool
    public MuleConsumer checkOut(String str, int i, Duration duration) throws ConsumerPoolClosedException {
        Optional<MuleConsumer> checkOutConsumer = checkOutConsumer(Optional.of(muleConsumer -> {
            return muleConsumer.assignment().stream().anyMatch(topicPartition -> {
                return topicPartition.getTopic().equals(str) && topicPartition.getPartition() == i;
            });
        }), duration);
        MuleConsumer orElseThrow = checkOutConsumer.orElseThrow(() -> {
            return new NotFoundException(String.format("There is no consumer for the topic: %s and partition %d", str, Integer.valueOf(i)));
        });
        checkPoolIsValid(checkOutConsumer);
        return orElseThrow;
    }

    @Override // com.mulesoft.connectors.kafka.internal.model.consumer.ConsumerPool
    public Set<MuleConsumer> checkoutAll(Duration duration) throws ConsumerPoolClosedException {
        long currentTimeMillis = System.currentTimeMillis() + duration.toMillis();
        ArrayList arrayList = new ArrayList(this.consumerMap.size());
        int i = 0;
        try {
            int i2 = 0;
            for (Map.Entry<MuleConsumer, Semaphore> entry : this.consumerMap.entrySet()) {
                acquireSemaphore(this.consumerAvailabilitySemaphore, duration, true);
                i++;
                MuleConsumer key = entry.getKey();
                Semaphore value = entry.getValue();
                i2++;
                logger.trace("Checking out consumer {}/{}: {}.", new Object[]{Integer.valueOf(i2), Integer.valueOf(this.consumerMap.size()), key.getId()});
                if (duration.isNegative() || duration.isZero()) {
                    value.acquire();
                } else if (!entry.getValue().tryAcquire(currentTimeMillis - System.currentTimeMillis(), TimeUnit.MILLISECONDS)) {
                    throw new OperationTimeoutException(duration.toMillis());
                }
                logger.trace("Consumer {} checked out.", key.getId());
                arrayList.add(value);
            }
            Set<MuleConsumer> keySet = this.consumerMap.keySet();
            checkPoolIsValid(keySet);
            return keySet;
        } catch (InterruptedException e) {
            arrayList.stream().forEach((v0) -> {
                v0.release();
            });
            this.consumerAvailabilitySemaphore.release(0);
            throw new OperationInterruptedException(e);
        } catch (RuntimeException e2) {
            arrayList.stream().forEach((v0) -> {
                v0.release();
            });
            this.consumerAvailabilitySemaphore.release(0);
            throw e2;
        }
    }

    @Override // com.mulesoft.connectors.kafka.internal.model.consumer.ConsumerPool
    public void checkIn(MuleConsumer muleConsumer) {
        logger.trace("Checking in consumer {}.", muleConsumer.getId());
        this.consumerMap.get(muleConsumer).release();
        this.consumerAvailabilitySemaphore.release();
        logger.trace("Consumer {} checked in.", muleConsumer.getId());
    }

    @Override // com.mulesoft.connectors.kafka.internal.model.consumer.ConsumerPool
    public boolean isValid() {
        return this.isValid.get();
    }

    @Override // com.mulesoft.connectors.kafka.internal.model.consumer.ConsumerPool
    public void invalidate() {
        this.isValid.set(false);
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        try {
            Set<MuleConsumer> checkoutAll = checkoutAll(Duration.ofMillis(-1L));
            checkoutAll.forEach((v0) -> {
                IOUtils.closeQuietly(v0);
            });
            checkoutAll.forEach(this::checkIn);
            invalidate();
        } catch (ConsumerPoolClosedException e) {
            logger.debug("Could not close consumer pool, it is already closed");
            throw new MuleRuntimeException(e);
        }
    }

    private Optional<MuleConsumer> checkOutConsumer(Optional<Predicate<MuleConsumer>> optional, Duration duration) throws ConsumerPoolClosedException {
        Optional<MuleConsumer> empty = Optional.empty();
        logger.debug("checkOutConsumer: Trying to checkout specific consumer from pool.");
        boolean z = false;
        try {
            long currentTimeMillis = System.currentTimeMillis();
            z = acquireSemaphore(this.consumerAvailabilitySemaphore, duration, true);
            int i = 0;
            while (true) {
                if (i >= this.consumerIndexes.size()) {
                    break;
                }
                Map.Entry<MuleConsumer, Semaphore> entry = this.consumerIndexes.get(i);
                MuleConsumer key = entry.getKey();
                Semaphore value = entry.getValue();
                boolean acquireSemaphore = acquireSemaphore(value, duration, true);
                if (optional.isPresent() && !optional.get().test(key)) {
                    value.release();
                    acquireSemaphore = false;
                }
                if (acquireSemaphore) {
                    empty = Optional.of(key);
                    break;
                }
                i++;
            }
            logger.debug("checkOutConsumer: WHILE LOOP took : {}", Long.valueOf(System.currentTimeMillis() - currentTimeMillis));
            checkPoolIsValid(empty);
            logger.debug("checkOutConsumer returning");
            return empty;
        } catch (InterruptedException e) {
            if (z) {
                this.consumerAvailabilitySemaphore.release();
            }
            throw new OperationInterruptedException(e);
        } catch (RuntimeException e2) {
            if (z) {
                this.consumerAvailabilitySemaphore.release();
            }
            throw e2;
        }
    }

    private Optional<MuleConsumer> checkOut(boolean z, Duration duration) throws ConsumerPoolClosedException {
        Optional<MuleConsumer> empty = Optional.empty();
        logger.debug("checkOut: Trying to checkout consumer from pool");
        boolean z2 = false;
        try {
            long currentTimeMillis = System.currentTimeMillis();
            z2 = acquireSemaphore(this.consumerAvailabilitySemaphore, duration, true);
            while (!empty.isPresent() && this.isValid.get()) {
                Map.Entry<MuleConsumer, Semaphore> entry = this.consumerIndexes.get(this.currentPosition.getAndUpdate(i -> {
                    return (i + 1) % this.consumerMap.size();
                }));
                MuleConsumer key = entry.getKey();
                Semaphore value = entry.getValue();
                if (((z && entry.getValue().availablePermits() == 0) ? false : true) && acquireSemaphore(value, duration, false)) {
                    empty = Optional.of(key);
                }
            }
            logger.debug("WHILE LOOP took : {}", Long.valueOf(System.currentTimeMillis() - currentTimeMillis));
            logger.debug("checkOut returning");
            checkPoolIsValid(empty);
            return empty;
        } catch (InterruptedException e) {
            if (z2) {
                this.consumerAvailabilitySemaphore.release();
            }
            throw new OperationInterruptedException(e);
        }
    }

    @Override // com.mulesoft.connectors.kafka.internal.model.consumer.ConsumerPool
    public void checkPoolIsValid(Set<MuleConsumer> set) throws ConsumerPoolClosedException {
        if (isValid()) {
            return;
        }
        set.forEach(this::checkIn);
        throw new ConsumerPoolClosedException();
    }

    @Override // com.mulesoft.connectors.kafka.internal.model.consumer.ConsumerPool
    public void checkPoolIsValid(Optional<MuleConsumer> optional) throws ConsumerPoolClosedException {
        if (isValid()) {
            if (optional.isPresent()) {
                return;
            }
            this.consumerAvailabilitySemaphore.release();
        } else {
            optional.ifPresent(this::checkIn);
            if (!optional.isPresent()) {
                this.consumerAvailabilitySemaphore.release();
            }
            throw new ConsumerPoolClosedException();
        }
    }

    private boolean acquireSemaphore(Semaphore semaphore, Duration duration, boolean z) throws InterruptedException {
        if (duration.isNegative() || duration.isZero()) {
            return z ? acquire(semaphore) : semaphore.tryAcquire();
        }
        if (semaphore.tryAcquire(duration.toMillis(), TimeUnit.MILLISECONDS)) {
            return true;
        }
        throw new OperationTimeoutException(duration.toMillis());
    }

    private boolean acquire(Semaphore semaphore) throws InterruptedException {
        semaphore.acquire();
        return true;
    }
}
