package com.mulesoft.mule.runtime.cache.internal;

import com.mulesoft.mule.runtime.cache.api.CachingStrategy;
import com.mulesoft.mule.runtime.cache.api.key.MuleEventKeyGenerator;
import com.mulesoft.mule.runtime.cache.api.response.MuleEventCopier;
import com.mulesoft.mule.runtime.cache.api.response.ResponseGenerator;
import com.mulesoft.mule.runtime.cache.internal.eventcopier.DefaultMuleEventCopier;
import com.mulesoft.mule.runtime.cache.internal.keygenerator.SHA256MuleEventKeyGenerator;
import com.mulesoft.mule.runtime.cache.internal.responsegenerator.DefaultResponseGenerator;
import java.io.Serializable;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.function.Function;
import javax.inject.Inject;
import org.mule.runtime.api.component.AbstractComponent;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.lifecycle.Lifecycle;
import org.mule.runtime.api.lock.LockFactory;
import org.mule.runtime.api.scheduler.Scheduler;
import org.mule.runtime.api.scheduler.SchedulerConfig;
import org.mule.runtime.api.scheduler.SchedulerService;
import org.mule.runtime.core.api.MuleContext;
import org.mule.runtime.core.api.context.MuleContextAware;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.processor.Processor;
import org.mule.runtime.core.api.processor.ReactiveProcessor;
import org.mule.runtime.core.api.rx.Exceptions;
import org.mule.runtime.core.api.util.StreamingUtils;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:com/mulesoft/mule/runtime/cache/internal/AbstractCachingStrategy.class */
public abstract class AbstractCachingStrategy extends AbstractComponent implements CachingStrategy, MuleContextAware, Lifecycle {
    private static final Lock NO_OP_LOCK = new Lock() { // from class: com.mulesoft.mule.runtime.cache.internal.AbstractCachingStrategy.1
        @Override // java.util.concurrent.locks.Lock
        public void unlock() {
        }

        @Override // java.util.concurrent.locks.Lock
        public boolean tryLock(long j, TimeUnit timeUnit) throws InterruptedException {
            if (Thread.currentThread().isInterrupted()) {
                throw new InterruptedException();
            }
            return true;
        }

        @Override // java.util.concurrent.locks.Lock
        public boolean tryLock() {
            return true;
        }

        @Override // java.util.concurrent.locks.Lock
        public Condition newCondition() {
            throw new UnsupportedOperationException("newCondition() not supported");
        }

        @Override // java.util.concurrent.locks.Lock
        public void lockInterruptibly() throws InterruptedException {
            if (Thread.currentThread().isInterrupted()) {
                throw new InterruptedException();
            }
        }

        @Override // java.util.concurrent.locks.Lock
        public void lock() {
        }
    };
    private String name;
    private LockFactory lockFactory;

    @Inject
    private SchedulerService schedulerService;
    private Scheduler lockScheduler;
    protected Logger logger = LoggerFactory.getLogger(getClass());
    private MuleEventKeyGenerator keyGenerator = new SHA256MuleEventKeyGenerator();
    private ResponseGenerator responseGenerator = new DefaultResponseGenerator();
    private MuleEventCopier muleEventCopier = new DefaultMuleEventCopier();
    private boolean synchronizedAccess = true;

    public void start() throws MuleException {
        this.lockScheduler = this.schedulerService.ioScheduler(SchedulerConfig.config().withName("cachingStrategy_" + this.name));
    }

    public void stop() throws MuleException {
        if (this.lockScheduler != null) {
            this.lockScheduler.stop();
        }
    }

    @Override // com.mulesoft.mule.runtime.cache.api.CachingStrategy
    public CoreEvent process(CoreEvent coreEvent, Processor processor) throws MuleException {
        try {
            return (CoreEvent) Mono.just(coreEvent).transform(transformProcessor(processor)).block();
        } catch (Throwable th) {
            throw Exceptions.rxExceptionToMuleException(th);
        }
    }

    @Override // com.mulesoft.mule.runtime.cache.api.CachingStrategy
    public ReactiveProcessor transformProcessor(Processor processor) {
        return publisher -> {
            return Flux.from(publisher).flatMap(coreEvent -> {
                if (coreEvent.getMessage().getPayload().getDataType().isStreamType()) {
                    this.logger.warn("Message will be processed without cache: payload is consumable");
                    return Mono.just(coreEvent).transform(processor);
                }
                try {
                    return Mono.just(coreEvent).transform(processMessageWithCache(this.keyGenerator.generateKey(coreEvent), processor));
                } catch (Exception e) {
                    this.logger.warn("Message will be processed without cache: key generation error", e);
                    return Mono.just(coreEvent).transform(processor);
                }
            });
        };
    }

    private Function<Publisher<CoreEvent>, Publisher<CoreEvent>> processMessageWithCache(String str, Processor processor) {
        return publisher -> {
            return Flux.from(publisher).flatMap(coreEvent -> {
                CoreEvent lookupEventInCache = lookupEventInCache(str);
                if (lookupEventInCache != null) {
                    return Mono.just(createResponseEvent(coreEvent, lookupEventInCache));
                }
                Lock lock = getLock(str);
                if (lock.tryLock()) {
                    try {
                        return cacheAwareProcess(str, processor, coreEvent);
                    } finally {
                    }
                }
                if (this.schedulerService.isCurrentThreadForCpuWork()) {
                    Objects.requireNonNull(getLockScheduler());
                    return Mono.just(coreEvent).publishOn(Schedulers.fromExecutorService(getLockScheduler())).flatMap(coreEvent -> {
                        lock.lock();
                        try {
                            return cacheAwareProcess(str, processor, coreEvent);
                        } finally {
                            lock.unlock();
                        }
                    });
                }
                lock.lock();
                try {
                    return cacheAwareProcess(str, processor, coreEvent);
                } finally {
                }
            });
        };
    }

    private Mono<? extends CoreEvent> cacheAwareProcess(String str, Processor processor, CoreEvent coreEvent) {
        CoreEvent lookupEventInCache = lookupEventInCache(str);
        return lookupEventInCache != null ? Mono.just(createResponseEvent(coreEvent, lookupEventInCache)) : isSynchronizedAccess() ? Mono.just(coreEvent).publishOn(Schedulers.fromExecutorService(getLockScheduler())).map(coreEvent2 -> {
            Lock lock = getLock(str);
            lock.lock();
            try {
                return (CoreEvent) Mono.just(coreEvent).transform(processor).doOnNext(Exceptions.checkedConsumer(coreEvent2 -> {
                    storeResponse(coreEvent2, str);
                })).block();
            } finally {
                lock.unlock();
            }
        }) : Mono.just(coreEvent).transform(processor).doOnNext(Exceptions.checkedConsumer(coreEvent3 -> {
            storeResponse(coreEvent3, str);
        }));
    }

    private CoreEvent createResponseEvent(CoreEvent coreEvent, CoreEvent coreEvent2) {
        return this.responseGenerator.create(coreEvent, this.muleEventCopier.createEventCopy(coreEvent2, coreEvent.getContext()));
    }

    private void storeResponse(CoreEvent coreEvent, String str) {
        boolean z;
        if (coreEvent != null) {
            z = !coreEvent.getMessage().getPayload().getDataType().isStreamType();
        } else {
            z = true;
        }
        if (z) {
            store(str, this.muleEventCopier.createEventCopy(StreamingUtils.consumeRepeatablePayload(coreEvent), coreEvent.getContext()));
        }
    }

    private CoreEvent lookupEventInCache(String str) {
        CoreEvent retrieve = retrieve(str);
        if (this.logger.isDebugEnabled()) {
            if (retrieve != null) {
                this.logger.debug("Cache hit for key: " + str + " Event: " + retrieve);
            } else {
                this.logger.debug("Cache miss for key: " + str);
            }
        }
        return retrieve;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Lock getLock(Serializable serializable) {
        Lock lock;
        if (!isSynchronizedAccess()) {
            lock = NO_OP_LOCK;
        } else {
            if (serializable != null && !(serializable instanceof String)) {
                throw new IllegalArgumentException(String.format("Cannot synchronize cache key. Key  must be '%s' but was '%s'", String.class.getName(), serializable.getClass().getName()));
            }
            lock = getLockFactory().createLock((String) serializable);
        }
        return lock;
    }

    protected abstract void store(String str, CoreEvent coreEvent);

    protected abstract CoreEvent retrieve(String str);

    public MuleEventKeyGenerator getKeyGenerator() {
        return this.keyGenerator;
    }

    public void setKeyGenerator(MuleEventKeyGenerator muleEventKeyGenerator) {
        this.keyGenerator = muleEventKeyGenerator;
    }

    public ResponseGenerator getResponseGenerator() {
        return this.responseGenerator;
    }

    public void setResponseGenerator(ResponseGenerator responseGenerator) {
        this.responseGenerator = responseGenerator;
    }

    public String getName() {
        return this.name;
    }

    public void setName(String str) {
        this.name = str;
    }

    public MuleEventCopier getMuleEventCopier() {
        return this.muleEventCopier;
    }

    public void setMuleEventCopier(MuleEventCopier muleEventCopier) {
        this.muleEventCopier = muleEventCopier;
    }

    public void setSynchronizedAccess(boolean z) {
        this.synchronizedAccess = z;
    }

    public boolean isSynchronizedAccess() {
        return this.synchronizedAccess;
    }

    public LockFactory getLockFactory() {
        return this.lockFactory;
    }

    @Inject
    public void setLockFactory(LockFactory lockFactory) {
        this.lockFactory = lockFactory;
    }

    public void setMuleContext(MuleContext muleContext) {
        if (this.keyGenerator instanceof MuleContextAware) {
            this.keyGenerator.setMuleContext(muleContext);
        }
    }

    public void setSchedulerService(SchedulerService schedulerService) {
        this.schedulerService = schedulerService;
    }

    protected Scheduler getLockScheduler() {
        return this.lockScheduler;
    }
}
