/*
 * Decompiled with CFR 0.152.
 */
package org.mule.runtime.core.routing.requestreply;

import java.io.Serializable;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import org.apache.commons.collections.buffer.BoundedFifoBuffer;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.lifecycle.Disposable;
import org.mule.runtime.api.lifecycle.Initialisable;
import org.mule.runtime.api.lifecycle.InitialisationException;
import org.mule.runtime.api.lifecycle.Startable;
import org.mule.runtime.api.lifecycle.Stoppable;
import org.mule.runtime.api.meta.AbstractAnnotatedObject;
import org.mule.runtime.api.scheduler.Scheduler;
import org.mule.runtime.api.store.ObjectStoreException;
import org.mule.runtime.api.store.ObjectStoreManager;
import org.mule.runtime.core.api.DefaultMuleException;
import org.mule.runtime.core.api.Event;
import org.mule.runtime.core.api.construct.FlowConstruct;
import org.mule.runtime.core.api.construct.FlowConstructAware;
import org.mule.runtime.core.api.processor.Processor;
import org.mule.runtime.core.api.processor.ReactiveProcessor;
import org.mule.runtime.core.api.processor.RequestReplyRequesterMessageProcessor;
import org.mule.runtime.core.api.routing.ResponseTimeoutException;
import org.mule.runtime.core.api.source.MessageSource;
import org.mule.runtime.core.api.store.ListableObjectStore;
import org.mule.runtime.core.api.util.ObjectUtils;
import org.mule.runtime.core.api.util.concurrent.Latch;
import org.mule.runtime.core.config.i18n.CoreMessages;
import org.mule.runtime.core.context.notification.RoutingNotification;
import org.mule.runtime.core.exception.MessagingException;
import org.mule.runtime.core.internal.message.InternalMessage;
import org.mule.runtime.core.processor.AbstractInterceptingMessageProcessorBase;
import org.mule.runtime.core.util.store.DeserializationPostInitialisable;

public abstract class AbstractAsyncRequestReplyRequester
extends AbstractInterceptingMessageProcessorBase
implements RequestReplyRequesterMessageProcessor,
FlowConstructAware,
Initialisable,
Startable,
Stoppable,
Disposable {
    /** Capacity of the {@link #processed} buffer of already-handled correlation ids. */
    public static final int MAX_PROCESSED_GROUPS = 50000;
    /** Time-to-live for unclaimed replies. NOTE(review): presumably milliseconds - confirm against the object store API. */
    public static final int UNCLAIMED_TIME_TO_LIVE = 60000;
    /**
     * Passed as the last argument when the reply store is obtained in {@link #initialise()}.
     * NOTE(review): presumably the expiry-check interval in milliseconds; it is mutable (non-final) - confirm whether
     * callers rely on changing it.
     */
    public static int UNCLAIMED_INTERVAL = 60000;
    /** Format of the store name: store prefix, configuration id, flow name. */
    public static final String NAME_TEMPLATE = "%s.%s.%s.asyncReplies";
    /** Name built from {@link #NAME_TEMPLATE} in {@link #initialise()}; used for the reply store and the scheduler. */
    protected String name;
    /** Reply wait time in milliseconds; values {@code <= 0} make the requester wait without a time limit. */
    protected volatile long timeout = -1L;
    /** Whether a reply timeout raises a {@code ResponseTimeoutException} ({@code true}) or yields {@code null}. */
    protected volatile boolean failOnTimeout = true;
    /** Source of the asynchronous replies; when {@code null}, {@code process} only forwards the event. */
    protected MessageSource replyMessageSource;
    /** Owning flow; its name becomes part of the store name. */
    protected FlowConstruct flowConstruct;
    /** Listener installed on the reply source; stores each incoming reply. */
    private final Processor internalAsyncReplyMessageProcessor = new InternalAsyncReplyMessageProcessor();
    /** Scheduler that periodically runs {@link #replyRunnable}; created in {@link #start()}. */
    private Scheduler scheduler;
    /** Task that moves replies from {@link #store} to the waiting threads; created in {@link #start()}. */
    private AsyncReplyMonitoringRunnable replyRunnable;
    /** Latches of in-flight requests, keyed by async-reply correlation id. */
    protected final Map<String, Latch> locks = new ConcurrentHashMap<>();
    /** First component of the store name. */
    private String storePrefix = "";
    /** Replies handed over to waiting request threads, keyed by async-reply correlation id. */
    protected final ConcurrentMap<String, Event> responseEvents = new ConcurrentHashMap<>();
    /** Monitor guarding every access to {@link #processed}. */
    protected final Object processedLock = new Object();
    /** Bounded FIFO of correlation ids that were already answered or timed out. */
    private final BoundedFifoBuffer processed = new BoundedFifoBuffer(MAX_PROCESSED_GROUPS);
    /** Object store in which incoming replies are kept until the monitor picks them up. */
    protected ListableObjectStore store;

    /**
     * Sends the event as an asynchronous request and blocks until the matching reply is available.
     * <p>
     * When no reply source is configured the event is simply passed to the next processor. Otherwise a latch is
     * registered under the event's async-reply correlation id, the request is sent, and the calling thread waits in
     * {@link #receiveAsyncReply(Event)}.
     *
     * @param event the request event
     * @return an event built from the request event carrying the reply's message, or {@code null} when
     *         {@link #receiveAsyncReply(Event)} returned {@code null}
     * @throws MuleException if sending the request or receiving the reply fails
     */
    @Override
    public Event process(Event event) throws MuleException {
        if (this.replyMessageSource == null) {
            // No reply source configured: nothing to wait for.
            return this.processNext(event);
        }
        final String correlationId = this.getAsyncReplyCorrelationId(event);
        // Register the latch before sending: the reply monitor ignores replies that have no registered latch.
        this.locks.put(correlationId, this.createEventLock());
        boolean requestSent = false;
        try {
            this.sendAsyncRequest(event);
            requestSent = true;
        }
        finally {
            if (!requestSent) {
                // receiveAsyncReply() is the only other place that removes these entries and it is not reached
                // when sending fails, so clean up here to avoid leaking the latch.
                this.locks.remove(correlationId);
                this.responseEvents.remove(correlationId);
            }
        }
        Event resultEvent = this.receiveAsyncReply(event);
        if (resultEvent != null) {
            if (((InternalMessage)resultEvent.getMessage()).getInboundProperty("MULE_SESSION") != null) {
                event.getSession().merge(resultEvent.getSession());
            }
            // Keep the original event's context and replace only its message with the reply's message.
            resultEvent = Event.builder(event).message(resultEvent.getMessage()).build();
            Event.setCurrentEvent(resultEvent);
        }
        return resultEvent;
    }

    /**
     * Creates the latch a request thread waits on until its reply arrives.
     * Called once per request by {@code process}.
     *
     * @return a new, unreleased {@link Latch}
     */
    protected Latch createEventLock() {
        return new Latch();
    }

    /**
     * Sets how long to wait for a reply.
     *
     * @param timeout wait time in milliseconds; a value {@code <= 0} makes {@code receiveAsyncReply} wait without a
     *        time limit
     */
    public void setTimeout(long timeout) {
        this.timeout = timeout;
    }

    /**
     * Selects what happens when no reply arrives in time.
     *
     * @param failOnTimeout {@code true} to fire a routing notification and throw a {@code ResponseTimeoutException};
     *        {@code false} to return {@code null} from {@code receiveAsyncReply}
     */
    public void setFailOnTimeout(boolean failOnTimeout) {
        this.failOnTimeout = failOnTimeout;
    }

    /**
     * Installs the source that delivers the asynchronous replies.
     * The source is first passed to {@link #verifyReplyMessageSource(MessageSource)}, then remembered, and finally
     * wired to the internal processor that stores every incoming reply.
     *
     * @param messageSource the reply source
     */
    @Override
    public void setReplySource(MessageSource messageSource) {
        verifyReplyMessageSource(messageSource);
        replyMessageSource = messageSource;
        replyMessageSource.setListener(internalAsyncReplyMessageProcessor);
    }

    /**
     * Computes this requester's name and obtains the object store that buffers incoming replies.
     * The name is built from {@link #NAME_TEMPLATE} using the store prefix, the configuration id and the flow name
     * (empty when no flow construct is set).
     *
     * @throws InitialisationException declared by the lifecycle contract
     */
    @Override
    public void initialise() throws InitialisationException {
        name = String.format(
            NAME_TEMPLATE,
            storePrefix,
            muleContext.getConfiguration().getId(),
            flowConstruct == null ? "" : flowConstruct.getName());
        final ObjectStoreManager storeManager = (ObjectStoreManager)muleContext.getRegistry().get("_muleObjectStoreManager");
        // Arguments after the name: not persistent, max entries, entry time-to-live, and the interval constant.
        store = (ListableObjectStore)storeManager.getObjectStore(name, false, MAX_PROCESSED_GROUPS, (long)UNCLAIMED_TIME_TO_LIVE, UNCLAIMED_INTERVAL);
    }

    /**
     * Creates a dedicated scheduler (one concurrent task, zero shutdown timeout) named after this requester and
     * schedules the reply monitor on it with no initial delay and a fixed delay of 100 ms between runs.
     *
     * @throws MuleException declared by the lifecycle contract
     */
    @Override
    public void start() throws MuleException {
        scheduler = muleContext.getSchedulerService()
            .customScheduler(muleContext.getSchedulerBaseConfig()
                .withName(name)
                .withMaxConcurrentTasks(1)
                .withShutdownTimeout(0L, TimeUnit.MILLISECONDS));
        final AsyncReplyMonitoringRunnable monitor = new AsyncReplyMonitoringRunnable();
        replyRunnable = monitor;
        scheduler.scheduleWithFixedDelay(monitor, 0L, 100L, TimeUnit.MILLISECONDS);
    }

    /**
     * Stops the scheduler that runs the reply monitor.
     * Does nothing when no scheduler exists, i.e. when {@link #start()} was never called or this requester was
     * already stopped.
     *
     * @throws MuleException declared by the lifecycle contract
     */
    @Override
    public void stop() throws MuleException {
        final Scheduler activeScheduler = this.scheduler;
        if (activeScheduler == null) {
            // start() never ran (or stop() already did): there is nothing to stop.
            return;
        }
        activeScheduler.stop();
        // Drop the reference so a repeated stop() is a no-op; start() creates a fresh scheduler.
        this.scheduler = null;
    }

    /**
     * Releases the reply object store through the object store manager.
     * Disposal is best effort: a failure is logged at debug level and not propagated. Nothing happens when the store
     * was never created.
     */
    @Override
    public void dispose() {
        if (this.store != null) {
            try {
                ((ObjectStoreManager)this.muleContext.getRegistry().get("_muleObjectStoreManager")).disposeStore(this.store);
            }
            catch (ObjectStoreException e) {
                // Deliberately not rethrown: dispose() must not fail because the store could not be released.
                this.logger.debug("Exception disposing of store", (Throwable)e);
            }
        }
    }

    /**
     * Sets the prefix used as the first component of the reply store name built in {@code initialise()}.
     *
     * @param storePrefix the prefix; the default is the empty string
     */
    public void setStorePrefix(String storePrefix) {
        this.storePrefix = storePrefix;
    }

    /**
     * Hook invoked by {@code setReplySource} before the source is stored and wired to the reply listener.
     * This implementation does nothing.
     *
     * @param messageSource the reply source about to be installed
     */
    protected void verifyReplyMessageSource(MessageSource messageSource) {
    }

    /**
     * Builds the key under which a request's latch and reply are tracked.
     *
     * @param event the request or reply event
     * @return the correlation id of the event's context, followed by {@code "-"} and the group-correlation sequence
     *         when a sequence is present
     */
    protected String getAsyncReplyCorrelationId(Event event) {
        final StringBuilder id = new StringBuilder().append(event.getContext().getCorrelationId());
        event.getGroupCorrelation().getSequence().ifPresent(sequence -> id.append("-").append(sequence));
        return id.toString();
    }

    /**
     * Sends the request by handing the event to the next processor in the chain.
     *
     * @param event the request event
     * @throws MuleException if the next processor fails
     */
    protected void sendAsyncRequest(Event event) throws MuleException {
        processNext(event);
    }

    /**
     * Waits for the reply that matches the given request and returns it.
     * <p>
     * The calling thread blocks on the latch registered for the event's async-reply correlation id. With a timeout
     * {@code <= 0} it waits without a time limit. Otherwise it waits for the timeout and, if no reply arrived, calls
     * {@link #postLatchAwait(String)} and waits one more second. The latch and any delivered reply are always removed
     * from the tracking maps before this method returns or throws.
     *
     * @param event the request event
     * @return the reply event, or {@code null} when the wait was interrupted (the interrupt flag is restored) or when
     *         it timed out and {@code failOnTimeout} is {@code false}
     * @throws ResponseTimeoutException if no reply arrived in time and {@code failOnTimeout} is {@code true}
     * @throws IllegalStateException if the latch was released but no reply event was delivered
     * @throws MuleException if {@link #postLatchAwait(String)} fails
     */
    protected Event receiveAsyncReply(Event event) throws MuleException {
        final String asyncReplyCorrelationId = this.getAsyncReplyCorrelationId(event);
        final Latch asyncReplyLatch = this.locks.get(asyncReplyCorrelationId);
        // Read the volatile timeout once so the decision and the wait use the same value.
        final long waitMillis = this.timeout;
        boolean resultAvailable = false;
        Event result;
        try {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Waiting for async reply message with id: " + asyncReplyCorrelationId);
            }
            if (waitMillis <= 0L) {
                asyncReplyLatch.await();
                resultAvailable = true;
            } else {
                resultAvailable = asyncReplyLatch.await(waitMillis, TimeUnit.MILLISECONDS);
            }
            if (!resultAvailable) {
                // Give subclasses a chance to react, then allow one extra second for a late reply.
                this.postLatchAwait(asyncReplyCorrelationId);
                asyncReplyLatch.await(1000L, TimeUnit.MILLISECONDS);
                resultAvailable = asyncReplyLatch.getCount() == 0L;
            }
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag for the caller; the finally block below still performs the clean-up.
            Thread.currentThread().interrupt();
            return null;
        }
        finally {
            this.locks.remove(asyncReplyCorrelationId);
            result = this.responseEvents.remove(asyncReplyCorrelationId);
        }
        if (resultAvailable) {
            if (result == null) {
                throw new IllegalStateException("Response MuleEvent is null");
            }
            Event.setCurrentEvent(result);
            return result;
        }
        // Timed out: remember the id so a reply that shows up later is recognised and dropped by the monitor.
        this.addProcessed(asyncReplyCorrelationId);
        if (this.failOnTimeout) {
            this.muleContext.fireNotification(new RoutingNotification(event.getMessage(), null, 1302));
            throw new ResponseTimeoutException(CoreMessages.responseTimedOutWaitingForId((int)waitMillis, asyncReplyCorrelationId), null);
        }
        return null;
    }

    /**
     * Hook invoked by {@code receiveAsyncReply} after the timed wait expired without a reply and before the
     * additional one-second wait. This implementation does nothing.
     *
     * @param asyncReplyCorrelationId correlation id of the request whose reply has not arrived
     * @throws MessagingException declared so that overriding implementations may fail the request
     */
    protected void postLatchAwait(String asyncReplyCorrelationId) throws MessagingException {
    }

    /**
     * Records a correlation id as already handled.
     * The buffer is bounded: when it is full, one existing entry is removed before the new id is added. Access is
     * serialised on {@link #processedLock}.
     *
     * @param id the correlation id to remember
     */
    protected void addProcessed(Object id) {
        synchronized (processedLock) {
            if (processed.isFull()) {
                processed.remove();
            }
            processed.add(id);
        }
    }

    /**
     * Tells whether a correlation id is currently held in the buffer of handled ids.
     * Access is serialised on {@link #processedLock}.
     *
     * @param id the correlation id to look up
     * @return {@code true} if the buffer contains the id
     */
    protected boolean isAlreadyProcessed(Object id) {
        final boolean known;
        synchronized (processedLock) {
            known = processed.contains(id);
        }
        return known;
    }

    /**
     * Returns the string produced by {@code ObjectUtils.toString} for this instance.
     */
    @Override
    public String toString() {
        return ObjectUtils.toString(this);
    }

    /**
     * Stores the owning flow construct; its name is used in {@code initialise()} when the store name is built.
     *
     * @param flowConstruct the owning flow construct
     */
    @Override
    public void setFlowConstruct(FlowConstruct flowConstruct) {
        this.flowConstruct = flowConstruct;
    }

    /**
     * Reports this processor as blocking; {@code receiveAsyncReply} waits on a latch until the reply arrives.
     *
     * @return {@code ReactiveProcessor.ProcessingType.BLOCKING}
     */
    @Override
    public ReactiveProcessor.ProcessingType getProcessingType() {
        return ReactiveProcessor.ProcessingType.BLOCKING;
    }

    private class AsyncReplyMonitoringRunnable
    implements Runnable {
        private AsyncReplyMonitoringRunnable() {
        }

        @Override
        public void run() {
            try {
                List<Serializable> ids = AbstractAsyncRequestReplyRequester.this.store.allKeys();
                AbstractAsyncRequestReplyRequester.this.logger.debug("Found " + ids.size() + " objects in store");
                for (Serializable id : ids) {
                    try {
                        boolean deleteEvent = false;
                        String correlationId = (String)((Object)id);
                        if (AbstractAsyncRequestReplyRequester.this.isAlreadyProcessed(correlationId)) {
                            deleteEvent = true;
                            Event event = (Event)AbstractAsyncRequestReplyRequester.this.store.retrieve((Serializable)((Object)correlationId));
                            if (AbstractAsyncRequestReplyRequester.this.logger.isDebugEnabled()) {
                                AbstractAsyncRequestReplyRequester.this.logger.debug("An event was received for an event group that has already been processed, this is probably because the async-reply timed out. GroupCorrelation Id is: " + correlationId + ". Dropping event");
                            }
                            AbstractAsyncRequestReplyRequester.this.muleContext.fireNotification(new RoutingNotification(event.getMessage(), event.getContext().getOriginatingConnectorName(), 1301));
                        } else {
                            Latch l = AbstractAsyncRequestReplyRequester.this.locks.get(correlationId);
                            if (l != null) {
                                Event event = this.retrieveEvent(correlationId);
                                Event previousResult = AbstractAsyncRequestReplyRequester.this.responseEvents.putIfAbsent(correlationId, event);
                                if (previousResult != null) {
                                    throw new IllegalStateException("Detected duplicate result message with id: " + correlationId);
                                }
                                AbstractAsyncRequestReplyRequester.this.addProcessed(correlationId);
                                deleteEvent = true;
                                l.countDown();
                            }
                        }
                        if (!deleteEvent) continue;
                        AbstractAsyncRequestReplyRequester.this.store.remove((Serializable)((Object)correlationId));
                    }
                    catch (Exception ex) {
                        AbstractAsyncRequestReplyRequester.this.logger.debug("Error processing async replies", (Throwable)ex);
                    }
                }
            }
            catch (Exception ex) {
                AbstractAsyncRequestReplyRequester.this.logger.debug("Error processing async replies", (Throwable)ex);
            }
        }

        private Event retrieveEvent(String correlationId) throws ObjectStoreException, DefaultMuleException {
            Event event = (Event)AbstractAsyncRequestReplyRequester.this.store.retrieve((Serializable)((Object)correlationId));
            if (event.getFlowConstruct() == null) {
                try {
                    DeserializationPostInitialisable.Implementation.init(event, AbstractAsyncRequestReplyRequester.this.muleContext);
                }
                catch (Exception e) {
                    throw new DefaultMuleException(e);
                }
            }
            return event;
        }
    }

    /**
     * Listener installed on the reply message source. It stores every incoming reply in the reply store under the
     * reply's async-reply correlation id and then triggers a scan of the store.
     */
    class InternalAsyncReplyMessageProcessor
    extends AbstractAnnotatedObject
    implements Processor {
        InternalAsyncReplyMessageProcessor() {
        }

        /**
         * Stores the reply and runs the reply monitor on the calling thread.
         *
         * @param event the reply event
         * @return always {@code null}
         * @throws MuleException if the reply cannot be stored
         */
        @Override
        public Event process(Event event) throws MuleException {
            final AbstractAsyncRequestReplyRequester requester = AbstractAsyncRequestReplyRequester.this;
            requester.store.store((Serializable)((Object)requester.getAsyncReplyCorrelationId(event)), event);
            // The monitor is created in start(). A reply arriving earlier is already stored and is picked up by the
            // first scheduled scan, so skip the synchronous scan instead of failing with a NullPointerException.
            final AsyncReplyMonitoringRunnable monitor = requester.replyRunnable;
            if (monitor != null) {
                monitor.run();
            }
            return null;
        }
    }
}

