package org.mule.runtime.core.processor.strategy;

import java.util.Arrays;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.internal.matchers.ThrowableMessageMatcher;
import org.mule.runtime.core.api.DefaultMuleException;
import org.mule.runtime.core.api.MuleContext;
import org.mule.runtime.core.api.processor.strategy.ProcessingStrategy;
import org.mule.runtime.core.exception.MessagingException;
import org.mule.runtime.core.processor.strategy.AbstractRingBufferProcessingStrategyFactory;
import org.mule.runtime.core.transaction.TransactionCoordination;
import org.mule.tck.junit4.AbstractReactiveProcessorTestCase;
import org.mule.tck.testmodels.mule.TestTransaction;
import ru.yandex.qatools.allure.annotations.Description;
import ru.yandex.qatools.allure.annotations.Features;
import ru.yandex.qatools.allure.annotations.Stories;

@Stories({"MultiReactor Processing Strategy"})
@Features({"Processing Strategies"})
/* loaded from: input_file:org/mule/runtime/core/processor/strategy/MultiReactorProcessingStrategyTestCase.class */
public class MultiReactorProcessingStrategyTestCase extends AbstractProcessingStrategyTestCase {
    public MultiReactorProcessingStrategyTestCase(AbstractReactiveProcessorTestCase.Mode mode) {
        super(mode);
    }

    @Override // org.mule.runtime.core.processor.strategy.AbstractProcessingStrategyTestCase
    protected ProcessingStrategy createProcessingStrategy(MuleContext muleContext, String str) {
        return new AbstractRingBufferProcessingStrategyFactory.RingBufferProcessingStrategy(() -> {
            return this.custom;
        }, AbstractRingBufferProcessingStrategyFactory.DEFAULT_BUFFER_SIZE, 10, AbstractRingBufferProcessingStrategyFactory.DEFAULT_WAIT_STRATEGY, muleContext);
    }

    @Override // org.mule.runtime.core.processor.strategy.AbstractProcessingStrategyTestCase
    @Description("Regardless of processor type, when the MultiReactorProcessingStrategy is configured, the pipeline is executed synchronously in a single cpu light thread.")
    public void singleCpuLight() throws Exception {
        super.singleCpuLight();
        assertEverythingOnEventLoop();
    }

    @Override // org.mule.runtime.core.processor.strategy.AbstractProcessingStrategyTestCase
    @Description("When MultiReactorProcessingStrategy is configured, two concurrent requests may be processed by two different  cpu light threads.  This is why this strategy is called 'MultiReactor' and not 'Reactor`.  MULE-11132 is needed for true reactor behaviour.")
    public void singleCpuLightConcurrent() throws Exception {
        super.singleCpuLightConcurrent();
        MatcherAssert.assertThat(Integer.valueOf(this.threads.size()), CoreMatchers.allOf(Matchers.greaterThanOrEqualTo(1), Matchers.lessThanOrEqualTo(2)));
        MatcherAssert.assertThat(Long.valueOf(this.threads.stream().filter(str -> {
            return str.startsWith("cpuLight");
        }).count()), CoreMatchers.allOf(Matchers.greaterThanOrEqualTo(1L), Matchers.lessThanOrEqualTo(2L)));
        MatcherAssert.assertThat(Long.valueOf(this.threads.stream().filter(str2 -> {
            return str2.startsWith("I/O");
        }).count()), CoreMatchers.equalTo(0L));
        MatcherAssert.assertThat(Long.valueOf(this.threads.stream().filter(str3 -> {
            return str3.startsWith("cpuIntensive");
        }).count()), CoreMatchers.equalTo(0L));
    }

    @Override // org.mule.runtime.core.processor.strategy.AbstractProcessingStrategyTestCase
    @Description("Regardless of processor type, when the MultiReactorProcessingStrategy is configured, the pipeline is executed synchronously in a single cpu light thread.")
    public void multipleCpuLight() throws Exception {
        super.multipleCpuLight();
        assertEverythingOnEventLoop();
    }

    @Override // org.mule.runtime.core.processor.strategy.AbstractProcessingStrategyTestCase
    @Description("Regardless of processor type, when the MultiReactorProcessingStrategy is configured, the pipeline is executed synchronously in a single cpu light thread.")
    public void singleBlocking() throws Exception {
        super.singleBlocking();
        assertEverythingOnEventLoop();
    }

    @Override // org.mule.runtime.core.processor.strategy.AbstractProcessingStrategyTestCase
    @Description("Regardless of processor type, when the MultiReactorProcessingStrategy is configured, the pipeline is executed synchronously in a single cpu light thread.")
    public void multipleBlocking() throws Exception {
        super.multipleBlocking();
        assertEverythingOnEventLoop();
    }

    @Override // org.mule.runtime.core.processor.strategy.AbstractProcessingStrategyTestCase
    @Description("Regardless of processor type, when the MultiReactorProcessingStrategy is configured, the pipeline is executed synchronously in a single cpu light thread.")
    public void singleCpuIntensive() throws Exception {
        super.singleCpuIntensive();
        assertEverythingOnEventLoop();
    }

    @Override // org.mule.runtime.core.processor.strategy.AbstractProcessingStrategyTestCase
    @Description("Regardless of processor type, when the MultiReactorProcessingStrategy is configured, the pipeline is executed synchronously in a single cpu light thread.")
    public void multipleCpuIntensive() throws Exception {
        super.multipleCpuIntensive();
        assertEverythingOnEventLoop();
    }

    @Override // org.mule.runtime.core.processor.strategy.AbstractProcessingStrategyTestCase
    @Description("Regardless of processor type, when the MultiReactorProcessingStrategy is configured, the pipeline is executed synchronously in a single cpu light thread.")
    public void mix() throws Exception {
        super.mix();
        assertEverythingOnEventLoop();
    }

    @Override // org.mule.runtime.core.processor.strategy.AbstractProcessingStrategyTestCase
    @Description("Regardless of processor type, when the MultiReactorProcessingStrategy is configured, the pipeline is executed synchronously in a single cpu light thread.")
    public void mix2() throws Exception {
        super.mix2();
        assertEverythingOnEventLoop();
    }

    private void assertEverythingOnEventLoop() {
        MatcherAssert.assertThat(Integer.valueOf(this.threads.size()), CoreMatchers.equalTo(1));
        MatcherAssert.assertThat(Long.valueOf(this.threads.stream().filter(str -> {
            return str.startsWith("cpuLight");
        }).count()), CoreMatchers.equalTo(1L));
        MatcherAssert.assertThat(Long.valueOf(this.threads.stream().filter(str2 -> {
            return str2.startsWith("I/O");
        }).count()), CoreMatchers.equalTo(0L));
        MatcherAssert.assertThat(Long.valueOf(this.threads.stream().filter(str3 -> {
            return str3.startsWith("cpuIntensive");
        }).count()), CoreMatchers.equalTo(0L));
    }

    @Override // org.mule.runtime.core.processor.strategy.AbstractProcessingStrategyTestCase
    @Description("When the MultiReactorProcessingStrategy is configured and a transaction is active processing fails with an error")
    public void tx() throws Exception {
        this.flow.setMessageProcessors(Arrays.asList(this.cpuLightProcessor, this.cpuIntensiveProcessor, this.blockingProcessor));
        this.flow.initialise();
        this.flow.start();
        TransactionCoordination.getInstance().bindTransaction(new TestTransaction(muleContext));
        this.expectedException.expect(MessagingException.class);
        this.expectedException.expectCause(CoreMatchers.instanceOf(DefaultMuleException.class));
        this.expectedException.expectCause(ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo("Unable to process a transactional flow asynchronously")));
        process(this.flow, testEvent());
    }
}
