package org.mule.service.scheduler.internal;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.hamcrest.CoreMatchers;
import org.hamcrest.core.IsCollectionContaining;
import org.hamcrest.core.IsInstanceOf;
import org.hamcrest.core.StringStartsWith;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.mockito.Mockito;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.scheduler.Scheduler;
import org.mule.runtime.core.api.scheduler.SchedulerBusyException;
import org.mule.runtime.core.api.scheduler.SchedulerConfig;
import org.mule.runtime.core.api.scheduler.SchedulerService;
import org.mule.runtime.core.util.concurrent.Latch;
import org.mule.tck.junit4.AbstractMuleTestCase;
import org.mule.tck.probe.JUnitLambdaProbe;
import org.mule.tck.probe.PollingProber;
import ru.yandex.qatools.allure.annotations.Description;
import ru.yandex.qatools.allure.annotations.Features;

/**
 * Tests for {@link DefaultSchedulerService}: thread lifecycle on start/stop, context class loader
 * propagation to dispatched tasks, validation of the rejection action per scheduler type, and what
 * happens when a task is dispatched to a scheduler that is already running its maximum number of
 * concurrent tasks.
 */
@Features({"SchedulerService"})
public class DefaultSchedulerServiceTestCase extends AbstractMuleTestCase {

    @Rule
    public ExpectedException expected = ExpectedException.none();
    private DefaultSchedulerService service;

    /**
     * Creates and starts a fresh service for every test.
     */
    @Before
    public void before() throws MuleException {
        this.service = new DefaultSchedulerService();
        this.service.start();
    }

    /**
     * Stops every scheduler still registered in the service and then the service itself. Does nothing
     * when the test already disposed of the service (signalled by setting the field to {@code null}).
     */
    @After
    public void after() throws MuleException {
        if (this.service == null) {
            return;
        }
        try {
            // Iterate over a snapshot copy instead of the collection exposed by the service.
            for (Object scheduler : new ArrayList<>(this.service.getSchedulers())) {
                ((Scheduler) scheduler).stop(0L, TimeUnit.SECONDS);
            }
        } finally {
            // Stop the service even when stopping one of its schedulers failed.
            this.service.stop();
        }
    }

    @Test
    @Description("Tests that the threads of the SchedulerService are correcly created and destroyed.")
    public void serviceStop() throws MuleException {
        Assert.assertThat(collectThreadNames(), IsCollectionContaining.hasItem(StringStartsWith.startsWith(SchedulerService.class.getSimpleName())));
        this.service.stop();
        // Already stopped here: tell after() not to stop it a second time.
        this.service = null;
        new PollingProber(500L, 50L).check(new JUnitLambdaProbe(() -> {
            Assert.assertThat(collectThreadNames(), CoreMatchers.not(IsCollectionContaining.hasItem(StringStartsWith.startsWith(SchedulerService.class.getSimpleName()))));
            return true;
        }));
    }

    @Test
    @Description("Tests that dispatching a task to a throttled scheduler already running its maximum tasks throws the appropriate exception.")
    public void executorRejects() throws MuleException {
        Latch latch = new Latch();
        Scheduler customScheduler = singleTaskScheduler();
        // Occupy the only slot of the scheduler with a task that blocks on the latch.
        customScheduler.execute(() -> awaitQuietly(latch));
        this.expected.expect(SchedulerBusyException.class);
        Runnable runnable = () -> {
        };
        try {
            customScheduler.submit(runnable);
        } finally {
            // Whether or not the submission was rejected, the task must not be left pending.
            Assert.assertThat(customScheduler.shutdownNow(), CoreMatchers.not(IsCollectionContaining.hasItem(runnable)));
        }
    }

    @Test
    @Description("Tests that a dispatched task has inherited the context classloader.")
    public void classLoaderPropagates() throws Exception {
        Scheduler cpuLightScheduler = this.service.cpuLightScheduler();
        Thread testThread = Thread.currentThread();
        ClassLoader originalClassLoader = testThread.getContextClassLoader();
        ClassLoader classLoader = Mockito.mock(ClassLoader.class);
        testThread.setContextClassLoader(classLoader);
        try {
            cpuLightScheduler.submit(() -> {
                Assert.assertThat(Thread.currentThread().getContextClassLoader(), CoreMatchers.sameInstance(classLoader));
            }).get(60L, TimeUnit.SECONDS);
        } finally {
            // Do not leak the mock class loader into whatever runs next on this thread.
            testThread.setContextClassLoader(originalClassLoader);
        }
    }

    @Test
    public void onlyCustomMayConfigureWaitCpuLight() {
        this.expected.expect(IllegalArgumentException.class);
        this.expected.expectMessage("Only custom schedulers may define waitDispatchingToBusyScheduler");
        this.service.cpuLightScheduler(SchedulerConfig.config().withRejectionAction(SchedulerConfig.RejectionAction.WAIT));
    }

    @Test
    public void onlyCustomMayConfigureWaitCpuIntensive() {
        this.expected.expect(IllegalArgumentException.class);
        this.expected.expectMessage("Only custom schedulers may define waitDispatchingToBusyScheduler");
        this.service.cpuIntensiveScheduler(SchedulerConfig.config().withRejectionAction(SchedulerConfig.RejectionAction.WAIT));
    }

    @Test
    public void onlyCustomMayConfigureWaitIO() {
        this.expected.expect(IllegalArgumentException.class);
        this.expected.expectMessage("Only custom schedulers may define waitDispatchingToBusyScheduler");
        this.service.ioScheduler(SchedulerConfig.config().withRejectionAction(SchedulerConfig.RejectionAction.WAIT));
    }

    @Test
    @Description("Tests that tasks dispatched from a CPU Light thread to a busy Scheduler are rejected.")
    public void rejectionPolicyCpuLight() throws MuleException, InterruptedException, ExecutionException, TimeoutException {
        Future<?> submit = this.service.cpuLightScheduler().submit(threadsConsumer(singleTaskScheduler(), new Latch()));
        expectRejected(submit);
    }

    @Test
    @Description("Tests that tasks dispatched from a CPU Intensive thread to a busy Scheduler are rejected.")
    public void rejectionPolicyCpuIntensive() throws MuleException, InterruptedException, ExecutionException, TimeoutException {
        Future<?> submit = this.service.cpuIntensiveScheduler().submit(threadsConsumer(singleTaskScheduler(), new Latch()));
        expectRejected(submit);
    }

    @Test
    @Description("Tests that tasks dispatched from an IO thread to a busy Scheduler waits for execution.")
    public void rejectionPolicyIO() throws MuleException, InterruptedException, ExecutionException, TimeoutException {
        Scheduler ioScheduler = this.service.ioScheduler();
        Scheduler customScheduler = singleTaskScheduler();
        Latch latch = new Latch();
        Future<?> submit = ioScheduler.submit(threadsConsumer(customScheduler, latch));
        assertWaitsUntilReleased(submit, latch);
    }

    @Test
    @Description("Tests that tasks dispatched from a Custom scheduler thread to a busy Scheduler are rejected.")
    public void rejectionPolicyCustom() throws MuleException, InterruptedException, ExecutionException, TimeoutException {
        Future<?> submit = singleTaskScheduler().submit(threadsConsumer(singleTaskScheduler(), new Latch()));
        expectRejected(submit);
    }

    @Test
    @Description("Tests that tasks dispatched from a Custom scheduler with 'Wait' allowed thread to a busy Scheduler waits for execution.")
    public void rejectionPolicyCustomWithConfig() throws MuleException, InterruptedException, ExecutionException, TimeoutException {
        Scheduler customScheduler = this.service.customScheduler(SchedulerConfig.config().withRejectionAction(SchedulerConfig.RejectionAction.WAIT).withMaxConcurrentTasks(1), 1);
        Scheduler customScheduler2 = singleTaskScheduler();
        Latch latch = new Latch();
        Future<?> submit = customScheduler.submit(threadsConsumer(customScheduler2, latch));
        assertWaitsUntilReleased(submit, latch);
    }

    @Test
    @Description("Tests that tasks dispatched from any other thread to a busy Scheduler are rejected.")
    public void rejectionPolicyOther() throws MuleException, InterruptedException, ExecutionException, TimeoutException {
        ExecutorService otherExecutor = Executors.newSingleThreadExecutor();
        try {
            Future<?> submit = otherExecutor.submit(threadsConsumer(singleTaskScheduler(), new Latch()));
            expectRejected(submit);
        } finally {
            // This executor is not owned by the service, so after() will not dispose of it.
            otherExecutor.shutdownNow();
        }
    }

    /**
     * Creates a custom scheduler configured with a maximum of one concurrent task.
     *
     * @return the newly created scheduler
     */
    private Scheduler singleTaskScheduler() {
        return this.service.customScheduler(SchedulerConfig.config().withMaxConcurrentTasks(1));
    }

    /**
     * Declares that the test must end with an {@link ExecutionException} caused by a
     * {@link SchedulerBusyException}, then waits up to 60 seconds for the given task to finish.
     *
     * @param submit the future of the task that dispatches to the busy scheduler
     */
    private void expectRejected(Future<?> submit) throws InterruptedException, ExecutionException, TimeoutException {
        this.expected.expect(ExecutionException.class);
        this.expected.expectCause(IsInstanceOf.instanceOf(SchedulerBusyException.class));
        submit.get(60L, TimeUnit.SECONDS);
    }

    /**
     * Asserts that the given task does not finish within one second while the latch is closed, and
     * that it does finish within five seconds once the latch is released.
     *
     * @param submit the future of the task that dispatches to the busy scheduler
     * @param latch the latch the dispatched tasks are blocked on
     */
    private static void assertWaitsUntilReleased(Future<?> submit, Latch latch) throws InterruptedException, ExecutionException, TimeoutException {
        try {
            submit.get(1L, TimeUnit.SECONDS);
            Assert.fail("The dispatching task finished before the latch was released");
        } catch (TimeoutException ignored) {
            // Expected: the dispatching task is still running while the latch is closed.
        }
        latch.countDown();
        submit.get(5L, TimeUnit.SECONDS);
    }

    /**
     * Builds a task that keeps submitting latch-blocked tasks to {@code scheduler} for as long as the
     * latch count is greater than zero.
     *
     * @param scheduler the scheduler to saturate
     * @param latch the latch every submitted task waits on
     * @return a callable that returns {@code null} once the latch count has reached zero
     */
    private Callable<Object> threadsConsumer(Scheduler scheduler, Latch latch) {
        return () -> {
            while (latch.getCount() > 0) {
                scheduler.submit(() -> awaitQuietly(latch));
            }
            return null;
        };
    }

    /**
     * Waits on the latch; if interrupted, restores the interrupt flag and returns.
     *
     * @param latch the latch to wait on
     */
    private static void awaitQuietly(Latch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
