package org.mule.extensions.vm.internal.listener;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.inject.Inject;
import org.mule.extensions.vm.api.VMMessageAttributes;
import org.mule.extensions.vm.internal.QueueDescriptor;
import org.mule.extensions.vm.internal.ReplyToCommand;
import org.mule.extensions.vm.internal.VMConnector;
import org.mule.extensions.vm.internal.VMConnectorQueueManager;
import org.mule.extensions.vm.internal.connection.VMConnection;
import org.mule.runtime.api.component.location.ComponentLocation;
import org.mule.runtime.api.component.location.ConfigurationComponentLocator;
import org.mule.runtime.api.component.location.Location;
import org.mule.runtime.api.connection.ConnectionException;
import org.mule.runtime.api.connection.ConnectionProvider;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.metadata.TypedValue;
import org.mule.runtime.api.scheduler.Scheduler;
import org.mule.runtime.api.scheduler.SchedulerConfig;
import org.mule.runtime.api.scheduler.SchedulerService;
import org.mule.runtime.api.tx.TransactionException;
import org.mule.runtime.core.api.construct.Flow;
import org.mule.runtime.core.api.util.queue.Queue;
import org.mule.runtime.extension.api.annotation.Alias;
import org.mule.runtime.extension.api.annotation.execution.OnSuccess;
import org.mule.runtime.extension.api.annotation.execution.OnTerminate;
import org.mule.runtime.extension.api.annotation.param.Config;
import org.mule.runtime.extension.api.annotation.param.Connection;
import org.mule.runtime.extension.api.annotation.param.Optional;
import org.mule.runtime.extension.api.annotation.param.Parameter;
import org.mule.runtime.extension.api.annotation.param.ParameterGroup;
import org.mule.runtime.extension.api.annotation.source.EmitsResponse;
import org.mule.runtime.extension.api.runtime.operation.Result;
import org.mule.runtime.extension.api.runtime.source.Source;
import org.mule.runtime.extension.api.runtime.source.SourceCallback;
import org.mule.runtime.extension.api.runtime.source.SourceCallbackContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Alias("listener")
@EmitsResponse
/* loaded from: input_file:org/mule/extensions/vm/internal/listener/VMListener.class */
public class VMListener extends Source<Serializable, VMMessageAttributes> {
    private static final Logger LOGGER = LoggerFactory.getLogger(VMListener.class);
    private static final String REPLY_TO_QUEUE_NAME = "replyTo";

    @Inject
    private VMConnectorQueueManager connectorQueueManager;

    @ParameterGroup(name = "queue")
    private QueueDescriptor queueDescriptor;

    @Optional(defaultValue = "4")
    @Parameter
    private int numberOfConsumers;

    @Config
    private VMConnector config;

    @Connection
    private ConnectionProvider<VMConnection> connectionProvider;

    @Inject
    private SchedulerService schedulerService;

    @Inject
    private ConfigurationComponentLocator componentLocator;
    private ComponentLocation location;
    private List<Consumer> consumers;
    private Scheduler scheduler;
    private Semaphore semaphore;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/mule/extensions/vm/internal/listener/VMListener$Consumer.class */
    public class Consumer {
        private final SourceCallback<Serializable, VMMessageAttributes> sourceCallback;
        private final AtomicBoolean stop = new AtomicBoolean(false);

        public Consumer(SourceCallback<Serializable, VMMessageAttributes> sourceCallback) {
            this.sourceCallback = sourceCallback;
        }

        public void start() {
            long queueTimeoutInMillis = VMListener.this.queueDescriptor.getQueueTimeoutInMillis();
            while (isAlive()) {
                SourceCallbackContext createContext = this.sourceCallback.createContext();
                try {
                    VMListener.this.semaphore.acquire();
                    TypedValue<Serializable> poll = connect(createContext).getQueue(VMListener.this.queueDescriptor.getQueueName()).poll(queueTimeoutInMillis);
                    if (poll == null) {
                        cancel(createContext);
                    } else {
                        Result.Builder attributes = Result.builder().attributes(new VMMessageAttributes(VMListener.this.queueDescriptor.getQueueName()));
                        if (poll instanceof ReplyToCommand) {
                            ReplyToCommand replyToCommand = (ReplyToCommand) poll;
                            createContext.addVariable(VMListener.REPLY_TO_QUEUE_NAME, replyToCommand.getReplyToQueueName());
                            poll = replyToCommand.getValue();
                        }
                        if (poll instanceof TypedValue) {
                            TypedValue<Serializable> typedValue = poll;
                            attributes.output(typedValue.getValue()).mediaType(typedValue.getDataType().getMediaType());
                        } else {
                            attributes.output(poll);
                        }
                        Result build = attributes.build();
                        if (isAlive()) {
                            this.sourceCallback.handle(build, createContext);
                        } else {
                            cancel(createContext);
                        }
                    }
                } catch (InterruptedException e) {
                    stop();
                    cancel(createContext);
                    VMListener.LOGGER.info("Consumer for <vm:listener> on flow '{}' was interrupted. No more consuming for thread '{}'", VMListener.this.location.getRootContainerName(), Thread.currentThread().getName());
                } catch (Exception e2) {
                    cancel(createContext);
                    if (VMListener.LOGGER.isErrorEnabled()) {
                        VMListener.LOGGER.error(String.format("Consumer for <vm:listener> on flow '%s' found unexpected exception. Consuming will continue '", VMListener.this.location.getRootContainerName()), e2);
                    }
                }
            }
        }

        private void cancel(SourceCallbackContext sourceCallbackContext) {
            try {
                sourceCallbackContext.getTransactionHandle().rollback();
            } catch (TransactionException e) {
                if (VMListener.LOGGER.isWarnEnabled()) {
                    VMListener.LOGGER.warn("Failed to rollback transaction: " + e.getMessage(), e);
                }
            }
            VMListener.this.semaphore.release();
            VMListener.this.connectionProvider.disconnect(sourceCallbackContext.getConnection());
        }

        private VMConnection connect(SourceCallbackContext sourceCallbackContext) throws ConnectionException, TransactionException {
            VMConnection vMConnection = (VMConnection) VMListener.this.connectionProvider.connect();
            sourceCallbackContext.bindConnection(vMConnection);
            return vMConnection;
        }

        private boolean isAlive() {
            return (this.stop.get() || Thread.currentThread().isInterrupted()) ? false : true;
        }

        public void stop() {
            this.stop.set(true);
        }
    }

    public void onStart(SourceCallback<Serializable, VMMessageAttributes> sourceCallback) throws MuleException {
        this.connectorQueueManager.registerListenerQueue(this.config, this.queueDescriptor.getQueueName(), this.location);
        startConsumers(sourceCallback);
    }

    public void onStop() {
        if (this.consumers != null) {
            this.consumers.forEach((v0) -> {
                v0.stop();
            });
        }
        if (this.scheduler != null) {
            this.scheduler.shutdownNow();
        }
        this.connectorQueueManager.unregisterListenerQueue(this.queueDescriptor.getQueueName());
    }

    @OnSuccess
    public void onSuccess(@ParameterGroup(name = "Response", showInDsl = true) VMResponseBuilder vMResponseBuilder, SourceCallbackContext sourceCallbackContext) {
        sourceCallbackContext.getVariable(REPLY_TO_QUEUE_NAME).ifPresent(str -> {
            try {
                Queue queue = ((VMConnection) sourceCallbackContext.getConnection()).getQueue(str);
                if (queue == null) {
                    LOGGER.warn("Could not send response to replyTo queue '{}' because it does not exists", str);
                    return;
                }
                try {
                    queue.offer(vMResponseBuilder.getContent(), this.queueDescriptor.getQueueTimeoutInMillis());
                } catch (Exception e) {
                    LOGGER.warn(String.format("Found exception trying to send response to replyTo queue '%s'", str), e);
                }
            } catch (Exception e2) {
                LOGGER.warn(String.format("Found exception trying to obtain replyTo queue '%s'", str), e2);
            }
        });
    }

    @OnTerminate
    public void onTerminate() {
        this.semaphore.release();
    }

    private void startConsumers(SourceCallback<Serializable, VMMessageAttributes> sourceCallback) {
        createScheduler();
        this.consumers = new ArrayList(this.numberOfConsumers);
        this.semaphore = new Semaphore(getMaxConcurrency(), false);
        for (int i = 0; i < this.numberOfConsumers; i++) {
            Consumer consumer = new Consumer(sourceCallback);
            this.consumers.add(consumer);
            Scheduler scheduler = this.scheduler;
            consumer.getClass();
            scheduler.submit(consumer::start);
        }
    }

    private void createScheduler() {
        this.scheduler = this.schedulerService.customScheduler(SchedulerConfig.config().withMaxConcurrentTasks(this.numberOfConsumers).withName("vm listener on flow " + this.location.getRootContainerName()).withPrefix("vm-listener-flow-" + this.location.getRootContainerName()).withWaitAllowed(true).withShutdownTimeout(this.queueDescriptor.getTimeout(), this.queueDescriptor.getTimeoutUnit()));
    }

    private int getMaxConcurrency() {
        return ((Flow) this.componentLocator.find(Location.builder().globalName(this.location.getRootContainerName()).build()).get()).getMaxConcurrency();
    }
}
