package org.mule.runtime.core.processor.strategy;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.TimeZone;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.hamcrest.CoreMatchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.scheduler.Scheduler;
import org.mule.runtime.core.api.Event;
import org.mule.runtime.core.api.MuleContext;
import org.mule.runtime.core.api.construct.Flow;
import org.mule.runtime.core.api.processor.Processor;
import org.mule.runtime.core.api.processor.ReactiveProcessor;
import org.mule.runtime.core.api.processor.strategy.ProcessingStrategy;
import org.mule.runtime.core.api.registry.RegistrationException;
import org.mule.runtime.core.api.scheduler.SchedulerService;
import org.mule.runtime.core.util.concurrent.Latch;
import org.mule.runtime.core.util.concurrent.NamedThreadFactory;
import org.mule.tck.junit4.AbstractReactiveProcessorTestCase;

/**
 * Base test case for {@link ProcessingStrategy} implementations.
 *
 * <p>Before each test a {@link Flow} named {@code "test"} is built whose processing strategy is
 * obtained from {@link #createProcessingStrategy(MuleContext, String)}. The tests defined here push
 * a test event through different combinations of CPU-light, CPU-intensive and blocking processors;
 * every processor records the name of the thread it ran on in {@link #threads} so that subclasses
 * can assert on the threads that were used.
 */
public abstract class AbstractProcessingStrategyTestCase extends AbstractReactiveProcessorTestCase {
    /** Thread name passed to the CPU-light (and custom) test schedulers. */
    protected static final String CPU_LIGHT = "cpuLight";
    /** Thread name passed to the blocking test scheduler. */
    protected static final String IO = "I/O";
    /** Thread name passed to the CPU-intensive test scheduler. */
    protected static final String CPU_INTENSIVE = "cpuIntensive";

    /** How long to wait for the second concurrent invocation to reach the processor. */
    private static final long SECOND_CALL_TIMEOUT_MILLIS = 50L;

    /** Flow under test, rebuilt in {@link #before()}. */
    protected Flow flow;

    /**
     * Names of the threads on which the tracking processors were invoked. Processors may add to
     * this set from several threads at once, so it is wrapped in a synchronized view.
     */
    protected volatile Set<String> threads = Collections.synchronizedSet(new HashSet<>());

    /** Thread-tracking processor reporting {@code CPU_LITE} as its processing type. */
    protected Processor cpuLightProcessor = new ThreadTrackingProcessor() {
        public ReactiveProcessor.ProcessingType getProcessingType() {
            return ReactiveProcessor.ProcessingType.CPU_LITE;
        }
    };

    /** Thread-tracking processor reporting {@code CPU_INTENSIVE} as its processing type. */
    protected Processor cpuIntensiveProcessor = new ThreadTrackingProcessor() {
        public ReactiveProcessor.ProcessingType getProcessingType() {
            return ReactiveProcessor.ProcessingType.CPU_INTENSIVE;
        }
    };

    /** Thread-tracking processor reporting {@code BLOCKING} as its processing type. */
    protected Processor blockingProcessor = new ThreadTrackingProcessor() {
        public ReactiveProcessor.ProcessingType getProcessingType() {
            return ReactiveProcessor.ProcessingType.BLOCKING;
        }
    };

    protected Scheduler cpuLight;
    protected Scheduler blocking;
    protected Scheduler cpuIntensive;
    protected Scheduler custom;

    /** Executor used to submit events to the flow from outside the test thread. */
    private ExecutorService asyncExecutor;

    @Rule
    public ExpectedException expectedException = ExpectedException.none();

    /**
     * Processor that blocks its first invocation until {@link #releaseFirst()} is called and lets
     * every later invocation pass straight through, counting down the "second called" latch.
     */
    public class FirstInvocationLatchedProcessor implements Processor {
        private final ReactiveProcessor.ProcessingType type;
        private final Latch latch = new Latch();
        private final Latch firstCalledLatch = new Latch();
        private final Latch secondCalledLatch = new Latch();
        private final AtomicBoolean firstCalled = new AtomicBoolean();

        /**
         * @param processingType the processing type this processor reports
         */
        public FirstInvocationLatchedProcessor(ReactiveProcessor.ProcessingType processingType) {
            this.type = processingType;
        }

        /**
         * Records the current thread name, then either blocks (first invocation) or signals that a
         * subsequent invocation happened.
         *
         * @param event the event being processed
         * @return the same event, unchanged
         * @throws RuntimeException wrapping the {@link InterruptedException} if the first invocation
         *         is interrupted while waiting to be released
         */
        public Event process(Event event) throws MuleException {
            AbstractProcessingStrategyTestCase.this.threads.add(Thread.currentThread().getName());
            if (!this.firstCalled.compareAndSet(false, true)) {
                // Not the first invocation: just signal and carry on.
                this.secondCalledLatch.countDown();
                return event;
            }
            this.firstCalledLatch.release();
            try {
                this.latch.await();
            } catch (InterruptedException e) {
                // Restore the interrupt flag so code further up the stack can still observe it.
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
            return event;
        }

        public ReactiveProcessor.ProcessingType getProcessingType() {
            return this.type;
        }

        /** Unblocks the first invocation of {@link #process(Event)}. */
        public void releaseFirst() {
            this.latch.release();
        }

        /**
         * @return the latch released once the first invocation has entered {@link #process(Event)}
         */
        public CountDownLatch getFirstCalledLatch() throws InterruptedException {
            return this.firstCalledLatch;
        }

        /**
         * @return the latch counted down when an invocation other than the first enters
         *         {@link #process(Event)}
         */
        public CountDownLatch getSecondCalledLatch() throws InterruptedException {
            return this.secondCalledLatch;
        }
    }

    /**
     * Minimal {@link Scheduler} for tests, backed by a {@link ScheduledThreadPoolExecutor} whose
     * threads are created by a {@link NamedThreadFactory}. Cron scheduling is not supported.
     */
    static class TestScheduler extends ScheduledThreadPoolExecutor implements Scheduler {
        /**
         * @param threads core pool size of the underlying executor
         * @param name name handed to the {@link NamedThreadFactory}
         */
        public TestScheduler(int threads, String name) {
            super(threads, (ThreadFactory) new NamedThreadFactory(name));
        }

        @Override
        public Future<?> submit(Runnable runnable) {
            return super.submit(runnable);
        }

        /** Intentionally a no-op; the test case shuts the schedulers down itself in {@code after()}. */
        public void stop(long gracefulShutdownTimeout, TimeUnit unit) {
        }

        public ScheduledFuture<?> scheduleWithCronExpression(Runnable runnable, String cronExpression) {
            throw new UnsupportedOperationException("Cron expression scheduling is not supported in unit tests. You need the productive service implementation.");
        }

        public ScheduledFuture<?> scheduleWithCronExpression(Runnable runnable, String cronExpression, TimeZone timeZone) {
            throw new UnsupportedOperationException("Cron expression scheduling is not supported in unit tests. You need the productive service implementation.");
        }

        public String getName() {
            return TestScheduler.class.getSimpleName();
        }
    }

    /** Processor that records the name of the invoking thread and returns the event untouched. */
    class ThreadTrackingProcessor implements Processor {
        public Event process(Event event) {
            AbstractProcessingStrategyTestCase.this.threads.add(Thread.currentThread().getName());
            return event;
        }
    }

    /**
     * @param mode the mode forwarded to {@link AbstractReactiveProcessorTestCase}
     */
    public AbstractProcessingStrategyTestCase(AbstractReactiveProcessorTestCase.Mode mode) {
        super(mode);
    }

    /**
     * Creates the test schedulers, looks up the IO scheduler used for asynchronous submission and
     * builds the flow under test.
     */
    @Before
    public void before() throws RegistrationException {
        this.cpuLight = new TestScheduler(3, CPU_LIGHT);
        this.blocking = new TestScheduler(3, IO);
        this.cpuIntensive = new TestScheduler(3, CPU_INTENSIVE);
        this.custom = new TestScheduler(10, CPU_LIGHT);
        SchedulerService schedulerService = (SchedulerService) muleContext.getRegistry().lookupObject(SchedulerService.class);
        this.asyncExecutor = schedulerService.ioScheduler();
        this.flow = Flow.builder("test", muleContext)
            .processingStrategyFactory(this::createProcessingStrategy)
            .build();
    }

    /**
     * Creates the processing strategy under test.
     *
     * @param muleContext the context supplied by the flow's processing strategy factory
     * @param str the string supplied by the flow's processing strategy factory
     *        (presumably a scheduler/thread name prefix; confirm against the factory contract)
     * @return the processing strategy the flow should use
     */
    protected abstract ProcessingStrategy createProcessingStrategy(MuleContext muleContext, String str);

    /**
     * Disposes the flow and shuts down every scheduler created in {@link #before()}, including the
     * custom one. Tolerates a partially failed {@link #before()} so that a set-up failure is not
     * masked by a {@link NullPointerException} here and already created schedulers are still
     * released.
     */
    @After
    public void after() {
        try {
            if (this.flow != null) {
                this.flow.dispose();
            }
        } finally {
            shutdownNowIfPresent(this.cpuLight);
            shutdownNowIfPresent(this.blocking);
            shutdownNowIfPresent(this.cpuIntensive);
            shutdownNowIfPresent(this.custom);
            shutdownNowIfPresent(this.asyncExecutor);
        }
    }

    /** Calls {@code shutdownNow()} on the executor unless it was never created. */
    private static void shutdownNowIfPresent(ExecutorService executor) {
        if (executor != null) {
            executor.shutdownNow();
        }
    }

    /** Configures the flow with the given processors, starts it and processes one test event. */
    private void startFlowAndProcessTestEvent(Processor... processors) throws Exception {
        this.flow.setMessageProcessors(Arrays.asList(processors));
        this.flow.initialise();
        this.flow.start();
        process(this.flow, testEvent());
    }

    @Test
    public void singleCpuLight() throws Exception {
        startFlowAndProcessTestEvent(this.cpuLightProcessor);
    }

    @Test
    public void singleCpuLightConcurrent() throws Exception {
        internalSingleCpuLightConcurrent(false);
    }

    /**
     * Sends two events concurrently through a single CPU-light processor whose first invocation is
     * held on a latch, and checks whether the second event reaches the processor meanwhile.
     *
     * @param blocks {@code true} if the second event is expected to reach the processor only after
     *        the first invocation has been released; {@code false} if it is expected to reach the
     *        processor while the first invocation is still blocked
     */
    public void internalSingleCpuLightConcurrent(boolean blocks) throws MuleException, InterruptedException {
        FirstInvocationLatchedProcessor latchedProcessor =
            new FirstInvocationLatchedProcessor(ReactiveProcessor.ProcessingType.CPU_LITE);
        this.flow.setMessageProcessors(Collections.singletonList(latchedProcessor));
        this.flow.initialise();
        this.flow.start();

        // First event: enters the processor and stays blocked on the latch.
        this.asyncExecutor.submit(() -> process(this.flow, testEvent()));
        latchedProcessor.getFirstCalledLatch().await();

        // Second event: submitted while the first one is still inside the processor.
        this.asyncExecutor.submit(() -> process(this.flow, testEvent()));
        boolean secondCalledWhileFirstBlocked =
            latchedProcessor.getSecondCalledLatch().await(SECOND_CALL_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
        Assert.assertThat(secondCalledWhileFirstBlocked, CoreMatchers.is(!blocks));

        latchedProcessor.releaseFirst();
        if (blocks) {
            // Once the first invocation is released the second one must get through.
            boolean secondCalledAfterRelease =
                latchedProcessor.getSecondCalledLatch().await(SECOND_CALL_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
            Assert.assertThat(secondCalledAfterRelease, CoreMatchers.is(true));
        }
    }

    @Test
    public void multipleCpuLight() throws Exception {
        startFlowAndProcessTestEvent(this.cpuLightProcessor, this.cpuLightProcessor, this.cpuLightProcessor);
    }

    @Test
    public void singleBlocking() throws Exception {
        startFlowAndProcessTestEvent(this.blockingProcessor);
    }

    @Test
    public void multipleBlocking() throws Exception {
        startFlowAndProcessTestEvent(this.blockingProcessor, this.blockingProcessor, this.blockingProcessor);
    }

    @Test
    public void singleCpuIntensive() throws Exception {
        startFlowAndProcessTestEvent(this.cpuIntensiveProcessor);
    }

    @Test
    public void multipleCpuIntensive() throws Exception {
        startFlowAndProcessTestEvent(this.cpuIntensiveProcessor, this.cpuIntensiveProcessor, this.cpuIntensiveProcessor);
    }

    @Test
    public void mix() throws Exception {
        startFlowAndProcessTestEvent(this.cpuLightProcessor, this.cpuIntensiveProcessor, this.blockingProcessor);
    }

    @Test
    public void mix2() throws Exception {
        startFlowAndProcessTestEvent(
            this.cpuLightProcessor, this.cpuLightProcessor,
            this.blockingProcessor, this.blockingProcessor,
            this.cpuLightProcessor,
            this.cpuIntensiveProcessor, this.cpuIntensiveProcessor,
            this.cpuLightProcessor);
    }

    /** Exercises the processing strategy while a transaction is active; defined by subclasses. */
    @Test
    public abstract void tx() throws Exception;
}
