package org.mule.runtime.core.internal.routing;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import org.mule.runtime.api.component.Component;
import org.mule.runtime.api.el.BindingContextUtils;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.functional.Either;
import org.mule.runtime.api.i18n.I18nMessageFactory;
import org.mule.runtime.api.metadata.DataType;
import org.mule.runtime.api.scheduler.Scheduler;
import org.mule.runtime.api.util.collection.SmallMap;
import org.mule.runtime.core.api.el.ExpressionManagerSession;
import org.mule.runtime.core.api.el.ExtendedExpressionManager;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.processor.Processor;
import org.mule.runtime.core.api.processor.strategy.ProcessingStrategy;
import org.mule.runtime.core.api.retry.policy.RetryPolicyExhaustedException;
import org.mule.runtime.core.api.transaction.TransactionCoordination;
import org.mule.runtime.core.internal.event.EventInternalContextResolver;
import org.mule.runtime.core.internal.rx.FluxSinkRecorder;
import org.mule.runtime.core.internal.util.rx.ConditionalExecutorServiceDecorator;
import org.mule.runtime.core.privileged.exception.MessagingException;
import org.mule.runtime.core.privileged.processor.MessageProcessors;
import org.mule.runtime.privileged.exception.SuppressedMuleException;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.util.context.Context;
import reactor.util.context.ContextView;

/* loaded from: input_file:org/mule/runtime/core/internal/routing/UntilSuccessfulRouter.class */
class UntilSuccessfulRouter {
    private static final Logger LOGGER = LoggerFactory.getLogger(UntilSuccessfulRouter.class);
    // Key handed to retryContextResolver (below) to locate the per-event RetryContext map.
    static final String RETRY_CTX_INTERNAL_PARAM_KEY = "untilSuccessful.router.retryContext";
    // Message of the RetryPolicyExhaustedException raised once no attempts are left.
    private static final String UNTIL_SUCCESSFUL_MSG = "'until-successful' retries exhausted";
    // Component reported as the origin of the exhaustion exceptions built by getThrowableFunction.
    private final Component owner;
    // When true, suppressMuleException wraps the failure through SuppressedMuleException.suppressIfPresent.
    private final boolean suppressErrors;
    // Tested against the failed event to decide whether the retry handler takes the error.
    private final Predicate<CoreEvent> shouldRetry;
    // Used by the retry handler to re-inject a failed event after its configured delay.
    private final ConditionalExecutorServiceDecorator delayScheduler;
    // Wraps the incoming publisher; each event gets a RetryContext and is pushed into innerRecorder.
    private final Flux<CoreEvent> upstreamFlux;
    // Runs the events pushed into innerRecorder through the nested processor, with the retry handler attached.
    private final Flux<CoreEvent> innerFlux;
    // The three suppliers below are assigned at the end of the constructor and read lazily per event.
    private Function<ExpressionManagerSession, Integer> maxRetriesSupplier;
    private Function<ExpressionManagerSession, Integer> delaySupplier;
    private Function<CoreEvent, ExpressionManagerSession> sessionSupplier;
    // Sink for events to be attempted (first attempt and every scheduled retry).
    private final FluxSinkRecorder<CoreEvent> innerRecorder = new FluxSinkRecorder<>();
    // Sink for final outcomes: right = successful event, left = error after retries were exhausted.
    private final FluxSinkRecorder<Either<Throwable, CoreEvent>> downstreamRecorder = new FluxSinkRecorder<>();
    // Reactor context captured in getDownstreamPublisher and reused when subscribing the upstream chains.
    private final AtomicReference<ContextView> downstreamCtxReference = new AtomicReference<>(Context.empty());
    // Incremented for every upstream event, decremented for every item emitted downstream.
    private final AtomicInteger inflightEvents = new AtomicInteger(0);
    // Set when upstream completes while events are still in flight; completion then happens later.
    private final AtomicBoolean completeDeferred = new AtomicBoolean(false);
    private final EventInternalContextResolver<Map<String, RetryContext>> retryContextResolver = new EventInternalContextResolver<>(RETRY_CTX_INTERNAL_PARAM_KEY, HashMap::new);
    // On creation: registers the downstream sink, then subscribes innerFlux and upstreamFlux.
    // Each emitted Either decrements inflightEvents and is mapped to its event, or rethrown if it is an error.
    private final Flux<CoreEvent> downstreamFlux = Flux.create(fluxSink -> {
        this.downstreamRecorder.accept((FluxSink<Either<Throwable, CoreEvent>>) fluxSink);
        subscribeUpstreamChains(this.downstreamCtxReference.get());
    }).doOnNext(either -> {
        this.inflightEvents.decrementAndGet();
    }).map(getScopeResultMapper());

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/mule/runtime/core/internal/routing/UntilSuccessfulRouter$RetryContext.class */
    /**
     * Per-event retry bookkeeping: the original event, the resolved retry limit and delay, and a
     * counter of the attempts still available.
     */
    public static class RetryContext {
        CoreEvent event;
        AtomicInteger retryCount = new AtomicInteger();
        Integer delayInMillis;
        Integer maxRetries;

        /**
         * Resolves the retry settings for {@code coreEvent} and initializes the remaining-attempts counter
         * to the resolved maximum.
         *
         * @param coreEvent the event that will be (re)attempted
         * @param function  opens the expression session used to resolve the settings; may return {@code null}
         * @param function2 resolves the maximum number of retries from the session
         * @param function3 resolves the delay between retries, in milliseconds, from the session
         */
        RetryContext(CoreEvent coreEvent, Function<CoreEvent, ExpressionManagerSession> function, Function<ExpressionManagerSession, Integer> function2, Function<ExpressionManagerSession, Integer> function3) {
            this.event = coreEvent;
            // The session is only needed while both settings are resolved; close it afterwards so it is not
            // leaked once per event. try-with-resources skips close() when the supplier returns null.
            try (ExpressionManagerSession session = function.apply(coreEvent)) {
                this.maxRetries = function2.apply(session);
                this.delayInMillis = function3.apply(session);
            }
            this.retryCount.set(this.maxRetries.intValue());
        }

        /**
         * @return the difference between the configured maximum and the current value of the counter
         */
        int getAttemptNumber() {
            return this.maxRetries.intValue() - this.retryCount.get();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/mule/runtime/core/internal/routing/UntilSuccessfulRouter$RetryContextInitializationException.class */
    /**
     * Unchecked wrapper for any failure raised while evaluating the retry settings of an event.
     */
    public static class RetryContextInitializationException extends RuntimeException {

        private static final long serialVersionUID = -399718600886069735L;

        /**
         * @param cause the failure raised while resolving the retry settings
         */
        public RetryContextInitializationException(Throwable cause) {
            super(cause);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Wires the three reactive chains of the router and resolves how the retry settings are obtained.
     *
     * @param component                 the owning component; its location is used for the child context and sessions
     * @param publisher                 source of the events to route
     * @param processor                 the nested chain that is attempted for every event
     * @param processingStrategy        configures the internal publisher that feeds the nested chain
     * @param extendedExpressionManager used to detect expressions and to open evaluation sessions
     * @param predicate                 decides, from the failed event, whether the retry handler takes the error
     * @param scheduler                 scheduler used to delay the retries
     * @param str                       maximum number of retries, as a literal integer or an expression
     * @param str2                      delay between retries in milliseconds, as a literal integer or an expression
     * @param z                         whether the exhausted-retries cause is suppressed
     */
    public UntilSuccessfulRouter(Component component, Publisher<CoreEvent> publisher, Processor processor, ProcessingStrategy processingStrategy, ExtendedExpressionManager extendedExpressionManager, Predicate<CoreEvent> predicate, Scheduler scheduler, String str, String str2, boolean z) {
        this.owner = component;
        this.suppressErrors = z;
        this.shouldRetry = predicate;
        this.delayScheduler = new ConditionalExecutorServiceDecorator(scheduler, ignored -> TransactionCoordination.isTransactionActive());

        // Upstream: give every incoming event its retry context and inject it into the inner chain.
        this.upstreamFlux = Flux.from(publisher)
            .doOnNext(incoming -> {
                RetryContext ctx = new RetryContext(incoming, this.sessionSupplier, this.maxRetriesSupplier, this.delaySupplier);
                this.inflightEvents.getAndIncrement();
                this.innerRecorder.next(eventWithCurrentContext(incoming, ctx));
            })
            .doOnComplete(() -> {
                if (this.inflightEvents.get() != 0) {
                    // Events are still being attempted: completion is performed once the last one finishes.
                    this.completeDeferred.set(true);
                    return;
                }
                completeRouter();
            });

        // Inner: run the nested chain; successes go downstream, failures go through the retry handler.
        this.innerFlux = Flux.from(processingStrategy.configureInternalPublisher(this.innerRecorder.flux()))
            .transform(attempts -> MessageProcessors.applyWithChildContext(attempts, processor, Optional.of(component.getLocation())))
            .doOnNext(succeeded -> {
                this.downstreamRecorder.next(Either.right(Throwable.class, eventWithCurrentContextDeleted(succeeded)));
                completeRouterIfNecessary();
            })
            .onErrorContinue(getRetryPredicate(), getRetryHandler());

        // Literal settings are parsed on every evaluation; expressions are evaluated against the session.
        this.maxRetriesSupplier = extendedExpressionManager.isExpression(str)
            ? expressionToIntegerSupplierFor(str)
            : session -> Integer.parseInt(str);
        this.delaySupplier = extendedExpressionManager.isExpression(str2)
            ? expressionToIntegerSupplierFor(str2)
            : session -> Integer.parseInt(str2);

        // A session is only opened when at least one of the settings is an expression.
        if (!extendedExpressionManager.isExpression(str) && !extendedExpressionManager.isExpression(str2)) {
            this.sessionSupplier = event -> null;
        } else {
            this.sessionSupplier = event -> extendedExpressionManager.openSession(component.getLocation(), event, BindingContextUtils.NULL_BINDING_CONTEXT);
        }
    }

    /**
     * @return a mapper that unwraps the right-hand event of an outcome, or rethrows the left-hand error
     *         through {@code Exceptions.propagate}
     */
    private Function<Either<Throwable, CoreEvent>, CoreEvent> getScopeResultMapper() {
        return outcome -> {
            if (!outcome.isLeft()) {
                return outcome.getRight();
            }
            throw Exceptions.propagate(outcome.getLeft());
        };
    }

    /**
     * Builds the handler invoked for every error accepted by the retry predicate: it either schedules a new
     * attempt after the configured delay or, when no attempts are left, pushes the final error downstream.
     *
     * @return the error handler; its second argument is not used
     */
    private BiConsumer<Throwable, Object> getRetryHandler() {
        return (error, unused) -> {
            final MessagingException failure = (MessagingException) error;
            final RetryContext ctx = getRetryContextForEvent(failure.getEvent());

            final int attemptsLeft;
            if (ctx == null) {
                LOGGER.error("The RetryContext was not found. This is probably a race condition. No further attempts for the until successful will be done.");
                attemptsLeft = 0;
            } else {
                attemptsLeft = ctx.retryCount.getAndDecrement();
            }

            if (attemptsLeft > 0) {
                // NOTE(review): a maxRetries of -1 starts the counter at -1, so the "unlimited" label looks
                // unreachable from this branch — confirm whether unlimited retries are meant to be supported.
                final Object limit = ctx.maxRetries.intValue() != -1 ? ctx.maxRetries : "unlimited";
                LOGGER.error("Retrying execution of event, attempt {} of {}.", Integer.valueOf(ctx.getAttemptNumber()), limit);
                this.delayScheduler.schedule(
                    () -> this.innerRecorder.next(eventWithCurrentContext(ctx.event, ctx)),
                    ctx.delayInMillis.intValue(),
                    TimeUnit.MILLISECONDS);
                return;
            }

            LOGGER.error("Retry attempts exhausted. Failing...");
            final CoreEvent origin = ctx == null ? failure.getEvent() : ctx.event;
            final Throwable exhausted = getThrowableFunction(origin).apply(error);
            // Return value intentionally ignored: the call removes the entry from the event's context map in place.
            eventWithCurrentContextDeleted(failure.getEvent());
            this.downstreamRecorder.next(Either.left(exhausted, CoreEvent.class));
            completeRouterIfNecessary();
        };
    }

    /**
     * Completes the router when completion was deferred and no events remain in flight.
     */
    private void completeRouterIfNecessary() {
        if (!this.completeDeferred.get()) {
            return;
        }
        if (this.inflightEvents.get() != 0) {
            return;
        }
        completeRouter();
    }

    /**
     * Completes both sinks: first the one feeding the inner chain, then the one feeding downstream.
     */
    private void completeRouter() {
        innerRecorder.complete();
        downstreamRecorder.complete();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * @return the downstream flux, decorated so that the subscriber's reactor context is stored in
     *         {@code downstreamCtxReference} when the subscription happens
     */
    public Publisher<CoreEvent> getDownstreamPublisher() {
        return this.downstreamFlux.transformDeferredContextual(
            (downstream, subscriberContext) -> downstream.doOnSubscribe(
                ignored -> this.downstreamCtxReference.set(subscriberContext)));
    }

    /**
     * Subscribes the inner chain and then the upstream chain, both with the given reactor context.
     * An error reported to either subscription before the corresponding check runs is rethrown through
     * {@code propagateWrappingFatal}.
     *
     * @param contextView the reactor context to write into both chains
     */
    private void subscribeUpstreamChains(ContextView contextView) {
        // Shared by both subscriptions, as in the original: the second check also observes an error
        // reported by the inner chain after the first check.
        final AtomicReference<Throwable> subscriptionError = new AtomicReference<>();

        this.innerFlux.contextWrite(contextView).subscribe(event -> {
        }, subscriptionError::set);
        Throwable failure = subscriptionError.get();
        if (failure != null) {
            // Fully qualified: reactor.core.Exceptions is the class imported under the simple name.
            throw org.mule.runtime.core.api.rx.Exceptions.propagateWrappingFatal(failure);
        }

        this.upstreamFlux.contextWrite(contextView).subscribe(event -> {
        }, subscriptionError::set);
        failure = subscriptionError.get();
        if (failure != null) {
            throw org.mule.runtime.core.api.rx.Exceptions.propagateWrappingFatal(failure);
        }
    }

    /**
     * @param coreEvent the event whose retry context is wanted
     * @return the entry stored under the id of the event's context, or {@code null} when the map has none
     */
    private RetryContext getRetryContextForEvent(CoreEvent coreEvent) {
        final Map<String, RetryContext> contextsById = this.retryContextResolver.getCurrentContextFromEvent(coreEvent);
        return contextsById.get(coreEvent.getContext().getId());
    }

    /**
     * Stores {@code retryContext} under the id of the event's context, working on a copy of the event's
     * current map.
     *
     * @param coreEvent    the event to decorate
     * @param retryContext the retry context to associate with the event
     * @return the event produced by the resolver for the updated map
     */
    private CoreEvent eventWithCurrentContext(CoreEvent coreEvent, RetryContext retryContext) {
        final Map<String, RetryContext> updated = SmallMap.copy(this.retryContextResolver.getCurrentContextFromEvent(coreEvent));
        final String contextId = coreEvent.getContext().getId();
        updated.put(contextId, retryContext);
        return this.retryContextResolver.eventWithContext(coreEvent, updated);
    }

    /**
     * Removes the entry stored under the id of the event's context. Unlike {@code eventWithCurrentContext},
     * the map obtained from the resolver is modified directly rather than copied.
     *
     * @param coreEvent the event whose retry context is discarded
     * @return the event produced by the resolver for the modified map
     */
    private CoreEvent eventWithCurrentContextDeleted(CoreEvent coreEvent) {
        final Map<String, RetryContext> contextsById = this.retryContextResolver.getCurrentContextFromEvent(coreEvent);
        final String contextId = coreEvent.getContext().getId();
        contextsById.remove(contextId);
        return this.retryContextResolver.eventWithContext(coreEvent, contextsById);
    }

    /**
     * @return a predicate accepting only {@code MessagingException}s whose event passes {@code shouldRetry}
     */
    private Predicate<Throwable> getRetryPredicate() {
        return error -> {
            if (!(error instanceof MessagingException)) {
                return false;
            }
            final CoreEvent failedEvent = ((MessagingException) error).getEvent();
            return this.shouldRetry.test(failedEvent);
        };
    }

    /**
     * Builds the function that turns the last failure into the error reported once the retries are exhausted.
     *
     * @param coreEvent the event to report when the failure is not a {@code MessagingException}
     * @return a function wrapping the failure in a {@code RetryPolicyExhaustedException}, itself wrapped in a
     *         {@code MessagingException} owned by this router's component
     */
    private Function<Throwable, Throwable> getThrowableFunction(CoreEvent coreEvent) {
        return cause -> {
            final RetryPolicyExhaustedException exhausted = new RetryPolicyExhaustedException(
                I18nMessageFactory.createStaticMessage(UNTIL_SUCCESSFUL_MSG), suppressMuleException(cause), this.owner);
            // A MessagingException carries its own event, which takes precedence over the fallback.
            final CoreEvent reportedEvent = cause instanceof MessagingException
                ? ((MessagingException) cause).getEvent()
                : coreEvent;
            return new MessagingException(reportedEvent, exhausted, this.owner);
        };
    }

    /**
     * @param th the failure to report as the cause of the exhausted retries
     * @return {@code th} itself when error suppression is off; otherwise the result of
     *         {@code SuppressedMuleException.suppressIfPresent} for {@code MuleException}
     */
    private Throwable suppressMuleException(Throwable th) {
        if (!this.suppressErrors) {
            return th;
        }
        return SuppressedMuleException.suppressIfPresent(th, MuleException.class);
    }

    /**
     * @param str the expression to evaluate
     * @return a function that evaluates {@code str} as a number on the given session and casts the value to
     *         {@code Integer}; any exception raised on the way is wrapped in a
     *         {@code RetryContextInitializationException}
     */
    private Function<ExpressionManagerSession, Integer> expressionToIntegerSupplierFor(String str) {
        return session -> {
            try {
                final Object evaluated = session.evaluate(str, DataType.NUMBER).getValue();
                // The cast stays inside the try block so that a non-Integer value is wrapped as well.
                return (Integer) evaluated;
            } catch (Exception failure) {
                throw new RetryContextInitializationException(failure);
            }
        };
    }
}
