package org.mule.service.http.impl.service.client.async;

import com.ning.http.client.AsyncHandler;
import com.ning.http.client.FluentCaseInsensitiveStringsMap;
import com.ning.http.client.HttpResponseHeaders;
import com.ning.http.client.HttpResponseStatus;
import com.ning.http.client.providers.grizzly.GrizzlyResponseBodyPart;
import com.ning.http.client.providers.grizzly.PauseHandler;
import io.qameta.allure.Feature;
import io.qameta.allure.Issue;
import io.qameta.allure.Story;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mule.runtime.api.util.Reference;
import org.mule.runtime.api.util.concurrent.Latch;
import org.mule.runtime.core.api.util.IOUtils;
import org.mule.runtime.core.api.util.UUID;
import org.mule.runtime.http.api.domain.message.response.HttpResponse;
import org.mule.service.http.impl.AllureConstants;
import org.mule.service.http.impl.service.client.NonBlockingStreamWriter;
import org.mule.service.http.impl.util.TimedPipedInputStream;
import org.mule.service.http.impl.util.TimedPipedOutputStream;
import org.mule.tck.junit4.AbstractMuleTestCase;
import org.mule.tck.probe.JUnitLambdaProbe;
import org.mule.tck.probe.PollingProber;
import org.slf4j.MDC;

@Story(AllureConstants.HttpFeature.HttpStory.STREAMING)
@Feature(AllureConstants.HttpFeature.HTTP_SERVICE)
/* loaded from: input_file:org/mule/service/http/impl/service/client/async/ResponseBodyDeferringAsyncHandlerTestCase.class */
public class ResponseBodyDeferringAsyncHandlerTestCase extends AbstractMuleTestCase {
    /** Timeout handed to {@link #prober}; presumably milliseconds - confirm against {@code PollingProber}. */
    private static final int PROBE_TIMEOUT = 5000;
    /** Poll delay handed to {@link #prober}; presumably milliseconds - confirm against {@code PollingProber}. */
    private static final int POLL_DELAY = 300;
    /** Value passed as the second constructor argument of the handler in most tests. */
    private static final int BUFFER_SIZE = 1024;
    /** System property set to {@code "100"} before each test and cleared after it. */
    private static final String READ_TIMEOUT_PROPERTY_NAME = "mule.http.responseStreaming.pipeReadTimeoutMillis";

    /** Shared mock returned by every body part built through {@code mockBodyPart}. */
    private final PauseHandler pauseHandler = Mockito.mock(PauseHandler.class);
    /** Single thread used by tests to run blocking calls off the test thread. */
    private final ExecutorService testExecutor = Executors.newSingleThreadExecutor();
    /** Built from the named constants instead of repeating their literal values. */
    private final PollingProber prober = new PollingProber(PROBE_TIMEOUT, POLL_DELAY);
    /** Executor handed to the handler under test; also used to run {@link #nonBlockingStreamWriter}. */
    private final ExecutorService workersExecutor = Executors.newFixedThreadPool(5);
    /** Writer handed to the handler under test. */
    private final NonBlockingStreamWriter nonBlockingStreamWriter = new NonBlockingStreamWriter();

    /**
     * Sets the {@code mule.http.responseStreaming.pipeReadTimeoutMillis} system property to {@code "100"} and asks
     * {@code ResponseBodyDeferringAsyncHandler} to re-read its system properties before every test.
     */
    @Before
    public void setup() {
        System.setProperty(READ_TIMEOUT_PROPERTY_NAME, "100");
        // The property alone is not enough: the handler class has to be told to refresh its view of it.
        ResponseBodyDeferringAsyncHandler.refreshSystemProperties();
    }

    /**
     * Clears the read-timeout system property, makes the handler class re-read its system properties and shuts down
     * the executors owned by this test instance.
     */
    @After
    public void tearDown() {
        try {
            System.clearProperty(READ_TIMEOUT_PROPERTY_NAME);
            ResponseBodyDeferringAsyncHandler.refreshSystemProperties();
        } finally {
            // Both executors are instance fields created for every test; without an explicit shutdown their
            // threads (and anything still running on them) outlive the test.
            this.testExecutor.shutdownNow();
            this.workersExecutor.shutdownNow();
        }
    }

    /**
     * Feeds the handler a single, last body part and checks that the content exposed by the completed response is
     * not a {@link TimedPipedInputStream}.
     */
    @Test
    public void doesNotStreamWhenPossible() throws Exception {
        CompletableFuture<HttpResponse> responseFuture = new CompletableFuture<>();
        Reference<InputStream> responseContent = new Reference<>();
        ResponseBodyDeferringAsyncHandler handler = new ResponseBodyDeferringAsyncHandler(responseFuture, BUFFER_SIZE, this.workersExecutor, this.nonBlockingStreamWriter);
        handler.onStatusReceived(Mockito.mock(HttpResponseStatus.class, Mockito.RETURNS_DEEP_STUBS));
        GrizzlyResponseBodyPart lastPart = mockBodyPart(true, new byte[0]);
        // Capture the content once the response is available; without this the reference stays null and the
        // negative assertion below would pass without checking anything.
        responseFuture.whenComplete((response, failure) -> {
            if (response != null) {
                responseContent.set(response.getEntity().getContent());
            }
        });
        MatcherAssert.assertThat(handler.onBodyPartReceived(lastPart), Matchers.is(AsyncHandler.STATE.CONTINUE));
        this.prober.check(new JUnitLambdaProbe(() -> {
            MatcherAssert.assertThat(responseContent.get(), Matchers.notNullValue());
            MatcherAssert.assertThat(responseContent.get(), Matchers.not(Matchers.instanceOf(TimedPipedInputStream.class)));
            return true;
        }));
    }

    /**
     * Feeds the handler a body part that is not the last one and checks that the content exposed by the completed
     * response is a {@link TimedPipedInputStream}.
     */
    @Test
    public void streamsWhenRequired() throws Exception {
        CompletableFuture<HttpResponse> responseFuture = new CompletableFuture<>();
        Reference<InputStream> responseContent = new Reference<>();
        ResponseBodyDeferringAsyncHandler handler = new ResponseBodyDeferringAsyncHandler(responseFuture, BUFFER_SIZE, this.workersExecutor, this.nonBlockingStreamWriter);
        handler.onStatusReceived(Mockito.mock(HttpResponseStatus.class, Mockito.RETURNS_DEEP_STUBS));
        GrizzlyResponseBodyPart firstPart = mockBodyPart(false, new byte[0]);
        // Capture the content once the response is available; with an empty callback the reference is never
        // populated and the probe below can only time out.
        responseFuture.whenComplete((response, failure) -> {
            if (response != null) {
                responseContent.set(response.getEntity().getContent());
            }
        });
        MatcherAssert.assertThat(handler.onBodyPartReceived(firstPart), Matchers.is(AsyncHandler.STATE.CONTINUE));
        this.prober.check(new JUnitLambdaProbe(() -> {
            MatcherAssert.assertThat(responseContent.get(), Matchers.instanceOf(TimedPipedInputStream.class));
            return true;
        }));
    }

    /**
     * Reports a throwable to the handler before any body part arrives and checks that the next part is answered
     * with {@code ABORT} and that the response future ends up completed exceptionally.
     */
    @Test
    @Issue("MULE-19208")
    public void handlerAbortsResponseWhenAnErrorOccurred() throws Exception {
        CompletableFuture<HttpResponse> responseFuture = new CompletableFuture<>();
        ResponseBodyDeferringAsyncHandler handler = new ResponseBodyDeferringAsyncHandler(responseFuture, BUFFER_SIZE, this.workersExecutor, this.nonBlockingStreamWriter);
        GrizzlyResponseBodyPart part = mockBodyPart(false, new byte[0]);
        handler.onStatusReceived(Mockito.mock(HttpResponseStatus.class, Mockito.RETURNS_DEEP_STUBS));
        handler.onThrowable(new TimeoutException("Timeout exceeded"));
        MatcherAssert.assertThat(handler.onBodyPartReceived(part), Matchers.is(AsyncHandler.STATE.ABORT));
        this.prober.check(new JUnitLambdaProbe(responseFuture::isCompletedExceptionally));
    }

    /**
     * After a throwable has been reported, a body part handed to the handler must never be written to a
     * {@link TimedPipedOutputStream}.
     */
    @Test
    @Issue("MULE-19208")
    public void handlerDoesNotTryToWriteAPartIfAnErrorOccurred() throws Exception {
        HttpResponseStatus status = Mockito.mock(HttpResponseStatus.class, Mockito.RETURNS_DEEP_STUBS);
        GrizzlyResponseBodyPart part = mockBodyPart(false, new byte[0]);
        ResponseBodyDeferringAsyncHandler handler = new ResponseBodyDeferringAsyncHandler(new CompletableFuture<HttpResponse>(), BUFFER_SIZE, this.workersExecutor, this.nonBlockingStreamWriter);

        handler.onStatusReceived(status);
        handler.onThrowable(new TimeoutException("Timeout exceeded"));
        handler.onBodyPartReceived(part);

        Mockito.verify(part, Mockito.never()).writeTo((OutputStream) ArgumentMatchers.any(TimedPipedOutputStream.class));
    }

    /**
     * Reports a throwable between two body parts and checks that the second part is answered with {@code ABORT}
     * and that reading the response content eventually returns {@code -1}.
     */
    @Test
    @Issue("MULE-19208")
    public void handlerClosesPipedStreamIfAnErrorOccurredBetweenTwoParts() throws Exception {
        CompletableFuture<HttpResponse> responseFuture = new CompletableFuture<>();
        ResponseBodyDeferringAsyncHandler handler = new ResponseBodyDeferringAsyncHandler(responseFuture, BUFFER_SIZE, this.workersExecutor, this.nonBlockingStreamWriter);
        GrizzlyResponseBodyPart beforeError = mockBodyPart(false, new byte[0]);
        GrizzlyResponseBodyPart afterError = mockBodyPart(false, new byte[0]);
        HttpResponseStatus status = Mockito.mock(HttpResponseStatus.class, Mockito.RETURNS_DEEP_STUBS);

        handler.onStatusReceived(status);
        AsyncHandler.STATE firstState = handler.onBodyPartReceived(beforeError);
        MatcherAssert.assertThat(firstState, Matchers.is(AsyncHandler.STATE.CONTINUE));

        handler.onThrowable(new TimeoutException("Timeout exceeded"));
        AsyncHandler.STATE secondState = handler.onBodyPartReceived(afterError);
        MatcherAssert.assertThat(secondState, Matchers.is(AsyncHandler.STATE.ABORT));

        // Poll until a read on the response content reports end of stream.
        this.prober.check(new JUnitLambdaProbe(() -> {
            InputStream content = responseFuture.get().getEntity().getContent();
            return content.read(new byte[16]) == -1;
        }));
    }

    /**
     * Reports a throwable between two body parts and checks that only the part received before the error was
     * written to a {@link TimedPipedOutputStream}.
     */
    @Test
    @Issue("MULE-19208")
    public void handlerDoesNotTryToWriteAPartIfAnErrorOccurredBetweenTwoParts() throws Exception {
        GrizzlyResponseBodyPart beforeError = mockBodyPart(false, new byte[0]);
        GrizzlyResponseBodyPart afterError = mockBodyPart(false, new byte[0]);
        HttpResponseStatus status = Mockito.mock(HttpResponseStatus.class, Mockito.RETURNS_DEEP_STUBS);
        ResponseBodyDeferringAsyncHandler handler = new ResponseBodyDeferringAsyncHandler(new CompletableFuture<HttpResponse>(), BUFFER_SIZE, this.workersExecutor, this.nonBlockingStreamWriter);

        handler.onStatusReceived(status);
        AsyncHandler.STATE firstState = handler.onBodyPartReceived(beforeError);
        MatcherAssert.assertThat(firstState, Matchers.is(AsyncHandler.STATE.CONTINUE));

        handler.onThrowable(new TimeoutException("Timeout exceeded"));
        AsyncHandler.STATE secondState = handler.onBodyPartReceived(afterError);
        MatcherAssert.assertThat(secondState, Matchers.is(AsyncHandler.STATE.ABORT));

        // Exactly one write for the first part, none for the part that arrived after the error.
        Mockito.verify(beforeError, Mockito.times(1)).writeTo((OutputStream) ArgumentMatchers.any(TimedPipedOutputStream.class));
        Mockito.verify(afterError, Mockito.never()).writeTo((OutputStream) ArgumentMatchers.any(TimedPipedOutputStream.class));
    }

    /**
     * Holds the part's {@code writeTo} call on a latch and checks that, meanwhile, a read on the response content
     * returns {@code 0}. Once the latch is released and the handler completes, a read must return {@code -1}.
     */
    @Test
    @Issue("MULE-19208")
    public void readerDoesNotBlockWhenNobodyWroteInTheStreamYet() throws Exception {
        CompletableFuture<HttpResponse> responseFuture = new CompletableFuture<>();
        ResponseBodyDeferringAsyncHandler handler = new ResponseBodyDeferringAsyncHandler(responseFuture, BUFFER_SIZE, this.workersExecutor, this.nonBlockingStreamWriter);

        GrizzlyResponseBodyPart part = Mockito.mock(GrizzlyResponseBodyPart.class, Mockito.RETURNS_DEEP_STUBS);
        Mockito.when(part.isLast()).thenReturn(false);
        Mockito.when(part.getBodyPartBytes()).thenReturn("payload".getBytes());
        Mockito.when(part.getBodyByteBuffer()).thenReturn(ByteBuffer.allocateDirect(0));
        handler.onStatusReceived(Mockito.mock(HttpResponseStatus.class, Mockito.RETURNS_DEEP_STUBS));

        // Every write of the part waits on this latch before delegating to the real method.
        Latch writeGate = new Latch();
        Mockito.doAnswer(invocation -> {
            writeGate.await();
            return invocation.callRealMethod();
        }).when(part).writeTo((OutputStream) ArgumentMatchers.any(TimedPipedOutputStream.class));

        // Deliver the part from another thread so the test thread is free to read while the write is held.
        this.testExecutor.submit(() -> {
            try {
                AsyncHandler.STATE state = handler.onBodyPartReceived(part);
                MatcherAssert.assertThat(state, Matchers.is(AsyncHandler.STATE.CONTINUE));
            } catch (Exception e) {
                Assert.fail(e.getMessage());
            }
        });

        this.prober.check(new JUnitLambdaProbe(() -> {
            InputStream content = responseFuture.get().getEntity().getContent();
            return content.read(new byte[16]) == 0;
        }));

        writeGate.release();
        MatcherAssert.assertThat(handler.onBodyPartReceived(part), Matchers.is(AsyncHandler.STATE.CONTINUE));
        handler.onCompleted();

        this.prober.check(new JUnitLambdaProbe(() -> {
            InputStream content = responseFuture.get().getEntity().getContent();
            return content.read(new byte[16]) == -1;
        }));
    }

    /**
     * Delivers a part, calls {@code closeOut()} on the handler and checks that delivering the part again is
     * answered with {@code ABORT}.
     */
    @Test
    public void abortsWhenPipeIsClosed() throws Exception {
        byte[] payload = "You will call me Snowball because my fur is pretty and white.".getBytes();
        HttpResponseStatus status = Mockito.mock(HttpResponseStatus.class, Mockito.RETURNS_DEEP_STUBS);
        ResponseBodyDeferringAsyncHandler handler = new ResponseBodyDeferringAsyncHandler(new CompletableFuture<HttpResponse>(), BUFFER_SIZE, this.workersExecutor, this.nonBlockingStreamWriter);

        handler.onStatusReceived(status);
        GrizzlyResponseBodyPart part = mockBodyPart(false, payload);
        handler.onBodyPartReceived(part);
        handler.closeOut();

        AsyncHandler.STATE stateAfterClose = handler.onBodyPartReceived(part);
        MatcherAssert.assertThat(stateAfterClose, Matchers.is(AsyncHandler.STATE.ABORT));
    }

    /**
     * Hands the handler headers whose {@code Content-Length} is {@code 4294967294} (twice
     * {@link Integer#MAX_VALUE}) with an empty {@code Transfer-Encoding}, and expects {@code CONTINUE}.
     */
    @Test
    public void doesNotThrowExceptionIfContentLengthIsGreaterThanMaxInteger() throws Exception {
        // 2 * Integer.MAX_VALUE == 4294967294, which does not fit in an int.
        String oversizedLength = Long.toString(2L * Integer.MAX_VALUE);

        FluentCaseInsensitiveStringsMap headerMap = Mockito.mock(FluentCaseInsensitiveStringsMap.class);
        Mockito.when(headerMap.getFirstValue("Content-Length")).thenReturn(oversizedLength);
        Mockito.when(headerMap.getFirstValue("Transfer-Encoding")).thenReturn("");
        HttpResponseHeaders headers = Mockito.mock(HttpResponseHeaders.class);
        Mockito.when(headers.getHeaders()).thenReturn(headerMap);

        ResponseBodyDeferringAsyncHandler handler = new ResponseBodyDeferringAsyncHandler(new CompletableFuture<HttpResponse>(), -1, this.workersExecutor, this.nonBlockingStreamWriter);
        handler.onStatusReceived(Mockito.mock(HttpResponseStatus.class, Mockito.RETURNS_DEEP_STUBS));

        AsyncHandler.STATE state = handler.onHeadersReceived(headers);
        MatcherAssert.assertThat(state, Matchers.is(AsyncHandler.STATE.CONTINUE));
    }

    /**
     * Reads the whole response content from inside the future's {@code whenComplete} callback while two parts
     * ("Hello " and "world") are delivered, and checks that the text read is {@code "Hello world"}.
     */
    @Test
    @Issue("W-16640190")
    public void readFromPipeInWhenCompleteDoesNotCauseADeadlock() throws Exception {
        // Typed future and reference: with raw types the callback parameter is Object and cannot be dereferenced.
        CompletableFuture<HttpResponse> responseFuture = new CompletableFuture<>();
        Reference<String> bodyText = new Reference<>();
        ResponseBodyDeferringAsyncHandler handler = new ResponseBodyDeferringAsyncHandler(responseFuture, BUFFER_SIZE, this.workersExecutor, this.nonBlockingStreamWriter);
        handler.onStatusReceived(Mockito.mock(HttpResponseStatus.class, Mockito.RETURNS_DEEP_STUBS));
        GrizzlyResponseBodyPart firstPart = mockBodyPart(false, "Hello ".getBytes());
        GrizzlyResponseBodyPart lastPart = mockBodyPart(true, "world".getBytes());
        // The read happens inside the completion callback, which is the scenario named by this test.
        responseFuture.whenComplete((response, failure) -> {
            bodyText.set(IOUtils.toString(response.getEntity().getContent()));
        });
        MatcherAssert.assertThat(handler.onBodyPartReceived(firstPart), Matchers.is(AsyncHandler.STATE.CONTINUE));
        MatcherAssert.assertThat(handler.onBodyPartReceived(lastPart), Matchers.is(AsyncHandler.STATE.CONTINUE));
        MatcherAssert.assertThat(handler.onCompleted(), Matchers.is(Matchers.nullValue()));
        this.prober.check(new JUnitLambdaProbe(() -> {
            MatcherAssert.assertThat(bodyText.get(), Matchers.is("Hello world"));
            return true;
        }));
    }

    /**
     * Uses a handler built with a size of 5 and delivers the 6-byte part "Hello ". Checks that a pause is
     * requested, that 5 bytes are available on the piped content, and that {@code resume()} is only invoked once
     * the content is being consumed and the {@link NonBlockingStreamWriter} is running. The second part "world"
     * must then complete the text to {@code "Hello world"}.
     */
    @Test
    @Issue("W-17048606")
    public void writePartBiggerThanBufferResultsInAsyncWrite() throws Exception {
        System.clearProperty(READ_TIMEOUT_PROPERTY_NAME);
        ResponseBodyDeferringAsyncHandler.refreshSystemProperties();
        try {
            CompletableFuture<HttpResponse> responseFuture = new CompletableFuture<>();
            ResponseBodyDeferringAsyncHandler handler = new ResponseBodyDeferringAsyncHandler(responseFuture, 5, this.workersExecutor, this.nonBlockingStreamWriter);
            GrizzlyResponseBodyPart firstPart = mockBodyPart(false, "Hello ".getBytes());
            GrizzlyResponseBodyPart lastPart = mockBodyPart(true, "world".getBytes());
            MatcherAssert.assertThat(handler.onStatusReceived(Mockito.mock(HttpResponseStatus.class, Mockito.RETURNS_DEEP_STUBS)), Matchers.is(AsyncHandler.STATE.CONTINUE));
            MatcherAssert.assertThat(handler.onBodyPartReceived(firstPart), Matchers.is(AsyncHandler.STATE.CONTINUE));
            Mockito.verify(this.pauseHandler).requestPause();

            InputStream content = responseFuture.get().getEntity().getContent();
            MatcherAssert.assertThat(content, Matchers.instanceOf(TimedPipedInputStream.class));
            MatcherAssert.assertThat(content.available(), Matchers.is(5));
            // Nothing has consumed the pipe yet, so the paused transfer must not have been resumed.
            Mockito.verify(this.pauseHandler, Mockito.never()).resume();

            StringBuilder received = new StringBuilder();
            this.testExecutor.submit(() -> consumePipe(content, received));
            this.workersExecutor.submit((Runnable) this.nonBlockingStreamWriter);
            this.prober.check(new JUnitLambdaProbe(() -> {
                Mockito.verify(this.pauseHandler, Mockito.atLeastOnce()).resume();
                return true;
            }));

            MatcherAssert.assertThat(handler.onBodyPartReceived(lastPart), Matchers.is(AsyncHandler.STATE.CONTINUE));
            this.prober.check(new JUnitLambdaProbe(() -> {
                // consumePipe appends under the same monitor.
                synchronized (received) {
                    MatcherAssert.assertThat(received.toString(), Matchers.is("Hello world"));
                }
                return true;
            }));
            MatcherAssert.assertThat(handler.onCompleted(), Matchers.is(Matchers.nullValue()));
        } finally {
            // Stop the writer even when an assertion above fails, so it does not keep running after the test.
            this.nonBlockingStreamWriter.stop();
        }
    }

    /**
     * Puts a unique key in the {@link MDC}, closes the response content after the first part was delivered and
     * runs the {@link NonBlockingStreamWriter}. The handler's {@code onThrowable} must then observe an MDC context
     * map containing that key with the value {@code "TestValue"}.
     *
     * NOTE(review): the method name mentions the TCCL, but the only thing asserted here is the MDC content.
     */
    @Test
    @Issue("W-17048606")
    public void asyncWriteHappensWithSameTCCL() throws Exception {
        System.clearProperty(READ_TIMEOUT_PROPERTY_NAME);
        ResponseBodyDeferringAsyncHandler.refreshSystemProperties();
        String mdcKey = UUID.getUUID();
        MDC.put(mdcKey, "TestValue");
        try {
            final HashMap<String, String> mdcSeenByHandler = new HashMap<>();
            CompletableFuture<HttpResponse> responseFuture = new CompletableFuture<>();
            ResponseBodyDeferringAsyncHandler handler = new ResponseBodyDeferringAsyncHandler(responseFuture, 5, this.workersExecutor, this.nonBlockingStreamWriter) {

                @Override
                public void onThrowable(Throwable th) {
                    // getCopyOfContextMap() may return null when the calling thread has no MDC context.
                    java.util.Map<String, String> contextCopy = MDC.getCopyOfContextMap();
                    if (contextCopy != null) {
                        mdcSeenByHandler.putAll(contextCopy);
                    }
                    super.onThrowable(th);
                }
            };
            GrizzlyResponseBodyPart firstPart = mockBodyPart(false, "Hello ".getBytes());
            MatcherAssert.assertThat(handler.onStatusReceived(Mockito.mock(HttpResponseStatus.class, Mockito.RETURNS_DEEP_STUBS)), Matchers.is(AsyncHandler.STATE.CONTINUE));
            MatcherAssert.assertThat(handler.onBodyPartReceived(firstPart), Matchers.is(AsyncHandler.STATE.CONTINUE));
            // Closing the content before the writer runs is what is expected to lead to onThrowable being invoked.
            responseFuture.get().getEntity().getContent().close();
            this.workersExecutor.submit((Runnable) this.nonBlockingStreamWriter);
            this.prober.check(new JUnitLambdaProbe(() -> {
                MatcherAssert.assertThat(mdcSeenByHandler.get(mdcKey), Matchers.is("TestValue"));
                return true;
            }));
        } finally {
            // Always undo the MDC change and stop the writer, including when the probe above fails.
            MDC.remove(mdcKey);
            this.nonBlockingStreamWriter.stop();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Reads {@code inputStream} one byte at a time until end of stream, appending each byte (cast to
     * {@code char}) to {@code sb}.
     *
     * Each append is done while holding the monitor of {@code sb}, so readers that synchronize on the same
     * builder see a consistent value.
     *
     * @param inputStream the stream to drain
     * @param sb          the builder that receives the characters read
     * @throws AssertionError with message "Got exception reading from pipe" if reading fails; the originating
     *                        {@link IOException} is attached as the cause
     */
    public static void consumePipe(InputStream inputStream, StringBuilder sb) {
        try {
            int read;
            while ((read = inputStream.read()) != -1) {
                synchronized (sb) {
                    sb.append((char) read);
                }
            }
        } catch (IOException e) {
            // Same error type and message as before, but keeping the cause so the real I/O failure is visible.
            throw new AssertionError("Got exception reading from pipe", e);
        }
    }

    /**
     * Builds a deep-stubbed {@link GrizzlyResponseBodyPart} mock backed by the given bytes.
     *
     * The mock reports {@code z} from {@code isLast()}, exposes {@code bArr} through {@code getBodyPartBytes()},
     * {@code getBodyByteBuffer()} (wrapped) and {@code length()}, returns the shared {@code pauseHandler}, and its
     * {@code writeTo(OutputStream)} writes {@code bArr} to the given stream and returns its length.
     *
     * @param z    value to report from {@code isLast()}
     * @param bArr content of the part
     * @return the configured mock
     * @throws IOException declared because stubbing {@code writeTo} references a method that declares it
     */
    private GrizzlyResponseBodyPart mockBodyPart(boolean z, byte[] bArr) throws IOException {
        GrizzlyResponseBodyPart part = Mockito.mock(GrizzlyResponseBodyPart.class, Mockito.RETURNS_DEEP_STUBS);

        // Plain accessors.
        Mockito.when(part.getPauseHandler()).thenReturn(this.pauseHandler);
        Mockito.when(part.length()).thenReturn(bArr.length);
        Mockito.when(part.getBodyPartBytes()).thenReturn(bArr);
        Mockito.when(part.getBodyByteBuffer()).thenReturn(ByteBuffer.wrap(bArr));
        Mockito.when(part.isLast()).thenReturn(z);

        // Writing the part copies its bytes into whatever stream the caller supplies.
        Mockito.doAnswer(invocation -> {
            OutputStream target = invocation.getArgument(0);
            target.write(bArr);
            return bArr.length;
        }).when(part).writeTo(ArgumentMatchers.any(OutputStream.class));

        return part;
    }
}
