package org.mule.runtime.core.internal.execution;

import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Function;
import org.mule.runtime.api.component.location.ComponentLocation;
import org.mule.runtime.api.component.location.Location;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.lifecycle.Initialisable;
import org.mule.runtime.api.lifecycle.InitialisationException;
import org.mule.runtime.api.message.ErrorType;
import org.mule.runtime.api.message.Message;
import org.mule.runtime.api.meta.AbstractAnnotatedObject;
import org.mule.runtime.api.metadata.MediaType;
import org.mule.runtime.core.DefaultEventContext;
import org.mule.runtime.core.api.DefaultMuleException;
import org.mule.runtime.core.api.InternalEvent;
import org.mule.runtime.core.api.construct.FlowConstruct;
import org.mule.runtime.core.api.context.notification.ConnectorMessageNotification;
import org.mule.runtime.core.api.exception.ErrorTypeMatcher;
import org.mule.runtime.core.api.exception.ErrorTypeRepository;
import org.mule.runtime.core.api.exception.Errors;
import org.mule.runtime.core.api.exception.MessagingException;
import org.mule.runtime.core.api.exception.MessagingExceptionHandler;
import org.mule.runtime.core.api.exception.NullExceptionHandler;
import org.mule.runtime.core.api.exception.SingleErrorTypeMatcher;
import org.mule.runtime.core.api.execution.MessageProcessContext;
import org.mule.runtime.core.api.execution.MessageProcessTemplate;
import org.mule.runtime.core.api.functional.Either;
import org.mule.runtime.core.api.message.ErrorBuilder;
import org.mule.runtime.core.api.processor.MessageProcessors;
import org.mule.runtime.core.api.processor.Processor;
import org.mule.runtime.core.api.source.MessageSource;
import org.mule.runtime.core.api.util.ExceptionUtils;
import org.mule.runtime.core.internal.policy.PolicyManager;
import org.mule.runtime.core.internal.policy.SourcePolicy;
import org.mule.runtime.core.internal.policy.SourcePolicyFailureResult;
import org.mule.runtime.core.internal.policy.SourcePolicySuccessResult;
import org.mule.runtime.core.internal.util.FunctionalUtils;
import org.mule.runtime.core.internal.util.message.MessageUtils;
import org.mule.runtime.extension.api.runtime.operation.Result;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;

/* loaded from: input_file:org/mule/runtime/core/internal/execution/ModuleFlowProcessingPhase.class */
public class ModuleFlowProcessingPhase extends NotificationFiringProcessingPhase<ModuleFlowProcessingPhaseTemplate> implements Initialisable {
    private static Logger LOGGER = LoggerFactory.getLogger((Class<?>) ModuleFlowProcessingPhase.class);
    private ErrorTypeMatcher sourceResponseErrorTypeMatcher;
    private ErrorType sourceResponseGenerateErrorType;
    private ErrorType sourceResponseSendErrorType;
    private ErrorType sourceErrorResponseGenerateErrorType;
    private ErrorType sourceErrorResponseSendErrorType;
    private final PolicyManager policyManager;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/mule/runtime/core/internal/execution/ModuleFlowProcessingPhase$FlowProcessor.class */
    public class FlowProcessor extends AbstractAnnotatedObject implements Processor {
        private final ModuleFlowProcessingPhaseTemplate template;
        private final InternalEvent templateEvent;
        private final MessagingExceptionHandler messagingExceptionHandler;

        public FlowProcessor(ModuleFlowProcessingPhaseTemplate moduleFlowProcessingPhaseTemplate, MessagingExceptionHandler messagingExceptionHandler, InternalEvent internalEvent) {
            this.template = moduleFlowProcessingPhaseTemplate;
            this.templateEvent = internalEvent;
            this.messagingExceptionHandler = messagingExceptionHandler;
        }

        @Override // org.mule.runtime.core.api.processor.Processor
        public InternalEvent process(InternalEvent internalEvent) throws MuleException {
            return MessageProcessors.processToApply(internalEvent, this);
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // org.mule.runtime.core.api.processor.Processor, java.util.function.Function
        public Publisher<InternalEvent> apply(Publisher<InternalEvent> publisher) {
            return Mono.from(publisher).flatMapMany(internalEvent -> {
                return MessageProcessors.processWithChildContext(internalEvent, publisher2 -> {
                    return Mono.from(publisher2).flatMapMany(internalEvent -> {
                        return this.template.routeEventAsync(internalEvent);
                    }).switchIfEmpty(Mono.fromCallable(() -> {
                        return ModuleFlowProcessingPhase.this.emptyEvent(this.templateEvent);
                    }));
                }, Optional.empty(), this.messagingExceptionHandler);
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/mule/runtime/core/internal/execution/ModuleFlowProcessingPhase$PhaseContext.class */
    public static final class PhaseContext {
        final ModuleFlowProcessingPhaseTemplate template;
        final MessageProcessContext messageProcessContext;
        final PhaseResultNotifier phaseResultNotifier;
        final Consumer<Either<MessagingException, InternalEvent>> terminateConsumer;

        PhaseContext(ModuleFlowProcessingPhaseTemplate moduleFlowProcessingPhaseTemplate, MessageProcessContext messageProcessContext, PhaseResultNotifier phaseResultNotifier, Consumer<Either<MessagingException, InternalEvent>> consumer) {
            this.template = moduleFlowProcessingPhaseTemplate;
            this.messageProcessContext = messageProcessContext;
            this.phaseResultNotifier = phaseResultNotifier;
            this.terminateConsumer = consumer;
        }
    }

    public ModuleFlowProcessingPhase(PolicyManager policyManager) {
        this.policyManager = policyManager;
    }

    @Override // org.mule.runtime.api.lifecycle.Initialisable
    public void initialise() throws InitialisationException {
        ErrorTypeRepository errorTypeRepository = this.muleContext.getErrorTypeRepository();
        this.sourceResponseErrorTypeMatcher = new SingleErrorTypeMatcher(errorTypeRepository.getSourceResponseErrorType());
        this.sourceResponseGenerateErrorType = errorTypeRepository.getErrorType(Errors.ComponentIdentifiers.Handleable.SOURCE_RESPONSE_GENERATE).get();
        this.sourceResponseSendErrorType = errorTypeRepository.getErrorType(Errors.ComponentIdentifiers.Handleable.SOURCE_RESPONSE_SEND).get();
        this.sourceErrorResponseGenerateErrorType = errorTypeRepository.getErrorType(Errors.ComponentIdentifiers.Handleable.SOURCE_ERROR_RESPONSE_GENERATE).get();
        this.sourceErrorResponseSendErrorType = errorTypeRepository.getErrorType(Errors.ComponentIdentifiers.Handleable.SOURCE_ERROR_RESPONSE_SEND).get();
    }

    @Override // org.mule.runtime.core.internal.execution.MessageProcessPhase
    public boolean supportsTemplate(MessageProcessTemplate messageProcessTemplate) {
        return messageProcessTemplate instanceof ModuleFlowProcessingPhaseTemplate;
    }

    @Override // org.mule.runtime.core.internal.execution.MessageProcessPhase
    public void runPhase(ModuleFlowProcessingPhaseTemplate moduleFlowProcessingPhaseTemplate, MessageProcessContext messageProcessContext, PhaseResultNotifier phaseResultNotifier) {
        try {
            MessageSource messageSource = messageProcessContext.getMessageSource();
            FlowConstruct flowConstruct = (FlowConstruct) this.muleContext.getConfigurationComponentLocator().find(Location.builder().globalName(messageSource.getRootContainerName()).build()).get();
            ComponentLocation location = messageSource.getLocation();
            Consumer<Either<MessagingException, InternalEvent>> terminateConsumer = getTerminateConsumer(messageSource, moduleFlowProcessingPhaseTemplate);
            MonoProcessor create = MonoProcessor.create();
            InternalEvent createEvent = createEvent(moduleFlowProcessingPhaseTemplate, location, create, flowConstruct);
            FlowProcessor flowProcessor = new FlowProcessor(moduleFlowProcessingPhaseTemplate, flowConstruct.getExceptionListener(), createEvent);
            flowProcessor.setAnnotations(flowConstruct.getAnnotations());
            SourcePolicy createSourcePolicyInstance = this.policyManager.createSourcePolicyInstance(messageSource, createEvent, flowProcessor, moduleFlowProcessingPhaseTemplate);
            PhaseContext phaseContext = new PhaseContext(moduleFlowProcessingPhaseTemplate, messageProcessContext, phaseResultNotifier, terminateConsumer);
            Mono.just(createEvent).doOnNext(onMessageReceived(messageProcessContext, flowConstruct)).then(internalEvent -> {
                return Mono.from(createSourcePolicyInstance.process(internalEvent));
            }).then(either -> {
                return (Mono) either.reduce(policyFailure(phaseContext, flowConstruct), policySuccess(phaseContext, flowConstruct));
            }).doOnSuccess(r3 -> {
                phaseResultNotifier.phaseSuccessfully();
            }).doOnError(onFailure(phaseResultNotifier, terminateConsumer)).doAfterTerminate((r32, th) -> {
                create.onComplete();
            }).subscribe();
        } catch (Exception e) {
            phaseResultNotifier.phaseFailure(e);
        }
    }

    private Consumer<InternalEvent> onMessageReceived(MessageProcessContext messageProcessContext, FlowConstruct flowConstruct) {
        return internalEvent -> {
            fireNotification(messageProcessContext.getMessageSource(), internalEvent, flowConstruct, ConnectorMessageNotification.MESSAGE_RECEIVED);
        };
    }

    private Function<SourcePolicySuccessResult, Mono<Void>> policySuccess(PhaseContext phaseContext, FlowConstruct flowConstruct) {
        return sourcePolicySuccessResult -> {
            fireNotification(phaseContext.messageProcessContext.getMessageSource(), sourcePolicySuccessResult.getResult(), flowConstruct, ConnectorMessageNotification.MESSAGE_RESPONSE);
            try {
                return Mono.from(phaseContext.template.sendResponseToClient(sourcePolicySuccessResult.getResult(), sourcePolicySuccessResult.getResponseParameters().get())).doOnSuccess(r7 -> {
                    onTerminate(phaseContext.terminateConsumer, Either.right(sourcePolicySuccessResult.getResult()));
                }).onErrorResume(th -> {
                    return policySuccessError(new SourceErrorException(sourcePolicySuccessResult.getResult(), this.sourceResponseSendErrorType, th), sourcePolicySuccessResult, phaseContext, flowConstruct);
                });
            } catch (Exception e) {
                return policySuccessError(new SourceErrorException(sourcePolicySuccessResult.getResult(), this.sourceResponseGenerateErrorType, e), sourcePolicySuccessResult, phaseContext, flowConstruct);
            }
        };
    }

    private Function<SourcePolicyFailureResult, Mono<Void>> policyFailure(PhaseContext phaseContext, FlowConstruct flowConstruct) {
        return sourcePolicyFailureResult -> {
            fireNotification(phaseContext.messageProcessContext.getMessageSource(), sourcePolicyFailureResult.getMessagingException().getEvent(), flowConstruct, ConnectorMessageNotification.MESSAGE_ERROR_RESPONSE);
            return sendErrorResponse(sourcePolicyFailureResult.getMessagingException(), internalEvent -> {
                return sourcePolicyFailureResult.getErrorResponseParameters().get();
            }, phaseContext).doOnSuccess(r7 -> {
                onTerminate(phaseContext.terminateConsumer, Either.left(sourcePolicyFailureResult.getMessagingException()));
            });
        };
    }

    private Mono<Void> policySuccessError(SourceErrorException sourceErrorException, SourcePolicySuccessResult sourcePolicySuccessResult, PhaseContext phaseContext, FlowConstruct flowConstruct) {
        MessagingException messagingException = sourceErrorException.toMessagingException();
        return Mono.when(Mono.just(messagingException).flatMapMany(flowConstruct.getExceptionListener()).last().onErrorResume(th -> {
            return Mono.empty();
        }), sendErrorResponse(messagingException, sourcePolicySuccessResult.createErrorResponseParameters(), phaseContext).doOnSuccess(r7 -> {
            onTerminate(phaseContext.terminateConsumer, Either.left(messagingException));
        })).then();
    }

    private Mono<Void> sendErrorResponse(MessagingException messagingException, Function<InternalEvent, Map<String, Object>> function, PhaseContext phaseContext) {
        InternalEvent event = messagingException.getEvent();
        if (messagingException.inErrorHandler()) {
            return Mono.error(new SourceErrorException(event, this.sourceErrorResponseGenerateErrorType, messagingException.getCause(), messagingException));
        }
        try {
            return Mono.from(phaseContext.template.sendFailureResponseToClient(messagingException, function.apply(event))).onErrorMap(th -> {
                return new SourceErrorException(InternalEvent.builder(messagingException.getEvent()).error(ErrorBuilder.builder(th).errorType(this.sourceErrorResponseSendErrorType).build()).build(), this.sourceErrorResponseSendErrorType, th);
            });
        } catch (Exception e) {
            return Mono.error(new SourceErrorException(event, this.sourceErrorResponseGenerateErrorType, e, messagingException));
        }
    }

    private Consumer<Throwable> onFailure(PhaseResultNotifier phaseResultNotifier, Consumer<Either<MessagingException, InternalEvent>> consumer) {
        return th -> {
            onTerminate(consumer, Either.left(th));
            Throwable cause = th instanceof SourceErrorException ? th.getCause() : th;
            phaseResultNotifier.phaseFailure(cause instanceof Exception ? (Exception) cause : new DefaultMuleException(cause));
        };
    }

    private Consumer<Either<MessagingException, InternalEvent>> getTerminateConsumer(MessageSource messageSource, ModuleFlowProcessingPhaseTemplate moduleFlowProcessingPhaseTemplate) {
        return either -> {
            moduleFlowProcessingPhaseTemplate.afterPhaseExecution(either.mapLeft(messagingException -> {
                messagingException.setProcessedEvent(ExceptionUtils.createErrorEvent(messagingException.getEvent(), messageSource, messagingException, this.muleContext.getErrorTypeLocator()));
                return messagingException;
            }));
        };
    }

    private InternalEvent createEvent(ModuleFlowProcessingPhaseTemplate moduleFlowProcessingPhaseTemplate, ComponentLocation componentLocation, Publisher<Void> publisher, FlowConstruct flowConstruct) throws MuleException {
        Message message = moduleFlowProcessingPhaseTemplate.getMessage();
        InternalEvent build = InternalEvent.builder(DefaultEventContext.create(flowConstruct.getUniqueIdString(), flowConstruct.getServerId(), componentLocation, null, publisher, NullExceptionHandler.getInstance())).message(message).flow(flowConstruct).build();
        if (message.getPayload().getValue() instanceof SourceResultAdapter) {
            SourceResultAdapter sourceResultAdapter = (SourceResultAdapter) message.getPayload().getValue();
            Result result = sourceResultAdapter.getResult();
            Object output = result.getOutput();
            build = InternalEvent.builder(build).message(((output instanceof Collection) && sourceResultAdapter.isCollection()) ? MessageUtils.toMessage(Result.builder().output(MessageUtils.toMessageCollection((Collection) output, sourceResultAdapter.getCursorProviderFactory(), build)).mediaType(result.getMediaType().orElse(MediaType.ANY)).build()) : MessageUtils.toMessage(result, result.getMediaType().orElse(MediaType.ANY), sourceResultAdapter.getCursorProviderFactory(), build)).build();
        }
        return build;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public InternalEvent emptyEvent(InternalEvent internalEvent) {
        return InternalEvent.builder(internalEvent).message(Message.of(null)).build();
    }

    private void onTerminate(Consumer<Either<MessagingException, InternalEvent>> consumer, Either<Throwable, InternalEvent> either) {
        FunctionalUtils.safely(() -> {
            consumer.accept(either.mapLeft(th -> {
                if (th instanceof MessagingException) {
                    return (MessagingException) th;
                }
                if (th instanceof SourceErrorException) {
                    return ((SourceErrorException) th).toMessagingException();
                }
                return null;
            }));
        });
    }

    @Override // java.lang.Comparable
    public int compareTo(MessageProcessPhase messageProcessPhase) {
        return messageProcessPhase instanceof ValidationPhase ? 1 : 0;
    }
}
