package com.mulesoft.mule.test.batch;

import com.mulesoft.mule.runtime.module.batch.api.record.Record;
import com.mulesoft.mule.test.batch.AbstractBatchTestCase;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import org.hamcrest.Matchers;
import org.hamcrest.collection.IsCollectionWithSize;
import org.junit.Assert;
import org.junit.Test;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.core.api.Closeable;
import org.mule.runtime.core.api.event.CoreEvent;

/* loaded from: input_file:com/mulesoft/mule/test/batch/StreamingAggregatorTestCase.class */
/**
 * Functional tests that run the scenarios declared in {@code streaming-aggregator-config.xml}
 * and verify them through state captured in static fields of this class.
 *
 * <p>Two static hooks feed that state: {@link PayloadAccumulateResultRecorder} collects the
 * payload of every event it processes, and {@link #interceptIterator(Iterator)} remembers the
 * last iterator it was handed and counts its invocations. Both are presumably wired in from the
 * XML configuration (not visible here) - TODO confirm.
 *
 * <p>After every test, {@link #doTearDown()} asserts that any intercepted iterator is a
 * {@link Closeable} with no elements left, and that {@link #interceptIterator(Iterator)} was
 * invoked exactly once.
 */
public class StreamingAggregatorTestCase extends AbstractBatchTestCase {

    /** Number of records used by the scenarios that are not single-record variants. */
    private static final int HOW_MANY = 1000;

    /** Payload values gathered by {@link PayloadAccumulateResultRecorder}; replaced before each test. */
    private static List<Object> payloads;

    /**
     * Last iterator passed to {@link #interceptIterator(Iterator)}, or {@code null} if none was.
     * Volatile because the hook is presumably called from a thread other than the one running
     * the JUnit lifecycle methods - TODO confirm.
     */
    private static volatile Iterator<Object> streamingIterator;

    /** Number of calls to {@link #interceptIterator(Iterator)} since the last set-up. */
    private static volatile int invocationCount;

    /**
     * Result recorder that stores the payload value of each processed event in the shared
     * {@code payloads} list before delegating to the parent recorder.
     */
    public static final class PayloadAccumulateResultRecorder extends AbstractBatchTestCase.BatchResultRecorder {

        /**
         * Records the event's payload value and then delegates to the parent implementation.
         *
         * @param coreEvent the event being processed
         * @return whatever the parent recorder returns for this event
         * @throws MuleException if the parent recorder fails
         */
        @Override // com.mulesoft.mule.test.batch.AbstractBatchTestCase.BatchResultRecorder
        public CoreEvent process(CoreEvent coreEvent) throws MuleException {
            Object payloadValue = coreEvent.getMessage().getPayload().getValue();
            StreamingAggregatorTestCase.payloads.add(payloadValue);
            return super.process(coreEvent);
        }
    }

    /**
     * Runs the parent set-up and then resets the static state shared with the hooks, so each
     * test starts with an empty payload list, no intercepted iterator and a zero counter.
     */
    /* JADX INFO: Access modifiers changed from: protected */
    @Override // com.mulesoft.mule.test.batch.AbstractBatchTestCase
    public void doSetUp() throws Exception {
        super.doSetUp();
        payloads = Collections.synchronizedList(new ArrayList<>());
        streamingIterator = null;
        invocationCount = 0;
    }

    /**
     * Verifies the streaming invariants common to every test and then runs the parent
     * tear-down. The parent tear-down runs even when one of the assertions fails.
     */
    /* JADX INFO: Access modifiers changed from: protected */
    @Override // com.mulesoft.mule.test.batch.AbstractBatchTestCase
    public void doTearDown() throws Exception {
        try {
            assertStreamingIteratorIsClosed();
            assertStreamingChainOnlyInvokedOnce();
        } finally {
            super.doTearDown();
        }
    }

    /**
     * @return the name of the configuration resource used by this test case
     */
    protected String getConfigFile() {
        return "streaming-aggregator-config.xml";
    }

    @Test
    public void simpleStreamingAggregator() throws Exception {
        doTestSimpleScenario("simpleStreamingAggregator", HOW_MANY);
    }

    @Test
    public void simpleStreamingAggregatorParallel() throws Exception {
        doTestSimpleScenario("simpleStreamingAggregatorParallel", HOW_MANY);
    }

    @Test
    public void simpleStreamingAggregatorOnlyOneRecord() throws Exception {
        doTestSimpleScenario("simpleStreamingAggregator", 1);
    }

    @Test
    public void simpleStreamingAggregatorOnlyOneRecordParallel() throws Exception {
        doTestSimpleScenario("simpleStreamingAggregatorParallel", 1);
    }

    @Test
    public void streamingAggregatorNotInLastStep() throws Exception {
        doTestSimpleScenario("streamingAggregatorNotInLastStep", HOW_MANY);
    }

    @Test
    public void streamingAggregatorNotInLastStepOnlyOneRecord() throws Exception {
        doTestSimpleScenario("streamingAggregatorNotInLastStep", 1);
    }

    /**
     * Runs a scenario that is expected to succeed and checks that exactly {@code recordCount}
     * payloads were recorded and that they satisfy the inherited {@code assertUpperCasePipes}.
     *
     * @param scenarioName identifier handed to the inherited {@code doTest}
     * @param recordCount  number of records to generate and to expect back
     */
    private void doTestSimpleScenario(String scenarioName, int recordCount) throws Exception {
        runAndAwait(scenarioName, recordCount);
        assertJobSuccess(true);
        Assert.assertThat(payloads, IsCollectionWithSize.hasSize(recordCount));
        assertUpperCasePipes(payloads);
    }

    /**
     * Expects the job to be reported as unsuccessful while every record is still recorded, each
     * one flagged with errors and carrying an exception for the step named {@code failureStep}.
     */
    @Test
    public void streamingAggregatorWithFailure() throws Exception {
        runAndAwait("streamingAggregatorWithFailure", HOW_MANY);
        assertJobSuccess(false);
        Assert.assertThat(resultRecords, IsCollectionWithSize.hasSize(HOW_MANY));
        Assert.assertThat(payloads, IsCollectionWithSize.hasSize(HOW_MANY));
        for (Record record : resultRecords) {
            Assert.assertThat(record.hasErrors(), Matchers.is(true));
            Assert.assertThat(record.getExceptionForStep("failureStep"), Matchers.not(Matchers.nullValue()));
        }
    }

    // NOTE(review): despite its name, this test only checks that the job succeeded; any queue
    // disposal verification must happen elsewhere (e.g. in the parent class) - confirm.
    @Test
    public void allQueuesDisposed() throws Exception {
        runAndAwait("simpleStreamingAggregator", HOW_MANY);
        assertJobSuccess(true);
    }

    @Test
    public void streamingAggregatorClosedWithChangedPayload() throws Exception {
        runAndAwait("streamingAggregatorClosedWithChangedPayload", HOW_MANY);
        assertJobSuccess(true);
    }

    /** Starts the given scenario with a generated payload and waits for the job to terminate. */
    private void runAndAwait(String scenarioName, int recordCount) throws Exception {
        doTest(scenarioName, createRandomPayload(recordCount));
        awaitJobTermination();
    }

    /** Asserts that the inherited {@code wasJobSuccessful()} reports the expected outcome. */
    private void assertJobSuccess(boolean expected) {
        Assert.assertThat(wasJobSuccessful(), Matchers.is(expected));
    }

    /**
     * If an iterator was intercepted, asserts that it implements {@link Closeable} and has no
     * remaining elements. Does nothing when no iterator was intercepted.
     */
    private void assertStreamingIteratorIsClosed() {
        Iterator<Object> intercepted = streamingIterator;
        if (intercepted == null) {
            return;
        }
        Assert.assertThat(intercepted, Matchers.instanceOf(Closeable.class));
        Assert.assertThat(intercepted.hasNext(), Matchers.is(false));
    }

    /** Asserts that {@link #interceptIterator(Iterator)} ran exactly once during the test. */
    private void assertStreamingChainOnlyInvokedOnce() {
        Assert.assertThat(invocationCount, Matchers.is(1));
    }

    /**
     * Hook that remembers the given iterator and counts the call, then hands the iterator back
     * unchanged. Synchronized so the counter increment is atomic if the hook is ever reached
     * from more than one thread.
     *
     * @param it the iterator to remember
     * @return the same {@code it} instance
     */
    public static synchronized Object interceptIterator(Iterator<Object> it) {
        streamingIterator = it;
        invocationCount++;
        return it;
    }
}
