package org.mule.service.scheduler.internal;

import io.qameta.allure.Description;
import io.qameta.allure.Feature;
import java.lang.ref.PhantomReference;
import java.lang.ref.ReferenceQueue;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matchers;
import org.hamcrest.core.Is;
import org.hamcrest.core.IsCollectionContaining;
import org.hamcrest.core.IsInstanceOf;
import org.hamcrest.core.StringStartsWith;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.mockito.Mockito;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.exception.MuleRuntimeException;
import org.mule.runtime.api.scheduler.Scheduler;
import org.mule.runtime.api.scheduler.SchedulerBusyException;
import org.mule.runtime.api.scheduler.SchedulerConfig;
import org.mule.runtime.api.util.concurrent.Latch;
import org.mule.runtime.core.api.util.ClassUtils;
import org.mule.runtime.core.api.util.IOUtils;
import org.mule.service.scheduler.internal.config.ContainerThreadPoolsConfig;
import org.mule.service.scheduler.internal.threads.SchedulerThreadPools;
import org.mule.service.scheduler.internal.util.Delegator;
import org.mule.tck.junit4.AbstractMuleTestCase;
import org.mule.tck.probe.JUnitLambdaProbe;
import org.mule.tck.probe.PollingProber;

@Feature("Scheduler Service")
/* loaded from: input_file:org/mule/service/scheduler/internal/SchedulerThreadPoolsTestCase.class */
public class SchedulerThreadPoolsTestCase extends AbstractMuleTestCase {
    /** Number of processors reported by the JVM; passed as the second argument when creating most schedulers in these tests. */
    private static final int CORES = Runtime.getRuntime().availableProcessors();
    /** Timeout handed to the {@link PollingProber}s that wait for garbage collection (presumably milliseconds -- confirm against PollingProber). */
    private static final long GC_POLLING_TIMEOUT = 10000;
    /** Thread pools configuration, loaded in {@link #before()}. */
    private ContainerThreadPoolsConfig threadPoolsConfig;
    /** Thread pools under test; started in {@link #before()} and stopped in {@link #after()} unless a test has set it to {@code null}. */
    private SchedulerThreadPools service;

    /** JUnit rule used by tests that expect an exception to escape the test method. */
    @Rule
    public ExpectedException expected = ExpectedException.none();
    /** Milliseconds the overridden {@code prestartCallback} sleeps for (see {@link #before()}); 0 unless a test changes it. */
    private long prestarCallbackSleepTime = 0;

    /**
     * Loads the container thread pools configuration and starts a {@link SchedulerThreadPools} named after this
     * test class. The instance overrides {@code prestartCallback} so that, after delegating to the parent
     * implementation, it sleeps for {@code prestarCallbackSleepTime} milliseconds.
     */
    @Before
    public void before() throws MuleException {
        final String poolsName = SchedulerThreadPoolsTestCase.class.getName();
        threadPoolsConfig = ContainerThreadPoolsConfig.loadThreadPoolsConfig();
        service = new SchedulerThreadPools(poolsName, threadPoolsConfig) {

            @Override
            protected void prestartCallback(CountDownLatch prestartLatch) {
                super.prestartCallback(prestartLatch);
                final long sleepMillis = SchedulerThreadPoolsTestCase.this.prestarCallbackSleepTime;
                try {
                    Thread.sleep(sleepMillis);
                } catch (InterruptedException interrupted) {
                    // Keep the interruption visible to the caller before propagating the failure.
                    Thread.currentThread().interrupt();
                    throw new MuleRuntimeException(interrupted);
                }
            }
        };
        service.start();
    }

    /**
     * Stops every scheduler still registered in the service and then the service itself. Does nothing when a
     * test has already cleared {@code service}.
     *
     * @throws MuleException propagated from stopping the service
     * @throws InterruptedException declared for compatibility with the original signature
     */
    @After
    public void after() throws MuleException, InterruptedException {
        if (this.service == null) {
            return;
        }
        // Iterate over a snapshot of the list rather than the collection returned by the service.
        for (Scheduler scheduler : new ArrayList<>(this.service.getSchedulers())) {
            scheduler.stop();
        }
        this.service.stop();
    }

    /**
     * Checks that a thread whose name starts with {@code [MuleRuntime].} exists while the service is started,
     * and that a prober eventually sees no such thread once the service is stopped. Clears {@code service} so
     * that {@link #after()} does not stop it a second time.
     */
    @Test
    @Description("Tests that the threads of the SchedulerService are correcly created and destroyed.")
    public void serviceStop() throws MuleException, InterruptedException {
        final String muleThreadPrefix = "[MuleRuntime].";
        Assert.assertThat(collectThreadNames(), IsCollectionContaining.hasItem(StringStartsWith.startsWith(muleThreadPrefix)));

        service.stop();
        service = null;

        final PollingProber prober = new PollingProber(500L, 50L);
        prober.check(new JUnitLambdaProbe(() -> {
            Assert.assertThat(collectThreadNames(),
                              CoreMatchers.not(IsCollectionContaining.hasItem(StringStartsWith.startsWith(muleThreadPrefix))));
            return true;
        }));
    }

    /**
     * Dispatches two latch-blocked tasks to a custom scheduler limited to one concurrent task and then, from a
     * thread of a second custom scheduler, submits one more task to it. The outer future is expected to fail
     * with an {@link ExecutionException} caused by a {@link SchedulerBusyException}, and the extra task must
     * not be among the tasks returned by {@code shutdownNow()}.
     */
    @Test
    @Description("Tests that dispatching a task to a throttled scheduler already running its maximum tasks throws the appropriate exception.")
    public void executorRejects() throws MuleException, ExecutionException, InterruptedException {
        final Latch blocker = new Latch();
        final Scheduler sourceScheduler =
            service.createCustomScheduler(SchedulerConfig.config().withMaxConcurrentTasks(1), CORES, () -> 1000L);
        final Scheduler targetScheduler =
            service.createCustomScheduler(SchedulerConfig.config().withMaxConcurrentTasks(1), CORES, () -> 1000L);

        // Both of these block on the latch, which this test never releases.
        targetScheduler.execute(() -> awaitLatch(blocker));
        targetScheduler.execute(() -> awaitLatch(blocker));

        expected.expect(ExecutionException.class);
        expected.expectCause(IsInstanceOf.instanceOf(SchedulerBusyException.class));

        final Runnable extraTask = () -> {
        };
        sourceScheduler.submit(() -> {
            try {
                targetScheduler.submit(extraTask);
            } finally {
                Assert.assertThat(targetScheduler.shutdownNow(), CoreMatchers.not(IsCollectionContaining.hasItem(extraTask)));
            }
        }).get();
    }

    /**
     * Sets a mocked {@link ClassLoader} as the context classloader of the test thread, submits a task to a
     * CPU-light scheduler and asserts from within the task that it runs with that same classloader.
     * <p>
     * The context classloader of the test thread is restored afterwards so the mock does not leak into
     * {@link #after()} or into tests that run later on the same thread.
     *
     * @throws Exception if the task fails or does not complete within 60 seconds
     */
    @Test
    @Description("Tests that a dispatched task has inherited the context classloader.")
    public void classLoaderPropagates() throws Exception {
        final Scheduler scheduler = this.service.createCpuLightScheduler(SchedulerConfig.config(), CORES, () -> 1000L);
        final ClassLoader contextClassLoader = Mockito.mock(ClassLoader.class);

        final Thread testThread = Thread.currentThread();
        final ClassLoader originalClassLoader = testThread.getContextClassLoader();
        testThread.setContextClassLoader(contextClassLoader);
        try {
            scheduler.submit(() -> {
                Assert.assertThat(Thread.currentThread().getContextClassLoader(), CoreMatchers.sameInstance(contextClassLoader));
            }).get(60L, TimeUnit.SECONDS);
        } finally {
            testThread.setContextClassLoader(originalClassLoader);
        }
    }

    /**
     * Sets a mocked {@link ClassLoader} as the context classloader of the test thread, schedules a fixed-delay
     * task on a CPU-light scheduler and asserts from within the task that it runs with that same classloader.
     * <p>
     * The scheduled task is always cancelled, and the context classloader of the test thread is always
     * restored, whether or not the test body fails.
     *
     * @throws Exception whatever waiting on the latch or on the scheduled future throws
     */
    @Test
    @Description("Tests that a scheduled task has inherited the context classloader.")
    public void classLoaderPropagatesScheduled() throws Exception {
        final Scheduler scheduler = this.service.createCpuLightScheduler(SchedulerConfig.config(), CORES, () -> 1000L);
        final ClassLoader contextClassLoader = Mockito.mock(ClassLoader.class);

        final Thread testThread = Thread.currentThread();
        final ClassLoader originalClassLoader = testThread.getContextClassLoader();
        testThread.setContextClassLoader(contextClassLoader);
        try {
            final Latch latch = new Latch();
            ScheduledFuture<?> scheduled = null;
            try {
                scheduled = scheduler.scheduleWithFixedDelay(() -> {
                    Assert.assertThat(Thread.currentThread().getContextClassLoader(), CoreMatchers.sameInstance(contextClassLoader));
                    latch.countDown();
                }, 0L, 60L, TimeUnit.SECONDS);
                latch.await(10L, TimeUnit.SECONDS);
                scheduled.get(10L, TimeUnit.SECONDS);
            } finally {
                if (scheduled != null) {
                    scheduled.cancel(false);
                }
            }
        } finally {
            // Do not leave the mock installed on the test thread.
            testThread.setContextClassLoader(originalClassLoader);
        }
    }

    /**
     * Sets a mocked {@link ClassLoader} as the context classloader of the test thread, schedules a task with
     * the cron expression {@code "*}{@code /1 * * ? * *"} on a CPU-light scheduler and asserts from within the
     * task that it runs with that same classloader.
     * <p>
     * The scheduled task is always cancelled, and the context classloader of the test thread is always
     * restored, whether or not the test body fails.
     *
     * @throws Exception whatever waiting on the latch or on the scheduled future throws
     */
    @Test
    @Description("Tests that a cron-scheduled task has inherited the context classloader.")
    public void classLoaderPropagatesCron() throws Exception {
        final Scheduler scheduler = this.service.createCpuLightScheduler(SchedulerConfig.config(), CORES, () -> 1000L);
        final ClassLoader contextClassLoader = Mockito.mock(ClassLoader.class);

        final Thread testThread = Thread.currentThread();
        final ClassLoader originalClassLoader = testThread.getContextClassLoader();
        testThread.setContextClassLoader(contextClassLoader);
        try {
            final Latch latch = new Latch();
            ScheduledFuture<?> scheduled = null;
            try {
                scheduled = scheduler.scheduleWithCronExpression(() -> {
                    Assert.assertThat(Thread.currentThread().getContextClassLoader(), CoreMatchers.sameInstance(contextClassLoader));
                    latch.countDown();
                }, "*/1 * * ? * *");
                latch.await(10L, TimeUnit.SECONDS);
                scheduled.get(10L, TimeUnit.SECONDS);
            } finally {
                if (scheduled != null) {
                    scheduled.cancel(false);
                }
            }
        } finally {
            // Do not leave the mock installed on the test thread.
            testThread.setContextClassLoader(originalClassLoader);
        }
    }

    /**
     * Creates a throw-away child classloader, uses it as context classloader while creating and using a custom
     * scheduler (see {@link #scheduleToCustomWithClassLoader(ClassLoader)}), and then polls until a phantom
     * reference to that classloader is enqueued.
     *
     * @throws Exception if scheduling fails
     */
    @Test
    @Description("Tests that a custom scheduler doesn't hold a reference to the context classloader that was in the context when it was created.")
    public void customPoolThreadsDontReferenceCreatorClassLoader() throws Exception {
        ClassLoader testClassLoader = new ClassLoader(getClass().getClassLoader()) {
        };
        PhantomReference<ClassLoader> classLoaderRef = new PhantomReference<>(testClassLoader, new ReferenceQueue<>());

        scheduleToCustomWithClassLoader(testClassLoader);

        // Clear this frame's own strong reference; otherwise the local variable itself may keep the
        // classloader reachable while the probe below is waiting for it to be collected.
        testClassLoader = null;

        assertNoClassLoaderReferenceHeld(classLoaderRef, GC_POLLING_TIMEOUT);
    }

    /**
     * With {@code classLoader} as context classloader, creates a custom scheduler and runs a task on it that
     * asserts the task sees {@code classLoader} as its context classloader. Then, outside of that context, runs
     * a second task on the same scheduler that asserts its context classloader is {@code classLoader}'s parent.
     *
     * @param classLoader the classloader set as context while the scheduler is created
     * @throws InterruptedException if interrupted while waiting for the second task
     * @throws ExecutionException if the second task fails
     */
    public void scheduleToCustomWithClassLoader(ClassLoader classLoader) throws InterruptedException, ExecutionException {
        final AtomicReference<Scheduler> schedulerRef = new AtomicReference<>();

        ClassUtils.withContextClassLoader(classLoader, () -> {
            final Scheduler created =
                service.createCustomScheduler(SchedulerConfig.config().withMaxConcurrentTasks(1), 1, () -> 1000L);
            schedulerRef.set(created);
            try {
                created.submit(() -> {
                    Assert.assertThat(Thread.currentThread().getContextClassLoader(), Is.is(classLoader));
                }).get();
            } catch (InterruptedException | ExecutionException failure) {
                throw new RuntimeException(failure);
            }
        });

        final Scheduler scheduler = schedulerRef.get();
        scheduler.submit(() -> {
            Assert.assertThat(Thread.currentThread().getContextClassLoader(), Is.is(classLoader.getParent()));
        }).get();
    }

    /**
     * Loads {@link Delegator} through a dedicated classloader, uses the resulting instance to dispatch an empty
     * task to a custom scheduler, and then polls until a phantom reference to that classloader is enqueued.
     * <p>
     * NOTE(review): the cast assumes {@code Delegator} implements {@code Consumer<Runnable>} -- confirm against
     * its declaration. A raw {@code Consumer} cannot be used here because a lambda needs a functional
     * interface as its target type.
     *
     * @throws Exception if the delegator class cannot be loaded or instantiated
     */
    @Test
    @Description("Tests that a scheduler Executor thread doesn't hold a reference to an artifact classloader through the `inheritedAccessControlContext` when executing.")
    public void threadsDontReferenceClassLoaderFromAccessControlContext() throws Exception {
        final Scheduler scheduler =
            this.service.createCustomScheduler(SchedulerConfig.config().withMaxConcurrentTasks(1), 1, () -> 1000L);

        ClassLoader delegatorClassLoader = createDelegatorClassLoader();
        PhantomReference<ClassLoader> classLoaderRef = new PhantomReference<>(delegatorClassLoader, new ReferenceQueue<>());

        @SuppressWarnings("unchecked")
        Consumer<Runnable> delegator =
            (Consumer<Runnable>) delegatorClassLoader.loadClass(Delegator.class.getName()).newInstance();
        delegator.accept(() -> scheduler.execute(() -> {
        }));

        // Clear this frame's own strong references; otherwise the locals themselves may keep the
        // classloader reachable while the probe below is waiting for it to be collected.
        delegator = null;
        delegatorClassLoader = null;

        assertNoClassLoaderReferenceHeld(classLoaderRef, GC_POLLING_TIMEOUT);
    }

    /**
     * Loads {@link Delegator} through a dedicated classloader, uses the resulting instance to create a custom
     * scheduler, and then polls until a phantom reference to that classloader is enqueued. The created
     * scheduler is kept in an {@link AtomicReference} for the duration of the test.
     * <p>
     * NOTE(review): the cast assumes {@code Delegator} implements {@code Consumer<Runnable>} -- confirm against
     * its declaration. A raw {@code Consumer} cannot be used here because a lambda needs a functional
     * interface as its target type.
     *
     * @throws Exception if the delegator class cannot be loaded or instantiated
     */
    @Test
    @Description("Tests that a scheduler Executor thread doesn't hold a reference to an artifact classloader through the `inheritedAccessControlContext` when created.")
    public void threadsDontReferenceClassLoaderFromAccessControlContextWhenCreated() throws Exception {
        ClassLoader delegatorClassLoader = createDelegatorClassLoader();
        PhantomReference<ClassLoader> classLoaderRef = new PhantomReference<>(delegatorClassLoader, new ReferenceQueue<>());

        final AtomicReference<Scheduler> schedulerRef = new AtomicReference<>();

        @SuppressWarnings("unchecked")
        Consumer<Runnable> delegator =
            (Consumer<Runnable>) delegatorClassLoader.loadClass(Delegator.class.getName()).newInstance();
        delegator.accept(() -> schedulerRef
            .set(this.service.createCustomScheduler(SchedulerConfig.config().withMaxConcurrentTasks(1), 1, () -> 1000L)));

        // Clear this frame's own strong references; otherwise the locals themselves may keep the
        // classloader reachable while the probe below is waiting for it to be collected.
        delegator = null;
        delegatorClassLoader = null;

        assertNoClassLoaderReferenceHeld(classLoaderRef, GC_POLLING_TIMEOUT);
    }

    /**
     * Dispatches one more empty task than the configured IO core pool size to an IO scheduler, each through a
     * {@link Delegator} loaded by a dedicated classloader, and then polls until a phantom reference to that
     * classloader is enqueued. First asserts that the configured IO keep-alive is greater than
     * {@code GC_POLLING_TIMEOUT}.
     * <p>
     * NOTE(review): the cast assumes {@code Delegator} implements {@code Consumer<Runnable>} -- confirm against
     * its declaration. A raw {@code Consumer} cannot be used here because a lambda needs a functional
     * interface as its target type.
     *
     * @throws Exception if the delegator class cannot be loaded or instantiated
     */
    @Test
    @Description("Tests that IO threads in excess of the core size don't hold a reference to an artifact classloader through the inheritedAccessControlContext.")
    public void elasticIoThreadsDontReferenceClassLoaderFromAccessControlContext() throws Exception {
        Assert.assertThat(this.threadPoolsConfig.getIoKeepAlive().getAsLong(), Matchers.greaterThan(GC_POLLING_TIMEOUT));

        final int tasksToDispatch = this.threadPoolsConfig.getIoCorePoolSize().getAsInt() + 1;
        final Scheduler scheduler = this.service.createIoScheduler(SchedulerConfig.config(), tasksToDispatch, () -> 1000L);

        ClassLoader delegatorClassLoader = createDelegatorClassLoader();
        PhantomReference<ClassLoader> classLoaderRef = new PhantomReference<>(delegatorClassLoader, new ReferenceQueue<>());

        @SuppressWarnings("unchecked")
        Consumer<Runnable> delegator =
            (Consumer<Runnable>) delegatorClassLoader.loadClass(Delegator.class.getName()).newInstance();
        for (int i = 0; i < tasksToDispatch; i++) {
            delegator.accept(() -> scheduler.execute(() -> {
            }));
        }

        // Clear this frame's own strong references; otherwise the locals themselves may keep the
        // classloader reachable while the probe below is waiting for it to be collected.
        delegator = null;
        delegatorClassLoader = null;

        assertNoClassLoaderReferenceHeld(classLoaderRef, GC_POLLING_TIMEOUT);
    }

    /**
     * Builds a child classloader of this test's classloader that defines {@link Delegator} itself, from the
     * bytes of its {@code .class} resource, instead of delegating to the parent. Every other class name is
     * delegated to the default {@code loadClass} behaviour.
     * <p>
     * If reading or defining the class fails with an {@link Exception}, loading falls back to the default
     * behaviour as well.
     *
     * @return a new classloader; each call returns a distinct instance
     */
    private ClassLoader createDelegatorClassLoader() {
        return new ClassLoader(getClass().getClassLoader()) {

            @Override
            public Class<?> loadClass(String str) throws ClassNotFoundException {
                if (!Delegator.class.getName().equals(str)) {
                    return super.loadClass(str);
                }
                // try-with-resources so the resource stream is closed once the bytes are read
                // (a null stream is tolerated by try-with-resources and handled by the catch below).
                try (java.io.InputStream classStream =
                    getClass().getResourceAsStream("/org/mule/service/scheduler/internal/util/Delegator.class")) {
                    byte[] classBytes = IOUtils.toByteArray(classStream);
                    return defineClass(null, classBytes, 0, classBytes.length);
                } catch (Exception e) {
                    // Best effort: fall back to regular delegation.
                    return super.loadClass(str);
                }
            }
        };
    }

    /**
     * Polls, requesting a garbage collection on each attempt, until the given phantom reference is enqueued.
     *
     * @param phantomReference reference to the classloader that is expected to be collected
     * @param j timeout passed to the {@link PollingProber}
     */
    private void assertNoClassLoaderReferenceHeld(PhantomReference<ClassLoader> phantomReference, long j) {
        final PollingProber prober = new PollingProber(j, 100L);
        final JUnitLambdaProbe enqueuedProbe = new JUnitLambdaProbe(() -> {
            System.gc();
            Assert.assertThat(phantomReference.isEnqueued(), Is.is(true));
            return true;
        }, "A hard reference is being mantained to the child ClassLoader.");
        prober.check(enqueuedProbe);
    }

    /**
     * Records phantom references to the worker thread and thread group of a custom scheduler, shuts the
     * scheduler down with {@code shutdownNow()} and checks that those references eventually get enqueued.
     */
    @Test
    public void threadGroupOfCustomSchedulerNotLeakedAfterShutdownNow() throws InterruptedException, ExecutionException, TimeoutException {
        final Scheduler customScheduler =
            service.createCustomScheduler(SchedulerConfig.config().withMaxConcurrentTasks(1), 1, () -> 1000L);
        final List<PhantomReference> threadReferences = recordReferences(customScheduler);

        customScheduler.shutdownNow();

        assertNoThreadGroupReferenceHeld(threadReferences);
    }

    /**
     * Records phantom references to the worker thread and thread group of a custom scheduler, stops the
     * scheduler with {@code stop()} and checks that those references eventually get enqueued.
     */
    @Test
    public void threadGroupOfCustomSchedulerNotLeakedAfterStop() throws InterruptedException, ExecutionException, TimeoutException {
        final Scheduler customScheduler =
            service.createCustomScheduler(SchedulerConfig.config().withMaxConcurrentTasks(1), 1, () -> 1000L);
        final List<PhantomReference> threadReferences = recordReferences(customScheduler);

        customScheduler.stop();

        assertNoThreadGroupReferenceHeld(threadReferences);
    }

    /**
     * Runs a task on the given scheduler that creates phantom references to the thread executing it and to
     * that thread's thread group, waiting up to 5 seconds for the task to finish.
     *
     * @param scheduler the scheduler whose worker thread is recorded
     * @return the two phantom references: thread first, thread group second
     */
    private List<PhantomReference> recordReferences(Scheduler scheduler) throws InterruptedException, ExecutionException, TimeoutException {
        final List<PhantomReference> references = new ArrayList<>();
        scheduler.submit(() -> {
            final Thread worker = Thread.currentThread();
            references.add(new PhantomReference<>(worker, new ReferenceQueue<>()));
            references.add(new PhantomReference<>(worker.getThreadGroup(), new ReferenceQueue<>()));
            return true;
        }).get(5L, TimeUnit.SECONDS);
        return references;
    }

    /**
     * Polls, requesting a garbage collection on each attempt, until every given phantom reference is enqueued.
     *
     * @param list the phantom references expected to be enqueued
     */
    private void assertNoThreadGroupReferenceHeld(List<PhantomReference> list) {
        final PollingProber prober = new PollingProber(GC_POLLING_TIMEOUT, 100L);
        prober.check(new JUnitLambdaProbe(() -> {
            System.gc();
            for (PhantomReference reference : list) {
                Assert.assertThat(reference.toString(), reference.isEnqueued(), Is.is(true));
            }
            return true;
        }, "A hard reference is being mantained to the scheduler threads/thread group."));
    }

    /**
     * With the prestart callback slowed down by one second, creates ten custom schedulers in sequence from
     * within a CPU-intensive scheduler task. Each custom scheduler gets one task submitted, is given up to
     * 5 seconds for that task to count down a latch, and is then stopped.
     * <p>
     * Each custom scheduler is stopped exactly once, in a {@code finally}, on every exit path. An interruption
     * while waiting restores the interrupt flag and ends the loop.
     *
     * @throws Exception if the outer task fails
     */
    @Test
    public void customSchedulerPrestarted() throws Exception {
        this.prestarCallbackSleepTime = 1000L;
        final Scheduler creatorScheduler =
            this.service.createCpuIntensiveScheduler(SchedulerConfig.config().withMaxConcurrentTasks(1), 1, () -> 1000L);

        creatorScheduler.submit(() -> {
            final SchedulerConfig customConfig = SchedulerConfig.config().withMaxConcurrentTasks(1);
            for (int i = 0; i < 10; i++) {
                final CountDownLatch taskRan = new CountDownLatch(1);
                final Scheduler customScheduler = this.service.createCustomScheduler(customConfig, 1, () -> 0L);
                try {
                    customScheduler.submit(() -> {
                        taskRan.countDown();
                    });
                    taskRan.await(5L, TimeUnit.SECONDS);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                } finally {
                    customScheduler.stop();
                }
            }
        }).get();
    }

    /**
     * Stops a custom scheduler from within one of its own tasks. Waiting on that task's future is expected to
     * throw a {@link CancellationException}; in any case the scheduler must end up shut down and terminated,
     * with the task's thread group destroyed.
     * <p>
     * The final-state verification runs once, in a {@code finally}, regardless of how the wait ends.
     *
     * @throws Exception the expected {@link CancellationException}, or any verification failure
     */
    @Test
    public void customSchedulerShutdownFromWithin() throws Exception {
        final Scheduler customScheduler =
            this.service.createCustomScheduler(SchedulerConfig.config().withMaxConcurrentTasks(1), 1, () -> 1000L);
        final AtomicReference<ThreadGroup> threadGroupRef = new AtomicReference<>();

        final Future<?> stopTask = customScheduler.submit(() -> {
            threadGroupRef.set(Thread.currentThread().getThreadGroup());
            customScheduler.stop();
        });

        this.expected.expect(CancellationException.class);
        try {
            stopTask.get(10L, TimeUnit.SECONDS);
        } finally {
            new PollingProber().check(new JUnitLambdaProbe(() -> {
                Assert.assertThat("Shutdown", customScheduler.isShutdown(), Is.is(true));
                Assert.assertThat("Terminated", customScheduler.isTerminated(), Is.is(true));
                Assert.assertThat("Destroyed", threadGroupRef.get().isDestroyed(), Is.is(true));
                return true;
            }));
        }
    }

    /**
     * Same as {@code customSchedulerShutdownFromWithin}, but with a second task on the scheduler that keeps
     * looping (ignoring interruption) until the test tells it to finish. Waiting on the stopping task's future
     * is expected to throw a {@link CancellationException}; once the looping task is released the scheduler
     * must end up shut down and terminated, with the task's thread group destroyed.
     * <p>
     * Releasing the looping task and verifying the final state happen once, in a {@code finally}, regardless of
     * how the wait ends.
     *
     * @throws Exception the expected {@link CancellationException}, or any verification failure
     */
    @Test
    public void customSchedulerShutdownFromWithinDelayed() throws Exception {
        final Scheduler customScheduler =
            this.service.createCustomScheduler(SchedulerConfig.config().withMaxConcurrentTasks(2), 2, () -> 1000L);
        final AtomicReference<ThreadGroup> threadGroupRef = new AtomicReference<>();
        final AtomicBoolean releaseLoopingTask = new AtomicBoolean(false);

        customScheduler.submit(() -> {
            while (!releaseLoopingTask.get()) {
                try {
                    Thread.sleep(100L);
                } catch (InterruptedException e) {
                    // Keep looping: this task only ends when the flag is set.
                    Thread.currentThread().interrupt();
                }
            }
        });
        final Future<?> stopTask = customScheduler.submit(() -> {
            threadGroupRef.set(Thread.currentThread().getThreadGroup());
            customScheduler.stop();
        });

        this.expected.expect(CancellationException.class);
        try {
            stopTask.get(10L, TimeUnit.SECONDS);
        } finally {
            releaseLoopingTask.set(true);
            new PollingProber().check(new JUnitLambdaProbe(() -> {
                Assert.assertThat("Shutdown", customScheduler.isShutdown(), Is.is(true));
                Assert.assertThat("Terminated", customScheduler.isTerminated(), Is.is(true));
                Assert.assertThat("Destroyed", threadGroupRef.get().isDestroyed(), Is.is(true));
                return true;
            }));
        }
    }

    /** Expects creating a CPU-light scheduler with {@code waitAllowed} set to fail with an {@link IllegalArgumentException}. */
    @Test
    public void onlyCustomMayConfigureWaitCpuLight() {
        expected.expect(IllegalArgumentException.class);
        expected.expectMessage("Only custom schedulers may define 'waitAllowed' behaviour");

        final SchedulerConfig waitAllowedConfig = SchedulerConfig.config().withWaitAllowed(true);
        service.createCpuLightScheduler(waitAllowedConfig, CORES, () -> 1000L);
    }

    /** Expects creating a CPU-intensive scheduler with {@code waitAllowed} set to fail with an {@link IllegalArgumentException}. */
    @Test
    public void onlyCustomMayConfigureWaitCpuIntensive() {
        expected.expect(IllegalArgumentException.class);
        expected.expectMessage("Only custom schedulers may define 'waitAllowed' behaviour");

        final SchedulerConfig waitAllowedConfig = SchedulerConfig.config().withWaitAllowed(true);
        service.createCpuIntensiveScheduler(waitAllowedConfig, CORES, () -> 1000L);
    }

    /** Expects creating an IO scheduler with {@code waitAllowed} set to fail with an {@link IllegalArgumentException}. */
    @Test
    public void onlyCustomMayConfigureWaitIO() {
        expected.expect(IllegalArgumentException.class);
        expected.expectMessage("Only custom schedulers may define 'waitAllowed' behaviour");

        final SchedulerConfig waitAllowedConfig = SchedulerConfig.config().withWaitAllowed(true);
        service.createIoScheduler(waitAllowedConfig, CORES, () -> 1000L);
    }

    /** Expects creating a CPU-light scheduler with {@code directRunCpuLightWhenTargetBusy} set to fail with an {@link IllegalArgumentException}. */
    @Test
    public void onlyCustomMayConfigureDirectRunCpuLightWhenTargetBusyCpuLight() {
        expected.expect(IllegalArgumentException.class);
        expected.expectMessage("Only custom schedulers may define 'directRunCpuLightWhenTargetBusy' behaviour");

        final SchedulerConfig directRunConfig = SchedulerConfig.config().withDirectRunCpuLightWhenTargetBusy(true);
        service.createCpuLightScheduler(directRunConfig, CORES, () -> 1000L);
    }

    /** Expects creating a CPU-intensive scheduler with {@code directRunCpuLightWhenTargetBusy} set to fail with an {@link IllegalArgumentException}. */
    @Test
    public void onlyCustomMayConfigureDirectRunCpuLightWhenTargetBusyCpuIntensive() {
        expected.expect(IllegalArgumentException.class);
        expected.expectMessage("Only custom schedulers may define 'directRunCpuLightWhenTargetBusy' behaviour");

        final SchedulerConfig directRunConfig = SchedulerConfig.config().withDirectRunCpuLightWhenTargetBusy(true);
        service.createCpuIntensiveScheduler(directRunConfig, CORES, () -> 1000L);
    }

    /** Expects creating an IO scheduler with {@code directRunCpuLightWhenTargetBusy} set to fail with an {@link IllegalArgumentException}. */
    @Test
    public void onlyCustomMayConfigureDirectRunCpuLightWhenTargetBusyIO() {
        expected.expect(IllegalArgumentException.class);
        expected.expectMessage("Only custom schedulers may define 'directRunCpuLightWhenTargetBusy' behaviour");

        final SchedulerConfig directRunConfig = SchedulerConfig.config().withDirectRunCpuLightWhenTargetBusy(true);
        service.createIoScheduler(directRunConfig, CORES, () -> 1000L);
    }

    /**
     * Submits the task built by {@code threadsConsumer} for a single-task custom scheduler to a CPU-light
     * scheduler, and expects waiting on it to fail with an {@link ExecutionException} caused by a
     * {@link SchedulerBusyException}.
     */
    @Test
    @Description("Tests that tasks dispatched from a CPU Light thread to a busy Scheduler are rejected.")
    public void rejectionPolicyCpuLight() throws MuleException, InterruptedException, ExecutionException, TimeoutException {
        final Scheduler sourceScheduler = service.createCpuLightScheduler(SchedulerConfig.config(), CORES, () -> 1000L);
        final Scheduler targetScheduler =
            service.createCustomScheduler(SchedulerConfig.config().withMaxConcurrentTasks(1), CORES, () -> 1000L);

        final Future<?> dispatched = sourceScheduler.submit(threadsConsumer(targetScheduler, new Latch()));

        expected.expect(ExecutionException.class);
        expected.expectCause(IsInstanceOf.instanceOf(SchedulerBusyException.class));
        dispatched.get(60L, TimeUnit.SECONDS);
    }

    /**
     * Submits the task built by {@code threadsConsumer} for a single-task custom scheduler to a CPU-intensive
     * scheduler, and expects waiting on it to fail with an {@link ExecutionException} caused by a
     * {@link SchedulerBusyException}.
     */
    @Test
    @Description("Tests that tasks dispatched from a CPU Intensive thread to a busy Scheduler are rejected.")
    public void rejectionPolicyCpuIntensive() throws MuleException, InterruptedException, ExecutionException, TimeoutException {
        final Scheduler sourceScheduler = service.createCpuIntensiveScheduler(SchedulerConfig.config(), CORES, () -> 1000L);
        final Scheduler targetScheduler =
            service.createCustomScheduler(SchedulerConfig.config().withMaxConcurrentTasks(1), CORES, () -> 1000L);

        final Future<?> dispatched = sourceScheduler.submit(threadsConsumer(targetScheduler, new Latch()));

        expected.expect(ExecutionException.class);
        expected.expectCause(IsInstanceOf.instanceOf(SchedulerBusyException.class));
        dispatched.get(60L, TimeUnit.SECONDS);
    }

    /**
     * Submits the task built by {@code threadsConsumer} for a single-task custom scheduler to an IO scheduler.
     * The task must still be pending after one second, and must complete within five seconds once the latch
     * is released.
     */
    @Test
    @Description("Tests that tasks dispatched from an IO thread to a busy Scheduler waits for execution.")
    public void rejectionPolicyIO() throws MuleException, InterruptedException, ExecutionException, TimeoutException {
        final Scheduler sourceScheduler = service.createIoScheduler(SchedulerConfig.config(), CORES, () -> 1000L);
        final Scheduler targetScheduler =
            service.createCustomScheduler(SchedulerConfig.config().withMaxConcurrentTasks(1), CORES, () -> 1000L);
        final Latch blocker = new Latch();

        final Future<?> dispatched = sourceScheduler.submit(threadsConsumer(targetScheduler, blocker));

        try {
            dispatched.get(1L, TimeUnit.SECONDS);
            Assert.fail();
        } catch (TimeoutException stillWaiting) {
            // Timing out is the outcome this test wants at this point.
        }

        blocker.countDown();
        dispatched.get(5L, TimeUnit.SECONDS);
    }

    /**
     * Occupies all but one of the configured maximum IO threads via {@code consumeThread}, then submits an IO
     * task that itself submits a nested IO task. The nested task must run within five seconds and on the same
     * thread as the task that submitted it.
     */
    @Test
    @Description("Tests that when the IO pool is full, any task dispatched from IO to IO runs in the caller thread instead of being queued, which can cause a deadlock.")
    public void ioToFullIoDoesntWait() throws InterruptedException, ExecutionException {
        final Scheduler ioScheduler = service.createIoScheduler(SchedulerConfig.config(), CORES, () -> 1000L);
        final Latch outerLatch = new Latch();
        final Latch innerLatch = new Latch();

        final int threadsToConsume = threadPoolsConfig.getIoMaxPoolSize().getAsInt() - 1;
        for (int consumed = 0; consumed < threadsToConsume; consumed++) {
            consumeThread(ioScheduler, outerLatch);
        }

        final AtomicReference<Thread> callerThread = new AtomicReference<>();
        final AtomicReference<Thread> executingThread = new AtomicReference<>();
        final Future<Boolean> outerTask = ioScheduler.submit(() -> {
            callerThread.set(Thread.currentThread());
            ioScheduler.submit(() -> {
                executingThread.set(Thread.currentThread());
                innerLatch.countDown();
            });
            return awaitLatch(outerLatch);
        });

        Assert.assertThat(innerLatch.await(5L, TimeUnit.SECONDS), Is.is(true));
        outerLatch.countDown();
        Assert.assertThat(outerTask.get(), Is.is(true));
        Assert.assertThat(executingThread.get(), Is.is(callerThread.get()));
    }

    /**
     * Occupies the configured maximum number of IO threads via {@code consumeThread}, then, from a custom
     * scheduler created with {@code waitAllowed}, submits a task to the IO scheduler. The custom task is
     * expected not to finish within five seconds ({@link TimeoutException}). The latch is released at the end
     * in every case.
     */
    @Test
    @Description("Tests that when the IO pool is full, any task dispatched from a CUSTOM pool with WAIT rejection action to IO is queued.")
    public void customWaitToFullIoWaits() throws InterruptedException, ExecutionException, TimeoutException {
        final Scheduler customScheduler = service
            .createCustomScheduler(SchedulerConfig.config().withMaxConcurrentTasks(1).withWaitAllowed(true), CORES, () -> 1000L);
        final Scheduler ioScheduler = service.createIoScheduler(SchedulerConfig.config(), CORES, () -> 1000L);
        final Latch blocker = new Latch();

        final int threadsToConsume = threadPoolsConfig.getIoMaxPoolSize().getAsInt();
        for (int consumed = 0; consumed < threadsToConsume; consumed++) {
            consumeThread(ioScheduler, blocker);
        }

        final Future<?> dispatcher = customScheduler.submit(() -> {
            ioScheduler.submit(() -> {
            });
            Assert.fail("Didn't wait");
            return null;
        });

        expected.expect(TimeoutException.class);
        try {
            dispatcher.get(5L, TimeUnit.SECONDS);
        } finally {
            blocker.countDown();
        }
    }

    /**
     * Dispatches as many blocking tasks to a CPU-light scheduler as the configured CPU-light pool size plus
     * queue size, then, from a custom scheduler created with {@code directRunCpuLightWhenTargetBusy}, submits
     * one more task to it. That task must have run on the thread that submitted it. The latch is released
     * before the final assertion in every case.
     */
    @Test
    @Description("Tests that when the CPU-lite pool is full, any task dispatched from a CUSTOM pool with DirectRunToFullCpuLight falg to CPU-lite is run directlyi in the caller thread.")
    public void customDirectRunToFullCpuLight() throws InterruptedException, ExecutionException, TimeoutException {
        final Scheduler customScheduler = service.createCustomScheduler(
            SchedulerConfig.config().withMaxConcurrentTasks(1).withDirectRunCpuLightWhenTargetBusy(true), CORES, () -> 1000L);
        final Scheduler cpuLightScheduler = service.createCpuLightScheduler(SchedulerConfig.config(), CORES, () -> 1000L);
        final Latch blocker = new Latch();

        final int tasksToFill =
            threadPoolsConfig.getCpuLightPoolSize().getAsInt() + threadPoolsConfig.getCpuLightQueueSize().getAsInt();
        for (int filled = 0; filled < tasksToFill; filled++) {
            consumeThread(cpuLightScheduler, blocker);
        }

        final AtomicReference<Thread> callerThread = new AtomicReference<>();
        final AtomicReference<Thread> executingThread = new AtomicReference<>();
        try {
            customScheduler.submit(() -> {
                callerThread.set(Thread.currentThread());
                cpuLightScheduler.submit(() -> {
                    executingThread.set(Thread.currentThread());
                });
                return null;
            }).get(5L, TimeUnit.SECONDS);
        } finally {
            blocker.countDown();
        }
        Assert.assertThat(executingThread.get(), CoreMatchers.sameInstance(callerThread.get()));
    }

    /**
     * Uses a custom scheduler created with both {@code waitAllowed} and {@code directRunCpuLightWhenTargetBusy}.
     * With the IO scheduler and the CPU-light scheduler both loaded with blocking tasks, a task dispatched to
     * CPU-light must have run on the submitting thread, while the task dispatching to IO is expected not to
     * finish within five seconds ({@link TimeoutException}). The latch is released at the end in every case.
     */
    @Test
    @Description("Tests that the behavior of combining runCpuLightWhenTargetBusy and waitAllowed depends on the target thread.")
    public void customWaitToFullIoWaitsAndWaitToFullIoWaits() throws InterruptedException, ExecutionException, TimeoutException {
        final Scheduler customScheduler = service.createCustomScheduler(
            SchedulerConfig.config().withMaxConcurrentTasks(1).withWaitAllowed(true).withDirectRunCpuLightWhenTargetBusy(true),
            CORES, () -> 1000L);
        final Scheduler ioScheduler = service.createIoScheduler(SchedulerConfig.config(), CORES, () -> 1000L);
        final Scheduler cpuLightScheduler = service.createCpuLightScheduler(SchedulerConfig.config(), CORES, () -> 1000L);
        final Latch blocker = new Latch();

        final int ioTasksToFill = threadPoolsConfig.getIoMaxPoolSize().getAsInt();
        for (int filled = 0; filled < ioTasksToFill; filled++) {
            consumeThread(ioScheduler, blocker);
        }
        final int cpuLightTasksToFill =
            threadPoolsConfig.getCpuLightPoolSize().getAsInt() + threadPoolsConfig.getCpuLightQueueSize().getAsInt();
        for (int filled = 0; filled < cpuLightTasksToFill; filled++) {
            consumeThread(cpuLightScheduler, blocker);
        }

        final AtomicReference<Thread> callerThread = new AtomicReference<>();
        final AtomicReference<Thread> executingThread = new AtomicReference<>();
        final Future<?> toCpuLight = customScheduler.submit(() -> {
            callerThread.set(Thread.currentThread());
            cpuLightScheduler.submit(() -> {
                executingThread.set(Thread.currentThread());
            });
            return null;
        });
        final Future<?> toIo = customScheduler.submit(() -> {
            ioScheduler.submit(() -> {
            });
            Assert.fail("Didn't wait");
            return null;
        });

        try {
            toCpuLight.get(5L, TimeUnit.SECONDS);
            Assert.assertThat(executingThread.get(), CoreMatchers.sameInstance(callerThread.get()));

            expected.expect(TimeoutException.class);
            toIo.get(5L, TimeUnit.SECONDS);
        } finally {
            blocker.countDown();
        }
    }

    /**
     * First verifies that filling the CPU-light scheduler from a custom scheduler thread ends with a
     * {@link SchedulerBusyException}. Then, from the same custom scheduler, schedules a fixed-delay task (no initial
     * delay, 1 second between runs) on the CPU-light scheduler, waits until the returned {@link ScheduledFuture}
     * reports {@code isDone()}, releases the latch and expects the periodic task body to run twice within 5 seconds.
     */
    @Test
    @Description("Tests that periodic tasks scheduled to a busy Scheduler are skipped but the job continues executing.")
    public void rejectionPolicyScheduledPeriodic() throws MuleException, InterruptedException, ExecutionException, TimeoutException {
        Scheduler sourceScheduler = this.service.createCustomScheduler(SchedulerConfig.config().withMaxConcurrentTasks(2), CORES, () -> 1000L);
        Scheduler targetScheduler = this.service.createCpuLightScheduler(SchedulerConfig.config(), CORES, () -> 1000L);

        Latch latch = new Latch();
        try {
            try {
                sourceScheduler.submit(threadsConsumer(targetScheduler, latch)).get(1L, TimeUnit.SECONDS);
                Assert.fail();
            } catch (ExecutionException e) {
                Assert.assertThat(e.getCause(), IsInstanceOf.instanceOf(SchedulerBusyException.class));
            }

            CountDownLatch periodicRuns = new CountDownLatch(2);
            AtomicReference<ScheduledFuture<?>> scheduledTask = new AtomicReference<>();
            sourceScheduler.submit(() -> {
                scheduledTask.set(targetScheduler.scheduleWithFixedDelay(() -> {
                    periodicRuns.countDown();
                }, 0L, 1L, TimeUnit.SECONDS));
                return null;
            });

            new PollingProber().check(new JUnitLambdaProbe(() -> {
                ScheduledFuture<?> task = scheduledTask.get();
                // The scheduling happens asynchronously: fail the probe attempt with an assertion
                // instead of dereferencing a reference that has not been set yet.
                Assert.assertThat(task, CoreMatchers.notNullValue());
                Assert.assertThat(task.isDone(), Is.is(true));
                return true;
            }));

            // Free the target scheduler so the periodic job can actually run.
            latch.countDown();
            Assert.assertThat(periodicRuns.await(5L, TimeUnit.SECONDS), Is.is(true));
        } finally {
            // Safety net for failure paths; a no-op when the latch was already released above.
            latch.countDown();
        }
    }

    /**
     * Runs {@code threadsConsumer} on a custom scheduler thread against a second custom scheduler limited to one
     * concurrent task, and expects the resulting future to fail with an {@link ExecutionException} caused by a
     * {@link SchedulerBusyException}, i.e. the submission to the busy target is rejected.
     */
    @Test
    @Description("Tests that tasks dispatched from a Custom scheduler thread to a busy Scheduler are rejected.")
    public void rejectionPolicyCustom() throws MuleException, InterruptedException, ExecutionException, TimeoutException {
        Scheduler sourceScheduler = this.service.createCustomScheduler(SchedulerConfig.config().withMaxConcurrentTasks(1), CORES, () -> 1000L);
        Scheduler targetScheduler = this.service.createCustomScheduler(SchedulerConfig.config().withMaxConcurrentTasks(1), CORES, () -> 1000L);

        Latch latch = new Latch();
        try {
            Future<Object> submit = sourceScheduler.submit(threadsConsumer(targetScheduler, latch));

            this.expected.expect(ExecutionException.class);
            this.expected.expectCause(IsInstanceOf.instanceOf(SchedulerBusyException.class));
            submit.get(60L, TimeUnit.SECONDS);
        } finally {
            // Let the task that is occupying the target scheduler finish instead of leaving it parked on the latch.
            latch.countDown();
        }
    }

    /**
     * Runs {@code threadsConsumer} on a custom scheduler created with {@code waitAllowed(true)} against a second
     * custom scheduler limited to one concurrent task. The consumer must still be running after 1 second (the first
     * {@code get} times out) and must complete within 5 seconds once the latch is released.
     */
    @Test
    @Description("Tests that tasks dispatched from a Custom scheduler with 'Wait' allowed thread to a busy Scheduler waits for execution.")
    public void rejectionPolicyCustomWithConfig() throws MuleException, InterruptedException, ExecutionException, TimeoutException {
        Scheduler sourceScheduler = this.service.createCustomScheduler(
                SchedulerConfig.config().withWaitAllowed(true).withMaxConcurrentTasks(1), CORES, () -> 1000L, 1);
        Scheduler targetScheduler = this.service.createCustomScheduler(
                SchedulerConfig.config().withMaxConcurrentTasks(1), CORES, () -> 1000L);

        Latch latch = new Latch();
        try {
            Future<Object> pending = sourceScheduler.submit(threadsConsumer(targetScheduler, latch));
            try {
                pending.get(1L, TimeUnit.SECONDS);
                Assert.fail("Expected the submission to still be pending while the target scheduler is busy");
            } catch (TimeoutException expectedTimeout) {
                // Expected: the consumer has not finished because the latch is still held.
            }

            latch.countDown();
            pending.get(5L, TimeUnit.SECONDS);
        } finally {
            // Safety net for failure paths; a no-op when the latch was already released above.
            latch.countDown();
        }
    }

    /**
     * Runs {@code threadsConsumer} on a plain JDK single-thread executor (a thread not owned by the scheduler
     * service) against a custom scheduler limited to one concurrent task. The consumer must still be running after
     * 1 second (the first {@code get} times out) and must complete within 5 seconds once the latch is released.
     */
    @Test
    @Description("Tests that tasks dispatched from any other thread to a busy Scheduler wait for execution.")
    public void rejectionPolicyOther() throws MuleException, InterruptedException, ExecutionException, TimeoutException {
        ExecutorService otherExecutor = Executors.newSingleThreadExecutor();
        Latch latch = new Latch();
        try {
            Scheduler targetScheduler = this.service.createCustomScheduler(
                    SchedulerConfig.config().withMaxConcurrentTasks(1), CORES, () -> 1000L);

            Future<Object> pending = otherExecutor.submit(threadsConsumer(targetScheduler, latch));
            try {
                pending.get(1L, TimeUnit.SECONDS);
                Assert.fail();
            } catch (TimeoutException expectedTimeout) {
                // Expected: the consumer has not finished because the latch is still held.
            }

            latch.countDown();
            pending.get(5L, TimeUnit.SECONDS);
        } finally {
            // Release any blocked task and dispose of the helper executor so its thread does not outlive the test.
            latch.countDown();
            otherExecutor.shutdownNow();
        }
    }

    /**
     * Creates a cached thread pool from within a task running on a custom scheduler, parks a task of that pool on a
     * latch, stops the custom scheduler and then expects (within 5 seconds) that the parked task observed an
     * {@link InterruptedException}.
     */
    @Test
    public void customSchedulerThreadGroupDestroy() throws Exception {
        AtomicReference<ExecutorService> nestedExecutor = new AtomicReference<>();
        AtomicBoolean interrupted = new AtomicBoolean();
        Scheduler scheduler = this.service.createCustomScheduler(
                SchedulerConfig.config().withMaxConcurrentTasks(1), CORES, () -> 1000L);

        // The pool is created on a scheduler thread on purpose (not on the test thread).
        scheduler.submit(() -> {
            nestedExecutor.set(Executors.newCachedThreadPool());
        });
        try {
            PollingProber.probe(() -> nestedExecutor.get() != null);

            Latch latch = new Latch();
            nestedExecutor.get().submit(() -> {
                try {
                    return latch.await(getTestTimeoutSecs(), TimeUnit.SECONDS);
                } catch (InterruptedException e) {
                    interrupted.set(true);
                    Thread.currentThread().interrupt();
                    return false;
                }
            });

            scheduler.stop();
            latch.countDown();

            PollingProber.probe(5000L, 100L, () -> interrupted.get());
        } finally {
            // Dispose of the nested pool so its worker threads do not linger after the test.
            ExecutorService created = nestedExecutor.get();
            if (created != null) {
                created.shutdownNow();
            }
        }
    }

    /**
     * A CPU-light scheduler whose max concurrency equals the configured CPU-light pool size must not be a
     * {@link ThrottledScheduler}; one whose max concurrency is one less than the pool size must be.
     */
    @Test
    @Description("Tests that ThrottledScheduler is not used for CPU light schedulers unless maxConcurrency is less than backing pool max size.")
    public void maxCpuLightConcurrencyMoreThanMaxPoolSizeDoesntUseThrottlingScheduler() {
        // Concurrency limit equal to the pool size.
        SchedulerConfig atPoolSizeConfig = SchedulerConfig.config()
                .withMaxConcurrentTasks(this.threadPoolsConfig.getCpuLightPoolSize().getAsInt());
        Scheduler atPoolSize = this.service.createCpuLightScheduler(atPoolSizeConfig, 1, () -> 1L);
        Assert.assertThat(atPoolSize, CoreMatchers.not(IsInstanceOf.instanceOf(ThrottledScheduler.class)));

        // Concurrency limit one below the pool size.
        SchedulerConfig belowPoolSizeConfig = SchedulerConfig.config()
                .withMaxConcurrentTasks(this.threadPoolsConfig.getCpuLightPoolSize().getAsInt() - 1);
        Scheduler belowPoolSize = this.service.createCpuLightScheduler(belowPoolSizeConfig, 1, () -> 1L);
        Assert.assertThat(belowPoolSize, IsInstanceOf.instanceOf(ThrottledScheduler.class));
    }

    /**
     * A CPU-intensive scheduler whose max concurrency equals the configured CPU-intensive pool size must not be a
     * {@link ThrottledScheduler}; one whose max concurrency is one less than the pool size must be.
     */
    @Test
    @Description("Tests that ThrottledScheduler is not used for CPU intensive schedulers unless maxConcurrency is less than backing pool max size.")
    public void maxCpuIntensiveConcurrencyMoreThanMaxPoolSizeDoesntUseThrottlingScheduler() {
        // Concurrency limit equal to the pool size.
        SchedulerConfig atPoolSizeConfig = SchedulerConfig.config()
                .withMaxConcurrentTasks(this.threadPoolsConfig.getCpuIntensivePoolSize().getAsInt());
        Scheduler atPoolSize = this.service.createCpuIntensiveScheduler(atPoolSizeConfig, 1, () -> 1L);
        Assert.assertThat(atPoolSize, CoreMatchers.not(IsInstanceOf.instanceOf(ThrottledScheduler.class)));

        // Concurrency limit one below the pool size.
        SchedulerConfig belowPoolSizeConfig = SchedulerConfig.config()
                .withMaxConcurrentTasks(this.threadPoolsConfig.getCpuIntensivePoolSize().getAsInt() - 1);
        Scheduler belowPoolSize = this.service.createCpuIntensiveScheduler(belowPoolSizeConfig, 1, () -> 1L);
        Assert.assertThat(belowPoolSize, IsInstanceOf.instanceOf(ThrottledScheduler.class));
    }

    /**
     * An IO scheduler whose max concurrency equals the configured IO max pool size must not be a
     * {@link ThrottledScheduler}; one whose max concurrency is one less than that size must be.
     */
    @Test
    @Description("Tests that ThrottledScheduler is not used for IO schedulers unless maxConcurrency is less than backing pool max size.")
    public void maxIOConcurrencyMoreThanMaxPoolSizeDoesntUseThrottlingScheduler() {
        // Concurrency limit equal to the max pool size.
        SchedulerConfig atPoolSizeConfig = SchedulerConfig.config()
                .withMaxConcurrentTasks(this.threadPoolsConfig.getIoMaxPoolSize().getAsInt());
        Scheduler atPoolSize = this.service.createIoScheduler(atPoolSizeConfig, 1, () -> 1L);
        Assert.assertThat(atPoolSize, CoreMatchers.not(IsInstanceOf.instanceOf(ThrottledScheduler.class)));

        // Concurrency limit one below the max pool size.
        SchedulerConfig belowPoolSizeConfig = SchedulerConfig.config()
                .withMaxConcurrentTasks(this.threadPoolsConfig.getIoMaxPoolSize().getAsInt() - 1);
        Scheduler belowPoolSize = this.service.createIoScheduler(belowPoolSizeConfig, 1, () -> 1L);
        Assert.assertThat(belowPoolSize, IsInstanceOf.instanceOf(ThrottledScheduler.class));
    }

    /**
     * Builds a task that keeps submitting latch-blocked work to {@code scheduler} for as long as {@code latch} has
     * not been released.
     *
     * @param scheduler the scheduler to load with blocked tasks
     * @param latch the latch the submitted tasks wait on; once its count reaches zero the loop ends
     * @return a callable that always yields {@code null} when it completes normally
     */
    private Callable<Object> threadsConsumer(Scheduler scheduler, Latch latch) {
        return new Callable<Object>() {

            @Override
            public Object call() {
                while (latch.getCount() > 0) {
                    consumeThread(scheduler, latch);
                }
                return null;
            }
        };
    }

    /**
     * Submits to {@code scheduler} a single task that blocks until {@code latch} is released (or the await gives up).
     *
     * @param scheduler the scheduler that receives the blocking task
     * @param latch the latch the task waits on
     */
    private void consumeThread(Scheduler scheduler, Latch latch) {
        // Typed as Runnable explicitly so the Runnable overload of submit is the one invoked.
        Runnable blocker = () -> awaitLatch(latch);
        scheduler.submit(blocker);
    }

    /**
     * Waits on {@code latch} for up to {@code getTestTimeoutSecs()} seconds.
     *
     * @param latch the latch to wait on
     * @return the result of the timed await, or {@code false} if the wait was interrupted (the interrupt flag is
     *         restored in that case)
     */
    private boolean awaitLatch(Latch latch) {
        boolean released = false;
        try {
            released = latch.await(getTestTimeoutSecs(), TimeUnit.SECONDS);
        } catch (InterruptedException interruption) {
            Thread.currentThread().interrupt();
        }
        return released;
    }
}
