package org.mule.runtime.core.internal.processor.strategy;

import java.time.Duration;
import java.util.concurrent.RejectedExecutionException;
import java.util.function.Supplier;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.scheduler.Scheduler;
import org.mule.runtime.api.util.DataUnit;
import org.mule.runtime.core.api.MuleContext;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.processor.ReactiveProcessor;
import org.mule.runtime.core.api.processor.strategy.ProcessingStrategy;
import org.mule.runtime.core.api.rx.Exceptions;
import org.mule.runtime.core.internal.processor.strategy.ReactorStreamProcessingStrategyFactory;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import reactor.retry.BackoffDelay;
import reactor.retry.Retry;

/* loaded from: input_file:org/mule/runtime/core/internal/processor/strategy/ProactorStreamProcessingStrategyFactory.class */
/**
 * Factory for {@link ProactorStreamProcessingStrategy}.
 *
 * <p>The strategy it creates extends the reactor stream strategy by handing processors whose processing type
 * is {@code BLOCKING} or {@code IO_RW} to an IO scheduler and processors whose type is {@code CPU_INTENSIVE}
 * to a CPU-intensive scheduler. Every other processor is delegated to the parent strategy.
 */
public class ProactorStreamProcessingStrategyFactory extends ReactorStreamProcessingStrategyFactory {
    /**
     * Payload size, in bytes, above which an {@code IO_RW} processor handling a stream payload is dispatched to
     * the blocking scheduler. Read once from the system property
     * {@code SYSTEM_PROPERTY_PREFIX + "STREAM_PAYLOAD_BLOCKING_IO_THRESHOLD"}; defaults to 16 KB.
     */
    protected static final long STREAM_PAYLOAD_BLOCKING_IO_THRESHOLD = Long.getLong(SYSTEM_PROPERTY_PREFIX + "STREAM_PAYLOAD_BLOCKING_IO_THRESHOLD", DataUnit.KB.toBytes(16)).longValue();

    /**
     * Processing strategy that schedules blocking, IO and CPU-intensive processors on dedicated schedulers and
     * publishes their results back on the CPU-light scheduler.
     */
    public static class ProactorStreamProcessingStrategy extends ReactorStreamProcessingStrategyFactory.ReactorStreamProcessingStrategy {
        private static final Logger LOGGER = LoggerFactory.getLogger(ProactorStreamProcessingStrategy.class);
        /** Delay, in milliseconds, before retrying an event whose scheduling was rejected. */
        private static final int SCHEDULER_BUSY_RETRY_INTERVAL_MS = 2;
        private final Supplier<Scheduler> blockingSchedulerSupplier;
        private final Supplier<Scheduler> cpuIntensiveSchedulerSupplier;
        /** Obtained in {@link #start()}; {@code null} until then. */
        private Scheduler blockingScheduler;
        /** Obtained in {@link #start()}; {@code null} until then. */
        private Scheduler cpuIntensiveScheduler;

        /**
         * Creates the strategy. The first five and last two arguments are forwarded unchanged to the parent
         * constructor; see the factory's {@code create} method for the values passed in practice.
         *
         * @param supplier  ring-buffer scheduler supplier (forwarded to the parent)
         * @param i         buffer size (forwarded to the parent)
         * @param i2        subscriber count (forwarded to the parent)
         * @param str       wait strategy (forwarded to the parent)
         * @param supplier2 CPU-light scheduler supplier (forwarded to the parent)
         * @param supplier3 supplier of the scheduler used for {@code BLOCKING} and {@code IO_RW} processors
         * @param supplier4 supplier of the scheduler used for {@code CPU_INTENSIVE} processors
         * @param i3        parallelism (forwarded to the parent)
         * @param i4        maximum concurrency (forwarded to the parent)
         */
        public ProactorStreamProcessingStrategy(Supplier<Scheduler> supplier, int i, int i2, String str, Supplier<Scheduler> supplier2, Supplier<Scheduler> supplier3, Supplier<Scheduler> supplier4, int i3, int i4) {
            super(supplier, i, i2, str, supplier2, i3, i4);
            this.blockingSchedulerSupplier = supplier3;
            this.cpuIntensiveSchedulerSupplier = supplier4;
        }

        /**
         * Starts the parent strategy and then obtains the blocking and CPU-intensive schedulers from their
         * suppliers.
         *
         * @throws MuleException if the parent strategy fails to start
         */
        @Override
        public void start() throws MuleException {
            super.start();
            this.blockingScheduler = this.blockingSchedulerSupplier.get();
            this.cpuIntensiveScheduler = this.cpuIntensiveSchedulerSupplier.get();
        }

        /**
         * Stops the blocking scheduler, the CPU-intensive scheduler and finally the parent strategy.
         *
         * <p>Each step runs in a {@code finally} block so that a failure while stopping one scheduler does not
         * prevent the remaining scheduler and the parent strategy from being stopped.
         *
         * @throws MuleException if the parent strategy fails to stop
         */
        @Override
        public void stop() throws MuleException {
            try {
                if (this.blockingScheduler != null) {
                    this.blockingScheduler.stop();
                }
            } finally {
                try {
                    if (this.cpuIntensiveScheduler != null) {
                        this.cpuIntensiveScheduler.stop();
                    }
                } finally {
                    super.stop();
                }
            }
        }

        /**
         * Wraps the given processor according to its processing type: {@code BLOCKING} and {@code IO_RW} use the
         * blocking scheduler, {@code CPU_INTENSIVE} uses the CPU-intensive scheduler, anything else is delegated
         * to the parent strategy.
         *
         * <p>NOTE(review): the scheduler fields are read when this method is invoked, so presumably the strategy
         * must already be started - confirm against callers.
         *
         * @param reactiveProcessor the processor to decorate
         * @return the decorated processor
         */
        @Override
        public ReactiveProcessor onProcessor(ReactiveProcessor reactiveProcessor) {
            ReactiveProcessor.ProcessingType processingType = reactiveProcessor.getProcessingType();
            if (processingType == ReactiveProcessor.ProcessingType.BLOCKING || processingType == ReactiveProcessor.ProcessingType.IO_RW) {
                return proactor(reactiveProcessor, this.blockingScheduler);
            }
            if (processingType == ReactiveProcessor.ProcessingType.CPU_INTENSIVE) {
                return proactor(reactiveProcessor, this.cpuIntensiveScheduler);
            }
            return super.onProcessor(reactiveProcessor);
        }

        /**
         * Builds a processor that runs each event through {@code processor} on {@code processorScheduler} and
         * publishes the result on the CPU-light scheduler.
         *
         * <p>{@code IO_RW} processors are only moved to {@code processorScheduler} when
         * {@link #scheduleIoRwEvent(CoreEvent)} says so; otherwise the event is processed without a scheduler
         * switch and the CPU-light scheduler is stored under the processor-scheduler context key.
         *
         * @param processor          the processor to execute
         * @param processorScheduler the scheduler the processor should run on
         * @return the wrapping processor
         */
        private ReactiveProcessor proactor(ReactiveProcessor processor, Scheduler processorScheduler) {
            reactor.core.scheduler.Scheduler cpuLightPublishScheduler = Schedulers.fromExecutorService(decorateScheduler(getCpuLightScheduler()));
            return publisher -> Flux.from(publisher).flatMap(event -> {
                boolean processInline = processor.getProcessingType() == ReactiveProcessor.ProcessingType.IO_RW && !scheduleIoRwEvent(event);
                if (processInline) {
                    return Flux.just(event).transform(processor).subscriberContext(context -> context.put(AbstractProcessingStrategy.PROCESSOR_SCHEDULER_CONTEXT_KEY, getCpuLightScheduler()));
                }
                return scheduleProcessor(processor, cpuLightPublishScheduler, processorScheduler, event);
            }, Math.max(this.maxConcurrency / (getParallelism() * this.subscribers), 1));
        }

        /**
         * Decides whether an event handled by an {@code IO_RW} processor must be moved to the blocking scheduler.
         *
         * @param coreEvent the event about to be processed
         * @return {@code true} when the payload is a stream type and its byte length (treated as
         *         {@link Long#MAX_VALUE} when unknown) exceeds {@link #STREAM_PAYLOAD_BLOCKING_IO_THRESHOLD}
         */
        private boolean scheduleIoRwEvent(CoreEvent coreEvent) {
            if (!coreEvent.getMessage().getPayload().getDataType().isStreamType()) {
                return false;
            }
            long payloadLength = coreEvent.getMessage().getPayload().getByteLength().orElse(Long.MAX_VALUE);
            return payloadLength > ProactorStreamProcessingStrategyFactory.STREAM_PAYLOAD_BLOCKING_IO_THRESHOLD;
        }

        /**
         * Processes a single event with {@code processor}, subscribing on {@code processorScheduler} and
         * publishing on {@code publishScheduler}.
         *
         * <p>When the failure unwraps to a {@link RejectedExecutionException} the event is retried after a
         * backoff of {@link #SCHEDULER_BUSY_RETRY_INTERVAL_MS} milliseconds, timed on the CPU-light scheduler.
         *
         * @param processor          the processor to execute
         * @param publishScheduler   reactor scheduler on which results are published
         * @param processorScheduler scheduler on which the processor is subscribed; also stored under the
         *                           processor-scheduler context key
         * @param coreEvent          the event to process
         * @return publisher of the processed event
         */
        private Publisher<CoreEvent> scheduleProcessor(ReactiveProcessor processor, reactor.core.scheduler.Scheduler publishScheduler, Scheduler processorScheduler, CoreEvent coreEvent) {
            return Flux.just(coreEvent)
                .transform(processor)
                .publishOn(publishScheduler)
                .subscribeOn(Schedulers.fromExecutorService(decorateScheduler(processorScheduler)))
                .subscriberContext(context -> context.put(AbstractProcessingStrategy.PROCESSOR_SCHEDULER_CONTEXT_KEY, processorScheduler))
                .doOnError(RejectedExecutionException.class, rejection -> LOGGER.trace("Shared scheduler {} is busy. Scheduling of the current event will be retried after {}ms.", processorScheduler.getName(), Integer.valueOf(SCHEDULER_BUSY_RETRY_INTERVAL_MS)))
                .retryWhen(Retry.onlyIf(retryContext -> Exceptions.unwrap(retryContext.exception()) instanceof RejectedExecutionException)
                    .backoff(backoffContext -> new BackoffDelay(Duration.ofMillis(SCHEDULER_BUSY_RETRY_INTERVAL_MS)))
                    .withBackoffScheduler(Schedulers.fromExecutorService(getCpuLightScheduler())));
        }
    }

    /**
     * Creates a {@link ProactorStreamProcessingStrategy} whose blocking scheduler is an IO scheduler named
     * {@code <str>.BLOCKING} and whose CPU-intensive scheduler is named {@code <str>.CPU_INTENSIVE}. Both are
     * created lazily through suppliers.
     *
     * @param muleContext context providing the scheduler service and base scheduler configuration
     * @param str         prefix used for scheduler names
     * @return the new processing strategy
     */
    @Override
    public ProcessingStrategy create(MuleContext muleContext, String str) {
        Supplier<Scheduler> blockingSchedulerSupplier = () -> muleContext.getSchedulerService().ioScheduler(muleContext.getSchedulerBaseConfig().withName(str + "." + ReactiveProcessor.ProcessingType.BLOCKING.name()));
        Supplier<Scheduler> cpuIntensiveSchedulerSupplier = () -> muleContext.getSchedulerService().cpuIntensiveScheduler(muleContext.getSchedulerBaseConfig().withName(str + "." + ReactiveProcessor.ProcessingType.CPU_INTENSIVE.name()));
        return new ProactorStreamProcessingStrategy(getRingBufferSchedulerSupplier(muleContext, str), getBufferSize(), getSubscriberCount(), getWaitStrategy(), getCpuLightSchedulerSupplier(muleContext, str), blockingSchedulerSupplier, cpuIntensiveSchedulerSupplier, resolveParallelism(), getMaxConcurrency());
    }

    /**
     * Resolves the parallelism to use.
     *
     * <p>With unbounded max concurrency ({@link Integer#MAX_VALUE}) this is {@code CORES / subscriberCount}, at
     * least 1. Otherwise it is the largest factor of {@code maxConcurrency / subscriberCount} (integer
     * division, at least 1) that does not exceed {@code CORES}.
     *
     * @return the resolved parallelism, always at least 1
     */
    @Override
    protected int resolveParallelism() {
        if (getMaxConcurrency() == Integer.MAX_VALUE) {
            return Math.max(CORES / getSubscriberCount(), 1);
        }
        return Math.min(CORES, maxFactor(Float.max(getMaxConcurrency() / getSubscriberCount(), 1.0f)));
    }

    /**
     * Finds the largest integer in {@code [2, CORES]} that evenly divides {@code value}.
     *
     * @param value the number to factor
     * @return the largest such divisor, or 1 when {@code value} is not a whole number or has no divisor in range
     */
    private int maxFactor(float value) {
        // A remainder by zero is NaN for floats and never compares equal to 0, so the whole-number check must
        // divide by 1; dividing by 0 made this method unconditionally return 1.
        if (value % 1.0f != 0.0f) {
            return 1;
        }
        for (int candidate = CORES; candidate > 1; candidate--) {
            if (value % candidate == 0.0f) {
                return candidate;
            }
        }
        return 1;
    }

    /**
     * @return the concrete strategy type produced by this factory
     */
    @Override
    public Class<? extends ProcessingStrategy> getProcessingStrategyType() {
        return ProactorStreamProcessingStrategy.class;
    }
}
