/*
 * Decompiled with CFR 0.152.
 */
package org.mule.runtime.core.internal.processor.strategy;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.transaction.Transaction;
import org.mule.runtime.core.api.transaction.TransactionCoordination;
import org.mule.runtime.core.internal.processor.strategy.FluxSinkWrapper;
import org.mule.runtime.core.internal.processor.strategy.ReactorSinkProvider;
import reactor.core.publisher.FluxSink;

/**
 * Base {@link ReactorSinkProvider} that caches the sinks it hands out instead of creating a new one per call.
 * <p>
 * Sinks are cached per calling {@link Thread}, or per current {@link Transaction} when
 * {@link TransactionCoordination#runningNestedTransaction()} reports a nested transaction. Two parallel sets of
 * caches exist:
 * <ul>
 * <li>the "indexed" caches ({@code sinks}, {@code sinksNestedTx}) keep a list of {@link FluxSinkWrapper}s per key and
 * are used when the provider was built with {@code enabled == true};</li>
 * <li>the "legacy" caches ({@code legacySinks}, {@code legacySinksNestedTx}) keep a single sink per key and are used
 * otherwise.</li>
 * </ul>
 * All caches use weak keys (so Caffeine compares keys by identity and does not keep them alive), expire entries after
 * a period without access, and complete the cached sink(s) from the removal listener.
 */
public abstract class AbstractCachedThreadReactorSinkProvider
implements ReactorSinkProvider {
    private static final int THREAD_CACHE_TIME_LIMIT_IN_MINUTES = 60;
    private static final int TRANSACTION_CACHE_TIME_LIMIT_IN_MINUTES = 10;

    /** Selects the indexed (list of wrappers per key) caches over the legacy (single sink per key) ones. */
    private final boolean sinkIndexEnabled;

    // The explicit type arguments on removalListener(...) are required: the call is in the middle of a method chain,
    // so without them the listener's key/value types would be inferred as Object.
    private final Cache<Thread, List<FluxSinkWrapper>> sinks = Caffeine.newBuilder()
            .weakKeys()
            .<Thread, List<FluxSinkWrapper>>removalListener((thread, sinkList, removalCause) -> sinkList.forEach(FluxSinkWrapper::complete))
            .expireAfterAccess(THREAD_CACHE_TIME_LIMIT_IN_MINUTES, TimeUnit.MINUTES)
            .build();
    private final Cache<Thread, FluxSink<CoreEvent>> legacySinks = Caffeine.newBuilder()
            .weakKeys()
            .<Thread, FluxSink<CoreEvent>>removalListener((thread, sink, removalCause) -> sink.complete())
            .expireAfterAccess(THREAD_CACHE_TIME_LIMIT_IN_MINUTES, TimeUnit.MINUTES)
            .build();
    // weakKeys() here keeps this cache consistent with the other three: transactions are matched by identity and a
    // finished transaction is not retained by the cache until its entry expires.
    private final Cache<Transaction, List<FluxSinkWrapper>> sinksNestedTx = Caffeine.newBuilder()
            .weakKeys()
            .<Transaction, List<FluxSinkWrapper>>removalListener((transaction, sinkList, removalCause) -> sinkList.forEach(FluxSinkWrapper::complete))
            .expireAfterAccess(TRANSACTION_CACHE_TIME_LIMIT_IN_MINUTES, TimeUnit.MINUTES)
            .build();
    private final Cache<Transaction, FluxSink<CoreEvent>> legacySinksNestedTx = Caffeine.newBuilder()
            .weakKeys()
            .<Transaction, FluxSink<CoreEvent>>removalListener((transaction, sink, removalCause) -> sink.complete())
            .expireAfterAccess(TRANSACTION_CACHE_TIME_LIMIT_IN_MINUTES, TimeUnit.MINUTES)
            .build();

    /**
     * Creates a provider that uses the legacy caches (one sink per key).
     */
    public AbstractCachedThreadReactorSinkProvider() {
        this(false);
    }

    /**
     * Creates a provider.
     *
     * @param enabled {@code true} to use the indexed caches (a list of {@link FluxSinkWrapper}s per key), {@code false}
     *                to use the legacy caches (a single sink per key).
     */
    public AbstractCachedThreadReactorSinkProvider(boolean enabled) {
        this.sinkIndexEnabled = enabled;
    }

    /**
     * Completes every sink currently held in any of the four caches. The cache entries themselves are left in place.
     */
    @Override
    public void dispose() {
        this.sinks.asMap().values().forEach(sinkList -> sinkList.forEach(FluxSinkWrapper::complete));
        this.legacySinks.asMap().values().forEach(FluxSink::complete);
        this.sinksNestedTx.asMap().values().forEach(sinkList -> sinkList.forEach(FluxSinkWrapper::complete));
        this.legacySinksNestedTx.asMap().values().forEach(FluxSink::complete);
    }

    /**
     * Discards every entry of the four caches. Each cache's removal listener completes the sink(s) of the entries it
     * drops.
     */
    protected void invalidateAll() {
        this.sinks.invalidateAll();
        this.legacySinks.invalidateAll();
        this.sinksNestedTx.invalidateAll();
        this.legacySinksNestedTx.invalidateAll();
    }

    /**
     * Returns the sink for the caller, creating and caching one through {@link #createSink()} when needed.
     * <p>
     * The cache key is the current transaction when a nested transaction is running, and the current thread otherwise.
     *
     * @return a cached or newly created sink.
     */
    @Override
    public FluxSink<CoreEvent> getSink() {
        TransactionCoordination txCoord = TransactionCoordination.getInstance();
        if (txCoord.runningNestedTransaction()) {
            if (this.sinkIndexEnabled) {
                return this.getNestedTxFluxSinkWrapper(txCoord);
            }
            return this.legacySinksNestedTx.get(txCoord.getTransaction(), tx -> this.createSink());
        }
        if (this.sinkIndexEnabled) {
            return this.getSimpleFluxSinkWrapper();
        }
        return this.legacySinks.get(Thread.currentThread(), thread -> this.createSink());
    }

    /**
     * Looks up (or starts) the wrapper list of the current transaction and picks a wrapper from it.
     */
    private FluxSinkWrapper getNestedTxFluxSinkWrapper(TransactionCoordination txCoord) {
        return this.getOrCreateFluxSinkWrapper(this.sinksNestedTx.get(txCoord.getTransaction(), tx -> new ArrayList<>()));
    }

    /**
     * Looks up (or starts) the wrapper list of the current thread and picks a wrapper from it.
     */
    private FluxSinkWrapper getSimpleFluxSinkWrapper() {
        return this.getOrCreateFluxSinkWrapper(this.sinks.get(Thread.currentThread(), thread -> new ArrayList<>()));
    }

    /**
     * Returns the first wrapper in the list for which {@link FluxSinkWrapper#isBeingUsed()} is {@code false}. When every
     * wrapper is being used (or the list is empty), wraps a new sink from {@link #createSink()}, appends it to the list
     * and returns it.
     *
     * @param fluxSinkWrapperList the wrappers cached for the current key; may be appended to.
     * @return a wrapper that was not reported as being used, or a newly created one.
     */
    private FluxSinkWrapper getOrCreateFluxSinkWrapper(List<FluxSinkWrapper> fluxSinkWrapperList) {
        for (FluxSinkWrapper candidate : fluxSinkWrapperList) {
            if (!candidate.isBeingUsed()) {
                return candidate;
            }
        }
        FluxSinkWrapper created = new FluxSinkWrapper(this.createSink());
        fluxSinkWrapperList.add(created);
        return created;
    }

    /**
     * Creates a new sink. Called whenever no cached sink is available for the current key.
     *
     * @return the new sink.
     */
    protected abstract FluxSink<CoreEvent> createSink();
}

