package org.mule.service.http.netty.impl.streaming;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.DefaultHttpContent;
import io.netty.handler.codec.http.DefaultLastHttpContent;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.codec.http2.Http2CodecUtil;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.io.IOException;
import java.io.InputStream;
import java.util.OptionalLong;
import java.util.concurrent.atomic.AtomicLong;
import org.mule.runtime.http.api.domain.entity.HttpEntity;
import org.mule.service.http.netty.impl.server.FinishStreamingListener;
import org.mule.service.http.netty.impl.server.SendNextChunkListener;

/* loaded from: input_file:lib/mule-netty-http-service-0.1.2-SNAPSHOT.jar:org/mule/service/http/netty/impl/streaming/StreamingEntitySender.class */
/**
 * Streams the content of an {@link HttpEntity} through a Netty channel, one chunk per call to
 * {@link #sendNextChunk()}.
 *
 * <p>Every chunk is written with a promise. For intermediate chunks the promise carries a
 * {@link SendNextChunkListener} built from this sender; for the final chunk it carries a
 * {@link FinishStreamingListener} built from the entity's input stream. Both listeners also
 * receive the {@link StatusCallback} given at construction time.
 */
public class StreamingEntitySender {
    /** Upper bound, in bytes, of the buffer used to read a single chunk from the entity. */
    public static final int ENTITY_STREAMING_BUFFER_SIZE = 8192;

    private final ChannelHandlerContext ctx;
    private final Runnable beforeWrite;
    private final StatusCallback statusCallback;
    private final InputStream contentAsInputStream;
    private final OptionalLong entityLength;
    private final int bufferSize;
    private final AtomicLong bytesAlreadySent = new AtomicLong(0);

    /**
     * Creates a sender for the given entity.
     *
     * @param httpEntity entity whose content stream and (optional) byte length are read
     * @param channelHandlerContext context used to allocate buffers, create promises and write
     * @param runnable action executed immediately before every write
     * @param statusCallback callback handed to the listeners attached to each write promise
     */
    public StreamingEntitySender(HttpEntity httpEntity, ChannelHandlerContext channelHandlerContext, Runnable runnable, StatusCallback statusCallback) {
        this.ctx = channelHandlerContext;
        this.contentAsInputStream = httpEntity.getContent();
        this.entityLength = httpEntity.getBytesLength();
        this.beforeWrite = runnable;
        this.statusCallback = statusCallback;
        // Must be computed after entityLength is assigned: a field initializer would run before
        // this constructor body and dereference a still-null entityLength.
        this.bufferSize = calculateBufferSize();
    }

    /**
     * Reads the next chunk from the entity stream and writes it to the channel.
     *
     * <p>When the buffer size is zero or the stream is exhausted, an empty last-content message
     * is written instead. When the declared entity length shows that this chunk completes the
     * entity, the chunk itself is written as the last content.
     *
     * @throws IOException if reading from the entity stream fails
     */
    public void sendNextChunk() throws IOException {
        if (this.bufferSize == 0) {
            sendEmptyContentAndFinish(this.contentAsInputStream);
            return;
        }
        byte[] chunk = new byte[this.bufferSize];
        int bytesRead = readChunk(this.contentAsInputStream, chunk);
        if (bytesRead == -1) {
            sendEmptyContentAndFinish(this.contentAsInputStream);
            return;
        }
        ByteBuf buffer = createBuffer(bytesRead);
        buffer.writeBytes(chunk, 0, bytesRead);
        // Advance the counter before writing: if the write completes synchronously, its listener
        // may re-enter sendNextChunk() and must observe the up-to-date byte count.
        long sentBefore = this.bytesAlreadySent.getAndAdd(bytesRead);
        if (weKnowItIsTheLastChunk(bytesRead, sentBefore)) {
            sendBufferWithPromise(new DefaultLastHttpContent(buffer), finishStreamingPromise(this.contentAsInputStream));
        } else {
            sendBufferWithPromise(new DefaultHttpContent(buffer), sendNextChunkPromise());
        }
    }

    /**
     * @param chunkSize number of bytes in the chunk about to be sent
     * @param alreadySent number of bytes sent before this chunk
     * @return {@code true} only if the entity length is known and this chunk reaches it
     */
    private boolean weKnowItIsTheLastChunk(int chunkSize, long alreadySent) {
        return this.entityLength.isPresent() && alreadySent + chunkSize >= this.entityLength.getAsLong();
    }

    /**
     * Picks the read buffer size: the entity length when it is known and smaller than
     * {@link #ENTITY_STREAMING_BUFFER_SIZE}, otherwise {@link #ENTITY_STREAMING_BUFFER_SIZE}.
     */
    private int calculateBufferSize() {
        long declaredOrDefault = this.entityLength.orElse(ENTITY_STREAMING_BUFFER_SIZE);
        // Take the minimum in long arithmetic first so that entities larger than
        // Integer.MAX_VALUE do not overflow; the result never exceeds the int constant.
        return (int) Math.min(declaredOrDefault, ENTITY_STREAMING_BUFFER_SIZE);
    }

    /** Allocates a buffer whose initial and maximum capacity are both {@code size}. */
    private ByteBuf createBuffer(int size) {
        return this.ctx.alloc().buffer(size, size);
    }

    /** Creates a new promise on the channel context with the given listener attached. */
    private ChannelPromise createPromise(GenericFutureListener<? extends Future<? super Void>> listener) {
        ChannelPromise promise = this.ctx.newPromise();
        promise.addListener(listener);
        return promise;
    }

    /** Runs the pre-write action and writes an empty last-content message. */
    private void sendEmptyContentAndFinish(InputStream inputStream) {
        this.beforeWrite.run();
        this.ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT, finishStreamingPromise(inputStream));
    }

    /** Promise whose listener is a {@link FinishStreamingListener} for the given stream. */
    private ChannelPromise finishStreamingPromise(InputStream inputStream) {
        return createPromise(new FinishStreamingListener(inputStream, this.statusCallback));
    }

    /** Promise whose listener is a {@link SendNextChunkListener} bound to this sender. */
    private ChannelPromise sendNextChunkPromise() {
        return createPromise(new SendNextChunkListener(this, this.statusCallback));
    }

    /** Runs the pre-write action, then writes and flushes the content with the given promise. */
    private void sendBufferWithPromise(HttpContent httpContent, ChannelPromise channelPromise) {
        try {
            this.beforeWrite.run();
        } catch (RuntimeException | Error e) {
            // The content was never handed to the channel, so nobody else will release it.
            httpContent.release();
            throw e;
        }
        this.ctx.writeAndFlush(httpContent, channelPromise);
    }

    /**
     * Reads into {@code target}, returning the number of bytes read or {@code -1} at end of
     * stream. An {@link IllegalStateException} whose message is exactly "Buffer is closed" is
     * reported as end of stream; any other one is rethrown.
     */
    private static int readChunk(InputStream inputStream, byte[] target) throws IOException {
        try {
            return inputStream.read(target);
        } catch (IllegalStateException e) {
            if ("Buffer is closed".equals(e.getMessage())) {
                return -1;
            }
            throw e;
        }
    }
}
