package org.mule.runtime.core.internal.util.rx;

import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;
import org.mule.runtime.api.component.Component;
import org.mule.runtime.api.functional.Either;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.processor.ReactiveProcessor;
import org.mule.runtime.core.api.rx.Exceptions;
import org.mule.runtime.core.api.transaction.TransactionCoordination;
import org.mule.runtime.core.api.util.func.CheckedConsumer;
import org.mule.runtime.core.api.util.func.CheckedRunnable;
import org.mule.runtime.core.api.util.func.Once;
import org.mule.runtime.core.internal.rx.FluxSinkRecorder;
import org.mule.runtime.core.privileged.exception.MessagingException;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.annotation.Nullable;

/* loaded from: input_file:org/mule/runtime/core/internal/util/rx/RxUtils.class */
public class RxUtils {
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) RxUtils.class);
    public static final String KEY_ON_NEXT_ERROR_STRATEGY = "reactor.onNextError.localStrategy";
    public static final String ON_NEXT_FAILURE_STRATEGY = "reactor.core.publisher.OnNextFailureStrategy$ResumeStrategy";
    public static final String REACTOR_RECREATE_ROUTER = "recreateRouter";

    public static <T, U> Flux<T> subscribeFluxOnPublisherSubscription(Flux<T> flux, Flux<U> flux2) {
        return subscribeFluxOnPublisherSubscription(flux, flux2, null, null, null);
    }

    public static <T, U> Flux<T> subscribeFluxOnPublisherSubscription(Flux<T> flux, Flux<U> flux2, @Nullable Consumer<? super U> consumer, @Nullable Consumer<? super Throwable> consumer2, @Nullable Runnable runnable) {
        return flux.transformDeferredContextual((flux3, contextView) -> {
            return flux3.doOnSubscribe(subscription -> {
                flux2.contextWrite(contextView).subscribe(consumer, consumer2, runnable);
            });
        });
    }

    public static <T, U> Publisher<T> propagateCompletion(Publisher<U> publisher, Publisher<T> publisher2, Function<Publisher<U>, Publisher<T>> function, CheckedRunnable checkedRunnable, CheckedConsumer<Throwable> checkedConsumer) {
        Objects.requireNonNull(publisher, "'upstream' must not be null");
        Objects.requireNonNull(publisher2, "'downstream' must not be null");
        Objects.requireNonNull(function, "'transformer' must not be null");
        Objects.requireNonNull(checkedRunnable, "'completionCallback' must not be null");
        Objects.requireNonNull(checkedConsumer, "'errorCallback' must not be null");
        return doPropagateCompletion(publisher, publisher2, function, new AtomicInteger(0), Once.of(checkedRunnable), Once.of(checkedConsumer), () -> {
            return null;
        });
    }

    public static <T, U> Publisher<T> propagateCompletion(Publisher<U> publisher, Publisher<T> publisher2, Function<Publisher<U>, Publisher<T>> function, AtomicInteger atomicInteger, CheckedRunnable checkedRunnable, CheckedConsumer<Throwable> checkedConsumer) {
        Objects.requireNonNull(publisher, "'upstream' must not be null");
        Objects.requireNonNull(publisher2, "'downstream' must not be null");
        Objects.requireNonNull(function, "'transformer' must not be null");
        Objects.requireNonNull(checkedRunnable, "'completionCallback' must not be null");
        Objects.requireNonNull(checkedConsumer, "'errorCallback' must not be null");
        return doPropagateCompletion(publisher, publisher2, function, atomicInteger, Once.of(checkedRunnable), Once.of(checkedConsumer), () -> {
            return null;
        });
    }

    public static <T, U> Publisher<T> propagateCompletion(Publisher<U> publisher, Publisher<T> publisher2, Function<Publisher<U>, Publisher<T>> function, CheckedRunnable checkedRunnable, CheckedConsumer<Throwable> checkedConsumer, long j, ScheduledExecutorService scheduledExecutorService, String str) {
        Objects.requireNonNull(publisher, "'upstream' must not be null");
        Objects.requireNonNull(publisher2, "'downstream' must not be null");
        Objects.requireNonNull(function, "'transformer' must not be null");
        Objects.requireNonNull(checkedRunnable, "'completionCallback' must not be null");
        Objects.requireNonNull(checkedConsumer, "'errorCallback' must not be null");
        Objects.requireNonNull(scheduledExecutorService, "'delayedExecutor' must not be null");
        Once.RunOnce of = Once.of(checkedRunnable);
        return doPropagateCompletion(publisher, publisher2, function, new AtomicInteger(0), of, Once.of(checkedConsumer), () -> {
            return scheduledExecutorService.schedule(() -> {
                LOGGER.debug("Propagating completion after {} milliseconds\nDSL Source:\n{}", Long.valueOf(j), str);
                of.runOnce();
            }, j, TimeUnit.MILLISECONDS);
        });
    }

    private static <T, U> Publisher<T> doPropagateCompletion(Publisher<U> publisher, Publisher<T> publisher2, Function<Publisher<U>, Publisher<T>> function, AtomicInteger atomicInteger, Once.RunOnce runOnce, Once.ConsumeOnce<Throwable> consumeOnce, Supplier<ScheduledFuture<?>> supplier) {
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        AtomicReference atomicReference = new AtomicReference();
        AtomicReference atomicReference2 = new AtomicReference();
        return subscribeFluxOnPublisherSubscription(Flux.from(publisher2).doOnNext(obj -> {
            if (atomicInteger.decrementAndGet() == 0 && atomicBoolean.get()) {
                runOnce.runOnce();
                ScheduledFuture scheduledFuture = (ScheduledFuture) atomicReference2.get();
                if (scheduledFuture != null) {
                    scheduledFuture.cancel(true);
                }
            }
        }), Flux.from(publisher).doOnNext(obj2 -> {
            atomicInteger.incrementAndGet();
        }).transform(function), null, th -> {
            atomicReference.set(th);
            consumeOnce.consumeOnce(th);
        }, () -> {
            atomicBoolean.set(true);
            if (atomicInteger.get() == 0) {
                runOnce.runOnce();
            } else {
                atomicReference2.set((ScheduledFuture) supplier.get());
            }
        });
    }

    public static Publisher<CoreEvent> transform(Publisher<CoreEvent> publisher, ReactiveProcessor reactiveProcessor) {
        return Flux.from(publisher).transform(reactiveProcessor);
    }

    public static Publisher<CoreEvent> map(Publisher<CoreEvent> publisher, Function<CoreEvent, CoreEvent> function) {
        return Flux.from(publisher).map(function);
    }

    public static Publisher<CoreEvent> flatMap(Publisher<CoreEvent> publisher, Function<CoreEvent, Publisher<CoreEvent>> function, Component component) {
        return Flux.from(publisher).flatMap(coreEvent -> {
            return Mono.from((Publisher) function.apply(coreEvent)).onErrorMap(th -> {
                return !(th instanceof MessagingException);
            }, th2 -> {
                return new MessagingException(coreEvent, th2, component);
            });
        });
    }

    public static Publisher<CoreEvent> justPublishOn(CoreEvent coreEvent, ExecutorService executorService) {
        return Flux.just(coreEvent).publishOn(Schedulers.fromExecutorService(executorService));
    }

    public static <T> Supplier<FluxSink<T>> createFluxSupplier(Function<Flux<T>, Flux<?>> function) {
        return () -> {
            FluxSinkRecorder fluxSinkRecorder = new FluxSinkRecorder();
            Flux flux = (Flux) function.apply(TransactionCoordination.isTransactionActive() ? fluxSinkRecorder.flux().subscriberContext(ReactorTransactionUtils.popTxFromSubscriberContext()) : fluxSinkRecorder.flux());
            if (TransactionCoordination.isTransactionActive()) {
                flux = flux.subscriberContext(ReactorTransactionUtils.pushTxToSubscriberContext(function.toString()));
            }
            flux.subscribe((Consumer) null, th -> {
                LOGGER.error("Exception reached subscriber for " + function.toString(), th);
            });
            return fluxSinkRecorder.getFluxSink();
        };
    }

    public static <T> FluxSinkSupplier<T> createRoundRobinFluxSupplier(Function<Flux<T>, Flux<?>> function, int i) {
        Supplier createFluxSupplier = createFluxSupplier(function);
        return new TransactionAwareFluxSinkSupplier(createFluxSupplier, new RoundRobinFluxSinkSupplier(i, createFluxSupplier));
    }

    public static <E extends Throwable> Function<? super Either<E, CoreEvent>, ? extends CoreEvent> propagateErrorResponseMapper() {
        return either -> {
            return (CoreEvent) either.reduce(th -> {
                throw Exceptions.propagateWrappingFatal(th);
            }, UnaryOperator.identity());
        };
    }
}
