package org.mule.runtime.core.internal.construct;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.function.BiFunction;
import org.hamcrest.CoreMatchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.Mockito;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.i18n.I18nMessage;
import org.mule.runtime.api.lifecycle.LifecycleException;
import org.mule.runtime.api.lifecycle.Startable;
import org.mule.runtime.api.lifecycle.Stoppable;
import org.mule.runtime.core.api.Event;
import org.mule.runtime.core.api.message.InternalMessage;
import org.mule.runtime.core.api.processor.Processor;
import org.mule.runtime.core.api.source.MessageSource;
import org.mule.runtime.core.construct.AbstractFlowConstructTestCase;
import org.mule.runtime.core.internal.construct.DefaultFlowBuilder;
import org.mule.runtime.core.processor.ResponseMessageProcessorAdapter;
import org.mule.runtime.core.processor.strategy.LegacyAsynchronousProcessingStrategyFactory;
import org.mule.runtime.core.processor.strategy.SynchronousProcessingStrategyFactory;
import org.mule.runtime.core.transformer.simple.StringAppendTransformer;
import org.mule.tck.SensingNullMessageProcessor;
import org.mule.test.core.lifecycle.LifecycleTrackerProcessor;
import reactor.core.publisher.Mono;

/**
 * Tests for {@link DefaultFlowBuilder.DefaultFlow}.
 *
 * <p>The suite is parameterized over the way an event is pushed into the flow's
 * source listener: once through a Reactor {@link Mono} pipeline and once through a
 * direct {@link Processor#process(Event)} call. Every test runs with both triggers.
 */
@RunWith(Parameterized.class)
public class DefaultFlowTestCase extends AbstractFlowConstructTestCase {
    private static final String FLOW_NAME = "test-flow";

    /** Flow under test; rebuilt in {@link #doSetUp()} for every test. */
    private DefaultFlowBuilder.DefaultFlow flow;

    /** Last processor of the flow; records the event it receives. */
    private SensingNullMessageProcessor sensingMessageProcessor;

    /** Strategy used to hand an event to a processor (supplied by the parameterized runner). */
    private final BiFunction<Processor, Event, Event> triggerFunction;

    /**
     * @param biFunction how an event is dispatched to the source listener; one of the
     *                   values produced by {@link #parameters()}
     */
    public DefaultFlowTestCase(BiFunction<Processor, Event, Event> biFunction) {
        this.triggerFunction = biFunction;
    }

    /**
     * Supplies the two trigger strategies the suite is run with.
     *
     * @return one single-element argument array per strategy
     */
    @Parameterized.Parameters
    public static Collection<Object[]> parameters() {
        // The lambdas need an explicit functional-interface target type; they cannot be
        // written directly inside an Object[] initializer.
        BiFunction<Processor, Event, Event> viaMono =
            (processor, event) -> Mono.just(event).transform(processor).block();
        BiFunction<Processor, Event, Event> viaProcess = (processor, event) -> {
            try {
                return processor.process(event);
            } catch (MuleException e) {
                // BiFunction.apply cannot throw checked exceptions; keep the cause.
                throw new RuntimeException(e);
            }
        };
        return Arrays.asList(new Object[] {viaMono}, new Object[] {viaProcess});
    }

    /**
     * Builds the flow under test: three response adapters (appending "f", "e", "d"),
     * three request transformers (appending "a", "b", "c"), a processor that stores the
     * executing thread in the "thread" variable, and finally the sensing processor.
     */
    @Override
    public void doSetUp() throws Exception {
        super.doSetUp();
        this.sensingMessageProcessor = getSensingNullMessageProcessor();
        this.flow = new DefaultFlowBuilder.DefaultFlow(FLOW_NAME, muleContext);
        this.flow.setMessageSource(this.directInboundMessageSource);

        List<Processor> processors = new ArrayList<>();
        processors.add(new ResponseMessageProcessorAdapter(new StringAppendTransformer("f")));
        processors.add(new ResponseMessageProcessorAdapter(new StringAppendTransformer("e")));
        processors.add(new ResponseMessageProcessorAdapter(new StringAppendTransformer("d")));
        processors.add(new StringAppendTransformer("a"));
        processors.add(new StringAppendTransformer("b"));
        processors.add(new StringAppendTransformer("c"));
        // Record which thread executed the flow so tests can assert on it.
        processors.add(event -> Event.builder(event).addVariable("thread", Thread.currentThread()).build());
        processors.add(this.sensingMessageProcessor);
        this.flow.setMessageProcessors(processors);
    }

    @Override
    protected AbstractFlowConstruct getFlowConstruct() throws Exception {
        return this.flow;
    }

    /** Stops and disposes the flow, but only as far as the test actually brought it up. */
    @After
    public void after() throws MuleException {
        if (this.flow.isStarted()) {
            this.flow.stop();
        }
        if (this.flow.getLifecycleState().isInitialised()) {
            this.flow.dispose();
        }
    }

    @Test
    public void testProcessOneWayEndpoint() throws Exception {
        this.flow.initialise();
        this.flow.start();
        Event request = eventBuilder().message(InternalMessage.of("test")).build();
        Event response = this.triggerFunction.apply(this.directInboundMessageSource.getListener(), request);
        assertSuccessfulProcessing(response);
    }

    @Test
    public void testProcessRequestResponseEndpoint() throws Exception {
        this.flow.initialise();
        this.flow.start();
        Event response = this.triggerFunction.apply(this.directInboundMessageSource.getListener(), testEvent());
        assertSuccessfulProcessing(response);
    }

    /**
     * Asserts that the flow result carries the payload "testabcdef", that the sensing
     * processor saw "testabc", and that in both events the recorded "thread" variable
     * is not the thread running the test.
     *
     * @param event the event returned by the flow
     */
    private void assertSuccessfulProcessing(Event event) throws MuleException {
        Thread testThread = Thread.currentThread();
        Event sensedEvent = this.sensingMessageProcessor.event;

        Assert.assertThat(event.getMessageAsString(muleContext), CoreMatchers.equalTo("testabcdef"));
        Assert.assertThat(event.getVariable("thread").getValue(),
                          CoreMatchers.not(CoreMatchers.sameInstance(testThread)));
        Assert.assertThat(sensedEvent.getMessageAsString(muleContext), CoreMatchers.equalTo("testabc"));
        Assert.assertThat(sensedEvent.getVariable("thread").getValue(),
                          CoreMatchers.not(CoreMatchers.sameInstance(testThread)));
    }

    /** Dispatching to a flow that was initialised but never started must fail. */
    @Test
    public void testProcessStopped() throws Exception {
        this.flow.initialise();
        try {
            this.triggerFunction.apply(this.directInboundMessageSource.getListener(), testEvent());
            Assert.fail("exception expected");
        } catch (Exception expected) {
            // Expected: the flow is not started. Assert.fail raises an AssertionError,
            // which is not an Exception and therefore is not swallowed here.
        }
    }

    @Test
    public void restartWithSynchronousProcessingStrategy() throws Exception {
        this.flow.setProcessingStrategyFactory(new SynchronousProcessingStrategyFactory());
        this.flow.initialise();
        this.flow.start();
        this.flow.stop();
        this.flow.start();

        Event response = this.triggerFunction.apply(this.directInboundMessageSource.getListener(), testEvent());
        Assert.assertThat(response, CoreMatchers.not(CoreMatchers.nullValue()));
    }

    @Test
    public void restartWithAsynchronousProcessingStrategy() throws Exception {
        this.flow.setProcessingStrategyFactory(new LegacyAsynchronousProcessingStrategyFactory());
        this.flow.initialise();
        this.flow.start();
        this.flow.stop();
        this.flow.start();

        Event request = eventBuilder().message(InternalMessage.of("test")).build();
        Event response = this.triggerFunction.apply(this.directInboundMessageSource.getListener(), request);
        Assert.assertThat(response, CoreMatchers.not(CoreMatchers.nullValue()));
    }

    /**
     * When the message source throws on start, the flow start must fail and both the
     * processor and the source must each have been started once and stopped once.
     */
    @Test
    public void testFailStartingMessageSourceOnLifecycleShouldStopStartedPipelineProcesses() throws Exception {
        muleContext.start();

        // The mock also implements Startable/Stoppable so the flow applies lifecycle to it.
        MessageSource mockMessageSource = Mockito.mock(MessageSource.class,
            Mockito.withSettings().extraInterfaces(Startable.class, Stoppable.class));
        Mockito.doThrow(new LifecycleException(Mockito.mock(I18nMessage.class), "Error starting component"))
            .when((Startable) mockMessageSource).start();
        this.flow.setMessageSource(mockMessageSource);

        Processor mockMessageProcessor = Mockito.spy(new LifecycleTrackerProcessor());
        this.flow.getMessageProcessors().add(mockMessageProcessor);

        this.flow.initialise();
        try {
            this.flow.start();
            Assert.fail();
        } catch (LifecycleException expected) {
            // Expected: the mocked message source refuses to start.
        }

        Mockito.verify((Startable) mockMessageProcessor, Mockito.times(1)).start();
        Mockito.verify((Stoppable) mockMessageProcessor, Mockito.times(1)).stop();
        Mockito.verify((Startable) mockMessageSource, Mockito.times(1)).start();
        Mockito.verify((Stoppable) mockMessageSource, Mockito.times(1)).stop();
    }
}
