/*
 * Decompiled with CFR 0.152.
 */
package org.mule.runtime.core.internal.streaming;

import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;
import java.util.Optional;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import javax.inject.Inject;
import org.mule.runtime.api.component.location.ComponentLocation;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.exception.MuleRuntimeException;
import org.mule.runtime.api.lifecycle.InitialisationException;
import org.mule.runtime.api.lifecycle.Lifecycle;
import org.mule.runtime.api.scheduler.Scheduler;
import org.mule.runtime.api.scheduler.SchedulerConfig;
import org.mule.runtime.api.scheduler.SchedulerService;
import org.mule.runtime.api.streaming.CursorProvider;
import org.mule.runtime.core.internal.streaming.CursorManager;
import org.mule.runtime.core.internal.streaming.CursorProviderJanitor;
import org.mule.runtime.core.internal.streaming.CursorUtils;
import org.mule.runtime.core.internal.streaming.ManagedCursorProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tracks {@link ManagedCursorProvider} instances through weak references and releases the
 * resources associated with each of them once its reference shows up in the internal
 * {@link ReferenceQueue}.
 * <p>
 * While started, a single background task submitted to a custom {@link Scheduler} polls the
 * queue and disposes every reference it finds there.
 */
public class StreamingGhostBuster implements Lifecycle {
    /** Maximum time, in milliseconds, that each poll of the reference queue blocks. */
    private static final long POLL_INTERVAL = TimeUnit.SECONDS.toMillis(5L);
    private static final Logger LOGGER = LoggerFactory.getLogger(StreamingGhostBuster.class);

    private final ReferenceQueue<ManagedCursorProvider> referenceQueue = new ReferenceQueue<>();
    /** Read by the polling task on every iteration; written by the lifecycle methods. */
    private volatile boolean stopped = false;
    /** Handle of the polling task; {@code null} while no task has been submitted. */
    private Future<?> taskHandle;
    @Inject
    private SchedulerService schedulerService;
    private Scheduler scheduler;

    /**
     * Creates the custom scheduler (max one concurrent task) that will run the polling task.
     *
     * @throws InitialisationException declared by the {@link Lifecycle} contract
     */
    @Override
    public void initialise() throws InitialisationException {
        this.scheduler = this.schedulerService.customScheduler(
            SchedulerConfig.config()
                .withMaxConcurrentTasks(1)
                .withName("StreamingManager-CursorProviderCollector"));
    }

    /**
     * Submits the polling task to the scheduler.
     *
     * @throws MuleException declared by the {@link Lifecycle} contract
     * @throws MuleRuntimeException if the scheduler rejects the task
     */
    @Override
    public void start() throws MuleException {
        // The flag must be reset BEFORE the task is submitted: on a restart the previous stop()
        // left it set to true, and a task that observed that stale value would exit immediately.
        this.stopped = false;
        try {
            this.taskHandle = this.scheduler.submit(this::bustGhosts);
        }
        catch (RejectedExecutionException e) {
            // No polling task is running, so put the flag back in its "stopped" state.
            this.stopped = true;
            throw new MuleRuntimeException(e);
        }
    }

    /**
     * Signals the polling task to finish and cancels it, interrupting it if it is running.
     * Does nothing beyond setting the flag when no task has been submitted.
     *
     * @throws MuleException declared by the {@link Lifecycle} contract
     */
    @Override
    public void stop() throws MuleException {
        this.stopped = true;
        Future<?> handle = this.taskHandle;
        this.taskHandle = null;
        // stop() may be invoked without a prior successful start(), or more than once.
        if (handle != null) {
            handle.cancel(true);
        }
    }

    /**
     * Stops the custom scheduler, if one was created by {@link #initialise()}.
     */
    @Override
    public void dispose() {
        if (this.scheduler != null) {
            this.scheduler.stop();
        }
    }

    /**
     * Starts tracking the given provider, with no extra action to run on disposal.
     *
     * @param cursorProvider the provider to track
     * @return a weak reference to {@code cursorProvider} registered with the internal queue
     */
    public WeakReference<ManagedCursorProvider> track(ManagedCursorProvider cursorProvider) {
        return this.track(cursorProvider, null);
    }

    /**
     * Starts tracking the given provider.
     *
     * @param cursorProvider the provider to track
     * @param callOnDispose  action to run after the provider's resources are released by this
     *                       class; may be {@code null}
     * @return a weak reference to {@code cursorProvider} registered with the internal queue
     */
    public WeakReference<ManagedCursorProvider> track(ManagedCursorProvider cursorProvider, Runnable callOnDispose) {
        return new StreamingWeakReference(cursorProvider, this.referenceQueue, callOnDispose);
    }

    /**
     * Body of the polling task: repeatedly waits up to {@link #POLL_INTERVAL} milliseconds for an
     * enqueued reference and disposes it, until stopped or interrupted.
     */
    private void bustGhosts() {
        while (!this.stopped && !Thread.currentThread().isInterrupted()) {
            try {
                StreamingWeakReference ghost = (StreamingWeakReference) this.referenceQueue.remove(POLL_INTERVAL);
                // A null result means the poll timed out with nothing enqueued.
                if (ghost != null) {
                    this.bust(ghost);
                }
            }
            catch (InterruptedException e) {
                // Restore the interrupt flag; the loop condition then ends the task.
                Thread.currentThread().interrupt();
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("Streaming GC thread was interrupted. Finalizing.");
                }
            }
        }
    }

    /**
     * Disposes a single enqueued reference. Exceptions raised while disposing are logged and
     * swallowed so that one failure does not end the polling task; the reference is always
     * cleared afterwards.
     *
     * @param ghost the reference taken from the queue
     */
    private void bust(StreamingWeakReference ghost) {
        try {
            if (CursorManager.STREAMING_VERBOSE) {
                CursorProvider innerDelegate = CursorUtils.unwrap(ghost.janitor.provider);
                Optional<ComponentLocation> originatingLocation = ghost.janitor.provider.getOriginatingLocation();
                LOGGER.info("StreamingGhostBuster disposing ghost: {}, provider: {} created by {}", ghost.id, System.identityHashCode(innerDelegate), originatingLocation.map(ComponentLocation::getLocation).orElse("unknown"));
            }
            ghost.dispose();
        }
        catch (Exception e) {
            if (LOGGER.isWarnEnabled()) {
                LOGGER.warn("Found exception trying to dispose phantom CursorProvider", e);
            }
        }
        finally {
            ghost.clear();
        }
    }

    /**
     * Weak reference to a {@link ManagedCursorProvider} that also keeps what is needed to release
     * the provider's resources once the provider itself is no longer reachable: its janitor, its
     * id and an optional callback.
     * <p>
     * Declared static because it uses no state of the enclosing instance.
     */
    private static final class StreamingWeakReference
    extends WeakReference<ManagedCursorProvider> {
        private final int id;
        private final CursorProviderJanitor janitor;
        /** Set once this reference has been disposed or cleared. */
        private boolean clear;
        private final Runnable callOnDispose;

        /**
         * @param referent       the provider to reference weakly
         * @param referenceQueue the queue this reference is registered with
         * @param callOnDispose  action to run after releasing resources; may be {@code null}
         */
        public StreamingWeakReference(ManagedCursorProvider referent, ReferenceQueue<ManagedCursorProvider> referenceQueue, Runnable callOnDispose) {
            super(referent, referenceQueue);
            this.clear = false;
            this.janitor = referent.getJanitor();
            this.id = referent.getId();
            this.callOnDispose = callOnDispose;
        }

        /**
         * Releases the janitor's resources and runs the optional callback. Does nothing if this
         * reference was already disposed or cleared.
         */
        public void dispose() {
            if (this.clear) {
                return;
            }
            // Flag first, so a failure below does not lead to a second release attempt.
            this.clear = true;
            this.janitor.releaseResources();
            if (this.callOnDispose != null) {
                this.callOnDispose.run();
            }
        }

        /**
         * @return the referent, or {@code null} once this reference has been disposed or cleared
         */
        @Override
        public ManagedCursorProvider get() {
            return this.clear ? null : super.get();
        }

        @Override
        public void clear() {
            super.clear();
            this.clear = true;
        }
    }
}

