package org.mule.transport.nio.tcp.io;

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang.Validate;
import org.apache.commons.lang.builder.ToStringBuilder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.mule.transport.nio.tcp.ChannelReceiverResource;
import org.mule.transport.nio.tcp.TcpProtocol;
import org.mule.transport.nio.tcp.protocols.StreamingProtocol;
import org.mule.util.concurrent.Latch;

/* loaded from: input_file:org/mule/transport/nio/tcp/io/ChannelInputStream.class */
/**
 * An {@link InputStream} fed by byte arrays pushed through {@link #offer(byte[])} and drained by
 * the standard {@code read} methods.
 *
 * <p>Incoming arrays are wrapped in {@link ByteBuffer}s and queued. A zero-length poison pill is
 * queued when the channel's close future completes so that a blocked reader wakes up and observes
 * end of stream. A soft cap ({@link #getMaxDataAvailable()}) on the number of unread bytes applies
 * back-pressure on the producer.
 *
 * <p>NOTE(review): the state is kept in atomics/volatiles and the hand-off goes through a blocking
 * queue, which suggests one producer thread and one consumer thread — confirm against callers
 * before using it with several readers or writers.
 */
public class ChannelInputStream extends InputStream implements ChannelReceiverResource {
    /** Maximum time, in milliseconds, a producer waits for the reader to free buffer space. */
    protected static final long CHANNEL_BACK_PRESSURE_TIMEOUT_MILLIS = 30000;
    /** Maximum time, in milliseconds, a reader waits for data when a streaming protocol is used. */
    protected static final long STREAMING_MAX_DATA_WAIT_MILLIS = 100;
    protected static final int STREAMING_BUFFER_QUEUE_SIZE = 10;
    /** Default limit of unread bytes above which {@link #offer(byte[])} waits for space. */
    protected static final int DEFAULT_MAXIMUM_AVAILABLE_SIZE = 1048576;
    protected static final Log LOGGER = LogFactory.getLog(ChannelInputStream.class);
    /** Marker buffer, compared by identity, queued when the channel's close future completes. */
    protected static final ByteBuffer CHANNEL_CLOSED_POISON_PILL = ByteBuffer.allocate(0);

    protected final Channel channel;
    /** Bytes accepted by {@link #offer(byte[])} that have not been read yet. */
    protected final AtomicInteger availableBytes;
    /** Running total of bytes accepted by {@link #offer(byte[])}. */
    protected final AtomicLong totalBytesReceived;
    /** Cumulative number of bytes announced through {@link #setExpectedBytes(long)}. */
    protected final AtomicLong expectedBytes;
    protected final BlockingQueue<ByteBuffer> byteBuffers;
    protected volatile long maxDataAvailable = DEFAULT_MAXIMUM_AVAILABLE_SIZE;
    protected volatile long maxDataWait;
    protected volatile boolean open;
    /** Latch the producer blocks on while over the limit; released whenever bytes are consumed. */
    protected volatile Latch flowControlLatch;
    protected volatile boolean expectingBytes;
    /** Buffer currently being drained by the reader, or {@code null} if none. */
    protected volatile ByteBuffer currentByteBuffer;
    protected volatile Runnable beforeCloseAction;
    protected volatile Runnable afterCloseAction;

    /**
     * Creates a stream bound to the given channel.
     *
     * @param channel     the channel whose data is exposed; must not be {@code null}
     * @param tcpProtocol when it is a {@link StreamingProtocol}, reads wait at most
     *                    {@link #STREAMING_MAX_DATA_WAIT_MILLIS} for data; otherwise they wait
     *                    up to {@link Long#MAX_VALUE} milliseconds
     */
    public ChannelInputStream(Channel channel, TcpProtocol tcpProtocol) {
        Validate.notNull(channel, "channel can't be null");
        this.channel = channel;
        this.maxDataWait = tcpProtocol instanceof StreamingProtocol ? STREAMING_MAX_DATA_WAIT_MILLIS : Long.MAX_VALUE;
        this.availableBytes = new AtomicInteger(0);
        this.expectedBytes = new AtomicLong(0L);
        this.totalBytesReceived = new AtomicLong(0L);
        this.byteBuffers = new LinkedBlockingQueue<ByteBuffer>();
        this.open = true;
        this.currentByteBuffer = null;
        this.expectingBytes = false;
        // Wake up a reader blocked in poll() as soon as the channel is closed.
        channel.getCloseFuture().addListener(new ChannelFutureListener() {
            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                ChannelInputStream.this.byteBuffers.offer(ChannelInputStream.CHANNEL_CLOSED_POISON_PILL);
            }
        });
    }

    public String toString() {
        return ToStringBuilder.reflectionToString(this);
    }

    /**
     * @return {@code true} while the channel is open, or while this stream is open and still
     *         holds unread bytes
     */
    @Override // org.mule.transport.nio.tcp.ChannelReceiverResource
    public boolean isActive() {
        return this.channel.isOpen() || (isOpen() && this.availableBytes.get() > 0);
    }

    /**
     * Hands a chunk of received bytes to this stream. Empty arrays are ignored. The array is
     * wrapped, not copied.
     *
     * @param bArr the received bytes; must not be {@code null}
     * @throws EOFException           if this stream has been closed
     * @throws InterruptedIOException if the buffer stays over the limit for the whole
     *                                back-pressure timeout, or the wait is interrupted
     * @throws IOException            declared for {@link #ponderForSpace()}
     */
    public void offer(byte[] bArr) throws IOException {
        Validate.notNull(bArr, "bytes can't be null");
        if (bArr.length == 0) {
            return;
        }
        if (!isOpen()) {
            throw new EOFException("Attempt to write to a closed stream, discarding bytes: " + Arrays.toString(bArr));
        }
        ByteBuffer wrap = ByteBuffer.wrap(bArr);
        if (available() >= this.maxDataAvailable) {
            ponderForSpace();
        }
        // Account for the bytes BEFORE publishing the buffer: otherwise a reader could drain the
        // buffer first, observe inconsistent counters (transiently negative availability, stale
        // total) and miss the "all expected bytes received" condition in getDataToRead().
        this.availableBytes.addAndGet(bArr.length);
        this.totalBytesReceived.addAndGet(bArr.length);
        this.byteBuffers.offer(wrap);
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(String.format("Enqueued: %d bytes in: %s", Integer.valueOf(bArr.length), this));
        }
    }

    /**
     * Blocks the producer until the number of unread bytes drops below the configured maximum or
     * {@link #CHANNEL_BACK_PRESSURE_TIMEOUT_MILLIS} elapses overall.
     *
     * @throws InterruptedIOException if the wait is interrupted or the buffer is still over the
     *                                limit once the timeout has elapsed
     * @throws IOException            never thrown by this implementation; kept in the signature
     */
    protected void ponderForSpace() throws IOException, InterruptedIOException {
        final long deadline = System.currentTimeMillis() + CHANNEL_BACK_PRESSURE_TIMEOUT_MILLIS;
        while (available() >= this.maxDataAvailable) {
            final long remainingMillis = deadline - System.currentTimeMillis();
            if (remainingMillis <= 0) {
                break;
            }
            // Publish the latch first and only then re-check the condition: a reader that frees
            // space in between either makes the re-check succeed or releases this very latch, so
            // the wake-up cannot be lost.
            final Latch latch = new Latch();
            this.flowControlLatch = latch;
            if (available() < this.maxDataAvailable) {
                break;
            }
            try {
                latch.await(remainingMillis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                // The interrupt flag is intentionally not restored here (same as before); the
                // interruption is reported through the exception, with its cause preserved.
                InterruptedIOException interrupted = new InterruptedIOException("Failed to enqueue bytes (interrupted)");
                interrupted.initCause(e);
                throw interrupted;
            }
        }
        if (available() >= this.maxDataAvailable) {
            throw new InterruptedIOException("Failed to enqueue bytes (buffer over limit of: " + this.maxDataAvailable + ")");
        }
    }

    /**
     * Closes this stream if it is still open, running the optional before/after close actions
     * around the state change. Calling it on an already closed stream does nothing.
     */
    @Override // java.io.InputStream, java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        if (isOpen()) {
            if (this.beforeCloseAction != null) {
                this.beforeCloseAction.run();
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug(String.format("Before close action: %s run on: %s", this.beforeCloseAction, this));
                }
            }
            super.close();
            this.open = false;
            if (this.afterCloseAction != null) {
                this.afterCloseAction.run();
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug(String.format("After close action: %s run on: %s", this.afterCloseAction, this));
                }
            }
        }
    }

    /**
     * Reads up to {@code bArr.length} bytes; equivalent to {@code read(bArr, 0, bArr.length)}.
     */
    @Override // java.io.InputStream
    public int read(byte[] bArr) throws IOException {
        return read(bArr, 0, bArr.length);
    }

    /**
     * Reads up to {@code i2} bytes into {@code bArr} starting at offset {@code i}. At most the
     * remainder of the buffer currently being drained is returned, so fewer bytes than requested
     * may be read.
     *
     * @return the number of bytes read, {@code 0} if {@code i2} is zero, or {@code -1} when no
     *         data could be obtained (see {@link #getDataToRead()})
     * @throws NullPointerException      if {@code bArr} is {@code null}
     * @throws IndexOutOfBoundsException if the offset/length do not fit in {@code bArr}
     * @throws EOFException              if this stream has been closed
     */
    @Override // java.io.InputStream
    public int read(byte[] bArr, int i, int i2) throws IOException {
        if (bArr == null) {
            throw new NullPointerException("bArr");
        }
        // Validate eagerly so that bad arguments are reported before any blocking wait.
        if (i < 0 || i2 < 0 || i2 > bArr.length - i) {
            throw new IndexOutOfBoundsException("offset: " + i + ", length: " + i2 + ", array length: " + bArr.length);
        }
        if (i2 == 0) {
            // InputStream contract: a zero-length read returns 0 and must not wait for data.
            return 0;
        }
        ByteBuffer dataToRead = getDataToRead();
        if (dataToRead == null) {
            return -1;
        }
        int count = Math.min(dataToRead.remaining(), i2);
        dataToRead.get(bArr, i, count);
        decrementAvailable(count);
        return count;
    }

    /**
     * Reads a single byte.
     *
     * @return the byte as an unsigned value (0-255), or {@code -1} when no data could be obtained
     * @throws EOFException if this stream has been closed
     */
    @Override // java.io.InputStream
    public int read() throws IOException {
        ByteBuffer dataToRead = getDataToRead();
        if (dataToRead == null) {
            return -1;
        }
        int value = dataToRead.get() & 255;
        decrementAvailable(1);
        return value;
    }

    /**
     * Records that {@code i} bytes were consumed and releases a producer that may be waiting for
     * space.
     */
    protected void decrementAvailable(int i) {
        this.availableBytes.addAndGet(-i);
        // Single volatile read: the producer may swap the latch concurrently.
        Latch latch = this.flowControlLatch;
        if (latch != null) {
            latch.release();
        }
    }

    /**
     * Returns the buffer the next read should drain, waiting up to {@link #getMaxDataWait()}
     * milliseconds for one to be queued if the current one is exhausted.
     *
     * @return a buffer with remaining bytes, or {@code null} when: the queue is empty and the
     *         channel is closed; all expected bytes have been received and consumed; the wait
     *         timed out; the channel-closed marker was dequeued; or the waiting thread was
     *         interrupted (in which case the interrupt flag is restored)
     * @throws EOFException if this stream has been closed
     */
    protected ByteBuffer getDataToRead() throws IOException {
        if (!isOpen()) {
            throw new EOFException("Reading from closed input stream: " + this);
        }
        ByteBuffer current = this.currentByteBuffer;
        if (current != null && current.hasRemaining()) {
            return current;
        }
        if (this.byteBuffers.isEmpty() && !this.channel.isOpen()) {
            return null;
        }
        long expected = this.expectedBytes.get();
        if (this.expectingBytes && available() == 0 && expected > 0 && this.totalBytesReceived.get() >= expected) {
            // Everything that was announced has been received and consumed: don't wait for more.
            this.expectingBytes = false;
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug(String.format("Not waiting for further data: received %d bytes of %d expected (and %d byte(s) available left)", Long.valueOf(this.totalBytesReceived.get()), Long.valueOf(expected), Integer.valueOf(available())));
            }
            return null;
        }
        try {
            ByteBuffer polled = this.byteBuffers.poll(this.maxDataWait, TimeUnit.MILLISECONDS);
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug(String.format("Polled %s with %d buffer(s) in queue and %d byte(s) available", polled, Integer.valueOf(this.byteBuffers.size()), Integer.valueOf(available())));
            }
            // The poison pill only signals that the channel was closed; it carries no data.
            ByteBuffer next = polled == CHANNEL_CLOSED_POISON_PILL ? null : polled;
            this.currentByteBuffer = next;
            return next;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    /**
     * @return the number of bytes offered to this stream that have not been read yet
     */
    @Override // java.io.InputStream
    public int available() {
        return this.availableBytes.get();
    }

    /**
     * @return {@code true} until {@link #close()} has completed its state change
     */
    public boolean isOpen() {
        return this.open;
    }

    @Override // org.mule.transport.nio.tcp.ChannelReceiverResource
    public Channel getChannel() {
        return this.channel;
    }

    /**
     * @return the maximum time, in milliseconds, a read waits for new data
     */
    public long getMaxDataWait() {
        return this.maxDataWait;
    }

    public void setMaxDataWait(long j) {
        this.maxDataWait = j;
    }

    /**
     * @return the number of unread bytes at or above which {@link #offer(byte[])} waits for space
     */
    public long getMaxDataAvailable() {
        return this.maxDataAvailable;
    }

    public void setMaxDataAvailable(long j) {
        this.maxDataAvailable = j;
    }

    public boolean hasBeforeCloseAction() {
        return this.beforeCloseAction != null;
    }

    /**
     * Registers the action run at the start of {@link #close()}; it can only be assigned once.
     */
    public void setBeforeCloseAction(Runnable runnable) {
        Validate.isTrue(this.beforeCloseAction == null, "can't re-assign beforeCloseAction");
        this.beforeCloseAction = runnable;
    }

    public boolean hasAfterCloseAction() {
        return this.afterCloseAction != null;
    }

    /**
     * Registers the action run at the end of {@link #close()}; it can only be assigned once.
     */
    public void setAfterCloseAction(Runnable runnable) {
        Validate.isTrue(this.afterCloseAction == null, "can't re-assign afterCloseAction");
        this.afterCloseAction = runnable;
    }

    /**
     * Announces that {@code j} additional bytes are expected. The value is added to the running
     * expected total; once that many bytes have been received and consumed, reads stop waiting
     * for more data.
     *
     * @param j the number of additional bytes expected; must be strictly positive
     */
    public void setExpectedBytes(long j) {
        Validate.isTrue(j > 0, "expectedBytes must be > 0");
        this.expectedBytes.addAndGet(j);
        this.expectingBytes = true;
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(String.format("Expected bytes set to: %d (currently received: %d)", Long.valueOf(this.expectedBytes.get()), Long.valueOf(this.totalBytesReceived.get())));
        }
    }

    /**
     * Stops tracking expected bytes. The accumulated expected total itself is left untouched.
     */
    public void resetExpectedBytes() {
        this.expectingBytes = false;
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(String.format("Reset expected bytes (currently received: %d)", Long.valueOf(this.totalBytesReceived.get())));
        }
    }
}
