package org.mule.runtime.core.internal.processor.strategy;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.IntUnaryOperator;
import java.util.function.Supplier;
import org.mule.runtime.api.scheduler.Scheduler;
import org.mule.runtime.api.util.concurrent.Latch;
import org.mule.runtime.core.api.MuleContext;
import org.mule.runtime.core.api.construct.FlowConstruct;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.processor.ReactiveProcessor;
import org.mule.runtime.core.api.processor.Sink;
import org.mule.runtime.core.api.processor.strategy.ProcessingStrategy;
import org.mule.runtime.core.internal.context.thread.notification.ThreadLoggingExecutorServiceDecorator;
import org.mule.runtime.core.internal.context.thread.notification.ThreadNotificationLogger;
import org.mule.runtime.core.internal.processor.strategy.AbstractProcessingStrategy;
import org.mule.runtime.core.internal.processor.strategy.ProactorStreamProcessingStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:org/mule/runtime/core/internal/processor/strategy/ProactorStreamEmitterProcessingStrategyFactory.class */
public class ProactorStreamEmitterProcessingStrategyFactory extends ReactorStreamProcessingStrategyFactory {

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/mule/runtime/core/internal/processor/strategy/ProactorStreamEmitterProcessingStrategyFactory$ProactorStreamEmitterProcessingStrategy.class */
    /**
     * Proactor processing strategy whose sink is backed by several {@link EmitterProcessor}s.
     *
     * <p>{@link #createSink} builds one emitter-backed sink per available slot (the smaller of the inherited
     * {@code maxConcurrency} and {@code AbstractStreamProcessingStrategyFactory.CORES}) and hands them to a
     * {@link RoundRobinReactorSink}. Individual processors are scheduled through
     * {@link #scheduleProcessor}, optionally wrapping the target scheduler in a
     * {@link ThreadLoggingExecutorServiceDecorator} when thread logging is enabled.
     */
    public static class ProactorStreamEmitterProcessingStrategy extends ProactorStreamProcessingStrategy {
        private static final Logger LOGGER = LoggerFactory.getLogger(ProactorStreamEmitterProcessingStrategy.class);

        /** Total buffer size; it is split evenly across the sinks created by {@link #createSink}. */
        private final int bufferSize;

        /** When {@code true}, processors are scheduled through a thread-logging executor decorator. */
        private final boolean isThreadLoggingEnabled;

        /**
         * Creates the strategy.
         *
         * @param bufferSize total buffer size, divided among the sinks created by {@link #createSink}
         * @param subscriberCount forwarded to the superclass constructor
         * @param cpuLightSchedulerSupplier forwarded to the superclass constructor
         * @param blockingSchedulerSupplier forwarded to the superclass constructor
         * @param cpuIntensiveSchedulerSupplier forwarded to the superclass constructor
         * @param parallelism forwarded to the superclass constructor
         * @param maxConcurrency forwarded to the superclass constructor
         * @param maxConcurrencyEagerCheck forwarded to the superclass constructor
         * @param isThreadLoggingEnabled whether scheduled processors run through the thread-logging decorator
         */
        public ProactorStreamEmitterProcessingStrategy(int bufferSize, int subscriberCount, Supplier<Scheduler> cpuLightSchedulerSupplier, Supplier<Scheduler> blockingSchedulerSupplier, Supplier<Scheduler> cpuIntensiveSchedulerSupplier, int parallelism, int maxConcurrency, boolean maxConcurrencyEagerCheck, boolean isThreadLoggingEnabled) {
            super(subscriberCount, cpuLightSchedulerSupplier, blockingSchedulerSupplier, cpuIntensiveSchedulerSupplier, parallelism, maxConcurrency, maxConcurrencyEagerCheck);
            // A primitive int can never be null, so no null check (and no boxing) is needed here.
            this.bufferSize = bufferSize;
            this.isThreadLoggingEnabled = isThreadLoggingEnabled;
        }

        /**
         * Creates the strategy with thread logging disabled.
         *
         * <p>Parameters are the same as for the nine-argument constructor, minus the thread-logging flag.
         */
        public ProactorStreamEmitterProcessingStrategy(int bufferSize, int subscriberCount, Supplier<Scheduler> cpuLightSchedulerSupplier, Supplier<Scheduler> blockingSchedulerSupplier, Supplier<Scheduler> cpuIntensiveSchedulerSupplier, int parallelism, int maxConcurrency, boolean maxConcurrencyEagerCheck) {
            this(bufferSize, subscriberCount, cpuLightSchedulerSupplier, blockingSchedulerSupplier, cpuIntensiveSchedulerSupplier, parallelism, maxConcurrency, maxConcurrencyEagerCheck, false);
        }

        /**
         * Builds a round-robin sink made of one emitter-backed sink per slot.
         *
         * <p>The number of sinks is {@code min(maxConcurrency, CORES)} and each one gets an equal share of
         * {@code bufferSize}. The share is clamped to at least 1 so that a buffer size smaller than the sink
         * count does not produce a zero-sized emitter buffer.
         *
         * @param flowConstruct flow whose configuration supplies the shutdown timeout
         * @param reactiveProcessor pipeline that every emitter is transformed with and subscribed to
         * @return a {@link RoundRobinReactorSink} over the created sinks
         */
        // Raw types: the generic signatures of DefaultReactorSink / ProactorSinkWrapper are declared
        // outside this file, so their constructor calls are kept exactly as they were.
        @SuppressWarnings({"unchecked", "rawtypes"})
        @Override // org.mule.runtime.core.internal.processor.strategy.AbstractProcessingStrategy, org.mule.runtime.core.api.processor.strategy.ProcessingStrategy
        public Sink createSink(FlowConstruct flowConstruct, ReactiveProcessor reactiveProcessor) {
            final long shutdownTimeout = flowConstruct.getMuleContext().getConfiguration().getShutdownTimeout();
            // NOTE(review): assumes the inherited maxConcurrency is >= 1; a value of 0 would divide by zero below.
            final int sinkCount = Math.min(this.maxConcurrency, AbstractStreamProcessingStrategyFactory.CORES);
            // Integer division truncates to 0 when bufferSize < sinkCount; never go below 1.
            final int perSinkBufferSize = Math.max(1, this.bufferSize / sinkCount);
            final List<AbstractProcessingStrategy.ReactorSink<CoreEvent>> sinks = new ArrayList<>();
            for (int sinkIndex = 0; sinkIndex < sinkCount; sinkIndex++) {
                // Released as soon as this emitter's subscription terminates, with an error or normally.
                final Latch completionLatch = new Latch();
                final EmitterProcessor<CoreEvent> processor = EmitterProcessor.create(perSinkBufferSize);
                processor.transform(reactiveProcessor).subscribe((Consumer<? super CoreEvent>) null, error -> completionLatch.release(), () -> completionLatch.release());
                // The callback waits on the latch through awaitSubscribersCompletion; the timestamp is
                // taken when the callback runs, not when the sink is created.
                sinks.add(new ProactorStreamProcessingStrategy.ProactorSinkWrapper(new AbstractProcessingStrategy.DefaultReactorSink(processor.sink(FluxSink.OverflowStrategy.BUFFER), () -> {
                    awaitSubscribersCompletion(flowConstruct, shutdownTimeout, completionLatch, System.currentTimeMillis());
                }, createOnEventConsumer(), perSinkBufferSize)));
            }
            return new RoundRobinReactorSink<>(sinks);
        }

        /**
         * Wraps the pipeline so that its events are published on the (decorated) CPU-light scheduler.
         *
         * <p>On subscription, the subscribing thread's context class loader is set to the inherited
         * {@code executionClassloader}.
         *
         * @param reactiveProcessor the pipeline to wrap
         * @return a processor that applies {@code publishOn} before delegating to {@code reactiveProcessor}
         */
        @Override // org.mule.runtime.core.api.processor.strategy.ProcessingStrategy
        public ReactiveProcessor onPipeline(ReactiveProcessor reactiveProcessor) {
            final reactor.core.scheduler.Scheduler cpuLightScheduler =
                    Schedulers.fromExecutorService(decorateScheduler(getCpuLightScheduler()));
            return publisher -> Flux.from(publisher)
                    .publishOn(cpuLightScheduler)
                    .doOnSubscribe(subscription -> Thread.currentThread().setContextClassLoader(this.executionClassloader))
                    .transform(reactiveProcessor);
        }

        /**
         * Schedules a single event through the given processor on the given scheduler.
         *
         * @param reactiveProcessor processor to apply
         * @param scheduler scheduler the processor is subscribed on
         * @param coreEvent event to process
         * @return the flux produced by {@link #scheduleWithLogging}
         */
        @Override // org.mule.runtime.core.internal.processor.strategy.ProactorStreamProcessingStrategy
        protected Flux<CoreEvent> scheduleProcessor(ReactiveProcessor reactiveProcessor, Scheduler scheduler, CoreEvent coreEvent) {
            return scheduleWithLogging(reactiveProcessor, scheduler, coreEvent);
        }

        /**
         * Subscribes the processor on {@code scheduler}, adding thread logging when it is enabled.
         *
         * <p>With logging enabled, the executor is wrapped in a {@link ThreadLoggingExecutorServiceDecorator}
         * built from the logger found in the Reactor subscriber context (if any) and the event context id.
         */
        private Flux<CoreEvent> scheduleWithLogging(ReactiveProcessor reactiveProcessor, Scheduler scheduler, CoreEvent coreEvent) {
            if (!this.isThreadLoggingEnabled) {
                return Flux.just(coreEvent)
                        .transform(reactiveProcessor)
                        .subscribeOn(Schedulers.fromExecutorService(decorateScheduler(scheduler)));
            }
            return Flux.just(coreEvent).flatMap(event -> Mono.subscriberContext().flatMap(context -> Mono.just(event)
                    .transform(reactiveProcessor)
                    .subscribeOn(Schedulers.fromExecutorService(new ThreadLoggingExecutorServiceDecorator(
                            context.getOrEmpty(ThreadNotificationLogger.THREAD_NOTIFICATION_LOGGER_CONTEXT_KEY),
                            decorateScheduler(scheduler),
                            event.getContext().getId())))));
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/mule/runtime/core/internal/processor/strategy/ProactorStreamEmitterProcessingStrategyFactory$RoundRobinReactorSink.class */
    /**
     * Sink that fans events out over a fixed list of delegate sinks in round-robin order.
     *
     * <p>The position of the next delegate is kept in an {@link AtomicInteger} and advanced atomically
     * (modulo the list size) on every {@link #accept} or {@link #emit} call. The list must not be empty:
     * with zero delegates the modulo operation fails on the first dispatch.
     *
     * @param <E> element type of the wrapped sinks
     */
    public static class RoundRobinReactorSink<E> implements AbstractProcessingStrategy.ReactorSink<E> {
        private final List<AbstractProcessingStrategy.ReactorSink<E>> fluxSinks;
        private final AtomicInteger index = new AtomicInteger(0);
        // Built once and reused so that a new lambda is not allocated on every dispatch.
        private final IntUnaryOperator update;

        /**
         * @param list delegate sinks to dispatch to; the list is stored by reference, not copied
         */
        public RoundRobinReactorSink(List<AbstractProcessingStrategy.ReactorSink<E>> list) {
            this.fluxSinks = list;
            this.update = current -> (current + 1) % this.fluxSinks.size();
        }

        /** Disposes every delegate sink, in list order. */
        @Override // org.mule.runtime.api.lifecycle.Disposable
        public void dispose() {
            for (AbstractProcessingStrategy.ReactorSink<E> sink : this.fluxSinks) {
                sink.dispose();
            }
        }

        /**
         * Hands the event to the next delegate sink.
         *
         * @param coreEvent event to dispatch
         */
        @Override // org.mule.runtime.core.api.processor.Sink, java.util.function.Consumer
        public void accept(CoreEvent coreEvent) {
            this.fluxSinks.get(nextIndex()).accept(coreEvent);
        }

        /** Returns the index to use now and atomically advances it for the next caller. */
        private int nextIndex() {
            return this.index.getAndUpdate(this.update);
        }

        /**
         * Offers the event to the next delegate sink.
         *
         * @param coreEvent event to dispatch
         * @return whatever the chosen delegate's {@code emit} returns
         */
        @Override // org.mule.runtime.core.api.processor.Sink
        public boolean emit(CoreEvent coreEvent) {
            return this.fluxSinks.get(nextIndex()).emit(coreEvent);
        }

        /**
         * Returns the event itself, viewed as the sink's element type.
         *
         * @param coreEvent event to convert
         * @return the same instance, cast to {@code E}
         */
        @SuppressWarnings("unchecked") // E is erased at runtime; the event is passed through unchanged
        @Override // org.mule.runtime.core.internal.processor.strategy.AbstractProcessingStrategy.ReactorSink
        public E intoSink(CoreEvent coreEvent) {
            return (E) coreEvent;
        }
    }

    /**
     * Builds a {@link ProactorStreamEmitterProcessingStrategy} from this factory's settings.
     *
     * <p>The blocking and CPU-intensive schedulers are created lazily through suppliers; each is named
     * {@code str + "." + <processing type name>}.
     *
     * @param muleContext context providing the scheduler service, base scheduler config and configuration
     * @param str prefix used for the scheduler names
     * @return the newly created processing strategy
     */
    @Override
    public ProcessingStrategy create(MuleContext muleContext, String str) {
        final Supplier<Scheduler> blockingSchedulerSupplier = () -> muleContext.getSchedulerService()
                .ioScheduler(muleContext.getSchedulerBaseConfig().withName(str + "." + ReactiveProcessor.ProcessingType.BLOCKING.name()));
        final Supplier<Scheduler> cpuIntensiveSchedulerSupplier = () -> muleContext.getSchedulerService()
                .cpuIntensiveScheduler(muleContext.getSchedulerBaseConfig().withName(str + "." + ReactiveProcessor.ProcessingType.CPU_INTENSIVE.name()));

        return new ProactorStreamEmitterProcessingStrategy(
                getBufferSize(),
                getSubscriberCount(),
                getCpuLightSchedulerSupplier(muleContext, str),
                blockingSchedulerSupplier,
                cpuIntensiveSchedulerSupplier,
                resolveParallelism(),
                getMaxConcurrency(),
                isMaxConcurrencyEagerCheck(),
                muleContext.getConfiguration().isThreadLoggingEnabled());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Resolves the parallelism passed to the strategy.
     *
     * @return the larger of {@code CORES} and {@link #getMaxConcurrency()}
     */
    @Override
    public int resolveParallelism() {
        return Math.max(CORES, getMaxConcurrency());
    }

    /**
     * Reports the concrete strategy class this factory produces.
     *
     * @return {@code ProactorStreamEmitterProcessingStrategy.class}
     */
    @Override // org.mule.runtime.core.internal.processor.strategy.ReactorStreamProcessingStrategyFactory, org.mule.runtime.core.internal.processor.strategy.AbstractStreamWorkQueueProcessingStrategyFactory, org.mule.runtime.core.internal.processor.strategy.AbstractStreamProcessingStrategyFactory, org.mule.runtime.core.api.processor.strategy.ProcessingStrategyFactory
    public Class<? extends ProcessingStrategy> getProcessingStrategyType() {
        return ProactorStreamEmitterProcessingStrategy.class;
    }
}
