package org.mule.service.scheduler.internal;

import io.qameta.allure.Description;
import io.qameta.allure.Feature;
import io.qameta.allure.Issue;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matchers;
import org.hamcrest.collection.IsCollectionWithSize;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mule.runtime.api.scheduler.Scheduler;
import org.mule.runtime.api.scheduler.SchedulerConfig;
import org.mule.runtime.api.util.concurrent.Latch;
import org.mule.service.scheduler.internal.config.ContainerThreadPoolsConfig;
import org.mule.service.scheduler.internal.executor.SchedulerTaskThrottledException;
import org.mule.service.scheduler.internal.threads.SchedulerThreadPools;
import org.mule.tck.probe.JUnitLambdaProbe;
import org.mule.tck.probe.PollingProber;

@Feature("Scheduler Throttling")
/* loaded from: input_file:org/mule/service/scheduler/internal/ThrottledSchedulerThrottleTestCase.class */
public class ThrottledSchedulerThrottleTestCase extends BaseDefaultSchedulerTestCase {
    private static final int THROTTLE_SIZE = 2;
    private static final int SINGLE_TASK_THROTTLE_SIZE = 1;
    private ExecutorService outerExecutor;
    private ContainerThreadPoolsConfig threadPoolsConfig;
    private SchedulerThreadPools service;

    @Override // org.mule.service.scheduler.internal.BaseDefaultSchedulerTestCase
    @Before
    public void before() throws Exception {
        super.before();
        this.outerExecutor = Executors.newSingleThreadExecutor();
        this.threadPoolsConfig = ContainerThreadPoolsConfig.loadThreadPoolsConfig();
        this.service = new SchedulerThreadPools(SchedulerThreadPoolsTestCase.class.getName(), this.threadPoolsConfig);
        this.service.start();
    }

    @Override // org.mule.service.scheduler.internal.BaseDefaultSchedulerTestCase
    @After
    public void after() throws Exception {
        if (this.service == null) {
            return;
        }
        Iterator it = new ArrayList(this.service.getSchedulers()).iterator();
        while (it.hasNext()) {
            ((Scheduler) it.next()).stop();
        }
        this.service.stop();
        this.outerExecutor.shutdownNow();
        this.outerExecutor.awaitTermination(5L, TimeUnit.SECONDS);
        super.after();
    }

    @Test
    @Description("Tests that a 'maxConcurrentTasks=1' configuration allows to execute a single task")
    public void oneConcurrentTaskSupported() throws InterruptedException {
        Scheduler createIoScheduler = this.service.createIoScheduler(SchedulerConfig.config().withMaxConcurrentTasks(SINGLE_TASK_THROTTLE_SIZE), SINGLE_TASK_THROTTLE_SIZE, () -> {
            return 5000L;
        });
        Latch latch = new Latch();
        createIoScheduler.submit(() -> {
            latch.countDown();
        });
        if (latch.await(200L, TimeUnit.MILLISECONDS)) {
            return;
        }
        Assert.fail("Task never executed");
    }

    @Test
    @Description("Tests that the throttler count is consistent after task cancellation")
    public void interruptionUpdatesThrottleCounterCorrectly() throws InterruptedException, ExecutionException, TimeoutException {
        Scheduler createIoScheduler = this.service.createIoScheduler(SchedulerConfig.config().withMaxConcurrentTasks(SINGLE_TASK_THROTTLE_SIZE), SINGLE_TASK_THROTTLE_SIZE, () -> {
            return 5000L;
        });
        this.service.createCpuLightScheduler(SchedulerConfig.config(), THROTTLE_SIZE, () -> {
            return 5000L;
        }).submit(() -> {
            createIoScheduler.submit(() -> {
                try {
                    Thread.sleep(60000L);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }).cancel(true);
            CountDownLatch countDownLatch = new CountDownLatch(THROTTLE_SIZE);
            doSchedule(createIoScheduler, countDownLatch);
            try {
                doSchedule(createIoScheduler, countDownLatch);
                Assert.fail("Not rejected: " + createIoScheduler.toString());
            } catch (RejectedExecutionException e) {
            }
        }).get(60L, TimeUnit.SECONDS);
    }

    @Test
    @Description("Tests that the throttler count is consistent after scheduled task pre-cancellation")
    public void cancellationOfScheduledUpdatesThrottleCounterCorrectly() throws InterruptedException, ExecutionException, TimeoutException {
        Scheduler createIoScheduler = this.service.createIoScheduler(SchedulerConfig.config().withMaxConcurrentTasks(SINGLE_TASK_THROTTLE_SIZE), SINGLE_TASK_THROTTLE_SIZE, () -> {
            return 5000L;
        });
        this.service.createCpuLightScheduler(SchedulerConfig.config(), THROTTLE_SIZE, () -> {
            return 5000L;
        }).submit(() -> {
            createIoScheduler.schedule(() -> {
            }, 1L, TimeUnit.HOURS).cancel(true);
            CountDownLatch countDownLatch = new CountDownLatch(THROTTLE_SIZE);
            doSchedule(createIoScheduler, countDownLatch);
            try {
                doSchedule(createIoScheduler, countDownLatch);
                Assert.fail("Not rejected: " + createIoScheduler.toString());
            } catch (RejectedExecutionException e) {
            }
        }).get(60L, TimeUnit.SECONDS);
    }

    @Test
    @Description("Tests that the throttler count is consistent after scheduled task cancellation")
    public void interruptionDuringExecutionOfScheduledUpdatesThrottleCounterCorrectly() throws InterruptedException, ExecutionException, TimeoutException {
        Scheduler createIoScheduler = this.service.createIoScheduler(SchedulerConfig.config().withMaxConcurrentTasks(SINGLE_TASK_THROTTLE_SIZE), SINGLE_TASK_THROTTLE_SIZE, () -> {
            return 5000L;
        });
        this.service.createCpuLightScheduler(SchedulerConfig.config(), THROTTLE_SIZE, () -> {
            return 5000L;
        }).submit(() -> {
            ScheduledFuture<?> schedule = createIoScheduler.schedule(() -> {
                try {
                    Thread.sleep(60000L);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, 10L, TimeUnit.MILLISECONDS);
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                Assert.fail("Interrupted");
            }
            schedule.cancel(true);
            CountDownLatch countDownLatch = new CountDownLatch(THROTTLE_SIZE);
            doSchedule(createIoScheduler, countDownLatch);
            try {
                doSchedule(createIoScheduler, countDownLatch);
                Assert.fail("Not rejected: " + createIoScheduler.toString());
            } catch (RejectedExecutionException e2) {
            }
        }).get(60L, TimeUnit.SECONDS);
    }

    private void doSchedule(ScheduledExecutorService scheduledExecutorService, CountDownLatch countDownLatch) {
        scheduledExecutorService.submit(() -> {
            try {
                countDownLatch.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
    }

    @Test
    @Description("Tests that the throttler count is decreased after scheduled task completion")
    @Issue("MULE-18909")
    public void scheduledTaskMustDecrementThrottlingCounterAfterExecution() {
        Scheduler createIoScheduler = this.service.createIoScheduler(SchedulerConfig.config().withMaxConcurrentTasks(SINGLE_TASK_THROTTLE_SIZE), SINGLE_TASK_THROTTLE_SIZE, () -> {
            return 5000L;
        });
        CountDownLatch countDownLatch = new CountDownLatch(THROTTLE_SIZE);
        countDownLatch.getClass();
        createIoScheduler.schedule(countDownLatch::countDown, 1L, TimeUnit.MILLISECONDS);
        countDownLatch.getClass();
        createIoScheduler.schedule(countDownLatch::countDown, 1000L, TimeUnit.MILLISECONDS);
        Assert.assertThat("Second task should have been executed", Boolean.valueOf(awaitLatch(countDownLatch)), CoreMatchers.is(true));
    }

    @Test
    @Description("Tests that the throttler count is decreased after scheduled task completion")
    @Issue("MULE-18909")
    public void scheduledTaskMustDecrementThrottlingCounterAfterExecutionNested() {
        Scheduler createIoScheduler = this.service.createIoScheduler(SchedulerConfig.config().withMaxConcurrentTasks(SINGLE_TASK_THROTTLE_SIZE), SINGLE_TASK_THROTTLE_SIZE, () -> {
            return 5000L;
        });
        CountDownLatch countDownLatch = new CountDownLatch(SINGLE_TASK_THROTTLE_SIZE);
        createIoScheduler.schedule(() -> {
            countDownLatch.getClass();
            return createIoScheduler.schedule(countDownLatch::countDown, 1000L, TimeUnit.MILLISECONDS);
        }, 1000L, TimeUnit.MILLISECONDS);
        Assert.assertThat("Second task should have been executed", Boolean.valueOf(awaitLatch(countDownLatch)), CoreMatchers.is(true));
    }

    @Test
    @Description("Tests that a task submitted in excess of 'maxConcurrentTasks' waits until another task finishes before executing.")
    public void throttledTask() throws InterruptedException {
        Scheduler createIoScheduler = this.service.createIoScheduler(SchedulerConfig.config().withMaxConcurrentTasks(THROTTLE_SIZE), THROTTLE_SIZE, () -> {
            return 5000L;
        });
        Latch latch = new Latch();
        for (int i = 0; i < THROTTLE_SIZE; i += SINGLE_TASK_THROTTLE_SIZE) {
            createIoScheduler.execute(() -> {
                awaitLatch(latch);
            });
        }
        Future<?> submit = this.outerExecutor.submit(() -> {
            createIoScheduler.execute(() -> {
            });
        });
        Thread.sleep(10L);
        Assert.assertThat(Boolean.valueOf(submit.isDone()), CoreMatchers.is(false));
        latch.countDown();
        new PollingProber(100L, 10L).check(new JUnitLambdaProbe(() -> {
            Assert.assertThat(Boolean.valueOf(submit.isDone()), CoreMatchers.is(true));
            return true;
        }));
    }

    @Test
    @Description("Tests that a task submitted in excess of 'maxConcurrentTasks' is rejected when called from a cpu-processing thread.")
    public void throttledTaskRejected() throws InterruptedException, ExecutionException, TimeoutException {
        Scheduler createIoScheduler = this.service.createIoScheduler(SchedulerConfig.config().withMaxConcurrentTasks(SINGLE_TASK_THROTTLE_SIZE), SINGLE_TASK_THROTTLE_SIZE, () -> {
            return 5000L;
        });
        Latch latch = new Latch();
        for (int i = 0; i < THROTTLE_SIZE; i += SINGLE_TASK_THROTTLE_SIZE) {
            createIoScheduler.execute(() -> {
                awaitLatch(latch);
            });
        }
        this.service.createCpuLightScheduler(SchedulerConfig.config(), THROTTLE_SIZE, () -> {
            return 5000L;
        }).submit(() -> {
            try {
                createIoScheduler.execute(() -> {
                });
                Assert.fail("Expected the task to be rejected with a 'SchedulerTaskThrottledException'");
            } catch (SchedulerTaskThrottledException e) {
            }
        }).get(1L, TimeUnit.SECONDS);
    }

    @Test
    @Description("A deadlock does not happen when dispatching max+2 tasks to a throttled scheduler")
    @Issue("MULE-17938")
    public void maxPlusTwoNoDeadlockWaitGroup() throws InterruptedException, ExecutionException, TimeoutException {
        Scheduler createIoScheduler = this.service.createIoScheduler(SchedulerConfig.config().withMaxConcurrentTasks(SINGLE_TASK_THROTTLE_SIZE), SINGLE_TASK_THROTTLE_SIZE, () -> {
            return 5000L;
        });
        Scheduler createIoScheduler2 = this.service.createIoScheduler(SchedulerConfig.config(), 3, () -> {
            return 5000L;
        });
        Latch latch = new Latch();
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 3; i += SINGLE_TASK_THROTTLE_SIZE) {
            createIoScheduler2.execute(() -> {
                arrayList.add(createIoScheduler.submit(() -> {
                    return Boolean.valueOf(awaitLatch(latch));
                }));
            });
        }
        Thread.sleep(1000L);
        latch.countDown();
        PollingProber.probe(() -> {
            Assert.assertThat(arrayList, IsCollectionWithSize.hasSize(3));
            Iterator it = arrayList.iterator();
            while (it.hasNext()) {
                Assert.assertThat(((Future) it.next()).get(1L, TimeUnit.SECONDS), CoreMatchers.is(true));
            }
            return true;
        });
    }

    @Test
    @Description("A deadlock does not happen when dispatching max+2 tasks to a throttled scheduler")
    @Issue("MULE-17938")
    public void maxPlusTwoNoDeadlockNotWaitGroup() throws InterruptedException, ExecutionException, TimeoutException {
        Scheduler createIoScheduler = this.service.createIoScheduler(SchedulerConfig.config().withMaxConcurrentTasks(SINGLE_TASK_THROTTLE_SIZE), SINGLE_TASK_THROTTLE_SIZE, () -> {
            return 5000L;
        });
        Scheduler createCpuLightScheduler = this.service.createCpuLightScheduler(SchedulerConfig.config(), 3, () -> {
            return 5000L;
        });
        Latch latch = new Latch();
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 3; i += SINGLE_TASK_THROTTLE_SIZE) {
            createCpuLightScheduler.execute(() -> {
                arrayList.add(createIoScheduler.submit(() -> {
                    return Boolean.valueOf(awaitLatch(latch));
                }));
            });
        }
        Thread.sleep(1000L);
        latch.countDown();
        PollingProber.probe(() -> {
            Assert.assertThat(arrayList, IsCollectionWithSize.hasSize(SINGLE_TASK_THROTTLE_SIZE));
            Iterator it = arrayList.iterator();
            while (it.hasNext()) {
                Assert.assertThat(((Future) it.next()).get(1L, TimeUnit.SECONDS), CoreMatchers.is(true));
            }
            return true;
        });
        createCpuLightScheduler.submit(() -> {
            arrayList.add(createIoScheduler.submit(() -> {
                return true;
            }));
        }).get(1L, TimeUnit.SECONDS);
    }

    @Test
    @Description("A throttled scheduler may accept many scheduled tasks and throttle them when they actually execute.")
    @Issue("MULE-18053")
    public void scheduleOnThrottledScheduler() throws InterruptedException, ExecutionException, TimeoutException {
        Scheduler createIoScheduler = this.service.createIoScheduler(SchedulerConfig.config().withMaxConcurrentTasks(SINGLE_TASK_THROTTLE_SIZE), SINGLE_TASK_THROTTLE_SIZE, () -> {
            return 5000L;
        });
        Latch latch = new Latch();
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < THROTTLE_SIZE; i += SINGLE_TASK_THROTTLE_SIZE) {
            arrayList.add(createIoScheduler.schedule(() -> {
                return Boolean.valueOf(awaitLatch(latch));
            }, 1L, TimeUnit.SECONDS));
        }
        PollingProber.probe(() -> {
            Assert.assertThat(arrayList, IsCollectionWithSize.hasSize(THROTTLE_SIZE));
            return true;
        });
        latch.countDown();
        PollingProber.probe(() -> {
            Assert.assertThat(createIoScheduler.toString(), createIoScheduler.toString(), Matchers.endsWith("(throttling: 0/1)"));
            return true;
        });
    }

    @Test
    @Description("A throttled scheduler may accept many scheduled tasks and throttle them when they actually execute.")
    @Issue("MULE-18053")
    public void scheduleOnThrottledSchedulerCancelled() throws InterruptedException, ExecutionException, TimeoutException {
        Scheduler createIoScheduler = this.service.createIoScheduler(SchedulerConfig.config().withMaxConcurrentTasks(SINGLE_TASK_THROTTLE_SIZE), SINGLE_TASK_THROTTLE_SIZE, () -> {
            return 5000L;
        });
        Latch latch = new Latch();
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < THROTTLE_SIZE; i += SINGLE_TASK_THROTTLE_SIZE) {
            arrayList.add(createIoScheduler.schedule(() -> {
                return Boolean.valueOf(awaitLatch(latch));
            }, 1L, TimeUnit.SECONDS));
        }
        PollingProber.probe(() -> {
            Assert.assertThat(arrayList, IsCollectionWithSize.hasSize(THROTTLE_SIZE));
            return true;
        });
        arrayList.forEach(future -> {
            future.cancel(true);
        });
        latch.countDown();
        PollingProber.probe(() -> {
            Assert.assertThat(createIoScheduler.toString(), createIoScheduler.toString(), Matchers.endsWith("(throttling: 0/1)"));
            return true;
        });
    }
}
