package com.mulesoft.connectors.hl7.mllp.internal.source;

import com.mulesoft.connectors.hl7.mllp.api.ImmutableSocketAttributes;
import com.mulesoft.connectors.hl7.mllp.internal.config.MllpListenerConfiguration;
import com.mulesoft.connectors.hl7.mllp.internal.connection.MllpListenerConnection;
import com.mulesoft.connectors.hl7.mllp.internal.connection.tcp.SocketUtils;
import com.mulesoft.connectors.hl7.mllp.internal.protocol.MllpListenerSocketWorker;
import java.io.InputStream;
import java.util.Optional;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.inject.Inject;
import org.mule.runtime.api.component.location.ComponentLocation;
import org.mule.runtime.api.connection.ConnectionException;
import org.mule.runtime.api.connection.ConnectionProvider;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.message.Error;
import org.mule.runtime.api.scheduler.Scheduler;
import org.mule.runtime.api.scheduler.SchedulerConfig;
import org.mule.runtime.api.scheduler.SchedulerService;
import org.mule.runtime.extension.api.annotation.Alias;
import org.mule.runtime.extension.api.annotation.dsl.xml.ParameterDsl;
import org.mule.runtime.extension.api.annotation.execution.OnError;
import org.mule.runtime.extension.api.annotation.execution.OnSuccess;
import org.mule.runtime.extension.api.annotation.param.Config;
import org.mule.runtime.extension.api.annotation.param.Connection;
import org.mule.runtime.extension.api.annotation.param.MediaType;
import org.mule.runtime.extension.api.annotation.source.EmitsResponse;
import org.mule.runtime.extension.api.runtime.source.Source;
import org.mule.runtime.extension.api.runtime.source.SourceCallback;
import org.mule.runtime.extension.api.runtime.source.SourceCallbackContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Mule message source that accepts MLLP connections and hands each one to a
 * {@link MllpListenerSocketWorker}.
 *
 * <p>On start, the source obtains a {@link MllpListenerConnection}, creates a scheduler for the
 * socket workers and a single-task scheduler for the accept loop, and submits {@link #listen} to
 * the latter. When the configuration's accept count is a positive number, a {@link Semaphore}
 * with that many permits bounds how many connections are handed to workers at a time; otherwise
 * no limit is applied.
 */
@MediaType(value = "*/*", strict = false)
@EmitsResponse
@Alias("mllp-listener")
public final class MllpListenerSource extends Source<InputStream, ImmutableSocketAttributes> {
    private static final Logger logger = LoggerFactory.getLogger(MllpListenerSource.class);

    @Inject
    private SchedulerService schedulerService;

    @Connection
    private ConnectionProvider<MllpListenerConnection> connectionProvider;
    private MllpListenerConnection connection;

    @Config
    private MllpListenerConfiguration config;

    // NOTE(review): never assigned in this class; presumably injected by the Mule runtime - confirm.
    private ComponentLocation location;

    /** Set by {@link #onStop()} and cleared by {@link #onStart}; polled by the accept loop. */
    private final AtomicBoolean stopRequested = new AtomicBoolean(false);

    /** Scheduler that runs the per-connection socket workers. */
    private Scheduler workManager;

    /** Single-task scheduler that runs the accept loop. */
    private Scheduler listenerExecutor;

    /** Bounds concurrently handled connections; {@code null} when no limit is configured. */
    private Semaphore connectionCounter;

    /** Handle of the submitted accept loop, kept so that {@link #onStop()} can cancel it. */
    private Future<?> submittedListenerTask;

    /**
     * Connects, creates the schedulers and submits the accept loop.
     *
     * @param sourceCallback callback handed to the connection for every accepted socket and used
     *     to report connection failures
     * @throws MuleException always a {@link ConnectionException} wrapping the original failure
     */
    @Override
    public void onStart(SourceCallback<InputStream, ImmutableSocketAttributes> sourceCallback) throws MuleException {
        try {
            logger.debug("Starting Listener");
            this.connection = this.connectionProvider.connect();
            this.stopRequested.set(false);

            final Integer acceptCount = this.config.getAcceptCount();
            final String containerName = this.location.getRootContainerName();
            final String workerSchedulerName = String.format("%s.socket.worker", containerName);

            if (acceptCount == null || acceptCount <= 0) {
                logger.debug("Scheduler {} without connection limit. Connection count value: {}", workerSchedulerName, acceptCount);
                this.connectionCounter = null;
                this.workManager = this.schedulerService.ioScheduler(SchedulerConfig.config().withName(workerSchedulerName));
            } else {
                logger.debug("Scheduler {} with connection limit: {}", workerSchedulerName, acceptCount);
                this.connectionCounter = new Semaphore(acceptCount);
                this.workManager = this.schedulerService.ioScheduler(
                        SchedulerConfig.config().withMaxConcurrentTasks(acceptCount).withName(workerSchedulerName));
            }

            this.listenerExecutor = this.schedulerService.customScheduler(
                    SchedulerConfig.config().withMaxConcurrentTasks(1).withName(String.format("%s.socket.listener", containerName)));
            this.submittedListenerTask = this.listenerExecutor.submit(() -> {
                listen(sourceCallback);
            });
            logger.debug("Mllp listener ready to start");
        } catch (Exception e) {
            logger.debug("There was a problem on the connector starting", e);
            throw new ConnectionException("There was a problem on the connector starting", e);
        }
    }

    /**
     * Flags the accept loop to stop, cancels its task, stops both schedulers and disconnects.
     * Every step is skipped when the corresponding resource was never created.
     */
    @Override
    public void onStop() {
        this.stopRequested.set(true);
        logger.debug("Stopping MLLP Connection");
        if (this.submittedListenerTask != null) {
            logger.debug("Canceling listener task");
            this.submittedListenerTask.cancel(false);
        }
        if (this.listenerExecutor != null) {
            logger.debug("Stoping listener executor");
            this.listenerExecutor.stop();
        }
        if (this.workManager != null) {
            logger.debug("Stoping work manager");
            this.workManager.stop();
        }
        if (this.connection != null) {
            logger.debug("Disconnecting connection");
            this.connectionProvider.disconnect(this.connection);
        }
        logger.debug("MLLP Listener ready to complete stop process");
    }

    /** Returns {@code true} once a stop was requested or the current thread is interrupted. */
    private boolean isRequestedToStop() {
        return this.stopRequested.get() || Thread.currentThread().isInterrupted();
    }

    /**
     * Accept loop: obtains one worker per iteration, wires its error handler and hands it to the
     * worker scheduler, until a stop is requested or the thread is interrupted.
     *
     * <p>Failures are reported through {@link SourceCallback#onConnectionException} with the
     * original exception as cause. Failures that happen while a stop is in progress are only
     * logged, since they are the expected result of tearing the connection down.
     */
    private void listen(SourceCallback<InputStream, ImmutableSocketAttributes> sourceCallback) {
        while (!isRequestedToStop()) {
            try {
                final MllpListenerSocketWorker worker = acceptConnection(sourceCallback);
                worker.onError(exc -> handleWorkerError(worker, exc, sourceCallback));
                logger.debug("Connection established. Ready to get incoming messages-");
                this.workManager.execute(worker);
            } catch (InterruptedException e) {
                // Restore the flag that the blocking call cleared so the loop condition and any
                // caller further up can still observe the interruption.
                Thread.currentThread().interrupt();
                logger.debug("Listener thread interrupted while waiting for a connection permit", e);
                break;
            } catch (Exception e) {
                if (isRequestedToStop()) {
                    logger.debug("Exception while listening ignored because the listener is stopping", e);
                    break;
                }
                logger.debug("An exception occurred while listening for new connections", e);
                sourceCallback.onConnectionException(
                        new ConnectionException("An exception occurred while listening for new connections", e));
            }
        }
        logger.debug("MLLP Listener required to stop. Stop requested value: {}. Thread interrupted: {}.",
                this.stopRequested.get(), Thread.currentThread().isInterrupted());
    }

    /**
     * Asks the connection for the next worker, first acquiring a permit when a connection limit
     * is configured.
     *
     * <p>The permit is handed back by the close callback passed to the connection. If the
     * connection fails before returning a worker, the permit is released here instead; a
     * per-permit flag guarantees it is released at most once whichever path runs.
     *
     * @throws InterruptedException if interrupted while waiting for a permit
     * @throws Exception whatever {@code MllpListenerConnection.listen} raises; its checked
     *     exceptions are not visible from this class, hence the broad declaration
     */
    private MllpListenerSocketWorker acceptConnection(SourceCallback<InputStream, ImmutableSocketAttributes> sourceCallback)
            throws Exception {
        // Capture the semaphore so that a worker outliving a restart releases into the semaphore
        // it acquired from, not into whatever the field points to later.
        final Semaphore permits = this.connectionCounter;
        if (permits == null) {
            logger.debug("Mllp socket listening. Waiting for new incoming connection");
            return this.connection.listen(sourceCallback, () -> {
                logger.debug("Connection ready to close");
            });
        }

        logger.debug("Getting new connection");
        permits.acquire();
        final AtomicBoolean permitReleased = new AtomicBoolean(false);
        boolean workerCreated = false;
        try {
            logger.debug("Mllp socket listening. Waiting for new incoming connection.");
            final MllpListenerSocketWorker worker = this.connection.listen(sourceCallback, () -> {
                releaseOnce(permits, permitReleased);
                logger.debug("Connection ready to close.");
            });
            workerCreated = true;
            return worker;
        } finally {
            if (!workerCreated) {
                releaseOnce(permits, permitReleased);
            }
        }
    }

    /** Releases one permit unless {@code alreadyReleased} shows it was returned before. */
    private static void releaseOnce(Semaphore permits, AtomicBoolean alreadyReleased) {
        if (alreadyReleased.compareAndSet(false, true)) {
            permits.release();
        }
    }

    /**
     * Error handler installed on every worker: logs the failure and forwards it to the source
     * callback only when it is a {@link ConnectionException}.
     */
    private void handleWorkerError(MllpListenerSocketWorker worker, Throwable exc,
                                   SourceCallback<InputStream, ImmutableSocketAttributes> sourceCallback) {
        if (logger.isDebugEnabled()) {
            logger.debug("Got exception '{}'. Work being executed was: {}", exc.getClass().getName(), worker, exc);
        }
        if (exc instanceof ConnectionException) {
            sourceCallback.onConnectionException((ConnectionException) exc);
        }
    }

    /**
     * Invoked when the flow completes successfully; passes the response stream to the worker
     * stored in the callback context under {@code SocketUtils.WORK}, if any.
     */
    @OnSuccess
    public void onSuccess(@org.mule.runtime.extension.api.annotation.param.Optional(defaultValue = "#[payload]") @ParameterDsl(allowReferences = false) InputStream inputStream, SourceCallbackContext sourceCallbackContext) {
        // The explicit type argument is required: without it the variable is inferred as Object.
        sourceCallbackContext.<MllpListenerSocketWorker>getVariable(SocketUtils.WORK)
                .ifPresent(worker -> worker.onComplete(inputStream));
    }

    /**
     * Invoked when the flow fails; passes the error's cause to the worker stored in the callback
     * context under {@code SocketUtils.WORK}, if any.
     */
    @OnError
    public void onError(Error error, SourceCallbackContext sourceCallbackContext) {
        sourceCallbackContext.<MllpListenerSocketWorker>getVariable(SocketUtils.WORK)
                .ifPresent(worker -> worker.onError(error.getCause()));
    }
}
