package org.mule.extension.redis.internal.source;

import java.util.List;
import java.util.function.Consumer;
import org.mule.extension.redis.internal.connection.RedisConnection;
import org.mule.extension.redis.internal.error.RedisErrorType;
import org.mule.extension.redis.internal.error.exceptions.RedisConnectionException;
import org.mule.extension.redis.internal.error.exceptions.UnableToUnsubscribeException;
import org.mule.extension.redis.internal.operation.SubscribeChannelAttributes;
import org.mule.extension.redis.internal.service.ChannelSubscription;
import org.mule.extension.redis.internal.service.RedisClientAdapter;
import org.mule.extension.redis.internal.service.dto.MessageDTO;
import org.mule.extension.redis.internal.service.factory.RedisClientAdapterFactory;
import org.mule.extension.redis.internal.service.factory.ServiceFactory;
import org.mule.runtime.api.connection.ConnectionException;
import org.mule.runtime.api.connection.ConnectionProvider;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.extension.api.annotation.Alias;
import org.mule.runtime.extension.api.annotation.Streaming;
import org.mule.runtime.extension.api.annotation.param.Connection;
import org.mule.runtime.extension.api.annotation.param.MediaType;
import org.mule.runtime.extension.api.annotation.param.Parameter;
import org.mule.runtime.extension.api.annotation.source.EmitsResponse;
import org.mule.runtime.extension.api.exception.ModuleException;
import org.mule.runtime.extension.api.runtime.operation.Result;
import org.mule.runtime.extension.api.runtime.source.Source;
import org.mule.runtime.extension.api.runtime.source.SourceCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.exceptions.JedisConnectionException;
import redis.clients.jedis.exceptions.JedisException;

@Streaming
@MediaType(value = "*/*", strict = false)
@Alias("subscribe")
@EmitsResponse
/* loaded from: input_file:org/mule/extension/redis/internal/source/SubscribeChannelSource.class */
public class SubscribeChannelSource extends Source<String, SubscribeChannelAttributes> {
    private static final Logger logger = LoggerFactory.getLogger(SubscribeChannelSource.class);

    @Parameter
    private List<String> channels;

    @Connection
    private ConnectionProvider<RedisConnection> provider;
    private ServiceFactory serviceFactory = new ServiceFactory();
    private RedisClientAdapterFactory clientAdapterFactory = new RedisClientAdapterFactory();
    private ChannelSubscription channelSubscription;
    private RedisClientAdapter redisClientAdapter;

    public void onStart(SourceCallback<String, SubscribeChannelAttributes> sourceCallback) throws MuleException {
        this.redisClientAdapter = this.clientAdapterFactory.getRedisClientAdapter((RedisConnection) this.provider.connect());
        this.channelSubscription = this.serviceFactory.getMessagingAPIService(this.redisClientAdapter).subscribeToChannel(messageConsumer(sourceCallback), messageErrorHandler(sourceCallback), this.channels);
    }

    private Consumer<MessageDTO> messageConsumer(SourceCallback<String, SubscribeChannelAttributes> sourceCallback) {
        return messageDTO -> {
            SubscribeChannelAttributes subscribeChannelAttributes = new SubscribeChannelAttributes();
            subscribeChannelAttributes.setChannel(messageDTO.getChannel());
            sourceCallback.handle(Result.builder().output(messageDTO.getPayload()).attributes(subscribeChannelAttributes).build());
        };
    }

    private Consumer<RuntimeException> messageErrorHandler(SourceCallback<String, SubscribeChannelAttributes> sourceCallback) {
        return runtimeException -> {
            if (runtimeException instanceof RedisConnectionException) {
                sourceCallback.onConnectionException(new ConnectionException("Something nasty has happened with the connection to server.", runtimeException));
            } else if (runtimeException instanceof UnableToUnsubscribeException) {
                logger.warn("Unable to unsubscribe from channels.", runtimeException);
            } else {
                logger.warn("Unknown error. Please try again and if the problem persists then contact support team.", runtimeException);
            }
        };
    }

    public void onStop() {
        try {
            this.channelSubscription.stop();
        } finally {
            closeClientAdapter();
        }
    }

    private void closeClientAdapter() {
        try {
            this.redisClientAdapter.close();
        } catch (JedisException e) {
            throw new ModuleException("Unknown error while trying to close connection.", RedisErrorType.UNKNOWN, e);
        } catch (JedisConnectionException e2) {
            throw new ModuleException("Unable to close connection.", RedisErrorType.UNKNOWN, e2);
        }
    }

    public void setChannels(List<String> list) {
        this.channels = list;
    }
}
