package org.mule.runtime.core.internal.processor.strategy;

import java.util.function.Supplier;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.lifecycle.Startable;
import org.mule.runtime.api.lifecycle.Stoppable;
import org.mule.runtime.api.scheduler.Scheduler;
import org.mule.runtime.core.api.processor.ReactiveProcessor;
import org.mule.runtime.core.internal.processor.strategy.AbstractStreamProcessingStrategyFactory;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:org/mule/runtime/core/internal/processor/strategy/AbstractReactorStreamProcessingStrategy.class */
abstract class AbstractReactorStreamProcessingStrategy extends AbstractStreamProcessingStrategyFactory.AbstractStreamProcessingStrategy implements Startable, Stoppable {
    private final Supplier<Scheduler> cpuLightSchedulerSupplier;
    private Scheduler cpuLightScheduler;
    private final int parallelism;

    /* JADX INFO: Access modifiers changed from: package-private */
    public AbstractReactorStreamProcessingStrategy(int i, Supplier<Scheduler> supplier, int i2, int i3, boolean z) {
        super(i, i3, z);
        this.cpuLightSchedulerSupplier = supplier;
        this.parallelism = i2;
    }

    @Override // org.mule.runtime.core.api.processor.strategy.ProcessingStrategy
    public ReactiveProcessor onProcessor(ReactiveProcessor reactiveProcessor) {
        reactor.core.scheduler.Scheduler fromExecutorService = Schedulers.fromExecutorService(decorateScheduler(getCpuLightScheduler()));
        return reactiveProcessor.getProcessingType() == ReactiveProcessor.ProcessingType.CPU_LITE_ASYNC ? onNonBlockingProcessorTxAware(publisher -> {
            return Flux.from(publisher).transform(reactiveProcessor).publishOn(fromExecutorService).subscriberContext(context -> {
                return context.put(AbstractProcessingStrategy.PROCESSOR_SCHEDULER_CONTEXT_KEY, getCpuLightScheduler());
            });
        }) : publisher2 -> {
            return Flux.from(publisher2).transform(reactiveProcessor).subscriberContext(context -> {
                return context.put(AbstractProcessingStrategy.PROCESSOR_SCHEDULER_CONTEXT_KEY, getCpuLightScheduler());
            });
        };
    }

    protected ReactiveProcessor onNonBlockingProcessorTxAware(ReactiveProcessor reactiveProcessor) {
        return reactiveProcessor;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public int getParallelism() {
        return this.parallelism;
    }

    @Override // org.mule.runtime.api.lifecycle.Startable
    public void start() throws MuleException {
        this.cpuLightScheduler = createCpuLightScheduler(this.cpuLightSchedulerSupplier);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Scheduler createCpuLightScheduler(Supplier<Scheduler> supplier) {
        return supplier.get();
    }

    @Override // org.mule.runtime.api.lifecycle.Stoppable
    public void stop() throws MuleException {
        if (this.cpuLightScheduler != null) {
            this.cpuLightScheduler.stop();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Scheduler getCpuLightScheduler() {
        return this.cpuLightScheduler;
    }
}
