package com.mulesoft.connectors.kafka.internal.model.consumer;

import com.mulesoft.connectors.kafka.api.source.TopicPartition;
import com.mulesoft.connectors.kafka.internal.error.KafkaErrorType;
import com.mulesoft.connectors.kafka.internal.error.exception.CommitFailedException;
import com.mulesoft.connectors.kafka.internal.error.exception.InvalidInputException;
import com.mulesoft.connectors.kafka.internal.error.exception.NoPollException;
import com.mulesoft.connectors.kafka.internal.error.exception.NotFoundException;
import com.mulesoft.connectors.kafka.internal.model.TopicPartitionDescription;
import java.io.InputStream;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Semaphore;
import java.util.function.Function;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.RetriableException;
import org.mule.runtime.core.api.util.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/mulesoft/connectors/kafka/internal/model/consumer/DefaultMuleConsumer.class */
/**
 * {@link MuleConsumer} implementation that wraps a Kafka {@link Consumer} of {@link InputStream} keys and values.
 *
 * <p>Records fetched from Kafka are kept in an internal buffer ({@code bufRecords}). The records handed out by the
 * most recent {@link #poll(Duration)} / {@link #singleElementPoll(Duration)} call are tracked as "in flight"
 * ({@code inFlightRecords}); commits are computed from them and {@link #resetBuffer()} rewinds to them.
 *
 * <p>Two instances are equal when they share the same randomly generated {@link #getId() id}.
 */
public class DefaultMuleConsumer implements MuleConsumer {

    private static final Logger logger = LoggerFactory.getLogger(DefaultMuleConsumer.class);

    /** Consumer property read at construction time to decide when the buffer is considered stale. */
    private static final String MAX_POLL_INTERVAL_PROPERTY = "max.poll.interval.ms";

    private static final String CONCURRENT_COMMIT_MESSAGE =
        "Failed to commit because the commit operation is already made by another thread.";

    private final Properties properties;

    /** Value of {@code max.poll.interval.ms}; once exceeded since the last poll the buffer is discarded. */
    private final long maxPollTimeout;

    private final Consumer<InputStream, InputStream> consumer;

    private final UUID id = UUID.randomUUID();

    /** Records fetched from Kafka that have not been committed yet. */
    private final List<ConsumerRecord<InputStream, InputStream>> bufRecords =
        Collections.synchronizedList(new ArrayList<>());

    /** Single permit: only one synchronous commit may run at a time. */
    private final Semaphore commitSemaphore = new Semaphore(1);

    /** Records returned by the latest poll; {@code null} until the first successful poll. */
    private List<ConsumerRecord<InputStream, InputStream>> inFlightRecords;

    /** Pool to invalidate when a commit fails unrecoverably; set through {@link #setPool(DefaultConsumerPool)}. */
    private DefaultConsumerPool pool;

    /** Wall-clock time (ms) of the last poll against Kafka; 0 before the first poll. */
    private long lastPollTime = 0;

    /**
     * Creates the consumer.
     *
     * @param function   factory that builds the underlying Kafka consumer from {@code properties}
     * @param properties consumer configuration; must contain {@code max.poll.interval.ms}
     * @throws NumberFormatException if {@code max.poll.interval.ms} is missing or is not a valid long
     */
    public DefaultMuleConsumer(Function<Properties, Consumer<InputStream, InputStream>> function, Properties properties) {
        this.properties = properties;
        this.maxPollTimeout = readMaxPollInterval(properties);
        this.consumer = function.apply(getProperties());
    }

    /** Reads {@code max.poll.interval.ms}, accepting String as well as numeric values and Properties defaults. */
    private static long readMaxPollInterval(Properties properties) {
        Object raw = properties.get(MAX_POLL_INTERVAL_PROPERTY);
        if (raw == null) {
            // Properties.get does not consult the defaults table; getProperty does.
            raw = properties.getProperty(MAX_POLL_INTERVAL_PROPERTY);
        }
        if (raw == null) {
            // Same exception type Long.parseLong(null) used to raise, with an actionable message.
            throw new NumberFormatException("Missing required consumer property '" + MAX_POLL_INTERVAL_PROPERTY + "'");
        }
        return Long.parseLong(String.valueOf(raw).trim());
    }

    /** Sets the pool that is invalidated when a commit fails unrecoverably. */
    @Override // com.mulesoft.connectors.kafka.internal.model.consumer.MuleConsumer
    public void setPool(DefaultConsumerPool defaultConsumerPool) {
        this.pool = defaultConsumerPool;
    }

    /** @return the random identifier generated when this instance was created */
    @Override // com.mulesoft.connectors.kafka.internal.model.consumer.MuleConsumer
    public UUID getId() {
        return id;
    }

    /**
     * Drops any current subscription and manually assigns the given topic-partitions.
     *
     * @param list topic-partitions to assign
     */
    @Override // com.mulesoft.connectors.kafka.internal.model.consumer.MuleConsumer
    public void assign(List<TopicPartition> list) {
        consumer.unsubscribe();
        List<org.apache.kafka.common.TopicPartition> kafkaPartitions = list.stream()
            .map(partition -> new org.apache.kafka.common.TopicPartition(partition.getTopic(), partition.getPartition()))
            .collect(Collectors.toList());
        consumer.assign(kafkaPartitions);
    }

    /**
     * Drops any current subscription and subscribes to the union of the given topic patterns.
     *
     * <p>Each entry is compiled on its own first, then all of them are joined with {@code |} into one pattern.
     *
     * @param list regular expressions matching the topics to subscribe to
     * @throws InvalidInputException if any pattern, or the joined pattern, is not a valid regular expression
     */
    @Override // com.mulesoft.connectors.kafka.internal.model.consumer.MuleConsumer
    public void subscribe(List<String> list) {
        consumer.unsubscribe();
        try {
            List<String> validatedPatterns = new ArrayList<>(list.size());
            for (String topicPattern : list) {
                logger.info("Subscribing to topic pattern '{}'.", topicPattern);
                validatedPatterns.add(Pattern.compile(topicPattern).pattern());
            }
            Pattern combined = Pattern.compile(String.join("|", validatedPatterns));
            consumer.subscribe(combined, new LoggingRebalanceListener());
        } catch (PatternSyntaxException e) {
            throw new InvalidInputException("The subscription patterns are invalid", e);
        }
    }

    /** Rebalance listener that only logs the revoked / assigned partitions. */
    private static final class LoggingRebalanceListener implements ConsumerRebalanceListener {

        @Override
        public void onPartitionsRevoked(Collection<org.apache.kafka.common.TopicPartition> collection) {
            logger.warn("Revoked topic-partitions: {}", collection);
        }

        @Override
        public void onPartitionsAssigned(Collection<org.apache.kafka.common.TopicPartition> collection) {
            logger.warn("Assigned topic-partitions: {}", collection);
        }
    }

    /** @return the topic-partitions currently assigned to the underlying consumer */
    @Override // com.mulesoft.connectors.kafka.internal.model.consumer.MuleConsumer
    public Set<TopicPartition> assignment() {
        return consumer.assignment().stream()
            .map(partition -> new TopicPartition(partition.topic(), partition.partition()))
            .peek(partition -> logger.trace("Found description: {}", partition))
            .collect(Collectors.toSet());
    }

    /**
     * Describes every assigned topic-partition with its current position, beginning offset and end offset.
     *
     * @return one description per assigned topic-partition
     */
    @Override // com.mulesoft.connectors.kafka.internal.model.consumer.MuleConsumer
    public Set<TopicPartitionDescription> assignmentDescription() {
        Set<org.apache.kafka.common.TopicPartition> assigned = consumer.assignment();
        // One lookup for the whole assignment instead of two lookups per partition.
        Map<org.apache.kafka.common.TopicPartition, Long> beginnings =
            assigned.isEmpty() ? Collections.emptyMap() : consumer.beginningOffsets(assigned);
        Map<org.apache.kafka.common.TopicPartition, Long> ends =
            assigned.isEmpty() ? Collections.emptyMap() : consumer.endOffsets(assigned);
        return assigned.stream()
            .map(partition -> new TopicPartitionDescription(partition.topic(), partition.partition(),
                                                            consumer.position(partition),
                                                            beginnings.get(partition).longValue(),
                                                            ends.get(partition).longValue()))
            .peek(description -> logger.trace("Found description: {}", description))
            .collect(Collectors.toSet());
    }

    /**
     * Moves the consumer position of a topic-partition and drops the buffered records of that partition.
     *
     * @param str topic name
     * @param i   partition number
     * @param j   offset to seek to
     */
    @Override // com.mulesoft.connectors.kafka.internal.model.consumer.MuleConsumer
    public void seek(String str, int i, long j) {
        consumer.seek(new org.apache.kafka.common.TopicPartition(str, i), j);
        // removeIf on the synchronized list is a single atomic step, unlike a clear() + addAll() rebuild.
        bufRecords.removeIf(buffered -> buffered.topic().equals(str) && buffered.partition() == i);
    }

    /**
     * Returns the buffered records, fetching from Kafka first when the buffer is empty or stale, and marks them
     * as in flight.
     *
     * @param duration maximum time to block on the underlying Kafka poll
     * @return the in-flight records, or {@code null} if there are none
     * @throws NotFoundException if the buffer had to be refilled and Kafka returned no records
     */
    @Override // com.mulesoft.connectors.kafka.internal.model.consumer.MuleConsumer
    public List<ConsumerRecord<InputStream, InputStream>> poll(Duration duration) {
        List<ConsumerRecord<InputStream, InputStream>> polled = getRecords(duration);
        polled.forEach(this::logRecord);
        inFlightRecords = polled;
        // null (not an empty list) is the pre-existing contract for "nothing to return".
        return inFlightRecords.isEmpty() ? null : inFlightRecords;
    }

    /**
     * Same as {@link #poll(Duration)} but only the first buffered record is returned and marked as in flight.
     *
     * @param duration maximum time to block on the underlying Kafka poll
     * @return the first buffered record
     * @throws NotFoundException if no record is available
     */
    @Override // com.mulesoft.connectors.kafka.internal.model.consumer.MuleConsumer
    public ConsumerRecord<InputStream, InputStream> singleElementPoll(Duration duration) {
        List<ConsumerRecord<InputStream, InputStream>> buffered = getRecords(duration);
        if (buffered.isEmpty()) {
            throw new NotFoundException();
        }
        ConsumerRecord<InputStream, InputStream> first = buffered.get(0);
        logRecord(first);
        // Must stay a mutable list: resetBuffer() and failed commits remove elements from it.
        List<ConsumerRecord<InputStream, InputStream>> single = new ArrayList<>(1);
        single.add(first);
        inFlightRecords = single;
        return first;
    }

    /** Throws {@link CommitFailedException} ({@code ALREADY_COMMITED}) when {@code z} is true. */
    private void throwIfMultipleCommits(boolean z) {
        if (!z) {
            return;
        }
        logger.debug(CONCURRENT_COMMIT_MESSAGE);
        throw new CommitFailedException(CONCURRENT_COMMIT_MESSAGE, KafkaErrorType.ALREADY_COMMITED);
    }

    /**
     * Synchronously commits the offsets of the in-flight records and removes them from the buffer.
     *
     * @throws CommitFailedException if another commit is running, or if none of the in-flight records was still
     *                               in the buffer
     * @throws NoPollException       if there are no in-flight records
     * @throws KafkaException        if the Kafka commit fails; the buffer and in-flight records are cleared and
     *                               the pool is invalidated before rethrowing
     */
    @Override // com.mulesoft.connectors.kafka.internal.model.consumer.MuleConsumer
    public void commit() {
        // Acquire outside the try: a thread that failed to get the permit must never release it.
        throwIfMultipleCommits(!commitSemaphore.tryAcquire());
        try {
            validatePreviousPollReturnedResults();
            consumer.commitSync(offsetsToCommit());
            throwIfMultipleCommits(!bufRecords.removeAll(inFlightRecords));
        } catch (KafkaException e) {
            logger.debug("Commit failed, marking pool as invalid. Exception was {}.", e.getMessage(), e);
            discardStateAndInvalidatePool();
            throw e;
        } finally {
            commitSemaphore.release();
        }
    }

    /**
     * Asynchronously commits the offsets of the in-flight records and removes them from the buffer.
     *
     * @throws NoPollException if there are no in-flight records
     */
    @Override // com.mulesoft.connectors.kafka.internal.model.consumer.MuleConsumer
    public void asyncCommit() {
        validatePreviousPollReturnedResults();
        commitAsync(offsetsToCommit());
        bufRecords.removeAll(inFlightRecords);
    }

    /**
     * Builds the offsets to commit from the in-flight records: for every topic-partition, the highest in-flight
     * offset plus one (Kafka expects the offset of the next record to read).
     */
    private Map<org.apache.kafka.common.TopicPartition, OffsetAndMetadata> offsetsToCommit() {
        Map<org.apache.kafka.common.TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        for (ConsumerRecord<InputStream, InputStream> inFlight : inFlightRecords) {
            org.apache.kafka.common.TopicPartition partition =
                new org.apache.kafka.common.TopicPartition(inFlight.topic(), inFlight.partition());
            long nextOffset = inFlight.offset() + 1;
            OffsetAndMetadata current = offsets.get(partition);
            if (current == null || current.offset() < nextOffset) {
                offsets.put(partition, new OffsetAndMetadata(nextOffset));
            }
        }
        return offsets;
    }

    /** Issues the async commit; retriable failures are only logged, any other failure discards all state. */
    private void commitAsync(Map<org.apache.kafka.common.TopicPartition, OffsetAndMetadata> map) {
        consumer.commitAsync(map, (committedOffsets, failure) -> {
            if (failure == null) {
                return;
            }
            if (failure instanceof RetriableException) {
                logger.debug("A retriable exception happened during the execution. The next execution will re-try committing the processed messages.");
                return;
            }
            logger.error("An unrecoverable exception happened during the asyncCommit.", failure);
            discardStateAndInvalidatePool();
        });
    }

    /** Clears the buffer and the in-flight records and invalidates the pool, if one has been set. */
    private void discardStateAndInvalidatePool() {
        bufRecords.clear();
        if (inFlightRecords != null) {
            inFlightRecords.clear();
        }
        // Without a pool there is nothing to invalidate; an NPE here would hide the original commit failure.
        if (pool != null) {
            pool.invalidate();
        }
    }

    /**
     * Rewinds the consumer so the in-flight records that are still buffered get fetched again.
     *
     * <p>In-flight records no longer present in the buffer are dropped from the in-flight list. For the remaining
     * ones, every topic-partition is sought back to its lowest in-flight offset. Does nothing when no poll has
     * happened yet.
     */
    @Override // com.mulesoft.connectors.kafka.internal.model.consumer.MuleConsumer
    public void resetBuffer() {
        if (inFlightRecords == null) {
            return;
        }
        Map<org.apache.kafka.common.TopicPartition, Long> rewindOffsets = new HashMap<>();
        List<ConsumerRecord<InputStream, InputStream>> noLongerBuffered = new ArrayList<>();
        for (ConsumerRecord<InputStream, InputStream> inFlight : inFlightRecords) {
            if (!bufRecords.contains(inFlight)) {
                noLongerBuffered.add(inFlight);
                continue;
            }
            rewindOffsets.merge(new org.apache.kafka.common.TopicPartition(inFlight.topic(), inFlight.partition()),
                                inFlight.offset(), Long::min);
        }
        inFlightRecords.removeAll(noLongerBuffered);
        rewindOffsets.forEach((partition, offset) -> seek(partition.topic(), partition.partition(), offset));
    }

    /** Closes the underlying Kafka consumer. */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        consumer.close();
    }

    /**
     * Returns a snapshot of the buffer, refilling it from Kafka first when it is empty or when more than
     * {@code max.poll.interval.ms} has elapsed since the last poll (in which case the old content is discarded).
     *
     * @throws NotFoundException if the buffer was refilled and Kafka returned no records
     */
    private List<ConsumerRecord<InputStream, InputStream>> getRecords(Duration duration) {
        boolean pollIntervalExceeded = maxPollTimeout < System.currentTimeMillis() - lastPollTime;
        if (pollIntervalExceeded || bufRecords.isEmpty()) {
            bufRecords.clear();
            logger.trace("Buffer empty. Retrieving records.");
            for (ConsumerRecord<InputStream, InputStream> fetched : consumer.poll(duration)) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Adding message from topic {} partition {} with offset {} to buffer.",
                                 fetched.topic(), fetched.partition(), fetched.offset());
                }
                bufRecords.add(new KafkaConsumerRecord(fetched));
            }
            lastPollTime = System.currentTimeMillis();
            logger.trace("Retrieved {} records.", Integer.valueOf(bufRecords.size()));
            if (bufRecords.isEmpty()) {
                throw new NotFoundException("No message was found when executing the poll on KafkaConsumer");
            }
        }
        // A synchronized list must be traversed while holding its lock, so hand out a copy taken under it.
        synchronized (bufRecords) {
            return new ArrayList<>(bufRecords);
        }
    }

    /**
     * Ensures there are in-flight records to commit.
     *
     * @throws NoPollException if no poll has returned records yet
     */
    protected void validatePreviousPollReturnedResults() {
        if (inFlightRecords == null || inFlightRecords.isEmpty()) {
            throw new NoPollException("There is no previous poll to commit");
        }
    }

    /** Dumps key, value, topic, partition, offset and headers of a record at TRACE level. */
    private void logRecord(ConsumerRecord<InputStream, InputStream> consumerRecord) {
        if (!logger.isTraceEnabled()) {
            return;
        }
        // NOTE(review): IOUtils.toString reads the key/value streams; confirm they can be read again downstream.
        logger.trace("Message key: {}", Optional.ofNullable(consumerRecord.key()).map(IOUtils::toString).orElse(" no key"));
        logger.trace("Message value: {}", Optional.ofNullable(consumerRecord.value()).map(IOUtils::toString).orElse(" no value"));
        logger.trace("Message topic: {}", Optional.ofNullable(consumerRecord.topic()).orElse(" no topic"));
        logger.trace("Message partition: {}", String.valueOf(consumerRecord.partition()));
        logger.trace("Message offset: {}", String.valueOf(consumerRecord.offset()));
        Iterable<org.apache.kafka.common.header.Header> headers = consumerRecord.headers();
        if (headers == null) {
            return;
        }
        for (org.apache.kafka.common.header.Header header : headers) {
            byte[] rawValue = header.value();
            // Explicit charset: the no-charset String constructor depends on the platform default.
            String headerValue = rawValue == null ? "" : new String(rawValue, java.nio.charset.StandardCharsets.UTF_8);
            logger.trace("Message Header: '{}': '{}'", header.key(), headerValue);
        }
    }

    /** Two consumers are equal when they are of the same class and share the same id. */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        DefaultMuleConsumer other = (DefaultMuleConsumer) obj;
        return Objects.equals(id, other.id);
    }

    @Override
    public int hashCode() {
        return Objects.hash(id);
    }

    private Properties getProperties() {
        return properties;
    }
}
