package com.mulesoft.connector.azure.messaging.internal.source;

import com.microsoft.azure.servicebus.IMessageReceiver;
import com.microsoft.azure.servicebus.Message;
import com.microsoft.azure.servicebus.MessageBody;
import com.microsoft.azure.servicebus.MessageBodyType;
import com.microsoft.azure.servicebus.primitives.MessagingEntityDisabledException;
import com.microsoft.azure.servicebus.primitives.ServiceBusException;
import com.mulesoft.connector.azure.messaging.api.AckMode;
import com.mulesoft.connector.azure.messaging.api.AzureMessageAttributes;
import com.mulesoft.connector.azure.messaging.internal.connection.AzureServiceBusConnection;
import java.io.Closeable;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Optional;
import java.util.concurrent.Semaphore;
import org.mule.runtime.api.connection.ConnectionException;
import org.mule.runtime.api.metadata.MediaType;
import org.mule.runtime.core.api.util.StringUtils;
import org.mule.runtime.extension.api.error.MuleErrors;
import org.mule.runtime.extension.api.exception.ModuleException;
import org.mule.runtime.extension.api.runtime.operation.Result;
import org.mule.runtime.extension.api.runtime.source.SourceCallback;
import org.mule.runtime.extension.api.runtime.source.SourceCallbackContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/mulesoft/connector/azure/messaging/internal/source/ReceiverTask.class */
/**
 * Polling loop that pulls messages from a Service Bus {@link IMessageReceiver} and dispatches
 * each one to a Mule {@link SourceCallback}.
 *
 * <p>When {@code bufferedMode} is {@code false}, a single-permit {@link Semaphore} is acquired
 * before every receive and placed in the callback context under {@link #SEMAPHORE_VAR}.
 * NOTE(review): the permit is presumably released by the code that completes the flow for that
 * message; that code is not visible in this class.
 *
 * <p>{@link #close()} may be called from another thread; the {@code running} flag is volatile
 * so the loop observes the stop request.
 */
public class ReceiverTask implements Runnable, Closeable {
    private static final Logger logger = LoggerFactory.getLogger(ReceiverTask.class);

    /** Context variable holding the message lock token (set when the ack mode is not IMMEDIATE). */
    public static final String UUID_VAR = "uuidKey";
    /** Context variable holding this task's semaphore (set when not in buffered mode). */
    public static final String SEMAPHORE_VAR = "semaphoreKey";
    /** Context variable holding the message's reply-to destination, when present. */
    public static final String REPLY_TO_DESTINATION_VAR = "REPLY_TO_DESTINATION";
    /** Context variable holding the message's reply-to session id, when present. */
    public static final String REPLY_TO_SESSION_ID_VAR = "REPLY_TO_SESSION_ID";

    private final IMessageReceiver receiver;
    private final SourceCallback<byte[], AzureMessageAttributes> sourceCallback;
    private final int threadNumber;
    private final AzureServiceBusConnection connection;
    private final AckMode ackMode;
    private final String zoneId;
    private final boolean bufferedMode;
    private final Semaphore semaphore = new Semaphore(1);
    private volatile boolean running = true;

    /**
     * Creates a receiver task.
     *
     * @param connection connection used to register the receiver against a message lock token
     * @param receiver receiver that messages are pulled from
     * @param sourceCallback callback that received messages are dispatched to
     * @param ackMode acknowledgement mode; any mode other than IMMEDIATE registers the lock token
     * @param threadNumber number used only in log statements
     * @param zoneId zone id used to convert message timestamps; {@code null} means system default
     * @param bufferedMode when {@code false}, receives are gated by a single-permit semaphore
     */
    public ReceiverTask(AzureServiceBusConnection connection, IMessageReceiver receiver, SourceCallback<byte[], AzureMessageAttributes> sourceCallback, AckMode ackMode, int threadNumber, String zoneId, boolean bufferedMode) {
        this.connection = connection;
        this.receiver = receiver;
        this.sourceCallback = sourceCallback;
        this.threadNumber = threadNumber;
        this.ackMode = ackMode;
        this.zoneId = zoneId;
        this.bufferedMode = bufferedMode;
    }

    /**
     * Receives and dispatches messages until {@link #close()} clears the running flag.
     *
     * <p>Transient Service Bus errors and {@link MessagingEntityDisabledException} are reported
     * through {@link SourceCallback#onConnectionException} and the loop continues; any other
     * Service Bus error, or an interruption, ends the loop with a {@link ModuleException}.
     *
     * @throws ModuleException on interruption or on a non-transient Service Bus failure
     */
    @Override
    public void run() {
        while (this.running) {
            // True only while this iteration still owns the permit. Ownership is given up as
            // soon as a message is handed to handleMessage, which exposes the semaphore to the
            // callback context.
            boolean permitHeld = false;
            try {
                if (!this.bufferedMode) {
                    this.semaphore.acquire();
                    permitHeld = true;
                }
                Message message = (Message) this.receiver.receive();
                if (message != null) {
                    permitHeld = false;
                    handleMessage(message);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new ModuleException(MuleErrors.ANY, e);
            } catch (ServiceBusException e) {
                boolean recoverable = e.getIsTransient() || e instanceof MessagingEntityDisabledException;
                if (!recoverable) {
                    throw new ModuleException(MuleErrors.ANY, e);
                }
                this.sourceCallback.onConnectionException(new ConnectionException(e));
            } finally {
                // Covers both "nothing received" and "receive() failed". Without this release a
                // recoverable failure would leave the permit taken and the next acquire() would
                // block forever.
                if (permitHeld) {
                    this.semaphore.release();
                }
            }
        }
    }

    /**
     * Populates a callback context from the message and dispatches the message to the callback.
     *
     * @param message received message; must not be {@code null}
     */
    private void handleMessage(Message message) {
        SourceCallbackContext context = this.sourceCallback.createContext();
        if (!this.bufferedMode) {
            context.addVariable(SEMAPHORE_VAR, this.semaphore);
        }
        if (!this.ackMode.equals(AckMode.IMMEDIATE)) {
            String lockToken = message.getLockToken().toString();
            this.connection.registerReceiverToLock(lockToken, this.receiver);
            context.addVariable(UUID_VAR, lockToken);
        }
        if (!StringUtils.isEmpty(message.getReplyTo())) {
            context.addVariable(REPLY_TO_DESTINATION_VAR, message.getReplyTo());
        }
        if (!StringUtils.isEmpty(message.getReplyToSessionId())) {
            context.addVariable(REPLY_TO_SESSION_ID_VAR, message.getReplyToSessionId());
        }
        if (!StringUtils.isEmpty(message.getCorrelationId())) {
            context.setCorrelationId(message.getCorrelationId());
        }
        Result<byte[], AzureMessageAttributes> result = Result.<byte[], AzureMessageAttributes>builder()
                .attributes(buildAttributes(message, this.zoneId))
                .mediaType(getMessageMediaType(message))
                .output(getMessageBody(message))
                .build();
        this.sourceCallback.handle(result, context);
        logger.debug("Thread-{}", this.threadNumber);
    }

    /**
     * Requests the polling loop to stop and closes the underlying receiver. A failure while
     * closing the receiver is logged and not propagated.
     */
    @Override
    public void close() {
        logger.debug("Closing receiver in Thread-{}", this.threadNumber);
        this.running = false;
        try {
            this.receiver.close();
        } catch (ServiceBusException e) {
            logger.warn("Failed to close receiver in Thread-{}: {}", this.threadNumber, e.getMessage(), e);
        }
    }

    /**
     * Extracts the payload bytes of a message.
     *
     * @param message received message
     * @return the first binary section for BINARY bodies, the UTF-8 bytes of the value for VALUE
     *         bodies, or an empty array when the body is missing or of any other type
     */
    private byte[] getMessageBody(Message message) {
        MessageBody messageBody = message.getMessageBody();
        if (messageBody == null) {
            logger.debug("Null MessageBody");
            return new byte[0];
        }
        switch (messageBody.getBodyType()) {
            case BINARY:
                return (byte[]) messageBody.getBinaryData().get(0);
            case VALUE:
                return valueToBytes(messageBody.getValueData());
            default:
                logger.debug("Incompatible MessageBody type");
                return new byte[0];
        }
    }

    /**
     * Converts a VALUE body to bytes using UTF-8, so the result does not depend on the JVM's
     * default charset. Non-String values are converted through {@link String#valueOf(Object)}
     * instead of failing on a cast.
     */
    private static byte[] valueToBytes(Object value) {
        if (value == null) {
            logger.debug("Null value data in MessageBody");
            return new byte[0];
        }
        return String.valueOf(value).getBytes(StandardCharsets.UTF_8);
    }

    /**
     * Resolves the media type from the message's content type.
     *
     * @param message received message
     * @return the parsed media type, or {@link MediaType#ANY} when the content type is absent or
     *         cannot be parsed
     */
    private MediaType getMessageMediaType(Message message) {
        String contentType = message.getContentType();
        if (StringUtils.isEmpty(contentType)) {
            // Nothing to parse; avoid driving the fallback through an exception.
            return MediaType.ANY;
        }
        try {
            return MediaType.parse(contentType);
        } catch (Exception e) {
            logger.info("Could not parse content type '{}': {}", contentType, e.getMessage());
            return MediaType.ANY;
        }
    }

    /**
     * Copies the message's metadata into an {@link AzureMessageAttributes} instance, converting
     * every timestamp to a {@link LocalDateTime} in the given zone.
     *
     * @param message received message
     * @param str zone id for timestamp conversion; {@code null} means the system default zone
     * @return attributes describing the message
     */
    private AzureMessageAttributes buildAttributes(Message message, String str) {
        return new AzureMessageAttributes(
                message.getMessageId(),
                message.getSessionId(),
                message.getCorrelationId(),
                message.getReplyTo(),
                message.getReplyToSessionId(),
                message.getLabel(),
                message.getPartitionKey(),
                message.getTimeToLive(),
                toLocalDateTime(message.getScheduledEnqueueTimeUtc(), str),
                toLocalDateTime(message.getExpiresAtUtc(), str),
                message.getProperties(),
                Long.valueOf(message.getDeliveryCount()),
                Long.valueOf(message.getSequenceNumber()),
                toLocalDateTime(message.getEnqueuedTimeUtc(), str),
                toLocalDateTime(message.getLockedUntilUtc(), str),
                message.getTo(),
                message.getViaPartitionKey(),
                message.getDeadLetterSource(),
                message.getLockToken() != null ? message.getLockToken().toString() : null);
    }

    /**
     * Converts an instant to a local date-time; a {@code null} instant yields {@code null}. The
     * zone is resolved only when there is an instant to convert.
     */
    private static LocalDateTime toLocalDateTime(Instant instant, String zoneName) {
        if (instant == null) {
            return null;
        }
        ZoneId zone = Optional.ofNullable(zoneName).map(ZoneId::of).orElseGet(ZoneId::systemDefault);
        return LocalDateTime.ofInstant(instant, zone);
    }
}
