package org.mule.test.module.extension.streaming;

import io.qameta.allure.Description;
import io.qameta.allure.Feature;
import io.qameta.allure.Story;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.util.LinkedList;
import java.util.List;
import java.util.function.Predicate;
import java.util.function.Supplier;
import javax.inject.Inject;
import javax.inject.Named;
import org.apache.commons.lang3.RandomStringUtils;
import org.hamcrest.CoreMatchers;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.mule.metadata.api.model.UnionType;
import org.mule.runtime.api.message.Message;
import org.mule.runtime.api.meta.model.config.ConfigurationModel;
import org.mule.runtime.api.meta.model.operation.OperationModel;
import org.mule.runtime.api.meta.model.parameter.ParameterModel;
import org.mule.runtime.api.meta.model.parameter.ParameterizedModel;
import org.mule.runtime.api.meta.model.source.SourceModel;
import org.mule.runtime.api.metadata.TypedValue;
import org.mule.runtime.api.streaming.bytes.CursorStreamProvider;
import org.mule.runtime.api.util.DataUnit;
import org.mule.runtime.core.api.construct.Flow;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.streaming.StreamingManager;
import org.mule.runtime.core.api.streaming.bytes.InMemoryCursorStreamConfig;
import org.mule.runtime.core.api.util.IOUtils;
import org.mule.runtime.dsl.api.component.config.DefaultComponentLocation;
import org.mule.tck.junit4.rule.SystemProperty;
import org.mule.tck.probe.JUnitLambdaProbe;
import org.mule.tck.probe.JUnitProbe;
import org.mule.tck.probe.PollingProber;
import org.mule.test.module.extension.internal.util.ExtensionsTestUtils;

/**
 * Base test case for the bytes-streaming scenarios of the test extension.
 *
 * <p>Concrete subclasses pass a configuration name to the constructor; it is published through
 * the {@code configName} system property for the duration of each test. The flows exercised by
 * the tests are addressed by name and are expected to be declared in the file returned by
 * {@link #getConfigFile()}.
 *
 * <p>Sources under test report the payloads they produce through {@link #addSpell(String)};
 * the source-related tests then poll that shared list until the expected value shows up.
 */
@Story("Bytes Streaming")
@Feature("Streaming")
public abstract class AbstractBytesStreamingExtensionTestCase extends AbstractStreamingExtensionTestCase {
    private static final String BARGAIN_SPELL = "dormammu i've come to bargain";
    private static final String TOO_BIG = "Too big!";

    // Prober arguments for the "stream got closed" check.
    // NOTE(review): presumably (timeout millis, poll delay millis) - confirm against PollingProber.
    private static final int TIMEOUT = 2000;
    private static final int DELAY = 200;

    // Prober arguments used while waiting for a source to report a spell (same ordering as above).
    private static final int SOURCE_TIMEOUT = 4000;
    private static final int SOURCE_DELAY = 100;

    /**
     * Spells reported so far. The list doubles as the monitor guarding every access to it, so it
     * must be final: locking on a reassignable field would not give mutual exclusion.
     */
    private static final List<String> CASTED_SPELLS = new LinkedList<>();

    @Inject
    @Named("bytesCaster")
    private Flow bytesCaster;

    @Inject
    @Named("sourceWithExceededBuffer")
    private Flow sourceWithExceededBuffer;

    @Inject
    @Named("bytesCasterInTx")
    private Flow bytesCasterInTx;

    @Inject
    @Named("bytesCasterWithoutStreaming")
    private Flow bytesCasterWithoutStreaming;

    @Inject
    @Named("toNonRepeatableStream")
    private Flow toNonRepeatableStream;

    @Inject
    private StreamingManager streamingManager;

    @Rule
    public final SystemProperty configName;

    /** Random 2048-character alphabetic payload used by most tests. */
    private final String data = RandomStringUtils.randomAlphabetic(2048);

    /**
     * Records a spell so that the source tests can observe it.
     *
     * @param str the value to append to the shared list of casted spells
     */
    public static void addSpell(String str) {
        synchronized (CASTED_SPELLS) {
            CASTED_SPELLS.add(str);
        }
    }

    /**
     * @param str value assigned to the {@code configName} system property while a test runs
     */
    public AbstractBytesStreamingExtensionTestCase(String str) {
        this.configName = new SystemProperty("configName", str);
    }

    /**
     * @return the name of the configuration file used by this test case
     */
    protected String getConfigFile() {
        return "bytes-streaming-extension-config.xml";
    }

    /** Requests that the context be disposed once per class rather than once per test. */
    protected void doSetUp() throws Exception {
        setDisposeContextPerClass(true);
    }

    /**
     * Runs the inherited tear-down and then forgets every recorded spell.
     */
    @Override
    public void doTearDownAfterMuleContextDispose() throws Exception {
        super.doTearDownAfterMuleContextDispose();
        // Same monitor as addSpell and the polling probe: the list itself is not thread-safe.
        synchronized (CASTED_SPELLS) {
            CASTED_SPELLS.clear();
        }
    }

    protected boolean isDisposeContextPerClass() {
        return true;
    }

    @Test
    @Description("Fully consume a cursor stream")
    public void consumeGeneratedCursorAndCloseIt() throws Exception {
        Object payload = flowRunner("consumeGeneratedStream").withPayload(this.data).run().getMessage().getPayload().getValue();
        Assert.assertThat(payload, CoreMatchers.is(this.data));
    }

    @Test
    @Description("Operation with disabled streaming")
    public void operationWithDisabledStreaming() throws Exception {
        Object value = flowRunner("toSimpleStream").withPayload(this.data).keepStreamsOpen().run().getMessage().getPayload().getValue();
        Assert.assertThat(value, CoreMatchers.is(CoreMatchers.instanceOf(InputStream.class)));
        Assert.assertThat(IOUtils.toString((InputStream) value), CoreMatchers.is(this.data));
    }

    @Test(expected = Exception.class)
    @Description("If the flow fails, all cursors should be closed")
    public void allStreamsClosedInCaseOfException() throws Exception {
        flowRunner("crashCar").withPayload(this.data).run();
    }

    @Test(expected = Exception.class)
    @Description("If the flow fails, all non repeatable streams should be closed")
    public void nonRepeatableStreamsClosedInCaseOfException() throws Exception {
        flowRunner("nonRepeatableCrashCar").withPayload(this.data).run();
    }

    // NOTE(review): this description is identical to the non-repeatable variants; it looks
    // copy-pasted and probably should talk about cursors and a handled error - confirm.
    @Test
    @Description("If the flow fails, all non repeatable streams should be closed")
    public void allStreamsClosedInCaseOfHandledException() throws Exception {
        flowRunner("handledCrashCar").withPayload(this.data).run();
    }

    @Test
    @Description("If the flow fails, all non repeatable streams should be closed")
    public void nonRepeatableStreamsClosedInCaseOfHandledException() throws Exception {
        flowRunner("nonRepeatableHandledCrashCar").withPayload(this.data).run();
    }

    @Test(expected = Exception.class)
    @Description("If a cursor is open in a transaction, it should be closed if the flow fails")
    public void allStreamsClosedInCaseOfExceptionInTx() throws Exception {
        flowRunner("crashCarTx").withPayload(this.data).run();
    }

    @Test
    @Description("Read a stream from a random position")
    public void seek() throws Exception {
        doSeek("seekStream");
    }

    // NOTE(review): "Rewing" in the description below is presumably a typo for "Rewind".
    @Test
    @Description("Rewing a stream and consume it twice")
    public void rewind() throws Exception {
        CoreEvent result = flowRunner("rewind").withPayload(this.data).run();
        Message firstRead = (Message) ((TypedValue) result.getVariables().get("firstRead")).getValue();
        Message secondRead = (Message) ((TypedValue) result.getVariables().get("secondRead")).getValue();
        Assert.assertThat(firstRead.getPayload().getValue(), CoreMatchers.equalTo(this.data));
        Assert.assertThat(secondRead.getPayload().getValue(), CoreMatchers.equalTo(this.data));
    }

    @Test
    @Description("Read from a random position inside a transaction")
    public void seekInTx() throws Exception {
        doSeek("seekStreamTx");
    }

    @Test
    @Description("When the max buffer size is exceeded, the correct type of error is mapped")
    public void throwsBufferSizeExceededError() throws Exception {
        // A dedicated, larger payload; kept local so the shared field stays immutable.
        String oversizedPayload = RandomStringUtils.randomAlphabetic(DataUnit.KB.toBytes(60));
        Object payload = flowRunner("bufferExceeded").withPayload(oversizedPayload).run().getMessage().getPayload().getValue();
        Assert.assertThat(payload, CoreMatchers.is(TOO_BIG));
    }

    /**
     * Runs the given flow with a {@code position} variable of 10 and checks that the resulting
     * payload equals the test data minus its first 10 characters.
     *
     * @param str name of the flow to run
     */
    private void doSeek(String str) throws Exception {
        Object payload = flowRunner(str).withPayload(this.data).withVariable("position", 10).run().getMessage().getPayload().getValue();
        Assert.assertThat(payload, CoreMatchers.is(this.data.substring(10)));
    }

    @Test
    @Description("A source generates a cursor stream")
    public void sourceStreaming() throws Exception {
        startSourceAndListenSpell(this.bytesCaster, bargainPredicate());
    }

    @Test
    @Description("When the max buffer size is exceeded on a stream generated in a source, the correct type of error is mapped")
    public void sourceThrowsBufferSizeExceededError() throws Exception {
        startSourceAndListenSpell(this.sourceWithExceededBuffer, TOO_BIG::equals);
    }

    @Test
    @Description("A source generates a cursor in a transaction")
    public void sourceStreamingInTx() throws Exception {
        startSourceAndListenSpell(this.bytesCasterInTx, bargainPredicate());
    }

    @Test
    @Description("A source is configured not to stream")
    public void sourceWithoutStreaming() throws Exception {
        startSourceAndListenSpell(this.bytesCasterWithoutStreaming, bargainPredicate());
    }

    @Test
    @Description("New cursor is open for Object parameter which receives a CursorProvider")
    public void resolveCursorsFromObjectParams() throws Exception {
        Object inputProvider = this.streamingManager.forBytes()
                .getInMemoryCursorProviderFactory(InMemoryCursorStreamConfig.getDefault())
                .of(testEvent().getContext(), new ByteArrayInputStream(this.data.getBytes()), DefaultComponentLocation.fromSingleComponent("objectToStream"));
        CursorStreamProvider outputProvider = (CursorStreamProvider) flowRunner("objectToStream").keepStreamsOpen().withPayload(inputProvider).run().getMessage().getPayload().getValue();
        Assert.assertThat(serializeAndRestoreAsString(outputProvider), CoreMatchers.equalTo(this.data));
    }

    @Test
    @Description("A stream provider is serialized as a byte[]")
    public void streamProviderSerialization() throws Exception {
        CursorStreamProvider provider = (CursorStreamProvider) flowRunner("toStream").keepStreamsOpen().withPayload(this.data).run().getMessage().getPayload().getValue();
        Assert.assertThat(serializeAndRestoreAsString(provider), CoreMatchers.equalTo(this.data));
    }

    @Test
    @Description("A non repeatable stream is closed automatically when flow completes")
    public void nonRepeatableStreamIsAutomaticallyClosed() throws Exception {
        final InputStream inputStream = (InputStream) flowRunner("toNonRepeatableStream").withPayload(this.data).run().getMessage().getPayload().getValue();
        new PollingProber(TIMEOUT, DELAY).check(new JUnitProbe() {
            protected boolean test() throws Exception {
                // The probe passes once reading the stream reports end-of-stream.
                Assert.assertThat(inputStream.read(), CoreMatchers.is(-1));
                return true;
            }

            public String describeFailure() {
                return "Stream was not automatically closed.";
            }
        });
    }

    @Test
    @Description("A non repeatable stream is not automatically closed when event completes async")
    public void nonRepeatableStreamClosesAsync() throws Exception {
        InputStream inputStream = (InputStream) flowRunner("toNonRepeatableStream").keepStreamsOpen().withPayload(this.data).run().getMessage().getPayload().getValue();
        byte[] buffer = new byte[this.data.length()];
        // InputStream.read may legally return fewer bytes than requested, so keep reading
        // until the buffer is full or the stream ends instead of trusting a single call.
        int totalRead = 0;
        while (totalRead < buffer.length) {
            int read = inputStream.read(buffer, totalRead, buffer.length - totalRead);
            if (read == -1) {
                break;
            }
            totalRead += read;
        }
        Assert.assertThat(totalRead, CoreMatchers.is(this.data.length()));
        Assert.assertThat(inputStream.read(), CoreMatchers.is(-1));
        Assert.assertThat(new String(buffer), CoreMatchers.equalTo(this.data));
    }

    @Test
    @Description("Streaming operation has a streaming strategy parameter")
    public void streamingStrategyParameterInOperation() throws Exception {
        assertStreamingStrategyParameter(getStreamingStrategyParameterModel(
                () -> (OperationModel) getConfigurationModel().getOperationModel("toStream").get()));
    }

    @Test
    @Description("Streaming source has a streaming strategy parameter")
    public void streamingStrategyParameterInSource() throws Exception {
        assertStreamingStrategyParameter(getStreamingStrategyParameterModel(
                () -> (SourceModel) getConfigurationModel().getSourceModel("bytes-caster").get()));
    }

    @Test
    @Description("Call operation multiple times in the flow")
    public void operationCalledAndOutputConsumedMultipleTimes() throws Exception {
        Object value = flowRunner("toStreamMultipleTimes").withPayload(this.data).keepStreamsOpen().run().getMessage().getPayload().getValue();
        Assert.assertThat(value, CoreMatchers.is(CoreMatchers.instanceOf(InputStream.class)));
        Assert.assertThat(IOUtils.toString((InputStream) value), CoreMatchers.is(this.data));
    }

    /**
     * Passes the provider through the context's internal serialization protocol (serialize, then
     * deserialize) and decodes the restored {@code byte[]} with the default charset.
     *
     * @param provider the cursor stream provider to serialize
     * @return the restored bytes as a string
     */
    private String serializeAndRestoreAsString(CursorStreamProvider provider) throws Exception {
        Object restored = muleContext.getObjectSerializer().getInternalProtocol()
                .deserialize(muleContext.getObjectSerializer().getInternalProtocol().serialize(provider));
        return new String((byte[]) restored, Charset.defaultCharset());
    }

    /**
     * Finds the parameter named {@code streamingStrategy} in the supplied model.
     *
     * @param supplier provides the operation or source model to inspect
     * @return the matching parameter model
     * @throws AssertionError if the model declares no such parameter
     */
    private ParameterModel getStreamingStrategyParameterModel(Supplier<ParameterizedModel> supplier) {
        return supplier.get().getAllParameterModels().stream()
                .filter(parameterModel -> parameterModel.getName().equals("streamingStrategy"))
                .findFirst()
                .orElseThrow(() -> new AssertionError("No 'streamingStrategy' parameter found in the model"));
    }

    /**
     * @return the {@code dr-strange} configuration model of the {@code Marvel} extension
     */
    private ConfigurationModel getConfigurationModel() {
        return (ConfigurationModel) getExtensionModel("Marvel")
                .map(extensionModel -> (ConfigurationModel) extensionModel.getConfigurationModel("dr-strange").get())
                .get();
    }

    /** Checks the parameter's type against {@code Object} and {@link UnionType}. */
    private void assertStreamingStrategyParameter(ParameterModel parameterModel) {
        ExtensionsTestUtils.assertType(parameterModel.getType(), Object.class, UnionType.class);
    }

    /**
     * Starts the flow if it is not started yet and polls until some recorded spell satisfies the
     * predicate.
     *
     * @param flow      the flow whose source is expected to report spells
     * @param predicate condition that at least one recorded spell must meet
     */
    private void startSourceAndListenSpell(Flow flow, Predicate<String> predicate) throws Exception {
        if (!flow.getLifecycleState().isStarted()) {
            flow.start();
        }
        new PollingProber(SOURCE_TIMEOUT, SOURCE_DELAY).check(new JUnitLambdaProbe(() -> {
            // Hold the list's monitor so a concurrent addSpell cannot modify it mid-iteration.
            synchronized (CASTED_SPELLS) {
                return CASTED_SPELLS.stream().anyMatch(predicate);
            }
        }));
    }

    /**
     * @return a null-safe predicate matching the bargain spell
     */
    private Predicate<String> bargainPredicate() {
        return BARGAIN_SPELL::equals;
    }

    protected boolean isGracefulShutdown() {
        return true;
    }
}
