package org.mule.service.http.impl.service.client.async;

import com.ning.http.client.AsyncHandler;
import com.ning.http.client.HttpResponseBodyPart;
import com.ning.http.client.HttpResponseHeaders;
import com.ning.http.client.HttpResponseStatus;
import com.ning.http.client.Response;
import com.ning.http.client.providers.grizzly.GrizzlyResponseHeaders;
import com.ning.http.client.providers.grizzly.PauseHandler;
import java.io.IOException;
import java.io.OutputStream;
import java.lang.reflect.Field;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import org.apache.commons.lang3.StringUtils;
import org.glassfish.grizzly.http.HttpResponsePacket;
import org.glassfish.grizzly.nio.transport.TCPNIOTransport;
import org.mule.runtime.api.util.DataUnit;
import org.mule.runtime.http.api.domain.message.response.HttpResponse;
import org.mule.service.http.impl.service.client.HttpResponseCreator;
import org.mule.service.http.impl.service.client.NonBlockingStreamWriter;
import org.mule.service.http.impl.service.util.ThreadContext;
import org.mule.service.http.impl.util.TimedPipedInputStream;
import org.mule.service.http.impl.util.TimedPipedOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.xmlresolver.logging.AbstractLogger;

/* loaded from: input_file:lib/mule-service-http-1.10.2.jar:org/mule/service/http/impl/service/client/async/ResponseBodyDeferringAsyncHandler.class */
public class ResponseBodyDeferringAsyncHandler implements AsyncHandler<Response> {
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) ResponseBodyDeferringAsyncHandler.class);
    private static final String PIPE_READ_TIMEOUT_PROPERTY_NAME = "mule.http.responseStreaming.pipeReadTimeoutMillis";
    private static long PIPE_READ_TIMEOUT_MILLIS = Integer.parseInt(System.getProperty(PIPE_READ_TIMEOUT_PROPERTY_NAME, "20000"));
    private static Field responseField;
    private volatile Response response;
    private int bufferSize;
    private final NonBlockingStreamWriter nonBlockingStreamWriter;
    private final ExecutorService workerScheduler;
    private OutputStream output;
    private final CompletableFuture<HttpResponse> future;
    private Optional<TimedPipedInputStream> input = Optional.empty();
    private final Response.ResponseBuilder responseBuilder = new Response.ResponseBuilder();
    private final HttpResponseCreator httpResponseCreator = new HttpResponseCreator();
    private final AtomicBoolean handled = new AtomicBoolean(false);
    private final AtomicBoolean throwableReceived = new AtomicBoolean(false);
    private final Map<String, String> mdc = MDC.getCopyOfContextMap();

    public ResponseBodyDeferringAsyncHandler(CompletableFuture<HttpResponse> completableFuture, int i, ExecutorService executorService, NonBlockingStreamWriter nonBlockingStreamWriter) {
        this.future = completableFuture;
        this.bufferSize = i;
        this.workerScheduler = executorService;
        this.nonBlockingStreamWriter = nonBlockingStreamWriter;
    }

    @Override // com.ning.http.client.AsyncHandler
    public void onThrowable(Throwable th) {
        this.throwableReceived.set(true);
        try {
            MDC.setContextMap(this.mdc);
            LOGGER.debug("Error caught handling response body", th);
            try {
                closeOut();
            } catch (IOException e) {
                LOGGER.debug("Error closing HTTP response stream", (Throwable) e);
            }
            if (this.handled.getAndSet(true)) {
                if (th.getMessage() == null || !th.getMessage().contains("Pipe closed")) {
                    LOGGER.warn("Error handling HTTP response stream. Set log level to DEBUG for details.");
                } else {
                    LOGGER.error("HTTP response stream was closed before being read but response streams must always be consumed. Set log level to DEBUG for details.");
                }
                LOGGER.debug("HTTP response stream error was ", th);
            } else {
                this.future.completeExceptionally(th instanceof TimeoutException ? (TimeoutException) th : th instanceof IOException ? (IOException) th : new IOException(th.getMessage(), th));
            }
        } finally {
            MDC.clear();
        }
    }

    @Override // com.ning.http.client.AsyncHandler
    public AsyncHandler.STATE onStatusReceived(HttpResponseStatus httpResponseStatus) throws Exception {
        try {
            MDC.setContextMap(this.mdc);
            if (errorDetected()) {
                AsyncHandler.STATE closeAndAbort = closeAndAbort();
                MDC.clear();
                return closeAndAbort;
            }
            this.responseBuilder.reset();
            this.responseBuilder.accumulate(httpResponseStatus);
            AsyncHandler.STATE state = AsyncHandler.STATE.CONTINUE;
            MDC.clear();
            return state;
        } catch (Throwable th) {
            MDC.clear();
            throw th;
        }
    }

    private AsyncHandler.STATE closeAndAbort() throws IOException {
        closeOut();
        return AsyncHandler.STATE.ABORT;
    }

    @Override // com.ning.http.client.AsyncHandler
    public AsyncHandler.STATE onHeadersReceived(HttpResponseHeaders httpResponseHeaders) throws Exception {
        try {
            MDC.setContextMap(this.mdc);
            if (errorDetected()) {
                AsyncHandler.STATE closeAndAbort = closeAndAbort();
                MDC.clear();
                return closeAndAbort;
            }
            this.responseBuilder.accumulate(httpResponseHeaders);
            if (this.bufferSize < 0) {
                LOGGER.debug("onHeadersReceived. No configured buffer size, resolving buffer size dynamically.");
                calculateBufferSize(httpResponseHeaders);
            } else {
                LOGGER.debug("onHeadersReceived. Using user configured buffer size of '{} bytes'.", Integer.valueOf(this.bufferSize));
            }
            AsyncHandler.STATE state = AsyncHandler.STATE.CONTINUE;
            MDC.clear();
            return state;
        } catch (Throwable th) {
            MDC.clear();
            throw th;
        }
    }

    private void calculateBufferSize(HttpResponseHeaders httpResponseHeaders) {
        int i = TCPNIOTransport.MAX_RECEIVE_BUFFER_SIZE;
        String firstValue = httpResponseHeaders.getHeaders().getFirstValue("Content-Length");
        if (StringUtils.isEmpty(firstValue) || !StringUtils.isEmpty(httpResponseHeaders.getHeaders().getFirstValue("Transfer-Encoding"))) {
            this.bufferSize = DataUnit.KB.toBytes(32) + 10;
        } else {
            long parseLong = Long.parseLong(firstValue);
            try {
                if (responseField != null && (httpResponseHeaders instanceof GrizzlyResponseHeaders)) {
                    i = ((HttpResponsePacket) responseField.get(httpResponseHeaders)).getRequest().getConnection().getReadBufferSize();
                }
            } catch (IllegalAccessException e) {
                LOGGER.debug("Unable to access connection buffer size.");
            }
            this.bufferSize = (int) Math.min(i, parseLong);
        }
        LOGGER.debug("Max buffer size = {} bytes, Connection buffer size = {} bytes, Content-length = {} bytes, Calculated buffer size = {} bytes", Integer.valueOf(TCPNIOTransport.MAX_RECEIVE_BUFFER_SIZE), Integer.valueOf(i), firstValue, Integer.valueOf(this.bufferSize));
    }

    @Override // com.ning.http.client.AsyncHandler
    public AsyncHandler.STATE onBodyPartReceived(HttpResponseBodyPart httpResponseBodyPart) throws Exception {
        try {
            MDC.setContextMap(this.mdc);
            if (errorDetected()) {
                AsyncHandler.STATE closeAndAbort = closeAndAbort();
                MDC.clear();
                return closeAndAbort;
            }
            if (!this.input.isPresent()) {
                if (httpResponseBodyPart.isLast()) {
                    if (LOGGER.isDebugEnabled()) {
                        LOGGER.debug("Single part (size = {}bytes).", Integer.valueOf(httpResponseBodyPart.getBodyByteBuffer().remaining()));
                    }
                    this.responseBuilder.accumulate(httpResponseBodyPart);
                    handleIfNecessary();
                    AsyncHandler.STATE state = AsyncHandler.STATE.CONTINUE;
                    MDC.clear();
                    return state;
                }
                this.output = new TimedPipedOutputStream();
                this.input = Optional.of(new TimedPipedInputStream(this.bufferSize, PIPE_READ_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS, (TimedPipedOutputStream) this.output));
            }
            handleIfNecessary();
            if (errorDetected()) {
                AsyncHandler.STATE closeAndAbort2 = closeAndAbort();
                MDC.clear();
                return closeAndAbort2;
            }
            try {
                AsyncHandler.STATE writeBodyPartToPipe = writeBodyPartToPipe(httpResponseBodyPart);
                MDC.clear();
                return writeBodyPartToPipe;
            } catch (IOException e) {
                onThrowable(e);
                AsyncHandler.STATE state2 = AsyncHandler.STATE.ABORT;
                MDC.clear();
                return state2;
            }
        } catch (Throwable th) {
            MDC.clear();
            throw th;
        }
    }

    private AsyncHandler.STATE writeBodyPartToPipe(HttpResponseBodyPart httpResponseBodyPart) throws IOException {
        int length = httpResponseBodyPart.length();
        int availableSpaceInPipe = availableSpaceInPipe();
        if (availableSpaceInPipe < 0 || availableSpaceInPipe >= length) {
            httpResponseBodyPart.writeTo(this.output);
        } else {
            PauseHandler pauseHandler = httpResponseBodyPart.getPauseHandler();
            pauseHandler.requestPause();
            this.nonBlockingStreamWriter.addDataToWrite(this.output, httpResponseBodyPart.getBodyPartBytes(), this::availableSpaceInPipe).whenComplete((BiConsumer<? super Void, ? super Throwable>) resumeCallback(pauseHandler));
        }
        return AsyncHandler.STATE.CONTINUE;
    }

    private BiConsumer<Void, Throwable> resumeCallback(PauseHandler pauseHandler) {
        return (r5, th) -> {
            if (th != null) {
                onThrowable(th);
            }
            try {
                pauseHandler.resume();
            } catch (Exception e) {
                onThrowable(e);
            }
        };
    }

    private int availableSpaceInPipe() {
        if (this.input.isPresent() && !this.input.get().isClosed()) {
            return this.bufferSize - this.input.get().available();
        }
        return -1;
    }

    private boolean errorDetected() {
        return this.future.isCompletedExceptionally() || this.throwableReceived.get();
    }

    protected void closeOut() throws IOException {
        if (this.output != null) {
            try {
                this.output.flush();
            } finally {
                this.output.close();
            }
        }
    }

    /* JADX WARN: Can't rename method to resolve collision */
    @Override // com.ning.http.client.AsyncHandler
    public Response onCompleted() throws IOException {
        try {
            MDC.setContextMap(this.mdc);
            handleIfNecessary();
            closeOut();
            MDC.clear();
            return null;
        } catch (Throwable th) {
            MDC.clear();
            throw th;
        }
    }

    private void handleIfNecessary() {
        if (this.handled.getAndSet(true)) {
            return;
        }
        if (!shouldCompleteAsync()) {
            completeResponseFuture();
            return;
        }
        try {
            LOGGER.debug("Scheduling response future completion to workers scheduler");
            ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
            this.workerScheduler.submit(() -> {
                ThreadContext threadContext = new ThreadContext(contextClassLoader, this.mdc);
                try {
                    completeResponseFuture();
                    threadContext.close();
                } catch (Throwable th) {
                    try {
                        threadContext.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                    throw th;
                }
            });
        } catch (RejectedExecutionException e) {
            LOGGER.warn("Couldn't schedule completion to workers scheduler, completing it synchronously");
            completeResponseFuture();
        }
    }

    private boolean shouldCompleteAsync() {
        return this.input.isPresent();
    }

    private void completeResponseFuture() {
        this.response = this.responseBuilder.build();
        try {
            if (this.input.isPresent()) {
                this.future.complete(this.httpResponseCreator.create(this.response, this.input.get()));
            } else {
                this.future.complete(this.httpResponseCreator.create(this.response, this.response.getResponseBodyAsStream()));
            }
        } catch (IOException e) {
            onThrowable(e);
            this.future.completeExceptionally(e);
        }
    }

    @Deprecated
    static void refreshSystemProperties() {
        PIPE_READ_TIMEOUT_MILLIS = Integer.parseInt(System.getProperty(PIPE_READ_TIMEOUT_PROPERTY_NAME, "20000"));
    }

    static {
        try {
            responseField = GrizzlyResponseHeaders.class.getDeclaredField(AbstractLogger.RESPONSE);
            responseField.setAccessible(true);
        } catch (Throwable th) {
            LOGGER.warn("Unable to use reflection to access connection buffer size to optimize streaming.", th);
        }
    }
}
