package org.mule.runtime.core.internal.execution;

import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import javax.inject.Inject;
import org.mule.runtime.api.component.Component;
import org.mule.runtime.api.component.ComponentIdentifier;
import org.mule.runtime.api.component.execution.CompletableCallback;
import org.mule.runtime.api.component.location.ComponentLocation;
import org.mule.runtime.api.config.FeatureFlaggingService;
import org.mule.runtime.api.config.MuleRuntimeFeature;
import org.mule.runtime.api.connection.SourceRemoteConnectionException;
import org.mule.runtime.api.event.EventContext;
import org.mule.runtime.api.exception.DefaultMuleException;
import org.mule.runtime.api.exception.ErrorTypeRepository;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.exception.MuleRuntimeException;
import org.mule.runtime.api.functional.Either;
import org.mule.runtime.api.i18n.I18nMessageFactory;
import org.mule.runtime.api.interception.SourceInterceptor;
import org.mule.runtime.api.lifecycle.Initialisable;
import org.mule.runtime.api.lifecycle.InitialisationException;
import org.mule.runtime.api.message.ErrorType;
import org.mule.runtime.api.message.Message;
import org.mule.runtime.api.metadata.MediaType;
import org.mule.runtime.api.notification.ConnectorMessageNotification;
import org.mule.runtime.api.notification.PollingSourceItemNotification;
import org.mule.runtime.core.api.MuleContext;
import org.mule.runtime.core.api.config.CorrelationIdGenerator;
import org.mule.runtime.core.api.construct.FlowConstruct;
import org.mule.runtime.core.api.construct.Pipeline;
import org.mule.runtime.core.api.context.notification.NotificationHelper;
import org.mule.runtime.core.api.context.notification.ServerNotificationManager;
import org.mule.runtime.core.api.error.Errors;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.event.EventContextFactory;
import org.mule.runtime.core.api.event.EventContextService;
import org.mule.runtime.core.api.execution.ExceptionContextProvider;
import org.mule.runtime.core.api.rx.Exceptions;
import org.mule.runtime.core.api.source.MessageSource;
import org.mule.runtime.core.api.util.ExceptionUtils;
import org.mule.runtime.core.internal.construct.AbstractPipeline;
import org.mule.runtime.core.internal.construct.FlowBackPressureException;
import org.mule.runtime.core.internal.event.InternalEvent;
import org.mule.runtime.core.internal.exception.InternalExceptionUtils;
import org.mule.runtime.core.internal.interception.InterceptorManager;
import org.mule.runtime.core.internal.message.ErrorBuilder;
import org.mule.runtime.core.internal.policy.PolicyManager;
import org.mule.runtime.core.internal.policy.SourcePolicy;
import org.mule.runtime.core.internal.policy.SourcePolicyContext;
import org.mule.runtime.core.internal.policy.SourcePolicyFailureResult;
import org.mule.runtime.core.internal.policy.SourcePolicySuccessResult;
import org.mule.runtime.core.internal.processor.interceptor.CompletableInterceptorSourceFailureCallbackAdapter;
import org.mule.runtime.core.internal.processor.interceptor.CompletableInterceptorSourceSuccessCallbackAdapter;
import org.mule.runtime.core.internal.profiling.InternalProfilingService;
import org.mule.runtime.core.internal.util.FunctionalUtils;
import org.mule.runtime.core.internal.util.mediatype.MediaTypeDecoratedResultCollection;
import org.mule.runtime.core.internal.util.message.MessageUtils;
import org.mule.runtime.core.internal.util.message.TransformingLegacyResultAdapterCollection;
import org.mule.runtime.core.privileged.event.BaseEventContext;
import org.mule.runtime.core.privileged.event.PrivilegedEvent;
import org.mule.runtime.core.privileged.event.context.FlowProcessMediatorContext;
import org.mule.runtime.core.privileged.exception.ErrorTypeLocator;
import org.mule.runtime.core.privileged.exception.MessagingException;
import org.mule.runtime.core.privileged.processor.MessageProcessors;
import org.mule.runtime.tracer.api.EventTracer;
import org.mule.runtime.tracer.api.context.SpanContextAware;
import org.mule.runtime.tracer.api.context.getter.DistributedTraceContextGetter;
import org.mule.sdk.api.runtime.operation.Result;
import org.mule.sdk.api.runtime.source.DistributedTraceContextManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;

/* loaded from: input_file:org/mule/runtime/core/internal/execution/FlowProcessMediator.class */
public class FlowProcessMediator implements Initialisable {
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) FlowProcessMediator.class);

    @Inject
    private InterceptorManager processorInterceptorManager;

    @Inject
    private ErrorTypeRepository errorTypeRepository;

    @Inject
    private ErrorTypeLocator errorTypeLocator;

    @Inject
    private Collection<ExceptionContextProvider> exceptionContextProviders;

    @Inject
    private ServerNotificationManager notificationManager;

    @Inject
    private MuleContext muleContext;

    @Inject
    private EventContextService eventContextService;

    @Inject
    private FeatureFlaggingService featureFlaggingService;

    @Inject
    private InternalProfilingService profilingService;
    private final PolicyManager policyManager;
    private final PhaseResultNotifier phaseResultNotifier;
    private ErrorType sourceResponseGenerateErrorType;
    private ErrorType sourceResponseSendErrorType;
    private ErrorType sourceErrorResponseGenerateErrorType;
    private ErrorType sourceErrorResponseSendErrorType;
    private ErrorType flowBackPressureErrorType;
    private NotificationHelper notificationHelper;
    private Optional<CorrelationIdGenerator> correlationIdGenerator;
    private EventTracer<CoreEvent> coreEventTracer;
    private final List<CompletableInterceptorSourceSuccessCallbackAdapter> additionalSuccessInterceptors = new LinkedList();
    private final List<CompletableInterceptorSourceFailureCallbackAdapter> additionalFailureInterceptors = new LinkedList();
    private final List<SourceInterceptor> sourceInterceptors = new LinkedList();

    /* loaded from: input_file:org/mule/runtime/core/internal/execution/FlowProcessMediator$DefaultFlowProcessMediatorContext.class */
    public static final class DefaultFlowProcessMediatorContext implements FlowProcessMediatorContext {
        private final FlowProcessTemplate template;
        private final Consumer<Either<MessagingException, CoreEvent>> terminateConsumer;
        private final CompletableFuture<Void> responseCompletion;

        private DefaultFlowProcessMediatorContext(FlowProcessTemplate flowProcessTemplate, Consumer<Either<MessagingException, CoreEvent>> consumer, CompletableFuture<Void> completableFuture) {
            this.template = flowProcessTemplate;
            this.terminateConsumer = consumer;
            this.responseCompletion = completableFuture;
        }

        @Override // org.mule.runtime.core.internal.message.EventInternalContext
        /* renamed from: copy, reason: merged with bridge method [inline-methods] */
        public FlowProcessMediatorContext copy2() {
            return this;
        }
    }

    public FlowProcessMediator(PolicyManager policyManager, PhaseResultNotifier phaseResultNotifier) {
        this.policyManager = policyManager;
        this.phaseResultNotifier = phaseResultNotifier;
    }

    @Override // org.mule.runtime.api.lifecycle.Initialisable
    public void initialise() throws InitialisationException {
        this.coreEventTracer = this.profilingService.getCoreEventTracer();
        this.notificationHelper = new NotificationHelper(this.notificationManager, ConnectorMessageNotification.class, false, this.featureFlaggingService.isEnabled(MuleRuntimeFeature.DISABLE_OPTIMISED_NOTIFICATION_HANDLER_DYNAMIC_RESOLUTION_UPDATE_BASED_ON_DELEGATE));
        this.sourceResponseGenerateErrorType = this.errorTypeRepository.getErrorType(Errors.ComponentIdentifiers.Handleable.SOURCE_RESPONSE_GENERATE).orElseThrow(createInitialisationExceptionFor(Errors.ComponentIdentifiers.Handleable.SOURCE_RESPONSE_GENERATE));
        this.sourceResponseSendErrorType = this.errorTypeRepository.getErrorType(Errors.ComponentIdentifiers.Handleable.SOURCE_RESPONSE_SEND).orElseThrow(createInitialisationExceptionFor(Errors.ComponentIdentifiers.Handleable.SOURCE_RESPONSE_SEND));
        this.sourceErrorResponseGenerateErrorType = this.errorTypeRepository.getErrorType(Errors.ComponentIdentifiers.Handleable.SOURCE_ERROR_RESPONSE_GENERATE).orElseThrow(createInitialisationExceptionFor(Errors.ComponentIdentifiers.Handleable.SOURCE_ERROR_RESPONSE_GENERATE));
        this.sourceErrorResponseSendErrorType = this.errorTypeRepository.getErrorType(Errors.ComponentIdentifiers.Handleable.SOURCE_ERROR_RESPONSE_SEND).orElseThrow(createInitialisationExceptionFor(Errors.ComponentIdentifiers.Handleable.SOURCE_ERROR_RESPONSE_SEND));
        this.flowBackPressureErrorType = this.errorTypeRepository.getErrorType(Errors.ComponentIdentifiers.Unhandleable.FLOW_BACK_PRESSURE).orElseThrow(createInitialisationExceptionFor(Errors.ComponentIdentifiers.Unhandleable.FLOW_BACK_PRESSURE));
        this.correlationIdGenerator = this.muleContext.getConfiguration().getDefaultCorrelationIdGenerator();
        if (this.processorInterceptorManager != null) {
            this.processorInterceptorManager.getSourceInterceptorFactories().stream().forEach(sourceInterceptorFactory -> {
                CompletableInterceptorSourceSuccessCallbackAdapter completableInterceptorSourceSuccessCallbackAdapter = new CompletableInterceptorSourceSuccessCallbackAdapter(sourceInterceptorFactory);
                CompletableInterceptorSourceFailureCallbackAdapter completableInterceptorSourceFailureCallbackAdapter = new CompletableInterceptorSourceFailureCallbackAdapter(sourceInterceptorFactory);
                try {
                    this.muleContext.getInjector().inject(completableInterceptorSourceSuccessCallbackAdapter);
                    this.muleContext.getInjector().inject(completableInterceptorSourceFailureCallbackAdapter);
                    this.additionalSuccessInterceptors.add(0, completableInterceptorSourceSuccessCallbackAdapter);
                    this.additionalFailureInterceptors.add(0, completableInterceptorSourceFailureCallbackAdapter);
                    this.sourceInterceptors.add(0, sourceInterceptorFactory.get());
                } catch (MuleException e) {
                    throw new MuleRuntimeException(e);
                }
            });
        }
    }

    private Supplier<InitialisationException> createInitialisationExceptionFor(ComponentIdentifier componentIdentifier) {
        return () -> {
            return new InitialisationException(I18nMessageFactory.createStaticMessage("ErrorType %s not found in repository", componentIdentifier), this);
        };
    }

    public void process(FlowProcessTemplate flowProcessTemplate, MessageProcessContext messageProcessContext, Optional<DistributedTraceContextManager> optional) {
        try {
            MessageSource messageSource = messageProcessContext.getMessageSource();
            FlowConstruct flowConstruct = messageProcessContext.getFlowConstruct();
            CompletableFuture<Void> completableFuture = new CompletableFuture<>();
            FlowProcessor flowProcessor = new FlowProcessor(publisher -> {
                Flux from = Flux.from(publisher);
                Objects.requireNonNull(flowProcessTemplate);
                return MessageProcessors.applyWithChildContext(from, flowProcessTemplate::routeEventAsync, Optional.empty());
            }, flowConstruct);
            CoreEvent createEvent = createEvent(flowProcessTemplate, messageSource, completableFuture, flowConstruct);
            if (optional.isPresent()) {
                setEventToSourceDistributedTraceContextManager(optional, createEvent);
            }
            flowProcessTemplate.getSourceMessage().getPollItemInformation().ifPresent(pollItemInformation -> {
                this.notificationManager.fireNotification(new PollingSourceItemNotification(PollingSourceItemNotification.ITEM_DISPATCHED, pollItemInformation.getPollId(), pollItemInformation.getItemId(), pollItemInformation.getWatermark().orElse(null), createEvent.getContext().getId(), pollItemInformation.getComponentLocation()));
            });
            this.policyManager.addSourcePointcutParametersIntoEvent(messageSource, createEvent.getMessage().getAttributes(), (InternalEvent) createEvent);
            try {
                SourcePolicy createSourcePolicyInstance = this.policyManager.createSourcePolicyInstance(messageSource, createEvent, flowProcessor, flowProcessTemplate);
                DefaultFlowProcessMediatorContext defaultFlowProcessMediatorContext = new DefaultFlowProcessMediatorContext(flowProcessTemplate, getTerminateConsumer(messageSource, flowProcessTemplate), completableFuture);
                ((InternalEvent) createEvent).setFlowProcessMediatorContext(defaultFlowProcessMediatorContext);
                BaseEventContext rootContext = ((BaseEventContext) createEvent.getContext()).getRootContext();
                this.sourceInterceptors.forEach(sourceInterceptor -> {
                    rootContext.onTerminated((coreEvent, th) -> {
                        sourceInterceptor.afterTerminated(messageSource.getLocation(), rootContext);
                    });
                });
                flowConstruct.getStatistics().incMessagesDispatched();
                dispatch(createEvent, createSourcePolicyInstance, (Pipeline) flowConstruct, defaultFlowProcessMediatorContext);
            } catch (Exception e) {
                flowProcessTemplate.sendFailureResponseToClient(messageProcessContext.getMessagingExceptionResolver().resolve(new MessagingException(createEvent, e), this.errorTypeLocator, this.exceptionContextProviders), flowProcessTemplate.getFailedExecutionResponseParametersFunction().apply(createEvent), CompletableCallback.always(() -> {
                    this.phaseResultNotifier.phaseFailure(e);
                }));
                ((BaseEventContext) createEvent.getContext()).error(e);
                completableFuture.complete(null);
            }
        } catch (Exception e2) {
            this.phaseResultNotifier.phaseFailure(e2);
        }
    }

    private static void setEventToSourceDistributedTraceContextManager(Optional<DistributedTraceContextManager> optional, CoreEvent coreEvent) {
        DistributedTraceContextManager distributedTraceContextManager = optional.get();
        if (distributedTraceContextManager instanceof SpanContextAware) {
            ((SpanContextAware) distributedTraceContextManager).setSpanContext(((SpanContextAware) coreEvent.getContext()).getSpanContext());
        }
    }

    private void dispatch(final CoreEvent coreEvent, SourcePolicy sourcePolicy, final Pipeline pipeline, final DefaultFlowProcessMediatorContext defaultFlowProcessMediatorContext) throws Exception {
        try {
            onMessageReceived(coreEvent, pipeline, defaultFlowProcessMediatorContext);
            pipeline.checkBackpressure(coreEvent);
            defaultFlowProcessMediatorContext.template.getNotificationFunctions().forEach(notificationFunction -> {
                this.notificationManager.fireNotification(notificationFunction.apply(coreEvent, pipeline.getSource()));
            });
            sourcePolicy.process(coreEvent, defaultFlowProcessMediatorContext.template, new CompletableCallback<Either<SourcePolicyFailureResult, SourcePolicySuccessResult>>() { // from class: org.mule.runtime.core.internal.execution.FlowProcessMediator.1
                @Override // org.mule.runtime.api.component.execution.CompletableCallback
                public void complete(Either<SourcePolicyFailureResult, SourcePolicySuccessResult> either) {
                    FlowProcessMediator.this.dispatchResponse(pipeline, defaultFlowProcessMediatorContext, either, coreEvent);
                }

                @Override // org.mule.runtime.api.component.execution.CompletableCallback
                public void error(Throwable th) {
                    FlowProcessMediator.this.dispatchResponse(pipeline, defaultFlowProcessMediatorContext, Either.left(new SourcePolicyFailureResult(new MessagingException(coreEvent, th), Collections::emptyMap)), coreEvent);
                }
            });
        } catch (Exception e) {
            Exception exc = (Exception) Exceptions.unwrap(e);
            this.coreEventTracer.endCurrentSpan(coreEvent);
            if (!(exc instanceof FlowBackPressureException)) {
                throw exc;
            }
            ((BaseEventContext) coreEvent.getContext()).error(exc);
            dispatchResponse(pipeline, defaultFlowProcessMediatorContext, mapBackPressureExceptionToPolicyFailureResult(defaultFlowProcessMediatorContext.template, coreEvent, (FlowBackPressureException) exc), coreEvent);
        }
    }

    private void dispatchResponse(Pipeline pipeline, DefaultFlowProcessMediatorContext defaultFlowProcessMediatorContext, Either<SourcePolicyFailureResult, SourcePolicySuccessResult> either, CoreEvent coreEvent) {
        either.apply(policyFailure(pipeline, defaultFlowProcessMediatorContext, coreEvent), policySuccess(pipeline, defaultFlowProcessMediatorContext, coreEvent));
    }

    private void finish(Pipeline pipeline, DefaultFlowProcessMediatorContext defaultFlowProcessMediatorContext, Throwable th) {
        try {
            if (th != null) {
                onFailure(pipeline, defaultFlowProcessMediatorContext).accept(th);
            } else {
                this.phaseResultNotifier.phaseSuccessfully();
            }
            defaultFlowProcessMediatorContext.responseCompletion.complete(null);
        } catch (Throwable th2) {
            defaultFlowProcessMediatorContext.responseCompletion.complete(null);
            throw th2;
        }
    }

    protected Either<SourcePolicyFailureResult, SourcePolicySuccessResult> mapBackPressureExceptionToPolicyFailureResult(FlowProcessTemplate flowProcessTemplate, CoreEvent coreEvent, FlowBackPressureException flowBackPressureException) {
        CoreEvent build = CoreEvent.builder(coreEvent).error(ErrorBuilder.builder(flowBackPressureException).errorType(this.flowBackPressureErrorType).build()).build();
        return Either.left(new SourcePolicyFailureResult(new MessagingException(build, flowBackPressureException, flowBackPressureException.getFlow()), () -> {
            return flowProcessTemplate.getFailedExecutionResponseParametersFunction().apply(build);
        }));
    }

    private Consumer<SourcePolicySuccessResult> policySuccess(Pipeline pipeline, DefaultFlowProcessMediatorContext defaultFlowProcessMediatorContext, CoreEvent coreEvent) {
        return sourcePolicySuccessResult -> {
            fireNotification(pipeline.getSource(), sourcePolicySuccessResult.getResult(), pipeline, ConnectorMessageNotification.MESSAGE_RESPONSE);
            CompletableCallback<Void> completableCallback = new CompletableCallback<Void>() { // from class: org.mule.runtime.core.internal.execution.FlowProcessMediator.2
                @Override // org.mule.runtime.api.component.execution.CompletableCallback
                public void complete(Void r6) {
                    FlowProcessMediator.this.onTerminate(pipeline, defaultFlowProcessMediatorContext, Either.right(sourcePolicySuccessResult.getResult()));
                    FlowProcessMediator.this.finish(pipeline, defaultFlowProcessMediatorContext, null);
                    FlowProcessMediator.this.coreEventTracer.endCurrentSpan(coreEvent);
                }

                @Override // org.mule.runtime.api.component.execution.CompletableCallback
                public void error(Throwable th) {
                    FlowProcessMediator.this.policySuccessError(pipeline, new SourceErrorException(sourcePolicySuccessResult.getResult(), FlowProcessMediator.this.sourceResponseSendErrorType, th));
                }
            };
            try {
                if (this.additionalSuccessInterceptors.isEmpty()) {
                    defaultFlowProcessMediatorContext.template.sendResponseToClient(sourcePolicySuccessResult.getResult(), sourcePolicySuccessResult.getResponseParameters().get(), completableCallback);
                } else {
                    BiConsumer<SourcePolicySuccessResult, CompletableCallback<Void>> biConsumer = (sourcePolicySuccessResult, completableCallback2) -> {
                        defaultFlowProcessMediatorContext.template.sendResponseToClient(sourcePolicySuccessResult.getResult(), sourcePolicySuccessResult.getResponseParameters().get(), completableCallback2);
                    };
                    Iterator<CompletableInterceptorSourceSuccessCallbackAdapter> it = this.additionalSuccessInterceptors.iterator();
                    while (it.hasNext()) {
                        biConsumer = it.next().apply(pipeline.getSource(), (BiConsumer) biConsumer);
                    }
                    biConsumer.accept(sourcePolicySuccessResult, completableCallback);
                }
            } catch (Exception e) {
                policySuccessError(pipeline, new SourceErrorException(sourcePolicySuccessResult.getResult(), this.sourceResponseGenerateErrorType, e));
            }
        };
    }

    private Consumer<SourcePolicyFailureResult> policyFailure(Pipeline pipeline, DefaultFlowProcessMediatorContext defaultFlowProcessMediatorContext, CoreEvent coreEvent) {
        return sourcePolicyFailureResult -> {
            fireNotification(pipeline.getSource(), sourcePolicyFailureResult.getMessagingException().getEvent(), pipeline, ConnectorMessageNotification.MESSAGE_ERROR_RESPONSE);
            CompletableCallback<Void> completableCallback = new CompletableCallback<Void>() { // from class: org.mule.runtime.core.internal.execution.FlowProcessMediator.3
                @Override // org.mule.runtime.api.component.execution.CompletableCallback
                public void complete(Void r6) {
                    FlowProcessMediator.this.onTerminate(pipeline, defaultFlowProcessMediatorContext, Either.left(sourcePolicyFailureResult.getMessagingException()));
                    FlowProcessMediator.this.finish(pipeline, defaultFlowProcessMediatorContext, null);
                }

                @Override // org.mule.runtime.api.component.execution.CompletableCallback
                public void error(Throwable th) {
                    FlowProcessMediator.this.finish(pipeline, defaultFlowProcessMediatorContext, th);
                }
            };
            try {
                try {
                    if (this.additionalFailureInterceptors.isEmpty()) {
                        sendErrorResponse(sourcePolicyFailureResult.getMessagingException(), coreEvent2 -> {
                            return sourcePolicyFailureResult.getErrorResponseParameters().get();
                        }, defaultFlowProcessMediatorContext, completableCallback);
                    } else {
                        BiConsumer<SourcePolicyFailureResult, CompletableCallback<Void>> biConsumer = (sourcePolicyFailureResult, completableCallback2) -> {
                            sendErrorResponse(sourcePolicyFailureResult.getMessagingException(), coreEvent3 -> {
                                return sourcePolicyFailureResult.getErrorResponseParameters().get();
                            }, defaultFlowProcessMediatorContext, completableCallback2);
                        };
                        Iterator<CompletableInterceptorSourceFailureCallbackAdapter> it = this.additionalFailureInterceptors.iterator();
                        while (it.hasNext()) {
                            biConsumer = it.next().apply(pipeline.getSource(), (BiConsumer) biConsumer);
                        }
                        biConsumer.accept(sourcePolicyFailureResult, completableCallback);
                    }
                    this.coreEventTracer.endCurrentSpan(coreEvent);
                } catch (Exception e) {
                    completableCallback.error(new SourceErrorException(sourcePolicyFailureResult.getResult(), this.sourceErrorResponseGenerateErrorType, e, sourcePolicyFailureResult.getMessagingException()));
                    this.coreEventTracer.endCurrentSpan(coreEvent);
                }
            } catch (Throwable th) {
                this.coreEventTracer.endCurrentSpan(coreEvent);
                throw th;
            }
        };
    }

    private void policySuccessError(Pipeline pipeline, SourceErrorException sourceErrorException) {
        MessagingException messagingException = sourceErrorException.toMessagingException(this.exceptionContextProviders, pipeline.getSource());
        if (pipeline instanceof AbstractPipeline) {
            ((AbstractPipeline) pipeline).errorRouterForSourceResponseError(pipeline2 -> {
                return exc -> {
                    final InternalEvent internalEvent = (InternalEvent) ((MessagingException) exc).getEvent();
                    final DefaultFlowProcessMediatorContext defaultFlowProcessMediatorContext = (DefaultFlowProcessMediatorContext) internalEvent.getFlowProcessMediatorContext();
                    sendErrorResponse((MessagingException) exc, SourcePolicyContext.from(internalEvent).getResponseParametersProcessor().getFailedExecutionResponseParametersFunction(), defaultFlowProcessMediatorContext, new CompletableCallback<Void>() { // from class: org.mule.runtime.core.internal.execution.FlowProcessMediator.4
                        @Override // org.mule.runtime.api.component.execution.CompletableCallback
                        public void complete(Void r6) {
                            FlowProcessMediator.this.onTerminate(pipeline2, defaultFlowProcessMediatorContext, Either.left(exc));
                            FlowProcessMediator.this.finish(pipeline2, defaultFlowProcessMediatorContext, null);
                            FlowProcessMediator.this.coreEventTracer.endCurrentSpan(internalEvent);
                        }

                        @Override // org.mule.runtime.api.component.execution.CompletableCallback
                        public void error(Throwable th) {
                            FlowProcessMediator.this.finish(pipeline2, defaultFlowProcessMediatorContext, th);
                            FlowProcessMediator.this.coreEventTracer.endCurrentSpan(internalEvent);
                        }
                    });
                };
            }).accept(messagingException);
        }
    }

    private void sendErrorResponse(MessagingException messagingException, Function<CoreEvent, Map<String, Object>> function, DefaultFlowProcessMediatorContext defaultFlowProcessMediatorContext, final CompletableCallback<Void> completableCallback) {
        final CoreEvent event = messagingException.getEvent();
        try {
            if (ExceptionUtils.containsType(messagingException, SourceRemoteConnectionException.class)) {
                completableCallback.complete(null);
            } else {
                defaultFlowProcessMediatorContext.template.sendFailureResponseToClient(messagingException, function.apply(event), new CompletableCallback<Void>() { // from class: org.mule.runtime.core.internal.execution.FlowProcessMediator.5
                    @Override // org.mule.runtime.api.component.execution.CompletableCallback
                    public void complete(Void r4) {
                        completableCallback.complete(r4);
                    }

                    @Override // org.mule.runtime.api.component.execution.CompletableCallback
                    public void error(Throwable th) {
                        completableCallback.error(new SourceErrorException(CoreEvent.builder(event).error(ErrorBuilder.builder(th).errorType(FlowProcessMediator.this.sourceErrorResponseSendErrorType).build()).build(), FlowProcessMediator.this.sourceErrorResponseSendErrorType, th));
                    }
                });
            }
        } catch (Exception e) {
            completableCallback.error(new SourceErrorException(event, this.sourceErrorResponseGenerateErrorType, e, messagingException));
        }
    }

    private Consumer<Throwable> onFailure(Pipeline pipeline, DefaultFlowProcessMediatorContext defaultFlowProcessMediatorContext) {
        return th -> {
            onTerminate(pipeline, defaultFlowProcessMediatorContext, Either.left(th));
            Throwable cause = th instanceof SourceErrorException ? th.getCause() : th;
            this.phaseResultNotifier.phaseFailure(cause instanceof Exception ? (Exception) cause : new DefaultMuleException(cause));
        };
    }

    private Consumer<Either<MessagingException, CoreEvent>> getTerminateConsumer(MessageSource messageSource, FlowProcessTemplate flowProcessTemplate) {
        return either -> {
            flowProcessTemplate.afterPhaseExecution(either.mapLeft(messagingException -> {
                messagingException.setProcessedEvent(InternalExceptionUtils.createErrorEvent(messagingException.getEvent(), messageSource, messagingException, this.errorTypeLocator));
                return messagingException;
            }));
        };
    }

    private void onMessageReceived(CoreEvent coreEvent, Pipeline pipeline, DefaultFlowProcessMediatorContext defaultFlowProcessMediatorContext) {
        fireNotification(pipeline.getSource(), coreEvent, pipeline, ConnectorMessageNotification.MESSAGE_RECEIVED);
        defaultFlowProcessMediatorContext.template.getNotificationFunctions().forEach(notificationFunction -> {
            this.notificationManager.fireNotification(notificationFunction.apply(coreEvent, pipeline.getSource()));
        });
    }

    private String evaluateCorrelationIdExpressionGenerator() {
        if (this.correlationIdGenerator.isPresent()) {
            return this.correlationIdGenerator.get().generateCorrelationId();
        }
        return null;
    }

    private String resolveSourceCorrelationId(SourceResultAdapter sourceResultAdapter) {
        return sourceResultAdapter.getCorrelationId().orElseGet(this::evaluateCorrelationIdExpressionGenerator);
    }

    private CoreEvent createEvent(FlowProcessTemplate flowProcessTemplate, MessageSource messageSource, CompletableFuture<Void> completableFuture, FlowConstruct flowConstruct) {
        SourceResultAdapter sourceMessage = flowProcessTemplate.getSourceMessage();
        InternalEvent build = createEventBuilder(messageSource.getLocation(), completableFuture, flowConstruct, resolveSourceCorrelationId(sourceMessage), sourceMessage.getDistributedTraceContextManager()).message(eventContext -> {
            Message message;
            Result result = sourceMessage.getResult();
            Object output = result.getOutput();
            if ((output instanceof Collection) && sourceMessage.isCollection()) {
                message = MessageUtils.toMessage(Result.builder().output(MessageUtils.messageCollection(new MediaTypeDecoratedResultCollection(new TransformingLegacyResultAdapterCollection((Collection) output), sourceMessage.getPayloadMediaTypeResolver()), sourceMessage.getCursorProviderFactory(), ((BaseEventContext) eventContext).getRootContext(), messageSource.getLocation())).mediaType(result.getMediaType().orElse(MediaType.ANY)).build());
            } else {
                message = MessageUtils.toMessage((Result<?, ?>) result, sourceMessage.getMediaType(), sourceMessage.getCursorProviderFactory(), ((BaseEventContext) eventContext).getRootContext(), messageSource.getLocation());
            }
            return message;
        }).build();
        if (sourceMessage.getRootSpanName() != null) {
            this.coreEventTracer.setCurrentSpanName(build, sourceMessage.getRootSpanName());
        }
        this.coreEventTracer.addCurrentSpanAttributes(build, sourceMessage.getSpanRootAttributes());
        return build;
    }

    private InternalEvent.Builder createEventBuilder(ComponentLocation componentLocation, CompletableFuture<Void> completableFuture, FlowConstruct flowConstruct, String str, DistributedTraceContextManager distributedTraceContextManager) {
        return InternalEvent.builder(getEventContext(componentLocation, completableFuture, flowConstruct, str, distributedTraceContextManager));
    }

    private EventContext getEventContext(ComponentLocation componentLocation, CompletableFuture<Void> completableFuture, FlowConstruct flowConstruct, String str, final DistributedTraceContextManager distributedTraceContextManager) {
        EventContext create = EventContextFactory.create(flowConstruct, this.eventContextService, componentLocation, str, (Optional<CompletableFuture<Void>>) Optional.of(completableFuture));
        this.coreEventTracer.injectDistributedTraceContext(create, new DistributedTraceContextGetter() { // from class: org.mule.runtime.core.internal.execution.FlowProcessMediator.6
            @Override // org.mule.runtime.tracer.api.context.getter.DistributedTraceContextGetter
            public Iterable<String> keys() {
                return distributedTraceContextManager.getRemoteTraceContextMap().keySet();
            }

            @Override // org.mule.runtime.tracer.api.context.getter.DistributedTraceContextGetter
            public Optional<String> get(String str2) {
                return Optional.ofNullable(distributedTraceContextManager.getRemoteTraceContextMap().get(str2));
            }
        });
        return create;
    }

    private void onTerminate(Pipeline pipeline, DefaultFlowProcessMediatorContext defaultFlowProcessMediatorContext, Either<Throwable, CoreEvent> either) {
        FunctionalUtils.safely(either.mapLeft(th -> {
            if (th instanceof MessagingException) {
                return (MessagingException) th;
            }
            if (th instanceof SourceErrorException) {
                return ((SourceErrorException) th).toMessagingException(this.exceptionContextProviders, pipeline.getSource());
            }
            return null;
        }), either2 -> {
            defaultFlowProcessMediatorContext.terminateConsumer.accept(either2);
        }, exc -> {
        });
    }

    private void fireNotification(Component component, CoreEvent coreEvent, FlowConstruct flowConstruct, int i) {
        if (coreEvent == null) {
            try {
                coreEvent = PrivilegedEvent.getCurrentEvent();
                if (coreEvent == null) {
                    return;
                }
            } catch (Exception e) {
                if (LOGGER.isWarnEnabled()) {
                    LOGGER.warn(String.format("Could not fire notification. Action: %s", Integer.valueOf(i)), (Throwable) e);
                    return;
                }
                return;
            }
        }
        this.notificationHelper.fireNotification(component, coreEvent, flowConstruct.getLocation(), i);
    }

    public void setMuleContext(MuleContext muleContext) {
        this.muleContext = muleContext;
    }
}
