package org.mule.test.module.extension.streaming;

import io.qameta.allure.Description;
import io.qameta.allure.Feature;
import io.qameta.allure.Features;
import io.qameta.allure.Issue;
import io.qameta.allure.Story;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import javax.inject.Inject;
import org.apache.commons.lang3.RandomStringUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.collection.IsEmptyCollection;
import org.hamcrest.core.Is;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runners.Parameterized;
import org.mule.functional.junit4.matchers.ThrowableCauseMatcher;
import org.mule.functional.junit4.matchers.ThrowableMessageMatcher;
import org.mule.runtime.api.exception.ComposedErrorException;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.streaming.object.CursorIteratorProvider;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.event.EventContextService;
import org.mule.runtime.core.api.processor.Processor;
import org.mule.runtime.core.privileged.exception.MessagingException;
import org.mule.tck.junit4.matcher.Eventually;
import org.mule.tck.junit4.matcher.FunctionExpressionMatcher;
import org.mule.tck.junit4.rule.SystemProperty;
import org.mule.test.module.extension.AbstractExtensionFunctionalTestCase;
import org.mule.test.runner.RunnerDelegateTo;

/**
 * Functional tests checking that no flow stack stays active after a Scatter Gather router times out while one of its
 * routes is still executing an operation that later produces a stream.
 * <p>
 * The suite is parameterized: it runs once for every configuration name returned by {@link #configs()}, and that name is
 * published through the {@code configName} system property for the duration of each test.
 */
@Story("Bytes Streaming")
@RunnerDelegateTo(Parameterized.class)
@Features({@Feature("Streaming"), @Feature("Fork/Join Strategies used by scatter-gather and parallel-foreach routers")})
public class ScatterGatherTimeoutWithBytesStreamingExtensionTestCase extends AbstractExtensionFunctionalTestCase {

    /** Random alphabetic payload (2048 characters) sent to every flow execution. */
    private static final String DATA = RandomStringUtils.insecure().nextAlphabetic(2048);

    /** Number of worker threads used by {@link #scatterGatherTimeoutStress()}. */
    private static final int STRESS_THREADS = 3;

    /** Number of flow executions submitted to the pool by {@link #scatterGatherTimeoutStress()}. */
    private static final int STRESS_RUNS = 10;

    @Inject
    private EventContextService eventContextService;

    /** Sets the {@code configName} system property to the parameter of the current run. */
    @Rule
    public SystemProperty configName;

    /**
     * Processor asserting that the payload of the event it receives is a {@link CursorIteratorProvider}.
     * <p>
     * It is not referenced from this class; presumably it is wired in from the XML configuration (to be confirmed there).
     */
    public static class AssertPayloadIsIteratorProvider implements Processor {

        /**
         * @param coreEvent the event whose payload is checked
         * @return the same event, untouched
         * @throws MuleException declared by the {@link Processor} contract; not thrown by this implementation
         */
        @Override
        public CoreEvent process(CoreEvent coreEvent) throws MuleException {
            MatcherAssert.assertThat(coreEvent.getMessage().getPayload().getValue(),
                    CoreMatchers.instanceOf(CursorIteratorProvider.class));
            return coreEvent;
        }
    }

    /**
     * @return the configuration names the suite is executed with
     */
    @Parameterized.Parameters(name = "config used: `{0}`")
    public static Iterable<String> configs() {
        return Arrays.asList("drStrange", "poolingDrStrange");
    }

    /**
     * @param str the configuration name for this run, exposed as the {@code configName} system property
     */
    public ScatterGatherTimeoutWithBytesStreamingExtensionTestCase(String str) {
        this.configName = new SystemProperty("configName", str);
    }

    @Override
    protected String getConfigFile() {
        return "streaming/scatter-gather-bytes-streaming-extension-config.xml";
    }

    @Test
    @Description("A Scatter Gather router will time out while an operation is still executing. The operation then finishes and generates a stream which should eventually be closed.")
    @Issue("W-16941297")
    public void whenScatterGatherTimesOutThenStreamsAreNotLeaked() throws InterruptedException {
        runScatterGatherFlowAndAwaitStreamClosed("scatterGatherWithTimeout");
        assertNoActiveFlowStacksEventually();
    }

    @Test
    @Description("A Scatter Gather router with collect-list strategy will time out while an operation is still executing. The operation then finishes and generates a stream which should eventually be closed.")
    @Issue("W-16941297")
    public void whenScatterGatherWithCollectListTimesOutThenStreamsAreNotLeaked() throws InterruptedException {
        runScatterGatherFlowAndAwaitStreamClosed("scatterGatherWithTimeoutCollectList");
        assertNoActiveFlowStacksEventually();
    }

    @Test
    @Description("A Scatter Gather router will time out while an operation inside a referenced flow is still executing. The operation then finishes and generates a stream which should eventually be closed.")
    @Issue("W-16941297")
    public void whenScatterGatherWithFlowRefTimesOutThenStreamsAreNotLeaked() throws InterruptedException {
        runScatterGatherFlowAndAwaitStreamClosed("scatterGatherWithTimeoutFlowRef");
        assertNoActiveFlowStacksEventually();
    }

    @Test
    @Description("A Scatter Gather router will time out while an operation inside another nested Scatter Gather is still executing. The operation then finishes and generates a stream which should eventually be closed.")
    @Issue("W-16941297")
    public void whenScatterGatherWithNestedTimesOutThenStreamsAreNotLeaked() throws InterruptedException {
        runScatterGatherFlowAndAwaitStreamClosed("scatterGatherWithNestedRoute");
        assertNoActiveFlowStacksEventually();
    }

    /**
     * Runs the timing-out flow once on the test thread, then {@value #STRESS_RUNS} times on a pool of
     * {@value #STRESS_THREADS} threads, then twice more on the test thread, and finally checks that no flow stack is
     * left active (waiting up to 20 seconds for that).
     *
     * @throws InterruptedException if the test thread is interrupted while waiting
     * @throws ExecutionException if any of the pooled executions failed
     */
    @Test
    @Issue("W-16941297")
    public void scatterGatherTimeoutStress() throws InterruptedException, ExecutionException {
        final String flowName = "scatterGatherWithTimeout";

        runScatterGatherFlowAndAwaitStreamClosed(flowName);

        ExecutorService executor = Executors.newFixedThreadPool(STRESS_THREADS);
        try {
            List<Future<?>> pendingRuns = new ArrayList<>(STRESS_RUNS);
            for (int i = 0; i < STRESS_RUNS; i++) {
                pendingRuns.add(executor.submit(() -> {
                    try {
                        runScatterGatherFlowAndAwaitStreamClosed(flowName);
                    } catch (InterruptedException e) {
                        // Keep the interrupt visible to the worker thread before surfacing the failure.
                        Thread.currentThread().interrupt();
                        throw new RuntimeException(e);
                    }
                }));
            }
            for (Future<?> run : pendingRuns) {
                run.get();
            }
        } finally {
            // Always release the pool. When every get() returned there is nothing left to cancel; when one of them
            // failed, this stops the remaining runs instead of leaving worker threads behind.
            executor.shutdownNow();
        }
        MatcherAssert.assertThat(executor.awaitTermination(10L, TimeUnit.SECONDS), CoreMatchers.is(true));

        runScatterGatherFlowAndAwaitStreamClosed(flowName);
        runScatterGatherFlowAndAwaitStreamClosed(flowName);

        MatcherAssert.assertThat(this.eventContextService, CoreMatchers.is(Eventually.eventually(
                FunctionExpressionMatcher.expressionMatches(EventContextService::getCurrentlyActiveFlowStacks,
                        IsEmptyCollection.empty()))
                .atMostIn(20, TimeUnit.SECONDS)));
    }

    /**
     * Asserts that {@link EventContextService#getCurrentlyActiveFlowStacks()} eventually becomes empty, using the default
     * waiting policy of {@link Eventually}.
     */
    private void assertNoActiveFlowStacksEventually() {
        MatcherAssert.assertThat(this.eventContextService, CoreMatchers.is(Eventually.eventually(
                FunctionExpressionMatcher.expressionMatches(EventContextService::getCurrentlyActiveFlowStacks,
                        IsEmptyCollection.empty()))));
    }

    /**
     * Runs the given flow, expects it to fail with the Scatter Gather timeout of route 1, and then waits for the
     * {@code providerClosedLatch} to be released.
     * <p>
     * Two latches are handed to the flow as variables: {@code latch}, which this method counts down once the failure has
     * been asserted, and {@code providerClosedLatch}, which this method awaits without a timeout. How the flow uses them
     * is defined in the XML configuration; presumably the operation blocks on the former and the latter is released when
     * the stream provider is closed (to be confirmed there).
     *
     * @param str name of the flow to run
     * @throws InterruptedException if interrupted while waiting on {@code providerClosedLatch}
     */
    private void runScatterGatherFlowAndAwaitStreamClosed(String str) throws InterruptedException {
        CountDownLatch operationLatch = new CountDownLatch(1);
        CountDownLatch providerClosedLatch = new CountDownLatch(1);

        MessagingException failure = Assert.assertThrows(MessagingException.class, () -> {
            flowRunner(str)
                    .withPayload(Collections.singletonList(DATA))
                    .withVariable("latch", operationLatch)
                    .withVariable("providerClosedLatch", providerClosedLatch)
                    .run();
        });
        MatcherAssert.assertThat(failure, ThrowableCauseMatcher.hasCause(CoreMatchers.allOf(
                Is.isA(ComposedErrorException.class),
                ThrowableMessageMatcher.hasMessage(CoreMatchers.containsString(
                        "Route 1: java.util.concurrent.TimeoutException: Timeout while processing route/part: '1'")))));

        // The router has already timed out; release the latch handed to the flow and wait for the close notification.
        operationLatch.countDown();
        providerClosedLatch.await();
    }

    /**
     * @return {@code true}, turning on the graceful-shutdown option consulted by the test base class
     */
    @Override
    protected boolean isGracefulShutdown() {
        return true;
    }
}
