/*
 * Decompiled with CFR 0.152.
 */
package org.mule.runtime.core.internal.routing.forkjoin;

import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.inject.Inject;
import org.mule.runtime.api.config.FeatureFlaggingService;
import org.mule.runtime.api.config.MuleRuntimeFeature;
import org.mule.runtime.api.message.Error;
import org.mule.runtime.api.message.ErrorType;
import org.mule.runtime.api.message.ItemSequenceInfo;
import org.mule.runtime.api.message.Message;
import org.mule.runtime.api.metadata.CollectionDataType;
import org.mule.runtime.api.metadata.DataType;
import org.mule.runtime.api.metadata.TypedValue;
import org.mule.runtime.api.util.Pair;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.processor.ReactiveProcessor;
import org.mule.runtime.core.api.processor.strategy.ProcessingStrategy;
import org.mule.runtime.core.internal.event.DefaultEventBuilder;
import org.mule.runtime.core.internal.event.InternalEvent;
import org.mule.runtime.core.internal.message.ErrorBuilder;
import org.mule.runtime.core.internal.routing.ForkJoinStrategy;
import org.mule.runtime.core.internal.routing.ForkJoinStrategyFactory;
import org.mule.runtime.core.internal.routing.forkjoin.DefaultRoutePairPublisherAssemblyHelper;
import org.mule.runtime.core.internal.routing.forkjoin.LegacyRoutePairPublisherAssemblyHelper;
import org.mule.runtime.core.internal.routing.forkjoin.RoutePairPublisherAssemblyHelper;
import org.mule.runtime.core.internal.routing.result.CompositeRoutingException;
import org.mule.runtime.core.privileged.exception.EventProcessingException;
import org.mule.runtime.core.privileged.exception.MessagingException;
import org.mule.runtime.core.privileged.routing.RoutingResult;
import org.reactivestreams.Publisher;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

public abstract class AbstractForkJoinStrategyFactory
implements ForkJoinStrategyFactory {
    public static final String TIMEOUT_EXCEPTION_DESCRIPTION = "Route Timeout";
    public static final String TIMEOUT_EXCEPTION_DETAILED_DESCRIPTION_PREFIX = "Timeout while processing route/part:";
    private final boolean mergeVariables;
    private final boolean completeChildContextsOnTimeout;

    @Inject
    public AbstractForkJoinStrategyFactory(FeatureFlaggingService featureFlaggingService) {
        this(true, featureFlaggingService);
    }

    public AbstractForkJoinStrategyFactory(boolean mergeVariables, FeatureFlaggingService featureFlaggingService) {
        this.mergeVariables = mergeVariables;
        this.completeChildContextsOnTimeout = featureFlaggingService != null && featureFlaggingService.isEnabled(MuleRuntimeFeature.FORK_JOIN_COMPLETE_CHILDREN_ON_TIMEOUT);
    }

    @Override
    public ForkJoinStrategy createForkJoinStrategy(ProcessingStrategy processingStrategy, int maxConcurrency, boolean delayErrors, long timeout, org.mule.runtime.api.scheduler.Scheduler timeoutScheduler, ErrorType timeoutErrorType, org.mule.runtime.api.scheduler.Scheduler timeoutBlockingScheduler, boolean isDetailedLogEnabled) {
        Duration timeoutDuration = timeout == Long.MAX_VALUE ? Duration.ofNanos(Long.MAX_VALUE) : Duration.ofMillis(timeout);
        Scheduler reactorTimeoutScheduler = Schedulers.fromExecutorService((ExecutorService)timeoutScheduler);
        return (original, routingPairs) -> {
            AtomicInteger count = new AtomicInteger();
            CoreEvent.Builder resultBuilder = CoreEvent.builder(original);
            return Flux.from((Publisher)routingPairs).map(this.addSequence(count)).flatMapSequential(this.processRoutePair(processingStrategy, maxConcurrency, delayErrors, timeoutDuration, reactorTimeoutScheduler, timeoutErrorType, timeoutBlockingScheduler), maxConcurrency).reduce(new Pair(new ArrayList(), false), (listBooleanPair, coreEventExceptionPair) -> {
                ((List)listBooleanPair.getFirst()).add(coreEventExceptionPair);
                boolean hasNewError = ((CoreEvent)coreEventExceptionPair.getFirst()).getError().map(err -> !this.isOriginalError((Error)err, original.getError())).orElse(false);
                return new Pair<List, Boolean>((List)listBooleanPair.getFirst(), (Boolean)listBooleanPair.getSecond() != false || hasNewError);
            }).doOnNext(listBooleanPair -> {
                if (((Boolean)listBooleanPair.getSecond()).booleanValue()) {
                    throw Exceptions.propagate((Throwable)this.createCompositeRoutingException(((List)listBooleanPair.getFirst()).stream().map(coreEventExceptionPair -> this.removeOriginalError((Pair<CoreEvent, EventProcessingException>)coreEventExceptionPair, original.getError())).collect(Collectors.toList()), isDetailedLogEnabled));
                }
            }).map(listBooleanPair -> ((List)listBooleanPair.getFirst()).stream().map(Pair::getFirst).collect(Collectors.toList())).doOnNext(this.mergeVariables(original, resultBuilder)).map(this.createResultEvent(original, resultBuilder));
        };
    }

    private boolean isOriginalError(Error newError, Optional<Error> originalError) {
        return originalError.map(error -> error.equals(newError)).orElse(false);
    }

    private Pair<CoreEvent, EventProcessingException> removeOriginalError(Pair<CoreEvent, EventProcessingException> coreEventExceptionPair, Optional<Error> originalError) {
        CoreEvent coreEvent = coreEventExceptionPair.getFirst();
        EventProcessingException eventProcessingException = coreEventExceptionPair.getSecond();
        return coreEvent.getError().map(err -> this.isOriginalError((Error)err, originalError) ? new Pair<CoreEvent, EventProcessingException>(CoreEvent.builder(coreEvent).error(null).build(), eventProcessingException) : new Pair<CoreEvent, EventProcessingException>(coreEvent, eventProcessingException)).orElse(coreEventExceptionPair);
    }

    protected abstract Function<List<CoreEvent>, CoreEvent> createResultEvent(CoreEvent var1, CoreEvent.Builder var2);

    private Function<ForkJoinStrategy.RoutingPair, ForkJoinStrategy.RoutingPair> addSequence(AtomicInteger count) {
        return pair -> ForkJoinStrategy.RoutingPair.of(CoreEvent.builder(pair.getEvent()).itemSequenceInfo(Optional.ofNullable(ItemSequenceInfo.of(count.getAndIncrement()))).build(), pair.getRoute());
    }

    private Function<ForkJoinStrategy.RoutingPair, Publisher<Pair<CoreEvent, EventProcessingException>>> processRoutePair(ProcessingStrategy processingStrategy, int maxConcurrency, boolean delayErrors, Duration timeout, Scheduler reactorTimeoutScheduler, ErrorType timeoutErrorType, org.mule.runtime.api.scheduler.Scheduler timeoutBlockingScheduler) {
        return pair -> {
            ReactiveProcessor route = publisher -> Flux.from((Publisher)publisher).transform((Function)pair.getRoute());
            route = this.applyProcessingStrategy(processingStrategy, route, maxConcurrency);
            RoutePairPublisherAssemblyHelper routePairPublisherAssemblyHelper = this.completeChildContextsOnTimeout ? new DefaultRoutePairPublisherAssemblyHelper(pair.getEvent(), route, timeoutBlockingScheduler) : new LegacyRoutePairPublisherAssemblyHelper(pair.getEvent(), route);
            return Flux.from(routePairPublisherAssemblyHelper.getPublisherOnChildContext()).timeout(timeout, routePairPublisherAssemblyHelper.decorateTimeoutPublisher((Publisher<CoreEvent>)this.onTimeout(processingStrategy, delayErrors, timeoutErrorType, (ForkJoinStrategy.RoutingPair)pair)), reactorTimeoutScheduler).map(this::eventToPair).onErrorResume(MessagingException.class, me -> this.getPublisher(delayErrors, (EventProcessingException)me));
        };
    }

    private Pair<CoreEvent, EventProcessingException> eventToPair(CoreEvent coreEvent) {
        return new Pair<InternalEvent, Object>(((DefaultEventBuilder)CoreEvent.builder(coreEvent)).removeInternalParameter("error.context").build(), null);
    }

    private Publisher<Pair<CoreEvent, EventProcessingException>> getPublisher(boolean delayErrors, EventProcessingException me) {
        Pair<CoreEvent, EventProcessingException> pair = new Pair<CoreEvent, EventProcessingException>(me.getEvent(), me);
        return delayErrors ? Mono.just(pair) : Mono.error((Throwable)me);
    }

    private Mono<CoreEvent> onTimeout(ProcessingStrategy processingStrategy, boolean delayErrors, ErrorType timeoutErrorType, ForkJoinStrategy.RoutingPair pair) {
        return Mono.defer(() -> delayErrors ? Mono.just((Object)this.createTimeoutErrorEvent(timeoutErrorType, pair)) : Mono.error((Throwable)new TimeoutException(this.buildDetailedDescription(pair)))).transform((Function)processingStrategy.onPipeline(p -> p));
    }

    private ReactiveProcessor applyProcessingStrategy(ProcessingStrategy processingStrategy, ReactiveProcessor processor, int maxConcurrency) {
        if (maxConcurrency > 1) {
            return processingStrategy.onPipeline(processor);
        }
        return processor;
    }

    private CoreEvent createTimeoutErrorEvent(ErrorType timeoutErrorType, ForkJoinStrategy.RoutingPair pair) {
        String detailedDescription = this.buildDetailedDescription(pair);
        return CoreEvent.builder(pair.getEvent()).message(Message.of(null)).error(ErrorBuilder.builder().errorType(timeoutErrorType).exception(new TimeoutException(detailedDescription)).description(TIMEOUT_EXCEPTION_DESCRIPTION).detailedDescription(detailedDescription).build()).build();
    }

    private String buildDetailedDescription(ForkJoinStrategy.RoutingPair pair) {
        return "Timeout while processing route/part: '" + pair.getEvent().getGroupCorrelation().get().getSequence() + "'";
    }

    private CompositeRoutingException createCompositeRoutingException(List<Pair<CoreEvent, EventProcessingException>> results, boolean isDetailedLogEnabled) {
        LinkedHashMap<String, Message> successMap = new LinkedHashMap<String, Message>();
        LinkedHashMap<String, Pair<Error, EventProcessingException>> errorMap = new LinkedHashMap<String, Pair<Error, EventProcessingException>>();
        for (Pair<CoreEvent, EventProcessingException> eventExceptionPair : results) {
            String key = Integer.toString(eventExceptionPair.getFirst().getGroupCorrelation().get().getSequence());
            if (eventExceptionPair.getFirst().getError().isPresent()) {
                errorMap.put(key, new Pair<Error, EventProcessingException>(eventExceptionPair.getFirst().getError().get(), eventExceptionPair.getSecond()));
                continue;
            }
            successMap.put(key, eventExceptionPair.getFirst().getMessage());
        }
        if (isDetailedLogEnabled) {
            return new CompositeRoutingException(RoutingResult.routingResultWithException(successMap, errorMap));
        }
        Map<String, Error> previousErrorMap = errorMap.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, pair -> (Error)((Pair)pair.getValue()).getFirst()));
        return new CompositeRoutingException(new RoutingResult(successMap, previousErrorMap));
    }

    private Consumer<List<CoreEvent>> mergeVariables(CoreEvent original, CoreEvent.Builder result) {
        return list -> {
            if (!this.mergeVariables) {
                return;
            }
            HashMap<String, TypedValue> routeVars = new HashMap<String, TypedValue>();
            list.forEach(event -> event.getVariables().forEach((key, typedValue) -> {
                if (!typedValue.equals(original.getVariables().get(key))) {
                    if (!routeVars.containsKey(key)) {
                        AbstractForkJoinStrategyFactory.addNewVariable(routeVars, key, typedValue);
                    } else {
                        AbstractForkJoinStrategyFactory.addExistingVariable(routeVars, key, typedValue);
                    }
                }
            }));
            routeVars.forEach((s, typedValue) -> result.addVariable((String)s, (TypedValue<?>)typedValue));
        };
    }

    private static void addNewVariable(Map<String, TypedValue<?>> routeVars, String key, TypedValue<?> typedValue) {
        if (typedValue.getValue() instanceof List) {
            ArrayList newList = new ArrayList((List)typedValue.getValue());
            routeVars.put(key, new TypedValue(newList, DataType.builder().collectionType(List.class).itemType(((CollectionDataType)typedValue.getDataType()).getItemDataType().getType()).build()));
        } else {
            routeVars.put(key, typedValue);
        }
    }

    private static void addExistingVariable(Map<String, TypedValue<?>> routeVars, String key, TypedValue<?> typedValue) {
        if (!(routeVars.get(key).getValue() instanceof List)) {
            ArrayList newList = new ArrayList();
            newList.add(routeVars.get(key).getValue());
            routeVars.put(key, new TypedValue(newList, DataType.builder().collectionType(List.class).itemType(routeVars.get(key).getDataType().getType()).build()));
        }
        List valueList = (List)routeVars.get(key).getValue();
        valueList.add(typedValue.getValue());
        if (((CollectionDataType)routeVars.get(key).getDataType()).getItemDataType().isCompatibleWith(typedValue.getDataType())) {
            routeVars.put(key, new TypedValue<List>(valueList, routeVars.get(key).getDataType()));
        } else {
            routeVars.put(key, new TypedValue<List>(valueList, DataType.builder().collectionType(List.class).build()));
        }
    }
}

