package org.mule.runtime.core.internal.processor.strategy;

import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Consumer;
import java.util.function.Function;
import org.mule.runtime.core.api.construct.FlowConstruct;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.processor.ReactiveProcessor;
import org.mule.runtime.core.api.processor.Sink;
import org.mule.runtime.core.api.processor.strategy.ProcessingStrategy;
import org.mule.runtime.core.api.transaction.TransactionCoordination;
import org.mule.runtime.core.internal.util.rx.ConditionalExecutorServiceDecorator;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.context.Context;

/* loaded from: input_file:org/mule/runtime/core/internal/processor/strategy/TransactionAwareStreamEmitterProcessingStrategyDecorator.class */
/**
 * Decorates a {@link ProcessingStrategy} so that events travelling through a reactor chain with an
 * active transaction scope are handled by the blocking processing strategy, while every other event
 * is handled by the decorated (delegate) strategy.
 *
 * <p>Active transaction scopes are tracked as a stack of scope names stored in the Reactor subscriber
 * {@link Context} under {@link #TX_SCOPES_KEY}. Scopes are expected to maintain that stack through
 * {@link #pushTxToSubscriberContext(String)} and {@link #popTxFromSubscriberContext()}.
 */
public class TransactionAwareStreamEmitterProcessingStrategyDecorator extends ProcessingStrategyDecorator {
    /** Subscriber-context key holding the stack of transaction scope names active in the chain. */
    private static final String TX_SCOPES_KEY = "mule.tx.activeTransactionsInReactorChain";

    /** Event consumer that intentionally does nothing. */
    private static final Consumer<CoreEvent> NULL_EVENT_CONSUMER = coreEvent -> {
    };

    /**
     * Wraps the given strategy. When it is a {@link ProcessingStrategyAdapter}, its on-event consumer
     * is replaced by a no-op and its scheduler decorator is wrapped in a
     * {@link ConditionalExecutorServiceDecorator} whose condition is
     * {@link TransactionCoordination#isTransactionActive()}.
     *
     * @param processingStrategy the strategy to decorate
     */
    public TransactionAwareStreamEmitterProcessingStrategyDecorator(ProcessingStrategy processingStrategy) {
        super(processingStrategy);
        if (processingStrategy instanceof ProcessingStrategyAdapter) {
            ProcessingStrategyAdapter adapter = (ProcessingStrategyAdapter) processingStrategy;
            adapter.setOnEventConsumer(NULL_EVENT_CONSUMER);
            // Capture the decorator that was configured before this constructor replaces it, so the
            // replacement can still apply it first.
            Function<ScheduledExecutorService, ScheduledExecutorService> previousDecorator = adapter.getSchedulerDecorator();
            // The two lambda parameters need distinct names: a lambda parameter may not redeclare a
            // variable of an enclosing lambda.
            // NOTE(review): presumably the conditional decorator bypasses the scheduler while the
            // predicate holds; confirm against ConditionalExecutorServiceDecorator.
            adapter.setSchedulerDecorator(scheduler -> new ConditionalExecutorServiceDecorator(
                    previousDecorator.apply(scheduler),
                    currentScheduler -> TransactionCoordination.isTransactionActive()));
        }
    }

    /**
     * Creates a {@link TransactionalDelegateSink} built from two sinks: a {@link StreamPerThreadSink}
     * over the given pipeline, and the sink created by the delegate strategy.
     *
     * <p>The per-thread pipeline is bracketed by two subscriber-context operators: the one declared
     * after the pipeline pushes the {@code "source"} scope, and the one declared before it pops that
     * scope again. Because the subscriber context propagates from downstream to upstream, the push
     * is applied first, so {@code reactiveProcessor} runs with {@code "source"} on the stack.
     *
     * @param flowConstruct the flow the sink belongs to
     * @param reactiveProcessor the pipeline the sink feeds
     * @return the composed sink
     */
    @Override
    public Sink createSink(FlowConstruct flowConstruct, ReactiveProcessor reactiveProcessor) {
        Sink perThreadSink = new StreamPerThreadSink(
                publisher -> Flux.from(publisher)
                        .subscriberContext(popTxFromSubscriberContext())
                        .transform(reactiveProcessor)
                        .subscriberContext(pushTxToSubscriberContext("source")),
                NULL_EVENT_CONSUMER,
                flowConstruct);
        Sink delegateSink = this.delegate.createSink(flowConstruct, reactiveProcessor);
        return new TransactionalDelegateSink(perThreadSink, delegateSink);
    }

    /**
     * Applies the pipeline through the blocking strategy when the subscriber context reports an
     * active transaction scope, and through the delegate strategy otherwise. The decision is taken
     * at subscription time, once the subscriber context is available.
     *
     * @param reactiveProcessor the pipeline to decorate
     * @return the decorated pipeline
     */
    @Override
    public ReactiveProcessor onPipeline(ReactiveProcessor reactiveProcessor) {
        return publisher -> Mono.subscriberContext().flatMapMany(context -> {
            ReactiveProcessor decorated = isTxActive(context)
                    ? BlockingProcessingStrategyFactory.BLOCKING_PROCESSING_STRATEGY_INSTANCE.onPipeline(reactiveProcessor)
                    : this.delegate.onPipeline(reactiveProcessor);
            return Flux.from(publisher).transform(decorated);
        });
    }

    /**
     * Applies the processor through the blocking strategy when the subscriber context reports an
     * active transaction scope, and through the delegate strategy otherwise. The decision is taken
     * at subscription time, once the subscriber context is available.
     *
     * @param reactiveProcessor the processor to decorate
     * @return the decorated processor
     */
    @Override
    public ReactiveProcessor onProcessor(ReactiveProcessor reactiveProcessor) {
        return publisher -> Mono.subscriberContext().flatMapMany(context -> {
            ReactiveProcessor decorated = isTxActive(context)
                    ? BlockingProcessingStrategyFactory.BLOCKING_PROCESSING_STRATEGY_INSTANCE.onProcessor(reactiveProcessor)
                    : this.delegate.onProcessor(reactiveProcessor);
            return Flux.from(publisher).transform(decorated);
        });
    }

    /**
     * Tells whether the given context carries at least one transaction scope.
     *
     * @param context the subscriber context to inspect
     * @return {@code true} when a non-empty scope stack is stored under {@link #TX_SCOPES_KEY};
     *         {@code false} when the key is absent or the stack is empty
     */
    private boolean isTxActive(Context context) {
        // The explicit type witness is required: without it the element type is inferred as Object
        // and the collection methods are not accessible.
        return context.<Collection<?>>getOrEmpty(TX_SCOPES_KEY)
                .map(scopes -> !scopes.isEmpty())
                .orElse(false);
    }

    /**
     * Builds a context transformation that removes the most recently pushed transaction scope.
     *
     * <p>The stack stored in the incoming context is copied, never mutated in place. The returned
     * function throws {@link java.util.NoSuchElementException} when applied to a context with no
     * scope to pop, so it must be paired with {@link #pushTxToSubscriberContext(String)}.
     *
     * @return a function producing a context whose scope stack has its head removed
     */
    public static Function<Context, Context> popTxFromSubscriberContext() {
        return context -> {
            ArrayDeque<String> scopes =
                    new ArrayDeque<>(context.<Collection<String>>getOrDefault(TX_SCOPES_KEY, Collections.emptyList()));
            scopes.pop();
            return context.put(TX_SCOPES_KEY, scopes);
        };
    }

    /**
     * Builds a context transformation that pushes a transaction scope onto the stack.
     *
     * <p>The stack stored in the incoming context is copied, never mutated in place; a missing stack
     * is treated as empty.
     *
     * @param str the name of the scope to push; must not be {@code null} because the backing
     *        {@link ArrayDeque} rejects null elements
     * @return a function producing a context whose scope stack has {@code str} as its new head
     */
    public static Function<Context, Context> pushTxToSubscriberContext(String str) {
        return context -> {
            ArrayDeque<String> scopes =
                    new ArrayDeque<>(context.<Collection<String>>getOrDefault(TX_SCOPES_KEY, Collections.emptyList()));
            scopes.push(str);
            return context.put(TX_SCOPES_KEY, scopes);
        };
    }
}
