package org.mule.test.module.extension.streaming;

import io.qameta.allure.Description;
import io.qameta.allure.Feature;
import io.qameta.allure.Story;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.commons.lang3.RandomStringUtils;
import org.hamcrest.BaseMatcher;
import org.hamcrest.CoreMatchers;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.mule.functional.api.flow.FlowRunner;
import org.mule.runtime.api.metadata.TypedValue;
import org.mule.runtime.api.streaming.object.CursorIteratorProvider;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.streaming.iterator.ConsumerStreamingIterator;
import org.mule.tck.junit4.rule.SystemProperty;
import org.mule.tck.probe.JUnitLambdaProbe;
import org.mule.tck.probe.PollingProber;

@Story("Object Streaming")
@Feature("Streaming")
/* loaded from: input_file:org/mule/test/module/extension/streaming/ObjectStreamingExtensionTestCase.class */
public class ObjectStreamingExtensionTestCase extends AbstractStreamingExtensionTestCase {
    private static final int DATA_SIZE = 100;
    private static final String MY_STREAM_VAR = "myStreamVar";
    private List<String> data;

    @Rule
    public ExpectedException expectedException = ExpectedException.none();

    @Rule
    public SystemProperty withStatistics = new SystemProperty("mule.disable.payload.statistics", "true");

    @Rule
    public SystemProperty withPayloadStatistics = new SystemProperty("mule.disable.payload.statistics", "false");

    protected void doSetUp() throws Exception {
        super.doSetUp();
        this.data = new ArrayList(100);
        for (int i = 0; i < 100; i++) {
            this.data.add(RandomStringUtils.randomAlphabetic(100));
        }
    }

    protected String getConfigFile() {
        return "streaming/objects-streaming-extension-config.xml";
    }

    @Test
    @Description("Consume an object stream")
    public void getObjectStream() throws Exception {
        assertStreamMatchesData("getStreamWithoutStreaming");
    }

    @Test
    @Description("Stores an object stream in a variable leaving without modifying the original payload")
    public void getObjectStreamWithTargetValue() throws Exception {
        CoreEvent run = flowRunner("getStreamWithTargetValue").withPayload(this.data).run();
        Assert.assertThat(((TypedValue) run.getVariables().get(MY_STREAM_VAR)).getValue(), CoreMatchers.is(CoreMatchers.instanceOf(String.class)));
        Assert.assertThat(((TypedValue) run.getVariables().get(MY_STREAM_VAR)).getValue(), CoreMatchers.equalTo(this.data.get(0)));
    }

    @Test
    @Description("Stores an object stream in a variable leaving without modifying the original payload")
    public void getObjectStreamWithTargetVariable() throws Exception {
        CoreEvent run = flowRunner("getStreamWithTarget").keepStreamsOpen().withPayload(this.data).run();
        Assert.assertThat(((TypedValue) run.getVariables().get(MY_STREAM_VAR)).getValue(), CoreMatchers.is(CoreMatchers.instanceOf(CursorIteratorProvider.class)));
        Assert.assertThat(StreamSupport.stream(Spliterators.spliteratorUnknownSize(((CursorIteratorProvider) ((TypedValue) run.getVariables().get(MY_STREAM_VAR)).getValue()).openCursor(), 16), false).collect(Collectors.toList()), CoreMatchers.equalTo(this.data));
        Assert.assertThat(run.getMessage().getPayload().getValue(), CoreMatchers.is(CoreMatchers.instanceOf(List.class)));
        Assert.assertThat(run.getMessage().getPayload().getValue(), CoreMatchers.equalTo(this.data));
    }

    @Test
    @Description("Operation is configured not to stream")
    public void operationWithDisabledStreaming() throws Exception {
        assertStreamMatchesData("getStreamWithoutStreaming");
    }

    @Test
    @Description("Operation is configured not to stream and stream gets closed automatically even if not consumed")
    public void nonRepeatableStreamIsManaged() throws Exception {
        Object objectStream = getObjectStream("getStreamWithoutStreaming", false);
        Assert.assertThat(objectStream, CoreMatchers.is(CoreMatchers.instanceOf(ConsumerStreamingIterator.class)));
        ConsumerStreamingIterator consumerStreamingIterator = (ConsumerStreamingIterator) objectStream;
        new PollingProber(1000L, 100L).check(new JUnitLambdaProbe(() -> {
            Assert.assertThat(Boolean.valueOf(consumerStreamingIterator.hasNext()), CoreMatchers.is(false));
            return true;
        }));
        this.expectedException.expect(new BaseMatcher<Throwable>() { // from class: org.mule.test.module.extension.streaming.ObjectStreamingExtensionTestCase.1
            public boolean matches(Object obj) {
                return obj.getClass().getName().equals("org.mule.runtime.core.internal.streaming.object.iterator.ClosedConsumerException");
            }

            public void describeTo(org.hamcrest.Description description) {
                description.appendText("Exception was not a ClosedConsumerException");
            }
        });
        consumerStreamingIterator.next();
    }

    @Test
    @Description("Consume a stream generated in a transaction")
    public void getStreamInTx() throws Exception {
        assertStreamMatchesData("getStreamInTx");
    }

    @Test(expected = Exception.class)
    @Description("All cursors closed when the flow fails")
    public void allStreamsClosedInCaseOfException() throws Exception {
        flowRunner("crashCar").withPayload(this.data).run();
    }

    @Test(expected = Exception.class)
    @Description("All cursors closed when the flow fails in a transaction")
    public void allStreamsClosedInCaseOfExceptionInTx() throws Exception {
        flowRunner("crashCarTx").withPayload(this.data).run();
    }

    private Object getObjectStream(String str, boolean z) throws Exception {
        FlowRunner withPayload = flowRunner(str).withPayload(this.data);
        if (z) {
            withPayload.keepStreamsOpen();
        }
        return withPayload.run().getMessage().getPayload().getValue();
    }

    private List<String> consumeObjectStream(String str, boolean z) throws Exception {
        Object objectStream = getObjectStream(str, z);
        if (!(objectStream instanceof Iterator)) {
            if (objectStream instanceof List) {
                return (List) objectStream;
            }
            throw new IllegalStateException("Stream of unknown type: " + objectStream.getClass());
        }
        Iterator it = (Iterator) objectStream;
        LinkedList linkedList = new LinkedList();
        linkedList.getClass();
        it.forEachRemaining((v1) -> {
            r1.add(v1);
        });
        return linkedList;
    }

    private void assertStreamMatchesData(String str) throws Exception {
        Assert.assertThat(consumeObjectStream(str, true), CoreMatchers.equalTo(this.data));
    }
}
