package org.mule.extension.socket.api.source;

import java.io.InputStream;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.inject.Inject;
import org.mule.extension.socket.api.SocketAttributes;
import org.mule.extension.socket.api.config.ListenerConfig;
import org.mule.extension.socket.api.connection.ListenerConnection;
import org.mule.extension.socket.api.worker.SocketWorker;
import org.mule.extension.socket.internal.SocketUtils;
import org.mule.runtime.api.connection.ConnectionException;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.message.Error;
import org.mule.runtime.core.api.MuleContext;
import org.mule.runtime.core.api.construct.FlowConstruct;
import org.mule.runtime.core.api.construct.FlowConstructAware;
import org.mule.runtime.core.api.scheduler.Scheduler;
import org.mule.runtime.core.api.scheduler.SchedulerService;
import org.mule.runtime.core.exception.MessagingException;
import org.mule.runtime.core.util.concurrent.ThreadNameHelper;
import org.mule.runtime.extension.api.annotation.dsl.xml.XmlHints;
import org.mule.runtime.extension.api.annotation.execution.OnError;
import org.mule.runtime.extension.api.annotation.execution.OnSuccess;
import org.mule.runtime.extension.api.annotation.param.Connection;
import org.mule.runtime.extension.api.annotation.param.Optional;
import org.mule.runtime.extension.api.annotation.param.UseConfig;
import org.mule.runtime.extension.api.annotation.source.EmitsResponse;
import org.mule.runtime.extension.api.runtime.source.Source;
import org.mule.runtime.extension.api.runtime.source.SourceCallback;
import org.mule.runtime.extension.api.runtime.source.SourceCallbackContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@EmitsResponse
/* loaded from: input_file:org/mule/extension/socket/api/source/SocketListener.class */
public final class SocketListener extends Source<InputStream, SocketAttributes> implements FlowConstructAware {
    private static final Logger LOGGER = LoggerFactory.getLogger(SocketListener.class);
    private FlowConstruct flowConstruct;

    @Inject
    private MuleContext muleContext;

    @Inject
    private SchedulerService schedulerService;

    @Connection
    private ListenerConnection connection;

    @UseConfig
    private ListenerConfig config;
    private AtomicBoolean stopRequested = new AtomicBoolean(false);
    private Scheduler workManager;
    private Scheduler listenerExecutor;
    private Future<?> submittedListenerTask;

    public void onStart(SourceCallback<InputStream, SocketAttributes> sourceCallback) throws MuleException {
        this.workManager = this.schedulerService.ioScheduler();
        this.stopRequested.set(false);
        this.listenerExecutor = this.schedulerService.customScheduler(String.format("%s%s.socket.listener", ThreadNameHelper.getPrefix(this.muleContext), this.flowConstruct.getName()), 1);
        this.submittedListenerTask = this.listenerExecutor.submit(() -> {
            listen(sourceCallback);
        });
    }

    @OnSuccess
    public void onSuccess(@XmlHints(allowReferences = false) @Optional(defaultValue = "#[payload]") Object obj, SourceCallbackContext sourceCallbackContext) {
        ((SocketWorker) sourceCallbackContext.getVariable(SocketUtils.WORK)).onComplete(obj);
    }

    @OnError
    public void onError(Error error, SourceCallbackContext sourceCallbackContext) {
        ((SocketWorker) sourceCallbackContext.getVariable(SocketUtils.WORK)).onError(error.getCause());
    }

    public void onStop() {
        this.submittedListenerTask.cancel(false);
        this.stopRequested.set(true);
        this.listenerExecutor.stop(this.muleContext.getConfiguration().getShutdownTimeout(), TimeUnit.MILLISECONDS);
        this.workManager.stop(this.muleContext.getConfiguration().getShutdownTimeout(), TimeUnit.MILLISECONDS);
    }

    public void setFlowConstruct(FlowConstruct flowConstruct) {
        this.flowConstruct = flowConstruct;
    }

    private boolean isRequestedToStop() {
        return this.stopRequested.get() || Thread.currentThread().isInterrupted();
    }

    private void listen(SourceCallback<InputStream, SocketAttributes> sourceCallback) {
        while (!isRequestedToStop()) {
            try {
                SocketWorker listen = this.connection.listen(sourceCallback);
                listen.setEncoding(this.config.getDefaultEncoding());
                listen.onError(exc -> {
                    Exception exc = exc;
                    if (exc.getCause() != null) {
                        exc = exc.getCause();
                    }
                    if (LOGGER.isDebugEnabled()) {
                        LOGGER.debug(String.format("Got exception '%s'. Work being executed was: %s", exc.getClass().getName(), listen.toString()));
                    }
                    if ((exc instanceof MessagingException) || (exc instanceof ConnectionException)) {
                        sourceCallback.onSourceException(exc);
                    }
                });
                this.workManager.execute(listen);
            } catch (Exception e) {
                if (isRequestedToStop()) {
                    return;
                }
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("An exception occurred while listening for new connections", e);
                }
            } catch (ConnectionException e2) {
                if (!isRequestedToStop()) {
                    sourceCallback.onSourceException(e2);
                }
            }
        }
    }
}
