package org.mule.test.module.extension.source;

import io.qameta.allure.Feature;
import io.qameta.allure.Stories;
import io.qameta.allure.Story;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.hamcrest.number.OrderingComparison;
import org.junit.ClassRule;
import org.junit.Test;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.util.MultiMap;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.processor.Processor;
import org.mule.tck.junit4.rule.SystemProperty;
import org.mule.tck.probe.PollingProber;
import org.mule.test.module.extension.AbstractExtensionFunctionalTestCase;
import org.mule.test.petstore.extension.WatermarkingPetAdoptionSource;

/**
 * Functional tests that stop and restart a flow while its polling source is in the middle of a poll, and then verify
 * which items end up being dispatched to the flow.
 * <p>
 * Every event that reaches an {@link AdoptionProcessor} is recorded in {@link #ADOPTIONS}; the assertions in this class
 * are made against that shared map.
 */
@Feature("Sources")
@Stories({@Story("Polling"), @Story("Component life cycle")})
public class PollingSourceRestartingTestCase extends AbstractExtensionFunctionalTestCase {

    @ClassRule
    public static SystemProperty disableCacheComponentBuildingDefinitionRegistry = new SystemProperty("mule..functionalTest.cacheComponentBuildingDefinitionRegistry.disableOverride", "true");

    /** Maximum time, in milliseconds, to wait for a condition that is expected to become true. */
    private static final int PROBER_TIMEOUT = HeisenbergMessageSourceTestCase.TIMEOUT_MILLIS;

    /** Time window, in milliseconds, during which a condition is expected to stay false. */
    private static final int CHECK_NOT_PROBER_TIMEOUT = 5000;

    /** Delay, in milliseconds, between two consecutive evaluations of a probed condition. */
    private static final int PROBER_FREQUENCY = 500;

    /**
     * Payloads received by {@link AdoptionProcessor}, grouped by the integer attributes value of the message that carried
     * them. The processor mutates this map while holding its monitor, so every access from the test side must do the same.
     * NOTE(review): the field is also the lock object, so it must never be reassigned; it is left non-final only because it is
     * protected and code outside this file may depend on that.
     */
    protected static MultiMap<Integer, String> ADOPTIONS = new MultiMap<>();

    /**
     * Processor that records the payload of each event it receives in {@link PollingSourceRestartingTestCase#ADOPTIONS},
     * keyed by the message attributes.
     */
    public static class AdoptionProcessor implements Processor {

        /**
         * Stores the event's payload under the event's attributes value and passes the event through untouched.
         *
         * @param coreEvent the event being processed; its payload is cast to {@link String} and its attributes to
         *        {@link Integer}
         * @return the same {@code coreEvent} instance
         * @throws MuleException declared by the {@link Processor} contract; not thrown explicitly here
         */
        public CoreEvent process(CoreEvent coreEvent) throws MuleException {
            String petName = (String) coreEvent.getMessage().getPayload().getValue();
            Integer attributes = (Integer) coreEvent.getMessage().getAttributes().getValue();
            synchronized (ADOPTIONS) {
                ADOPTIONS.put(attributes, petName);
            }
            return coreEvent;
        }
    }

    /**
     * @return always {@code true}
     */
    protected boolean mustRegenerateExtensionModels() {
        return true;
    }

    /**
     * @return a single-entry map setting {@code EXTENSION_LOADER_ENABLE_POLLING_SOURCE_LIMIT} to {@code true}
     */
    protected Map<String, Object> getExtensionLoaderContextAdditionalParameters() {
        return Collections.singletonMap("EXTENSION_LOADER_ENABLE_POLLING_SOURCE_LIMIT", true);
    }

    /**
     * @return a single-entry map setting {@code EXTENSION_LOADER_ENABLE_POLLING_SOURCE_LIMIT} to {@code "true"}
     */
    protected Map<String, String> artifactProperties() {
        return Collections.singletonMap("EXTENSION_LOADER_ENABLE_POLLING_SOURCE_LIMIT", "true");
    }

    /**
     * @return the classpath location of the application configuration used by these tests
     */
    protected String getConfigFile() {
        return "source/polling-source-restarting-config.xml";
    }

    /**
     * Clears the recorded adoptions and resets the static state of {@link WatermarkingPetAdoptionSource} so that tests do
     * not leak state into each other.
     *
     * @throws Exception declared by the overridden hook; not thrown explicitly here
     */
    protected void doTearDown() throws Exception {
        // Same monitor the processor uses for writes: a late event must not race with the clean-up.
        synchronized (ADOPTIONS) {
            ADOPTIONS.clear();
        }
        WatermarkingPetAdoptionSource.resetSource();
    }

    @Test
    public void unprocessedItemsAreProcessedWhenSourceIsRestartedMidPoll() throws Exception {
        assertWatermarkingForStopStartScenario(Arrays.asList("Anibal", "Barbara", "Colonel Meow", "Daphne", "Elsa"), "unprocessedItemsAreProcessedWhenSourceIsRestartedMidPoll");
    }

    @Test
    public void processedItemsWithSameWatermarkAreNotReprocessedWhenSourceIsRestartedMidPoll() throws Exception {
        assertWatermarkingForStopStartScenario(Arrays.asList("Anibal", "Barbara"), "processedItemsWithSameWatermarkAreNotReprocessedWhenSourceIsRestartedMidPoll");
    }

    @Test
    public void processedItemsWithNewWatermarkAreReprocessedWhenSourceIsRestartedMidPoll() throws Exception {
        assertWatermarkingForStopStartScenario(Arrays.asList("Anibal", "Barbara", "ANIBAL", "BARBARA", "Colonel Meow"), "processedItemsWithNewWatermarkAreReprocessedWhenSourceIsRestartedMidPoll");
    }

    @Test
    public void unprocessedItemsAreProcessedWhenSourceIsRestartedMidPollWithLimit() throws Exception {
        assertWatermarkingForStopStartScenario(Arrays.asList("Anibal", "Barbara", "Colonel Meow", "Daphne", "Elsa"), "unprocessedItemsAreProcessedWhenSourceIsRestartedMidPollWithLimit");
        assertLimitIsApplied(3);
    }

    @Test
    public void processedItemsWithSameWatermarkAreNotReprocessedWhenSourceIsRestartedMidPollWithLimit() throws Exception {
        assertWatermarkingForStopStartScenario(Arrays.asList("Anibal", "Barbara"), "processedItemsWithSameWatermarkAreNotReprocessedWhenSourceIsRestartedMidPollWithLimit");
        assertLimitIsApplied(2);
    }

    @Test
    public void processedItemsWithNewWatermarkAreReprocessedWhenSourceIsRestartedMidPollWithLimit() throws Exception {
        assertWatermarkingForStopStartScenario(Arrays.asList("Anibal", "Barbara", "ANIBAL", "BARBARA", "Colonel Meow"), "processedItemsWithNewWatermarkAreReprocessedWhenSourceIsRestartedMidPollWithLimit");
        assertLimitIsApplied(2);
    }

    /**
     * Runs the stop/start scenario against one flow: starts it, waits on the source's begin latch, stops it, waits until
     * the flow reports itself as stopped, starts it again and finally checks what was adopted.
     *
     * @param expectedPets the payloads expected to be recorded, in order
     * @param flowName the name of the flow to exercise
     * @throws Exception if the flow cannot be looked up, started or stopped, or if the wait on the latch is interrupted
     */
    private void assertWatermarkingForStopStartScenario(List<String> expectedPets, String flowName) throws Exception {
        startFlow(flowName);
        // NOTE(review): this wait has no timeout; if the latch is never released the test hangs instead of failing.
        WatermarkingPetAdoptionSource.beginLatch.await();
        stopFlow(flowName);
        PollingProber.check(PROBER_TIMEOUT, PROBER_FREQUENCY,
                            () -> getFlowConstruct(flowName).getLifecycleState().isStopped());
        startFlow(flowName);
        waitForAllPetsToBeAdopted(expectedPets);
        checkNoMorePetsAdopted(expectedPets);
        assertAdoptedPets(expectedPets);
    }

    /**
     * Polls until the size reported by {@link #ADOPTIONS} equals the number of expected pets, failing after
     * {@link #PROBER_TIMEOUT} milliseconds.
     *
     * @param expectedPets the payloads expected to be recorded
     */
    private void waitForAllPetsToBeAdopted(List<String> expectedPets) {
        PollingProber.check(PROBER_TIMEOUT, PROBER_FREQUENCY, () -> adoptionCount() == expectedPets.size());
    }

    /**
     * Verifies that, for {@link #CHECK_NOT_PROBER_TIMEOUT} milliseconds, the size reported by {@link #ADOPTIONS} never
     * exceeds the number of expected pets.
     *
     * @param expectedPets the payloads expected to be recorded
     */
    private void checkNoMorePetsAdopted(List<String> expectedPets) {
        PollingProber.checkNot(CHECK_NOT_PROBER_TIMEOUT, PROBER_FREQUENCY, () -> adoptionCount() > expectedPets.size());
    }

    /**
     * Flattens every value stored in {@link #ADOPTIONS}, following the iteration order of its key set, and asserts that
     * the result contains exactly the expected pets in the same order.
     *
     * @param expectedPets the payloads expected to be recorded, in order
     */
    private void assertAdoptedPets(List<String> expectedPets) {
        List<String> adoptedPets = new ArrayList<>();
        // Take the snapshot under the lock, assert outside of it.
        synchronized (ADOPTIONS) {
            for (Integer key : ADOPTIONS.keySet()) {
                adoptedPets.addAll(ADOPTIONS.getAll(key));
            }
        }
        MatcherAssert.assertThat(adoptedPets, Matchers.contains(expectedPets.toArray()));
    }

    /**
     * Asserts that every key from {@code 0} up to, but excluding, the last one holds exactly {@code limit} values, and
     * that the last key holds at most {@code limit} values.
     * <p>
     * NOTE(review): this presumes the keys are consecutive integers starting at 0 (presumably one per poll) — confirm
     * against {@link WatermarkingPetAdoptionSource}.
     *
     * @param limit the maximum number of items expected per key
     */
    private void assertLimitIsApplied(int limit) {
        synchronized (ADOPTIONS) {
            int keyCount = ADOPTIONS.keySet().size();
            int lastKey = keyCount - 1;
            for (int key = 0; key < lastKey; key++) {
                MatcherAssert.assertThat(ADOPTIONS.getAll(key), Matchers.hasSize(limit));
            }
            MatcherAssert.assertThat(ADOPTIONS.getAll(lastKey), Matchers.hasSize(OrderingComparison.lessThanOrEqualTo(limit)));
        }
    }

    /**
     * Reads the size of {@link #ADOPTIONS} while holding the monitor that {@link AdoptionProcessor} uses for its writes.
     *
     * @return the value of {@code ADOPTIONS.size()} at the time of the call
     */
    private static int adoptionCount() {
        synchronized (ADOPTIONS) {
            return ADOPTIONS.size();
        }
    }

    /**
     * Starts the flow with the given name.
     *
     * @param flowName the name of the flow to start
     * @throws Exception if the flow cannot be looked up or started
     */
    private void startFlow(String flowName) throws Exception {
        getFlowConstruct(flowName).start();
    }

    /**
     * Stops the flow with the given name.
     *
     * @param flowName the name of the flow to stop
     * @throws Exception if the flow cannot be looked up or stopped
     */
    private void stopFlow(String flowName) throws Exception {
        getFlowConstruct(flowName).stop();
    }
}
