package org.mule.extension.aggregator.internal.privileged.executor;

import java.io.InputStream;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.function.Consumer;
import java.util.function.Supplier;
import javax.inject.Inject;
import javax.inject.Named;
import org.mule.extension.aggregator.api.AggregationAttributes;
import org.mule.extension.aggregator.api.AggregatorConstants;
import org.mule.extension.aggregator.internal.config.AggregatorManager;
import org.mule.extension.aggregator.internal.errors.AggregatorError;
import org.mule.extension.aggregator.internal.privileged.CompletionCallbackWrapper;
import org.mule.extension.aggregator.internal.source.AggregatorListener;
import org.mule.extension.aggregator.internal.storage.content.AggregatedContent;
import org.mule.extension.aggregator.internal.storage.info.AggregatorSharedInformation;
import org.mule.extension.aggregator.internal.task.AsyncTask;
import org.mule.runtime.api.cluster.ClusterService;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.lifecycle.InitialisationException;
import org.mule.runtime.api.lifecycle.Lifecycle;
import org.mule.runtime.api.lock.LockFactory;
import org.mule.runtime.api.message.ItemSequenceInfo;
import org.mule.runtime.api.message.Message;
import org.mule.runtime.api.meta.model.operation.OperationModel;
import org.mule.runtime.api.metadata.DataType;
import org.mule.runtime.api.metadata.MediaType;
import org.mule.runtime.api.metadata.TypedValue;
import org.mule.runtime.api.notification.NotificationListenerRegistry;
import org.mule.runtime.api.scheduler.Scheduler;
import org.mule.runtime.api.scheduler.SchedulerConfig;
import org.mule.runtime.api.scheduler.SchedulerService;
import org.mule.runtime.api.store.ObjectStore;
import org.mule.runtime.api.store.ObjectStoreException;
import org.mule.runtime.api.store.ObjectStoreManager;
import org.mule.runtime.api.streaming.bytes.CursorStreamProvider;
import org.mule.runtime.api.time.TimeSupplier;
import org.mule.runtime.api.transformation.TransformationService;
import org.mule.runtime.api.util.LazyValue;
import org.mule.runtime.core.api.lifecycle.LifecycleUtils;
import org.mule.runtime.core.api.lifecycle.PrimaryNodeLifecycleNotificationListener;
import org.mule.runtime.extension.api.exception.ModuleException;
import org.mule.runtime.extension.api.runtime.operation.ComponentExecutor;
import org.mule.runtime.extension.api.runtime.operation.ExecutionContext;
import org.mule.runtime.extension.api.runtime.operation.Result;
import org.mule.runtime.extension.api.runtime.route.Chain;
import org.mule.runtime.extension.api.runtime.route.Route;
import org.mule.runtime.extension.api.runtime.source.SourceCallback;
import org.mule.runtime.extension.api.runtime.source.SourceCallbackContext;
import org.mule.runtime.module.extension.api.runtime.privileged.ExecutionContextAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/mule/extension/aggregator/internal/privileged/executor/AbstractAggregatorExecutor.class */
/**
 * Base executor for aggregator operations.
 *
 * <p>Keeps a local copy of the aggregator's {@link AggregatorSharedInformation} and synchronises it with an
 * {@link ObjectStore} under a lock obtained from the injected {@link LockFactory}. It also owns the
 * {@link Scheduler} used for asynchronous aggregation tasks and forwards completed or timed-out aggregations
 * to the {@link AggregatorListener} registered under the same aggregator name.
 */
public abstract class AbstractAggregatorExecutor implements ComponentExecutor<OperationModel>, Lifecycle {
    /** First segment of the key used both for the lock and for the object store entry. */
    private static final String AGGREGATORS_MODULE_KEY = "AGGREGATORS";

    @Inject
    @Named("_muleObjectStoreManager")
    private ObjectStoreManager objectStoreManager;

    @Inject
    private SchedulerService schedulerService;

    @Inject
    private AggregatorManager aggregatorManager;

    @Inject
    private LockFactory lockFactory;

    @Inject
    private TimeSupplier timeSupplier;

    @Inject
    private NotificationListenerRegistry notificationListenerRegistry;

    @Inject
    private ClusterService clusterService;

    @Inject
    private TransformationService transformationService;

    /** Object store supplied through {@link #injectParameters(Map)}; may be {@code null}. */
    private ObjectStore<AggregatorSharedInformation> objectStore;
    /** Aggregator name supplied through {@link #injectParameters(Map)}. */
    private String name;
    /** Scheduler created in {@link #start()} and released in {@link #stop()}. */
    private Scheduler scheduler;
    private PrimaryNodeLifecycleNotificationListener notificationListener;
    /** Last state read from (or created for) the object store by {@link #executeSynchronized(Supplier)}. */
    private AggregatorSharedInformation sharedInfoLocalCopy;
    /** Lazily resolves to the configured object store, or to the default partition when none was configured. */
    private LazyValue<ObjectStore<AggregatorSharedInformation>> storage;
    final Logger LOGGER = LoggerFactory.getLogger(getClass());
    private boolean started = false;
    /** Monitor that serialises {@link #stop()} against {@link #executeSynchronized(Supplier)}. */
    private final Object stoppingLock = new Object();
    /** While {@code false}, {@link #executeSynchronized(Supplier)} does nothing. Cleared by {@link #stop()}. */
    private boolean shouldSynchronizeToOS = true;

    /**
     * Reads the executor configuration from the given parameter map.
     *
     * @param map parameters; the {@code "objectStore"} and {@code "name"} entries are read
     */
    @SuppressWarnings("unchecked") // the map is untyped; the entry is expected to hold the aggregator's store
    public void injectParameters(Map<String, Object> map) {
        this.objectStore = (ObjectStore<AggregatorSharedInformation>) map.get("objectStore");
        this.name = (String) map.get("name");
    }

    /**
     * Returns the item sequence info of the event carried by the given execution context.
     *
     * @param executionContext context of the current execution; it is cast to {@link ExecutionContextAdapter}
     * @return the event's item sequence info, if any
     */
    public Optional<ItemSequenceInfo> getItemSequenceInfo(ExecutionContext executionContext) {
        return ((ExecutionContextAdapter) executionContext).getEvent().getItemSequenceInfo();
    }

    /**
     * Converts stream-like values to a byte array through the transformation service.
     *
     * @param obj value to inspect
     * @return the transformed value for {@link InputStream} and {@link CursorStreamProvider} instances,
     *         otherwise {@code obj} itself
     */
    private Object consumingStream(Object obj) {
        if (obj instanceof InputStream) {
            return this.transformationService.transform(obj, DataType.INPUT_STREAM, DataType.BYTE_ARRAY);
        }
        if (obj instanceof CursorStreamProvider) {
            return this.transformationService.transform(obj, DataType.CURSOR_STREAM_PROVIDER, DataType.BYTE_ARRAY);
        }
        return obj;
    }

    /**
     * Builds a copy of the given typed value in which streams have been replaced by their transformed content.
     * When the value is a {@link Message}, its payload and attributes are processed recursively.
     *
     * @param typedValue value to process
     * @return a new typed value whose data type is derived from the resulting object, keeping the original
     *         media type
     */
    private TypedValue consumingStreams(TypedValue typedValue) {
        Object value = typedValue.getValue();
        MediaType mediaType = typedValue.getDataType().getMediaType();
        Object consumed;
        if (value instanceof Message) {
            Message message = (Message) value;
            consumed = Message.builder(message)
                    .payload(consumingStreams(message.getPayload()))
                    .attributes(consumingStreams(message.getAttributes()))
                    .build();
        } else {
            consumed = consumingStream(value);
        }
        return new TypedValue(consumed, DataType.builder().fromObject(consumed).mediaType(mediaType).build());
    }

    /**
     * Adds a value to the aggregated content, stamped with the current time.
     *
     * @param aggregatedContent content to add the value to
     * @param typedValue value to add; streams are consumed before it is added
     * @param optional item sequence info; when present its position is passed along with the value
     */
    public void addToStorage(AggregatedContent aggregatedContent, TypedValue typedValue, Optional<ItemSequenceInfo> optional) {
        TypedValue consumed = consumingStreams(typedValue);
        if (optional.isPresent()) {
            aggregatedContent.add(consumed, getCurrentTime(), optional.get().getPosition());
        } else {
            aggregatedContent.add(consumed, getCurrentTime());
        }
    }

    /**
     * Initialises the aggregator manager if needed, registers this aggregator in it, prepares the lazy storage
     * lookup and registers a {@link PrimaryNodeLifecycleNotificationListener} for this executor.
     *
     * @throws InitialisationException if initialising the aggregator manager fails
     */
    @Override
    public void initialise() throws InitialisationException {
        LifecycleUtils.initialiseIfNeeded(this.aggregatorManager);
        this.aggregatorManager.registerAggregator(this.name, this::scheduleRegisteredAsyncAggregations);
        this.storage = new LazyValue<>(this::getConfiguredObjectStore);
        this.notificationListener = new PrimaryNodeLifecycleNotificationListener(this, this.notificationListenerRegistry);
        this.notificationListener.register();
    }

    /**
     * @return the injected object store, or the object store manager's default partition when none was injected
     */
    ObjectStore getConfiguredObjectStore() {
        return this.objectStore == null ? this.objectStoreManager.getDefaultPartition() : this.objectStore;
    }

    /**
     * Starts the executor. Does nothing unless this node is the primary polling instance and the executor has
     * not been started yet.
     *
     * <p>An IO scheduler is used when the storage is persistent, a CPU-light scheduler otherwise; both are
     * configured with a zero shutdown timeout.
     *
     * @throws MuleException if starting the object store fails
     */
    @Override
    public void start() throws MuleException {
        if (!this.clusterService.isPrimaryPollingInstance() || this.started) {
            return;
        }
        LifecycleUtils.startIfNeeded(this.objectStore);
        upgradeAggregatedContentIfNeeded();
        setRegisteredAsyncAggregationsAsNotScheduled();
        SchedulerConfig schedulerConfig = SchedulerConfig.config().withShutdownTimeout(0L, TimeUnit.MILLISECONDS);
        if (getStorage().isPersistent()) {
            this.scheduler = this.schedulerService.ioScheduler(schedulerConfig);
        } else {
            this.scheduler = this.schedulerService.cpuLightScheduler(schedulerConfig);
        }
        this.started = true;
    }

    /**
     * Stops the executor: disables object store synchronisation, marks the executor as not started and stops
     * and releases the scheduler, all while holding the stopping monitor.
     *
     * <p>NOTE(review): nothing in this class sets {@code shouldSynchronizeToOS} back to {@code true}, so
     * {@link #executeSynchronized(Supplier)} stays a no-op after a stop - confirm that a later {@link #start()}
     * on the same instance is not expected to resume synchronisation.
     */
    @Override
    public void stop() throws MuleException {
        synchronized (this.stoppingLock) {
            this.shouldSynchronizeToOS = false;
            this.started = false;
            if (this.scheduler != null) {
                this.scheduler.stop();
                this.scheduler = null;
            }
        }
    }

    /** Stops the scheduler if one is still held. */
    @Override
    public void dispose() {
        if (this.scheduler != null) {
            this.scheduler.stop();
        }
    }

    /**
     * Runs the route's chain with the aggregated elements as payload and the aggregation attributes as
     * attributes, reporting the outcome through the given future.
     *
     * @param route route whose chain is executed
     * @param list aggregated elements
     * @param aggregationAttributes attributes describing the aggregation
     * @param completableFuture completed with the chain result on success, or exceptionally with the error
     */
    public void executeRouteWithAggregatedElements(Route route, List<TypedValue> list, AggregationAttributes aggregationAttributes, CompletableFuture<Result<Object, Object>> completableFuture) {
        Chain chain = route.getChain();
        // The bound method reference evaluates (and null-checks) the future before the chain is invoked.
        chain.process(list, aggregationAttributes, completableFuture::complete,
                (th, result) -> completableFuture.completeExceptionally(th));
    }

    /**
     * Waits for the future and reports its outcome to the completion callback.
     *
     * @param completableFuture future to wait for
     * @param completionCallbackWrapper receives the result on success; on failure it receives the cause of the
     *        {@link ExecutionException}, or the {@link InterruptedException} itself
     */
    public void finishExecution(CompletableFuture<Result<Object, Object>> completableFuture, CompletionCallbackWrapper completionCallbackWrapper) {
        try {
            completionCallbackWrapper.success(completableFuture.get());
        } catch (InterruptedException e) {
            // Restore the interrupt flag so that code further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            completionCallbackWrapper.error(e);
        } catch (ExecutionException e) {
            completionCallbackWrapper.error(e.getCause());
        }
    }

    /** Callback registered in the aggregator manager; delegates to the subclass under the shared lock. */
    private void scheduleRegisteredAsyncAggregations() {
        executeSynchronized(this::doScheduleRegisteredAsyncAggregations);
    }

    /**
     * @return {@code true} if the shared information was modified and must be written back to the object store
     */
    abstract boolean doScheduleRegisteredAsyncAggregations();

    private void setRegisteredAsyncAggregationsAsNotScheduled() {
        executeSynchronized(this::doSetRegisteredAsyncAggregationsAsNotScheduled);
    }

    /**
     * @return {@code true} if the shared information was modified and must be written back to the object store
     */
    abstract boolean doSetRegisteredAsyncAggregationsAsNotScheduled();

    /**
     * Schedules the runnable on this executor's scheduler. The delay is the task's configured delay minus the
     * time already elapsed since the task's registering timestamp, in milliseconds.
     *
     * @param asyncTask task providing the delay, its time unit and the registering timestamp
     * @param runnable work to run once the remaining delay has elapsed
     */
    public void scheduleTask(AsyncTask asyncTask, Runnable runnable) {
        long configuredDelayMillis = asyncTask.getDelayTimeUnit().toMillis(asyncTask.getDelay());
        long elapsedMillis = getCurrentTime().longValue() - asyncTask.getRegisteringTimestamp();
        this.scheduler.schedule(runnable, configuredDelayMillis - elapsedMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Validates that a configured delay is not shorter than the aggregator manager's task scheduling period.
     *
     * @param str name of the configured value, used in the error message
     * @param i configured amount
     * @param timeUnit unit of the configured amount
     * @throws ModuleException with {@link AggregatorError#AGGREGATOR_CONFIG} when the delay is too small
     */
    public void evaluateConfiguredDelay(String str, int i, TimeUnit timeUnit) throws ModuleException {
        long schedulingPeriodMillis = this.aggregatorManager.getTaskSchedulingPeriodInMillis();
        if (timeUnit.toMillis(i) < schedulingPeriodMillis) {
            throw new ModuleException(String.format("The configured %s : %d %s, is too small for the configured scheduling time period: %d MILLISECONDS. %s should be equal or bigger than the scheduling time period in order to accurately schedule it.%s Use %s global-config or %s SystemProperty to change it", str, i, timeUnit, schedulingPeriodMillis, str, System.lineSeparator(), AggregatorConstants.TASK_SCHEDULING_PERIOD_KEY, AggregatorConstants.TASK_SCHEDULING_PERIOD_SYSTEM_PROPERTY_KEY), AggregatorError.AGGREGATOR_CONFIG);
        }
    }

    /**
     * Hands a completed aggregation to the listener registered for this aggregator, if there is one.
     *
     * @param list aggregated elements
     * @param aggregationAttributes attributes describing the aggregation
     */
    public void notifyListenerOnComplete(List<TypedValue> list, AggregationAttributes aggregationAttributes) {
        getListenerAndExecute(aggregatorListener -> executeListener(aggregatorListener, list, aggregationAttributes));
    }

    /**
     * Hands a timed-out aggregation to the listener registered for this aggregator, but only when that
     * listener reports that timed-out groups should be included.
     *
     * @param list aggregated elements
     * @param aggregationAttributes attributes describing the aggregation
     */
    public void notifyListenerOnTimeout(List<TypedValue> list, AggregationAttributes aggregationAttributes) {
        getListenerAndExecute(aggregatorListener -> {
            if (aggregatorListener.shouldIncludeTimedOutGroups()) {
                executeListener(aggregatorListener, list, aggregationAttributes);
            }
        });
    }

    /**
     * Runs the given task with the shared information loaded, while holding both the stopping monitor and the
     * lock created for this aggregator's key. Does nothing once synchronisation has been disabled by
     * {@link #stop()}.
     *
     * @param supplier task to run against the local copy of the shared information; when it returns
     *        {@code true} the local copy is written back to the object store
     */
    public void executeSynchronized(Supplier<Boolean> supplier) {
        synchronized (this.stoppingLock) {
            if (!this.shouldSynchronizeToOS) {
                return;
            }
            Lock lock = this.lockFactory.createLock(getAggregatorKey());
            lock.lock();
            try {
                pullSharedInfo();
                if (supplier.get()) {
                    pushSharedInfo();
                }
            } finally {
                lock.unlock();
            }
        }
    }

    /** @return the key {@code AGGREGATORS:<subclass key>:<name>} used for the lock and the object store entry */
    private String getAggregatorKey() {
        return String.format("%s:%s:%s", AGGREGATORS_MODULE_KEY, doGetAggregatorKey(), this.name);
    }

    /** @return the subclass-specific segment of the aggregator key */
    abstract String doGetAggregatorKey();

    /** @return the shared information to use when the object store holds no entry for this aggregator */
    abstract AggregatorSharedInformation createSharedInfo();

    /** @return the current time as reported by the injected {@link TimeSupplier} */
    public Long getCurrentTime() {
        return this.timeSupplier.get();
    }

    /** @return the local copy of the shared information as last loaded by {@link #executeSynchronized(Supplier)} */
    public AggregatorSharedInformation getSharedInfoLocalCopy() {
        return this.sharedInfoLocalCopy;
    }

    private ObjectStore<AggregatorSharedInformation> getStorage() {
        return this.storage.get();
    }

    /**
     * Loads the shared information from the object store into the local copy, or creates a fresh one when the
     * store has no entry for this aggregator's key.
     *
     * @throws ModuleException with {@link AggregatorError#OBJECT_STORE_ACCESS} if the object store fails
     */
    private void pullSharedInfo() throws ModuleException {
        String aggregatorKey = getAggregatorKey();
        try {
            if (getStorage().contains(aggregatorKey)) {
                this.sharedInfoLocalCopy = getStorage().retrieve(aggregatorKey);
            } else {
                this.sharedInfoLocalCopy = createSharedInfo();
            }
        } catch (ObjectStoreException e) {
            throw new ModuleException("Found error when trying to access ObjectStore", AggregatorError.OBJECT_STORE_ACCESS, e);
        }
    }

    /**
     * Writes the local copy of the shared information to the object store, removing any existing entry for
     * the key first.
     *
     * @throws ModuleException with {@link AggregatorError#OBJECT_STORE_ACCESS} if the object store fails
     */
    private void pushSharedInfo() throws ModuleException {
        String aggregatorKey = getAggregatorKey();
        try {
            if (getStorage().contains(aggregatorKey)) {
                getStorage().remove(aggregatorKey);
            }
            getStorage().store(aggregatorKey, this.sharedInfoLocalCopy);
        } catch (ObjectStoreException e) {
            throw new ModuleException("Found error when trying to access ObjectStore", AggregatorError.OBJECT_STORE_ACCESS, e);
        }
    }

    /** Applies the consumer to the listener registered under this aggregator's name, if present. */
    private void getListenerAndExecute(Consumer<AggregatorListener> consumer) {
        this.aggregatorManager.getListener(this.name).ifPresent(consumer);
    }

    /**
     * Dispatches the aggregated elements to the listener's source callback, using the aggregation id as the
     * correlation id. Nothing is dispatched when the listener is not started.
     */
    private void executeListener(AggregatorListener aggregatorListener, List<TypedValue> list, AggregationAttributes aggregationAttributes) {
        if (!aggregatorListener.isStarted()) {
            return;
        }
        SourceCallback callback = aggregatorListener.getCallback();
        SourceCallbackContext callbackContext = callback.createContext();
        callbackContext.setCorrelationId(aggregationAttributes.getAggregationId());
        callback.handle(Result.builder().output(list).attributes(aggregationAttributes).build(), callbackContext);
    }

    /**
     * Asks the stored shared information to upgrade itself, writing it back when it reports a change.
     */
    @Deprecated
    private void upgradeAggregatedContentIfNeeded() {
        // Must stay a lambda: the field is only assigned inside executeSynchronized, right before this runs.
        executeSynchronized(() -> this.sharedInfoLocalCopy.upgradeIfNeeded());
    }
}
