package com.mulesoft.connectors.mqtt3.internal.connection;

import com.mulesoft.connectors.mqtt3.api.Topic;
import com.mulesoft.connectors.mqtt3.internal.exceptions.MQTT3ConnectionExceptionResolver;
import com.mulesoft.connectors.mqtt3.internal.exceptions.MQTT3Error;
import com.mulesoft.connectors.mqtt3.internal.exceptions.MQTT3InvalidTopicException;
import com.mulesoft.connectors.mqtt3.internal.exceptions.MQTT3PersistenceException;
import com.mulesoft.connectors.mqtt3.internal.exceptions.MQTT3PublishException;
import com.mulesoft.connectors.mqtt3.internal.routing.DefaultMQTT3Message;
import com.mulesoft.connectors.mqtt3.internal.routing.LWTMessage;
import com.mulesoft.connectors.mqtt3.internal.routing.MQTT3MessageHandler;
import com.mulesoft.connectors.mqtt3.internal.routing.MQTT3TopicRouter;
import com.mulesoft.connectors.mqtt3.internal.source.MQTT3ConnectionLostHandler;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.eclipse.paho.client.mqttv3.DisconnectedBufferOptions;
import org.eclipse.paho.client.mqttv3.IMqttActionListener;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
import org.eclipse.paho.client.mqttv3.MqttSecurityException;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence;
import org.mule.runtime.api.connection.ConnectionException;
import org.mule.runtime.api.exception.MuleRuntimeException;
import org.mule.runtime.api.tls.TlsContextFactory;
import org.mule.runtime.api.util.Reference;
import org.mule.runtime.extension.api.exception.ModuleException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/mulesoft/connectors/mqtt3/internal/connection/DefaultMQTT3Connection.class */
public class DefaultMQTT3Connection implements MQTT3Connection {
    private static final String ROOT_TOPIC = "#";
    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultMQTT3Connection.class);
    private static final int COMPLETION_WAIT_TIMEOUT_MILLIS = 10000;
    private final MqttConnectOptions mqttConnectOptions;
    private MQTT3TopicRouter topicRouter;
    private MqttAsyncClient mqttClient;
    private IMqttToken mqttToken;
    private AtomicBoolean notifiedReconnect;
    private int connectionTimeoutMultiplier;

    public DefaultMQTT3Connection(String str, String str2, MQTT3ConnectionOptions mQTT3ConnectionOptions, MQTT3FilePersistenceOptions mQTT3FilePersistenceOptions, LWTMessage lWTMessage) throws ConnectionException {
        this.mqttConnectOptions = new MqttConnectOptions();
        this.notifiedReconnect = new AtomicBoolean(false);
        this.connectionTimeoutMultiplier = 1;
        this.topicRouter = new MQTT3TopicRouter((str3, str4) -> {
            return MqttTopic.isMatched(str3, str4);
        });
        try {
            if (!mQTT3FilePersistenceOptions.getEnableFilePersistence()) {
                this.mqttClient = new MqttAsyncClient(str, str2, new MemoryPersistence());
            } else if (mQTT3FilePersistenceOptions.getDataStorePath() == null || mQTT3FilePersistenceOptions.getDataStorePath().isEmpty()) {
                this.mqttClient = new MqttAsyncClient(str, str2, new MqttDefaultFilePersistence());
            } else {
                this.mqttClient = new MqttAsyncClient(str, str2, new MqttDefaultFilePersistence(mQTT3FilePersistenceOptions.getDataStorePath()));
            }
            long convert = TimeUnit.SECONDS.convert(mQTT3ConnectionOptions.getKeepAliveInterval(), mQTT3ConnectionOptions.getKeepAliveIntervalUnit());
            this.mqttConnectOptions.setConnectionTimeout((int) TimeUnit.SECONDS.convert(mQTT3ConnectionOptions.getConnectionTimeout(), mQTT3ConnectionOptions.getConnectionTimeoutUnit()));
            this.mqttConnectOptions.setCleanSession(mQTT3ConnectionOptions.getCleanSession());
            this.mqttConnectOptions.setKeepAliveInterval((int) convert);
            this.mqttConnectOptions.setMaxInflight(mQTT3ConnectionOptions.getMaxInFlight());
            setLastWillAndTestamentMessage(lWTMessage);
            DisconnectedBufferOptions disconnectedBufferOptions = new DisconnectedBufferOptions();
            disconnectedBufferOptions.setBufferEnabled(true);
            this.mqttClient.setBufferOpts(disconnectedBufferOptions);
        } catch (MqttException e) {
            LOGGER.error("Failed to initialize mqttConnection, check that your connection parameters are correct." + e.getMessage(), e);
            throw new ConnectionException(e, this);
        }
    }

    private DefaultMQTT3Connection() {
        this.mqttConnectOptions = new MqttConnectOptions();
        this.notifiedReconnect = new AtomicBoolean(false);
        this.connectionTimeoutMultiplier = 1;
    }

    public boolean isCleanSessionEnabled() {
        return this.mqttConnectOptions.isCleanSession();
    }

    @Override // com.mulesoft.connectors.mqtt3.internal.connection.MQTT3Connection
    public void setUsernamePassword(String str, String str2) {
        this.mqttConnectOptions.setUserName(str);
        if (str2 != null) {
            this.mqttConnectOptions.setPassword(str2.toCharArray());
        }
    }

    @Override // com.mulesoft.connectors.mqtt3.internal.connection.MQTT3Connection
    public void setFailOverServers(String[] strArr) {
        this.mqttConnectOptions.setServerURIs(strArr);
        this.connectionTimeoutMultiplier = strArr.length;
    }

    public void connect() throws ConnectionException {
        final Reference reference = new Reference();
        try {
            final CountDownLatch countDownLatch = new CountDownLatch(1);
            this.mqttToken = this.mqttClient.connect(this.mqttConnectOptions, (Object) null, new IMqttActionListener() { // from class: com.mulesoft.connectors.mqtt3.internal.connection.DefaultMQTT3Connection.1
                public void onSuccess(IMqttToken iMqttToken) {
                    reference.set((Object) null);
                    DefaultMQTT3Connection.LOGGER.debug("Successfully connected to " + DefaultMQTT3Connection.this.mqttClient.getCurrentServerURI());
                    countDownLatch.countDown();
                }

                public void onFailure(IMqttToken iMqttToken, Throwable th) {
                    DefaultMQTT3Connection.LOGGER.error("Error occurred establishing connection to " + DefaultMQTT3Connection.this.mqttClient.getCurrentServerURI() + ":" + th.getMessage(), th);
                    reference.set(th);
                    countDownLatch.countDown();
                }
            });
            countDownLatch.await(this.mqttConnectOptions.getConnectionTimeout() * this.connectionTimeoutMultiplier, TimeUnit.SECONDS);
        } catch (InterruptedException | MqttException e) {
            if (this.notifiedReconnect.compareAndSet(false, true)) {
                LOGGER.error("Error occurred attempting to establish connection to mqtt broker " + e, e);
                throw new ConnectionException(e, this);
            }
        } catch (MqttSecurityException e2) {
            throw new ModuleException("Error connecting to mqtt broker: not authorized to connect", MQTT3Error.UNAUTHORIZED, e2);
        }
        Throwable th = (Throwable) reference.get();
        if (th != null) {
            if (isConnected() || !MQTT3ConnectionExceptionResolver.resolveMQTT3ConnectionException(th, this).isPresent()) {
                if (!(th instanceof MqttSecurityException)) {
                    throw new MuleRuntimeException(th);
                }
                throw new ModuleException("Error connecting to mqtt broker: not authorized to connect", MQTT3Error.UNAUTHORIZED, th);
            }
            if (this.notifiedReconnect.compareAndSet(false, true)) {
                LOGGER.error("Error occurred attempting to establish connection to mqtt broker " + reference.get(), th);
                throw MQTT3ConnectionExceptionResolver.resolveMQTT3ConnectionException(th, this).get();
            }
        }
    }

    @Override // com.mulesoft.connectors.mqtt3.internal.connection.MQTT3Connection
    public void subscribeListenerToTopics(List<Topic> list, MQTT3MessageHandler mQTT3MessageHandler) throws ConnectionException {
        try {
            List<Topic> registerCallbackForTopics = this.topicRouter.registerCallbackForTopics(list, mQTT3MessageHandler);
            String[] strArr = (String[]) registerCallbackForTopics.stream().map((v0) -> {
                return v0.getTopicFilter();
            }).toArray(i -> {
                return new String[i];
            });
            int[] array = registerCallbackForTopics.stream().map((v0) -> {
                return v0.getQos();
            }).map((v0) -> {
                return v0.getValue();
            }).mapToInt((v0) -> {
                return v0.intValue();
            }).toArray();
            LOGGER.debug("Subscribing to topics: {}, with QOS {}", strArr, array);
            if (list.stream().anyMatch(topic -> {
                return topic.getTopicFilter().equals(ROOT_TOPIC);
            })) {
                LOGGER.warn("Issuing subscription request for the root topic #. This is not advisable, you will receive all messages issued to all topics.");
            }
            subscribe(strArr, array);
        } catch (Exception e) {
            LOGGER.error("Exception occurred during subscription to topics " + list, e);
            throw e;
        }
    }

    @Override // com.mulesoft.connectors.mqtt3.internal.connection.MQTT3Connection
    public void unsubscribeListenerFromTopics(List<Topic> list, MQTT3MessageHandler mQTT3MessageHandler) {
        try {
            List<Topic> deregisterCallbackForTopics = this.topicRouter.deregisterCallbackForTopics(list, mQTT3MessageHandler);
            if (isCleanSessionEnabled() && !deregisterCallbackForTopics.isEmpty()) {
                String[] strArr = (String[]) deregisterCallbackForTopics.stream().map((v0) -> {
                    return v0.getTopicFilter();
                }).toArray(i -> {
                    return new String[i];
                });
                try {
                    LOGGER.debug("Unsubscribing from topics: {}", strArr);
                    this.mqttClient.unsubscribe(strArr);
                } catch (MqttException e) {
                    LOGGER.error("Error occurred unsubscribing from topics {}: {}", strArr, e);
                }
            }
        } catch (Exception e2) {
            LOGGER.error("Error unsubscribing callbacks for topics " + list);
        }
    }

    @Override // com.mulesoft.connectors.mqtt3.internal.connection.MQTT3Connection
    public void setConnectionLostHandler(final MQTT3ConnectionLostHandler mQTT3ConnectionLostHandler) {
        this.mqttClient.setCallback(new MqttCallbackExtended() { // from class: com.mulesoft.connectors.mqtt3.internal.connection.DefaultMQTT3Connection.2
            public void connectComplete(boolean z, String str) {
                DefaultMQTT3Connection.this.notifiedReconnect.set(false);
                if (z && DefaultMQTT3Connection.this.mqttConnectOptions.isCleanSession()) {
                    List<Topic> distinctTopicFilters = DefaultMQTT3Connection.this.topicRouter.getDistinctTopicFilters();
                    if (distinctTopicFilters.isEmpty()) {
                        return;
                    }
                    String[] strArr = new String[distinctTopicFilters.size()];
                    int[] iArr = new int[distinctTopicFilters.size()];
                    for (int i = 0; i < distinctTopicFilters.size(); i++) {
                        strArr[i] = distinctTopicFilters.get(i).getTopicFilter();
                        iArr[i] = distinctTopicFilters.get(i).getQos().getValue();
                    }
                    if (DefaultMQTT3Connection.LOGGER.isDebugEnabled()) {
                        DefaultMQTT3Connection.LOGGER.debug("Reconnect to {} complete.", str);
                        DefaultMQTT3Connection.LOGGER.debug("Recovering subscriptions to topics {}", strArr);
                    }
                    try {
                        DefaultMQTT3Connection.this.mqttClient.subscribe(strArr, iArr, (Object) null, new MQTT3SubscriptionSuccessListener()).waitForCompletion(10000L);
                    } catch (MqttException e) {
                        DefaultMQTT3Connection.LOGGER.error("Re-subscribe after reconnection failed for " + str + " with error " + e);
                    }
                }
            }

            public void connectionLost(Throwable th) {
                if (DefaultMQTT3Connection.this.notifiedReconnect.compareAndSet(false, true)) {
                    mQTT3ConnectionLostHandler.onConnectionLost(th);
                }
            }

            public void messageArrived(String str, MqttMessage mqttMessage) throws Exception {
                DefaultMQTT3Connection.this.topicRouter.handleMessageArrived(new DefaultMQTT3Message(mqttMessage.getId(), str, mqttMessage.getPayload(), mqttMessage.getQos(), mqttMessage.isDuplicate(), mqttMessage.isRetained()));
            }

            public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
            }
        });
    }

    @Override // com.mulesoft.connectors.mqtt3.internal.connection.MQTT3Connection
    public boolean isConnected() {
        return this.mqttClient.isConnected();
    }

    private void close() {
        try {
            this.mqttClient.close(true);
        } catch (MqttException e) {
            LOGGER.error("Error occurred while attempting to close connection.", e);
        }
    }

    @Override // com.mulesoft.connectors.mqtt3.internal.connection.MQTT3Connection
    public CompletableFuture<Integer> publish(String str, byte[] bArr, int i, boolean z) throws MQTT3PublishException, ConnectionException {
        CompletableFuture<Integer> completableFuture = new CompletableFuture<>();
        try {
            this.mqttClient.publish(str, bArr, i, z, (Object) null, new MQTT3PublishActionListener(completableFuture, this));
            return completableFuture;
        } catch (IllegalArgumentException e) {
            LOGGER.error("IllegalArgumentException found performing publish operation: " + e.getMessage());
            throw new MQTT3InvalidTopicException(e);
        } catch (MqttPersistenceException e2) {
            LOGGER.error("MqttPersistenceException found performing publish operation: " + e2.getMessage());
            throw new MQTT3PersistenceException(e2);
        } catch (Throwable th) {
            Optional<ConnectionException> resolveMQTT3ConnectionException = MQTT3ConnectionExceptionResolver.resolveMQTT3ConnectionException(th, this);
            if (resolveMQTT3ConnectionException.isPresent()) {
                LOGGER.error("MqttConnectionException found performing publish operation: " + th.getMessage());
                throw resolveMQTT3ConnectionException.get();
            }
            LOGGER.error("MqttException found performing publish operation: " + th.getMessage());
            throw new MQTT3PublishException(th);
        }
    }

    private void subscribe(String[] strArr, int[] iArr) throws ConnectionException {
        try {
            this.mqttToken = this.mqttClient.subscribe(strArr, iArr, (Object) null, new MQTT3SubscriptionSuccessListener());
        } catch (MqttException e) {
            LOGGER.error("Subscription failed for topics " + strArr + " with error:" + e.getMessage(), e);
            throw new ConnectionException(e, this);
        }
    }

    @Override // com.mulesoft.connectors.mqtt3.internal.connection.MQTT3Connection
    public boolean isSessionPresent() {
        return this.mqttToken.getSessionPresent();
    }

    private void setLastWillAndTestamentMessage(LWTMessage lWTMessage) {
        if (lWTMessage.getBody() == null || lWTMessage.getTopic() == null) {
            return;
        }
        this.mqttConnectOptions.setWill(lWTMessage.getTopic(), lWTMessage.getBody().getBytes(), lWTMessage.getQoS().getValue(), lWTMessage.isRetained());
    }

    @Override // com.mulesoft.connectors.mqtt3.internal.connection.MQTT3Connection
    public void setTLSOptions(TlsContextFactory tlsContextFactory) throws ConnectionException {
        try {
            this.mqttConnectOptions.setSocketFactory(tlsContextFactory.createSocketFactory());
        } catch (KeyManagementException | NoSuchAlgorithmException e) {
            throw new ConnectionException(e, this);
        }
    }

    @Override // com.mulesoft.connectors.mqtt3.internal.connection.MQTT3Connection
    public void disconnect() {
        try {
            try {
                this.mqttClient.disconnect().waitForCompletion(10000L);
                close();
            } catch (MqttException e) {
                LOGGER.error(e.getMessage(), e);
                try {
                    LOGGER.error("Error occurred while attempting to disconnect client " + this.mqttClient.getClientId());
                    LOGGER.error(" Attempting to forcibly disconnect...");
                    this.mqttClient.disconnectForcibly(10000L);
                } catch (MqttException e2) {
                    LOGGER.error("Error occurred while attempting to forcibly disconnect client " + this.mqttClient.getClientId() + ": " + e2, e2);
                    close();
                }
                close();
            }
        } catch (Throwable th) {
            close();
            throw th;
        }
    }

    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof DefaultMQTT3Connection)) {
            return false;
        }
        DefaultMQTT3Connection defaultMQTT3Connection = (DefaultMQTT3Connection) obj;
        return new EqualsBuilder().append(this.mqttClient, defaultMQTT3Connection.mqttClient).append(this.connectionTimeoutMultiplier, defaultMQTT3Connection.connectionTimeoutMultiplier).append(this.mqttConnectOptions, defaultMQTT3Connection.mqttConnectOptions).isEquals();
    }

    public int hashCode() {
        return Objects.hash(this.mqttClient, Integer.valueOf(this.connectionTimeoutMultiplier), this.mqttConnectOptions);
    }
}
