package org.mule.runtime.core.internal.processor.strategy;

import java.time.Duration;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.LongUnaryOperator;
import java.util.function.Supplier;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.scheduler.Scheduler;
import org.mule.runtime.api.util.DataUnit;
import org.mule.runtime.core.api.construct.FlowConstruct;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.processor.ReactiveProcessor;
import org.mule.runtime.core.api.processor.Sink;
import org.mule.runtime.core.internal.processor.strategy.AbstractProcessingStrategy;
import org.mule.runtime.core.internal.processor.strategy.ReactorStreamProcessingStrategyFactory;
import org.mule.runtime.core.internal.util.rx.RetrySchedulerWrapper;
import org.mule.runtime.core.privileged.event.BaseEventContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import reactor.retry.BackoffDelay;
import reactor.retry.Retry;

/* loaded from: input_file:org/mule/runtime/core/internal/processor/strategy/ProactorStreamProcessingStrategy.class */
/**
 * Stream processing strategy that hands processors off to dedicated schedulers depending on their
 * {@link ReactiveProcessor.ProcessingType}: {@code BLOCKING} and {@code IO_RW} processors are dispatched
 * to the blocking scheduler, {@code CPU_INTENSIVE} processors to the cpu-intensive scheduler, and every
 * other processor is handled by the superclass.
 * <p>
 * When scheduling is rejected because the target scheduler is busy, the dispatch is retried after
 * {@link #SCHEDULER_BUSY_RETRY_INTERVAL_MS} milliseconds and the time of the retry is recorded. While such
 * a retry is recent, sinks wrapped in {@link ProactorSinkWrapper} reject new events.
 * <p>
 * Subclasses decide how a single event is actually scheduled by implementing
 * {@link #scheduleProcessor(ReactiveProcessor, Scheduler, CoreEvent)}.
 */
public abstract class ProactorStreamProcessingStrategy extends ReactorStreamProcessingStrategyFactory.ReactorStreamProcessingStrategy {

    /**
     * Byte-length threshold above which a stream payload handled by an {@code IO_RW} processor is scheduled
     * on the blocking scheduler. Read from the system property
     * {@code SYSTEM_PROPERTY_PREFIX + "STREAM_PAYLOAD_BLOCKING_IO_THRESHOLD"}, defaulting to 16 KB.
     */
    protected static final int STREAM_PAYLOAD_BLOCKING_IO_THRESHOLD = Integer.getInteger(AbstractStreamProcessingStrategyFactory.SYSTEM_PROPERTY_PREFIX + "STREAM_PAYLOAD_BLOCKING_IO_THRESHOLD", DataUnit.KB.toBytes(16)).intValue();

    private static final Logger LOGGER = LoggerFactory.getLogger(ProactorStreamProcessingStrategy.class);

    /** Delay, in milliseconds, before retrying to schedule an event on a busy scheduler. */
    private static final long SCHEDULER_BUSY_RETRY_INTERVAL_MS = 2;

    /** {@link #SCHEDULER_BUSY_RETRY_INTERVAL_MS} expressed in nanoseconds. */
    private static final long SCHEDULER_BUSY_RETRY_INTERVAL_NS = TimeUnit.MILLISECONDS.toNanos(SCHEDULER_BUSY_RETRY_INTERVAL_MS);

    /** Sentinel stored in {@link #lastRetryTimestamp} meaning "no recent scheduler-busy retry". */
    private static final long NO_RECENT_RETRY = Long.MIN_VALUE;

    private final Supplier<Scheduler> blockingSchedulerSupplier;
    private final Supplier<Scheduler> cpuIntensiveSchedulerSupplier;
    private Scheduler blockingScheduler;
    private Scheduler cpuIntensiveScheduler;

    /** {@link System#nanoTime()} of the last scheduler-busy retry, or {@link #NO_RECENT_RETRY}. */
    private final AtomicLong lastRetryTimestamp = new AtomicLong(NO_RECENT_RETRY);

    /** Events admitted by a {@link ProactorSinkWrapper} whose response has not been signalled yet. */
    private final AtomicInteger inFlightEvents = new AtomicInteger();

    /** Response callback that releases the in-flight slot taken when the event was admitted. */
    private final BiConsumer<CoreEvent, Throwable> inFlightDecrementCallback = (coreEvent, th) -> inFlightEvents.decrementAndGet();

    /** Atomic update applied to {@link #lastRetryTimestamp} to expire it once it is old enough. */
    private final LongUnaryOperator lastRetryTimestampCheckOperator = ProactorStreamProcessingStrategy::expireRetryTimestamp;

    /**
     * Sink decorator that rejects events before they reach the wrapped sink when this strategy has no
     * capacity for them.
     *
     * @param <E> type produced by {@link #intoSink(CoreEvent)} of the wrapped sink
     */
    protected final class ProactorSinkWrapper<E> implements AbstractProcessingStrategy.ReactorSink<E> {
        private final AbstractProcessingStrategy.ReactorSink<E> innerSink;

        /**
         * @param reactorSink the sink that receives every event that passes the capacity check
         */
        public ProactorSinkWrapper(AbstractProcessingStrategy.ReactorSink<E> reactorSink) {
            this.innerSink = reactorSink;
        }

        /**
         * Passes the event to the wrapped sink.
         *
         * @throws RejectedExecutionException if the capacity check fails
         */
        @Override // org.mule.runtime.core.api.processor.Sink, java.util.function.Consumer
        public final void accept(CoreEvent coreEvent) {
            if (!checkCapacity(coreEvent)) {
                throw new RejectedExecutionException();
            }
            this.innerSink.accept(coreEvent);
        }

        /**
         * Offers the event to the wrapped sink.
         *
         * @return {@code false} if the capacity check fails or the wrapped sink does not take the event
         */
        @Override // org.mule.runtime.core.api.processor.Sink
        public final boolean emit(CoreEvent coreEvent) {
            return checkCapacity(coreEvent) && this.innerSink.emit(coreEvent);
        }

        /**
         * Decides whether a new event may enter the flow.
         * <p>
         * The event is refused while a scheduler-busy retry was recorded less than twice the retry interval
         * ago. When the eager max-concurrency check is enabled it is also refused if admitting it would push
         * the in-flight count above {@code maxConcurrency}; otherwise an in-flight slot is taken and released
         * again through the event context's response callback.
         */
        private boolean checkCapacity(CoreEvent coreEvent) {
            final AtomicLong lastRetry = ProactorStreamProcessingStrategy.this.lastRetryTimestamp;
            // Plain read first so the common "no retry pending" path avoids the atomic update.
            if (lastRetry.get() != NO_RECENT_RETRY
                    && lastRetry.updateAndGet(ProactorStreamProcessingStrategy.this.lastRetryTimestampCheckOperator) != NO_RECENT_RETRY) {
                return false;
            }
            if (!ProactorStreamProcessingStrategy.this.maxConcurrencyEagerCheck) {
                return true;
            }
            final AtomicInteger inFlight = ProactorStreamProcessingStrategy.this.inFlightEvents;
            if (inFlight.incrementAndGet() > ProactorStreamProcessingStrategy.this.maxConcurrency) {
                // Undo the optimistic increment: this event is not admitted.
                inFlight.decrementAndGet();
                return false;
            }
            ((BaseEventContext) coreEvent.getContext()).onResponse(ProactorStreamProcessingStrategy.this.inFlightDecrementCallback);
            return true;
        }

        @Override // org.mule.runtime.core.internal.processor.strategy.AbstractProcessingStrategy.ReactorSink
        public E intoSink(CoreEvent coreEvent) {
            return this.innerSink.intoSink(coreEvent);
        }

        @Override // org.mule.runtime.api.lifecycle.Disposable
        public final void dispose() {
            this.innerSink.dispose();
        }
    }

    /**
     * Creates the strategy. All arguments other than the two scheduler suppliers named below are forwarded
     * unchanged to the superclass constructor.
     *
     * @param blockingSchedulerSupplier     provides the scheduler used for {@code BLOCKING} and
     *                                      {@code IO_RW} processors; resolved in {@link #start()}
     * @param cpuIntensiveSchedulerSupplier provides the scheduler used for {@code CPU_INTENSIVE} processors;
     *                                      resolved in {@link #start()}
     */
    public ProactorStreamProcessingStrategy(Supplier<Scheduler> supplier, int i, int i2, String str, Supplier<Scheduler> supplier2, Supplier<Scheduler> blockingSchedulerSupplier, Supplier<Scheduler> cpuIntensiveSchedulerSupplier, int i3, int i4, boolean z) {
        super(supplier, i, i2, str, supplier2, i3, i4, z);
        this.blockingSchedulerSupplier = blockingSchedulerSupplier;
        this.cpuIntensiveSchedulerSupplier = cpuIntensiveSchedulerSupplier;
    }

    /**
     * Starts the superclass, wraps the cpu-light scheduler in a {@link RetrySchedulerWrapper} that is given
     * a callback recording the current time as the last retry, and resolves the blocking and cpu-intensive
     * schedulers from their suppliers.
     */
    @Override // org.mule.runtime.core.internal.processor.strategy.ReactorStreamProcessingStrategyFactory.ReactorStreamProcessingStrategy, org.mule.runtime.api.lifecycle.Startable
    public void start() throws MuleException {
        super.start();
        this.cpuLightScheduler = new RetrySchedulerWrapper(this.cpuLightScheduler, SCHEDULER_BUSY_RETRY_INTERVAL_MS, this::recordSchedulerBusy);
        this.blockingScheduler = this.blockingSchedulerSupplier.get();
        this.cpuIntensiveScheduler = this.cpuIntensiveSchedulerSupplier.get();
    }

    /**
     * Stops the superclass and then the blocking and cpu-intensive schedulers, if they were resolved.
     * Each scheduler is stopped even when an earlier step throws, so a failure while stopping one resource
     * does not leave the others running.
     */
    @Override // org.mule.runtime.core.internal.processor.strategy.ReactorStreamProcessingStrategyFactory.ReactorStreamProcessingStrategy, org.mule.runtime.api.lifecycle.Stoppable
    public void stop() throws MuleException {
        try {
            super.stop();
        } finally {
            try {
                stopIfPresent(this.blockingScheduler);
            } finally {
                stopIfPresent(this.cpuIntensiveScheduler);
            }
        }
    }

    /**
     * Chooses how the given processor is executed based on its processing type: {@code BLOCKING} and
     * {@code IO_RW} use the blocking scheduler, {@code CPU_INTENSIVE} uses the cpu-intensive scheduler, and
     * anything else is delegated to the superclass.
     */
    @Override // org.mule.runtime.core.internal.processor.strategy.ReactorStreamProcessingStrategyFactory.ReactorStreamProcessingStrategy, org.mule.runtime.core.api.processor.strategy.ProcessingStrategy
    public ReactiveProcessor onProcessor(ReactiveProcessor reactiveProcessor) {
        final ReactiveProcessor.ProcessingType processingType = reactiveProcessor.getProcessingType();
        if (processingType == ReactiveProcessor.ProcessingType.BLOCKING || processingType == ReactiveProcessor.ProcessingType.IO_RW) {
            return proactor(reactiveProcessor, this.blockingScheduler);
        }
        if (processingType == ReactiveProcessor.ProcessingType.CPU_INTENSIVE) {
            return proactor(reactiveProcessor, this.cpuIntensiveScheduler);
        }
        return super.onProcessor(reactiveProcessor);
    }

    /**
     * Builds a processor that dispatches each event to {@code scheduler} through
     * {@link #scheduleProcessor(ReactiveProcessor, Scheduler, CoreEvent)}, retrying while the scheduler is
     * busy. The one exception is an {@code IO_RW} processor whose event does not satisfy
     * {@link #scheduleIoRwEvent(CoreEvent)}: it is run through the processor directly, with the cpu-light
     * scheduler published in the subscriber context instead.
     */
    private ReactiveProcessor proactor(ReactiveProcessor reactiveProcessor, Scheduler scheduler) {
        return publisher -> Flux.from(publisher).flatMap(coreEvent -> {
            final boolean dispatchToScheduler = reactiveProcessor.getProcessingType() != ReactiveProcessor.ProcessingType.IO_RW
                    || scheduleIoRwEvent(coreEvent);
            if (dispatchToScheduler) {
                return withRetry(scheduleProcessor(reactiveProcessor, scheduler, coreEvent)
                        .subscriberContext(context -> context.put(AbstractProcessingStrategy.PROCESSOR_SCHEDULER_CONTEXT_KEY, scheduler)), scheduler);
            }
            return Flux.just(coreEvent).transform(reactiveProcessor)
                    .subscriberContext(context -> context.put(AbstractProcessingStrategy.PROCESSOR_SCHEDULER_CONTEXT_KEY, getCpuLightScheduler()));
        }, Math.max(this.maxConcurrency / (getParallelism() * this.subscribers), 1));
    }

    /**
     * Tells whether an event handled by an {@code IO_RW} processor should be moved to the blocking scheduler.
     *
     * @return {@code true} when the payload is a stream type and its byte length exceeds
     *         {@link #STREAM_PAYLOAD_BLOCKING_IO_THRESHOLD}; an unknown length is treated as exceeding it
     */
    protected boolean scheduleIoRwEvent(CoreEvent coreEvent) {
        return coreEvent.getMessage().getPayload().getDataType().isStreamType() && coreEvent.getMessage().getPayload().getByteLength().orElse(Long.MAX_VALUE) > ((long) STREAM_PAYLOAD_BLOCKING_IO_THRESHOLD);
    }

    /**
     * Schedules the execution of {@code reactiveProcessor} for a single event on {@code scheduler}.
     *
     * @return the stream carrying the result of processing {@code coreEvent}
     */
    protected abstract Flux<CoreEvent> scheduleProcessor(ReactiveProcessor reactiveProcessor, Scheduler scheduler, CoreEvent coreEvent);

    /**
     * Re-subscribes to {@code flux} after {@link #SCHEDULER_BUSY_RETRY_INTERVAL_MS} milliseconds whenever it
     * fails with an error that {@code isSchedulerBusy} recognises, recording the time of each such retry.
     * Any other error is not retried. The backoff delay runs on the decorated cpu-light scheduler.
     */
    private Flux<CoreEvent> withRetry(Flux<CoreEvent> flux, Scheduler scheduler) {
        return flux.retryWhen(Retry.onlyIf(retryContext -> {
            boolean schedulerBusy = isSchedulerBusy(retryContext.exception());
            if (schedulerBusy) {
                LOGGER.trace("Shared scheduler {} is busy. Scheduling of the current event will be retried after {}ms.", scheduler.getName(), SCHEDULER_BUSY_RETRY_INTERVAL_MS);
                recordSchedulerBusy();
            }
            return schedulerBusy;
        }).backoff(context -> new BackoffDelay(Duration.ofMillis(SCHEDULER_BUSY_RETRY_INTERVAL_MS)))
                .withBackoffScheduler(Schedulers.fromExecutorService(decorateScheduler(getCpuLightScheduler()))));
    }

    /** Records "now" as the moment of the most recent scheduler-busy retry. */
    private void recordSchedulerBusy() {
        this.lastRetryTimestamp.set(System.nanoTime());
    }

    /**
     * Keeps {@code lastRetry} while it is younger than twice the retry interval and replaces it with
     * {@link #NO_RECENT_RETRY} afterwards.
     */
    private static long expireRetryTimestamp(long lastRetry) {
        if (lastRetry == NO_RECENT_RETRY) {
            // Already cleared (possibly by a concurrent caller); do not subtract from the sentinel.
            return NO_RECENT_RETRY;
        }
        return System.nanoTime() - lastRetry < SCHEDULER_BUSY_RETRY_INTERVAL_NS * 2 ? lastRetry : NO_RECENT_RETRY;
    }

    /** Stops {@code scheduler} unless it is {@code null}, i.e. was never resolved by {@link #start()}. */
    private static void stopIfPresent(Scheduler scheduler) {
        if (scheduler != null) {
            scheduler.stop();
        }
    }

    /** @return the scheduler resolved in {@link #start()} for blocking work, or {@code null} before that */
    protected Scheduler getBlockingScheduler() {
        return this.blockingScheduler;
    }

    /** @return the scheduler resolved in {@link #start()} for cpu-intensive work, or {@code null} before that */
    protected Scheduler getCpuIntensiveScheduler() {
        return this.cpuIntensiveScheduler;
    }

    /** Delegates to the superclass implementation. */
    @Override // org.mule.runtime.core.internal.processor.strategy.ReactorStreamProcessingStrategyFactory.ReactorStreamProcessingStrategy, org.mule.runtime.core.api.processor.strategy.ProcessingStrategy
    public ReactiveProcessor onPipeline(ReactiveProcessor reactiveProcessor) {
        return super.onPipeline(reactiveProcessor);
    }

    /** Delegates to the superclass implementation. */
    @Override // org.mule.runtime.core.internal.processor.strategy.AbstractStreamProcessingStrategyFactory.AbstractStreamProcessingStrategy, org.mule.runtime.core.internal.processor.strategy.AbstractProcessingStrategy, org.mule.runtime.core.api.processor.strategy.ProcessingStrategy
    public Sink createSink(FlowConstruct flowConstruct, ReactiveProcessor reactiveProcessor) {
        return super.createSink(flowConstruct, reactiveProcessor);
    }
}
