package org.mule.runtime.core.processor.strategy;

import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.lifecycle.Startable;
import org.mule.runtime.api.lifecycle.Stoppable;
import org.mule.runtime.api.scheduler.Scheduler;
import org.mule.runtime.core.api.Event;
import org.mule.runtime.core.api.MuleContext;
import org.mule.runtime.core.api.construct.FlowConstruct;
import org.mule.runtime.core.api.exception.MessagingExceptionHandler;
import org.mule.runtime.core.api.processor.strategy.ProcessingStrategy;
import org.mule.runtime.core.api.processor.strategy.ProcessingStrategyFactory;
import org.mule.runtime.core.api.rx.Exceptions;
import org.mule.runtime.core.api.scheduler.SchedulerConfig;
import org.mule.runtime.core.context.notification.AsyncMessageNotification;
import org.mule.runtime.core.exception.MessagingException;
import org.mule.runtime.core.session.DefaultMuleSession;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

@Deprecated
/* loaded from: input_file:org/mule/runtime/core/processor/strategy/LegacyAsynchronousProcessingStrategyFactory.class */
public class LegacyAsynchronousProcessingStrategyFactory implements ProcessingStrategyFactory {
    private static final Logger LOGGER = LoggerFactory.getLogger(LegacyAsynchronousProcessingStrategyFactory.class);
    public static final String SYNCHRONOUS_EVENT_ERROR_MESSAGE = "Unable to process a transactional flow asynchronously";

    @Deprecated
    /* loaded from: input_file:org/mule/runtime/core/processor/strategy/LegacyAsynchronousProcessingStrategyFactory$LegacyAsynchronousProcessingStrategy.class */
    static class LegacyAsynchronousProcessingStrategy extends AbstractLegacyProcessingStrategy implements Startable, Stoppable {
        private Consumer<Scheduler> schedulerStopper;
        private MuleContext muleContext;
        private Supplier<Scheduler> schedulerSupplier;
        private Scheduler scheduler;

        public LegacyAsynchronousProcessingStrategy(Supplier<Scheduler> supplier, Consumer<Scheduler> consumer, MuleContext muleContext) {
            this.schedulerSupplier = supplier;
            this.schedulerStopper = consumer;
            this.muleContext = muleContext;
        }

        @Override // org.mule.runtime.core.api.processor.strategy.ProcessingStrategy
        public Function<Publisher<Event>, Publisher<Event>> onPipeline(FlowConstruct flowConstruct, Function<Publisher<Event>, Publisher<Event>> function, MessagingExceptionHandler messagingExceptionHandler) {
            return publisher -> {
                return Flux.from(publisher).doOnNext(createOnEventConsumer()).doOnNext(fireAsyncScheduledNotification(flowConstruct)).doOnNext(event -> {
                    Flux.just(event).map(event -> {
                        return Event.builder(event).session(new DefaultMuleSession(event.getSession())).build();
                    }).publishOn(Schedulers.fromExecutorService(this.scheduler)).transform(function).doOnNext(event2 -> {
                        fireAsyncCompleteNotification(event2, flowConstruct, null);
                    }).doOnError(MessagingException.class, messagingException -> {
                        fireAsyncCompleteNotification(messagingException.getEvent(), flowConstruct, messagingException);
                    }).onErrorResumeWith(MessagingException.class, messagingExceptionHandler).onErrorResumeWith(Exceptions.EventDroppedException.class, eventDroppedException -> {
                        return Mono.empty();
                    }).doOnError(Exceptions.UNEXPECTED_EXCEPTION_PREDICATE, th -> {
                        LegacyAsynchronousProcessingStrategyFactory.LOGGER.error("Unhandled exception in async processing.", th);
                    }).subscribe();
                });
            };
        }

        @Override // org.mule.runtime.api.lifecycle.Startable
        public void start() throws MuleException {
            this.scheduler = this.schedulerSupplier.get();
        }

        @Override // org.mule.runtime.api.lifecycle.Stoppable
        public void stop() throws MuleException {
            this.schedulerStopper.accept(this.scheduler);
        }

        protected Consumer<Event> fireAsyncScheduledNotification(FlowConstruct flowConstruct) {
            return event -> {
                this.muleContext.getNotificationManager().fireNotification(new AsyncMessageNotification(flowConstruct, event, null, AsyncMessageNotification.PROCESS_ASYNC_SCHEDULED));
            };
        }

        protected void fireAsyncCompleteNotification(Event event, FlowConstruct flowConstruct, MessagingException messagingException) {
            this.muleContext.getNotificationManager().fireNotification(new AsyncMessageNotification(flowConstruct, event, null, AsyncMessageNotification.PROCESS_ASYNC_COMPLETE, messagingException));
        }
    }

    @Override // org.mule.runtime.core.api.processor.strategy.ProcessingStrategyFactory
    public ProcessingStrategy create(MuleContext muleContext, String str) {
        return new LegacyAsynchronousProcessingStrategy(() -> {
            return muleContext.getSchedulerService().ioScheduler(SchedulerConfig.config().withName(str));
        }, scheduler -> {
            scheduler.stop(muleContext.getConfiguration().getShutdownTimeout(), TimeUnit.MILLISECONDS);
        }, muleContext);
    }
}
