package org.mule.service.http.netty.impl.client;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.util.AttributeKey;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.net.URI;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import org.mule.runtime.api.util.DataUnit;
import org.mule.runtime.api.util.MultiMap;
import org.mule.runtime.http.api.client.HttpRequestOptions;
import org.mule.runtime.http.api.domain.entity.HttpEntity;
import org.mule.runtime.http.api.domain.message.request.HttpRequest;
import org.mule.runtime.http.api.domain.message.response.HttpResponse;
import org.mule.service.http.netty.impl.message.HttpResponseCreator;
import org.mule.service.http.netty.impl.util.HttpUtils;
import org.mule.service.http.netty.impl.util.MuleToNettyUtils;
import org.mule.service.http.netty.impl.util.ReactorNettyUtils;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty.ByteBufFlux;
import reactor.netty.http.client.HttpClient;
import reactor.netty.http.client.HttpClientResponse;

/* loaded from: input_file:lib/mule-netty-http-service-0.1.2.jar:org/mule/service/http/netty/impl/client/ReactorNettyClient.class */
/**
 * Sends Mule {@link HttpRequest}s through a Reactor Netty {@link HttpClient} and exposes the
 * response as a {@link CompletableFuture} of {@link HttpResponse}.
 *
 * <p>Responses whose declared {@code Content-Length} is below
 * {@link #CONTENT_LENGTH_TO_HANDLE_AGGREGATED} are aggregated in memory; all other responses are
 * streamed to the consumer through a {@link PipedInputStream}.
 */
public class ReactorNettyClient {
    private static final Logger LOGGER = LoggerFactory.getLogger(ReactorNettyClient.class);

    /** Capacity, in bytes, of the pipe that carries a streamed response body to its reader. */
    private static final int DEFAULT_RESPONSE_RECEPTION_BUFFER_SIZE = DataUnit.KB.toBytes(10);

    /** Responses declaring a Content-Length strictly below this value are aggregated in memory. */
    private static final int CONTENT_LENGTH_TO_HANDLE_AGGREGATED = DataUnit.KB.toBytes(10);

    /** Channel attribute under which the request entity is stored for requests carrying an Expect header. */
    static final AttributeKey<HttpEntity> REQUEST_ENTITY_KEY = AttributeKey.valueOf("REQUEST_ENTITY");

    // NOTE(review): this pool is created per client instance and is never shut down by this class;
    // confirm who owns its lifecycle.
    private final Executor resultExecutor = Executors.newFixedThreadPool(10);
    private final HttpClient httpClient;

    /**
     * @param httpClient the Reactor Netty client every request is derived from
     */
    public ReactorNettyClient(HttpClient httpClient) {
        this.httpClient = httpClient;
    }

    /**
     * Builds the reactive pipeline that sends {@code httpRequest} and hands the response to
     * {@code biFunction}.
     *
     * @param httpRequest        the request to send (method, URI, query parameters and entity are read from it)
     * @param httpRequestOptions supplies the follow-redirects flag and the response timeout in milliseconds
     * @param httpHeaders        headers added to the outgoing request
     * @param biFunction         response handler invoked with the response and its content
     * @param completableFuture  completed exceptionally with the mapped error if the pipeline fails
     * @return the response content; an upstream error is reported through {@code completableFuture}
     *         and the returned flux completes instead of erroring
     * @throws NullPointerException if {@code completableFuture} is {@code null}
     */
    public Flux<ByteBuf> sendAsyncRequest(HttpRequest httpRequest, HttpRequestOptions httpRequestOptions, HttpHeaders httpHeaders, BiFunction<HttpClientResponse, ByteBufFlux, Publisher<ByteBuf>> biFunction, CompletableFuture<HttpResponse> completableFuture) {
        URI targetUri = uriWithQueryParams(httpRequest);
        LOGGER.debug("Sending request to {} with headers {}", targetUri, httpHeaders);

        Flux<ByteBuf> responseContent = this.httpClient
            .followRedirect(httpRequestOptions.isFollowsRedirect())
            .doOnConnected(connection -> {
                connection.channel()
                    .attr(AttributeKey.<Boolean>valueOf("removeContentLength"))
                    .set(Boolean.valueOf(MuleToNettyUtils.calculateShouldRemoveContentLength(httpRequest)));
                HttpEntity entity = httpRequest.getEntity();
                // The entity is only parked on the channel for requests carrying an Expect header.
                if (entity != null && isExpect(httpHeaders)) {
                    connection.channel().attr(REQUEST_ENTITY_KEY).set(entity);
                }
            })
            .responseTimeout(Duration.ofMillis(httpRequestOptions.getResponseTimeout()))
            .headers(requestHeaders -> requestHeaders.add(httpHeaders))
            .request(HttpMethod.valueOf(httpRequest.getMethod()))
            .uri(targetUri)
            .send((clientRequest, outbound) -> {
                if (isExpect(clientRequest.requestHeaders())) {
                    // Nothing is written here for Expect requests.
                    return outbound.send(Mono.empty());
                }
                HttpEntity entity = httpRequest.getEntity();
                if (entity != null) {
                    return outbound.send(entityPublisher(entity));
                }
                // A Publisher-returning callback must never yield null; signal "no body" instead.
                return Mono.empty();
            })
            .response(biFunction)
            .onErrorMap(ReactorNettyUtils::onErrorMap);

        return responseContent.doOnError(completableFuture::completeExceptionally).onErrorComplete();
    }

    /** Tells whether the given headers contain an {@code expect} header. */
    private boolean isExpect(HttpHeaders httpHeaders) {
        return httpHeaders.contains("expect");
    }

    /**
     * Returns the request URI, rebuilt through {@link HttpUtils#buildUriString} when the request
     * carries query parameters.
     */
    private static URI uriWithQueryParams(HttpRequest httpRequest) {
        MultiMap<String, String> queryParams = httpRequest.getQueryParams();
        if (queryParams.isEmpty()) {
            return httpRequest.getUri();
        }
        return URI.create(HttpUtils.buildUriString(httpRequest.getUri(), queryParams));
    }

    /**
     * Returns an empty publisher when the entity declares a length of zero bytes, otherwise a
     * {@link ChunkedHttpEntityPublisher} over the entity.
     */
    private Publisher<? extends ByteBuf> entityPublisher(HttpEntity httpEntity) {
        boolean knownEmpty = httpEntity.getBytesLength().isPresent() && httpEntity.getBytesLength().getAsLong() == 0;
        if (knownEmpty) {
            return Mono.empty();
        }
        return new ChunkedHttpEntityPublisher(httpEntity);
    }

    /**
     * Response handler: picks the aggregated or the streaming strategy depending on the declared
     * {@code Content-Length} and completes {@code completableFuture} with the resulting response.
     *
     * @param httpClientResponse the response status and headers
     * @param byteBufFlux        the response content
     * @param completableFuture  the future to complete with the created {@link HttpResponse}
     * @return the content publisher that must be subscribed for the body to be consumed
     */
    public Publisher<ByteBuf> receiveContent(HttpClientResponse httpClientResponse, ByteBufFlux byteBufFlux, CompletableFuture<HttpResponse> completableFuture) {
        LOGGER.debug("Received response with headers {} and status {}", httpClientResponse.responseHeaders(), httpClientResponse.status());
        if (responseIsShortEnough(httpClientResponse)) {
            return handleShortResponse(httpClientResponse, byteBufFlux, completableFuture);
        }
        return handleResponseStreaming(httpClientResponse, byteBufFlux, completableFuture);
    }

    /**
     * Streams the response body into a pipe whose read end becomes the body of the
     * {@link HttpResponse}. The future is completed asynchronously on {@code resultExecutor}, as
     * soon as this method is invoked, so the consumer can read while content is still arriving.
     */
    private Publisher<ByteBuf> handleResponseStreaming(HttpClientResponse httpClientResponse, ByteBufFlux byteBufFlux, CompletableFuture<HttpResponse> completableFuture) {
        PipedOutputStream pipeSink = new PipedOutputStream();
        PipedInputStream pipeSource;
        try {
            pipeSource = new PipedInputStream(pipeSink, DEFAULT_RESPONSE_RECEPTION_BUFFER_SIZE);
        } catch (IOException e) {
            completableFuture.completeExceptionally(e);
            return byteBufFlux;
        }

        Flux<ByteBuf> streamedContent = byteBufFlux.retain().doOnNext(chunk -> {
            try {
                byte[] bytes = new byte[chunk.readableBytes()];
                chunk.readBytes(bytes);
                pipeSink.write(bytes);
                if (!completableFuture.isDone()) {
                    LOGGER.debug("Marked response as completed but still waiting on content");
                    completeWithStreamedResponse(completableFuture, httpClientResponse, pipeSource);
                }
            } catch (IOException e) {
                completableFuture.completeExceptionally(e);
            } finally {
                // Balances the retain() above on every path, including unchecked failures.
                chunk.release();
            }
        }).doOnComplete(() -> {
            try {
                LOGGER.debug("Marked response as completed");
                pipeSink.close();
                if (!completableFuture.isDone()) {
                    completeWithStreamedResponse(completableFuture, httpClientResponse, pipeSource);
                }
            } catch (Exception e) {
                completableFuture.completeExceptionally(e);
            }
        }).doOnError(error -> {
            completableFuture.completeExceptionally(ReactorNettyUtils.onErrorMap(error));
            // Without closing the write end, a reader blocked on the pipe would never wake up.
            closeQuietly(pipeSink);
        }).doOnCancel(() -> closeQuietly(pipeSink));

        if (!completableFuture.isDone()) {
            completeWithStreamedResponse(completableFuture, httpClientResponse, pipeSource);
        }
        return streamedContent;
    }

    /** Asynchronously completes {@code future} with a response whose body is read from {@code body}. */
    private void completeWithStreamedResponse(CompletableFuture<HttpResponse> future, HttpClientResponse response, PipedInputStream body) {
        completeAsync(future, () -> new HttpResponseCreator().create(response, body), this.resultExecutor);
    }

    /** Closes the write end of the pipe, logging (not propagating) any failure. */
    private static void closeQuietly(PipedOutputStream pipeSink) {
        try {
            pipeSink.close();
        } catch (IOException e) {
            LOGGER.debug("Could not close the response pipe", e);
        }
    }

    /**
     * Aggregates the whole body in memory and completes the future with a response backed by a
     * {@link ByteArrayInputStream}; an empty body yields an empty stream.
     */
    private Publisher<ByteBuf> handleShortResponse(HttpClientResponse httpClientResponse, ByteBufFlux byteBufFlux, CompletableFuture<HttpResponse> completableFuture) {
        return byteBufFlux.aggregate()
            .doOnError(error -> completableFuture.completeExceptionally(ReactorNettyUtils.onErrorMap(error)))
            .doOnSuccess(aggregated -> {
                // aggregated is null when the body was empty.
                byte[] body = aggregated != null ? ByteBufUtil.getBytes(aggregated) : new byte[0];
                completableFuture.complete(new HttpResponseCreator().create(httpClientResponse, new ByteArrayInputStream(body)));
            });
    }

    /**
     * Tells whether the response declares a {@code Content-Length} below
     * {@link #CONTENT_LENGTH_TO_HANDLE_AGGREGATED}. A missing, negative or unparsable header
     * (including values that do not fit in a {@code long}) selects the streaming path.
     */
    private boolean responseIsShortEnough(HttpClientResponse httpClientResponse) {
        String declaredLength = httpClientResponse.responseHeaders().get("Content-Length");
        if (declaredLength == null) {
            return false;
        }
        try {
            // Parsed as long: bodies larger than Integer.MAX_VALUE bytes are legitimate.
            long contentLength = Long.parseLong(declaredLength.trim());
            return contentLength >= 0 && contentLength < CONTENT_LENGTH_TO_HANDLE_AGGREGATED;
        } catch (NumberFormatException e) {
            LOGGER.debug("Ignoring unparsable Content-Length header '{}'", declaredLength);
            return false;
        }
    }

    /**
     * Completes {@code completableFuture} with the result of {@code callable}, run on
     * {@code executor}. The callable is skipped if the future is already done when the task
     * starts, and whichever of the two futures finishes first settles the other.
     *
     * @param completableFuture the future to complete
     * @param callable          produces the value (or throws to fail the future)
     * @param executor          runs the callable
     * @param <T>               the value type
     * @return {@code completableFuture} itself
     * @throws NullPointerException if {@code completableFuture} or {@code callable} is {@code null}
     */
    public static <T> CompletableFuture<T> completeAsync(CompletableFuture<T> completableFuture, Callable<? extends T> callable, Executor executor) {
        Objects.requireNonNull(completableFuture);
        Objects.requireNonNull(callable);

        CompletableFuture<T> computation = ReactorNettyClient.<T>callAsync(() -> {
            if (completableFuture.isDone()) {
                return null;
            }
            return callable.call();
        }, executor);

        // Settle the background computation if the target is completed by someone else first.
        completableFuture.whenComplete((value, failure) -> {
            if (failure == null) {
                computation.complete(value);
            } else {
                computation.completeExceptionally(failure);
            }
        });
        // Propagate the computation's outcome to the target (a no-op if it is already done).
        computation.whenComplete((value, failure) -> {
            if (failure == null) {
                completableFuture.complete(value);
            } else {
                completableFuture.completeExceptionally(failure);
            }
        });
        return completableFuture;
    }

    /**
     * Runs {@code callable} on {@code executor} and returns a future of its result. Unchecked
     * exceptions and errors are rethrown as-is inside the task; any other throwable is wrapped in
     * a {@link CompletionException}.
     *
     * @param callable the computation to run
     * @param executor runs the callable
     * @param <U>      the value type
     * @return a new future completed with the callable's outcome
     * @throws NullPointerException if {@code callable} is {@code null}
     */
    public static <U> CompletableFuture<U> callAsync(Callable<? extends U> callable, Executor executor) {
        Objects.requireNonNull(callable);
        return CompletableFuture.supplyAsync(() -> {
            try {
                return callable.call();
            } catch (Error | RuntimeException e) {
                throw e;
            } catch (Throwable th) {
                throw new CompletionException(th);
            }
        }, executor);
    }
}
