package org.mule.runtime.core.internal.processor.strategy;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.exception.MuleRuntimeException;
import org.mule.runtime.api.i18n.I18nMessageFactory;
import org.mule.runtime.api.lifecycle.Startable;
import org.mule.runtime.api.lifecycle.Stoppable;
import org.mule.runtime.api.scheduler.Scheduler;
import org.mule.runtime.core.api.MuleContext;
import org.mule.runtime.core.api.construct.FlowConstruct;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.processor.ReactiveProcessor;
import org.mule.runtime.core.api.processor.Sink;
import org.mule.runtime.core.api.processor.strategy.ProcessingStrategy;
import org.mule.runtime.core.internal.context.thread.notification.ThreadLoggingExecutorServiceDecorator;
import org.mule.runtime.core.internal.context.thread.notification.ThreadNotificationLogger;
import org.mule.runtime.core.internal.processor.strategy.AbstractProcessingStrategy;
import org.mule.runtime.core.internal.processor.strategy.AbstractStreamProcessingStrategyFactory;
import org.mule.runtime.core.privileged.event.BaseEventContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.WorkQueueProcessor;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:org/mule/runtime/core/internal/processor/strategy/WorkQueueStreamProcessingStrategyFactory.class */
public class WorkQueueStreamProcessingStrategyFactory extends AbstractStreamWorkQueueProcessingStrategyFactory {
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) WorkQueueStreamProcessingStrategyFactory.class);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/mule/runtime/core/internal/processor/strategy/WorkQueueStreamProcessingStrategyFactory$EventWrapper.class */
    public static final class EventWrapper {
        CoreEvent wrappedEvent;

        public EventWrapper(CoreEvent coreEvent) {
            this.wrappedEvent = coreEvent;
            ((BaseEventContext) this.wrappedEvent.getContext()).getRootContext().onTerminated((coreEvent2, th) -> {
                this.wrappedEvent = null;
            });
        }

        public CoreEvent getWrappedEvent() {
            return this.wrappedEvent;
        }
    }

    /* loaded from: input_file:org/mule/runtime/core/internal/processor/strategy/WorkQueueStreamProcessingStrategyFactory$WaitStrategy.class */
    protected enum WaitStrategy {
        BLOCKING(reactor.util.concurrent.WaitStrategy.blocking()),
        LITE_BLOCKING(reactor.util.concurrent.WaitStrategy.liteBlocking()),
        SLEEPING(reactor.util.concurrent.WaitStrategy.sleeping()),
        BUSY_SPIN(reactor.util.concurrent.WaitStrategy.busySpin()),
        YIELDING(reactor.util.concurrent.WaitStrategy.yielding()),
        PARKING(reactor.util.concurrent.WaitStrategy.parking()),
        PHASED(reactor.util.concurrent.WaitStrategy.phasedOffLiteLock(200, 100, TimeUnit.MILLISECONDS));

        private final reactor.util.concurrent.WaitStrategy reactorWaitStrategy;

        WaitStrategy(reactor.util.concurrent.WaitStrategy waitStrategy) {
            this.reactorWaitStrategy = waitStrategy;
        }

        reactor.util.concurrent.WaitStrategy getReactorWaitStrategy() {
            return this.reactorWaitStrategy;
        }
    }

    /* loaded from: input_file:org/mule/runtime/core/internal/processor/strategy/WorkQueueStreamProcessingStrategyFactory$WorkQueueStreamProcessingStrategy.class */
    static class WorkQueueStreamProcessingStrategy extends AbstractStreamProcessingStrategyFactory.AbstractStreamProcessingStrategy implements Startable, Stoppable {
        private final int bufferSize;
        private final Supplier<Scheduler> ringBufferSchedulerSupplier;
        private final Supplier<Scheduler> blockingSchedulerSupplier;
        private final WaitStrategy waitStrategy;
        private Scheduler blockingScheduler;
        private final List<Sink> sinkList;
        private final boolean isThreadLoggingEnabled;

        /* JADX INFO: Access modifiers changed from: protected */
        public WorkQueueStreamProcessingStrategy(Supplier<Scheduler> supplier, int i, int i2, String str, Supplier<Scheduler> supplier2, int i3, boolean z, boolean z2) {
            super(i2, i3, z);
            this.sinkList = new ArrayList();
            this.bufferSize = ((Integer) Objects.requireNonNull(Integer.valueOf(i))).intValue();
            this.ringBufferSchedulerSupplier = (Supplier) Objects.requireNonNull(supplier);
            this.blockingSchedulerSupplier = (Supplier) Objects.requireNonNull(supplier2);
            this.waitStrategy = WaitStrategy.valueOf(str);
            this.isThreadLoggingEnabled = z2;
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public WorkQueueStreamProcessingStrategy(Supplier<Scheduler> supplier, int i, int i2, String str, Supplier<Scheduler> supplier2, int i3, boolean z) {
            this(supplier, i, i2, str, supplier2, i3, z, false);
        }

        @Override // org.mule.runtime.core.internal.processor.strategy.AbstractProcessingStrategy, org.mule.runtime.core.api.processor.strategy.ProcessingStrategy
        public Sink createSink(FlowConstruct flowConstruct, ReactiveProcessor reactiveProcessor) {
            long shutdownTimeout = flowConstruct.getMuleContext().getConfiguration().getShutdownTimeout();
            WorkQueueProcessor build = WorkQueueProcessor.builder().executor(this.ringBufferSchedulerSupplier.get()).bufferSize(this.bufferSize).waitStrategy(this.waitStrategy.getReactorWaitStrategy()).build();
            int i = this.maxConcurrency < this.subscribers ? this.maxConcurrency : this.subscribers;
            CountDownLatch countDownLatch = new CountDownLatch(i);
            for (int i2 = 0; i2 < i; i2++) {
                Flux transform = build.doOnSubscribe(subscription -> {
                    Thread.currentThread().setContextClassLoader(this.executionClassloader);
                }).map((v0) -> {
                    return v0.getWrappedEvent();
                }).transform(reactiveProcessor);
                Consumer consumer = th -> {
                    countDownLatch.countDown();
                };
                countDownLatch.getClass();
                transform.subscribe((Consumer) null, consumer, countDownLatch::countDown);
            }
            if (!build.hasDownstreams()) {
                build.forceShutdown();
                throw new MuleRuntimeException(I18nMessageFactory.createStaticMessage("No subscriptions active for processor."));
            }
            if (build.downstreamCount() >= i) {
                return buildSink(build.sink(FluxSink.OverflowStrategy.BUFFER), () -> {
                    long currentTimeMillis = System.currentTimeMillis();
                    if (!build.awaitAndShutdown(Duration.ofMillis(shutdownTimeout))) {
                        WorkQueueStreamProcessingStrategyFactory.LOGGER.warn("WorkQueueProcessor of ProcessingStrategy for flow '{}' not shutDown in {} ms. Forcing shutdown...", flowConstruct.getName(), Long.valueOf(shutdownTimeout));
                        build.forceShutdown();
                    }
                    awaitSubscribersCompletion(flowConstruct, shutdownTimeout, countDownLatch, currentTimeMillis);
                }, createOnEventConsumer(), this.bufferSize);
            }
            build.forceShutdown();
            throw new MuleRuntimeException(I18nMessageFactory.createStaticMessage("Not enough subscriptions active for processor."));
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public <E> AbstractProcessingStrategy.ReactorSink<E> buildSink(FluxSink<E> fluxSink, Disposable disposable, Consumer<CoreEvent> consumer, int i) {
            return new AbstractProcessingStrategy.DefaultReactorSink(fluxSink, disposable, consumer, i) { // from class: org.mule.runtime.core.internal.processor.strategy.WorkQueueStreamProcessingStrategyFactory.WorkQueueStreamProcessingStrategy.1
                @Override // org.mule.runtime.core.internal.processor.strategy.AbstractProcessingStrategy.DefaultReactorSink, org.mule.runtime.core.internal.processor.strategy.AbstractProcessingStrategy.ReactorSink
                public EventWrapper intoSink(CoreEvent coreEvent) {
                    return new EventWrapper(coreEvent);
                }
            };
        }

        @Override // org.mule.runtime.core.api.processor.strategy.ProcessingStrategy
        public ReactiveProcessor onPipeline(ReactiveProcessor reactiveProcessor) {
            return this.maxConcurrency > this.subscribers ? this.isThreadLoggingEnabled ? publisher -> {
                return Flux.from(publisher).flatMap(coreEvent -> {
                    return Mono.subscriberContext().flatMap(context -> {
                        return Mono.just(coreEvent).transform(reactiveProcessor).subscribeOn(Schedulers.fromExecutorService(new ThreadLoggingExecutorServiceDecorator(context.getOrEmpty(ThreadNotificationLogger.THREAD_NOTIFICATION_LOGGER_CONTEXT_KEY), decorateScheduler(this.blockingScheduler), coreEvent.getContext().getId())));
                    });
                });
            } : publisher2 -> {
                return Flux.from(publisher2).flatMap(coreEvent -> {
                    return Flux.just(coreEvent).transform(reactiveProcessor).subscribeOn(Schedulers.fromExecutorService(decorateScheduler(this.blockingScheduler))).subscriberContext(context -> {
                        return context.put(AbstractProcessingStrategy.PROCESSOR_SCHEDULER_CONTEXT_KEY, this.blockingScheduler);
                    });
                }, this.maxConcurrency);
            } : super.onPipeline(reactiveProcessor);
        }

        @Override // org.mule.runtime.core.api.processor.strategy.ProcessingStrategy
        public ReactiveProcessor onProcessor(ReactiveProcessor reactiveProcessor) {
            return reactiveProcessor.getProcessingType() == ReactiveProcessor.ProcessingType.CPU_LITE_ASYNC ? publisher -> {
                return Flux.from(publisher).transform(reactiveProcessor).publishOn(Schedulers.fromExecutorService(decorateScheduler(this.blockingScheduler)));
            } : super.onProcessor(reactiveProcessor);
        }

        @Override // org.mule.runtime.api.lifecycle.Startable
        public void start() throws MuleException {
            this.blockingScheduler = this.blockingSchedulerSupplier.get();
        }

        @Override // org.mule.runtime.api.lifecycle.Stoppable
        public void stop() throws MuleException {
            this.sinkList.stream().filter(sink -> {
                return sink instanceof org.mule.runtime.api.lifecycle.Disposable;
            }).forEach(sink2 -> {
                ((org.mule.runtime.api.lifecycle.Disposable) sink2).dispose();
            });
            if (this.blockingScheduler != null) {
                this.blockingScheduler.stop();
            }
        }
    }

    @Override // org.mule.runtime.core.api.processor.strategy.ProcessingStrategyFactory
    public ProcessingStrategy create(MuleContext muleContext, String str) {
        return new WorkQueueStreamProcessingStrategy(getRingBufferSchedulerSupplier(muleContext, str), getBufferSize(), getSubscriberCount(), getWaitStrategy(), () -> {
            return muleContext.getSchedulerService().ioScheduler(muleContext.getSchedulerBaseConfig().withName(str + "." + ReactiveProcessor.ProcessingType.BLOCKING.name()));
        }, getMaxConcurrency(), isMaxConcurrencyEagerCheck(), muleContext.getConfiguration().isThreadLoggingEnabled());
    }

    @Override // org.mule.runtime.core.internal.processor.strategy.AbstractStreamWorkQueueProcessingStrategyFactory, org.mule.runtime.core.internal.processor.strategy.AbstractStreamProcessingStrategyFactory, org.mule.runtime.core.api.processor.strategy.ProcessingStrategyFactory
    public Class<? extends ProcessingStrategy> getProcessingStrategyType() {
        return WorkQueueStreamProcessingStrategy.class;
    }
}
