package org.mule.runtime.core.internal.processor.strategy;

import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Consumer;
import java.util.function.Function;
import javax.inject.Inject;
import org.mule.runtime.api.component.location.ComponentLocation;
import org.mule.runtime.api.config.FeatureFlaggingService;
import org.mule.runtime.api.config.MuleRuntimeFeature;
import org.mule.runtime.api.profiling.ProfilingDataProducer;
import org.mule.runtime.api.profiling.ProfilingService;
import org.mule.runtime.api.profiling.type.ProfilingEventType;
import org.mule.runtime.api.profiling.type.RuntimeProfilingEventTypes;
import org.mule.runtime.api.profiling.type.context.ProcessingStrategyProfilingEventContext;
import org.mule.runtime.core.api.MuleContext;
import org.mule.runtime.core.api.construct.FlowConstruct;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.processor.ReactiveProcessor;
import org.mule.runtime.core.api.processor.Sink;
import org.mule.runtime.core.api.processor.strategy.ProcessingStrategy;
import org.mule.runtime.core.api.transaction.TransactionCoordination;
import org.mule.runtime.core.internal.processor.strategy.reactor.builder.ReactorPublisherBuilder;
import org.mule.runtime.core.internal.processor.strategy.util.ProfilingUtils;
import org.mule.runtime.core.internal.util.rx.ConditionalExecutorServiceDecorator;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.context.Context;

/* loaded from: input_file:org/mule/runtime/core/internal/processor/strategy/TransactionAwareStreamEmitterProcessingStrategyDecorator.class */
public class TransactionAwareStreamEmitterProcessingStrategyDecorator extends ProcessingStrategyDecorator {
    private static final String TX_SCOPES_KEY = "mule.tx.activeTransactionsInReactorChain";
    private static final Consumer<CoreEvent> NULL_EVENT_CONSUMER = coreEvent -> {
    };

    @Inject
    private ProfilingService profilingService;

    @Inject
    private FeatureFlaggingService featureFlags;

    @Inject
    private MuleContext muleContext;

    public TransactionAwareStreamEmitterProcessingStrategyDecorator(ProcessingStrategy processingStrategy) {
        super(processingStrategy);
        if (processingStrategy instanceof ProcessingStrategyAdapter) {
            ProcessingStrategyAdapter processingStrategyAdapter = (ProcessingStrategyAdapter) processingStrategy;
            processingStrategyAdapter.setOnEventConsumer(NULL_EVENT_CONSUMER);
            Function<ScheduledExecutorService, ScheduledExecutorService> schedulerDecorator = processingStrategyAdapter.getSchedulerDecorator();
            processingStrategyAdapter.setSchedulerDecorator(scheduledExecutorService -> {
                return new ConditionalExecutorServiceDecorator((ScheduledExecutorService) schedulerDecorator.apply(scheduledExecutorService), scheduledExecutorService -> {
                    return TransactionCoordination.isTransactionActive();
                });
            });
        }
    }

    @Override // org.mule.runtime.core.internal.processor.strategy.ProcessingStrategyDecorator, org.mule.runtime.core.api.processor.strategy.ProcessingStrategy
    public Sink createSink(FlowConstruct flowConstruct, ReactiveProcessor reactiveProcessor) {
        return new TransactionalDelegateSink(new StreamPerThreadSink(publisher -> {
            return Flux.from(publisher).subscriberContext(popTxFromSubscriberContext()).transform(reactiveProcessor).subscriberContext(pushTxToSubscriberContext("source"));
        }, NULL_EVENT_CONSUMER, flowConstruct), this.delegate.createSink(flowConstruct, reactiveProcessor));
    }

    @Override // org.mule.runtime.core.internal.processor.strategy.ProcessingStrategyDecorator, org.mule.runtime.core.api.processor.strategy.ProcessingStrategy
    public ReactiveProcessor onPipeline(ReactiveProcessor reactiveProcessor) {
        ComponentLocation location = ProfilingUtils.getLocation(reactiveProcessor);
        String artifactId = ProfilingUtils.getArtifactId(this.muleContext);
        String artifactType = ProfilingUtils.getArtifactType(this.muleContext);
        return publisher -> {
            return Mono.subscriberContext().flatMapMany(context -> {
                return isTxActive(context) ? ReactorPublisherBuilder.buildFlux(publisher).profileEvent(location, getDataProducer(RuntimeProfilingEventTypes.PS_SCHEDULING_FLOW_EXECUTION), artifactId, artifactType).profileEvent(location, getDataProducer(RuntimeProfilingEventTypes.STARTING_FLOW_EXECUTION), artifactId, artifactType).transform(BlockingProcessingStrategyFactory.BLOCKING_PROCESSING_STRATEGY_INSTANCE.onPipeline(reactiveProcessor)).profileEvent(location, getDataProducer(RuntimeProfilingEventTypes.FLOW_EXECUTED), artifactId, artifactType).build() : Flux.from(publisher).transform(this.delegate.onPipeline(reactiveProcessor));
            });
        };
    }

    private Optional<ProfilingDataProducer<ProcessingStrategyProfilingEventContext>> getDataProducer(ProfilingEventType<ProcessingStrategyProfilingEventContext> profilingEventType) {
        return this.featureFlags.isEnabled(MuleRuntimeFeature.ENABLE_PROFILING_SERVICE) ? Optional.of(this.profilingService.getProfilingDataProducer(profilingEventType)) : Optional.empty();
    }

    @Override // org.mule.runtime.core.internal.processor.strategy.ProcessingStrategyDecorator, org.mule.runtime.core.api.processor.strategy.ProcessingStrategy
    public ReactiveProcessor onProcessor(ReactiveProcessor reactiveProcessor) {
        ComponentLocation location = ProfilingUtils.getLocation(reactiveProcessor);
        String id = this.muleContext.getConfiguration().getId();
        String asString = this.muleContext.getArtifactType().getAsString();
        return publisher -> {
            return Mono.subscriberContext().flatMapMany(context -> {
                return isTxActive(context) ? ReactorPublisherBuilder.buildFlux(publisher).profileEvent(location, getDataProducer(RuntimeProfilingEventTypes.PS_SCHEDULING_OPERATION_EXECUTION), id, asString).profileEvent(location, getDataProducer(RuntimeProfilingEventTypes.STARTING_OPERATION_EXECUTION), id, asString).transform(BlockingProcessingStrategyFactory.BLOCKING_PROCESSING_STRATEGY_INSTANCE.onProcessor(reactiveProcessor)).profileEvent(location, getDataProducer(RuntimeProfilingEventTypes.OPERATION_EXECUTED), id, asString).profileEvent(location, getDataProducer(RuntimeProfilingEventTypes.PS_FLOW_MESSAGE_PASSING), id, asString).build() : Flux.from(publisher).transform(this.delegate.onProcessor(reactiveProcessor));
            });
        };
    }

    private boolean isTxActive(Context context) {
        return ((Boolean) context.getOrEmpty(TX_SCOPES_KEY).map(deque -> {
            return Boolean.valueOf(!deque.isEmpty());
        }).orElse(false)).booleanValue();
    }

    public static Function<Context, Context> popTxFromSubscriberContext() {
        return context -> {
            ArrayDeque arrayDeque = new ArrayDeque((Collection) context.getOrDefault(TX_SCOPES_KEY, Collections.emptyList()));
            arrayDeque.pop();
            return context.put(TX_SCOPES_KEY, arrayDeque);
        };
    }

    public static Function<Context, Context> pushTxToSubscriberContext(String str) {
        return context -> {
            ArrayDeque arrayDeque = new ArrayDeque((Collection) context.getOrDefault(TX_SCOPES_KEY, Collections.emptyList()));
            arrayDeque.push(str);
            return context.put(TX_SCOPES_KEY, arrayDeque);
        };
    }
}
