package reactor.core.publisher;

import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.locks.LockSupport;
import java.util.function.Supplier;
import java.util.stream.Stream;
import org.raml.v2.internal.impl.v10.type.TypeToXmlSchemaVisitor;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Exceptions;
import reactor.core.Scannable;
import reactor.core.publisher.EventLoopProcessor;
import reactor.core.publisher.RingBuffer;
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.annotation.Nullable;
import reactor.util.concurrent.Queues;
import reactor.util.concurrent.WaitStrategy;

/* loaded from: input_file:repository/io/projectreactor/reactor-core/3.2.0.M1/reactor-core-3.2.0.M1.jar:reactor/core/publisher/WorkQueueProcessor.class */
public final class WorkQueueProcessor<E> extends EventLoopProcessor<E> {
    final RingBuffer.Sequence workSequence;
    final Queue<Object> claimedDisposed;
    final WaitStrategy writeWait;
    volatile int replaying;
    static final Supplier FACTORY = EventLoopProcessor.Slot::new;
    static final AtomicIntegerFieldUpdater<WorkQueueProcessor> REPLAYING = AtomicIntegerFieldUpdater.newUpdater(WorkQueueProcessor.class, "replaying");
    static final Logger log = Loggers.getLogger((Class<?>) WorkQueueProcessor.class);

    /* loaded from: input_file:repository/io/projectreactor/reactor-core/3.2.0.M1/reactor-core-3.2.0.M1.jar:reactor/core/publisher/WorkQueueProcessor$Builder.class */
    public static final class Builder<T> {
        String name;
        ExecutorService executor;
        ExecutorService requestTaskExecutor;
        WaitStrategy waitStrategy;
        int bufferSize = Queues.SMALL_BUFFER_SIZE;
        boolean autoCancel = true;
        boolean share = false;

        Builder() {
        }

        public Builder<T> name(@Nullable String str) {
            if (this.executor != null) {
                throw new IllegalArgumentException("Executor service is configured, name will not be used.");
            }
            this.name = str;
            return this;
        }

        public Builder<T> bufferSize(int i) {
            if (!Queues.isPowerOfTwo(i)) {
                throw new IllegalArgumentException("bufferSize must be a power of 2 : " + i);
            }
            if (i < 1) {
                throw new IllegalArgumentException("bufferSize must be strictly positive, was: " + i);
            }
            this.bufferSize = i;
            return this;
        }

        public Builder<T> waitStrategy(@Nullable WaitStrategy waitStrategy) {
            this.waitStrategy = waitStrategy;
            return this;
        }

        public Builder<T> autoCancel(boolean z) {
            this.autoCancel = z;
            return this;
        }

        public Builder<T> executor(@Nullable ExecutorService executorService) {
            this.executor = executorService;
            return this;
        }

        public Builder<T> requestTaskExecutor(@Nullable ExecutorService executorService) {
            this.requestTaskExecutor = executorService;
            return this;
        }

        public Builder<T> share(boolean z) {
            this.share = z;
            return this;
        }

        public WorkQueueProcessor<T> build() {
            String simpleName = this.name != null ? this.name : WorkQueueProcessor.class.getSimpleName();
            WaitStrategy liteBlocking = this.waitStrategy != null ? this.waitStrategy : WaitStrategy.liteBlocking();
            EventLoopProcessor.EventLoopFactory eventLoopFactory = this.executor != null ? null : new EventLoopProcessor.EventLoopFactory(simpleName, this.autoCancel);
            return new WorkQueueProcessor<>(eventLoopFactory, this.executor, this.requestTaskExecutor != null ? this.requestTaskExecutor : EventLoopProcessor.defaultRequestTaskExecutor(EventLoopProcessor.defaultName(eventLoopFactory, WorkQueueProcessor.class)), this.bufferSize, liteBlocking, this.share, this.autoCancel);
        }
    }

    /* loaded from: input_file:repository/io/projectreactor/reactor-core/3.2.0.M1/reactor-core-3.2.0.M1.jar:reactor/core/publisher/WorkQueueProcessor$WorkQueueInner.class */
    static final class WorkQueueInner<T> implements Runnable, InnerProducer<T> {
        final RingBuffer.Reader barrier;
        final WorkQueueProcessor<T> processor;
        final CoreSubscriber<? super T> subscriber;
        final AtomicBoolean running = new AtomicBoolean(true);
        final RingBuffer.Sequence sequence = RingBuffer.newSequence(-1);
        final RingBuffer.Sequence pendingRequest = RingBuffer.newSequence(0);
        final Runnable waiter = new Runnable() { // from class: reactor.core.publisher.WorkQueueProcessor.WorkQueueInner.1
            @Override // java.lang.Runnable
            public void run() {
                if (!WorkQueueInner.this.barrier.isAlerted() && WorkQueueInner.this.isRunning()) {
                    if (!WorkQueueInner.this.replay(WorkQueueInner.this.pendingRequest.getAsLong() == TypeToXmlSchemaVisitor.UNBOUNDED)) {
                        return;
                    }
                }
                WaitStrategy.alert();
            }
        };

        WorkQueueInner(CoreSubscriber<? super T> coreSubscriber, WorkQueueProcessor<T> workQueueProcessor) {
            this.processor = workQueueProcessor;
            this.subscriber = coreSubscriber;
            this.barrier = workQueueProcessor.ringBuffer.newReader();
        }

        void halt() {
            this.running.set(false);
            this.barrier.alert();
        }

        boolean isRunning() {
            return this.running.get() && (this.processor.terminated == 0 || (this.processor.error == null && this.processor.ringBuffer.getAsLong() > this.sequence.getAsLong()));
        }

        @Override // java.lang.Runnable
        public void run() {
            boolean z = true;
            try {
                Thread.currentThread().setContextClassLoader(this.processor.contextClassLoader);
                this.subscriber.onSubscribe(this);
                long j = Long.MIN_VALUE;
                long asLong = this.sequence.getAsLong();
                EventLoopProcessor.Slot<T> slot = null;
                boolean z2 = this.pendingRequest.getAsLong() == TypeToXmlSchemaVisitor.UNBOUNDED;
                if (!EventLoopProcessor.waitRequestOrTerminalEvent(this.pendingRequest, this.barrier, this.running, this.sequence, this.waiter) && replay(z2)) {
                    if (!this.running.get()) {
                        this.processor.decrementSubscribers();
                        this.running.set(false);
                        if (1 == 0) {
                            this.processor.claimedDisposed.add(this.sequence);
                        } else {
                            this.processor.ringBuffer.removeGatingSequence(this.sequence);
                        }
                        this.processor.writeWait.signalAllWhenBlocking();
                        return;
                    }
                    if (this.processor.terminated == 1 && this.processor.ringBuffer.getAsLong() == -1) {
                        if (this.processor.error != null) {
                            this.subscriber.onError(this.processor.error);
                            this.processor.decrementSubscribers();
                            this.running.set(false);
                            if (1 == 0) {
                                this.processor.claimedDisposed.add(this.sequence);
                            } else {
                                this.processor.ringBuffer.removeGatingSequence(this.sequence);
                            }
                            this.processor.writeWait.signalAllWhenBlocking();
                            return;
                        }
                        this.subscriber.onComplete();
                        this.processor.decrementSubscribers();
                        this.running.set(false);
                        if (1 == 0) {
                            this.processor.claimedDisposed.add(this.sequence);
                        } else {
                            this.processor.ringBuffer.removeGatingSequence(this.sequence);
                        }
                        this.processor.writeWait.signalAllWhenBlocking();
                        return;
                    }
                }
                while (true) {
                    if (z) {
                        try {
                            if (!this.running.get()) {
                                break;
                            }
                            z = false;
                            do {
                                asLong = this.processor.workSequence.getAsLong() + 1;
                                while (!z2 && this.pendingRequest.getAsLong() == 0) {
                                    if (!isRunning()) {
                                        WaitStrategy.alert();
                                    }
                                    LockSupport.parkNanos(1L);
                                }
                                this.sequence.set(asLong - 1);
                            } while (!this.processor.workSequence.compareAndSet(asLong - 1, asLong));
                        } catch (InterruptedException | RuntimeException e) {
                            if (!Exceptions.isCancel(e)) {
                                if (!WaitStrategy.isAlert(e)) {
                                    throw Exceptions.propagate(e);
                                }
                                this.barrier.clearAlert();
                                if (!this.running.get()) {
                                    break;
                                }
                                if (this.processor.terminated == 1) {
                                    if (this.processor.error == null) {
                                        if (this.processor.ringBuffer.getPending() == 0) {
                                            z = true;
                                            this.subscriber.onComplete();
                                            break;
                                        }
                                    } else {
                                        z = true;
                                        this.subscriber.onError(this.processor.error);
                                        break;
                                    }
                                }
                            } else {
                                reschedule(slot);
                                break;
                            }
                        }
                    }
                    if (j >= asLong) {
                        slot = (EventLoopProcessor.Slot) this.processor.ringBuffer.get(asLong);
                        try {
                            readNextEvent(z2);
                        } catch (Exception e2) {
                            if (!this.running.get() || !WaitStrategy.isAlert(e2)) {
                                throw e2;
                                break;
                            }
                            this.barrier.clearAlert();
                        }
                        z = true;
                        this.subscriber.onNext(slot.value);
                    } else {
                        this.processor.readWait.signalAllWhenBlocking();
                        j = this.barrier.waitFor(asLong, this.waiter);
                    }
                }
            } finally {
                this.processor.decrementSubscribers();
                this.running.set(false);
                if (z) {
                    this.processor.ringBuffer.removeGatingSequence(this.sequence);
                } else {
                    this.processor.claimedDisposed.add(this.sequence);
                }
                this.processor.writeWait.signalAllWhenBlocking();
            }
        }

        boolean replay(boolean z) {
            try {
                if (!WorkQueueProcessor.REPLAYING.compareAndSet(this.processor, 0, 1)) {
                    return !this.processor.alive() && this.processor.ringBuffer.getPending() == 0;
                }
                RingBuffer.Sequence sequence = null;
                while (this.running.get()) {
                    try {
                        Object peek = this.processor.claimedDisposed.peek();
                        if (peek == null) {
                            this.processor.readWait.signalAllWhenBlocking();
                            boolean z2 = !this.processor.alive() && this.processor.ringBuffer.getPending() == 0;
                            WorkQueueProcessor.REPLAYING.compareAndSet(this.processor, 1, 0);
                            return z2;
                        }
                        if (peek instanceof RingBuffer.Sequence) {
                            sequence = (RingBuffer.Sequence) peek;
                            long asLong = sequence.getAsLong() + 1;
                            if (asLong > this.processor.ringBuffer.getAsLong()) {
                                this.processor.readWait.signalAllWhenBlocking();
                                boolean z3 = !this.processor.alive() && this.processor.ringBuffer.getPending() == 0;
                                WorkQueueProcessor.REPLAYING.compareAndSet(this.processor, 1, 0);
                                return z3;
                            }
                            this.barrier.waitFor(asLong, this.waiter);
                            peek = (T) ((EventLoopProcessor.Slot) this.processor.ringBuffer.get(asLong)).value;
                            if (peek == null) {
                                this.processor.ringBuffer.removeGatingSequence(sequence);
                                this.processor.claimedDisposed.poll();
                                sequence = null;
                            }
                        }
                        readNextEvent(z);
                        this.subscriber.onNext(peek);
                        this.processor.claimedDisposed.poll();
                        if (sequence != null) {
                            this.processor.ringBuffer.removeGatingSequence(sequence);
                            sequence = null;
                        }
                    } catch (InterruptedException e) {
                        this.running.set(false);
                        WorkQueueProcessor.REPLAYING.compareAndSet(this.processor, 1, 0);
                        return true;
                    } catch (RuntimeException e2) {
                        if (this.running.get() && !Exceptions.isCancel(e2)) {
                            throw e2;
                        }
                        this.running.set(false);
                        WorkQueueProcessor.REPLAYING.compareAndSet(this.processor, 1, 0);
                        return true;
                    }
                }
                this.processor.readWait.signalAllWhenBlocking();
                WorkQueueProcessor.REPLAYING.compareAndSet(this.processor, 1, 0);
                return true;
            } catch (Throwable th) {
                WorkQueueProcessor.REPLAYING.compareAndSet(this.processor, 1, 0);
                throw th;
            }
        }

        boolean reschedule(@Nullable EventLoopProcessor.Slot<T> slot) {
            if (slot == null || slot.value == null) {
                return false;
            }
            this.processor.claimedDisposed.add(slot.value);
            this.barrier.alert();
            this.processor.readWait.signalAllWhenBlocking();
            return true;
        }

        void readNextEvent(boolean z) {
            while (!z && EventLoopProcessor.getAndSub(this.pendingRequest, 1L) == 0) {
                if (!isRunning()) {
                    WaitStrategy.alert();
                }
                LockSupport.parkNanos(1L);
            }
        }

        @Override // reactor.core.publisher.InnerProducer, reactor.core.Scannable
        @Nullable
        public Object scanUnsafe(Scannable.Attr attr) {
            if (attr == Scannable.Attr.PARENT) {
                return this.processor;
            }
            if (attr == Scannable.Attr.PREFETCH) {
                return Integer.MAX_VALUE;
            }
            if (attr == Scannable.Attr.TERMINATED) {
                return Boolean.valueOf(this.processor.isTerminated());
            }
            if (attr == Scannable.Attr.CANCELLED) {
                return Boolean.valueOf(!this.running.get());
            }
            return attr == Scannable.Attr.REQUESTED_FROM_DOWNSTREAM ? Long.valueOf(this.pendingRequest.getAsLong()) : super.scanUnsafe(attr);
        }

        @Override // reactor.core.publisher.InnerProducer
        public CoreSubscriber<? super T> actual() {
            return this.subscriber;
        }

        @Override // org.reactivestreams.Subscription
        public void request(long j) {
            if (Operators.validate(j) && this.running.get()) {
                EventLoopProcessor.addCap(this.pendingRequest, j);
            }
        }

        @Override // org.reactivestreams.Subscription
        public void cancel() {
            halt();
        }
    }

    public static final <T> Builder<T> builder() {
        return new Builder<>();
    }

    public static <E> WorkQueueProcessor<E> create() {
        return builder().build();
    }

    public static <E> WorkQueueProcessor<E> create(String str, int i) {
        return builder().name(str).bufferSize(i).build();
    }

    public static <E> WorkQueueProcessor<E> share(String str, int i) {
        return builder().share(true).name(str).bufferSize(i).build();
    }

    WorkQueueProcessor(@Nullable ThreadFactory threadFactory, @Nullable ExecutorService executorService, ExecutorService executorService2, int i, WaitStrategy waitStrategy, boolean z, boolean z2) {
        super(i, threadFactory, executorService, executorService2, z2, z, FACTORY, waitStrategy);
        this.workSequence = RingBuffer.newSequence(-1L);
        this.claimedDisposed = new ConcurrentLinkedQueue();
        this.writeWait = waitStrategy;
        this.ringBuffer.addGatingSequence(this.workSequence);
    }

    @Override // reactor.core.publisher.Flux
    public void subscribe(CoreSubscriber<? super E> coreSubscriber) {
        Objects.requireNonNull(coreSubscriber, "subscribe");
        if (!alive()) {
            TopicProcessor.coldSource(this.ringBuffer, null, this.error, this.workSequence).subscribe((CoreSubscriber) coreSubscriber);
            return;
        }
        WorkQueueInner workQueueInner = new WorkQueueInner(coreSubscriber, this);
        try {
            incrementSubscribers();
            workQueueInner.sequence.set(this.workSequence.getAsLong());
            this.ringBuffer.addGatingSequence(workQueueInner.sequence);
            int bestEffortMaxSubscribers = bestEffortMaxSubscribers(this.executor);
            if (bestEffortMaxSubscribers > Integer.MIN_VALUE && this.subscriberCount > bestEffortMaxSubscribers) {
                throw new IllegalStateException("The executor service could not accommodate another subscriber, detected limit " + bestEffortMaxSubscribers);
            }
            this.executor.execute(workQueueInner);
        } catch (Throwable th) {
            decrementSubscribers();
            this.ringBuffer.removeGatingSequence(workQueueInner.sequence);
            if (RejectedExecutionException.class.isAssignableFrom(th.getClass())) {
                TopicProcessor.coldSource(this.ringBuffer, th, this.error, this.workSequence).subscribe((CoreSubscriber) coreSubscriber);
            } else {
                Operators.error(coreSubscriber, th);
            }
        }
    }

    static int bestEffortMaxSubscribers(ExecutorService executorService) {
        int i = Integer.MIN_VALUE;
        if (executorService instanceof ThreadPoolExecutor) {
            i = ((ThreadPoolExecutor) executorService).getMaximumPoolSize();
        } else if (executorService instanceof ForkJoinPool) {
            i = ((ForkJoinPool) executorService).getParallelism();
        }
        return i;
    }

    @Override // reactor.core.publisher.EventLoopProcessor
    public Flux<E> drain() {
        return TopicProcessor.coldSource(this.ringBuffer, null, this.error, this.workSequence);
    }

    @Override // reactor.core.publisher.EventLoopProcessor
    protected void doError(Throwable th) {
        this.writeWait.signalAllWhenBlocking();
    }

    @Override // reactor.core.publisher.EventLoopProcessor
    protected void doComplete() {
        this.writeWait.signalAllWhenBlocking();
    }

    @Override // reactor.core.publisher.EventLoopProcessor
    protected void requestTask(Subscription subscription) {
        ExecutorService executorService = this.requestTaskExecutor;
        RingBuffer<EventLoopProcessor.Slot<IN>> ringBuffer = this.ringBuffer;
        ringBuffer.getClass();
        executorService.execute(createRequestTask(subscription, this, null, ringBuffer::getMinimumGatingSequence));
    }

    @Override // reactor.core.publisher.EventLoopProcessor
    public long getPending() {
        return (getBufferSize() - this.ringBuffer.getPending()) + this.claimedDisposed.size();
    }

    @Override // java.lang.Runnable
    public void run() {
        if (alive()) {
            return;
        }
        WaitStrategy.alert();
    }

    @Override // reactor.core.publisher.EventLoopProcessor, reactor.core.publisher.FluxProcessor
    public /* bridge */ /* synthetic */ long downstreamCount() {
        return super.downstreamCount();
    }

    @Override // reactor.core.publisher.EventLoopProcessor, reactor.core.publisher.FluxProcessor
    public /* bridge */ /* synthetic */ boolean isSerialized() {
        return super.isSerialized();
    }

    @Override // reactor.core.publisher.EventLoopProcessor, reactor.core.publisher.FluxProcessor, reactor.core.Scannable
    public /* bridge */ /* synthetic */ Stream inners() {
        return super.inners();
    }

    @Override // reactor.core.publisher.EventLoopProcessor, reactor.core.publisher.FluxProcessor, reactor.core.Scannable
    @Nullable
    public /* bridge */ /* synthetic */ Object scanUnsafe(Scannable.Attr attr) {
        return super.scanUnsafe(attr);
    }
}
