package org.mule.test.construct;

import io.qameta.allure.Description;
import io.qameta.allure.Feature;
import io.qameta.allure.Issue;
import io.qameta.allure.Story;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import javax.inject.Inject;
import org.apache.logging.log4j.core.util.Throwables;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.mule.functional.api.component.EventCallback;
import org.mule.functional.api.exception.ExpectedError;
import org.mule.functional.junit4.matchers.ThrowableMessageMatcher;
import org.mule.runtime.api.component.AbstractComponent;
import org.mule.runtime.api.message.Error;
import org.mule.runtime.api.metadata.DataType;
import org.mule.runtime.api.metadata.TypedValue;
import org.mule.runtime.api.notification.MessageProcessorNotification;
import org.mule.runtime.api.notification.MessageProcessorNotificationListener;
import org.mule.runtime.api.scheduler.Scheduler;
import org.mule.runtime.core.api.MuleContext;
import org.mule.runtime.core.api.construct.Flow;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.exception.Errors;
import org.mule.runtime.http.api.HttpConstants;
import org.mule.runtime.http.api.HttpService;
import org.mule.runtime.http.api.client.HttpRequestOptions;
import org.mule.runtime.http.api.domain.message.request.HttpRequest;
import org.mule.runtime.http.api.domain.message.response.HttpResponse;
import org.mule.service.http.TestHttpClient;
import org.mule.tck.junit4.matcher.ErrorTypeMatcher;
import org.mule.tck.junit4.rule.DynamicPort;
import org.mule.tck.probe.PollingProber;
import org.mule.test.AbstractIntegrationTestCase;

@Story("Flow Reference")
@Feature("Core Components")
/* loaded from: input_file:org/mule/test/construct/FlowRefTestCase.class */
public class FlowRefTestCase extends AbstractIntegrationTestCase {
    private static final String CONTEXT_DEPTH_MESSAGE = "Too many nested child contexts.";

    @Rule
    public ExpectedError expectedException = ExpectedError.none();

    @Rule
    public DynamicPort port = new DynamicPort("port");
    private List<Future<HttpResponse>> sendAsyncs = new ArrayList();

    @Rule
    public TestHttpClient httpClient = new TestHttpClient.Builder(getService(HttpService.class)).build();
    private Scheduler asyncFlowRunnerScheduler;

    @Inject
    private Flow referencedFlowWithMaxConcurrency;
    private static CountDownLatch latch;
    private static AtomicInteger callbackInFlight = new AtomicInteger();
    private static AtomicInteger awaiting = new AtomicInteger();

    /* loaded from: input_file:org/mule/test/construct/FlowRefTestCase$LatchAwaitCallback.class */
    public static class LatchAwaitCallback extends AbstractComponent implements EventCallback {
        public void eventReceived(CoreEvent coreEvent, Object obj, MuleContext muleContext) throws Exception {
            FlowRefTestCase.callbackInFlight.incrementAndGet();
            FlowRefTestCase.awaiting.incrementAndGet();
            FlowRefTestCase.latch.await();
            FlowRefTestCase.callbackInFlight.decrementAndGet();
        }
    }

    protected String getConfigFile() {
        return "org/mule/test/construct/flow-ref.xml";
    }

    @Before
    public void before() {
        this.sendAsyncs = new ArrayList();
        latch = new CountDownLatch(1);
        awaiting.set(0);
        this.asyncFlowRunnerScheduler = muleContext.getSchedulerService().ioScheduler(muleContext.getSchedulerBaseConfig().withShutdownTimeout(0L, TimeUnit.SECONDS));
    }

    @After
    public void after() throws Exception {
        this.asyncFlowRunnerScheduler.shutdownNow();
        latch.countDown();
        Iterator<Future<HttpResponse>> it = this.sendAsyncs.iterator();
        while (it.hasNext()) {
            it.next().get(5000L, TimeUnit.SECONDS);
        }
    }

    @Test
    public void twoFlowRefsToSubFlow() throws Exception {
        Assert.assertThat(getPayloadAsString(flowRunner("flow1").withPayload("0").run().getMessage()), Matchers.is("012xyzabc312xyzabc3"));
    }

    @Test
    public void dynamicFlowRef() throws Exception {
        Assert.assertThat(flowRunner("flow2").withPayload("0").withVariable("letter", "A").run().getMessage().getPayload().getValue(), Matchers.is("0A"));
        Assert.assertThat(flowRunner("flow2").withPayload("0").withVariable("letter", "B").run().getMessage().getPayload().getValue(), Matchers.is("0B"));
    }

    @Test
    public void dynamicFlowRefTextPlain() throws Exception {
        Assert.assertThat(flowRunner("flow3").withPayload("0").withVariable("letter", " A ", DataType.TEXT_STRING).run().getMessage().getPayload().getValue(), Matchers.is("0A"));
        Assert.assertThat(flowRunner("flow3").withPayload("0").withVariable("letter", " B ", DataType.TEXT_STRING).run().getMessage().getPayload().getValue(), Matchers.is("0B"));
    }

    @Test
    public void dynamicFlowRefWithChoice() throws Exception {
        Assert.assertThat(flowRunner("flow2").withPayload("0").withVariable("letter", "C").run().getMessage().getPayload().getValue(), Matchers.is("0A"));
    }

    @Test
    public void flowRefTargetToFlow() throws Exception {
        Assert.assertThat(((TypedValue) flowRunner("targetToFlow").run().getVariables().get("flowRefResult")).getValue(), Matchers.is("result"));
    }

    @Test
    public void flowRefTargetToSubFlow() throws Exception {
        Assert.assertThat(((TypedValue) flowRunner("targetToSubFlow").run().getVariables().get("flowRefResult")).getValue(), Matchers.is("result"));
    }

    @Test
    public void dynamicFlowRefWithScatterGather() throws Exception {
        List list = (List) ((Map) flowRunner("flow2").withPayload("0").withVariable("letter", "SG").run().getMessage().getPayload().getValue()).values().stream().map(message -> {
            return message.getPayload().getValue();
        }).collect(Collectors.toList());
        Assert.assertEquals("0A", list.get(0));
        Assert.assertEquals("0B", list.get(1));
    }

    @Test
    public void flowRefNotFound() throws Exception {
        this.expectedException.expectMessage(CoreMatchers.containsString("No flow/sub-flow with name 'sub-flow-Z' found"));
        this.expectedException.expectErrorType(Errors.CORE_NAMESPACE_NAME, "ROUTING");
        Assert.assertThat(flowRunner("flow2").withPayload("0").withVariable("letter", "Z").run().getMessage().getPayload().getValue(), Matchers.is("0C"));
    }

    @Test
    @Issue("MULE-14285")
    public void flowRefFlowErrorNotifications() throws Exception {
        List<MessageProcessorNotification> synchronizedList = Collections.synchronizedList(new ArrayList());
        setupMessageProcessorNotificationListener(synchronizedList);
        Assert.assertThat(flowRunner("flowRefFlowErrorNotifications").runExpectingException().getCause(), Matchers.instanceOf(IllegalStateException.class));
        assertNotifications(synchronizedList, "flowRefFlowErrorNotifications/processors/0");
    }

    @Test
    @Issue("MULE-14285")
    public void flowRefSubFlowErrorNotifications() throws Exception {
        List<MessageProcessorNotification> synchronizedList = Collections.synchronizedList(new ArrayList());
        setupMessageProcessorNotificationListener(synchronizedList);
        Assert.assertThat(flowRunner("flowRefSubFlowErrorNotifications").runExpectingException().getCause(), Matchers.instanceOf(IllegalStateException.class));
        assertNotifications(synchronizedList, "flowRefSubFlowErrorNotifications/processors/0");
    }

    private void setupMessageProcessorNotificationListener(List<MessageProcessorNotification> list) {
        muleContext.getNotificationManager().addInterfaceToType(MessageProcessorNotificationListener.class, MessageProcessorNotification.class);
        muleContext.getNotificationManager().addListener(notification -> {
            list.add((MessageProcessorNotification) notification);
        });
    }

    private void assertNotifications(List<MessageProcessorNotification> list, String str) {
        PollingProber.probe(() -> {
            Assert.assertThat(list.toString(), list, Matchers.hasSize(4));
            MessageProcessorNotification messageProcessorNotification = (MessageProcessorNotification) list.get(0);
            Assert.assertThat(Integer.valueOf(messageProcessorNotification.getAction().getActionId()), CoreMatchers.equalTo(1601));
            Assert.assertThat(messageProcessorNotification.getComponent().getLocation().getLocation(), CoreMatchers.equalTo(str));
            Assert.assertThat(messageProcessorNotification.getException(), Matchers.is(CoreMatchers.nullValue()));
            MessageProcessorNotification messageProcessorNotification2 = (MessageProcessorNotification) list.get(3);
            Assert.assertThat(Integer.valueOf(messageProcessorNotification2.getAction().getActionId()), CoreMatchers.equalTo(1602));
            Assert.assertThat(messageProcessorNotification2.getComponent().getLocation().getLocation(), CoreMatchers.equalTo(str));
            Assert.assertThat(messageProcessorNotification2.getException().getCause(), Matchers.instanceOf(IllegalStateException.class));
            Assert.assertThat(Boolean.valueOf(messageProcessorNotification2.getEvent().getError().isPresent()), Matchers.is(true));
            Assert.assertThat(((Error) messageProcessorNotification2.getEvent().getError().get()).getCause(), Matchers.instanceOf(IllegalStateException.class));
            return true;
        });
    }

    @Test
    public void recursive() throws Exception {
        flowRunner("recursiveCaller").runExpectingException(ThrowableMessageMatcher.hasMessage(CoreMatchers.containsString(CONTEXT_DEPTH_MESSAGE)));
    }

    @Test
    public void recursiveDynamic() throws Exception {
        flowRunner("recursiveDynamicCaller").runExpectingException(ThrowableMessageMatcher.hasMessage(CoreMatchers.containsString(CONTEXT_DEPTH_MESSAGE)));
    }

    @Test
    public void recursiveSubFlow() throws Exception {
        flowRunner("recursiveSubFlowCaller").runExpectingException(ThrowableMessageMatcher.hasMessage(CoreMatchers.containsString(CONTEXT_DEPTH_MESSAGE)));
    }

    @Test
    public void crossedRecursiveSubFlow() throws Exception {
        flowRunner("crossedRecursiveSubflow").runExpectingException(ThrowableMessageMatcher.hasMessage(CoreMatchers.containsString(CONTEXT_DEPTH_MESSAGE)));
    }

    @Test
    public void tripleCrossedRecursiveSubFlow() throws Exception {
        flowRunner("tripleCrossedRecursiveSubflow").runExpectingException(ThrowableMessageMatcher.hasMessage(CoreMatchers.containsString(CONTEXT_DEPTH_MESSAGE)));
    }

    @Test
    public void recursiveSubFlowDynamic() throws Exception {
        flowRunner("recursiveSubFlowDynamicCaller").runExpectingException(ThrowableMessageMatcher.hasMessage(CoreMatchers.containsString(CONTEXT_DEPTH_MESSAGE)));
    }

    @Test
    @Story("Backpressure")
    @Ignore("How to handle backpressure on flow-ref's is not defined yet, but this test will provide a starting point in the future...")
    public void backpressureFlowRef() throws Exception {
        HttpRequest build = HttpRequest.builder().uri(String.format("http://localhost:%s/backpressureFlowRef?ref=backpressureFlowRefInner", Integer.valueOf(this.port.getNumber()))).method(HttpConstants.Method.GET).build();
        int availableProcessors = (Runtime.getRuntime().availableProcessors() * 4) + 1;
        for (int i = 0; i < availableProcessors; i++) {
            this.sendAsyncs.add(this.httpClient.sendAsync(build, HttpRequestOptions.builder().responseTimeout(10000).build()));
        }
        PollingProber.probe(5000L, 50L, () -> {
            return Boolean.valueOf(awaiting.get() >= Runtime.getRuntime().availableProcessors() * 2);
        });
        PollingProber.probe(5000L, 50L, () -> {
            Assert.assertThat(Integer.valueOf(this.httpClient.send(build, HttpRequestOptions.builder().responseTimeout(1000).build()).getStatusCode()), Matchers.is(Integer.valueOf(HttpConstants.HttpStatus.SERVICE_UNAVAILABLE.getStatusCode())));
            return true;
        });
    }

    @Test
    @Story("Backpressure")
    @Ignore("How to handle backpressure on flow-ref's is not defined yet, but this test will provide a starting point in the future...")
    public void backpressureFlowRefSub() throws Exception {
        HttpRequest build = HttpRequest.builder().uri(String.format("http://localhost:%s/backpressureFlowRef?ref=backpressureFlowRefInnerSub", Integer.valueOf(this.port.getNumber()))).method(HttpConstants.Method.GET).build();
        int availableProcessors = (Runtime.getRuntime().availableProcessors() * 4) + 1;
        for (int i = 0; i < availableProcessors; i++) {
            this.sendAsyncs.add(this.httpClient.sendAsync(build, HttpRequestOptions.builder().responseTimeout(10000).build()));
        }
        PollingProber.probe(5000L, 50L, () -> {
            return Boolean.valueOf(awaiting.get() >= Runtime.getRuntime().availableProcessors() * 2);
        });
        PollingProber.probe(5000L, 50L, () -> {
            Assert.assertThat(Integer.valueOf(this.httpClient.send(build, HttpRequestOptions.builder().responseTimeout(1000).build()).getStatusCode()), Matchers.is(Integer.valueOf(HttpConstants.HttpStatus.SERVICE_UNAVAILABLE.getStatusCode())));
            return true;
        });
    }

    @Test
    @Story("Backpressure")
    public void backpressureFlowRefMaxConcurrency() throws Exception {
        HttpRequest build = HttpRequest.builder().uri(String.format("http://localhost:%s/backpressureFlowRefMaxConcurrency?ref=backpressureFlowRefInner", Integer.valueOf(this.port.getNumber()))).method(HttpConstants.Method.GET).build();
        for (int i = 0; i < 2; i++) {
            this.sendAsyncs.add(this.httpClient.sendAsync(build, HttpRequestOptions.builder().responseTimeout(10000).build()));
        }
        PollingProber.probe(5000L, 50L, () -> {
            return Boolean.valueOf(awaiting.get() >= 1);
        });
        Assert.assertThat(Integer.valueOf(this.httpClient.send(build).getStatusCode()), Matchers.is(Integer.valueOf(HttpConstants.HttpStatus.SERVICE_UNAVAILABLE.getStatusCode())));
    }

    @Test
    @Story("Backpressure")
    public void backpressureFlowRefMaxConcurrencySub() throws Exception {
        HttpRequest build = HttpRequest.builder().uri(String.format("http://localhost:%s/backpressureFlowRefMaxConcurrency?ref=backpressureFlowRefInnerSub", Integer.valueOf(this.port.getNumber()))).method(HttpConstants.Method.GET).build();
        for (int i = 0; i < 2; i++) {
            this.sendAsyncs.add(this.httpClient.sendAsync(build, HttpRequestOptions.builder().responseTimeout(10000).build()));
        }
        PollingProber.probe(5000L, 50L, () -> {
            return Boolean.valueOf(awaiting.get() >= 1);
        });
        Assert.assertThat(Integer.valueOf(this.httpClient.send(build).getStatusCode()), Matchers.is(Integer.valueOf(HttpConstants.HttpStatus.SERVICE_UNAVAILABLE.getStatusCode())));
    }

    @Test
    @Story("Backpressure")
    @Issue("MULE-19328")
    public void backpressureMustNotBeTriggeredAfterFlowRestart() throws Exception {
        flowRunner("outerFlowWithMaxConcurrency").dispatchAsync(this.asyncFlowRunnerScheduler);
        PollingProber.probe(5000L, 50L, () -> {
            return Boolean.valueOf(awaiting.get() == 1);
        });
        this.referencedFlowWithMaxConcurrency.stop();
        this.referencedFlowWithMaxConcurrency.start();
        latch.countDown();
        flowRunner("outerFlowWithMaxConcurrency").run();
    }

    @Test
    public void flowWithStoppedTargetFlowFailsToProcess() throws Exception {
        flowRunner("stoppedTargetFlow1").runExpectingException(ErrorTypeMatcher.errorType("MULE", "UNKNOWN"));
    }

    private void testRecursiveFlowrefsAreDetectedFor(String str, String str2) {
        try {
            flowRunner(str);
            Assert.fail("Expected and error regarding a flowref cycle from " + str + ", and with the offending flow being " + str2);
        } catch (Exception e) {
            Assert.assertThat(Throwables.getRootCause(e).getMessage(), CoreMatchers.endsWith(String.format("Found a possible infinite recursion involving flow named %s", str2)));
        }
    }

    @Test
    @Story("Backpressure")
    @Description("The maxConcurrency of a target flow called via flow-ref is enforced")
    @Issue("MULE-18178")
    public void backpressureFlowRefMaxConcurrencyStatic() throws Exception {
        flowRunner("backpressureFlowRefOuterMaxConcurrencyStatic").dispatchAsync(this.asyncFlowRunnerScheduler);
        PollingProber.probe(5000L, 50L, () -> {
            return Boolean.valueOf(awaiting.get() == 1);
        });
        flowRunner("backpressureFlowRefOuterMaxConcurrencyStatic").dispatchAsync(this.asyncFlowRunnerScheduler);
        Thread.sleep(5000L);
        PollingProber.probe(5000L, 50L, () -> {
            return Boolean.valueOf(awaiting.get() == 1);
        });
        latch.countDown();
        PollingProber.probe(5000L, 50L, () -> {
            return Boolean.valueOf(awaiting.get() == 2);
        });
    }

    @Test
    @Description("Verify that operations inner fluxes are not terminated when within a dynamically invoked sub-flow.")
    @Issue("MULE-18304")
    public void dynamicFlowRefWithSdkOperation() throws Exception {
        flowRunner("dynamicFlowRefWithSdkOperation").run();
        flowRunner("dynamicFlowRefWithSdkOperation").run();
    }

    @Test
    @Description("For each with a flow ref and max concurrency finish processing")
    @Issue("MULE-19319")
    public void forEachWithFlowRefAndMaxConcurrency() throws Exception {
        Integer[] numArr = {1, 2, 3};
        Assert.assertThat(flowRunner("foreachWithFlowRefAndMaxConcurrency").withPayload(numArr).run().getMessage().getPayload().getValue(), Matchers.is(numArr));
    }

    public static int getCallbackInFlight() {
        return callbackInFlight.get();
    }
}
