package org.mule.extension.salesforce.internal.source;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import javax.inject.Inject;
import org.cometd.client.BayeuxClient;
import org.mule.extension.salesforce.internal.connection.SOAPConnection;
import org.mule.extension.salesforce.internal.datasense.SObjectMetadataCategoryResolver;
import org.mule.extension.salesforce.internal.service.connection.bayeux.BayeuxSessionEventListener;
import org.mule.extension.salesforce.internal.service.connection.bayeux.SalesforceBayeuxClient;
import org.mule.extension.salesforce.internal.service.connection.bayeux.SalesforceBayeuxReplayMessageListener;
import org.mule.extension.salesforce.internal.service.connection.bayeux.StreamingEventStatus;
import org.mule.extension.salesforce.internal.service.exception.SalesforceException;
import org.mule.extension.salesforce.internal.service.streaming.generic.ReplayOption;
import org.mule.extension.salesforce.internal.service.util.SalesforceUtils;
import org.mule.extension.salesforce.internal.service.validator.dto.ReplayOptionsValidator;
import org.mule.runtime.api.connection.ConnectionException;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.message.Attributes;
import org.mule.runtime.api.store.ObjectStore;
import org.mule.runtime.api.store.ObjectStoreException;
import org.mule.runtime.core.api.MuleContext;
import org.mule.runtime.core.util.store.ObjectStorePartition;
import org.mule.runtime.extension.api.annotation.Alias;
import org.mule.runtime.extension.api.annotation.Streaming;
import org.mule.runtime.extension.api.annotation.metadata.MetadataScope;
import org.mule.runtime.extension.api.annotation.param.Connection;
import org.mule.runtime.extension.api.annotation.param.Optional;
import org.mule.runtime.extension.api.annotation.param.Parameter;
import org.mule.runtime.extension.api.annotation.param.display.DisplayName;
import org.mule.runtime.extension.api.annotation.source.EmitsResponse;
import org.mule.runtime.extension.api.runtime.source.Source;
import org.mule.runtime.extension.api.runtime.source.SourceCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Streaming
@MetadataScope(outputResolver = SObjectMetadataCategoryResolver.class)
@Alias("replay-topic")
@EmitsResponse
/* loaded from: input_file:org/mule/extension/salesforce/internal/source/ReplayTopicSource.class */
public class ReplayTopicSource extends Source<Object, Attributes> {
    private static final Logger logger = LoggerFactory.getLogger(ReplayTopicSource.class);
    private static final String STREAMING_OBJECT_STORE = "StreamingObjectStore";
    private static final int TIME_TO_WAIT_FOR_CONNECTION_MS = 5000;

    @Parameter
    private String topic;

    @Parameter
    private ReplayOption replayOption;

    @Optional
    @DisplayName("Replay Id")
    @Parameter
    private String replayId;

    @DisplayName("Resume from the Last Replay Id")
    @Parameter
    private boolean autoReplay;

    @Connection
    private SOAPConnection connection;

    @Inject
    private MuleContext muleContext;
    private Map<String, Long> replayMap = new HashMap();
    private SalesforceBayeuxClient bayeuxClient;

    public void onStop() {
        disconnectBayexClient();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void disconnectBayexClient() {
        if (this.bayeuxClient != null) {
            this.bayeuxClient.unsubscribe(getTopic(this.topic));
            this.bayeuxClient.disconnect();
            this.bayeuxClient = null;
        }
    }

    public void onStart(SourceCallback<Object, Attributes> sourceCallback) throws MuleException {
        try {
            doStart(sourceCallback);
        } catch (Exception e) {
            sourceCallback.onSourceException(e);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void doStart(SourceCallback<Object, Attributes> sourceCallback) throws SalesforceException {
        String topic = getTopic(this.topic);
        if (readyToSubscribe()) {
            createBayeuxClient(topic, sourceCallback, this.connection.getParamsBundle().getUserName(), this.autoReplay, false);
            installSessionEventListener(sourceCallback, topic);
        }
    }

    private void installSessionEventListener(final SourceCallback<Object, Attributes> sourceCallback, String str) {
        this.bayeuxClient.addConnectionListener(new BayeuxSessionEventListener() { // from class: org.mule.extension.salesforce.internal.source.ReplayTopicSource.1
            @Override // org.mule.extension.salesforce.internal.service.connection.bayeux.BayeuxSessionEventListener
            public void handleSessionDown() throws SalesforceException {
                try {
                    ReplayTopicSource.this.disconnectBayexClient();
                    ReplayTopicSource.this.recreateSOAPConnection();
                    ReplayTopicSource.this.doStart(sourceCallback);
                } catch (ConnectionException e) {
                    throw new SalesforceException((Throwable) e);
                }
            }

            @Override // org.mule.extension.salesforce.internal.service.connection.bayeux.BayeuxSessionEventListener
            public void handleConnectionDown() throws SalesforceException {
                ReplayTopicSource.this.disconnectBayexClient();
                ReplayTopicSource.this.doStart(sourceCallback);
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void recreateSOAPConnection() throws ConnectionException {
        this.connection = (SOAPConnection) this.connection.getSourceConnectionProvider().connect();
    }

    private String getTopic(String str) {
        return str.startsWith("/") ? "/topic" + str : "/topic/" + str;
    }

    private Long computeReplayId() throws SalesforceException {
        String str = this.replayId;
        if (!ReplayOption.FROM_REPLAY_ID.equals(this.replayOption)) {
            str = this.replayOption.getValue();
        }
        return ReplayOptionsValidator.validateReplayId(SalesforceUtils.evaluateReplayIdExpression(this.muleContext, str));
    }

    public boolean readyToSubscribe() {
        return retrieveSessionId() != null;
    }

    public String retrieveSessionId() {
        if (this.connection.getPartnerConnection() == null || this.connection.getPartnerConnection().getConfig() == null) {
            return null;
        }
        return this.connection.getPartnerConnection().getConfig().getSessionId();
    }

    private void createBayeuxClient(String str, SourceCallback sourceCallback, String str2, boolean z, boolean z2) throws SalesforceException {
        Long computeReplayId = computeReplayId();
        ObjectStore<Serializable> streamingObjectStore = getStreamingObjectStore();
        try {
            cleanupStreamingObjectStore();
        } catch (ObjectStoreException e) {
            logger.error("Exception occured while performing streaming object store cleanup.", e);
        }
        SalesforceBayeuxReplayMessageListener salesforceBayeuxReplayMessageListener = new SalesforceBayeuxReplayMessageListener(sourceCallback, streamingObjectStore, str2);
        this.bayeuxClient = this.connection.getBayeuxClientWithReplay();
        Long l = computeReplayId;
        if (z || z2) {
            l = Long.valueOf(getStartReplayId(streamingObjectStore, str2 + str, computeReplayId));
        }
        if (this.bayeuxClient.getExtensions().isEmpty()) {
            this.replayMap.put(str, l);
            this.bayeuxClient.addExtension(new SalesforceReplayExtension(this.replayMap));
        } else {
            ((SalesforceReplayExtension) this.bayeuxClient.getExtensions().get(0)).getDataMap().put(str, l);
        }
        this.bayeuxClient.handshake();
        if (!this.bayeuxClient.waitFor(5000L, BayeuxClient.State.CONNECTED, new BayeuxClient.State[0])) {
            throw new SalesforceException("Waiting for Bayeux client connected state expired!");
        }
        this.bayeuxClient.subscribe(str, salesforceBayeuxReplayMessageListener);
    }

    private ObjectStore<Serializable> getStreamingObjectStore() {
        return this.muleContext.getObjectStoreManager().getObjectStore(STREAMING_OBJECT_STORE, true);
    }

    private void cleanupStreamingObjectStore() throws ObjectStoreException {
        String str;
        logger.info("Starting cleanup for streaming object store");
        ObjectStorePartition streamingObjectStore = getStreamingObjectStore();
        if (streamingObjectStore instanceof ObjectStorePartition) {
            synchronized (ReplayTopicSource.class) {
                for (Serializable serializable : streamingObjectStore.allKeys()) {
                    if (serializable instanceof String) {
                        Serializable retrieve = streamingObjectStore.retrieve(serializable);
                        if ((retrieve instanceof Map) && (str = (String) ((Map) retrieve).get("eventTimestamp")) != null && System.currentTimeMillis() - Long.valueOf(str).longValue() > 90000000) {
                            logger.info("Removing old entry: " + retrieve.toString());
                            streamingObjectStore.remove(serializable);
                        }
                    }
                }
            }
        }
    }

    private long getStartReplayId(ObjectStore<Serializable> objectStore, String str, Long l) {
        Map map;
        synchronized (objectStore) {
            try {
                map = (Map) objectStore.retrieve(str);
            } catch (ObjectStoreException e) {
                logger.warn("Streaming entry with key: " + str + " could not be retrieved. Replaying based on selected replay option.", e);
                return l.longValue();
            }
        }
        if (map == null) {
            logger.warn("Streaming entry with key: " + str + " could not be retrieved. Replaying based on selected replay option.");
            return l.longValue();
        }
        String str2 = (String) map.get("eventTimestamp");
        if (str2 == null) {
            logger.error("Timestamp for the last processed event is missing. Replaying based on selected replay option.");
            return l.longValue();
        }
        if (!isDurabilityTimeFrameStillActive(str2)) {
            return l.longValue();
        }
        String str3 = (String) map.get("replayId");
        if (str3 == null) {
            logger.error("Replay id for the last processed event is missing. Replaying based on selected replay option.");
            return l.longValue();
        }
        long parseLong = Long.parseLong(str3);
        return StreamingEventStatus.PROCESSED.toString().equals(map.get("eventStatus")) ? parseLong : parseLong - 1;
    }

    private boolean isDurabilityTimeFrameStillActive(String str) {
        return System.currentTimeMillis() - Long.parseLong(str) < 86400000;
    }
}
