package org.mule.runtime.core.internal.streaming.bytes;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.IOUtils;
import org.hamcrest.CoreMatchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.Mockito;
import org.mule.runtime.api.streaming.CursorStream;
import org.mule.runtime.api.streaming.CursorStreamProvider;
import org.mule.runtime.api.util.DataSize;
import org.mule.runtime.api.util.DataUnit;
import org.mule.runtime.core.api.Event;
import org.mule.runtime.core.streaming.bytes.FileStoreCursorStreamConfig;
import org.mule.runtime.core.streaming.bytes.InMemoryCursorStreamConfig;
import org.mule.runtime.core.util.func.CheckedConsumer;
import org.mule.runtime.core.util.func.CheckedRunnable;
import org.mule.tck.size.SmallTest;

@RunWith(Parameterized.class)
@SmallTest
/* loaded from: input_file:org/mule/runtime/core/internal/streaming/bytes/CursorStreamProviderTestCase.class */
public class CursorStreamProviderTestCase extends AbstractByteStreamingTestCase {
    private final int halfDataLength;
    private final int bufferSize;
    private final ScheduledExecutorService executorService;
    private CursorStreamProvider streamProvider;
    private CountDownLatch controlLatch;
    private CountDownLatch mainThreadLatch;
    private ByteBufferManager bufferManager;

    @Parameterized.Parameters(name = "{0}")
    public static Collection<Object[]> data() {
        return Arrays.asList(new Object[]{"In Memory Without expansion", 262144, 1048576, 1048576}, new Object[]{"In Memory With expansion", 262144, 1048576, 2097152}, new Object[]{"File Store", 2097152, 262144, 262144});
    }

    public CursorStreamProviderTestCase(String str, int i, int i2, int i3) {
        super(i);
        this.bufferManager = new PoolingByteBufferManager();
        this.executorService = Executors.newScheduledThreadPool(2);
        this.bufferSize = i2;
        this.halfDataLength = this.data.length() / 2;
        ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(this.data.getBytes());
        if (i <= i2) {
            this.streamProvider = new InMemoryCursorStreamProvider(byteArrayInputStream, new InMemoryCursorStreamConfig(new DataSize(i2, DataUnit.BYTE), new DataSize(i2 / 2, DataUnit.BYTE), new DataSize(i3, DataUnit.BYTE)), this.bufferManager, (Event) Mockito.mock(Event.class));
        } else {
            this.streamProvider = new FileStoreCursorStreamProvider(byteArrayInputStream, new FileStoreCursorStreamConfig(new DataSize(i3, DataUnit.BYTE)), (Event) Mockito.mock(Event.class), this.bufferManager, this.executorService);
        }
        resetLatches();
    }

    @After
    public void after() {
        this.streamProvider.close();
        this.executorService.shutdownNow();
    }

    @Test
    public void readFullyWithInSingleCursor() throws IOException {
        withCursor(cursorStream -> {
            Assert.assertThat(IOUtils.toString(cursorStream), CoreMatchers.equalTo(this.data));
        });
    }

    @Test
    public void readFullyByteByByteWithSingleCursor() throws IOException {
        withCursor(cursorStream -> {
            for (int i = 0; i < this.data.length(); i++) {
                Assert.assertThat(Character.valueOf((char) cursorStream.read()), CoreMatchers.equalTo(Character.valueOf(this.data.charAt(i))));
            }
        });
    }

    @Test
    public void partialReadOnSingleCursor() throws Exception {
        byte[] bArr = new byte[this.halfDataLength];
        withCursor(cursorStream -> {
            cursorStream.read(bArr, 0, this.halfDataLength);
            Assert.assertThat(toString(bArr), CoreMatchers.equalTo(this.data.substring(0, this.halfDataLength)));
        });
    }

    @Test
    public void partialReadWithOffsetOnSingleCursor() throws Exception {
        byte[] bArr = new byte[this.halfDataLength + 2];
        bArr[0] = "!".getBytes()[0];
        bArr[1] = bArr[0];
        withCursor(cursorStream -> {
            cursorStream.read(bArr, 2, this.halfDataLength);
            Assert.assertThat(toString(bArr), CoreMatchers.equalTo("!!" + this.data.substring(0, this.halfDataLength)));
        });
    }

    @Test
    public void randomSeekWithOneOpenCursor() throws Exception {
        withCursor(cursorStream -> {
            Assert.assertThat(IOUtils.toString(cursorStream), CoreMatchers.equalTo(this.data));
            seekAndAssert(cursorStream, 0L, 10);
            seekAndAssert(cursorStream, this.halfDataLength, this.halfDataLength);
        });
    }

    @Test
    public void twoOpenCursorsConsumingTheStreamInSingleThread() throws Exception {
        withCursor(cursorStream -> {
            withCursor(cursorStream -> {
                seekAndAssert(cursorStream, 0L, this.data.length());
                seekAndAssert(cursorStream, 0L, this.data.length());
            });
        });
    }

    @Test
    public void twoOpenCursorsReadingOppositeEndsOfTheStreamInSingleThread() throws Exception {
        withCursor(cursorStream -> {
            withCursor(cursorStream -> {
                seekAndAssert(cursorStream, 0L, this.data.length() / 2);
                seekAndAssert(cursorStream, this.halfDataLength, this.halfDataLength);
            });
        });
    }

    @Test
    public void twoOpenCursorsConsumingTheStreamConcurrently() throws Exception {
        withCursor(cursorStream -> {
            withCursor(cursorStream -> {
                doAsync(() -> {
                    seekAndAssert(cursorStream, 0L, this.data.length());
                }, () -> {
                    seekAndAssert(cursorStream, 0L, this.data.length());
                });
            });
        });
    }

    @Test
    public void twoOpenCursorsReadingOppositeEndsOfTheStreamConcurrently() throws Exception {
        withCursor(cursorStream -> {
            withCursor(cursorStream -> {
                doAsync(() -> {
                    seekAndAssert(cursorStream, 0L, this.data.length() / 2);
                }, () -> {
                    seekAndAssert(cursorStream, this.halfDataLength, this.halfDataLength);
                });
            });
        });
    }

    @Test
    public void getPosition() throws Exception {
        withCursor(cursorStream -> {
            Assert.assertThat(Long.valueOf(cursorStream.getPosition()), CoreMatchers.is(0L));
            cursorStream.seek(10L);
            Assert.assertThat(Long.valueOf(cursorStream.getPosition()), CoreMatchers.is(10L));
            cursorStream.seek(0L);
            Assert.assertThat(Long.valueOf(cursorStream.getPosition()), CoreMatchers.is(0L));
        });
    }

    @Test
    public void isClosed() throws Exception {
        withCursor(cursorStream -> {
            Assert.assertThat(Boolean.valueOf(cursorStream.isClosed()), CoreMatchers.is(false));
            IOUtils.toString(cursorStream);
            Assert.assertThat(Boolean.valueOf(cursorStream.isClosed()), CoreMatchers.is(true));
            cursorStream.seek(0L);
            Assert.assertThat(Boolean.valueOf(cursorStream.isClosed()), CoreMatchers.is(false));
        });
    }

    @Test
    public void getSliceWhichStartsInCurrentSegmentButEndsInTheNext() throws Exception {
        if (this.data.length() < this.bufferSize) {
            return;
        }
        int i = this.bufferSize - 10;
        int i2 = this.bufferSize / 2;
        byte[] bArr = new byte[i2];
        withCursor(cursorStream -> {
            cursorStream.seek(i);
            Assert.assertThat(Integer.valueOf(cursorStream.read(bArr, 0, i2)), CoreMatchers.is(Integer.valueOf(i2)));
            Assert.assertThat(toString(bArr), CoreMatchers.equalTo(this.data.substring(i, i + i2)));
        });
    }

    private void doAsync(CheckedRunnable checkedRunnable, CheckedRunnable checkedRunnable2) throws Exception {
        resetLatches();
        Future doAsync = doAsync(() -> {
            this.controlLatch.await();
            checkedRunnable.run();
            this.mainThreadLatch.countDown();
        });
        Future doAsync2 = doAsync(() -> {
            this.controlLatch.countDown();
            checkedRunnable2.run();
            this.mainThreadLatch.countDown();
        });
        awaitMainThreadLatch();
        Assert.assertThat(doAsync.get(), CoreMatchers.is(CoreMatchers.nullValue()));
        Assert.assertThat(doAsync2.get(), CoreMatchers.is(CoreMatchers.nullValue()));
    }

    private Future doAsync(CheckedRunnable checkedRunnable) {
        return this.executorService.submit(() -> {
            try {
                checkedRunnable.run();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
    }

    private void awaitMainThreadLatch() throws InterruptedException {
        this.mainThreadLatch.await(1L, TimeUnit.SECONDS);
    }

    private void seekAndAssert(CursorStream cursorStream, long j, int i) throws Exception {
        byte[] bArr = new byte[i];
        cursorStream.seek(j);
        cursorStream.read(bArr, 0, i);
        Assert.assertThat(toString(bArr), CoreMatchers.equalTo(this.data.substring(Math.toIntExact(j), Math.toIntExact(j + i))));
    }

    private void resetLatches() {
        this.controlLatch = new CountDownLatch(1);
        this.mainThreadLatch = new CountDownLatch(2);
    }

    private void withCursor(CheckedConsumer<CursorStream> checkedConsumer) throws IOException {
        CursorStream openCursor = this.streamProvider.openCursor();
        Throwable th = null;
        try {
            try {
                checkedConsumer.accept(openCursor);
                if (openCursor != null) {
                    if (0 == 0) {
                        openCursor.close();
                        return;
                    }
                    try {
                        openCursor.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (openCursor != null) {
                if (th != null) {
                    try {
                        openCursor.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    openCursor.close();
                }
            }
            throw th4;
        }
    }
}
