package org.mule;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.message.Message;
import org.mule.runtime.api.util.Reference;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.event.EventContextFactory;
import org.mule.runtime.core.api.exception.NullExceptionHandler;
import org.mule.runtime.core.api.processor.Processor;
import org.mule.runtime.core.privileged.processor.chain.DefaultMessageProcessorChainBuilder;
import org.mule.runtime.core.privileged.processor.chain.MessageProcessorChain;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.Warmup;
import reactor.core.publisher.FluxProcessor;
import reactor.core.publisher.FluxSink;

@Warmup(iterations = 10)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@Measurement(iterations = 10)
/* loaded from: input_file:org/mule/ProcessorChainBenchmark.class */
public class ProcessorChainBenchmark extends AbstractBenchmark {
    private static final int NUM_PROCESSORS = 20;
    private static final int STREAM_SIZE = 1000;
    private MessageProcessorChain chain;
    private CoreEvent event;

    @Setup
    public void setup() throws Exception {
        DefaultMessageProcessorChainBuilder defaultMessageProcessorChainBuilder = new DefaultMessageProcessorChainBuilder();
        for (int i = 0; i < NUM_PROCESSORS; i++) {
            defaultMessageProcessorChainBuilder.chain(new Processor[]{coreEvent -> {
                return coreEvent;
            }});
        }
        this.chain = defaultMessageProcessorChainBuilder.build();
        this.chain.setMuleContext(createMuleContextWithServices());
        this.event = CoreEvent.builder(EventContextFactory.create("", "", CONNECTOR_LOCATION, NullExceptionHandler.getInstance())).message(Message.of(PAYLOAD)).build();
    }

    @Benchmark
    public CoreEvent blocking() throws MuleException {
        return this.chain.process(this.event);
    }

    @Benchmark
    public CountDownLatch stream() throws MuleException, InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(STREAM_SIZE);
        Reference reference = new Reference();
        reference.getClass();
        FluxProcessor.create((v1) -> {
            r0.set(v1);
        }).transform(this.chain).doOnNext(coreEvent -> {
            countDownLatch.countDown();
        }).subscribe();
        for (int i = 0; i < STREAM_SIZE; i++) {
            ((FluxSink) reference.get()).next(this.event);
        }
        ((FluxSink) reference.get()).complete();
        countDownLatch.await();
        return countDownLatch;
    }
}
