package com.mulesoft.mule.test.batch;

import com.mulesoft.mule.runtime.module.batch.api.BatchManager;
import com.mulesoft.mule.runtime.module.batch.api.notification.BatchNotification;
import com.mulesoft.mule.runtime.module.batch.api.notification.BatchNotificationListener;
import com.mulesoft.mule.runtime.module.batch.engine.BatchJobInstanceAdapter;
import io.qameta.allure.Description;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;
import org.mule.runtime.api.component.AbstractComponent;
import org.mule.runtime.api.component.location.Location;
import org.mule.runtime.api.exception.DefaultMuleException;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.notification.CustomNotification;
import org.mule.runtime.api.streaming.HasSize;
import org.mule.runtime.api.util.concurrent.Latch;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.lifecycle.LifecycleUtils;
import org.mule.runtime.core.api.processor.Processor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/mulesoft/mule/test/batch/BatchStopTestCase.class */
public class BatchStopTestCase extends AbstractBatchTestCase {
    private static final int STOP_AWAIT_TIMEOUT_SECS = 10;
    private static boolean stopped;
    private static Thread stoppingThread;
    private static int stoppedInvocations;
    private static BatchJobInstanceAdapter jobInstance;
    private static BatchManager batchManager;
    private StopListener stopListener;
    private static final Logger LOGGER = LoggerFactory.getLogger(BatchStopTestCase.class);
    private static Latch stopLatch = new Latch();
    private static Latch stoppedLatch = new Latch();

    /* loaded from: input_file:com/mulesoft/mule/test/batch/BatchStopTestCase$AggregatorMessageProcessor.class */
    public static class AggregatorMessageProcessor extends AbstractComponent implements Processor {
        public synchronized CoreEvent process(CoreEvent coreEvent) throws MuleException {
            Object value = coreEvent.getMessage().getPayload().getValue();
            if (value instanceof List) {
                BatchStopTestCase.stoppedInvocations += ((List) value).size();
            } else if (value instanceof HasSize) {
                BatchStopTestCase.stoppedInvocations += ((HasSize) value).getSize();
            }
            return coreEvent;
        }
    }

    /* loaded from: input_file:com/mulesoft/mule/test/batch/BatchStopTestCase$StopFlowMessageProcessor.class */
    public static class StopFlowMessageProcessor extends AbstractComponent implements Processor {
        private final AtomicInteger count = new AtomicInteger(0);
        public static AtomicInteger countAfterStop = new AtomicInteger(0);
        public static AtomicBoolean flowStopped = new AtomicBoolean(false);

        public CoreEvent process(CoreEvent coreEvent) throws MuleException {
            if (this.count.incrementAndGet() == 50) {
                BatchStopTestCase.stopLatch.countDown();
            }
            if (flowStopped.get()) {
                countAfterStop.incrementAndGet();
            }
            return coreEvent;
        }
    }

    /* loaded from: input_file:com/mulesoft/mule/test/batch/BatchStopTestCase$StopListener.class */
    private class StopListener implements BatchNotificationListener {
        private StopListener() {
        }

        public boolean isBlocking() {
            return true;
        }

        public synchronized void onNotification(CustomNotification customNotification) {
            if (customNotification.getAction().getActionId() != BatchNotification.PROGRESS_UPDATE || BatchStopTestCase.stopped) {
                return;
            }
            try {
                BatchStopTestCase.batchManager.stop(((BatchNotification) customNotification).getJobInstance().getOwnerJobName(), ((BatchNotification) customNotification).getJobInstance().getId());
                boolean unused = BatchStopTestCase.stopped = true;
                BatchStopTestCase.stoppedLatch.countDown();
            } catch (Exception e) {
                BatchStopTestCase.LOGGER.error("Exception stopping context", e);
            }
        }
    }

    /* loaded from: input_file:com/mulesoft/mule/test/batch/BatchStopTestCase$StopMessageProcessor.class */
    public static class StopMessageProcessor extends AbstractComponent implements Processor {
        private final AtomicInteger count = new AtomicInteger(0);

        public CoreEvent process(CoreEvent coreEvent) throws MuleException {
            if (BatchStopTestCase.stoppingThread != Thread.currentThread() && this.count.get() > 50) {
                try {
                    BatchStopTestCase.stoppedLatch.await(10L, TimeUnit.SECONDS);
                } catch (InterruptedException e) {
                    Thread.interrupted();
                    throw new DefaultMuleException(e);
                }
            }
            if (BatchStopTestCase.jobInstance != null && this.count.incrementAndGet() == 50) {
                Thread unused = BatchStopTestCase.stoppingThread = Thread.currentThread();
                BatchStopTestCase.batchManager.stop(BatchStopTestCase.jobInstance.getOwnerJobName(), BatchStopTestCase.jobInstance.getId());
            }
            return coreEvent;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // com.mulesoft.mule.test.batch.AbstractBatchTestCase
    public void doSetUp() throws Exception {
        super.doSetUp();
        stopped = false;
        stopLatch = new Latch();
        stoppedLatch = new Latch();
        this.stopListener = new StopListener();
        stoppedInvocations = 0;
        jobInstance = null;
        batchManager = (BatchManager) this.registry.lookupByName("batch.manager").get();
        this.notificationListenerRegistry.registerListener(this.stopListener);
    }

    protected String getConfigFile() {
        return "batch-stop-config.xml";
    }

    @Test
    public void aggregatorFlushedUponStop() throws Exception {
        assertAggregatorIsFlushedUponStop("aggregatorFlushedUponStop", Matchers.greaterThanOrEqualTo(Integer.valueOf(getJob("aggregatorFlushedUponStop").getBlockSize())));
    }

    @Test
    public void streamingAggregatorFlushedUponStop() throws Exception {
        assertAggregatorIsFlushedUponStop("streamingAggregatorFlushedUponStop", CoreMatchers.is(0));
    }

    private void assertAggregatorIsFlushedUponStop(String str, Matcher<Integer> matcher) throws Exception {
        jobInstance = doTest(str, createRandomPayload(1000));
        awaitJobTermination();
        stoppedLatch.await(10L, TimeUnit.SECONDS);
        Assert.assertThat(Integer.valueOf(stoppedInvocations), matcher);
    }

    @Test
    @Description("Batch work is not dispatched for batch jobs in stopped flows")
    public void stopFlowStopsDispatching() throws Exception {
        jobInstance = doTest("stopFlowStopsDispatching", createRandomPayload(1000));
        stopLatch.await(10L, TimeUnit.SECONDS);
        LifecycleUtils.stopIfNeeded(this.locator.find(Location.builder().globalName("stopFlowStopsDispatching").build()).get());
        StopFlowMessageProcessor.flowStopped.set(true);
        Assert.assertThat(Integer.valueOf(StopFlowMessageProcessor.countAfterStop.get()), CoreMatchers.is(0));
    }

    @Override // com.mulesoft.mule.test.batch.AbstractBatchTestCase
    protected boolean isTestMemoryLeaks() {
        return false;
    }
}
