package org.mule.runtime.module.extension.internal.runtime.source;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.resource.spi.work.Work;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.assertj.core.api.ThrowableAssert;
import org.hamcrest.BaseMatcher;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.hamcrest.core.StringContains;
import org.junit.Assert;
import org.junit.Test;
import org.junit.internal.matchers.ThrowableCauseMatcher;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mule.runtime.api.component.Component;
import org.mule.runtime.api.connection.ConnectionException;
import org.mule.runtime.api.exception.DefaultMuleException;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.lifecycle.LifecycleException;
import org.mule.runtime.api.scheduler.Scheduler;
import org.mule.runtime.api.scheduler.SchedulerService;
import org.mule.runtime.core.api.Injector;
import org.mule.runtime.core.api.retry.policy.RetryPolicyExhaustedException;
import org.mule.runtime.core.api.source.MessageSource;
import org.mule.runtime.core.internal.util.MessagingExceptionResolver;
import org.mule.runtime.extension.api.runtime.exception.ExceptionHandler;
import org.mule.runtime.extension.api.runtime.source.Source;
import org.mule.runtime.extension.api.runtime.source.SourceCallback;
import org.mule.test.heisenberg.extension.exception.HeisenbergConnectionExceptionEnricher;
import org.mule.test.module.extension.internal.util.ExtensionsTestUtils;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/mule/runtime/module/extension/internal/runtime/source/ExtensionMessageSourceTestCase.class */
public class ExtensionMessageSourceTestCase extends AbstractExtensionMessageSourceTestCase {

    /**
     * Bare-bones {@link Source} used where a concrete (non-mock) source instance is needed.
     * It stores a metadata key value and its lifecycle callbacks do nothing.
     */
    private class DummySource extends Source {
        // Queried by the name "metadataKey" in getMetadataKeyIdObjectValue; presumably
        // resolved reflectively, so keep the field name as is (TODO confirm).
        private String metadataKey;

        /**
         * @param metadataKey value stored in the {@code metadataKey} field
         */
        DummySource(String metadataKey) {
            this.metadataKey = metadataKey;
        }

        /** Intentionally empty: this stub never produces messages. */
        public void onStart(SourceCallback sourceCallback) throws MuleException {
            // no-op
        }

        /** Intentionally empty: there is nothing to release. */
        public void onStop() {
            // no-op
        }
    }

    /**
     * Supplies the constructor arguments for each parameterized run: a display label
     * (used as the run name through {@code "{0}"}) followed by the value assigned to
     * {@code primaryNodeOnly}.
     *
     * @return one {@code Object[]} row per run
     */
    @Parameterized.Parameters(name = "{0}")
    public static Collection<Object[]> data() {
        Object[] primaryNodeRun = {"primary node only", true};
        Object[] allNodesRun = {"all nodes", false};
        return Arrays.asList(primaryNodeRun, allNodesRun);
    }

    /**
     * Creates the test case for one row returned by {@link #data()}.
     *
     * @param name            display label of the run; not used by the constructor body
     * @param primaryNodeOnly value stored in the inherited {@code primaryNodeOnly} field
     */
    public ExtensionMessageSourceTestCase(String name, boolean primaryNodeOnly) {
        this.primaryNodeOnly = primaryNodeOnly;
    }

    /**
     * Checks that a result handed to the source callback during {@code onStart} is
     * actually processed once the message source is started.
     *
     * <p>The source mock is stubbed so that {@code onStart} pushes {@code result} through
     * the callback and flips a flag; the CPU-light scheduler mock is stubbed to run any
     * submitted task inline on the calling thread.
     *
     * @throws Exception if starting the message source fails
     */
    @Test
    public void handleMessage() throws Exception {
        Mockito.reset(this.sourceCallbackFactory);
        Mockito.when(this.sourceCallbackFactory.createSourceCallback((SourceCompletionHandlerFactory) Matchers.any())).thenReturn(this.sourceCallback);

        AtomicBoolean handled = new AtomicBoolean(false);
        Mockito.doAnswer(invocation -> {
            this.sourceCallback.handle(this.result);
            handled.set(true);
            return null;
        }).when(this.source).onStart(this.sourceCallback);

        Mockito.doAnswer(invocation -> {
            // execute(Runnable) only guarantees a Runnable; casting to a narrower type
            // would blow up with ClassCastException for any other task implementation.
            ((Runnable) invocation.getArguments()[0]).run();
            return null;
        }).when(this.cpuLightScheduler).execute((Runnable) Matchers.any());

        start();

        Assert.assertThat(handled.get(), CoreMatchers.is(true));
    }

    /**
     * After a {@link ConnectionException} is reported to a started message source, the
     * source must have been stopped once and started a second time, while neither the
     * IO scheduler nor the CPU-light scheduler is stopped.
     *
     * @throws Exception if starting the message source fails
     */
    @Test
    public void handleExceptionAndRestart() throws Exception {
        start();

        ConnectionException connectionLost = new ConnectionException("ERROR");
        this.messageSource.onException(connectionLost);

        Mockito.verify(this.source).onStop();
        Mockito.verify(this.ioScheduler, Mockito.never()).stop();
        Mockito.verify(this.cpuLightScheduler, Mockito.never()).stop();
        // One start from start() above plus one from the restart.
        Mockito.verify(this.source, Mockito.times(2)).onStart(this.sourceCallback);
    }

    /**
     * Initialises the message source (unless it already is initialised) and checks that
     * initialisation alone never starts the underlying source.
     *
     * <p>Also called by other tests as a set-up step, hence the lifecycle guard.
     *
     * @throws Exception if initialisation fails
     */
    @Test
    public void initialise() throws Exception {
        boolean alreadyInitialised = this.messageSource.getLifecycleState().isInitialised();
        if (!alreadyInitialised) {
            this.messageSource.initialise();
            Mockito.verify(this.source, Mockito.never()).onStart(this.sourceCallback);
        }
    }

    /**
     * Initialising and starting the message source must ask the adapter factory for a
     * source adapter exactly once.
     *
     * @throws Exception if initialisation or start fails
     */
    @Test
    public void sourceIsInstantiatedOnce() throws Exception {
        initialise();
        start();

        // verify(mock) checks for exactly one invocation.
        Mockito.verify(this.sourceAdapterFactory).createAdapter(
                (Optional) Matchers.any(),
                (SourceCallbackFactory) Matchers.any(),
                (Component) Matchers.any(),
                (SourceConnectionManager) Matchers.any(),
                (MessagingExceptionResolver) Matchers.any());
    }

    /**
     * When the source's {@code onStart} throws a {@link DefaultMuleException}, starting
     * the message source is expected to fail with a {@link RetryPolicyExhaustedException}.
     *
     * @throws Exception the expected failure, checked by the {@code expectedException} rule
     */
    @Test
    public void failToStart() throws Exception {
        DefaultMuleException startFailure = new DefaultMuleException(new Exception());
        Mockito.doThrow(startFailure).when(this.source).onStart((SourceCallback) Matchers.any());

        this.expectedException.expect(CoreMatchers.is(CoreMatchers.instanceOf(RetryPolicyExhaustedException.class)));

        this.messageSource.initialise();
        this.messageSource.start();
    }

    /**
     * When {@code onStart} keeps failing with a wrapped {@link ConnectionException},
     * {@code start()} must throw a {@link RetryPolicyExhaustedException} whose cause is
     * that very connection exception, after {@code onStart} was attempted three times.
     *
     * @throws Exception if initialisation fails
     */
    @Test
    public void failWithConnectionExceptionWhenStartingAndGetRetryPolicyExhausted() throws Exception {
        this.messageSource.initialise();
        ConnectionException connectionFailure = new ConnectionException("ERROR");
        Mockito.doThrow(new RuntimeException(connectionFailure)).when(this.source).onStart(this.sourceCallback);

        Throwable thrown = ThrowableAssert.catchThrowable(this.messageSource::start);

        Assert.assertThat(thrown, CoreMatchers.is(CoreMatchers.instanceOf(RetryPolicyExhaustedException.class)));
        Assert.assertThat(thrown, CoreMatchers.is(exhaustedBecauseOf(connectionFailure)));
        Mockito.verify(this.source, Mockito.times(3)).onStart(this.sourceCallback);
    }

    /**
     * When {@code onStart} keeps failing with a {@link DefaultMuleException} wrapping an
     * {@link IOException}, {@code start()} must throw a
     * {@link RetryPolicyExhaustedException} whose cause chain contains an
     * {@code IOException}, after {@code onStart} was attempted three times.
     *
     * @throws Exception if initialisation fails
     */
    @Test
    public void failWithNonConnectionExceptionWhenStartingAndGetRetryPolicyExhausted() throws Exception {
        DefaultMuleException startFailure = new DefaultMuleException(new IOException("ERROR"));
        Mockito.doThrow(startFailure).when(this.source).onStart(this.sourceCallback);
        this.messageSource.initialise();

        Throwable thrown = ThrowableAssert.catchThrowable(this.messageSource::start);

        Assert.assertThat(thrown, CoreMatchers.is(CoreMatchers.instanceOf(RetryPolicyExhaustedException.class)));
        // Look through the whole cause chain rather than only the direct cause.
        Throwable[] causeChain = ExceptionUtils.getThrowables(thrown);
        Assert.assertThat(causeChain, org.hamcrest.Matchers.hasItemInArray(CoreMatchers.instanceOf(IOException.class)));
        Mockito.verify(this.source, Mockito.times(3)).onStart(this.sourceCallback);
    }

    /**
     * When {@code onStart} fails twice with a wrapped {@link ConnectionException} and
     * then succeeds, {@code start()} must complete, having called {@code onStart} three
     * times and {@code onStop} twice.
     *
     * @throws Exception if initialisation or start fails
     */
    @Test
    public void failWithConnectionExceptionWhenStartingAndGetsReconnected() throws Exception {
        RuntimeException firstFailure = new RuntimeException(new ConnectionException("ERROR"));
        RuntimeException secondFailure = new RuntimeException(new ConnectionException("ERROR"));
        Mockito.doThrow(firstFailure)
                .doThrow(secondFailure)
                .doNothing()
                .when(this.source).onStart(this.sourceCallback);

        this.messageSource.initialise();
        this.messageSource.start();

        Mockito.verify(this.source, Mockito.times(3)).onStart(this.sourceCallback);
        Mockito.verify(this.source, Mockito.times(2)).onStop();
    }

    /**
     * The message source under test must report the {@code FAIL} back-pressure strategy.
     */
    @Test
    public void getBackPressureStrategy() {
        final MessageSource.BackPressureStrategy expected = MessageSource.BackPressureStrategy.FAIL;
        Assert.assertThat(this.messageSource.getBackPressureStrategy(), CoreMatchers.is(expected));
    }

    /**
     * Reporting a {@link ConnectionException} to a started message source must stop the
     * source once and start it a second time.
     *
     * @throws Exception if initialisation or start fails
     */
    @Test
    public void failOnExceptionWithConnectionExceptionAndGetsReconnected() throws Exception {
        this.messageSource.initialise();
        this.messageSource.start();

        ConnectionException connectionLost = new ConnectionException("ERROR");
        this.messageSource.onException(connectionLost);

        Mockito.verify(this.source, Mockito.times(2)).onStart(this.sourceCallback);
        // verify(mock) checks for exactly one invocation.
        Mockito.verify(this.source).onStop();
    }

    /**
     * When {@code onStart} throws an arbitrary {@link RuntimeException}, starting the
     * message source is expected to fail with an exception whose cause in turn has that
     * same runtime exception instance as its cause.
     *
     * @throws Exception the expected failure, checked by the {@code expectedException} rule
     */
    @Test
    public void startFailsWithRandomException() throws Exception {
        final RuntimeException randomFailure = new RuntimeException();
        Mockito.doThrow(randomFailure).when(this.source).onStart(this.sourceCallback);

        // Thin wrapper that delegates to hasCause(sameInstance(randomFailure)).
        Matcher<Throwable> causedByRandomFailure = new BaseMatcher<Throwable>() {
            private Matcher<Exception> exceptionMatcher = ThrowableCauseMatcher.hasCause(CoreMatchers.sameInstance(randomFailure));

            public boolean matches(Object candidate) {
                return this.exceptionMatcher.matches(candidate);
            }

            public void describeTo(Description description) {
                this.exceptionMatcher.describeTo(description);
            }
        };
        this.expectedException.expect(exhaustedBecauseOf(causedByRandomFailure));

        initialise();
        this.messageSource.start();
    }

    /**
     * Initialises and starts the message source (skipping the start when it is already
     * started), then checks that the source was started with the source callback and that
     * the source was passed to the Mule context's injector.
     *
     * <p>Also called by other tests as a set-up step, hence the lifecycle guard.
     *
     * @throws Exception if initialisation or start fails
     */
    @Test
    public void start() throws Exception {
        initialise();

        boolean alreadyStarted = this.messageSource.getLifecycleState().isStarted();
        if (!alreadyStarted) {
            this.messageSource.start();
        }

        Mockito.verify(this.source).onStart(this.sourceCallback);
        Mockito.verify(muleContext.getInjector()).inject(this.source);
    }

    /**
     * With the message source already initialised, makes the scheduler service throw
     * when a CPU-light scheduler is requested and checks that {@code start()} fails with
     * that exception as the direct cause.
     *
     * @throws Exception if initialisation fails
     */
    @Test
    public void failedToCreateRetryScheduler() throws Exception {
        this.messageSource.initialise();
        RuntimeException schedulerFailure = new RuntimeException();
        Mockito.doThrow(schedulerFailure).when(muleContext.getSchedulerService()).cpuLightScheduler();

        Throwable thrown = ThrowableAssert.catchThrowable(this.messageSource::start);

        Assert.assertThat(thrown.getCause(), CoreMatchers.is(CoreMatchers.sameInstance(schedulerFailure)));
    }

    /**
     * Makes the scheduler service throw when a CPU-light scheduler is requested, before
     * the message source is initialised, and checks that {@code start()} fails with a
     * {@link LifecycleException} whose direct cause is that exception.
     *
     * @throws Exception if initialisation fails
     */
    @Test
    public void failedToCreateFlowTrigger() throws Exception {
        RuntimeException schedulerFailure = new RuntimeException();
        Mockito.doThrow(schedulerFailure).when(muleContext.getSchedulerService()).cpuLightScheduler();
        this.messageSource.initialise();

        Throwable thrown = ThrowableAssert.catchThrowable(this.messageSource::start);

        Assert.assertThat(thrown, CoreMatchers.is(CoreMatchers.instanceOf(LifecycleException.class)));
        Assert.assertThat(thrown.getCause(), CoreMatchers.is(CoreMatchers.sameInstance(schedulerFailure)));
    }

    /**
     * Stopping a started message source must stop the underlying source exactly once.
     *
     * @throws Exception if any lifecycle phase fails
     */
    @Test
    public void stop() throws Exception {
        // Walk the full lifecycle up to the phase under test.
        this.messageSource.initialise();
        this.messageSource.start();
        this.messageSource.stop();

        Mockito.verify(this.source, Mockito.times(1)).onStop();
    }

    /**
     * Registers a {@link HeisenbergConnectionExceptionEnricher} on the source model and
     * checks that a start failure of a freshly created message source is enriched by it:
     * the thrown exception must contain a {@link ConnectionException} and carry the
     * enriched message.
     *
     * @throws Exception if initialising or stopping the message source fails
     */
    @Test
    public void enrichExceptionWithSourceExceptionEnricher() throws Exception {
        Mockito.when(this.enricherFactory.createHandler()).thenReturn(new HeisenbergConnectionExceptionEnricher());
        // Registered once; the original repeated this identical call back to back.
        ExtensionsTestUtils.mockExceptionEnricher(this.sourceModel, this.enricherFactory);

        ExtensionMessageSource enrichedSource = getNewExtensionMessageSourceInstance();
        enrichedSource.initialise();
        Mockito.doThrow(new RuntimeException("ERROR")).when(this.source).onStart(this.sourceCallback);

        Throwable thrown = ThrowableAssert.catchThrowable(enrichedSource::start);

        Assert.assertThat(org.mule.runtime.core.api.util.ExceptionUtils.containsType(thrown, ConnectionException.class), CoreMatchers.is(true));
        Assert.assertThat(thrown.getMessage(), StringContains.containsString("Enriched Connection Exception: ERROR"));
        enrichedSource.stop();
    }

    /**
     * Registers a mocked {@link ExceptionHandler} on the extension model and checks that
     * a start failure of a freshly created message source carries the message produced
     * by that handler.
     *
     * @throws Exception if initialising or stopping the message source fails
     */
    @Test
    public void enrichExceptionWithExtensionEnricher() throws Exception {
        ExceptionHandler extensionEnricher = Mockito.mock(ExceptionHandler.class);
        Mockito.when(extensionEnricher.enrichException(Matchers.any(Exception.class))).thenReturn(new Exception("Enriched: ERROR"));
        Mockito.when(this.enricherFactory.createHandler()).thenReturn(extensionEnricher);
        ExtensionsTestUtils.mockExceptionEnricher(this.extensionModel, this.enricherFactory);

        ExtensionMessageSource enrichedSource = getNewExtensionMessageSourceInstance();
        enrichedSource.initialise();
        Mockito.doThrow(new RuntimeException("ERROR")).when(this.source).onStart(this.sourceCallback);

        Throwable thrown = ThrowableAssert.catchThrowable(enrichedSource::start);

        Assert.assertThat(thrown.getMessage(), StringContains.containsString("Enriched: ERROR"));
        enrichedSource.stop();
    }

    /**
     * When the underlying source throws from {@code onStop}, stopping the message source
     * is expected to fail with an exception whose cause is a {@link MuleException} that
     * in turn wraps the original runtime exception.
     *
     * @throws Exception the expected failure, checked by the {@code expectedException} rule
     */
    @Test
    public void workManagerDisposedIfSourceFailsToStart() throws Exception {
        start();

        final RuntimeException stopFailure = new RuntimeException();
        Mockito.doThrow(stopFailure).when(this.source).onStop();
        this.expectedException.expect(new BaseMatcher<Throwable>() {
            public boolean matches(Object obj) {
                Throwable cause = ((Exception) obj).getCause();
                return (cause instanceof MuleException) && cause.getCause() == stopFailure;
            }

            public void describeTo(Description description) {
                description.appendText("Exception was not wrapped as expected");
            }
        });

        // Trigger the stubbed onStop() failure; without this call nothing in the test
        // could raise the exception registered above.
        this.messageSource.stop();
    }

    /**
     * Builds a matcher that accepts a throwable whose direct cause is exactly the given
     * instance.
     *
     * @param th the instance the examined throwable's cause must be identical to
     * @return matcher applied to the examined throwable's cause
     */
    private BaseMatcher<Throwable> exhaustedBecauseOf(Throwable th) {
        Matcher<Throwable> sameCause = CoreMatchers.sameInstance(th);
        return exhaustedBecauseOf(sameCause);
    }

    /**
     * Builds a matcher that applies the given matcher to the direct cause of the
     * examined throwable.
     *
     * @param matcher matcher the examined throwable's cause must satisfy; it also supplies
     *                the description
     * @return matcher that unwraps one level of cause before delegating
     */
    private BaseMatcher<Throwable> exhaustedBecauseOf(final Matcher<Throwable> matcher) {
        return new BaseMatcher<Throwable>() {
            public boolean matches(Object candidate) {
                Throwable cause = ((Throwable) candidate).getCause();
                return matcher.matches(cause);
            }

            public void describeTo(Description description) {
                matcher.describeTo(description);
            }
        };
    }

    /**
     * Swaps the mocked source for a {@link DummySource} built with the key
     * {@code "person"}, rebuilds the adapter and the message source around it, and checks
     * that the parameter value resolver returns that key for {@code "metadataKey"}.
     *
     * @throws Exception if creating, initialising or starting the message source fails
     */
    @Test
    public void getMetadataKeyIdObjectValue() throws Exception {
        this.source = new DummySource("person");
        this.sourceAdapter = createSourceAdapter();
        Mockito.when(this.sourceAdapterFactory.createAdapter(
                (Optional) Matchers.any(),
                (SourceCallbackFactory) Matchers.any(),
                (Component) Matchers.any(),
                (SourceConnectionManager) Matchers.any(),
                (MessagingExceptionResolver) Matchers.any()))
                .thenReturn(this.sourceAdapter);

        this.messageSource = getNewExtensionMessageSourceInstance();
        this.messageSource.initialise();
        this.messageSource.start();

        Object resolvedKey = this.messageSource.getParameterValueResolver().getParameterValue("metadataKey");
        Assert.assertThat(resolvedKey, CoreMatchers.is("person"));
    }
}
