package org.mule.routing.correlation;

import java.io.Serializable;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import javax.resource.spi.work.WorkException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.mule.api.MessagingException;
import org.mule.api.MuleContext;
import org.mule.api.MuleEvent;
import org.mule.api.MuleException;
import org.mule.api.config.MuleProperties;
import org.mule.api.construct.FlowConstruct;
import org.mule.api.execution.ExecutionCallback;
import org.mule.api.lifecycle.Disposable;
import org.mule.api.lifecycle.Startable;
import org.mule.api.lifecycle.Stoppable;
import org.mule.api.processor.MessageProcessor;
import org.mule.api.routing.MessageInfoMapping;
import org.mule.api.routing.RoutingException;
import org.mule.api.service.Service;
import org.mule.api.store.ListableObjectStore;
import org.mule.api.store.ObjectAlreadyExistsException;
import org.mule.api.store.ObjectDoesNotExistException;
import org.mule.api.store.ObjectStore;
import org.mule.api.store.ObjectStoreException;
import org.mule.api.store.ObjectStoreManager;
import org.mule.config.i18n.CoreMessages;
import org.mule.context.notification.RoutingNotification;
import org.mule.execution.ErrorHandlingExecutionTemplate;
import org.mule.routing.EventGroup;
import org.mule.routing.EventProcessingThread;
import org.mule.util.StringMessageUtils;
import org.mule.util.concurrent.ThreadNameHelper;
import org.mule.util.monitor.Expirable;
import org.mule.util.monitor.ExpiryMonitor;
import org.mule.util.store.DeserializationPostInitialisable;

/* loaded from: input_file:mule/lib/mule/mule-core-3.7.1.jar:org/mule/routing/correlation/EventCorrelator.class */
public class EventCorrelator implements Startable, Stoppable, Disposable {
    // ---- constants ----------------------------------------------------------

    /** Placeholder correlation id value; not referenced elsewhere in this class. */
    public static final String NO_CORRELATION_ID = "no-id";

    /** Capacity passed to the object store manager when the processed-groups store is created. */
    public static final int MAX_PROCESSED_GROUPS = 50000;

    /** Number of milliseconds in 24 hours; the default for {@link #groupTimeToLive}. */
    private static final long ONE_DAY_IN_MILLI = 24L * 60L * 60L * 1000L;

    // ---- logging and locking ------------------------------------------------

    protected final transient Log logger = LogFactory.getLog(EventCorrelator.class);

    /** Monitor guarding processed-group bookkeeping and the add/aggregate step in {@code process}. */
    protected final Object groupsLock = new Object();

    // ---- object stores ------------------------------------------------------

    /** Groups that are still collecting events, keyed by group id. */
    protected ListableObjectStore<EventGroup> eventGroups;

    /** Ids of groups that were aggregated or expired, mapped to the time they were recorded. */
    protected ObjectStore<Long> processedGroups;

    /** Ids of groups forwarded after a timeout, mapped to the creation time of the group. */
    private ListableObjectStore<Long> expiredAndDispatchedGroups;

    // ---- collaborators ------------------------------------------------------

    private MessageInfoMapping messageInfoMapping;
    private MuleContext muleContext;
    private EventCorrelatorCallback callback;

    /** Receives timed-out aggregates; may be null, see {@code handleGroupExpiry}. */
    private MessageProcessor timeoutMessageProcessor;

    /** Created by {@code start()} unless the timeout is 0. */
    private ExpiringGroupMonitoringThread expiringGroupMonitoringThread;

    // ---- identity fixed at construction time --------------------------------

    private final String name;
    private final boolean persistentStores;
    private final String storePrefix;
    private final FlowConstruct flowConstruct;

    // ---- tunables -----------------------------------------------------------

    /** Milliseconds added to a recorded timestamp and compared against the current time. */
    protected long groupTimeToLive = ONE_DAY_IN_MILLI;

    /** Milliseconds a group may exist before the monitor expires it; 0 keeps the monitor from starting. */
    private long timeout = -1L;

    /** When true an expired group raises a timeout error instead of being forwarded. */
    private boolean failOnTimeout = true;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:mule/lib/mule/mule-core-3.7.1.jar:org/mule/routing/correlation/EventCorrelator$ExpiringGroupMonitoringThread.class */
    /**
     * Background worker that looks for event groups older than the correlator's timeout and
     * hands them to {@code handleGroupExpiry}. It also registers itself with an
     * {@link ExpiryMonitor} so that {@link #expired()} can prune the record of groups that
     * were already dispatched after a timeout.
     */
    public final class ExpiringGroupMonitoringThread extends EventProcessingThread implements Expirable, Disposable {
        private ExpiryMonitor expiryMonitor;
        public static final long DELAY_TIME = 10;

        public ExpiringGroupMonitoringThread() {
            super(EventCorrelator.this.name, DELAY_TIME);
            // NOTE(review): 60000 and 1800000 look like a 1 minute monitor frequency and a
            // 30 minute expiry for this Expirable - confirm against ExpiryMonitor.
            this.expiryMonitor = new ExpiryMonitor(EventCorrelator.this.name, 60000, EventCorrelator.this.muleContext, true);
            this.expiryMonitor.addExpirable(1800000L, TimeUnit.MILLISECONDS, this);
        }

        /**
         * Removes every entry of the expired-and-dispatched store whose stored timestamp plus
         * the group time-to-live lies in the past. A store failure stops the sweep and is
         * logged as a warning.
         */
        @Override // org.mule.util.monitor.Expirable
        public void expired() {
            try {
                for (Serializable groupId : EventCorrelator.this.expiredAndDispatchedGroups.allKeys()) {
                    long recordedAt = ((Long) EventCorrelator.this.expiredAndDispatchedGroups.retrieve(groupId)).longValue();
                    if (recordedAt + EventCorrelator.this.groupTimeToLive < System.currentTimeMillis()) {
                        EventCorrelator.this.expiredAndDispatchedGroups.remove(groupId);
                        this.logger.warn(MessageFormat.format("Discarding group {0}", groupId));
                    }
                }
            } catch (ObjectStoreException e) {
                this.logger.warn("Expiration of objects failed due to ObjectStoreException " + e + ".");
            }
        }

        /**
         * Collects all groups whose creation time plus the configured timeout has passed and
         * expires each of them inside an error-handling execution template. Does nothing
         * unless this node is the primary polling instance.
         */
        @Override // org.mule.routing.EventProcessingThread
        public void doRun() {
            if (!EventCorrelator.this.muleContext.isPrimaryPollingInstance()) {
                return;
            }
            ArrayList<EventGroup> expiredGroups = new ArrayList<EventGroup>(1);
            try {
                for (Serializable groupId : EventCorrelator.this.eventGroups.allKeys()) {
                    EventGroup candidate = EventCorrelator.this.getEventGroup(groupId);
                    // getEventGroup yields null when the group was removed after allKeys()
                    // took its snapshot (for example by a concurrent aggregation); skip it
                    // instead of failing with a NullPointerException.
                    if (candidate == null) {
                        continue;
                    }
                    if (candidate.getCreated() + EventCorrelator.this.getTimeout() < System.currentTimeMillis()) {
                        expiredGroups.add(candidate);
                    }
                }
            } catch (ObjectStoreException e) {
                this.logger.warn("expiry failed dues to ObjectStoreException " + e);
            }
            for (final EventGroup expiredGroup : expiredGroups) {
                try {
                    ErrorHandlingExecutionTemplate.createErrorHandlingExecutionTemplate(EventCorrelator.this.muleContext, EventCorrelator.this.flowConstruct.getExceptionListener()).execute((ExecutionCallback) new ExecutionCallback<MuleEvent>() {
                        @Override // org.mule.api.execution.ExecutionCallback
                        public MuleEvent process() throws Exception {
                            EventCorrelator.this.handleGroupExpiry(expiredGroup);
                            return null;
                        }
                    });
                } catch (MessagingException alreadyHandled) {
                    // Deliberately not rethrown. NOTE(review): presumably the execution
                    // template has already routed this to the flow's exception listener -
                    // confirm against ErrorHandlingExecutionTemplate.
                } catch (Exception e) {
                    EventCorrelator.this.muleContext.getExceptionListener().handleException(e);
                }
            }
        }

        /** Disposes the expiry monitor, if one was created. */
        @Override // org.mule.api.lifecycle.Disposable
        public void dispose() {
            if (this.expiryMonitor != null) {
                this.expiryMonitor.dispose();
            }
        }
    }

    /**
     * Creates a correlator and obtains its three object stores from the object store manager
     * registered in the Mule registry.
     *
     * @param eventCorrelatorCallback decides when and how a group is aggregated; must not be null
     * @param messageProcessor receives aggregates of timed-out groups; may be null
     * @param messageInfoMapping extracts the correlation id from a message; must not be null
     * @param muleContext the owning Mule context; must not be null
     * @param flowConstruct the flow this correlator belongs to; must not be null because its
     *        name is used to build the correlator name
     * @param z whether the object stores are requested as persistent
     * @param str prefix used for the names of the object stores
     * @throws IllegalArgumentException if a required argument is null
     */
    public EventCorrelator(EventCorrelatorCallback eventCorrelatorCallback, MessageProcessor messageProcessor, MessageInfoMapping messageInfoMapping, MuleContext muleContext, FlowConstruct flowConstruct, boolean z, String str) {
        if (eventCorrelatorCallback == null) {
            throw new IllegalArgumentException(CoreMessages.objectIsNull("EventCorrelatorCallback").getMessage());
        }
        if (messageInfoMapping == null) {
            throw new IllegalArgumentException(CoreMessages.objectIsNull("MessageInfoMapping").getMessage());
        }
        if (muleContext == null) {
            throw new IllegalArgumentException(CoreMessages.objectIsNull("MuleContext").getMessage());
        }
        // Validated like the other collaborators: a null flow construct would otherwise
        // surface as a bare NullPointerException while the name is being formatted below.
        if (flowConstruct == null) {
            throw new IllegalArgumentException(CoreMessages.objectIsNull("FlowConstruct").getMessage());
        }
        this.callback = eventCorrelatorCallback;
        this.messageInfoMapping = messageInfoMapping;
        this.muleContext = muleContext;
        this.timeoutMessageProcessor = messageProcessor;
        this.flowConstruct = flowConstruct;
        this.persistentStores = z;
        this.storePrefix = str;
        this.name = String.format("%s%s.event.correlator", ThreadNameHelper.getPrefix(muleContext), flowConstruct.getName());

        ObjectStoreManager objectStoreManager = (ObjectStoreManager) muleContext.getRegistry().get(MuleProperties.OBJECT_STORE_MANAGER);
        this.expiredAndDispatchedGroups = (ListableObjectStore) objectStoreManager.getObjectStore(str + ".expiredAndDispatchedGroups", z);
        // NOTE(review): the trailing -1 and 1000 are presumably entry TTL and expiration
        // interval - confirm against ObjectStoreManager.getObjectStore.
        this.processedGroups = (ListableObjectStore) objectStoreManager.getObjectStore(str + ".processedGroups", z, MAX_PROCESSED_GROUPS, -1, 1000);
        this.eventGroups = (ListableObjectStore) objectStoreManager.getObjectStore(str + ".eventGroups", z);
    }

    /**
     * Expires the group with the given id immediately. If no such group is stored, the id is
     * recorded as processed so that events arriving later for it are dropped by
     * {@code process}.
     *
     * @param str the id of the group to expire
     * @throws MessagingException if the object store fails or group expiry itself fails
     */
    public void forceGroupExpiry(String str) throws MessagingException {
        try {
            EventGroup group = null;
            try {
                // Probe the store first so that a store returning null for an unknown key
                // never reaches getEventGroup.
                if (this.eventGroups.retrieve(str) != null) {
                    group = getEventGroup(str);
                }
            } catch (ObjectDoesNotExistException missing) {
                // The store reports an unknown key by throwing. Treat that as "no such
                // group" rather than as a failure, otherwise the branch below that marks
                // the id as processed could never run.
            }
            if (group != null) {
                handleGroupExpiry(group);
            } else {
                addProcessedGroup(str);
            }
        } catch (ObjectStoreException e) {
            throw new MessagingException((MuleEvent) null, e);
        }
    }

    /**
     * Adds the event to the group identified by its correlation id and, once the callback
     * reports the group as complete, returns the aggregated event.
     *
     * @param muleEvent the incoming event
     * @return the aggregated event when this event completed its group; {@code null} when
     *         the group is still incomplete or was already processed (in which case a
     *         MISSED_AGGREGATION_GROUP_EVENT notification is fired and the event is dropped)
     * @throws RoutingException if the event carries no usable correlation id or the object
     *         store fails
     */
    public MuleEvent process(MuleEvent muleEvent) throws RoutingException {
        final String correlationId = this.messageInfoMapping.getCorrelationId(muleEvent.getMessage());
        if (this.logger.isTraceEnabled()) {
            try {
                this.logger.trace(String.format("Received async reply message for correlationID: %s%n%s%n%s", correlationId, StringMessageUtils.truncate(StringMessageUtils.toString(muleEvent.getMessage().getPayload()), 200, false), StringMessageUtils.headersToString(muleEvent.getMessage())));
            } catch (Exception ignored) {
                // Rendering the payload for a trace line is best effort; a failure here
                // must not prevent the event from being correlated.
            }
        }
        // "-1" is rejected as a correlation id just like a missing one. The literal replaces
        // a reference to an unrelated JCA constant with the same value.
        if (correlationId == null || "-1".equals(correlationId)) {
            throw new RoutingException(CoreMessages.noCorrelationId(), muleEvent, this.timeoutMessageProcessor);
        }
        // Every object store failure below is reported the same way, so one handler
        // covers the whole sequence.
        try {
            if (isGroupAlreadyProcessed(correlationId)) {
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("An event was received for an event group that has already been processed, this is probably because the async-reply timed out. Correlation Id is: " + correlationId + ". Dropping event");
                }
                this.muleContext.fireNotification(new RoutingNotification(muleEvent.getMessage(), muleEvent.getMessageSourceURI().toString(), RoutingNotification.MISSED_AGGREGATION_GROUP_EVENT));
                return null;
            }

            EventGroup eventGroup = getEventGroup(correlationId);
            if (eventGroup == null) {
                eventGroup = addEventGroup(this.callback.createEventGroup(muleEvent, correlationId));
            }

            // NOTE(review): the group is looked up before groupsLock is taken, so another
            // thread may aggregate and remove it in between - confirm whether that matters.
            synchronized (this.groupsLock) {
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("Adding event to aggregator group: " + correlationId);
                }
                eventGroup.addEvent(muleEvent);
                if (!this.callback.shouldAggregateEvents(eventGroup)) {
                    return null;
                }
                MuleEvent aggregateEvents = this.callback.aggregateEvents(eventGroup);
                aggregateEvents.getMessage().setCorrelationId(correlationId);
                String commonRootId = eventGroup.getCommonRootId();
                if (commonRootId != null) {
                    aggregateEvents.getMessage().setMessageRootId(commonRootId);
                }
                // The group is complete: drop it from the store and release its events.
                removeEventGroup(eventGroup);
                eventGroup.clear();
                return aggregateEvents;
            }
        } catch (ObjectStoreException e) {
            throw new RoutingException(muleEvent, this.timeoutMessageProcessor, e);
        }
    }

    /**
     * Looks up a group in the event group store and makes sure it is initialised.
     *
     * @param serializable the group id
     * @return the group, or {@code null} when the store holds no group for that id
     * @throws ObjectStoreException if the store fails or the group cannot be initialised
     */
    protected EventGroup getEventGroup(Serializable serializable) throws ObjectStoreException {
        final EventGroup group;
        try {
            group = this.eventGroups.retrieve(serializable);
        } catch (ObjectDoesNotExistException e) {
            return null;
        }
        // Some callers in this class treat a null result from retrieve() as possible, so
        // guard against it here rather than dereferencing it.
        if (group == null) {
            return null;
        }
        if (!group.isInitialised()) {
            try {
                DeserializationPostInitialisable.Implementation.init(group, this.muleContext);
            } catch (Exception e) {
                throw new ObjectStoreException(e);
            }
        }
        return group;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Stores a new group under its group id. If a group with that id is already stored, the
     * stored one is looked up and returned instead.
     *
     * @param eventGroup the group to store
     * @return {@code eventGroup} when it was stored, otherwise the result of looking up the
     *         existing group (which may be {@code null} if it has been removed meanwhile)
     * @throws ObjectStoreException if the store fails
     */
    public EventGroup addEventGroup(EventGroup eventGroup) throws ObjectStoreException {
        // Use the same Serializable key on both paths; casting to String on the fallback
        // path would fail for ids that the store itself accepts.
        final Serializable groupId = (Serializable) eventGroup.getGroupId();
        try {
            this.eventGroups.store(groupId, eventGroup);
            return eventGroup;
        } catch (ObjectAlreadyExistsException e) {
            return getEventGroup(groupId);
        }
    }

    /**
     * Deletes the group from the event group store and then records its id as processed.
     *
     * @param eventGroup the group to remove
     * @throws ObjectStoreException if either store operation fails
     */
    protected void removeEventGroup(EventGroup eventGroup) throws ObjectStoreException {
        final Object id = eventGroup.getGroupId();
        final Serializable storeKey = (Serializable) id;
        eventGroups.remove(storeKey);
        addProcessedGroup(id);
    }

    /**
     * Records a group id in the processed-groups store together with the current time.
     * The store call is made while holding {@code groupsLock}.
     *
     * @param obj the group id; must be {@link Serializable}
     * @throws ObjectStoreException if the store rejects the entry
     */
    protected void addProcessedGroup(Object obj) throws ObjectStoreException {
        synchronized (groupsLock) {
            final Serializable storeKey = (Serializable) obj;
            final Long processedAt = Long.valueOf(System.currentTimeMillis());
            processedGroups.store(storeKey, processedAt);
        }
    }

    /**
     * Tells whether a group id is present in the processed-groups store. The lookup is made
     * while holding {@code groupsLock}.
     *
     * @param obj the group id; must be {@link Serializable}
     * @return {@code true} if the id has been recorded as processed
     * @throws ObjectStoreException if the store lookup fails
     */
    protected boolean isGroupAlreadyProcessed(Object obj) throws ObjectStoreException {
        synchronized (groupsLock) {
            return processedGroups.contains((Serializable) obj);
        }
    }

    /**
     * @return whether an expired group raises a timeout error instead of being forwarded
     */
    public boolean isFailOnTimeout() {
        return failOnTimeout;
    }

    /**
     * @param z {@code true} to raise a timeout error for expired groups, {@code false} to
     *        forward whatever events the group has collected
     */
    public void setFailOnTimeout(boolean z) {
        failOnTimeout = z;
    }

    /**
     * @return the group timeout in milliseconds, as added to a group's creation time by the
     *         monitoring thread
     */
    public long getTimeout() {
        return timeout;
    }

    /**
     * @param j the group timeout in milliseconds; a value of 0 keeps {@code start()} from
     *        launching the monitoring thread
     */
    public void setTimeout(long j) {
        timeout = j;
    }

    /**
     * Handles a group that has timed out (or whose expiry was forced).
     *
     * <p>The group is first removed from the event group store and recorded as processed.
     * What happens next depends on {@code failOnTimeout}:
     * <ul>
     * <li>{@code true}: a CORRELATION_TIMEOUT notification is fired, the group is cleared
     * and a {@link CorrelationTimeoutException} is thrown.</li>
     * <li>{@code false}: if the group is still within its time-to-live, the events collected
     * so far are aggregated and forwarded, and the group id is recorded in the
     * expired-and-dispatched store. A group whose id is already in that store is not
     * forwarded again.</li>
     * </ul>
     *
     * @param eventGroup the group to expire
     * @throws MessagingException on timeout when {@code failOnTimeout} is set, or when the
     *         object store, aggregation or forwarding fails
     */
    protected void handleGroupExpiry(EventGroup eventGroup) throws MessagingException {
        try {
            // From here on, events arriving for this group id are treated as "already processed".
            removeEventGroup(eventGroup);
            if (isFailOnTimeout()) {
                try {
                    this.muleContext.fireNotification(new RoutingNotification(eventGroup.toMessageCollection(), null, RoutingNotification.CORRELATION_TIMEOUT));
                    // Capture the event before clearing the group; it is attached to the exception below.
                    MuleEvent messageCollectionEvent = eventGroup.getMessageCollectionEvent();
                    try {
                        eventGroup.clear();
                    } catch (ObjectStoreException e) {
                        // A failed clear is only logged so that the timeout is still reported.
                        this.logger.warn("Failed to clear group with id " + eventGroup.getGroupId() + " since underlying ObjectStore threw Exception:" + e.getMessage());
                    }
                    throw new CorrelationTimeoutException(CoreMessages.correlationTimedOut(eventGroup.getGroupId()), messageCollectionEvent);
                } catch (ObjectStoreException e2) {
                    throw new MessagingException(eventGroup.getMessageCollectionEvent(), e2);
                }
            }
            if (this.logger.isDebugEnabled()) {
                this.logger.debug(MessageFormat.format("Aggregator expired, but ''failOnTimeOut'' is false. Forwarding {0} events out of {1} total for group ID: {2}", Integer.valueOf(eventGroup.size()), Integer.valueOf(eventGroup.expectedSize()), eventGroup.getGroupId()));
            }
            try {
                // Groups older than the time-to-live are dropped without being forwarded.
                if (eventGroup.getCreated() + this.groupTimeToLive >= System.currentTimeMillis()) {
                    MuleEvent aggregateEvents = this.callback.aggregateEvents(eventGroup);
                    eventGroup.clear();
                    aggregateEvents.getMessage().setCorrelationId(eventGroup.getGroupId().toString());
                    if (this.expiredAndDispatchedGroups.contains((Serializable) eventGroup.getGroupId())) {
                        // Already forwarded once after a timeout; do not forward it again.
                        this.logger.warn(MessageFormat.format("Discarding group {0}", eventGroup.getGroupId()));
                    } else {
                        if (this.timeoutMessageProcessor != null) {
                            this.timeoutMessageProcessor.process(aggregateEvents);
                        } else {
                            // NOTE(review): the group was cleared above, so toArray(false)[0] may
                            // find no event here - confirm against EventGroup.clear().
                            FlowConstruct flowConstruct = eventGroup.toArray(false)[0].getFlowConstruct();
                            if (!(flowConstruct instanceof Service)) {
                                throw new UnsupportedOperationException("EventAggregator is only supported with Service");
                            }
                            ((Service) flowConstruct).dispatchEvent(aggregateEvents);
                        }
                        // Remember the dispatch, keyed by group id, with the group's creation time.
                        this.expiredAndDispatchedGroups.store((Serializable) eventGroup.getGroupId(), Long.valueOf(eventGroup.getCreated()));
                    }
                }
            } catch (MessagingException e3) {
                // Already the right type; pass it through unwrapped.
                throw e3;
            } catch (Exception e4) {
                throw new MessagingException(eventGroup.getMessageCollectionEvent(), e4);
            }
        } catch (ObjectStoreException e5) {
            throw new MessagingException(eventGroup.getMessageCollectionEvent(), e5);
        }
    }

    /**
     * Logs the start and, unless the timeout is 0, creates and starts a new group
     * monitoring thread.
     */
    @Override // org.mule.api.lifecycle.Startable
    public void start() throws MuleException {
        logger.info("Starting event correlator: " + name);
        if (timeout == 0) {
            // No timeout configured: nothing to monitor.
            return;
        }
        final ExpiringGroupMonitoringThread monitor = new ExpiringGroupMonitoringThread();
        expiringGroupMonitoringThread = monitor;
        monitor.start();
    }

    /**
     * Logs the stop and asks the monitoring thread, if one was started, to stop processing.
     */
    @Override // org.mule.api.lifecycle.Stoppable
    public void stop() throws MuleException {
        logger.info("Stopping event correlator: " + name);
        final ExpiringGroupMonitoringThread monitor = expiringGroupMonitoringThread;
        if (monitor == null) {
            return;
        }
        monitor.stopProcessing();
    }

    /**
     * Disposes the three object stores and then the monitoring thread, skipping any that is
     * null or not {@link Disposable}.
     */
    @Override // org.mule.api.lifecycle.Disposable
    public void dispose() {
        final Object[] inDisposalOrder = {
            expiredAndDispatchedGroups,
            processedGroups,
            eventGroups,
            expiringGroupMonitoringThread
        };
        for (Object resource : inDisposalOrder) {
            disposeIfDisposable(resource);
        }
    }

    /**
     * Calls {@code dispose()} on the argument when it implements {@link Disposable};
     * otherwise (including for {@code null}) does nothing.
     */
    private void disposeIfDisposable(Object obj) {
        // instanceof is false for null, so no separate null check is needed.
        if (obj instanceof Disposable) {
            ((Disposable) obj).dispose();
        }
    }
}
