package org.mule.modules.kafka.internal.sources;

import java.io.Serializable;
import org.mule.modules.kafka.internal.connection.KafkaConsumerConnection;
import org.mule.runtime.api.connection.ConnectionProvider;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.extension.api.annotation.Alias;
import org.mule.runtime.extension.api.annotation.Streaming;
import org.mule.runtime.extension.api.annotation.param.Connection;
import org.mule.runtime.extension.api.annotation.param.MediaType;
import org.mule.runtime.extension.api.annotation.param.Parameter;
import org.mule.runtime.extension.api.annotation.param.display.DisplayName;
import org.mule.runtime.extension.api.annotation.source.EmitsResponse;
import org.mule.runtime.extension.api.runtime.source.Source;
import org.mule.runtime.extension.api.runtime.source.SourceCallback;

@Streaming
@DisplayName("Consumer")
@MediaType("*/*")
@Alias("consumer")
@EmitsResponse
/* loaded from: input_file:org/mule/modules/kafka/internal/sources/KafkaSource.class */
public class KafkaSource extends Source<String, Serializable> {

    @Connection
    private ConnectionProvider<KafkaConsumerConnection> provider;
    private KafkaConsumerConnection connection;

    @DisplayName("Topic")
    @Parameter
    private String topic;

    public void onStart(SourceCallback<String, Serializable> sourceCallback) throws MuleException {
        this.connection = (KafkaConsumerConnection) this.provider.connect();
        this.connection.getMuleConsumer().run(sourceCallback, this.topic);
    }

    public void onStop() {
        this.provider.disconnect(this.connection);
    }
}
