/*
 * Decompiled with CFR 0.152.
 */
package org.mule.service.http.netty.impl.client;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.util.AttributeKey;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.time.Duration;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.function.BiFunction;
import org.mule.runtime.api.util.DataUnit;
import org.mule.runtime.api.util.MultiMap;
import org.mule.runtime.http.api.client.HttpRequestOptions;
import org.mule.runtime.http.api.domain.entity.HttpEntity;
import org.mule.runtime.http.api.domain.message.request.HttpRequest;
import org.mule.runtime.http.api.domain.message.response.HttpResponse;
import org.mule.service.http.netty.impl.client.ChunkedHttpEntityPublisher;
import org.mule.service.http.netty.impl.message.HttpResponseCreator;
import org.mule.service.http.netty.impl.streaming.BlockingBidirectionalStream;
import org.mule.service.http.netty.impl.streaming.CancelableOutputStream;
import org.mule.service.http.netty.impl.util.HttpUtils;
import org.mule.service.http.netty.impl.util.MuleToNettyUtils;
import org.mule.service.http.netty.impl.util.ReactorNettyUtils;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty.ByteBufFlux;
import reactor.netty.http.client.HttpClient;
import reactor.netty.http.client.HttpClientResponse;

/**
 * Thin wrapper around a Reactor Netty {@link HttpClient} that sends Mule {@link HttpRequest}s and completes a
 * {@link CompletableFuture} with the resulting {@link HttpResponse}.
 *
 * <p>Responses whose {@code Content-Length} is below {@link #CONTENT_LENGTH_TO_HANDLE_AGGREGATED} are aggregated in
 * memory; every other response is piped through a {@link BlockingBidirectionalStream} so the future can be completed
 * while content is still arriving.
 */
public class ReactorNettyClient {
    private static final Logger LOGGER = LoggerFactory.getLogger(ReactorNettyClient.class);
    private static final int DEFAULT_RESPONSE_RECEPTION_BUFFER_SIZE = DataUnit.KB.toBytes(10);
    private static final int CONTENT_LENGTH_TO_HANDLE_AGGREGATED = DataUnit.KB.toBytes(10);
    // NOTE(review): this pool is created per client instance and nothing in this class shuts it down —
    // confirm who owns its lifecycle.
    private final Executor resultExecutor = Executors.newFixedThreadPool(10);
    private final HttpClient httpClient;
    static final AttributeKey<HttpEntity> REQUEST_ENTITY_KEY = AttributeKey.valueOf("REQUEST_ENTITY");

    /**
     * @param httpClient the Reactor Netty client every request is derived from
     */
    public ReactorNettyClient(HttpClient httpClient) {
        this.httpClient = httpClient;
    }

    /**
     * Builds the Reactor Netty request pipeline for {@code request}.
     *
     * @param request          the request to send (method, URI, query params and optional entity are read from it)
     * @param options          supplies the follow-redirect flag and the response timeout in milliseconds
     * @param headersToAdd     headers added to the outgoing request
     * @param responseFunction maps the response and its content to the publisher that is returned
     * @param result           completed exceptionally with the mapped error if the pipeline fails
     * @return the flux produced by {@code responseFunction}; errors are mapped through
     *         {@link ReactorNettyUtils#onErrorMap}, reported to {@code result} and then swallowed
     */
    public Flux<ByteBuf> sendAsyncRequest(HttpRequest request, HttpRequestOptions options, HttpHeaders headersToAdd, BiFunction<HttpClientResponse, ByteBufFlux, Publisher<ByteBuf>> responseFunction, CompletableFuture<HttpResponse> result) {
        URI uri = uriWithQueryParams(request);
        LOGGER.debug("Sending request to {} with headers {}", uri, headersToAdd);
        return httpClient
            .followRedirect(options.isFollowsRedirect())
            .doOnConnected(connection -> {
                // Channel attributes are set here for whatever handlers read them later in the pipeline.
                connection.channel().attr(AttributeKey.valueOf("removeContentLength")).set(MuleToNettyUtils.calculateShouldRemoveContentLength(request));
                if (request.getEntity() != null && isExpect(headersToAdd)) {
                    connection.channel().attr(REQUEST_ENTITY_KEY).set(request.getEntity());
                }
            })
            .responseTimeout(Duration.ofMillis(options.getResponseTimeout()))
            .headers(h -> h.add(headersToAdd))
            .request(HttpMethod.valueOf(request.getMethod()))
            .uri(uri)
            .send((httpClientRequest, nettyOutbound) -> {
                if (isExpect(httpClientRequest.requestHeaders())) {
                    // With an Expect header the body is not written here; the entity was stored on the channel.
                    return nettyOutbound.send(Mono.empty());
                }
                if (request.getEntity() != null) {
                    return nettyOutbound.send(entityPublisher(request.getEntity()));
                }
                // NOTE(review): returning null relies on Reactor Netty tolerating a null publisher from the
                // send function — confirm, or return an empty outbound instead.
                return null;
            })
            .response(responseFunction)
            .onErrorMap(ReactorNettyUtils::onErrorMap)
            .doOnError(result::completeExceptionally)
            .onErrorComplete();
    }

    /** Returns whether {@code entries} contains an {@code expect} header. */
    private boolean isExpect(HttpHeaders entries) {
        return entries.contains("expect");
    }

    /** Returns the request URI, rebuilt with the request's query parameters when there are any. */
    private static URI uriWithQueryParams(HttpRequest request) {
        MultiMap<String, String> queryParams = request.getQueryParams();
        if (queryParams.isEmpty()) {
            return request.getUri();
        }
        return URI.create(HttpUtils.buildUriString(request.getUri(), queryParams));
    }

    /** Returns an empty publisher for entities known to be zero bytes long, a chunked publisher otherwise. */
    private Publisher<? extends ByteBuf> entityPublisher(HttpEntity entity) {
        if (entity.getBytesLength().isPresent() && entity.getBytesLength().getAsLong() == 0L) {
            return Mono.empty();
        }
        return new ChunkedHttpEntityPublisher(entity);
    }

    /**
     * Consumes the response content, choosing between in-memory aggregation and streaming.
     *
     * @param response the response head (status and headers)
     * @param content  the response body
     * @param result   completed with the created {@link HttpResponse}, or exceptionally on failure
     * @return the publisher that drives content consumption when subscribed
     */
    public Publisher<ByteBuf> receiveContent(HttpClientResponse response, ByteBufFlux content, CompletableFuture<HttpResponse> result) {
        LOGGER.debug("Received response with headers {} and status {}", response.responseHeaders(), response.status());
        if (responseIsShortEnough(response)) {
            return handleShortResponse(response, content, result);
        }
        return handleResponseStreaming(response, content, result);
    }

    /**
     * Copies each received buffer into a {@link BlockingBidirectionalStream} and completes {@code result} on
     * {@link #resultExecutor} with a response that reads from the stream's input side.
     */
    private Publisher<ByteBuf> handleResponseStreaming(HttpClientResponse response, ByteBufFlux content, CompletableFuture<HttpResponse> result) {
        try {
            BlockingBidirectionalStream bidirectionalStream = new BlockingBidirectionalStream();
            InputStream in = bidirectionalStream.getInputStream();
            CancelableOutputStream out = bidirectionalStream.getOutputStream();
            Flux<ByteBuf> contentFlux = content.retain().doOnNext(data -> {
                try {
                    byte[] bytes = new byte[data.readableBytes()];
                    data.readBytes(bytes);
                    out.write(bytes);
                } catch (IOException e) {
                    result.completeExceptionally(e);
                    return;
                } finally {
                    // Balance the retain() above on every path, including unexpected runtime failures.
                    data.release();
                }
                if (!result.isDone()) {
                    LOGGER.debug("Marked response as completed but still waiting on content");
                    completeAsync(result, () -> new HttpResponseCreator().create(response, in), resultExecutor);
                }
            }).doOnComplete(() -> {
                try {
                    LOGGER.debug("Marked response as completed");
                    out.close();
                    if (!result.isDone()) {
                        completeAsync(result, () -> new HttpResponseCreator().create(response, in), resultExecutor);
                    }
                } catch (Exception e) {
                    result.completeExceptionally(e);
                }
            }).doOnError(error -> {
                out.cancel(error);
                Throwable mappedError = ReactorNettyUtils.onErrorMap(error);
                result.completeExceptionally(mappedError);
            });
            // Schedule the completion right away so the result does not have to wait for the first chunk.
            if (!result.isDone()) {
                completeAsync(result, () -> new HttpResponseCreator().create(response, in), resultExecutor);
            }
            return contentFlux;
        } catch (Exception e) {
            result.completeExceptionally(e);
            return content;
        }
    }

    /** Aggregates the whole body in memory and completes {@code result} with a response backed by a byte array. */
    private Publisher<ByteBuf> handleShortResponse(HttpClientResponse response, ByteBufFlux content, CompletableFuture<HttpResponse> result) {
        return content.aggregate().doOnError(error -> {
            Throwable mappedError = ReactorNettyUtils.onErrorMap(error);
            result.completeExceptionally(mappedError);
        }).doOnSuccess(byteBuf -> {
            // A null buffer means the aggregate completed empty, i.e. there was no body.
            byte[] bytes = byteBuf != null ? ByteBufUtil.getBytes(byteBuf) : new byte[0];
            result.complete(new HttpResponseCreator().create(response, new ByteArrayInputStream(bytes)));
        });
    }

    /**
     * Returns whether the response declares a {@code Content-Length} below
     * {@link #CONTENT_LENGTH_TO_HANDLE_AGGREGATED}. A missing or unparseable header yields {@code false} so the
     * response is streamed instead of failing the request.
     */
    private boolean responseIsShortEnough(HttpClientResponse response) {
        String contentLength = response.responseHeaders().get("Content-Length");
        if (contentLength == null) {
            return false;
        }
        try {
            // Parsed as long: a valid Content-Length can exceed Integer.MAX_VALUE.
            return Long.parseLong(contentLength.trim()) < CONTENT_LENGTH_TO_HANDLE_AGGREGATED;
        } catch (NumberFormatException e) {
            LOGGER.debug("Unparseable Content-Length '{}', response will be streamed", contentLength);
            return false;
        }
    }

    /**
     * Completes {@code result} with the value computed by {@code callable} on {@code executor}, unless
     * {@code result} is completed by other means first.
     *
     * @param result   the future to complete
     * @param callable computes the value; skipped if {@code result} is already done when the task runs
     * @param executor runs {@code callable}
     * @return {@code result}
     * @throws NullPointerException if {@code result} or {@code callable} is {@code null}
     */
    public static <T> CompletableFuture<T> completeAsync(CompletableFuture<T> result, Callable<? extends T> callable, Executor executor) {
        if (result == null) {
            throw new NullPointerException();
        }
        if (callable == null) {
            throw new NullPointerException();
        }
        CompletableFuture<T> delegate = callAsync(() -> result.isDone() ? null : callable.call(), executor);
        // Link both futures so that whichever finishes first settles the other one too.
        result.whenComplete((v, t) -> {
            if (t == null) {
                delegate.complete(v);
                return;
            }
            delegate.completeExceptionally(t);
        });
        delegate.whenComplete((v, t) -> {
            if (t == null) {
                result.complete(v);
                return;
            }
            result.completeExceptionally(t);
        });
        return result;
    }

    /**
     * Runs {@code callable} on {@code executor} and exposes its outcome as a future.
     *
     * @param callable the computation; checked exceptions it throws are wrapped in a {@link CompletionException}
     * @param executor runs {@code callable}
     * @return a future completed with the callable's value or failure
     * @throws NullPointerException if {@code callable} is {@code null}
     */
    public static <U> CompletableFuture<U> callAsync(Callable<? extends U> callable, Executor executor) {
        if (callable == null) {
            throw new NullPointerException();
        }
        return CompletableFuture.supplyAsync(() -> {
            try {
                return callable.call();
            } catch (RuntimeException e) {
                throw e;
            } catch (Exception e) {
                // Supplier cannot throw checked exceptions; Errors propagate untouched.
                throw new CompletionException(e);
            }
        }, executor);
    }
}

