package com.mule.extensions.amqp.internal.operation;

import com.mule.extensions.amqp.api.config.AckMode;
import com.mule.extensions.amqp.api.config.DeliveryMode;
import com.mule.extensions.amqp.api.exception.AmqpExtensionException;
import com.mule.extensions.amqp.api.exception.AmqpPublishConsumeErrorTypeProvider;
import com.mule.extensions.amqp.api.exception.AmqpPublishConsumeException;
import com.mule.extensions.amqp.api.message.AmqpAttributes;
import com.mule.extensions.amqp.api.message.AmqpMessageBuilder;
import com.mule.extensions.amqp.api.model.ExchangeDefinition;
import com.mule.extensions.amqp.internal.common.AmqpCommons;
import com.mule.extensions.amqp.internal.config.AmqpConfig;
import com.mule.extensions.amqp.internal.connection.AmqpTransactionalConnection;
import com.mule.extensions.amqp.internal.connection.channel.AmqpChannelManager;
import com.mule.extensions.amqp.internal.connection.channel.MuleAmqpChannel;
import com.mule.extensions.amqp.internal.exception.resolver.ConsumeExceptionResolver;
import com.mule.extensions.amqp.internal.exception.resolver.PublishExceptionResolver;
import com.mule.extensions.amqp.internal.message.AmqpMessage;
import com.mule.extensions.amqp.internal.message.AmqpResultFactory;
import com.mule.extensions.amqp.internal.metadata.AmqpOutputResolver;
import com.mule.extensions.amqp.internal.model.message.Message;
import com.mule.extensions.amqp.internal.publish.AmqpPublishParameters;
import com.mule.extensions.amqp.internal.publisher.AmqpPublisherCommons;
import com.mule.extensions.amqp.internal.publisher.DefaultAmqpMessagePublisher;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.TimeUnit;
import javax.inject.Inject;
import org.mule.runtime.api.meta.ExpressionSupport;
import org.mule.runtime.api.transformation.TransformationService;
import org.mule.runtime.extension.api.annotation.Expression;
import org.mule.runtime.extension.api.annotation.error.Throws;
import org.mule.runtime.extension.api.annotation.metadata.OutputResolver;
import org.mule.runtime.extension.api.annotation.param.Config;
import org.mule.runtime.extension.api.annotation.param.ConfigOverride;
import org.mule.runtime.extension.api.annotation.param.Connection;
import org.mule.runtime.extension.api.annotation.param.MediaType;
import org.mule.runtime.extension.api.annotation.param.Optional;
import org.mule.runtime.extension.api.annotation.param.ParameterGroup;
import org.mule.runtime.extension.api.annotation.param.display.Example;
import org.mule.runtime.extension.api.annotation.param.display.Summary;
import org.mule.runtime.extension.api.runtime.operation.Result;
import org.mule.runtime.extension.api.runtime.parameter.CorrelationInfo;
import org.mule.runtime.extension.api.runtime.parameter.OutboundCorrelationStrategy;
import org.mule.runtime.extension.api.tx.OperationTransactionalAction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/mule/extensions/amqp/internal/operation/AmqpPublishConsume.class */
/**
 * Holds the AMQP {@code publish-consume} operation: a message is published to an exchange and the
 * operation then waits on the reply-to queue for a response message.
 */
public final class AmqpPublishConsume {
    private static final Logger LOGGER = LoggerFactory.getLogger(AmqpPublishConsume.class);

    @Inject
    private AmqpChannelManager channelManager;

    @Inject
    private TransformationService transformationService;

    // Map low-level I/O failures of the publish phase / consume phase to extension exceptions.
    private PublishExceptionResolver publishExceptionResolver = new PublishExceptionResolver();
    private ConsumeExceptionResolver consumeExceptionResolver = new ConsumeExceptionResolver();

    private final AmqpResultFactory resultFactory = new AmqpResultFactory();

    /**
     * Publishes the message described by {@code amqpMessageBuilder} to the exchange {@code str} and
     * then consumes a message from the reply-to queue, handing the published message's correlation id
     * to the consumer.
     *
     * <p>When the builder carries no reply-to queue, one is declared on the publishing channel and
     * set on the builder before the message is built.
     *
     * @param amqpConfig                   extension configuration; supplies quality of service,
     *                                     publisher defaults, content type and encoding
     * @param amqpTransactionalConnection  connection used to create the channels and the consumer
     * @param str                          the name of the exchange to publish the message to
     * @param str2                         optional content type; passed as the second argument of
     *                                     {@code resolveOverride} when resolving the content type of
     *                                     the consumed message (presumably the overriding value)
     * @param str3                         optional encoding; used the same way as {@code str2} for the
     *                                     encoding of the consumed message
     * @param exchangeDefinition           optional exchange definition forwarded to the fallback
     *                                     exchange declaration
     * @param str4                         optional routing key to publish to
     * @param deliveryMode                 delivery mode handed to the message builder
     * @param l                            maximum time to wait for a message to arrive
     * @param timeUnit                     unit of {@code l}; the wait time is converted to milliseconds
     * @param amqpMessageBuilder           builder for the message that will be published
     * @param amqpPublishParameters        per-operation publish settings (immediate, mandatory,
     *                                     broker confirms, returned message exchange)
     * @param operationTransactionalAction transactional action forwarded to channel creation and to
     *                                     the fallback exchange declaration
     * @param outboundCorrelationStrategy  correlation strategy handed to the message builder
     * @param z                            flag forwarded to the fallback exchange declaration
     *                                     (presumably "create fallback exchange" - TODO confirm)
     * @param correlationInfo              correlation information handed to the message builder
     * @return the result built by the result factory from the consumed message, its resolved content
     *         type and encoding, and the message ack id
     * @throws AmqpExtensionException if either the publish phase or the consume phase fails
     */
    @OutputResolver(output = AmqpOutputResolver.class)
    @Throws({AmqpPublishConsumeErrorTypeProvider.class})
    @MediaType(value = "*/*", strict = false)
    public Result<InputStream, AmqpAttributes> publishConsume(
            @Config AmqpConfig amqpConfig,
            @Connection AmqpTransactionalConnection amqpTransactionalConnection,
            @Summary("The name of the exchange to publish the message to") String str,
            @Example("application/json") @Optional @Summary("The content type of the message body") String str2,
            @Example("UTF-8") @Optional @Summary("The encoding of the message body") String str3,
            @Optional @Summary("The queue exchange to use for exchange declaration in case there is no exchange with the exchangeName") @Expression(ExpressionSupport.NOT_SUPPORTED) ExchangeDefinition exchangeDefinition,
            @Optional @Summary("The routing key to publish to") String str4,
            @ConfigOverride @Optional @Summary("The delivery mode to use when publishing to the AMQP broker") DeliveryMode deliveryMode,
            @Optional(defaultValue = "10000") @Summary("Maximum time to wait for a message to arrive before timeout") Long l,
            @Example("MILLISECONDS") @Optional(defaultValue = "MILLISECONDS") @Summary("Time unit to be used in the maximumWaitTime configuration") TimeUnit timeUnit,
            @Summary("A builder for the message that will be published") @ParameterGroup(name = "Message", showInDsl = true) AmqpMessageBuilder amqpMessageBuilder,
            @ParameterGroup(name = "Publish Configuration") AmqpPublishParameters amqpPublishParameters,
            OperationTransactionalAction operationTransactionalAction,
            @ConfigOverride OutboundCorrelationStrategy outboundCorrelationStrategy,
            @ConfigOverride boolean z,
            CorrelationInfo correlationInfo) throws AmqpExtensionException {
        String replyQueue = amqpMessageBuilder.getReplyToQueue();
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Begin [publish-consume] to : [" + str + "]");
        }
        AmqpPublisherCommons.checkBrokerNotBlocked(amqpTransactionalConnection);

        try {
            // ---- publish phase ----
            MuleAmqpChannel publishChannel = AmqpCommons.declareFallbackExchangeIfNeeded(
                    amqpTransactionalConnection,
                    str,
                    exchangeDefinition,
                    operationTransactionalAction,
                    AmqpCommons.createAmqpChannel(
                            amqpTransactionalConnection,
                            this.channelManager,
                            operationTransactionalAction,
                            amqpConfig.getQualityOfService()),
                    this.channelManager,
                    z);

            DefaultAmqpMessagePublisher publisher = DefaultAmqpMessagePublisher.Builder.newInstance()
                    .withChannel(publishChannel)
                    .withExchangeName(str)
                    .withRequestBrokerConfirms(AmqpPublisherCommons.resolveRequestBrokerConfirms(amqpConfig, amqpPublishParameters))
                    .withReturnedMessageExchange(AmqpPublisherCommons.resolvedReturnedMessageExchange(amqpConfig, amqpPublishParameters))
                    .build();

            // No reply-to queue supplied: declare one on the publishing channel and advertise it
            // through the builder (presumably a broker-named queue - TODO confirm).
            if (replyQueue == null) {
                replyQueue = publishChannel.queueDeclare().getQueue();
                amqpMessageBuilder.setReplyTo(replyQueue);
            }

            amqpMessageBuilder.overridePriorityIfNeeded(Integer.valueOf(amqpConfig.getPublisherConfig().getPriority()));

            // Each flag is resolved from the config value and the per-operation value.
            boolean immediate = (Boolean) AmqpCommons.resolveOverride(
                    Boolean.valueOf(amqpConfig.getPublisherConfig().isImmediate()),
                    Boolean.valueOf(amqpPublishParameters.isImmediate()));
            boolean mandatory = (Boolean) AmqpCommons.resolveOverride(
                    Boolean.valueOf(amqpConfig.getPublisherConfig().isMandatory()),
                    Boolean.valueOf(amqpPublishParameters.isMandatory()));

            Message outbound = amqpMessageBuilder.build(
                    immediate,
                    mandatory,
                    str4,
                    str,
                    amqpConfig.getContentType(),
                    amqpConfig.getEncoding(),
                    deliveryMode,
                    outboundCorrelationStrategy,
                    correlationInfo,
                    this.transformationService);
            publisher.publish(outbound);

            // ---- consume phase ----
            try {
                // The reply is read through a consumer bound to a freshly created channel.
                // NOTE(review): nothing in this method closes that channel - confirm the consumer
                // or the connection takes care of it.
                AmqpMessage reply = amqpTransactionalConnection
                        .createConsumer(
                                amqpTransactionalConnection.createChannel(),
                                replyQueue,
                                timeUnit.toMillis(l),
                                AckMode.IMMEDIATE.getInternalAckMode())
                        .consume(outbound.getProperties().getCorrelationId());

                // NOTE(review): the ack id is derived from the publishing channel, while the delivery
                // tag comes from a message consumed on a different channel - confirm this is intended.
                String ackId = AmqpCommons.getMessageAckId(publishChannel, reply.getEnvelope().getDeliveryTag());

                String resolvedContentType = (String) AmqpCommons.resolveOverride(
                        AmqpCommons.resolveMessageContentType(reply.getProperties().getContentType(), amqpConfig.getContentType()),
                        str2);
                String resolvedEncoding = (String) AmqpCommons.resolveOverride(
                        AmqpCommons.resolveMessageEncoding(reply.getProperties().getContentEncoding(), amqpConfig.getEncoding()),
                        str3);

                AmqpCommons.evaluateMessageAck(publishChannel, reply, this.channelManager, AckMode.IMMEDIATE.getInternalAckMode(), ackId);
                return this.resultFactory.createResult(reply, resolvedContentType, resolvedEncoding, ackId);
            } catch (AmqpExtensionException alreadyTranslated) {
                // Extension exceptions pass through untouched so they are not re-wrapped below.
                throw alreadyTranslated;
            } catch (IOException consumeIoFailure) {
                throw this.consumeExceptionResolver.resolveException(consumeIoFailure);
            } catch (Exception consumeFailure) {
                throw new AmqpPublishConsumeException(String.format("An error occurred while consuming a message from the queue [%s]: %s", replyQueue, consumeFailure.getMessage()), consumeFailure);
            }
        } catch (AmqpExtensionException alreadyTranslated) {
            // Also covers extension exceptions raised by the consume phase above.
            throw alreadyTranslated;
        } catch (IOException publishIoFailure) {
            throw this.publishExceptionResolver.resolveException(publishIoFailure);
        } catch (Exception publishFailure) {
            // NOTE(review): the message reads "from the exchange" although this path publishes to it.
            throw new AmqpPublishConsumeException(String.format("An error occurred while publishing a message from the exchange [%s]: %s", str, publishFailure.getMessage()), publishFailure);
        }
    }
}
