package org.mule.runtime.core.processor.strategy;

import java.util.Arrays;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.internal.matchers.ThrowableMessageMatcher;
import org.mule.runtime.core.api.DefaultMuleException;
import org.mule.runtime.core.api.MuleContext;
import org.mule.runtime.core.api.processor.strategy.ProcessingStrategy;
import org.mule.runtime.core.exception.MessagingException;
import org.mule.runtime.core.processor.strategy.ProactorProcessingStrategyFactory;
import org.mule.runtime.core.transaction.TransactionCoordination;
import org.mule.tck.junit4.AbstractReactiveProcessorTestCase;
import org.mule.tck.testmodels.mule.TestTransaction;
import ru.yandex.qatools.allure.annotations.Description;
import ru.yandex.qatools.allure.annotations.Features;
import ru.yandex.qatools.allure.annotations.Stories;

@Stories({"Proactor Processing Strategy"})
@Features({"Processing Strategies"})
/* loaded from: input_file:org/mule/runtime/core/processor/strategy/ProactorProcessingStrategyTestCase.class */
public class ProactorProcessingStrategyTestCase extends AbstractProcessingStrategyTestCase {
    public ProactorProcessingStrategyTestCase(AbstractReactiveProcessorTestCase.Mode mode) {
        super(mode);
    }

    @Override // org.mule.runtime.core.processor.strategy.AbstractProcessingStrategyTestCase
    protected ProcessingStrategy createProcessingStrategy(MuleContext muleContext, String str) {
        return new ProactorProcessingStrategyFactory.ProactorProcessingStrategy(() -> {
            return this.cpuLight;
        }, () -> {
            return this.blocking;
        }, () -> {
            return this.cpuIntensive;
        }, scheduler -> {
        }, Integer.MAX_VALUE, () -> {
            return this.blocking;
        }, AbstractRingBufferProcessingStrategyFactory.DEFAULT_BUFFER_SIZE, AbstractRingBufferProcessingStrategyFactory.DEFAULT_SUBSCRIBER_COUNT, AbstractRingBufferProcessingStrategyFactory.DEFAULT_WAIT_STRATEGY, muleContext);
    }

    @Override // org.mule.runtime.core.processor.strategy.AbstractProcessingStrategyTestCase
    @Description("With the ProactorProcessingStrategy, when all processor are CPU_LIGHT then they are all exectured in a single  cpu light thread.")
    public void singleCpuLight() throws Exception {
        super.singleCpuLight();
        MatcherAssert.assertThat(Integer.valueOf(this.threads.size()), CoreMatchers.equalTo(1));
        MatcherAssert.assertThat(Long.valueOf(this.threads.stream().filter(str -> {
            return str.startsWith("cpuLight");
        }).count()), CoreMatchers.equalTo(1L));
        MatcherAssert.assertThat(Long.valueOf(this.threads.stream().filter(str2 -> {
            return str2.startsWith("I/O");
        }).count()), CoreMatchers.equalTo(0L));
        MatcherAssert.assertThat(Long.valueOf(this.threads.stream().filter(str3 -> {
            return str3.startsWith("cpuIntensive");
        }).count()), CoreMatchers.equalTo(0L));
    }

    @Override // org.mule.runtime.core.processor.strategy.AbstractProcessingStrategyTestCase
    @Description("When ProactorProcessingStrategy is configured, two concurrent requests may be processed by two different  cpu light threads. MULE-11132 is needed for true reactor behaviour.")
    public void singleCpuLightConcurrent() throws Exception {
        super.singleCpuLightConcurrent();
        MatcherAssert.assertThat(Integer.valueOf(this.threads.size()), CoreMatchers.allOf(Matchers.greaterThanOrEqualTo(1), Matchers.lessThanOrEqualTo(2)));
        MatcherAssert.assertThat(Long.valueOf(this.threads.stream().filter(str -> {
            return str.startsWith("cpuLight");
        }).count()), CoreMatchers.allOf(Matchers.greaterThanOrEqualTo(1L), Matchers.lessThanOrEqualTo(2L)));
        MatcherAssert.assertThat(Long.valueOf(this.threads.stream().filter(str2 -> {
            return str2.startsWith("I/O");
        }).count()), CoreMatchers.equalTo(0L));
        MatcherAssert.assertThat(Long.valueOf(this.threads.stream().filter(str3 -> {
            return str3.startsWith("cpuIntensive");
        }).count()), CoreMatchers.equalTo(0L));
    }

    @Override // org.mule.runtime.core.processor.strategy.AbstractProcessingStrategyTestCase
    @Description("With the ProactorProcessingStrategy, when all processor are CPU_LIGHT then they are all exectured in a single  cpu light thread.")
    public void multipleCpuLight() throws Exception {
        super.multipleCpuLight();
        MatcherAssert.assertThat(Integer.valueOf(this.threads.size()), CoreMatchers.equalTo(1));
        MatcherAssert.assertThat(Long.valueOf(this.threads.stream().filter(str -> {
            return str.startsWith("cpuLight");
        }).count()), CoreMatchers.equalTo(1L));
        MatcherAssert.assertThat(Long.valueOf(this.threads.stream().filter(str2 -> {
            return str2.startsWith("I/O");
        }).count()), CoreMatchers.equalTo(0L));
        MatcherAssert.assertThat(Long.valueOf(this.threads.stream().filter(str3 -> {
            return str3.startsWith("cpuIntensive");
        }).count()), CoreMatchers.equalTo(0L));
    }

    @Override // org.mule.runtime.core.processor.strategy.AbstractProcessingStrategyTestCase
    @Description("With the ProactorProcessingStrategy, a BLOCKING message processor is scheduled on a IO thread.")
    public void singleBlocking() throws Exception {
        super.singleBlocking();
        MatcherAssert.assertThat(Integer.valueOf(this.threads.size()), CoreMatchers.equalTo(1));
        MatcherAssert.assertThat(Long.valueOf(this.threads.stream().filter(str -> {
            return str.startsWith("I/O");
        }).count()), CoreMatchers.equalTo(1L));
        MatcherAssert.assertThat(Long.valueOf(this.threads.stream().filter(str2 -> {
            return str2.startsWith("cpuLight");
        }).count()), CoreMatchers.equalTo(0L));
        MatcherAssert.assertThat(Long.valueOf(this.threads.stream().filter(str3 -> {
            return str3.startsWith("cpuIntensive");
        }).count()), CoreMatchers.equalTo(0L));
    }

    @Override // org.mule.runtime.core.processor.strategy.AbstractProcessingStrategyTestCase
    @Description("With the ProactorProcessingStrategy, each BLOCKING message processor is scheduled on a IO thread. These may, or may not, be the same thread.")
    public void multipleBlocking() throws Exception {
        super.multipleBlocking();
        MatcherAssert.assertThat(Integer.valueOf(this.threads.size()), CoreMatchers.allOf(Matchers.greaterThanOrEqualTo(1), Matchers.lessThanOrEqualTo(3)));
        MatcherAssert.assertThat(Long.valueOf(this.threads.stream().filter(str -> {
            return str.startsWith("I/O");
        }).count()), CoreMatchers.allOf(Matchers.greaterThanOrEqualTo(1L), Matchers.lessThanOrEqualTo(3L)));
        MatcherAssert.assertThat(Long.valueOf(this.threads.stream().filter(str2 -> {
            return str2.startsWith("cpuLight");
        }).count()), CoreMatchers.equalTo(0L));
        MatcherAssert.assertThat(Long.valueOf(this.threads.stream().filter(str3 -> {
            return str3.startsWith("cpuIntensive");
        }).count()), CoreMatchers.equalTo(0L));
    }

    @Override // org.mule.runtime.core.processor.strategy.AbstractProcessingStrategyTestCase
    @Description("With the ProactorProcessingStrategy, a CPU_INTENSIVE message processor is scheduled on a CPU intensive thread.")
    public void singleCpuIntensive() throws Exception {
        super.singleCpuIntensive();
        MatcherAssert.assertThat(Integer.valueOf(this.threads.size()), CoreMatchers.equalTo(1));
        MatcherAssert.assertThat(Long.valueOf(this.threads.stream().filter(str -> {
            return str.startsWith("cpuIntensive");
        }).count()), CoreMatchers.equalTo(1L));
        MatcherAssert.assertThat(Long.valueOf(this.threads.stream().filter(str2 -> {
            return str2.startsWith("I/O");
        }).count()), CoreMatchers.equalTo(0L));
        MatcherAssert.assertThat(Long.valueOf(this.threads.stream().filter(str3 -> {
            return str3.startsWith("cpuLight");
        }).count()), CoreMatchers.equalTo(0L));
    }

    @Override // org.mule.runtime.core.processor.strategy.AbstractProcessingStrategyTestCase
    @Description("With the ProactorProcessingStrategy, each CPU_INTENSIVE message processor is scheduled on a CPU Intensive thread. These may, or may not, be the same thread.")
    public void multipleCpuIntensive() throws Exception {
        super.multipleCpuIntensive();
        MatcherAssert.assertThat(Integer.valueOf(this.threads.size()), CoreMatchers.allOf(Matchers.greaterThanOrEqualTo(1), Matchers.lessThanOrEqualTo(3)));
        MatcherAssert.assertThat(Long.valueOf(this.threads.stream().filter(str -> {
            return str.startsWith("cpuIntensive");
        }).count()), CoreMatchers.allOf(Matchers.greaterThanOrEqualTo(1L), Matchers.lessThanOrEqualTo(3L)));
        MatcherAssert.assertThat(Long.valueOf(this.threads.stream().filter(str2 -> {
            return str2.startsWith("I/O");
        }).count()), CoreMatchers.equalTo(0L));
        MatcherAssert.assertThat(Long.valueOf(this.threads.stream().filter(str3 -> {
            return str3.startsWith("cpuLight");
        }).count()), CoreMatchers.equalTo(0L));
    }

    @Override // org.mule.runtime.core.processor.strategy.AbstractProcessingStrategyTestCase
    @Description("With the ProactorProcessingStrategy, when there is a mix of processor processing types, each processor is scheduled on the correct scheduler.")
    public void mix() throws Exception {
        super.mix();
        MatcherAssert.assertThat(Integer.valueOf(this.threads.size()), CoreMatchers.equalTo(3));
        MatcherAssert.assertThat(Long.valueOf(this.threads.stream().filter(str -> {
            return str.startsWith("cpuIntensive");
        }).count()), CoreMatchers.equalTo(1L));
        MatcherAssert.assertThat(Long.valueOf(this.threads.stream().filter(str2 -> {
            return str2.startsWith("I/O");
        }).count()), CoreMatchers.equalTo(1L));
        MatcherAssert.assertThat(Long.valueOf(this.threads.stream().filter(str3 -> {
            return str3.startsWith("cpuLight");
        }).count()), CoreMatchers.equalTo(1L));
    }

    @Override // org.mule.runtime.core.processor.strategy.AbstractProcessingStrategyTestCase
    @Description("With the ProactorProcessingStrategy, when there is a mix of processor processing types, each processor is scheduled on the correct scheduler.")
    public void mix2() throws Exception {
        super.mix2();
        MatcherAssert.assertThat(Integer.valueOf(this.threads.size()), CoreMatchers.allOf(Matchers.greaterThanOrEqualTo(3), Matchers.lessThanOrEqualTo(7)));
        MatcherAssert.assertThat(Long.valueOf(this.threads.stream().filter(str -> {
            return str.startsWith("cpuIntensive");
        }).count()), CoreMatchers.allOf(Matchers.greaterThanOrEqualTo(1L), Matchers.lessThanOrEqualTo(2L)));
        MatcherAssert.assertThat(Long.valueOf(this.threads.stream().filter(str2 -> {
            return str2.startsWith("I/O");
        }).count()), CoreMatchers.allOf(Matchers.greaterThanOrEqualTo(1L), Matchers.lessThanOrEqualTo(2L)));
        MatcherAssert.assertThat(Long.valueOf(this.threads.stream().filter(str3 -> {
            return str3.startsWith("cpuLight");
        }).count()), CoreMatchers.allOf(Matchers.greaterThanOrEqualTo(1L), Matchers.lessThanOrEqualTo(3L)));
    }

    @Override // org.mule.runtime.core.processor.strategy.AbstractProcessingStrategyTestCase
    @Description("When the ProactorProcessingStrategy is configured and a transaction is active processing fails with an error")
    public void tx() throws Exception {
        this.flow.setMessageProcessors(Arrays.asList(this.cpuLightProcessor, this.cpuIntensiveProcessor, this.blockingProcessor));
        this.flow.initialise();
        this.flow.start();
        TransactionCoordination.getInstance().bindTransaction(new TestTransaction(muleContext));
        this.expectedException.expect(MessagingException.class);
        this.expectedException.expectCause(CoreMatchers.instanceOf(DefaultMuleException.class));
        this.expectedException.expectCause(ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo("Unable to process a transactional flow asynchronously")));
        process(this.flow, testEvent());
    }
}
