package org.mule.routing;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import javax.resource.spi.work.WorkException;
import org.apache.commons.collections.CollectionUtils;
import org.mule.DefaultMuleEvent;
import org.mule.OptimizedRequestContext;
import org.mule.api.DefaultMuleException;
import org.mule.api.MessagingException;
import org.mule.api.MuleEvent;
import org.mule.api.MuleException;
import org.mule.api.config.ThreadingProfile;
import org.mule.api.context.WorkManager;
import org.mule.api.lifecycle.InitialisationException;
import org.mule.api.processor.MessageProcessor;
import org.mule.api.processor.MessageRouter;
import org.mule.api.routing.AggregationContext;
import org.mule.api.routing.ResponseTimeoutException;
import org.mule.api.routing.RoutePathNotFoundException;
import org.mule.api.transport.DispatchException;
import org.mule.config.i18n.CoreMessages;
import org.mule.config.i18n.MessageFactory;
import org.mule.message.DefaultExceptionPayload;
import org.mule.processor.AbstractMessageProcessorOwner;
import org.mule.processor.chain.DefaultMessageProcessorChainBuilder;
import org.mule.util.Preconditions;
import org.mule.util.concurrent.ThreadNameHelper;
import org.mule.work.ProcessingMuleEventWork;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/mule-core-3.5.5-SNAPSHOT.jar:org/mule/routing/ScatterGatherRouter.class */
public class ScatterGatherRouter extends AbstractMessageProcessorOwner implements MessageRouter {
    private static final Logger logger = LoggerFactory.getLogger(ScatterGatherRouter.class);
    private long timeout = 0;
    private List<MessageProcessor> routes = new ArrayList();
    private boolean initialised = false;
    private List<MessageProcessor> routeChains;
    private AggregationStrategy aggregationStrategy;
    private ThreadingProfile threadingProfile;
    private WorkManager workManager;

    @Override // org.mule.api.processor.MessageProcessor
    public MuleEvent process(MuleEvent muleEvent) throws MuleException {
        if (CollectionUtils.isEmpty(this.routes)) {
            throw new RoutePathNotFoundException(CoreMessages.noEndpointsForRouter(), muleEvent, (MessageProcessor) null);
        }
        AbstractRoutingStrategy.validateMessageIsNotConsumable(muleEvent, muleEvent.getMessage());
        MuleEvent processResponses = processResponses(muleEvent, executeWork(muleEvent));
        if (processResponses instanceof DefaultMuleEvent) {
            processResponses = DefaultMuleEvent.copy(processResponses);
            OptimizedRequestContext.unsafeSetEvent(processResponses);
        }
        return processResponses;
    }

    private MuleEvent processResponses(MuleEvent muleEvent, List<ProcessingMuleEventWork> list) throws MuleException {
        ArrayList arrayList = new ArrayList(list.size());
        long j = this.timeout;
        for (int i = 0; i < list.size(); i++) {
            MuleEvent muleEvent2 = null;
            Exception exc = null;
            ProcessingMuleEventWork processingMuleEventWork = list.get(i);
            MessageProcessor messageProcessor = this.routes.get(i);
            long currentTimeMillis = System.currentTimeMillis();
            try {
                muleEvent2 = processingMuleEventWork.getResult(j, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                throw new DefaultMuleException(MessageFactory.createStaticMessage(String.format("Was interrupted while waiting for route %d", Integer.valueOf(i))), e);
            } catch (ResponseTimeoutException e2) {
                exc = e2;
            } catch (MessagingException e3) {
                exc = wrapInDispatchException(e3.getEvent(), i, messageProcessor, e3);
            } catch (Exception e4) {
                exc = wrapInDispatchException(muleEvent, i, messageProcessor, e4);
            }
            j -= System.currentTimeMillis() - currentTimeMillis;
            if (exc != null) {
                if (logger.isDebugEnabled()) {
                    logger.debug(String.format("route %d generated exception for MuleEvent %s", Integer.valueOf(i), muleEvent.getId()), (Throwable) exc);
                }
                muleEvent2 = exc instanceof MessagingException ? DefaultMuleEvent.copy(((MessagingException) exc).getEvent()) : DefaultMuleEvent.copy(muleEvent);
                if (muleEvent2.getMessage().getExceptionPayload() == null) {
                    muleEvent2.getMessage().setExceptionPayload(new DefaultExceptionPayload(exc));
                }
            } else if (logger.isDebugEnabled()) {
                logger.debug(String.format("route %d executed successfully for event %s", Integer.valueOf(i), muleEvent.getId()));
            }
            arrayList.add(muleEvent2);
        }
        return this.aggregationStrategy.aggregate(new AggregationContext(muleEvent, arrayList));
    }

    private Exception wrapInDispatchException(MuleEvent muleEvent, int i, MessageProcessor messageProcessor, Exception exc) {
        return new DispatchException(MessageFactory.createStaticMessage(String.format("route number %d failed to be executed", Integer.valueOf(i))), muleEvent, messageProcessor, exc);
    }

    private List<ProcessingMuleEventWork> executeWork(MuleEvent muleEvent) throws MuleException {
        ArrayList arrayList = new ArrayList(this.routes.size());
        try {
            Iterator<MessageProcessor> it = this.routes.iterator();
            while (it.hasNext()) {
                ProcessingMuleEventWork processingMuleEventWork = new ProcessingMuleEventWork(it.next(), muleEvent);
                this.workManager.scheduleWork(processingMuleEventWork);
                arrayList.add(processingMuleEventWork);
            }
            return arrayList;
        } catch (WorkException e) {
            throw new DefaultMuleException(MessageFactory.createStaticMessage("Could not schedule work for route"), e);
        }
    }

    @Override // org.mule.processor.AbstractMuleObjectOwner, org.mule.api.lifecycle.Initialisable
    public void initialise() throws InitialisationException {
        try {
            buildRouteChains();
            if (this.threadingProfile == null) {
                this.threadingProfile = this.muleContext.getDefaultThreadingProfile();
            }
            if (this.aggregationStrategy == null) {
                this.aggregationStrategy = new CollectAllAggregationStrategy();
            }
            if (this.timeout <= 0) {
                this.timeout = Long.MAX_VALUE;
            }
            this.workManager = this.threadingProfile.createWorkManager(ThreadNameHelper.getPrefix(this.muleContext) + "ScatterGatherWorkManager", this.muleContext.getConfiguration().getShutdownTimeout());
            super.initialise();
            this.initialised = true;
        } catch (Exception e) {
            throw new InitialisationException(e, this);
        }
    }

    @Override // org.mule.processor.AbstractMuleObjectOwner, org.mule.api.lifecycle.Startable
    public void start() throws MuleException {
        this.workManager.start();
        super.start();
    }

    @Override // org.mule.processor.AbstractMuleObjectOwner, org.mule.api.lifecycle.Disposable
    public void dispose() {
        try {
            try {
                this.workManager.dispose();
                super.dispose();
            } catch (Exception e) {
                logger.error("Exception found while tring to dispose work manager. Will continue with the disposal", (Throwable) e);
                super.dispose();
            }
        } catch (Throwable th) {
            super.dispose();
            throw th;
        }
    }

    @Override // org.mule.api.processor.MessageRouter
    public void addRoute(MessageProcessor messageProcessor) throws MuleException {
        checkNotInitialised();
        this.routes.add(messageProcessor);
    }

    @Override // org.mule.api.processor.MessageRouter
    public void removeRoute(MessageProcessor messageProcessor) throws MuleException {
        checkNotInitialised();
        this.routes.remove(messageProcessor);
    }

    private void buildRouteChains() throws MuleException {
        Preconditions.checkState(this.routes.size() > 1, "At least 2 routes are required for ScatterGather");
        this.routeChains = new ArrayList(this.routes.size());
        Iterator<MessageProcessor> it = this.routes.iterator();
        while (it.hasNext()) {
            this.routeChains.add(new DefaultMessageProcessorChainBuilder().chain(it.next()).build());
        }
    }

    private void checkNotInitialised() {
        Preconditions.checkState(!this.initialised, "<scatter-gather> router is not dynamic. Cannot modify routes after initialisation");
    }

    @Override // org.mule.processor.AbstractMessageProcessorOwner
    protected List<MessageProcessor> getOwnedMessageProcessors() {
        return this.routes;
    }

    public void setAggregationStrategy(AggregationStrategy aggregationStrategy) {
        this.aggregationStrategy = aggregationStrategy;
    }

    public void setThreadingProfile(ThreadingProfile threadingProfile) {
        this.threadingProfile = threadingProfile;
    }

    public void setTimeout(long j) {
        this.timeout = j;
    }

    public void setRoutes(List<MessageProcessor> list) {
        this.routes = list;
    }
}
