package org.mule.test.routing;

import io.qameta.allure.Description;
import io.qameta.allure.Feature;
import io.qameta.allure.Issue;
import io.qameta.allure.Stories;
import io.qameta.allure.Story;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.inject.Inject;
import javax.inject.Named;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Rule;
import org.junit.Test;
import org.mule.functional.api.flow.FlowRunner;
import org.mule.runtime.api.component.location.Location;
import org.mule.runtime.api.lifecycle.Startable;
import org.mule.runtime.core.api.construct.FlowConstruct;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.lifecycle.LifecycleUtils;
import org.mule.tck.junit4.rule.DynamicPort;
import org.mule.tck.junit4.rule.SystemProperty;
import org.mule.test.AbstractIntegrationTestCase;
import org.mule.tests.api.TestQueueManager;

@Story("Async")
@Feature("Scope")
/* loaded from: input_file:org/mule/test/routing/AsyncTestCase.class */
public class AsyncTestCase extends AbstractIntegrationTestCase {
    private static final int MAX_CONCURRENCY = 2;

    @Inject
    private TestQueueManager queueManager;

    @Rule
    public DynamicPort port = new DynamicPort("http.port");

    @Rule
    public SystemProperty maxConcurrency = new SystemProperty("maxConcurrency", "2");
    private CountDownLatch terminationLatch;

    @Inject
    @Named("with-max-concurrency")
    private FlowConstruct withMaxConcurrency;

    protected String getConfigFile() {
        return "org/mule/test/routing/async-test.xml";
    }

    @After
    public void after() throws InterruptedException {
        if (this.terminationLatch != null) {
            this.terminationLatch.await();
        }
    }

    @Test
    @Description("Assert that components in an async run in the correct thread according to the flow's PS")
    public void psThreadingPropagated() throws Exception {
        this.terminationLatch = new CountDownLatch(1);
        FlowRunner flowRunner = flowRunner("ps-threading-propagated");
        flowRunner.buildEvent().getContext().onTerminated((coreEvent, th) -> {
            this.terminationLatch.countDown();
        });
        flowRunner.run();
        CoreEvent read = this.queueManager.read("asyncFinished", 1000L, TimeUnit.MILLISECONDS);
        MatcherAssert.assertThat(read, Matchers.not(Matchers.nullValue()));
        MatcherAssert.assertThat(read.getMessage().getPayload().getValue().toString(), Matchers.startsWith("[MuleRuntime].uber."));
    }

    @Test
    @Description("Assert that async maxConcurrency is honored")
    @Stories({@Story("Backpressure"), @Story("Max concurrency")})
    public void withMaxConcurrency() throws Exception {
        testAsyncMaxConcurrency("with-max-concurrency");
    }

    @Test
    @Story("Backpressure")
    @Description("Assert that even if async is full, the calling flow continues executing")
    public void withMaxConcurrencyAsyncDispatched() throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        runFlows("with-max-concurrency", countDownLatch);
        for (int i = 0; i < 3; i++) {
            MatcherAssert.assertThat("" + i, this.queueManager.read("asyncDispatched", 1000L, TimeUnit.MILLISECONDS), Matchers.not(Matchers.nullValue()));
        }
        countDownLatch.countDown();
    }

    @Test
    @Description("Assert that if no maxConcurrency is configured for an async, the value from the flow is inherited")
    @Stories({@Story("Backpressure"), @Story("Max concurrency")})
    public void withFlowMaxConcurrency() throws Exception {
        testAsyncMaxConcurrency("with-flow-max-concurrency");
    }

    @Test
    @Description("Assert that if both flow and async have maxConcurrency, they are independent")
    @Stories({@Story("Backpressure"), @Story("Max concurrency")})
    public void withLowerFlowMaxConcurrency() throws Exception {
        testAsyncMaxConcurrency("with-lower-flow-max-concurrency");
    }

    @Test
    @Story("Local Transaction")
    @Description("Assert that async blocks run outside of the transaction from the caller flow")
    public void withSourceTx() throws Exception {
        this.terminationLatch = new CountDownLatch(0);
        ((Startable) this.locator.find(Location.builderFromStringRepresentation("with-source-tx").build()).get()).start();
        MatcherAssert.assertThat(this.queueManager.read("asyncDispatched", 5000L, TimeUnit.MILLISECONDS), Matchers.not(Matchers.nullValue()));
        MatcherAssert.assertThat(this.queueManager.read("asyncRunning", 1000L, TimeUnit.MILLISECONDS), Matchers.not(Matchers.nullValue()));
    }

    @Test
    @Story("Local Transaction")
    @Description("Assert that async blocks run outside of the transaction from the `try` in the caller flow")
    public void withTryTx() throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        runFlows("with-try-tx", countDownLatch);
        MatcherAssert.assertThat(this.queueManager.read("asyncDispatched", 1000L, TimeUnit.MILLISECONDS), Matchers.not(Matchers.nullValue()));
        MatcherAssert.assertThat(this.queueManager.read("asyncRunning", 1000L, TimeUnit.MILLISECONDS), Matchers.not(Matchers.nullValue()));
        countDownLatch.countDown();
    }

    @Test
    @Story("Local Transaction")
    @Description("Assert that txs within async blocks are honored")
    public void txWithinAsync() throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        runFlows("tx-within-async", countDownLatch);
        MatcherAssert.assertThat(this.queueManager.read("asyncDispatched", 1000L, TimeUnit.MILLISECONDS), Matchers.not(Matchers.nullValue()));
        MatcherAssert.assertThat(this.queueManager.read("asyncRunning", 1000L, TimeUnit.MILLISECONDS), Matchers.not(Matchers.nullValue()));
        countDownLatch.countDown();
    }

    @Test
    @Description("Assert that a combination of sub-flow, async, and try works as expected")
    public void tryNoTxWithinAsyncSubFlow() throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        runFlows("tryNoTx-within-async-subFlow", countDownLatch);
        MatcherAssert.assertThat(this.queueManager.read("asyncDispatched", 1000L, TimeUnit.MILLISECONDS), Matchers.not(Matchers.nullValue()));
        MatcherAssert.assertThat(this.queueManager.read("asyncRunning", 1000L, TimeUnit.MILLISECONDS), Matchers.not(Matchers.nullValue()));
        countDownLatch.countDown();
    }

    @Test
    @Story("Graceful shutdown")
    @Issue("MULE-17048")
    public void flowStoppedWhileAsyncInFlight() throws Exception {
        flowRunner("with-max-concurrency").withPayload("").withVariable("latch", new CountDownLatch(1)).run();
        MatcherAssert.assertThat(this.queueManager.read("asyncRunning", 1000L, TimeUnit.MILLISECONDS), Matchers.not(Matchers.nullValue()));
        LifecycleUtils.stopIfNeeded(this.withMaxConcurrency);
        MatcherAssert.assertThat(this.queueManager.read("asyncFinished", 1000L, TimeUnit.MILLISECONDS), Matchers.nullValue());
        LifecycleUtils.startIfNeeded(this.withMaxConcurrency);
    }

    @Test
    @Description("Verify that operations inner fluxes are not terminated when within async/sub-flow combination.")
    @Issue("MULE-18304")
    public void asyncFlowWithSdkOperation() throws Exception {
        flowRunner("asyncFlowWithSdkOperation").run();
        MatcherAssert.assertThat(this.queueManager.read("asyncFinished", 1000L, TimeUnit.MILLISECONDS), Matchers.not(Matchers.nullValue()));
        flowRunner("asyncFlowWithSdkOperation").run();
        MatcherAssert.assertThat(this.queueManager.read("asyncFinished", 1000L, TimeUnit.MILLISECONDS), Matchers.not(Matchers.nullValue()));
    }

    @Test
    @Description("Verify that operations inner fluxes are not terminated when within error-handler/async/sub-flow combination.")
    @Issue("MULE-18304")
    public void asyncFlowWithSdkOperationInErrorHandler() throws Exception {
        flowRunner("asyncFlowWithSdkOperationInErrorHandler").runExpectingException();
        MatcherAssert.assertThat(this.queueManager.read("asyncFinished", 1000L, TimeUnit.MILLISECONDS), Matchers.not(Matchers.nullValue()));
        flowRunner("asyncFlowWithSdkOperationInErrorHandler").runExpectingException();
        MatcherAssert.assertThat(this.queueManager.read("asyncFinished", 1000L, TimeUnit.MILLISECONDS), Matchers.not(Matchers.nullValue()));
    }

    @Test
    @Description("Verify that operations inner fluxes are not terminated when within error-handler/async/sub-flow combination.")
    @Issue("MULE-19091")
    public void asyncFlowWithSdkOperationInRefErrorHandler() throws Exception {
        flowRunner("asyncFlowWithSdkOperationInRefErrorHandler").runExpectingException();
        MatcherAssert.assertThat(this.queueManager.read("asyncFinished", 1000L, TimeUnit.MILLISECONDS), Matchers.not(Matchers.nullValue()));
        flowRunner("asyncFlowWithSdkOperationInRefErrorHandler").runExpectingException();
        MatcherAssert.assertThat(this.queueManager.read("asyncFinished", 1000L, TimeUnit.MILLISECONDS), Matchers.not(Matchers.nullValue()));
    }

    private void testAsyncMaxConcurrency(String str) throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        runFlows(str, countDownLatch);
        for (int i = 0; i < 2; i++) {
            MatcherAssert.assertThat("" + i, this.queueManager.read("asyncRunning", 1000L, TimeUnit.MILLISECONDS), Matchers.not(Matchers.nullValue()));
        }
        MatcherAssert.assertThat(this.queueManager.read("asyncRunning", 1000L, TimeUnit.MILLISECONDS), Matchers.nullValue());
        countDownLatch.countDown();
        MatcherAssert.assertThat(this.queueManager.read("asyncRunning", 1000L, TimeUnit.MILLISECONDS), Matchers.not(Matchers.nullValue()));
    }

    private void runFlows(String str, CountDownLatch countDownLatch) throws Exception {
        this.terminationLatch = new CountDownLatch(3);
        for (int i = 0; i < 3; i++) {
            FlowRunner withVariable = flowRunner(str).withPayload(Integer.valueOf(i)).withVariable("latch", countDownLatch);
            withVariable.buildEvent().getContext().onTerminated((coreEvent, th) -> {
                this.terminationLatch.countDown();
            });
            withVariable.run();
        }
    }

    protected boolean isGracefulShutdown() {
        return true;
    }
}
