package com.mulesoft.mule.test.cluster.router;

import com.mulesoft.mule.test.cluster.AbstractClusterTestCase;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.mule.functional.util.FlowExecutionLogger;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.metadata.TypedValue;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.tck.junit4.rule.SystemProperty;
import org.mule.test.runner.ArtifactClassLoaderRunnerConfig;

/**
 * Functional tests that run aggregator flows from {@code aggregators-config.xml} on different cluster
 * instances (index 0 and index 1) and verify, through {@link FlowExecutionLogger}, which routes were
 * executed and with which payloads.
 *
 * <p>Each test starts from a clean {@link FlowExecutionLogger} state (see {@link #reset()}).
 */
@ArtifactClassLoaderRunnerConfig(testRunnerExportedRuntimeLibs = {"org.mule.tests:mule-tests-functional"})
public class ClusteredAggregatorTestCase extends AbstractClusterTestCase {
    /** Name of the flow variable that carries the group id into the flows. */
    private static final String GROUP_ID_VARIABLE_KEY = "gid";
    /** Route key logged when an aggregation completes. */
    private static final String AGGREGATION_COMPLETE_ROUTE_KEY = "aggregationComplete";
    /** Route key logged on each incremental aggregation. */
    private static final String INCREMENTAL_AGGREGATION_ROUTE_KEY = "incrementalAggregation";
    /** Route key logged when an aggregator listener is invoked. */
    private static final String LISTENER_ROUTE_KEY = "listenerCalled";
    /** Name of the flow variable from which the tests read the processed group id. */
    private static final String PROCESSED_GROUP_ID_KEY = "idOfGroup";
    /** Group id used by every test that sets {@link #GROUP_ID_VARIABLE_KEY}. */
    private static final String DEFAULT_GROUP_ID = "clusterizedGroupId";

    /**
     * Rule configured with the {@code mule.aggregatorsSchedulingPeriod} property and the value {@code "1000"}.
     * NOTE(review): presumably milliseconds - confirm against the aggregators module.
     */
    @Rule
    public SystemProperty schedulingTasksPeriod = new SystemProperty("mule.aggregatorsSchedulingPeriod", "1000");

    /** Clears the {@link FlowExecutionLogger} logs before every test. */
    @Before
    public void reset() {
        FlowExecutionLogger.resetLogsMap();
    }

    /**
     * @return the classpath location of the configuration file used by these tests
     */
    protected String getConfigFile() {
        return "com/mulesoft/mule/cluster/functional/aggregators-config.xml";
    }

    /** Checks incremental aggregation across both nodes for the {@code incrementalAggregationByGroup} flow. */
    @Test
    public void incrementalAggregationByGroup() throws Exception {
        assertIncrementalAggregation("incrementalAggregationByGroup");
    }

    /** Checks incremental aggregation across both nodes for the {@code incrementalAggregationBySize} flow. */
    @Test
    public void incrementalAggregationBySize() throws Exception {
        assertIncrementalAggregation("incrementalAggregationBySize");
    }

    /** Checks incremental aggregation across both nodes for the {@code incrementalAggregationByTime} flow. */
    @Test
    public void incrementalAggregationByTime() throws Exception {
        assertIncrementalAggregation("incrementalAggregationByTime");
    }

    /**
     * Sends payload 1 through node 0 and payload 2 through node 1 for the same group, and asserts the
     * aggregation-complete route ran only after the second event, exactly once, with {@code [1, 2]}.
     */
    @Test
    public void sameGroupAggregatedInDifferentNodesByGroup() throws Exception {
        final String flowName = "aggregationCompleteByGroup";

        runOnNodeWithDefaultGroup(0, flowName, 1);
        FlowExecutionLogger.assertRouteNeverExecuted(AGGREGATION_COMPLETE_ROUTE_KEY);

        runOnNodeWithDefaultGroup(1, flowName, 2);
        FlowExecutionLogger.assertRouteExecutedNTimes(AGGREGATION_COMPLETE_ROUTE_KEY, 1);
        FlowExecutionLogger.assertRouteNthExecution(AGGREGATION_COMPLETE_ROUTE_KEY, 1, new Object[]{1, 2});
    }

    /**
     * Runs {@code processedGroupIdByGroup} on nodes 0, 1 and 0 again and asserts that the processed group id
     * variable equals {@link #DEFAULT_GROUP_ID} after every run.
     */
    @Test
    public void groupIdIsTheSameForEveryIncrementalAggregationByGroup() throws Exception {
        final String flowName = "processedGroupIdByGroup";

        CoreEvent firstResult = runOnNodeWithDefaultGroup(0, flowName, 1);
        FlowExecutionLogger.assertRouteExecutedNTimes(INCREMENTAL_AGGREGATION_ROUTE_KEY, 1);
        MatcherAssert.assertThat(processedGroupId(firstResult), Matchers.is(DEFAULT_GROUP_ID));

        CoreEvent secondResult = runOnNodeWithDefaultGroup(1, flowName, 1);
        FlowExecutionLogger.assertRouteExecutedNTimes(INCREMENTAL_AGGREGATION_ROUTE_KEY, 2);
        MatcherAssert.assertThat(processedGroupId(secondResult), Matchers.is(DEFAULT_GROUP_ID));

        CoreEvent thirdResult = runOnNodeWithDefaultGroup(0, flowName, 1);
        FlowExecutionLogger.assertRouteExecutedNTimes(AGGREGATION_COMPLETE_ROUTE_KEY, 1);
        MatcherAssert.assertThat(processedGroupId(thirdResult), Matchers.is(DEFAULT_GROUP_ID));
    }

    /**
     * Runs {@code processedGroupIdBySize} on nodes 0, 1 and 0 again with payloads 1, 2 and 3, asserting that
     * the processed group id reported by later runs equals the one from the first run and that the
     * aggregation completes once with {@code [1, 2, 3]}.
     */
    @Test
    public void groupIdIsTheSameForEveryIncrementalAggregationBySize() throws Exception {
        final String flowName = "processedGroupIdBySize";

        String firstGroupId = (String) processedGroupId(runOnNode(0, flowName, 1));
        FlowExecutionLogger.assertRouteExecutedNTimes(INCREMENTAL_AGGREGATION_ROUTE_KEY, 1);

        MatcherAssert.assertThat(processedGroupId(runOnNode(1, flowName, 2)), Matchers.is(Matchers.equalTo(firstGroupId)));
        FlowExecutionLogger.assertRouteExecutedNTimes(INCREMENTAL_AGGREGATION_ROUTE_KEY, 2);

        MatcherAssert.assertThat(processedGroupId(runOnNode(0, flowName, 3)), Matchers.is(Matchers.equalTo(firstGroupId)));
        FlowExecutionLogger.assertRouteExecutedNTimes(AGGREGATION_COMPLETE_ROUTE_KEY, 1);
        FlowExecutionLogger.assertRouteNthExecution(AGGREGATION_COMPLETE_ROUTE_KEY, 1, new Object[]{1, 2, 3});
    }

    /**
     * Runs {@code processedGroupIdByTime} on nodes 0, 1 and 0 again, asserting that the second run reports
     * the same processed group id as the first and that the aggregation-complete route ran once.
     */
    @Test
    public void groupIdIsTheSameForEveryIncrementalAggregationByTime() throws Exception {
        final String flowName = "processedGroupIdByTime";

        String firstGroupId = (String) processedGroupId(runOnNode(0, flowName, 1));
        FlowExecutionLogger.assertRouteExecutedNTimes(INCREMENTAL_AGGREGATION_ROUTE_KEY, 1);

        CoreEvent secondResult = runOnNode(1, flowName, 2);
        MatcherAssert.assertThat(processedGroupId(secondResult), Matchers.is(Matchers.equalTo(firstGroupId)));
        FlowExecutionLogger.assertRouteExecutedNTimes(INCREMENTAL_AGGREGATION_ROUTE_KEY, 2);

        runOnNode(0, flowName, 3);
        FlowExecutionLogger.assertRouteExecutedNTimes(AGGREGATION_COMPLETE_ROUTE_KEY, 1);
        // The expected logged value for the completion route is the group id read from the second run's event.
        FlowExecutionLogger.assertRouteNthExecution(AGGREGATION_COMPLETE_ROUTE_KEY, 1, new Object[]{processedGroupId(secondResult)});
    }

    /**
     * Runs {@code aggregationCompleteByGroup} once on each node for the same group and asserts the listener
     * route was executed exactly once.
     */
    @Test
    public void onCompleteListenerCalledOneTimeForGroupAggregator() throws Exception {
        final String flowName = "aggregationCompleteByGroup";

        runOnNodeWithDefaultGroup(0, flowName, 1);
        runOnNodeWithDefaultGroup(1, flowName, 2);
        FlowExecutionLogger.assertRouteExecutedNTimes(LISTENER_ROUTE_KEY, 1);
    }

    /**
     * Runs {@code periodComplete} once on each node and asserts the listener route was executed exactly once.
     */
    @Test
    public void onPeriodCompleteListenerCalledOneTimeForTimeAggregator() throws Exception {
        final String flowName = "periodComplete";

        runOnNode(0, flowName, 1);
        runOnNode(1, flowName, 1);
        FlowExecutionLogger.assertRouteExecutedNTimes(LISTENER_ROUTE_KEY, 1);
    }

    /**
     * Completes a group on node 0, verifies that a further event for the same group on node 1 is rejected
     * with a {@link MuleException}, then kills the primary instance and verifies that, once a new primary
     * polling node exists and a fixed pause has elapsed, the same event runs on node 1 without throwing.
     */
    @Test
    public void rescheduleOnPrimaryNodeDownForGroupBasedAggregator() throws Exception {
        final String flowName = "failOverTestFlowWithGroup";

        runOnNodeWithDefaultGroup(0, flowName, 1);
        FlowExecutionLogger.assertRouteExecutedNTimes(AGGREGATION_COMPLETE_ROUTE_KEY, 1);

        try {
            runOnNodeWithDefaultGroup(1, flowName, 1);
            Assert.fail("Group should've been complete");
        } catch (MuleException ignored) {
            // Expected: the group is already complete, so this run must fail with a MuleException.
        }

        killPrimaryInstance();
        waitUntilThereIsPrimaryPollingNode();
        // NOTE(review): fixed 4 s pause, presumably to let the new primary reschedule the group's tasks;
        // consider replacing it with a polling probe if one is available.
        Thread.sleep(4000L);

        // No assertion needed: the test passes as long as this run does not throw.
        runOnNodeWithDefaultGroup(1, flowName, 1);
    }

    /** Checks listener execution after primary node failure for the {@code failOverTestFlowWithSize} flow. */
    @Test
    public void rescheduleOnPrimaryNodeDownForSizeBasedAggregator() throws Exception {
        testPeriodSchedulingOnPrimaryNodeDown("failOverTestFlowWithSize");
    }

    /** Checks listener execution after primary node failure for the {@code failOverTestFlowWithTime} flow. */
    @Test
    public void rescheduleOnPrimaryNodeDownForTimeBasedAggregator() throws Exception {
        testPeriodSchedulingOnPrimaryNodeDown("failOverTestFlowWithTime");
    }

    /**
     * Runs the given flow on node 0 with payload 1, kills the primary instance, waits for a new primary
     * polling node and asserts that the listener route was executed exactly once.
     *
     * @param flowName name of the flow to run
     * @throws Exception if the flow run, the instance shutdown or the assertion fails
     */
    private void testPeriodSchedulingOnPrimaryNodeDown(String flowName) throws Exception {
        runOnNode(0, flowName, 1);
        killPrimaryInstance();
        waitUntilThereIsPrimaryPollingNode();
        FlowExecutionLogger.assertRouteExecutedNTimes(LISTENER_ROUTE_KEY, 1);
    }

    /**
     * Runs the given flow with payload 1 on node 0 and payload 2 on node 1 (both with the default group id)
     * and asserts the incremental-aggregation route logged {@code [1]} and then {@code [1, 2]}.
     *
     * @param flowName name of the flow to run
     * @throws Exception if a flow run or an assertion fails
     */
    private void assertIncrementalAggregation(String flowName) throws Exception {
        runOnNodeWithDefaultGroup(0, flowName, 1);
        FlowExecutionLogger.assertRouteExecutedNTimes(INCREMENTAL_AGGREGATION_ROUTE_KEY, 1);
        FlowExecutionLogger.assertRouteNthExecution(INCREMENTAL_AGGREGATION_ROUTE_KEY, 1, new Object[]{1});

        runOnNodeWithDefaultGroup(1, flowName, 2);
        FlowExecutionLogger.assertRouteExecutedNTimes(INCREMENTAL_AGGREGATION_ROUTE_KEY, 2);
        FlowExecutionLogger.assertRouteNthExecution(INCREMENTAL_AGGREGATION_ROUTE_KEY, 2, new Object[]{1, 2});
    }

    /**
     * Runs a flow on the cluster instance with the given index, using only a payload.
     *
     * @param nodeIndex index of the cluster instance whose registry is used
     * @param flowName  name of the flow to run
     * @param payload   payload of the event sent through the flow
     * @return the event returned by the flow run
     * @throws Exception if the flow run fails
     */
    private CoreEvent runOnNode(int nodeIndex, String flowName, Object payload) throws Exception {
        return flowRunner(getClusterInstanceInfrastructure(nodeIndex).getRegistry(), flowName)
                .withPayload(payload)
                .run();
    }

    /**
     * Runs a flow on the cluster instance with the given index, setting the {@link #GROUP_ID_VARIABLE_KEY}
     * variable to {@link #DEFAULT_GROUP_ID} in addition to the payload.
     *
     * @param nodeIndex index of the cluster instance whose registry is used
     * @param flowName  name of the flow to run
     * @param payload   payload of the event sent through the flow
     * @return the event returned by the flow run
     * @throws Exception if the flow run fails
     */
    private CoreEvent runOnNodeWithDefaultGroup(int nodeIndex, String flowName, Object payload) throws Exception {
        return flowRunner(getClusterInstanceInfrastructure(nodeIndex).getRegistry(), flowName)
                .withVariable(GROUP_ID_VARIABLE_KEY, DEFAULT_GROUP_ID)
                .withPayload(payload)
                .run();
    }

    /**
     * Reads the value of the {@link #PROCESSED_GROUP_ID_KEY} variable from the given event.
     *
     * @param event event returned by a flow run
     * @return the value wrapped by the variable's {@link TypedValue}
     */
    private Object processedGroupId(CoreEvent event) {
        return ((TypedValue<?>) event.getVariables().get(PROCESSED_GROUP_ID_KEY)).getValue();
    }
}
