package org.mule.runtime.core.processor.strategy;

import java.util.function.Consumer;
import java.util.function.Function;
import org.mule.AbstractBenchmark;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.core.api.MuleContext;
import org.mule.runtime.core.api.construct.Flow;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.lifecycle.LifecycleUtils;
import org.mule.runtime.core.api.processor.ReactiveProcessor;
import org.mule.runtime.core.api.processor.Sink;
import org.mule.runtime.core.api.processor.strategy.ProcessingStrategy;
import org.mule.runtime.core.internal.processor.strategy.DirectProcessingStrategyFactory;
import org.mule.runtime.core.internal.processor.strategy.TransactionAwareProactorStreamEmitterProcessingStrategyFactory;
import org.mule.runtime.core.internal.processor.strategy.TransactionAwareProactorStreamWorkQueueProcessingStrategyFactory;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Threads;
import org.openjdk.jmh.infra.Blackhole;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;

@BenchmarkMode({Mode.Throughput})
@State(Scope.Benchmark)
/* loaded from: input_file:org/mule/runtime/core/processor/strategy/ProcessingStrategyBenchmark.class */
public class ProcessingStrategyBenchmark extends AbstractBenchmark {
    private MuleContext muleContext;
    private ProcessingStrategy directPs;
    private ProcessingStrategy emitterPs;
    private ProcessingStrategy workQueuePs;
    private Flow flow;
    private Sink directSink;
    private Sink emitterSink;
    private Sink workQueueSink;
    private FluxSink<CoreEvent> directPipeline;
    private FluxSink<CoreEvent> emitterPipeline;
    private FluxSink<CoreEvent> workQueuePipeline;
    private FluxSink<CoreEvent> directProcessor;
    private FluxSink<CoreEvent> emitterProcessor;
    private FluxSink<CoreEvent> workQueueProcessor;
    private Sink directAllSink;
    private Sink emitterAllSink;
    private Sink workQueueAllSink;

    @Setup(Level.Trial)
    public void setUp() throws MuleException {
        this.muleContext = createMuleContextWithServices();
        this.directPs = new DirectProcessingStrategyFactory().create(this.muleContext, "direct_mb");
        LifecycleUtils.startIfNeeded(this.directPs);
        this.emitterPs = new TransactionAwareProactorStreamEmitterProcessingStrategyFactory().create(this.muleContext, "emitter_mb");
        LifecycleUtils.startIfNeeded(this.emitterPs);
        this.workQueuePs = new TransactionAwareProactorStreamWorkQueueProcessingStrategyFactory().create(this.muleContext, "workQueue_mb");
        LifecycleUtils.startIfNeeded(this.workQueuePs);
        this.flow = createFlow(this.muleContext);
        ReactiveProcessor reactiveProcessor = publisher -> {
            return Flux.from(publisher).doOnNext(coreEvent -> {
                Blackhole.consumeCPU(100L);
            });
        };
        this.directSink = this.directPs.createSink(this.flow, publisher2 -> {
            return baseFlux(publisher2, reactiveProcessor);
        });
        this.emitterSink = this.emitterPs.createSink(this.flow, publisher3 -> {
            return baseFlux(publisher3, reactiveProcessor);
        });
        this.workQueueSink = this.workQueuePs.createSink(this.flow, publisher4 -> {
            return baseFlux(publisher4, reactiveProcessor);
        });
        Flux.create(fluxSink -> {
            this.directPipeline = fluxSink;
        }, FluxSink.OverflowStrategy.ERROR).transform(this.directPs.onPipeline(publisher5 -> {
            return baseFlux(publisher5, reactiveProcessor);
        })).subscribe();
        Flux.create(fluxSink2 -> {
            this.emitterPipeline = fluxSink2;
        }, FluxSink.OverflowStrategy.ERROR).transform(this.emitterPs.onPipeline(publisher6 -> {
            return baseFlux(publisher6, reactiveProcessor);
        })).subscribe();
        Flux.create(fluxSink3 -> {
            this.workQueuePipeline = fluxSink3;
        }, FluxSink.OverflowStrategy.ERROR).transform(this.workQueuePs.onPipeline(publisher7 -> {
            return baseFlux(publisher7, reactiveProcessor);
        })).subscribe();
        Flux.create(fluxSink4 -> {
            this.directProcessor = fluxSink4;
        }, FluxSink.OverflowStrategy.ERROR).transform(this.directPs.onProcessor(publisher8 -> {
            return baseFlux(publisher8, reactiveProcessor);
        })).subscribe();
        Flux.create(fluxSink5 -> {
            this.emitterProcessor = fluxSink5;
        }, FluxSink.OverflowStrategy.ERROR).transform(this.emitterPs.onProcessor(publisher9 -> {
            return baseFlux(publisher9, reactiveProcessor);
        })).subscribe();
        Flux.create(fluxSink6 -> {
            this.workQueueProcessor = fluxSink6;
        }, FluxSink.OverflowStrategy.ERROR).transform(this.workQueuePs.onProcessor(publisher10 -> {
            return baseFlux(publisher10, reactiveProcessor);
        })).subscribe();
        this.directAllSink = this.directPs.createSink(this.flow, publisher11 -> {
            return baseFlux(publisher11, this.directPs.onPipeline(this.directPs.onProcessor(reactiveProcessor)));
        });
        this.emitterAllSink = this.emitterPs.createSink(this.flow, publisher12 -> {
            return baseFlux(publisher12, this.emitterPs.onPipeline(this.emitterPs.onProcessor(reactiveProcessor)));
        });
        this.workQueueAllSink = this.workQueuePs.createSink(this.flow, publisher13 -> {
            return baseFlux(publisher13, this.workQueuePs.onPipeline(this.workQueuePs.onProcessor(reactiveProcessor)));
        });
    }

    private Flux<CoreEvent> baseFlux(Publisher<CoreEvent> publisher, Function<? super Flux<CoreEvent>, ? extends Publisher<CoreEvent>> function) {
        return Flux.from(publisher).transform(function).doOnNext(coreEvent -> {
            ((MonoSink) coreEvent.getMessage().getPayload().getValue()).success(coreEvent);
        }).errorStrategyContinue((th, coreEvent2) -> {
            ((MonoSink) coreEvent2.getMessage().getPayload().getValue()).error(th);
        });
    }

    @Benchmark
    @Threads(-1)
    public CoreEvent directSink() {
        return (CoreEvent) Mono.create(monoSink -> {
            this.directSink.accept(createEvent(this.flow, monoSink));
        }).block();
    }

    @Benchmark
    @Threads(-1)
    public CoreEvent emitterSink() {
        return (CoreEvent) Mono.create(monoSink -> {
            this.emitterSink.accept(createEvent(this.flow, monoSink));
        }).block();
    }

    @Benchmark
    @Threads(-1)
    public CoreEvent workQueueSink() {
        return (CoreEvent) Mono.create(monoSink -> {
            this.workQueueSink.accept(createEvent(this.flow, monoSink));
        }).block();
    }

    @Benchmark
    @Threads(1)
    public CoreEvent directPipeline() {
        return (CoreEvent) Mono.create(monoSink -> {
            this.directPipeline.next(createEvent(this.flow, monoSink));
        }).block();
    }

    @Benchmark
    @Threads(1)
    public CoreEvent emitterPipeline() {
        return (CoreEvent) Mono.create(monoSink -> {
            this.emitterPipeline.next(createEvent(this.flow, monoSink));
        }).block();
    }

    @Benchmark
    @Threads(1)
    public CoreEvent workQueuePipeline() {
        return (CoreEvent) Mono.create(monoSink -> {
            this.workQueuePipeline.next(createEvent(this.flow, monoSink));
        }).block();
    }

    @Benchmark
    @Threads(1)
    public CoreEvent directProcessor() {
        return (CoreEvent) Mono.create(monoSink -> {
            this.directProcessor.next(createEvent(this.flow, monoSink));
        }).block();
    }

    @Benchmark
    @Threads(1)
    public CoreEvent emitterProcessor() {
        return (CoreEvent) Mono.create(monoSink -> {
            this.emitterProcessor.next(createEvent(this.flow, monoSink));
        }).block();
    }

    @Benchmark
    @Threads(1)
    public CoreEvent workQueueProcessor() {
        return (CoreEvent) Mono.create(monoSink -> {
            this.workQueueProcessor.next(createEvent(this.flow, monoSink));
        }).block();
    }

    @Benchmark
    @Threads(-1)
    public CoreEvent directAllSink() {
        return (CoreEvent) Mono.create(monoSink -> {
            this.directAllSink.accept(createEvent(this.flow, monoSink));
        }).block();
    }

    @Benchmark
    @Threads(-1)
    public CoreEvent emitterAllSink() {
        return (CoreEvent) Mono.create(monoSink -> {
            this.emitterAllSink.accept(createEvent(this.flow, monoSink));
        }).block();
    }

    @Benchmark
    @Threads(-1)
    public CoreEvent workQueueAllSink() {
        return (CoreEvent) Mono.create(monoSink -> {
            this.workQueueAllSink.accept(createEvent(this.flow, monoSink));
        }).block();
    }
}
