package org.mule.extensions.vm.internal.operations;

import java.io.Serializable;
import java.util.Optional;
import javax.inject.Inject;
import org.mule.extensions.vm.api.VMError;
import org.mule.extensions.vm.api.VMMessageAttributes;
import org.mule.extensions.vm.internal.QueueDescriptor;
import org.mule.extensions.vm.internal.ReplyToCommand;
import org.mule.extensions.vm.internal.VMConnectorQueueManager;
import org.mule.extensions.vm.internal.connection.VMConnection;
import org.mule.runtime.api.component.location.ComponentLocation;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.exception.MuleRuntimeException;
import org.mule.runtime.api.i18n.I18nMessageFactory;
import org.mule.runtime.api.lifecycle.Startable;
import org.mule.runtime.api.lifecycle.Stoppable;
import org.mule.runtime.api.metadata.TypedValue;
import org.mule.runtime.api.scheduler.Scheduler;
import org.mule.runtime.core.api.scheduler.SchedulerService;
import org.mule.runtime.core.api.util.queue.Queue;
import org.mule.runtime.extension.api.annotation.error.Throws;
import org.mule.runtime.extension.api.annotation.param.Connection;
import org.mule.runtime.extension.api.annotation.param.Content;
import org.mule.runtime.extension.api.annotation.param.ParameterGroup;
import org.mule.runtime.extension.api.exception.ModuleException;
import org.mule.runtime.extension.api.runtime.operation.Result;

/* loaded from: input_file:repository/org/mule/connectors/mule-vm-connector/0.9.0/mule-vm-connector-0.9.0-mule-plugin.jar:org/mule/extensions/vm/internal/operations/VMOperations.class */
/**
 * Operations exposed by the VM connector: publishing to a queue, consuming from a queue, and a
 * publish-then-wait-for-reply round trip.
 *
 * <p>Every operation first validates the target queue through the injected
 * {@link VMConnectorQueueManager} and then resolves the actual {@link Queue} from the supplied
 * {@link VMConnection}.
 */
public class VMOperations implements Startable, Stoppable {

    @Inject
    private VMConnectorQueueManager queueManager;

    @Inject
    private SchedulerService schedulerService;

    // Obtained in start() and released in stop(); not otherwise used by the operations in this class.
    private Scheduler scheduler;

    /**
     * Acquires an IO scheduler from the injected {@link SchedulerService}.
     *
     * @throws MuleException declared by the {@link Startable} contract
     */
    @Override
    public void start() throws MuleException {
        this.scheduler = this.schedulerService.ioScheduler();
    }

    /**
     * Stops the scheduler acquired in {@link #start()}, if any, and clears the reference so that a
     * repeated call is a no-op.
     *
     * @throws MuleException declared by the {@link Stoppable} contract
     */
    @Override
    public void stop() throws MuleException {
        if (this.scheduler != null) {
            this.scheduler.stop();
            this.scheduler = null;
        }
    }

    /**
     * Publishes the given content into the queue described by {@code queueDescriptor}.
     *
     * @param typedValue        the content to publish
     * @param queueDescriptor   identifies the target queue and the timeout to apply
     * @param vMConnection      the connection from which the queue is obtained
     * @param componentLocation location of the invoking component, passed to queue validation
     * @throws ModuleException with {@link VMError#QUEUE_TIMEOUT} if the value could not be offered
     *                         within the timeout or the thread was interrupted while waiting
     */
    @Throws({PublishErrorTypeProvider.class})
    public void publish(@Content TypedValue<Serializable> typedValue, @ParameterGroup(name = "queue") QueueDescriptor queueDescriptor, @Connection VMConnection vMConnection, ComponentLocation componentLocation) {
        doPublish(typedValue, queueDescriptor, getQueue(queueDescriptor, vMConnection, componentLocation, "publish"));
    }

    /**
     * Polls one message from the queue described by {@code queueDescriptor}.
     *
     * @param queueDescriptor   identifies the source queue and the timeout to apply
     * @param vMConnection      the connection from which the queue is obtained
     * @param componentLocation location of the invoking component, passed to queue validation
     * @return the consumed value together with its {@link VMMessageAttributes}
     * @throws ModuleException with {@link VMError#EMPTY_QUEUE} if nothing was available before the
     *                         timeout elapsed
     */
    @Throws({ConsumeErrorTypeProvider.class})
    public Result<Serializable, VMMessageAttributes> consume(@ParameterGroup(name = "queue") QueueDescriptor queueDescriptor, @Connection VMConnection vMConnection, ComponentLocation componentLocation) {
        Queue queue = getQueue(queueDescriptor, vMConnection, componentLocation, "consume");
        return doConsume(queue, queueDescriptor)
            .map(value -> asConsumeResponse(value, queueDescriptor))
            .orElseThrow(() -> new ModuleException(String.format("Tried to consume messages from VM queue '%s' but it was empty after timeout of %d %s", queueDescriptor.getQueueName(), queueDescriptor.getTimeout(), queueDescriptor.getTimeoutUnit()), VMError.EMPTY_QUEUE));
    }

    /**
     * Publishes the given content wrapped in a {@link ReplyToCommand} and then waits on a
     * temporary reply-to queue for a response.
     *
     * <p>The reply-to queue is created through the queue manager before publishing and is always
     * disposed before this method returns or throws.
     *
     * @param typedValue        the content to publish
     * @param queueDescriptor   identifies the target queue and the timeout applied to both the
     *                          publish and the wait for the reply
     * @param vMConnection      the connection from which the queues are obtained
     * @param componentLocation location of the invoking component, passed to queue validation
     * @return the reply value together with its {@link VMMessageAttributes}
     * @throws ModuleException      with {@link VMError#QUEUE_TIMEOUT} if publishing timed out or no
     *                              reply arrived before the timeout elapsed
     * @throws MuleRuntimeException wrapping any other failure raised during the round trip
     */
    @Throws({PublishConsumeErrorTypeProvider.class})
    public Result<Serializable, VMMessageAttributes> publishConsume(@Content TypedValue<Serializable> typedValue, @ParameterGroup(name = "queue") QueueDescriptor queueDescriptor, @Connection VMConnection vMConnection, ComponentLocation componentLocation) {
        Queue queue = getQueue(queueDescriptor, vMConnection, componentLocation, "publishConsume");
        Queue replyToQueue = this.queueManager.createReplyToQueue(queue, vMConnection);
        try {
            doPublish(new ReplyToCommand(typedValue, replyToQueue.getName()), queueDescriptor, queue);
            return doConsume(replyToQueue, queueDescriptor)
                .map(value -> asConsumeResponse(value, queueDescriptor))
                .orElseThrow(() -> new ModuleException(String.format("Published messages to queue '%s' but got no response after timeout of %d %s", queueDescriptor.getQueueName(), queueDescriptor.getTimeout(), queueDescriptor.getTimeoutUnit()), VMError.QUEUE_TIMEOUT));
        } catch (ModuleException e) {
            // Must be caught before the generic handler so the typed error is not wrapped.
            throw e;
        } catch (Exception e) {
            throw new MuleRuntimeException(I18nMessageFactory.createStaticMessage(String.format("Found error trying to perform publish-consume to VM queue '%s'", queueDescriptor.getQueueName())), e);
        } finally {
            // Single disposal point for both the success and every failure path.
            this.queueManager.disposeReplyToQueue(replyToQueue);
        }
    }

    /**
     * Builds the operation result for a value taken from a queue. When the value is a
     * {@link TypedValue}, its wrapped value and media type are used; otherwise the value itself
     * becomes the output.
     */
    @SuppressWarnings("unchecked") // the generic argument of a queued TypedValue cannot be checked at runtime
    private Result<Serializable, VMMessageAttributes> asConsumeResponse(Serializable value, QueueDescriptor queueDescriptor) {
        Result.Builder<Serializable, VMMessageAttributes> builder = Result.builder();
        builder.attributes(new VMMessageAttributes(queueDescriptor.getQueueName()));
        if (value instanceof TypedValue) {
            TypedValue<Serializable> typed = (TypedValue<Serializable>) value;
            builder.output(typed.getValue()).mediaType(typed.getDataType().getMediaType());
        } else {
            builder.output(value);
        }
        return builder.build();
    }

    /**
     * Offers {@code value} to {@code queue}, waiting up to the descriptor's timeout.
     *
     * @throws ModuleException with {@link VMError#QUEUE_TIMEOUT} if the offer is rejected within
     *                         the timeout or the waiting thread is interrupted
     */
    private void doPublish(Serializable value, QueueDescriptor queueDescriptor, Queue queue) {
        try {
            if (!queue.offer(value, queueDescriptor.getQueueTimeoutInMillis())) {
                throw new ModuleException("Timeout publishing message to VM queue " + queueDescriptor.getQueueName(), VMError.QUEUE_TIMEOUT);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt status so callers further up the stack can observe it.
            Thread.currentThread().interrupt();
            throw new ModuleException(VMError.QUEUE_TIMEOUT, e);
        }
    }

    /**
     * Polls {@code queue}, waiting up to the descriptor's timeout.
     *
     * @return the polled value, or an empty {@link Optional} if the poll returned {@code null}
     * @throws MuleRuntimeException wrapping any exception raised by the poll
     */
    private Optional<Serializable> doConsume(Queue queue, QueueDescriptor queueDescriptor) {
        try {
            return Optional.ofNullable(queue.poll(queueDescriptor.getQueueTimeoutInMillis()));
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Restore the interrupt status that catching the exception cleared.
                Thread.currentThread().interrupt();
            }
            throw new MuleRuntimeException(I18nMessageFactory.createStaticMessage(String.format("Found error trying to consume messages from VM queue '%s'", queueDescriptor.getQueueName())), e);
        }
    }

    /**
     * Validates the queue named by {@code queueDescriptor} for the given operation and returns it
     * from the connection.
     *
     * @param operationName name of the calling operation, forwarded to the validation call
     */
    private Queue getQueue(QueueDescriptor queueDescriptor, VMConnection vMConnection, ComponentLocation componentLocation, String operationName) {
        String queueName = queueDescriptor.getQueueName();
        this.queueManager.validateQueue(queueName, operationName, componentLocation);
        return vMConnection.getQueue(queueName);
    }
}
