package org.mule.routing;

import java.io.Serializable;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.mule.DefaultMuleEvent;
import org.mule.DefaultMuleMessage;
import org.mule.VoidMuleEvent;
import org.mule.api.MessagingException;
import org.mule.api.MuleContext;
import org.mule.api.MuleEvent;
import org.mule.api.MuleException;
import org.mule.api.MuleMessage;
import org.mule.api.MuleRuntimeException;
import org.mule.api.context.notification.MuleContextNotificationListener;
import org.mule.api.exception.MessagingExceptionHandler;
import org.mule.api.exception.MessagingExceptionHandlerAware;
import org.mule.api.lifecycle.Disposable;
import org.mule.api.lifecycle.Initialisable;
import org.mule.api.lifecycle.InitialisationException;
import org.mule.api.lifecycle.Startable;
import org.mule.api.lifecycle.Stoppable;
import org.mule.api.store.ObjectStoreException;
import org.mule.config.ExceptionHelper;
import org.mule.config.i18n.CoreMessages;
import org.mule.config.i18n.MessageFactory;
import org.mule.context.notification.MuleContextNotification;
import org.mule.context.notification.NotificationException;
import org.mule.message.DefaultExceptionPayload;
import org.mule.retry.RetryPolicyExhaustedException;
import org.mule.util.Preconditions;
import org.mule.util.concurrent.ThreadNameHelper;
import org.mule.util.queue.objectstore.QueueKey;
import org.mule.util.store.QueuePersistenceObjectStore;

/* loaded from: input_file:org/mule/routing/AsynchronousUntilSuccessfulProcessingStrategy.class */
public class AsynchronousUntilSuccessfulProcessingStrategy extends AbstractUntilSuccessfulProcessingStrategy implements Initialisable, Disposable, Startable, Stoppable, MessagingExceptionHandlerAware {
    private static final String UNTIL_SUCCESSFUL_MSG_PREFIX = "until-successful retries exhausted. Last exception message was: %s";
    private static final Random random = new Random();
    protected transient Log logger = LogFactory.getLog(getClass());
    private MessagingExceptionHandler messagingExceptionHandler;
    private ExecutorService pool;
    private ScheduledExecutorService scheduledRetriesPool;
    private MuleContextNotificationListener<MuleContextNotification> contextStartListener;
    private final MuleContext muleContext;

    public AsynchronousUntilSuccessfulProcessingStrategy(MuleContext muleContext) {
        Preconditions.checkArgument(muleContext != null, "muleContext cannot be null");
        this.muleContext = muleContext;
    }

    @Override // org.mule.api.lifecycle.Initialisable
    public void initialise() throws InitialisationException {
        if (getUntilSuccessfulConfiguration().getObjectStore() == null) {
            throw new InitialisationException(MessageFactory.createStaticMessage("A ListableObjectStore must be configured on UntilSuccessful."), this);
        }
        this.contextStartListener = new MuleContextNotificationListener<MuleContextNotification>() { // from class: org.mule.routing.AsynchronousUntilSuccessfulProcessingStrategy.1
            @Override // org.mule.api.context.notification.ServerNotificationListener
            public void onNotification(MuleContextNotification muleContextNotification) {
                if (muleContextNotification.getAction() == 104) {
                    AsynchronousUntilSuccessfulProcessingStrategy.this.muleContext.unregisterListener(this);
                    AsynchronousUntilSuccessfulProcessingStrategy.this.contextStartListener = null;
                    AsynchronousUntilSuccessfulProcessingStrategy.this.scheduleAllPendingEventsForProcessing();
                }
            }
        };
        try {
            this.muleContext.registerListener(this.contextStartListener);
        } catch (NotificationException e) {
            throw new InitialisationException(e, this);
        }
    }

    @Override // org.mule.api.lifecycle.Startable
    public void start() {
        String format = String.format("%s%s.%s", ThreadNameHelper.getPrefix(getUntilSuccessfulConfiguration().getMuleContext()), getUntilSuccessfulConfiguration().getFlowConstruct().getName(), "until-successful");
        this.pool = getUntilSuccessfulConfiguration().getThreadingProfile().createPool(format);
        this.scheduledRetriesPool = getUntilSuccessfulConfiguration().createScheduledRetriesPool(format);
    }

    @Override // org.mule.api.lifecycle.Stoppable
    public void stop() {
        this.scheduledRetriesPool.shutdown();
        this.scheduledRetriesPool = null;
        this.pool.shutdown();
        this.pool = null;
    }

    @Override // org.mule.routing.AbstractUntilSuccessfulProcessingStrategy
    protected MuleEvent doRoute(MuleEvent muleEvent) throws MessagingException {
        try {
            scheduleForProcessing(storeEvent(muleEvent), true);
            return getUntilSuccessfulConfiguration().getAckExpression() == null ? VoidMuleEvent.getInstance() : processResponseThroughAckResponseExpression(muleEvent);
        } catch (Exception e) {
            throw new MessagingException(MessageFactory.createStaticMessage("Failed to schedule the event for processing"), muleEvent, e, getUntilSuccessfulConfiguration().getRouter());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void scheduleAllPendingEventsForProcessing() {
        try {
            for (Serializable serializable : getUntilSuccessfulConfiguration().getObjectStore().allKeys()) {
                try {
                    scheduleForProcessing(serializable, true);
                } catch (Exception e) {
                    this.logger.error(MessageFactory.createStaticMessage("Failed to schedule for processing event stored with key: " + serializable), e);
                }
            }
        } catch (Exception e2) {
            this.logger.warn("Failure during scheduling of until successful previous jobs " + e2.getMessage());
            if (this.logger.isDebugEnabled()) {
                this.logger.debug(e2);
            }
        }
    }

    private void scheduleForProcessing(final Serializable serializable, boolean z) {
        if (z) {
            submitForProcessing(serializable);
        } else {
            this.scheduledRetriesPool.schedule(new Callable<Object>() { // from class: org.mule.routing.AsynchronousUntilSuccessfulProcessingStrategy.2
                @Override // java.util.concurrent.Callable
                public Object call() throws Exception {
                    AsynchronousUntilSuccessfulProcessingStrategy.this.submitForProcessing(serializable);
                    return null;
                }
            }, getUntilSuccessfulConfiguration().getMillisBetweenRetries(), TimeUnit.MILLISECONDS);
        }
    }

    protected void submitForProcessing(final Serializable serializable) {
        this.pool.execute(new Runnable() { // from class: org.mule.routing.AsynchronousUntilSuccessfulProcessingStrategy.3
            @Override // java.lang.Runnable
            public void run() {
                try {
                    AsynchronousUntilSuccessfulProcessingStrategy.this.retrieveAndProcessEvent(serializable);
                } catch (ObjectStoreException e) {
                    throw new MuleRuntimeException(e);
                } catch (Exception e2) {
                    AsynchronousUntilSuccessfulProcessingStrategy.this.incrementProcessAttemptCountAndRescheduleOrRemoveFromStore(serializable, e2);
                }
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void incrementProcessAttemptCountAndRescheduleOrRemoveFromStore(Serializable serializable, Exception exc) {
        try {
            MuleEvent remove = getUntilSuccessfulConfiguration().getObjectStore().remove(serializable);
            MuleEvent threadSafeCopy = threadSafeCopy(remove);
            MuleMessage message = threadSafeCopy.getMessage();
            Integer num = (Integer) message.getInvocationProperty(UntilSuccessful.PROCESS_ATTEMPT_COUNT_PROPERTY_NAME, 1);
            if (num.intValue() <= getUntilSuccessfulConfiguration().getMaxRetries()) {
                message.setInvocationProperty(UntilSuccessful.PROCESS_ATTEMPT_COUNT_PROPERTY_NAME, Integer.valueOf(num.intValue() + 1));
                getUntilSuccessfulConfiguration().getObjectStore().store(serializable, threadSafeCopy);
                scheduleForProcessing(serializable, false);
            } else {
                abandonRetries(remove, threadSafeCopy, exc);
            }
        } catch (ObjectStoreException e) {
            this.logger.error("Failed to increment failure count for event stored with key: " + serializable, e);
        }
    }

    private Serializable storeEvent(MuleEvent muleEvent) throws ObjectStoreException {
        return storeEvent(muleEvent, ((Integer) muleEvent.getMessage().getInvocationProperty(UntilSuccessful.PROCESS_ATTEMPT_COUNT_PROPERTY_NAME, 1)).intValue());
    }

    private Serializable storeEvent(MuleEvent muleEvent, int i) throws ObjectStoreException {
        muleEvent.getMessage().setInvocationProperty(UntilSuccessful.PROCESS_ATTEMPT_COUNT_PROPERTY_NAME, Integer.valueOf(i));
        Serializable buildQueueKey = buildQueueKey(muleEvent);
        getUntilSuccessfulConfiguration().getObjectStore().store(buildQueueKey, muleEvent);
        return buildQueueKey;
    }

    public static Serializable buildQueueKey(MuleEvent muleEvent) {
        return new QueueKey(QueuePersistenceObjectStore.DEFAULT_QUEUE_STORE, String.format("%s-%s-%s-%d", muleEvent.getFlowConstruct(), muleEvent.getMuleContext().getClusterId(), muleEvent.getId(), Integer.valueOf(random.nextInt())));
    }

    private void abandonRetries(MuleEvent muleEvent, MuleEvent muleEvent2, Exception exc) {
        if (getUntilSuccessfulConfiguration().getDlqMP() == null) {
            this.logger.info("Retry attempts exhausted and no DLQ defined");
            this.messagingExceptionHandler.handleException(buildRetryPolicyExhaustedException(exc), muleEvent2);
            return;
        }
        MuleEvent threadSafeCopy = threadSafeCopy(muleEvent);
        this.logger.info("Retry attempts exhausted, routing message to DLQ: " + getUntilSuccessfulConfiguration().getDlqMP());
        try {
            muleEvent2.getMessage().setExceptionPayload(new DefaultExceptionPayload(buildRetryPolicyExhaustedException(exc)));
            getUntilSuccessfulConfiguration().getDlqMP().process(muleEvent2);
        } catch (MessagingException e) {
            this.messagingExceptionHandler.handleException(e, threadSafeCopy);
        } catch (Exception e2) {
            this.messagingExceptionHandler.handleException(new MessagingException(muleEvent, e2), threadSafeCopy);
        }
    }

    protected RetryPolicyExhaustedException buildRetryPolicyExhaustedException(Exception exc) {
        MuleException rootMuleException = ExceptionHelper.getRootMuleException(exc);
        if (rootMuleException == null) {
            return new RetryPolicyExhaustedException(CoreMessages.createStaticMessage(UNTIL_SUCCESSFUL_MSG_PREFIX, exc.getMessage()), exc, this);
        }
        if (rootMuleException.getCause() != null) {
            RetryPolicyExhaustedException retryPolicyExhaustedException = new RetryPolicyExhaustedException(CoreMessages.createStaticMessage(UNTIL_SUCCESSFUL_MSG_PREFIX, rootMuleException.getMessage()), rootMuleException.getCause());
            retryPolicyExhaustedException.getInfo().putAll(rootMuleException.getInfo());
            return retryPolicyExhaustedException;
        }
        RetryPolicyExhaustedException retryPolicyExhaustedException2 = new RetryPolicyExhaustedException(CoreMessages.createStaticMessage(UNTIL_SUCCESSFUL_MSG_PREFIX, rootMuleException.getMessage()), rootMuleException);
        retryPolicyExhaustedException2.getInfo().putAll(rootMuleException.getInfo());
        return retryPolicyExhaustedException2;
    }

    private void removeFromStore(Serializable serializable) {
        try {
            getUntilSuccessfulConfiguration().getObjectStore().remove(serializable);
        } catch (ObjectStoreException e) {
            this.logger.warn("Failed to remove following event from store with key: " + serializable);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void retrieveAndProcessEvent(Serializable serializable) throws ObjectStoreException {
        processEvent(threadSafeCopy(getUntilSuccessfulConfiguration().getObjectStore().retrieve(serializable)));
        removeFromStore(serializable);
    }

    protected MuleEvent threadSafeCopy(MuleEvent muleEvent) {
        return new DefaultMuleEvent(new DefaultMuleMessage(muleEvent.getMessage().getPayload(), muleEvent.getMessage(), getUntilSuccessfulConfiguration().getMuleContext()), muleEvent);
    }

    @Override // org.mule.api.exception.MessagingExceptionHandlerAware
    public void setMessagingExceptionHandler(MessagingExceptionHandler messagingExceptionHandler) {
        this.messagingExceptionHandler = messagingExceptionHandler;
    }

    @Override // org.mule.api.lifecycle.Disposable
    public void dispose() {
        if (this.contextStartListener != null) {
            this.muleContext.unregisterListener(this.contextStartListener);
        }
    }
}
