package org.mule.routing.requestreply;

import java.io.Serializable;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import org.apache.commons.collections.buffer.BoundedFifoBuffer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.mule.DefaultMuleEvent;
import org.mule.OptimizedRequestContext;
import org.mule.RequestContext;
import org.mule.api.DefaultMuleException;
import org.mule.api.MessagingException;
import org.mule.api.MuleEvent;
import org.mule.api.MuleException;
import org.mule.api.MuleMessageCollection;
import org.mule.api.config.MuleProperties;
import org.mule.api.construct.FlowConstruct;
import org.mule.api.construct.FlowConstructAware;
import org.mule.api.lifecycle.Disposable;
import org.mule.api.lifecycle.Initialisable;
import org.mule.api.lifecycle.InitialisationException;
import org.mule.api.lifecycle.Startable;
import org.mule.api.lifecycle.Stoppable;
import org.mule.api.processor.MessageProcessor;
import org.mule.api.processor.RequestReplyRequesterMessageProcessor;
import org.mule.api.routing.ResponseTimeoutException;
import org.mule.api.source.MessageSource;
import org.mule.api.store.ListableObjectStore;
import org.mule.api.store.ObjectStoreException;
import org.mule.api.store.ObjectStoreManager;
import org.mule.config.i18n.CoreMessages;
import org.mule.context.notification.RoutingNotification;
import org.mule.processor.AbstractInterceptingMessageProcessorBase;
import org.mule.routing.EventProcessingThread;
import org.mule.util.ObjectUtils;
import org.mule.util.concurrent.Latch;
import org.mule.util.concurrent.ThreadNameHelper;
import org.mule.util.store.DeserializationPostInitialisable;

/* loaded from: input_file:org/mule/routing/requestreply/AbstractAsyncRequestReplyRequester.class */
public abstract class AbstractAsyncRequestReplyRequester extends AbstractInterceptingMessageProcessorBase implements RequestReplyRequesterMessageProcessor, FlowConstructAware, Initialisable, Startable, Stoppable, Disposable {
    /** Capacity of the {@link #processed} buffer; also passed to the object store manager when the reply store is created. */
    public static final int MAX_PROCESSED_GROUPS = 50000;
    /** Passed to the object store manager when the reply store is created (presumably an entry time-to-live in milliseconds; confirm against the ObjectStoreManager API). */
    public static final int UNCLAIMED_TIME_TO_LIVE = 60000;
    /** Passed to the object store manager when the reply store is created (presumably the expiry check interval; confirm against the ObjectStoreManager API). */
    public static int UNCLAIMED_INTERVAL = UNCLAIMED_TIME_TO_LIVE;
    private static final Log logger = LogFactory.getLog(AbstractAsyncRequestReplyRequester.class);
    /** Format for the store/thread name: store prefix, thread-name prefix, flow name. */
    public static final String NAME_TEMPLATE = "%s.%s.%s.asyncReplies";
    /** Name built from {@link #NAME_TEMPLATE} in {@link #initialise()}. */
    protected String name;
    /** Source of asynchronous replies; when {@code null}, {@link #process(MuleEvent)} just delegates to the next processor. */
    protected MessageSource replyMessageSource;
    protected FlowConstruct flowConstruct;
    /** Thread that matches stored replies with waiting requests; created in {@link #start()}. */
    private AsyncReplyMonitoringThread replyThread;
    /** Store holding replies that have arrived but have not yet been matched to a waiting request. */
    protected ListableObjectStore store;
    /** Reply wait timeout in milliseconds; values {@code <= 0} wait indefinitely. */
    protected volatile long timeout = -1;
    /** Whether a timed-out wait raises an exception ({@code true}) or yields {@code null} ({@code false}). */
    protected volatile boolean failOnTimeout = true;
    /** Listener registered on the reply message source. */
    private final MessageProcessor internalAsyncReplyMessageProcessor = new InternalAsyncReplyMessageProcessor();
    /** Latches of requests currently awaiting a reply, keyed by correlation id. */
    protected final Map<String, RequestReplyLatch> locks = new ConcurrentHashMap<String, RequestReplyLatch>();
    private String storePrefix = "";
    /** Replies handed over by the monitoring thread, keyed by correlation id, until the waiting request collects them. */
    protected final ConcurrentMap<String, MuleEvent> responseEvents = new ConcurrentHashMap<String, MuleEvent>();
    /** Guards every access to {@link #processed}. */
    protected final Object processedLock = new Object();
    /** Bounded history of finished correlation groups; the oldest entry is evicted when full. */
    protected final BoundedFifoBuffer processed = new BoundedFifoBuffer(MAX_PROCESSED_GROUPS);

    /* loaded from: input_file:org/mule/routing/requestreply/AbstractAsyncRequestReplyRequester$AsyncReplyMonitoringThread.class */
    private class AsyncReplyMonitoringThread extends EventProcessingThread {
        AsyncReplyMonitoringThread(String str) {
            super(str, 100L);
        }

        /* JADX WARN: Multi-variable type inference failed */
        @Override // org.mule.routing.EventProcessingThread
        protected void doRun() {
            try {
                List<Serializable> allKeys = AbstractAsyncRequestReplyRequester.this.store.allKeys();
                this.logger.debug("Found " + allKeys.size() + " objects in store");
                Iterator<Serializable> it = allKeys.iterator();
                while (it.hasNext()) {
                    try {
                        boolean z = false;
                        String str = (String) it.next();
                        MultipleRequestReplierEvent multipleRequestReplierEvent = (MultipleRequestReplierEvent) AbstractAsyncRequestReplyRequester.this.store.retrieve(str);
                        if (AbstractAsyncRequestReplyRequester.this.isAlreadyProcessed(new ProcessedEvents(str, EndReason.FINISHED_BY_TIMEOUT))) {
                            z = true;
                            MuleEvent event = multipleRequestReplierEvent.getEvent();
                            if (this.logger.isDebugEnabled()) {
                                this.logger.debug("An event was received for an event group that has already been processed, this is because the async-reply timed out. Correlation Id is: " + str + ". Dropping event");
                            }
                            event.getMuleContext().fireNotification(new RoutingNotification(event.getMessage(), event.getMessageSourceURI().toString(), RoutingNotification.MISSED_ASYNC_REPLY));
                        } else {
                            RequestReplyLatch requestReplyLatch = AbstractAsyncRequestReplyRequester.this.locks.get(str);
                            if (requestReplyLatch != null) {
                                if (AbstractAsyncRequestReplyRequester.this.responseEvents.putIfAbsent(str, retrieveEvent(str)) != null) {
                                    throw new IllegalStateException("Detected duplicate result message with id: " + str);
                                    break;
                                }
                                if (!requestReplyLatch.isSequenceEvent()) {
                                    AbstractAsyncRequestReplyRequester.this.addProcessed(new ProcessedEvents(str));
                                    z = true;
                                } else if (requestReplyLatch.isLastEvent()) {
                                    AbstractAsyncRequestReplyRequester.this.addProcessed(new ProcessedEvents(str));
                                    z = true;
                                }
                                requestReplyLatch.countDown();
                                multipleRequestReplierEvent.removeEvent();
                            }
                        }
                        if (z) {
                            AbstractAsyncRequestReplyRequester.this.store.remove(str);
                        }
                    } catch (Exception e) {
                        this.logger.debug("Error processing async replies", e);
                    }
                }
            } catch (Exception e2) {
                this.logger.debug("Error processing async replies", e2);
            }
        }

        /* JADX WARN: Multi-variable type inference failed */
        private MuleEvent retrieveEvent(String str) throws ObjectStoreException, DefaultMuleException {
            MuleEvent event = ((MultipleRequestReplierEvent) AbstractAsyncRequestReplyRequester.this.store.retrieve(str)).getEvent();
            if (event.getMuleContext() == null) {
                try {
                    DeserializationPostInitialisable.Implementation.init(event, AbstractAsyncRequestReplyRequester.this.muleContext);
                } catch (Exception e) {
                    throw new DefaultMuleException(e);
                }
            }
            return event;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/mule/routing/requestreply/AbstractAsyncRequestReplyRequester$EndReason.class */
    /**
     * Why a correlation group was recorded in the processed buffer.
     */
    public enum EndReason {
        /** Default reason, used by the single-argument {@code ProcessedEvents} constructor. */
        PROCESSED,
        /** Recorded by {@code receiveAsyncReply} when the wait ended without a reply; the monitoring thread checks for it to detect late replies. */
        FINISHED_BY_TIMEOUT
    }

    /* loaded from: input_file:org/mule/routing/requestreply/AbstractAsyncRequestReplyRequester$InternalAsyncReplyMessageProcessor.class */
    /**
     * Listener attached to the reply message source: files each incoming reply in the store
     * under its correlation id and wakes the monitoring thread.
     */
    class InternalAsyncReplyMessageProcessor implements MessageProcessor {
        InternalAsyncReplyMessageProcessor() {
        }

        /**
         * Stores the incoming reply and triggers an immediate monitoring pass.
         * <p>
         * When a sequence request is waiting on the id and the store already holds an entry for
         * it, the reply is appended to that entry; in every other case a fresh entry is stored.
         *
         * @param muleEvent the reply event
         * @return always {@code null}
         * @throws MuleException if a store operation fails
         */
        @Override // org.mule.api.processor.MessageProcessor
        public MuleEvent process(MuleEvent muleEvent) throws MuleException {
            final AbstractAsyncRequestReplyRequester requester = AbstractAsyncRequestReplyRequester.this;
            final String correlationId = requester.getAsyncReplyCorrelationId(muleEvent);
            final RequestReplyLatch waiting = requester.locks.get(correlationId);

            final boolean appendToExisting = waiting != null
                    && waiting.isSequenceEvent()
                    && requester.store.contains(correlationId);

            if (!appendToExisting) {
                MultipleRequestReplierEvent freshEntry = new MultipleRequestReplierEvent();
                freshEntry.addEvent(muleEvent);
                requester.store.store(correlationId, freshEntry);
            } else {
                MultipleRequestReplierEvent existingEntry = (MultipleRequestReplierEvent) requester.store.retrieve(correlationId);
                existingEntry.addEvent(muleEvent);
            }

            requester.replyThread.processNow();
            return null;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/mule/routing/requestreply/AbstractAsyncRequestReplyRequester$ProcessedEvents.class */
    /**
     * Immutable record of a finished correlation group: its id plus the reason it ended.
     * Instances are compared by value so they can be looked up in the processed buffer.
     */
    public class ProcessedEvents {
        private final String id;
        private final EndReason endReason;

        /**
         * @param str correlation id of the finished group
         * @param endReason why the group ended
         */
        public ProcessedEvents(String str, EndReason endReason) {
            this.id = str;
            this.endReason = endReason;
        }

        /**
         * Creates a record with reason {@link EndReason#PROCESSED}.
         *
         * @param str correlation id of the finished group
         */
        public ProcessedEvents(String str) {
            this(str, EndReason.PROCESSED);
        }

        /** Two records are equal when they are of the same class and both id and end reason match. */
        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            ProcessedEvents other = (ProcessedEvents) obj;
            return this.id.equals(other.id) && this.endReason == other.endReason;
        }

        /** Hash derived from the same two fields that {@link #equals(Object)} compares. */
        @Override
        public int hashCode() {
            return (31 * this.id.hashCode()) + this.endReason.hashCode();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/mule/routing/requestreply/AbstractAsyncRequestReplyRequester$RequestReplyLatch.class */
    /**
     * Pairs the latch a request thread waits on with the correlation group size and sequence
     * number of the request message.
     */
    public class RequestReplyLatch {
        private final int groupSize;
        private final int correlationSequence;
        private final Latch latch;

        /**
         * @param correlationGroupSize group size of the request message; {@code -1} marks a non-sequence request
         * @param sequenceNumber position of the request message within its group
         */
        private RequestReplyLatch(int correlationGroupSize, int sequenceNumber) {
            this.latch = AbstractAsyncRequestReplyRequester.this.createEventLock();
            this.groupSize = correlationGroupSize;
            this.correlationSequence = sequenceNumber;
        }

        /** @return {@code true} unless the group size is {@code -1} */
        public boolean isSequenceEvent() {
            return -1 != this.groupSize;
        }

        /** Releases the thread waiting on this latch. */
        public void countDown() {
            this.latch.countDown();
        }

        /** @return {@code true} when the sequence number equals the group size */
        public boolean isLastEvent() {
            return this.correlationSequence == this.groupSize;
        }
    }

    /**
     * Sends the event as an asynchronous request and blocks until the correlated reply arrives
     * (or the wait ends), returning an event built from the reply message.
     * <p>
     * With no reply source configured the event is simply passed to the next processor.
     * If sending the request fails, the latch registered for this request is removed again so
     * that it does not linger in {@code locks}.
     *
     * @param muleEvent the request event
     * @return the reply event, or {@code null} when no reply was obtained
     * @throws MuleException if sending the request or waiting for the reply fails
     */
    @Override // org.mule.api.processor.MessageProcessor
    public MuleEvent process(MuleEvent muleEvent) throws MuleException {
        if (this.replyMessageSource == null) {
            return processNext(muleEvent);
        }
        addLock(muleEvent);
        // Captured before sending so the cleanup below targets the key addLock just registered.
        final String correlationId = getAsyncReplyCorrelationId(muleEvent);
        boolean requestSent = false;
        try {
            sendAsyncRequest(muleEvent);
            requestSent = true;
        } finally {
            if (!requestSent) {
                // receiveAsyncReply will never run for this request, so undo the registration here.
                this.locks.remove(correlationId);
                this.responseEvents.remove(correlationId);
            }
        }
        MuleEvent receiveAsyncReply = receiveAsyncReply(muleEvent);
        if (receiveAsyncReply != null) {
            if (receiveAsyncReply.getMessage().getInboundProperty(MuleProperties.MULE_SESSION_PROPERTY) != null) {
                muleEvent.getSession().merge(receiveAsyncReply.getSession());
            }
            receiveAsyncReply = RequestContext.setEvent(new DefaultMuleEvent(receiveAsyncReply.getMessage(), muleEvent));
        }
        return receiveAsyncReply;
    }

    /**
     * Creates the latch a request waits on until its reply arrives. Called once per
     * {@code RequestReplyLatch}; protected so subclasses can supply a different latch.
     *
     * @return a new {@link Latch}
     */
    protected Latch createEventLock() {
        return new Latch();
    }

    /**
     * Registers a latch for the given request under its async-reply correlation id, replacing
     * any latch already registered for that id.
     */
    private void addLock(MuleEvent muleEvent) {
        final String correlationId = getAsyncReplyCorrelationId(muleEvent);
        final int groupSize = muleEvent.getMessage().getCorrelationGroupSize();
        final int sequence = muleEvent.getMessage().getCorrelationSequence();
        final RequestReplyLatch requestLatch = new RequestReplyLatch(groupSize, sequence);
        this.locks.put(correlationId, requestLatch);
    }

    /**
     * Looks up the latch registered for a correlation id.
     * Fails with a {@link NullPointerException} when no latch is registered for the id.
     */
    private Latch getLatch(String str) {
        final RequestReplyLatch registered = this.locks.get(str);
        return registered.latch;
    }

    /**
     * Sets how long a request waits for its reply.
     *
     * @param j wait time in milliseconds; a value {@code <= 0} makes {@code receiveAsyncReply} wait without a time limit
     */
    public void setTimeout(long j) {
        this.timeout = j;
    }

    /**
     * Chooses what happens when the wait for a reply ends without one.
     *
     * @param z {@code true} to fire a timeout notification and throw a {@code ResponseTimeoutException};
     *          {@code false} to return {@code null} from {@code receiveAsyncReply} instead
     */
    public void setFailOnTimeout(boolean z) {
        this.failOnTimeout = z;
    }

    /**
     * Configures the source that delivers asynchronous replies and registers this requester's
     * internal reply processor as its listener.
     *
     * @param messageSource the reply source; checked by {@link #verifyReplyMessageSource(MessageSource)} before it is accepted
     */
    @Override // org.mule.api.processor.RequestReplyRequesterMessageProcessor
    public void setReplySource(MessageSource messageSource) {
        // Verification runs first so a rejected source is never stored or wired up.
        verifyReplyMessageSource(messageSource);
        this.replyMessageSource = messageSource;
        messageSource.setListener(this.internalAsyncReplyMessageProcessor);
    }

    /**
     * Builds this requester's name from the store prefix, the context's thread-name prefix and
     * the flow name (empty when no flow construct is set), then obtains the reply store of that
     * name from the registry's object store manager.
     *
     * @throws InitialisationException declared by the lifecycle contract
     */
    @Override // org.mule.api.lifecycle.Initialisable
    public void initialise() throws InitialisationException {
        final String threadPrefix = ThreadNameHelper.getPrefix(this.muleContext);
        final String flowName;
        if (this.flowConstruct != null) {
            flowName = this.flowConstruct.getName();
        } else {
            flowName = "";
        }
        this.name = String.format(NAME_TEMPLATE, this.storePrefix, threadPrefix, flowName);

        final ObjectStoreManager storeManager = (ObjectStoreManager) this.muleContext.getRegistry().get(MuleProperties.OBJECT_STORE_MANAGER);
        this.store = (ListableObjectStore) storeManager.getObjectStore(this.name, false, MAX_PROCESSED_GROUPS, UNCLAIMED_TIME_TO_LIVE, UNCLAIMED_INTERVAL);
    }

    /**
     * Creates the reply monitoring thread, named after this requester, and starts it.
     *
     * @throws MuleException declared by the lifecycle contract
     */
    @Override // org.mule.api.lifecycle.Startable
    public void start() throws MuleException {
        final AsyncReplyMonitoringThread monitor = new AsyncReplyMonitoringThread(this.name);
        this.replyThread = monitor;
        monitor.start();
    }

    /**
     * Asks the reply monitoring thread to stop processing; does nothing if it was never created.
     *
     * @throws MuleException declared by the lifecycle contract
     */
    @Override // org.mule.api.lifecycle.Stoppable
    public void stop() throws MuleException {
        final AsyncReplyMonitoringThread monitor = this.replyThread;
        if (monitor == null) {
            return;
        }
        monitor.stopProcessing();
    }

    /**
     * Releases the reply store through the registry's object store manager. A failure to
     * dispose is logged at debug level and otherwise ignored, since disposal is best-effort.
     */
    @Override // org.mule.api.lifecycle.Disposable
    public void dispose() {
        if (this.store == null) {
            // initialise() never ran or failed before the store was obtained.
            return;
        }
        try {
            ObjectStoreManager storeManager = (ObjectStoreManager) this.muleContext.getRegistry().get(MuleProperties.OBJECT_STORE_MANAGER);
            storeManager.disposeStore(this.store);
        } catch (ObjectStoreException e) {
            logger.debug("Exception disposing of store", e);
        }
    }

    /**
     * Sets the first component of the name built in {@link #initialise()}.
     *
     * @param str prefix substituted into the first slot of {@code NAME_TEMPLATE}
     */
    public void setStorePrefix(String str) {
        this.storePrefix = str;
    }

    /**
     * Hook invoked by {@code setReplySource} before the source is accepted. This implementation
     * performs no check.
     *
     * @param messageSource the candidate reply source
     */
    protected void verifyReplyMessageSource(MessageSource messageSource) {
    }

    /**
     * Determines the correlation id used to match a request with its reply.
     *
     * @param muleEvent the request or reply event
     * @return the message's own correlation id when the message is a {@link MuleMessageCollection};
     *         otherwise the id computed by the message info mapping of the event's flow construct
     */
    protected String getAsyncReplyCorrelationId(MuleEvent muleEvent) {
        if (muleEvent.getMessage() instanceof MuleMessageCollection) {
            return muleEvent.getMessage().getCorrelationId();
        }
        return muleEvent.getFlowConstruct().getMessageInfoMapping().getCorrelationId(muleEvent.getMessage());
    }

    /**
     * Dispatches the request by handing the event to the next processor; the returned event is
     * discarded.
     *
     * @param muleEvent the request event
     * @throws MuleException if the next processor fails
     */
    protected void sendAsyncRequest(MuleEvent muleEvent) throws MuleException {
        processNext(muleEvent);
    }

    /**
     * Blocks until the reply correlated with the given request arrives, the configured timeout
     * elapses, or the waiting thread is interrupted.
     * <p>
     * With a timeout {@code <= 0} the wait has no time limit. When a timed wait expires,
     * {@link #postLatchAwait(String)} is invoked and the reply is given one more second to
     * arrive. Whatever the outcome, the latch and any pending response for the correlation id
     * are removed before this method returns or throws.
     *
     * @param muleEvent the request event whose reply is awaited
     * @return the reply event; {@code null} if the thread was interrupted (its interrupt flag is
     *         restored) or if the wait timed out while {@code failOnTimeout} is {@code false}
     * @throws MessagingException a {@link ResponseTimeoutException} when the wait timed out and
     *         {@code failOnTimeout} is {@code true}, or whatever {@code postLatchAwait} throws
     * @throws IllegalStateException if the latch was released but no response event was published
     */
    protected MuleEvent receiveAsyncReply(MuleEvent muleEvent) throws MessagingException {
        final String asyncReplyCorrelationId = getAsyncReplyCorrelationId(muleEvent);
        final Latch latch = getLatch(asyncReplyCorrelationId);
        boolean interruptedWhileWaiting = false;
        boolean resultAvailable = false;
        MuleEvent result;
        try {
            if (logger.isDebugEnabled()) {
                logger.debug("Waiting for async reply message with id: " + asyncReplyCorrelationId);
            }
            if (this.timeout <= 0) {
                latch.await();
                resultAvailable = true;
            } else {
                resultAvailable = latch.await(this.timeout, TimeUnit.MILLISECONDS);
            }
            if (!resultAvailable) {
                // Give subclasses a chance to react, then allow a short grace period for a late reply.
                postLatchAwait(asyncReplyCorrelationId);
                latch.await(1000L, TimeUnit.MILLISECONDS);
                resultAvailable = latch.getCount() == 0;
            }
        } catch (InterruptedException e) {
            interruptedWhileWaiting = true;
        } finally {
            // Always unregister, including when postLatchAwait or the wait itself throws.
            this.locks.remove(asyncReplyCorrelationId);
            result = this.responseEvents.remove(asyncReplyCorrelationId);
        }

        if (interruptedWhileWaiting) {
            // Restore the flag so callers further up can observe the interruption.
            Thread.currentThread().interrupt();
            return null;
        }
        if (resultAvailable) {
            if (result == null) {
                throw new IllegalStateException("Response MuleEvent is null");
            }
            return OptimizedRequestContext.criticalSetEvent(result);
        }

        // Remember the timeout so a reply that turns up later is recognised as missed.
        addProcessed(new ProcessedEvents(asyncReplyCorrelationId, EndReason.FINISHED_BY_TIMEOUT));
        if (!this.failOnTimeout) {
            return null;
        }
        muleEvent.getMuleContext().fireNotification(new RoutingNotification(muleEvent.getMessage(), null, RoutingNotification.ASYNC_REPLY_TIMEOUT));
        throw new ResponseTimeoutException(CoreMessages.responseTimedOutWaitingForId((int) this.timeout, asyncReplyCorrelationId), muleEvent, null);
    }

    /**
     * Hook invoked by {@code receiveAsyncReply} when the timed wait for a reply has expired,
     * just before one further second is allowed for the reply to arrive. This implementation
     * does nothing.
     *
     * @param str correlation id of the request whose wait expired
     * @throws MessagingException if an overriding implementation fails
     */
    protected void postLatchAwait(String str) throws MessagingException {
    }

    /**
     * Records an entry in the bounded processed buffer, under {@code processedLock}.
     *
     * @param obj the entry to record
     */
    protected void addProcessed(Object obj) {
        synchronized (this.processedLock) {
            // Make room first: a full buffer has one entry removed before the new one is added.
            if (this.processed.isFull()) {
                this.processed.remove();
            }
            this.processed.add(obj);
        }
    }

    /**
     * Checks, under {@code processedLock}, whether an equal entry is present in the processed buffer.
     *
     * @param obj the entry to look for
     * @return {@code true} if the buffer contains an equal entry
     */
    protected boolean isAlreadyProcessed(Object obj) {
        synchronized (this.processedLock) {
            return this.processed.contains(obj);
        }
    }

    /**
     * Returns the string form produced by {@code ObjectUtils.toString} for this instance.
     */
    @Override // org.mule.processor.AbstractInterceptingMessageProcessorBase
    public String toString() {
        return ObjectUtils.toString(this);
    }

    /**
     * Stores the owning flow construct; its name becomes the last component of the name built
     * in {@code initialise()}.
     *
     * @param flowConstruct the flow construct this requester belongs to
     */
    @Override // org.mule.api.construct.FlowConstructAware
    public void setFlowConstruct(FlowConstruct flowConstruct) {
        this.flowConstruct = flowConstruct;
    }
}
