package org.mule.test.module.extension.streaming;

import io.qameta.allure.Description;
import io.qameta.allure.Feature;
import io.qameta.allure.Features;
import io.qameta.allure.Issue;
import io.qameta.allure.Story;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.inject.Inject;
import org.apache.commons.lang3.RandomStringUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.collection.IsCollectionWithSize;
import org.hamcrest.core.Is;
import org.hamcrest.number.OrderingComparison;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.mule.functional.junit4.matchers.ThrowableCauseMatcher;
import org.mule.functional.junit4.matchers.ThrowableMessageMatcher;
import org.mule.runtime.api.exception.ComposedErrorException;
import org.mule.runtime.core.api.event.EventContextService;
import org.mule.runtime.core.privileged.exception.MessagingException;
import org.mule.tck.junit4.rule.SystemProperty;
import org.mule.test.module.extension.AbstractExtensionFunctionalTestCase;

@Story("Bytes Streaming")
@Features({@Feature("Streaming"), @Feature("Fork/Join Strategies used by scatter-gather and parallel-foreach routers")})
/* loaded from: input_file:org/mule/test/module/extension/streaming/ScatterGatherTimeoutDontCompleteWithBytesStreamingExtensionTestCase.class */
public class ScatterGatherTimeoutDontCompleteWithBytesStreamingExtensionTestCase extends AbstractExtensionFunctionalTestCase {
    private static final String DATA = RandomStringUtils.insecure().nextAlphabetic(2048);

    @ClassRule
    public static SystemProperty CONFIG_NAME = new SystemProperty("configName", "drStrange");

    @Inject
    private EventContextService eventContextService;

    protected String getConfigFile() {
        return "streaming/scatter-gather-bytes-streaming-extension-config.xml";
    }

    @Test
    @Description("A Scatter Gather router will time out while an operation is still executing. The operation then finishes and generates a stream which will not be closed because the feature flag is disabled.")
    @Issue("W-16941297")
    public void whenScatterGatherTimesOutThenStreamsAreLeaked() throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        CountDownLatch countDownLatch2 = new CountDownLatch(1);
        MatcherAssert.assertThat(Assert.assertThrows(MessagingException.class, () -> {
            flowRunner("scatterGatherWithTimeout").withPayload(Collections.singletonList(DATA)).withVariable("latch", countDownLatch).withVariable("providerClosedLatch", countDownLatch2).run();
        }), ThrowableCauseMatcher.hasCause(CoreMatchers.allOf(new Matcher[]{Is.isA(ComposedErrorException.class), ThrowableMessageMatcher.hasMessage(CoreMatchers.containsString("Route 1: java.util.concurrent.TimeoutException: Timeout while processing route/part: '1'"))})));
        countDownLatch.countDown();
        MatcherAssert.assertThat("Paging provider should not have been closed", Boolean.valueOf(countDownLatch2.await(5L, TimeUnit.SECONDS)), CoreMatchers.is(false));
        MatcherAssert.assertThat(this.eventContextService.getCurrentlyActiveFlowStacks(), IsCollectionWithSize.hasSize(OrderingComparison.greaterThan(0)));
    }
}
