package org.mule.test.routing;

import io.qameta.allure.Description;
import io.qameta.allure.Feature;
import io.qameta.allure.Story;
import java.util.concurrent.CountDownLatch;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.mule.functional.api.component.TestConnectorQueueHandler;
import org.mule.functional.api.flow.FlowRunner;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.tck.junit4.rule.SystemProperty;
import org.mule.test.AbstractIntegrationTestCase;

@Story("Async")
@Feature("Routers")
/* loaded from: input_file:org/mule/test/routing/AsyncTestCase.class */
public class AsyncTestCase extends AbstractIntegrationTestCase {
    private static final int MAX_CONCURRENCY = 2;

    @Rule
    public SystemProperty maxConcurrency = new SystemProperty("maxConcurrency", "2");
    private TestConnectorQueueHandler queueHandler;
    private CountDownLatch terminationLatch;

    protected String getConfigFile() {
        return "org/mule/test/routing/async-test.xml";
    }

    @Before
    public void before() {
        this.queueHandler = new TestConnectorQueueHandler(this.registry);
    }

    @After
    public void after() throws InterruptedException {
        this.terminationLatch.await();
    }

    @Test
    @Description("Assert that components in an async run in the correct thread according to the flow's PS")
    public void psThreadingPropagated() throws Exception {
        this.terminationLatch = new CountDownLatch(1);
        FlowRunner flowRunner = flowRunner("ps-threading-propagated");
        flowRunner.buildEvent().getContext().onTerminated((coreEvent, th) -> {
            this.terminationLatch.countDown();
        });
        flowRunner.run();
        CoreEvent read = this.queueHandler.read("asyncFinished", 1000L);
        Assert.assertThat(read, Matchers.not(Matchers.nullValue()));
        Assert.assertThat(read.getMessage().getPayload().getValue().toString(), Matchers.startsWith("[MuleRuntime].cpuIntensive."));
    }

    @Test
    @Description("Assert that async maxConcurrency is honored")
    public void withMaxConcurrency() throws Exception {
        testAsyncMaxConcurrency("with-max-concurrency");
    }

    @Test
    @Description("Assert that even if async is full, the calling flow continues executing")
    public void withMaxConcurrencyAsyncDispatched() throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        runFlows("with-max-concurrency", countDownLatch);
        for (int i = 0; i < 3; i++) {
            Assert.assertThat("" + i, this.queueHandler.read("asyncDispatched", 1000L), Matchers.not(Matchers.nullValue()));
        }
        countDownLatch.countDown();
    }

    @Test
    @Description("Assert that if no maxConcurrency is configured for an async, the value from the flow is inherited")
    public void withFlowMaxConcurrency() throws Exception {
        testAsyncMaxConcurrency("with-flow-max-concurrency");
    }

    @Test
    @Description("Assert that if both flow and async have maxConcurrency, they are independent")
    public void withLowerFlowMaxConcurrency() throws Exception {
        testAsyncMaxConcurrency("with-lower-flow-max-concurrency");
    }

    @Test
    @Description("Assert that asyncs in a sub-flow don't use up the maxConcurrency of the caller flow")
    public void withinSubflowDoesntUseFlowMaxConcurrency() throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        runFlows("within-subflow-doesnt-use-flow-max-concurrency", countDownLatch);
        for (int i = 0; i < 3; i++) {
            Assert.assertThat("" + i, this.queueHandler.read("asyncRunning", 1000L), Matchers.not(Matchers.nullValue()));
        }
        for (int i2 = 0; i2 < 3; i2++) {
            Assert.assertThat("" + i2, this.queueHandler.read("asyncDispatched", 1000L), Matchers.not(Matchers.nullValue()));
        }
        countDownLatch.countDown();
    }

    private void testAsyncMaxConcurrency(String str) throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        runFlows(str, countDownLatch);
        for (int i = 0; i < 2; i++) {
            Assert.assertThat("" + i, this.queueHandler.read("asyncRunning", 1000L), Matchers.not(Matchers.nullValue()));
        }
        Assert.assertThat(this.queueHandler.read("asyncRunning", 1000L), Matchers.nullValue());
        countDownLatch.countDown();
        Assert.assertThat(this.queueHandler.read("asyncRunning", 1000L), Matchers.not(Matchers.nullValue()));
    }

    private void runFlows(String str, CountDownLatch countDownLatch) throws Exception {
        this.terminationLatch = new CountDownLatch(3);
        for (int i = 0; i < 3; i++) {
            FlowRunner withVariable = flowRunner(str).withPayload(Integer.valueOf(i)).withVariable("latch", countDownLatch);
            withVariable.buildEvent().getContext().onTerminated((coreEvent, th) -> {
                this.terminationLatch.countDown();
            });
            withVariable.run();
        }
    }
}
