package org.mule.test.module.extension.reconnection;

import java.time.Duration;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;
import org.mule.extension.test.extension.reconnection.FallibleReconnectableSource;
import org.mule.extension.test.extension.reconnection.NonReconnectableSource;
import org.mule.extension.test.extension.reconnection.ReconnectableConnection;
import org.mule.extension.test.extension.reconnection.ReconnectableConnectionProvider;
import org.mule.extension.test.extension.reconnection.ReconnectionOperations;
import org.mule.extension.test.extension.reconnection.SynchronizableConnection;
import org.mule.extension.test.extension.reconnection.SynchronizableSource;
import org.mule.runtime.api.connection.ConnectionException;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.notification.ExceptionNotification;
import org.mule.runtime.api.notification.ExceptionNotificationListener;
import org.mule.runtime.api.streaming.object.CursorIterator;
import org.mule.runtime.api.streaming.object.CursorIteratorProvider;
import org.mule.runtime.api.util.concurrent.Latch;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.processor.Processor;
import org.mule.runtime.core.api.retry.policy.RetryPolicy;
import org.mule.runtime.core.api.retry.policy.RetryPolicyTemplate;
import org.mule.runtime.core.api.util.ClassUtils;
import org.mule.runtime.extension.api.error.MuleErrors;
import org.mule.runtime.extension.api.exception.ModuleException;
import org.mule.tck.probe.PollingProber;
import org.mule.test.module.extension.AbstractExtensionFunctionalTestCase;

/* loaded from: input_file:org/mule/test/module/extension/reconnection/ReconnectionTestCase.class */
public class ReconnectionTestCase extends AbstractExtensionFunctionalTestCase {
    private static final long TIMEOUT = 5000;
    private static final long POLL_DELAY = 500;
    private static List<CoreEvent> capturedEvents;

    /* loaded from: input_file:org/mule/test/module/extension/reconnection/ReconnectionTestCase$CaptureProcessor.class */
    public static class CaptureProcessor implements Processor {
        public CoreEvent process(CoreEvent coreEvent) throws MuleException {
            synchronized (ReconnectionTestCase.capturedEvents) {
                ReconnectionTestCase.capturedEvents.add(coreEvent);
            }
            return coreEvent;
        }
    }

    public static void resetCounters() {
        ReconnectionOperations.closePagingProviderCalls = 0;
        ReconnectionOperations.getPageCalls = 0;
        ReconnectableConnectionProvider.disconnectCalls = 0;
        SynchronizableSource.first = true;
        SynchronizableConnection.disconnectionWaitedFullTimeout = false;
    }

    protected String getConfigFile() {
        return "reconnection-config.xml";
    }

    protected void doSetUp() throws Exception {
        capturedEvents = new LinkedList();
        ReconnectableConnectionProvider.fail = false;
        FallibleReconnectableSource.fail = false;
        NonReconnectableSource.fail = false;
    }

    protected void doTearDown() throws Exception {
        capturedEvents = null;
        ReconnectableConnectionProvider.fail = false;
        FallibleReconnectableSource.fail = false;
        NonReconnectableSource.fail = false;
    }

    @Test
    public void reconnectSource() throws Exception {
        getFlowConstruct("reconnectForever").start();
        PollingProber.check(TIMEOUT, 1000L, () -> {
            return Boolean.valueOf(!capturedEvents.isEmpty());
        });
        switchConnection();
        PollingProber.check(10000L, 1000L, () -> {
            Boolean valueOf;
            synchronized (capturedEvents) {
                valueOf = Boolean.valueOf(capturedEvents.stream().map(coreEvent -> {
                    return (ReconnectableConnection) coreEvent.getMessage().getPayload().getValue();
                }).filter(reconnectableConnection -> {
                    return reconnectableConnection.getReconnectionAttempts() >= 3;
                }).findAny().isPresent());
            }
            return valueOf;
        });
    }

    @Test
    public void sendNotificationOnReconnection() throws Exception {
        Latch latch = new Latch();
        ArrayList arrayList = new ArrayList();
        ExceptionNotificationListener exceptionNotificationListener = exceptionNotification -> {
            arrayList.add(exceptionNotification);
            latch.release();
        };
        this.notificationListenerRegistry.registerListener(exceptionNotificationListener);
        try {
            reconnectSource();
            latch.await(TIMEOUT, TimeUnit.MILLISECONDS);
            Assert.assertThat(Integer.valueOf(arrayList.size()), Matchers.greaterThanOrEqualTo(1));
            Assert.assertThat(((ExceptionNotification) arrayList.get(0)).getInfo(), CoreMatchers.notNullValue());
            Assert.assertThat(((ExceptionNotification) arrayList.get(0)).getInfo().getException(), CoreMatchers.notNullValue());
            Assert.assertThat(((ExceptionNotification) arrayList.get(0)).getInfo().getException(), Matchers.instanceOf(ConnectionException.class));
            this.notificationListenerRegistry.unregisterListener(exceptionNotificationListener);
        } catch (Throwable th) {
            this.notificationListenerRegistry.unregisterListener(exceptionNotificationListener);
            throw th;
        }
    }

    @Test
    public void noReconnectSource() throws Exception {
        getFlowConstruct("noReconnect").start();
        PollingProber.check(TIMEOUT, POLL_DELAY, () -> {
            return Boolean.valueOf(!capturedEvents.isEmpty());
        });
        NonReconnectableSource.fail = true;
        PollingProber.check(TIMEOUT, POLL_DELAY, () -> {
            return Boolean.valueOf(NonReconnectableSource.attempts.get() > 0);
        });
        clear(capturedEvents);
        PollingProber.checkNot(TIMEOUT, POLL_DELAY, () -> {
            return Boolean.valueOf(size(capturedEvents) > 0);
        });
    }

    @Test
    public void doNotStartSourceTwiceAfterExceptionOnReconnection() throws Exception {
        getFlowConstruct("reconnectAfterFailure").start();
        PollingProber.check(TIMEOUT, POLL_DELAY, () -> {
            return Boolean.valueOf(!capturedEvents.isEmpty());
        });
        FallibleReconnectableSource.fail = true;
        PollingProber.checkNot(TIMEOUT, POLL_DELAY, () -> {
            return Boolean.valueOf(FallibleReconnectableSource.simultaneouslyStartedSources);
        });
        FallibleReconnectableSource.release();
        PollingProber.checkNot(TIMEOUT, POLL_DELAY, () -> {
            return Boolean.valueOf(FallibleReconnectableSource.simultaneouslyStartedSources);
        });
    }

    @Test
    public void getInlineRetryPolicyTemplate() throws Exception {
        assertRetryTemplate((RetryPolicyTemplate) flowRunner("getInlineReconnection").run().getMessage().getPayload().getValue(), true, 30, 50L);
    }

    @Test
    public void getInlineRetryPolicyBlockingTemplate() throws Exception {
        assertRetryTemplate((RetryPolicyTemplate) flowRunner("getInlineReconnectionBlocking").run().getMessage().getPayload().getValue(), false, 30, 50L);
    }

    @Test
    public void reconnectAfterConnectionExceptionOnFirstPage() throws Exception {
        resetCounters();
        getCursor("pagedOperation", 1, MuleErrors.CONNECTIVITY).next();
        Assert.assertThat("Connection was not disconnected.", Integer.valueOf(ReconnectableConnectionProvider.disconnectCalls), CoreMatchers.is(1));
        Assert.assertThat("Paging provider was not closed.", Integer.valueOf(ReconnectionOperations.closePagingProviderCalls), CoreMatchers.is(1));
    }

    @Test(expected = IllegalArgumentException.class)
    public void doNotReconnectAfterOtherExceptionOnFirstPage() throws Throwable {
        resetCounters();
        try {
            getCursor("pagedOperation", 1, MuleErrors.VALIDATION).next();
        } catch (Exception e) {
            Assert.assertThat(e.getCause(), Matchers.instanceOf(IllegalArgumentException.class));
            Assert.assertThat(e.getMessage(), CoreMatchers.is("An illegal argument was received."));
            Assert.assertThat("Paging provider was not closed.", Integer.valueOf(ReconnectionOperations.closePagingProviderCalls), CoreMatchers.is(1));
            Assert.assertThat("Connection was disconnected.", Integer.valueOf(ReconnectableConnectionProvider.disconnectCalls), CoreMatchers.is(0));
            throw e.getCause();
        }
    }

    @Test
    public void reconnectionDuringConnectionExceptionOnSecondPage() throws Exception {
        resetCounters();
        CursorIterator cursor = getCursor("pagedOperation", 2, MuleErrors.CONNECTIVITY);
        cursor.next();
        Assert.assertThat("Connection was disconnected.", Integer.valueOf(ReconnectableConnectionProvider.disconnectCalls), CoreMatchers.is(0));
        Assert.assertThat("Paging provider was closed.", Integer.valueOf(ReconnectionOperations.closePagingProviderCalls), CoreMatchers.is(0));
        cursor.next();
        Assert.assertThat("Connection was not disconnected.", Integer.valueOf(ReconnectableConnectionProvider.disconnectCalls), CoreMatchers.is(1));
        Assert.assertThat("Paging provider was closed.", Integer.valueOf(ReconnectionOperations.closePagingProviderCalls), CoreMatchers.is(0));
    }

    @Test(expected = IllegalArgumentException.class)
    public void doNotReconnectAfterOtherExceptionOnSecondPage() throws Exception {
        resetCounters();
        try {
            CursorIterator cursor = getCursor("pagedOperation", 2, MuleErrors.VALIDATION);
            cursor.next();
            cursor.next();
        } catch (Exception e) {
            Assert.assertThat(e, Matchers.instanceOf(IllegalArgumentException.class));
            Assert.assertThat(e.getMessage(), CoreMatchers.is("An illegal argument was received."));
            Assert.assertThat("Paging provider was not closed.", Integer.valueOf(ReconnectionOperations.closePagingProviderCalls), CoreMatchers.is(0));
            Assert.assertThat("Connection was disconnected.", Integer.valueOf(ReconnectableConnectionProvider.disconnectCalls), CoreMatchers.is(0));
            throw e;
        }
    }

    @Test
    public void stickyConnectionIsClosedAndReconnectedDuringConnectionExceptionOnFirstPage() throws Exception {
        resetCounters();
        getCursor("stickyPagedOperation", 1, MuleErrors.CONNECTIVITY).next();
        Assert.assertThat("Connection was not disconnected.", Integer.valueOf(ReconnectableConnectionProvider.disconnectCalls), CoreMatchers.is(1));
        Assert.assertThat("Paging provider was not closed.", Integer.valueOf(ReconnectionOperations.closePagingProviderCalls), CoreMatchers.is(1));
    }

    @Test(expected = ModuleException.class)
    public void stickyConnectionIsNotReconnectedDuringConnectionExceptionOnSecondPage() throws Exception {
        resetCounters();
        try {
            CursorIterator cursor = getCursor("stickyPagedOperation", 2, MuleErrors.CONNECTIVITY);
            cursor.next();
            cursor.next();
        } catch (Exception e) {
            Assert.assertThat(e, Matchers.instanceOf(ModuleException.class));
            Assert.assertThat(e.getCause(), Matchers.instanceOf(ConnectionException.class));
            Assert.assertThat(e.getCause().getMessage(), CoreMatchers.is("Failed to retrieve Page"));
            Assert.assertThat("Paging provider was not closed.", Integer.valueOf(ReconnectionOperations.closePagingProviderCalls), CoreMatchers.is(0));
            Assert.assertThat("Connection was not disconnected.", Integer.valueOf(ReconnectableConnectionProvider.disconnectCalls), CoreMatchers.is(1));
            throw e;
        }
    }

    @Test(expected = IllegalArgumentException.class)
    public void stickyConnectionIsNotReconnectedDuringOtherExceptionOnSecondPage() throws Exception {
        resetCounters();
        try {
            CursorIterator cursor = getCursor("stickyPagedOperation", 2, MuleErrors.VALIDATION);
            cursor.next();
            cursor.next();
        } catch (Exception e) {
            Assert.assertThat(e, Matchers.instanceOf(IllegalArgumentException.class));
            Assert.assertThat(e.getMessage(), CoreMatchers.is("An illegal argument was received."));
            Assert.assertThat("Paging provider was not closed.", Integer.valueOf(ReconnectionOperations.closePagingProviderCalls), CoreMatchers.is(0));
            Assert.assertThat("Connection was not disconnected.", Integer.valueOf(ReconnectableConnectionProvider.disconnectCalls), CoreMatchers.is(0));
            throw e;
        }
    }

    @Test
    public void connectionInvalidatedOnCallback() throws Exception {
        resetCounters();
        getFlowConstruct("synchronizableSource").start();
        PollingProber.check(TIMEOUT, POLL_DELAY, () -> {
            return Boolean.valueOf(SynchronizableConnection.disconnectionWaitedFullTimeout);
        });
    }

    @Test
    public void connectionInvalidatedAndRecreatedIfConnectionExceptionOnStart() throws Exception {
        getFlowConstruct("invalidateConnectionIfConnectionExceptionOnStart").start();
        PollingProber.check(10000L, 1000L, () -> {
            Boolean valueOf;
            synchronized (capturedEvents) {
                valueOf = Boolean.valueOf(capturedEvents.stream().flatMap(coreEvent -> {
                    return ((List) coreEvent.getMessage().getPayload().getValue()).stream();
                }).distinct().count() == 5);
            }
            return valueOf;
        });
    }

    protected void assertRetryTemplate(RetryPolicyTemplate retryPolicyTemplate, boolean z, int i, long j) throws Exception {
        Assert.assertThat(Boolean.valueOf(retryPolicyTemplate.isAsync()), CoreMatchers.is(Boolean.valueOf(z)));
        RetryPolicy createRetryInstance = retryPolicyTemplate.createRetryInstance();
        Assert.assertThat(ClassUtils.getFieldValue(createRetryInstance, "count", false), CoreMatchers.is(Integer.valueOf(i)));
        Assert.assertThat(Long.valueOf(((Duration) ClassUtils.getFieldValue(createRetryInstance, "frequency", false)).toMillis()), CoreMatchers.is(Long.valueOf(j)));
    }

    private void switchConnection() throws Exception {
        flowRunner("switchConnection").run();
    }

    private <T> CursorIterator<T> getCursor(String str, Integer num, MuleErrors muleErrors) throws Exception {
        return ((CursorIteratorProvider) flowRunner(str).withPayload(num).withVariable("errorType", muleErrors).keepStreamsOpen().run().getMessage().getPayload().getValue()).openCursor();
    }

    private void clear(List<CoreEvent> list) {
        synchronized (list) {
            list.clear();
        }
    }

    private int size(List<CoreEvent> list) {
        int size;
        synchronized (list) {
            size = list.size();
        }
        return size;
    }
}
