package org.mule.runtime.core.internal.util.rx;

import java.util.Collections;
import java.util.Set;
import java.util.WeakHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.mule.runtime.api.component.Component;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.functional.Either;
import org.mule.runtime.core.api.processor.ReactiveProcessor;
import org.mule.runtime.core.api.util.func.CheckedRunnable;
import org.mule.runtime.core.internal.exception.MessagingException;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:org/mule/runtime/core/internal/util/rx/RxUtils.class */
/**
 * Static helpers for composing Reactor {@link Flux}/{@link Mono} pipelines that carry {@link CoreEvent}s.
 */
public class RxUtils {

    /**
     * Returns {@code flux} decorated so that, whenever it is subscribed, {@code flux2} is subscribed as well
     * using the subscriber context of that subscription.
     * <p>
     * The subscription to {@code flux2} is triggered from a {@code doOnSubscribe} hook and its result is discarded.
     *
     * @param flux  the flux handed back to the caller (after decoration)
     * @param flux2 the flux to subscribe as a side effect of subscribing to {@code flux}
     * @return the decorated {@code flux}
     */
    public static Flux<CoreEvent> subscribeFluxOnPublisherSubscription(Flux<CoreEvent> flux, Flux<CoreEvent> flux2) {
        return flux.compose(upstream -> Mono.subscriberContext()
                .flatMapMany(context -> upstream
                        .doOnSubscribe(subscription -> flux2.subscriberContext(context).subscribe())));
    }

    /**
     * Applies {@code reactiveProcessor} to {@code publisher} through {@link Flux#transform}.
     *
     * @param publisher         the source of events
     * @param reactiveProcessor the transformation to apply to the whole stream
     * @return the transformed stream
     */
    public static Publisher<CoreEvent> transform(Publisher<CoreEvent> publisher, ReactiveProcessor reactiveProcessor) {
        return Flux.from(publisher).transform(reactiveProcessor);
    }

    /**
     * Maps every event emitted by {@code publisher} with {@code function}.
     *
     * @param publisher the source of events
     * @param function  the per-event mapping
     * @return the mapped stream
     */
    public static Publisher<CoreEvent> map(Publisher<CoreEvent> publisher, Function<CoreEvent, CoreEvent> function) {
        return Flux.from(publisher).map(function);
    }

    /**
     * Flat-maps every event emitted by {@code publisher} into the first signal of the publisher returned by
     * {@code function} (the inner publisher is adapted with {@link Mono#from}).
     * <p>
     * Any error raised by the inner publisher that is not already a {@link MessagingException} is wrapped in a new
     * {@link MessagingException} built from the event being processed, the original error and {@code component}.
     *
     * @param publisher the source of events
     * @param function  produces the inner publisher for each event
     * @param component the component passed to the wrapping {@link MessagingException}
     * @return the flat-mapped stream
     */
    public static Publisher<CoreEvent> flatMap(Publisher<CoreEvent> publisher, Function<CoreEvent, Publisher<CoreEvent>> function, Component component) {
        // No raw cast: function.apply(...) is already a Publisher<CoreEvent>, so the Mono chain stays fully typed.
        return Flux.from(publisher)
                .flatMap(coreEvent -> Mono.from(function.apply(coreEvent))
                        .onErrorMap(error -> !(error instanceof MessagingException),
                                    error -> new MessagingException(coreEvent, error, component)));
    }

    /**
     * Emits {@code coreEvent} as a single-element stream whose signals are published on a scheduler created from
     * {@code executorService}.
     *
     * @param coreEvent       the event to emit
     * @param executorService the executor backing the scheduler used by {@code publishOn}
     * @return a stream emitting {@code coreEvent}
     */
    public static Publisher<CoreEvent> justPublishOn(CoreEvent coreEvent, ExecutorService executorService) {
        return Flux.just(coreEvent).publishOn(Schedulers.fromExecutorService(executorService));
    }

    /**
     * Builds a pipeline over {@code publisher} that keeps a count of in-flight elements and invokes
     * {@code checkedRunnable} once the upstream pipeline has completed and that count has dropped to zero.
     * <p>
     * The pipeline is assembled as follows:
     * <ol>
     * <li>every element emitted by {@code publisher} increments the in-flight counter;</li>
     * <li>the stream goes through {@code function} ({@code transform}) and then {@code function2} ({@code compose});</li>
     * <li>when that stream completes, {@code checkedRunnable} runs immediately if the counter is zero, otherwise the
     * completion is flagged as deferred;</li>
     * <li>the stream is merged with {@code publisher2}; every merged element that is a left value decrements the
     * counter via {@link #completeErrorSinkIfNeeded};</li>
     * <li>{@code function3} is applied ({@code compose}) to get back to a stream of {@code T};</li>
     * <li>elements are de-duplicated by the key computed with {@code function4}, with seen keys stored in a set backed
     * by a {@link WeakHashMap};</li>
     * <li>every element that passes de-duplication decrements the counter via {@link #completeErrorSinkIfNeeded}.</li>
     * </ol>
     *
     * @param publisher       the upstream source
     * @param publisher2      an additional source of results merged into the pipeline
     * @param function        first transformation applied to the upstream
     * @param function2       converts the transformed stream into a stream of {@link Either}
     * @param checkedRunnable callback to run once the upstream completed and no elements remain in flight
     * @param function3       converts the merged {@link Either} stream back into a stream of {@code T}
     * @param function4       extracts the key used to de-duplicate the resulting elements
     * @param <T>             the element type
     * @param <TKey>          the de-duplication key type
     * @return the assembled pipeline
     */
    public static <T, TKey> Publisher<T> applyWaitingInflightEvents(Publisher<T> publisher, Publisher<Either<MessagingException, T>> publisher2, Function<Publisher<T>, Publisher<T>> function, Function<Publisher<T>, Publisher<Either<MessagingException, T>>> function2, CheckedRunnable checkedRunnable, Function<Publisher<Either<MessagingException, T>>, Publisher<T>> function3, Function<T, TKey> function4) {
        final AtomicInteger inflightEvents = new AtomicInteger(0);
        final AtomicBoolean deferredCompletion = new AtomicBoolean(false);
        // Weakly-keyed set of keys already seen by distinct(); typed so that the distinct() call is checked.
        final Set<TKey> seenKeys = Collections.newSetFromMap(new WeakHashMap<>());

        return Flux.from(publisher)
                .doOnNext(element -> inflightEvents.incrementAndGet())
                .transform(function)
                .compose(function2)
                .doOnComplete(() -> {
                    // NOTE(review): the get() and the set(true) below are not one atomic step; if the counter can be
                    // decremented from another thread in between, the callback would not run. Confirm threading.
                    if (inflightEvents.get() == 0) {
                        checkedRunnable.run();
                    } else {
                        deferredCompletion.set(true);
                    }
                })
                .mergeWith(publisher2)
                .doOnNext(either -> {
                    if (either.isLeft()) {
                        completeErrorSinkIfNeeded(checkedRunnable, inflightEvents, deferredCompletion);
                    }
                })
                .compose(function3)
                .distinct(function4, () -> seenKeys)
                .doOnNext(element -> completeErrorSinkIfNeeded(checkedRunnable, inflightEvents, deferredCompletion));
    }

    /**
     * Decrements {@code atomicInteger} and, if it reached zero and {@code atomicBoolean} could be switched from
     * {@code true} to {@code false}, runs {@code checkedRunnable}.
     * <p>
     * The compare-and-set on {@code atomicBoolean} ensures the callback is run at most once per time the flag is set.
     *
     * @param checkedRunnable the callback to run
     * @param atomicInteger   the counter to decrement
     * @param atomicBoolean   the flag that must be {@code true} for the callback to run; reset to {@code false} when it does
     */
    /* JADX INFO: Access modifiers changed from: private */
    public static void completeErrorSinkIfNeeded(CheckedRunnable checkedRunnable, AtomicInteger atomicInteger, AtomicBoolean atomicBoolean) {
        if (atomicInteger.decrementAndGet() == 0 && atomicBoolean.compareAndSet(true, false)) {
            checkedRunnable.run();
        }
    }
}
