package org.mule.runtime.core.internal.streaming;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import java.util.LinkedList;
import java.util.List;
import java.util.function.Consumer;
import org.mule.runtime.api.exception.MuleRuntimeException;
import org.mule.runtime.api.i18n.I18nMessageFactory;
import org.mule.runtime.api.streaming.Cursor;
import org.mule.runtime.api.streaming.CursorProvider;
import org.mule.runtime.api.streaming.bytes.CursorStreamProvider;
import org.mule.runtime.api.streaming.object.CursorIteratorProvider;
import org.mule.runtime.core.api.InternalEvent;
import org.mule.runtime.core.api.InternalEventContext;
import org.mule.runtime.core.internal.streaming.bytes.ManagedCursorStreamProvider;
import org.mule.runtime.core.internal.streaming.object.ManagedCursorIteratorProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/mule/runtime/core/internal/streaming/CursorManager.class */
public class CursorManager {
    private static Logger LOGGER = LoggerFactory.getLogger((Class<?>) CursorManager.class);
    private final LoadingCache<String, EventStreamingState> registry = CacheBuilder.newBuilder().removalListener(removalNotification -> {
        ((EventStreamingState) removalNotification.getValue()).dispose();
    }).build(new CacheLoader<String, EventStreamingState>() { // from class: org.mule.runtime.core.internal.streaming.CursorManager.1
        @Override // com.google.common.cache.CacheLoader
        public EventStreamingState load(String str) throws Exception {
            return new EventStreamingState();
        }
    });
    private MutableStreamingStatistics statistics;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/mule/runtime/core/internal/streaming/CursorManager$EventStreamingState.class */
    public class EventStreamingState {
        private boolean disposed;
        private final LoadingCache<CursorProvider, List<Cursor>> cursors;

        private EventStreamingState() {
            this.disposed = false;
            this.cursors = CacheBuilder.newBuilder().build(new CacheLoader<CursorProvider, List<Cursor>>() { // from class: org.mule.runtime.core.internal.streaming.CursorManager.EventStreamingState.1
                @Override // com.google.common.cache.CacheLoader
                public List<Cursor> load(CursorProvider cursorProvider) throws Exception {
                    CursorManager.this.statistics.incrementOpenProviders();
                    return new LinkedList();
                }
            });
        }

        /* JADX INFO: Access modifiers changed from: private */
        public synchronized void addProvider(CursorProvider cursorProvider) {
            this.cursors.getUnchecked(cursorProvider);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void addCursor(CursorProvider cursorProvider, Cursor cursor) {
            this.cursors.getUnchecked(cursorProvider).add(cursor);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public boolean removeCursor(CursorProvider cursorProvider, Cursor cursor) {
            List<Cursor> unchecked = this.cursors.getUnchecked(cursorProvider);
            if (unchecked.remove(cursor)) {
                CursorManager.this.statistics.decrementOpenCursors();
            }
            if (!unchecked.isEmpty() || !cursorProvider.isClosed()) {
                return false;
            }
            dispose();
            this.cursors.invalidate(cursorProvider);
            return true;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void dispose() {
            if (this.disposed) {
                return;
            }
            this.cursors.asMap().forEach((cursorProvider, list) -> {
                try {
                    closeProvider(cursorProvider);
                    releaseAll(list);
                } finally {
                    cursorProvider.releaseResources();
                }
            });
            this.disposed = true;
        }

        private void releaseAll(List<Cursor> list) {
            list.forEach(cursor -> {
                try {
                    cursor.release();
                    CursorManager.this.statistics.decrementOpenCursors();
                } catch (Exception e) {
                    CursorManager.LOGGER.warn("Exception was found trying to close cursor. Execution will continue", (Throwable) e);
                }
            });
        }

        private void closeProvider(CursorProvider cursorProvider) {
            if (cursorProvider.isClosed()) {
                return;
            }
            cursorProvider.close();
            CursorManager.this.statistics.decrementOpenProviders();
        }
    }

    public CursorManager(MutableStreamingStatistics mutableStreamingStatistics) {
        this.statistics = mutableStreamingStatistics;
    }

    public CursorProvider manage(CursorProvider cursorProvider, InternalEvent internalEvent) {
        InternalEventContext root = getRoot(internalEvent.getContext());
        registerEventContext(root);
        this.registry.getUnchecked(root.getId()).addProvider(cursorProvider);
        CursorContext cursorContext = new CursorContext(cursorProvider, root);
        if (cursorProvider instanceof CursorStreamProvider) {
            return new ManagedCursorStreamProvider(cursorContext, this);
        }
        if (cursorProvider instanceof CursorIteratorProvider) {
            return new ManagedCursorIteratorProvider(cursorContext, this);
        }
        throw new MuleRuntimeException(I18nMessageFactory.createStaticMessage("Unknown cursor provider type: " + cursorContext.getClass().getName()));
    }

    public void onOpen(Cursor cursor, CursorContext cursorContext) {
        this.registry.getUnchecked(cursorContext.getOwnerContext().getId()).addCursor(cursorContext.getCursorProvider(), cursor);
        this.statistics.incrementOpenCursors();
    }

    public void onClose(Cursor cursor, CursorContext cursorContext) {
        String id = cursorContext.getOwnerContext().getId();
        EventStreamingState ifPresent = this.registry.getIfPresent(id);
        if (ifPresent == null || !ifPresent.removeCursor(cursorContext.getCursorProvider(), cursor)) {
            return;
        }
        ifPresent.dispose();
        this.registry.invalidate(id);
    }

    private void terminated(InternalEventContext internalEventContext) {
        EventStreamingState ifPresent = this.registry.getIfPresent(internalEventContext.getId());
        if (ifPresent != null) {
            ifPresent.dispose();
            this.registry.invalidate(internalEventContext.getId());
        }
    }

    private void registerEventContext(InternalEventContext internalEventContext) {
        Mono.from(internalEventContext.getCompletionPublisher()).subscribe((Consumer) null, (Consumer) null, () -> {
            terminated(internalEventContext);
        });
    }

    private InternalEventContext getRoot(InternalEventContext internalEventContext) {
        return (InternalEventContext) internalEventContext.getParentContext().map(this::getRoot).orElse(internalEventContext);
    }
}
