package org.mule.runtime.core.routing.requestreply;

import java.io.Serializable;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import org.apache.commons.collections.buffer.BoundedFifoBuffer;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.lifecycle.Disposable;
import org.mule.runtime.api.lifecycle.Initialisable;
import org.mule.runtime.api.lifecycle.InitialisationException;
import org.mule.runtime.api.lifecycle.Startable;
import org.mule.runtime.api.lifecycle.Stoppable;
import org.mule.runtime.core.AbstractAnnotatedObject;
import org.mule.runtime.core.api.DefaultMuleException;
import org.mule.runtime.core.api.Event;
import org.mule.runtime.core.api.config.MuleProperties;
import org.mule.runtime.core.api.construct.FlowConstruct;
import org.mule.runtime.core.api.construct.FlowConstructAware;
import org.mule.runtime.core.api.processor.Processor;
import org.mule.runtime.core.api.processor.ReactiveProcessor;
import org.mule.runtime.core.api.processor.RequestReplyRequesterMessageProcessor;
import org.mule.runtime.core.api.routing.ResponseTimeoutException;
import org.mule.runtime.core.api.source.MessageSource;
import org.mule.runtime.core.api.store.ListableObjectStore;
import org.mule.runtime.core.api.store.ObjectStoreException;
import org.mule.runtime.core.api.store.ObjectStoreManager;
import org.mule.runtime.core.config.i18n.CoreMessages;
import org.mule.runtime.core.context.notification.RoutingNotification;
import org.mule.runtime.core.exception.MessagingException;
import org.mule.runtime.core.processor.AbstractInterceptingMessageProcessorBase;
import org.mule.runtime.core.routing.EventProcessingThread;
import org.mule.runtime.core.util.ObjectUtils;
import org.mule.runtime.core.util.StringUtils;
import org.mule.runtime.core.util.concurrent.Latch;
import org.mule.runtime.core.util.concurrent.ThreadNameHelper;
import org.mule.runtime.core.util.store.DeserializationPostInitialisable;

/* loaded from: input_file:org/mule/runtime/core/routing/requestreply/AbstractAsyncRequestReplyRequester.class */
public abstract class AbstractAsyncRequestReplyRequester extends AbstractInterceptingMessageProcessorBase implements RequestReplyRequesterMessageProcessor, FlowConstructAware, Initialisable, Startable, Stoppable, Disposable {
    /** Capacity of the {@link #processed} buffer; also passed to the object store manager when the reply store is created. */
    public static final int MAX_PROCESSED_GROUPS = 50000;

    /** Value handed to the object store manager as the entry time-to-live (presumably milliseconds; confirm against ObjectStoreManager). */
    public static final int UNCLAIMED_TIME_TO_LIVE = 60000;

    /**
     * Value handed to the object store manager as its last argument when the reply store is created.
     * Deliberately left non-final: it is public and mutable, so external code may reassign it.
     */
    public static int UNCLAIMED_INTERVAL = UNCLAIMED_TIME_TO_LIVE;

    /** Format used by {@link #initialise()} to build {@link #name} from store prefix, thread-name prefix and flow name. */
    public static final String NAME_TEMPLATE = "%s.%s.%s.asyncReplies";

    /** Name of the reply store and of the monitoring thread; assigned in {@link #initialise()}. */
    protected String name;

    /** Source of reply messages; when {@code null}, {@code process} simply delegates to the next processor. */
    protected MessageSource replyMessageSource;

    protected FlowConstruct flowConstruct;

    /** Monitoring thread created in {@code start()} and asked to stop in {@code stop()}. */
    private AsyncReplyMonitoringThread replyThread;

    /** Store in which incoming replies are parked, keyed by async-reply correlation id. */
    protected ListableObjectStore store;

    /** Time to wait for a reply, in milliseconds; a value {@code <= 0} means wait without a time limit. */
    protected volatile long timeout = -1;

    /** Whether a timed-out wait raises a {@code ResponseTimeoutException} ({@code true}) or returns {@code null}. */
    protected volatile boolean failOnTimeout = true;

    /** Listener installed on the reply message source by {@code setReplySource}. */
    private final Processor internalAsyncReplyMessageProcessor = new InternalAsyncReplyMessageProcessor();

    /** Latches of requests currently awaiting a reply, keyed by async-reply correlation id. */
    protected final Map<String, Latch> locks = new ConcurrentHashMap<>();

    private String storePrefix = "";

    /** Replies handed over by the monitoring thread, keyed by async-reply correlation id. */
    protected final ConcurrentMap<String, Event> responseEvents = new ConcurrentHashMap<>();

    /** Monitor guarding every access to {@link #processed}. */
    protected final Object processedLock = new Object();

    /** Bounded FIFO of correlation ids that have already been answered or have timed out. */
    protected final BoundedFifoBuffer processed = new BoundedFifoBuffer(MAX_PROCESSED_GROUPS);

    /* loaded from: input_file:org/mule/runtime/core/routing/requestreply/AbstractAsyncRequestReplyRequester$AsyncReplyMonitoringThread.class */
    /**
     * Thread that scans the reply store and hands each stored reply to the request waiting for it.
     * <p>
     * For every key found in the store, one of three things happens:
     * <ul>
     * <li>the key is already in the processed buffer: a {@code MISSED_ASYNC_REPLY} notification is fired and the
     * entry is removed from the store;</li>
     * <li>a latch is registered for the key: the event is published in {@code responseEvents}, the key is marked as
     * processed, the latch is released and the entry is removed from the store;</li>
     * <li>otherwise the entry is left in the store untouched.</li>
     * </ul>
     */
    private class AsyncReplyMonitoringThread extends EventProcessingThread {
        AsyncReplyMonitoringThread(String str) {
            // NOTE(review): 100L is presumably the polling delay in milliseconds; confirm against EventProcessingThread.
            super(str, 100L);
        }

        /**
         * Performs one scan over all keys currently in the reply store. A failure while handling one key is logged
         * at debug level and does not prevent the remaining keys from being handled.
         */
        @Override // org.mule.runtime.core.routing.EventProcessingThread
        protected void doRun() {
            try {
                List<Serializable> allKeys = AbstractAsyncRequestReplyRequester.this.store.allKeys();
                this.logger.debug("Found " + allKeys.size() + " objects in store");
                for (Serializable key : allKeys) {
                    try {
                        boolean deleteEvent = false;
                        String correlationId = (String) key;
                        if (AbstractAsyncRequestReplyRequester.this.isAlreadyProcessed(correlationId)) {
                            deleteEvent = true;
                            Event event = (Event) AbstractAsyncRequestReplyRequester.this.store.retrieve(correlationId);
                            if (this.logger.isDebugEnabled()) {
                                this.logger.debug("An event was received for an event group that has already been processed, this is probably because the async-reply timed out. GroupCorrelation Id is: " + correlationId + ". Dropping event");
                            }
                            AbstractAsyncRequestReplyRequester.this.muleContext.fireNotification(new RoutingNotification(event.getMessage(), event.getContext().getOriginatingConnectorName(), RoutingNotification.MISSED_ASYNC_REPLY));
                        } else {
                            Latch latch = AbstractAsyncRequestReplyRequester.this.locks.get(correlationId);
                            if (latch != null) {
                                Event previous = AbstractAsyncRequestReplyRequester.this.responseEvents.putIfAbsent(correlationId, retrieveEvent(correlationId));
                                if (previous != null) {
                                    // A reply for this id is already pending pickup. The exception is caught by the
                                    // per-key handler below, so the store entry is left in place.
                                    throw new IllegalStateException("Detected duplicate result message with id: " + correlationId);
                                }
                                AbstractAsyncRequestReplyRequester.this.addProcessed(correlationId);
                                deleteEvent = true;
                                latch.countDown();
                            }
                        }
                        if (deleteEvent) {
                            AbstractAsyncRequestReplyRequester.this.store.remove(correlationId);
                        }
                    } catch (Exception e) {
                        this.logger.debug("Error processing async replies", e);
                    }
                }
            } catch (Exception e2) {
                this.logger.debug("Error processing async replies", e2);
            }
        }

        /**
         * Reads the event stored under the given key and, when it carries no flow construct, runs the
         * deserialization post-initialisation on it with the enclosing instance's Mule context.
         *
         * @param str key of the stored event
         * @return the stored event
         * @throws ObjectStoreException if the store lookup fails
         * @throws DefaultMuleException wrapping any failure of the post-initialisation step
         */
        private Event retrieveEvent(String str) throws ObjectStoreException, DefaultMuleException {
            Event event = (Event) AbstractAsyncRequestReplyRequester.this.store.retrieve(str);
            if (event.getFlowConstruct() == null) {
                try {
                    DeserializationPostInitialisable.Implementation.init(event, AbstractAsyncRequestReplyRequester.this.muleContext);
                } catch (Exception e) {
                    throw new DefaultMuleException(e);
                }
            }
            return event;
        }
    }

    /* loaded from: input_file:org/mule/runtime/core/routing/requestreply/AbstractAsyncRequestReplyRequester$InternalAsyncReplyMessageProcessor.class */
    /**
     * Listener for the reply message source: parks each incoming reply in the store under its async-reply
     * correlation id and asks the monitoring thread to run.
     */
    class InternalAsyncReplyMessageProcessor extends AbstractAnnotatedObject implements Processor {
        InternalAsyncReplyMessageProcessor() {
        }

        /**
         * Stores the reply event and triggers the monitoring thread.
         *
         * @param event the reply event delivered by the reply source
         * @return always {@code null}
         * @throws MuleException if storing the event fails
         */
        @Override // org.mule.runtime.core.api.processor.Processor
        public Event process(Event event) throws MuleException {
            final AbstractAsyncRequestReplyRequester requester = AbstractAsyncRequestReplyRequester.this;
            final String replyKey = requester.getAsyncReplyCorrelationId(event);
            requester.store.store(replyKey, event);
            requester.replyThread.processNow();
            return null;
        }
    }

    /**
     * Sends the request and blocks until the matching asynchronous reply arrives or the wait ends.
     * <p>
     * When no reply source is configured, the event is simply passed to the next processor. Otherwise a latch is
     * registered under the event's async-reply correlation id, the request is sent, and the reply is awaited. A
     * received reply is rebuilt on top of the request event (keeping the request's context and replacing its
     * message) and set as the current event.
     *
     * @param event the request event
     * @return the reply event, or {@code null} when no reply was obtained
     * @throws MuleException if sending the request or waiting for the reply fails
     */
    @Override // org.mule.runtime.core.api.processor.Processor
    public Event process(Event event) throws MuleException {
        if (this.replyMessageSource == null) {
            return processNext(event);
        }
        String correlationId = getAsyncReplyCorrelationId(event);
        Latch latch = createEventLock();
        this.locks.put(correlationId, latch);
        boolean requestSent = false;
        try {
            sendAsyncRequest(event);
            requestSent = true;
        } finally {
            if (!requestSent) {
                // receiveAsyncReply is the only other place that removes the latch; without this a failed
                // send would leave the entry in the map indefinitely.
                this.locks.remove(correlationId, latch);
            }
        }
        Event receiveAsyncReply = receiveAsyncReply(event);
        if (receiveAsyncReply != null) {
            if (receiveAsyncReply.getMessage().getInboundProperty(MuleProperties.MULE_SESSION_PROPERTY) != null) {
                event.getSession().merge(receiveAsyncReply.getSession());
            }
            receiveAsyncReply = Event.builder(event).message(receiveAsyncReply.getMessage()).build();
            Event.setCurrentEvent(receiveAsyncReply);
        }
        return receiveAsyncReply;
    }

    /**
     * Creates the latch that {@code process} registers for a request before sending it.
     *
     * @return a new {@link Latch}
     */
    protected Latch createEventLock() {
        return new Latch();
    }

    /**
     * Sets how long to wait for a reply.
     *
     * @param j wait time in milliseconds; a value {@code <= 0} makes the wait unbounded
     */
    public void setTimeout(long j) {
        timeout = j;
    }

    /**
     * Chooses what happens when no reply arrives in time.
     *
     * @param z {@code true} to raise a timeout exception, {@code false} to return {@code null} instead
     */
    public void setFailOnTimeout(boolean z) {
        failOnTimeout = z;
    }

    /**
     * Installs the source that delivers reply messages. The source is first passed to
     * {@link #verifyReplyMessageSource(MessageSource)}, then remembered, and finally wired to the internal
     * reply listener.
     *
     * @param messageSource the reply source
     */
    @Override // org.mule.runtime.core.api.processor.RequestReplyRequesterMessageProcessor
    public void setReplySource(MessageSource messageSource) {
        verifyReplyMessageSource(messageSource);
        replyMessageSource = messageSource;
        final Processor replyListener = internalAsyncReplyMessageProcessor;
        messageSource.setListener(replyListener);
    }

    /**
     * Derives {@link #name} from the store prefix, the thread-name prefix of the Mule context and the flow name
     * (empty when no flow construct is set), then obtains the reply store of that name from the object store
     * manager found in the registry.
     *
     * @throws InitialisationException declared by the lifecycle contract
     */
    public void initialise() throws InitialisationException {
        final String threadPrefix = ThreadNameHelper.getPrefix(this.muleContext);
        final String flowName = this.flowConstruct == null ? "" : this.flowConstruct.getName();
        this.name = String.format(NAME_TEMPLATE, this.storePrefix, threadPrefix, flowName);

        final ObjectStoreManager storeManager =
                (ObjectStoreManager) this.muleContext.getRegistry().get(MuleProperties.OBJECT_STORE_MANAGER);
        this.store = (ListableObjectStore) storeManager.getObjectStore(
                this.name, false, MAX_PROCESSED_GROUPS, UNCLAIMED_TIME_TO_LIVE, UNCLAIMED_INTERVAL);
    }

    /**
     * Creates a new monitoring thread named after {@link #name}, keeps a reference to it and starts it.
     *
     * @throws MuleException declared by the lifecycle contract
     */
    public void start() throws MuleException {
        final AsyncReplyMonitoringThread monitor = new AsyncReplyMonitoringThread(name);
        replyThread = monitor;
        monitor.start();
    }

    /**
     * Asks the monitoring thread to stop processing; does nothing when no thread has been created.
     *
     * @throws MuleException declared by the lifecycle contract
     */
    public void stop() throws MuleException {
        if (replyThread == null) {
            return;
        }
        replyThread.stopProcessing();
    }

    /**
     * Hands the reply store back to the object store manager for disposal. Does nothing when the store was never
     * created; a failure to dispose is logged at debug level and otherwise ignored.
     */
    public void dispose() {
        if (this.store != null) {
            try {
                ((ObjectStoreManager) this.muleContext.getRegistry().get(MuleProperties.OBJECT_STORE_MANAGER)).disposeStore(this.store);
            } catch (ObjectStoreException e) {
                // Best effort: disposal problems must not abort the shutdown sequence.
                this.logger.debug("Exception disposing of store", e);
            }
        }
    }

    /**
     * Sets the first component of the store name built in {@code initialise()}.
     *
     * @param str the prefix to use
     */
    public void setStorePrefix(String str) {
        storePrefix = str;
    }

    /**
     * Hook invoked by {@code setReplySource} before the source is stored. This implementation does nothing;
     * subclasses may override it.
     *
     * @param messageSource the candidate reply source
     */
    protected void verifyReplyMessageSource(MessageSource messageSource) {
    }

    /**
     * Builds the key under which the reply for the given event is tracked: the correlation id of the event's
     * context, followed by {@code StringUtils.DASH} and the group sequence when a sequence is present.
     *
     * @param event the event to derive the key from
     * @return the async-reply correlation id
     */
    protected String getAsyncReplyCorrelationId(Event event) {
        final StringBuilder replyId = new StringBuilder().append(event.getContext().getCorrelationId());
        event.getGroupCorrelation().getSequence().ifPresent(sequence -> replyId.append(StringUtils.DASH + sequence));
        return replyId.toString();
    }

    /**
     * Sends the request by passing the event to the next processor; whatever that call returns is discarded.
     *
     * @param event the request event
     * @throws MuleException if the next processor fails
     */
    protected void sendAsyncRequest(Event event) throws MuleException {
        processNext(event);
    }

    /**
     * Blocks on the latch registered for the event's async-reply correlation id until the reply is available,
     * the configured timeout elapses, or the thread is interrupted.
     * <p>
     * With a timeout {@code <= 0} the wait is unbounded. When a bounded wait expires, {@link #postLatchAwait(String)}
     * is invoked and the latch is given one more second before the wait is declared timed out. In every case the
     * latch and any pending response are removed from their maps before this method returns or throws.
     *
     * @param event the request event whose reply is awaited
     * @return the reply event; {@code null} when the thread was interrupted (the interrupt flag is restored) or
     *         when the wait timed out and {@code failOnTimeout} is {@code false}
     * @throws ResponseTimeoutException when the wait timed out and {@code failOnTimeout} is {@code true}
     * @throws IllegalStateException when the latch was released but no response event was published
     * @throws MuleException if {@link #postLatchAwait(String)} fails
     */
    protected Event receiveAsyncReply(Event event) throws MuleException {
        String asyncReplyCorrelationId = getAsyncReplyCorrelationId(event);
        Latch asyncReplyLatch = this.locks.get(asyncReplyCorrelationId);
        boolean interruptedWhileWaiting = false;
        boolean resultAvailable = false;
        Event result;
        try {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Waiting for async reply message with id: " + asyncReplyCorrelationId);
            }
            if (this.timeout <= 0) {
                asyncReplyLatch.await();
                resultAvailable = true;
            } else {
                resultAvailable = asyncReplyLatch.await(this.timeout, TimeUnit.MILLISECONDS);
            }
            if (!resultAvailable) {
                // Give subclasses a chance to react, then allow a short grace period for a late reply.
                postLatchAwait(asyncReplyCorrelationId);
                asyncReplyLatch.await(1000L, TimeUnit.MILLISECONDS);
                resultAvailable = asyncReplyLatch.getCount() == 0;
            }
        } catch (InterruptedException e) {
            interruptedWhileWaiting = true;
        } finally {
            // Always release the bookkeeping for this request, whether it succeeded, timed out or failed.
            this.locks.remove(asyncReplyCorrelationId);
            result = this.responseEvents.remove(asyncReplyCorrelationId);
        }

        if (interruptedWhileWaiting) {
            // Restore the interrupt flag so callers further up the stack can observe it.
            Thread.currentThread().interrupt();
            return null;
        }

        if (resultAvailable) {
            if (result == null) {
                throw new IllegalStateException("Response MuleEvent is null");
            }
            Event.setCurrentEvent(result);
            return result;
        }

        // Timed out: remember the id so a late reply is recognised and dropped by the monitoring thread.
        addProcessed(asyncReplyCorrelationId);
        if (!this.failOnTimeout) {
            return null;
        }
        this.muleContext.fireNotification(new RoutingNotification(event.getMessage(), null, RoutingNotification.ASYNC_REPLY_TIMEOUT));
        throw new ResponseTimeoutException(CoreMessages.responseTimedOutWaitingForId((int) this.timeout, asyncReplyCorrelationId), null);
    }

    /**
     * Hook invoked by {@code receiveAsyncReply} after a bounded wait has expired without a reply and before the
     * final one-second wait. This implementation does nothing; subclasses may override it.
     *
     * @param str the async-reply correlation id being waited on
     * @throws MessagingException never thrown by this implementation; available to overriding subclasses
     */
    protected void postLatchAwait(String str) throws MessagingException {
    }

    /**
     * Records an id in the bounded processed buffer, discarding the oldest entry first when the buffer is full.
     * The buffer is accessed while holding {@code processedLock}.
     *
     * @param obj the id to record
     */
    protected void addProcessed(Object obj) {
        synchronized (processedLock) {
            final boolean atCapacity = processed.isFull();
            if (atCapacity) {
                processed.remove();
            }
            processed.add(obj);
        }
    }

    /**
     * Tells whether an id is currently held in the processed buffer. The buffer is accessed while holding
     * {@code processedLock}.
     *
     * @param obj the id to look up
     * @return {@code true} when the buffer contains the id
     */
    protected boolean isAlreadyProcessed(Object obj) {
        synchronized (processedLock) {
            return processed.contains(obj);
        }
    }

    /**
     * Returns the string form produced by {@code ObjectUtils.toString} for this instance.
     */
    @Override // org.mule.runtime.core.processor.AbstractInterceptingMessageProcessorBase
    public String toString() {
        return ObjectUtils.toString(this);
    }

    /**
     * Stores the flow construct; its name becomes the last component of the store name built in
     * {@code initialise()}.
     *
     * @param flowConstruct the owning flow construct, may be {@code null}
     */
    @Override // org.mule.runtime.core.processor.AbstractInterceptingMessageProcessorBase, org.mule.runtime.core.api.construct.FlowConstructAware
    public void setFlowConstruct(FlowConstruct flowConstruct) {
        this.flowConstruct = flowConstruct;
    }

    /**
     * Reports this processor as {@code BLOCKING}; {@code receiveAsyncReply} waits on a latch in the calling
     * thread.
     */
    @Override // org.mule.runtime.core.api.processor.ReactiveProcessor
    public ReactiveProcessor.ProcessingType getProcessingType() {
        return ReactiveProcessor.ProcessingType.BLOCKING;
    }
}
