package org.mule.extension.async.apikit.internal.protocols.kafka;

import amf.apicontract.client.platform.AMFElementClient;
import amf.apicontract.client.platform.model.domain.api.AsyncApi;
import com.mulesoft.connectors.kafka.api.KafkaRecordAttributes;
import java.io.InputStream;
import java.util.Iterator;
import java.util.List;
import org.mule.extension.async.apikit.api.attributes.AsyncMessageAttributes;
import org.mule.extension.async.apikit.internal.bindings.AsyncBinding;
import org.mule.extension.async.apikit.internal.exception.AsyncApiRoutingException;
import org.mule.extension.async.apikit.internal.execution.ChannelBasedRegistry;
import org.mule.extension.async.apikit.internal.execution.SourceCallbackRegistry;
import org.mule.extension.async.apikit.internal.protocols.MessageListenerHandler;
import org.mule.extension.async.apikit.internal.protocols.ProtocolHandler;
import org.mule.runtime.extension.api.client.source.SourceParameterizer;
import org.mule.runtime.extension.api.runtime.operation.Result;

/* loaded from: input_file:org/mule/extension/async/apikit/internal/protocols/kafka/KafkaMessageListenerHandler.class */
public class KafkaMessageListenerHandler extends MessageListenerHandler<InputStream, KafkaRecordAttributes> {
    protected String consumerConfigRef;

    public KafkaMessageListenerHandler(AsyncApi asyncApi, SourceCallbackRegistry sourceCallbackRegistry, ChannelBasedRegistry channelBasedRegistry, AMFElementClient aMFElementClient, String str, String str2, List<AsyncBinding> list) {
        super(asyncApi, sourceCallbackRegistry, channelBasedRegistry, aMFElementClient, str, null, list);
        this.consumerConfigRef = str2;
    }

    @Override // org.mule.extension.async.apikit.internal.protocols.MessageListenerHandler
    public String getSourceListenerName() {
        return "message-listener";
    }

    @Override // org.mule.extension.async.apikit.internal.protocols.MessageListenerHandler
    public void configureSourceListener(SourceParameterizer sourceParameterizer) {
        sourceParameterizer.withConfigRef(this.consumerConfigRef);
        Iterator<AsyncBinding> it = this.asyncBindings.iterator();
        while (it.hasNext()) {
            it.next().applyBindings(sourceParameterizer);
        }
    }

    @Override // org.mule.extension.async.apikit.internal.protocols.MessageListenerHandler
    protected AsyncMessageAttributes buildResultAttributes(Result<InputStream, KafkaRecordAttributes> result, String str) {
        KafkaRecordAttributes kafkaRecordAttributes = (KafkaRecordAttributes) result.getAttributes().orElseThrow(() -> {
            return new AsyncApiRoutingException("Unable to get attributes from Kafka message");
        });
        return KafkaMessageAttributesBuilder.builder().channelName(kafkaRecordAttributes.getTopic()).serverName(this.serverName).protocol(ProtocolHandler.Protocol.KAFKA.name()).partition(kafkaRecordAttributes.getPartition()).headers(kafkaRecordAttributes.getHeaders()).key(kafkaRecordAttributes.getKey()).offset(kafkaRecordAttributes.getOffset()).creationTimestamp(kafkaRecordAttributes.getCreationTimestamp()).logAppendTimestamp(kafkaRecordAttributes.getLogAppendTimestamp()).leaderEpoch(kafkaRecordAttributes.getLeaderEpoch()).build();
    }

    @Override // org.mule.extension.async.apikit.internal.protocols.MessageListenerHandler
    protected InputStream buildResultOutput(Result<InputStream, KafkaRecordAttributes> result) {
        return (InputStream) result.getOutput();
    }

    @Override // org.mule.extension.async.apikit.internal.protocols.MessageListenerHandler
    protected String getRuntimeChannelName(Result<InputStream, KafkaRecordAttributes> result) {
        return (String) result.getAttributes().map((v0) -> {
            return v0.getTopic();
        }).orElseThrow(() -> {
            return new AsyncApiRoutingException("Unable to find topic name from Kafka message");
        });
    }
}
