/*
 * Decompiled with CFR 0.152.
 */
package org.mule.runtime.core.internal.util.queue;

import java.io.Serializable;
import java.util.Collection;
import org.mule.runtime.core.api.transaction.xa.ResourceManagerException;
import org.mule.runtime.core.internal.util.journal.queue.LocalQueueTxJournalEntry;
import org.mule.runtime.core.internal.util.journal.queue.LocalTxQueueTransactionJournal;
import org.mule.runtime.core.internal.util.queue.LocalQueueTransactionContext;
import org.mule.runtime.core.internal.util.queue.QueueProvider;
import org.mule.runtime.core.internal.util.queue.QueueStore;

/**
 * {@link LocalQueueTransactionContext} that records the operations of one local queue
 * transaction in a {@link LocalTxQueueTransactionJournal}.
 *
 * <p>Additions ({@link #offer} / {@link #untake}) are only written to the journal and are applied
 * to the real queues in {@link #doCommit()}. Removals ({@link #poll}) are applied to the queue
 * immediately and journaled so that {@link #doRollback()} can put the values back.
 */
public class PersistentQueueTransactionContext implements LocalQueueTransactionContext {
    /** Timeout handed to {@link #poll} on each iteration of {@link #clear}; unit is whatever {@code QueueStore.poll} expects. */
    private static final long CLEAR_POLL_TIMEOUT = 10L;

    /** Next transaction id to hand out; only accessed through the synchronized {@link #getNextId()}. */
    private static int nextId = 0;

    private final LocalTxQueueTransactionJournal transactionJournal;
    private final QueueProvider queueProvider;
    private final int txId;

    /**
     * Creates a context bound to a freshly allocated transaction id.
     *
     * @param localTxQueueTransactionJournal journal that receives every operation of this transaction
     * @param queueProvider used to resolve queues by name when committing or rolling back
     */
    public PersistentQueueTransactionContext(LocalTxQueueTransactionJournal localTxQueueTransactionJournal, QueueProvider queueProvider) {
        this.transactionJournal = localTxQueueTransactionJournal;
        this.queueProvider = queueProvider;
        this.txId = PersistentQueueTransactionContext.getNextId();
    }

    /**
     * Returns the next non-negative transaction id.
     *
     * <p>When the counter overflows into the negative range it restarts at 0. The reset happens
     * before the post-increment so that the id 0 is handed out only once per cycle.
     */
    private static synchronized int getNextId() {
        if (nextId < 0) {
            nextId = 0;
        }
        return nextId++;
    }

    /**
     * Journals {@code item} as a pending add for {@code queue}; the queue itself is not modified
     * until {@link #doCommit()}.
     *
     * @param offerTimeout not used by this implementation
     * @return always {@code true}
     */
    @Override
    public boolean offer(QueueStore queue, Serializable item, long offerTimeout) throws InterruptedException {
        this.transactionJournal.logAdd(this.txId, queue, item);
        return true;
    }

    /**
     * Journals {@code item} as a pending add-first for {@code queue}; the queue itself is not
     * modified until {@link #doCommit()}.
     */
    @Override
    public void untake(QueueStore queue, Serializable item) throws InterruptedException {
        this.transactionJournal.logAddFirst(this.txId, queue, item);
    }

    /**
     * Drains {@code queue} by polling it (through {@link #poll}, so every removal is journaled)
     * until a poll returns {@code null}. The queue's monitor is held for the whole drain.
     */
    @Override
    public void clear(QueueStore queue) throws InterruptedException {
        synchronized (queue) {
            while (this.poll(queue, CLEAR_POLL_TIMEOUT) != null) {
                // keep draining; each successful poll has already been journaled
            }
        }
    }

    /**
     * Polls {@code queue} directly and, when a value is obtained, journals the removal so it can
     * be undone by {@link #doRollback()}. Poll and journal write happen under the queue's monitor.
     *
     * @return the polled value, or {@code null} when the queue returned none
     */
    @Override
    public Serializable poll(QueueStore queue, long pollTimeout) throws InterruptedException {
        synchronized (queue) {
            Serializable value = queue.poll(pollTimeout);
            if (value != null) {
                this.transactionJournal.logRemove(this.txId, queue, value);
            }
            return value;
        }
    }

    /**
     * Delegates to {@code queue.peek()}; additions pending in this transaction's journal are not
     * consulted.
     */
    @Override
    public Serializable peek(QueueStore queue) throws InterruptedException {
        return queue.peek();
    }

    /**
     * Returns the queue's current size plus the number of add / add-first entries this transaction
     * has journaled for a queue with the same name.
     */
    @Override
    public int size(QueueStore queue) {
        int numberOfElementsAdded = 0;
        Collection<LocalQueueTxJournalEntry> logEntries = this.transactionJournal.getLogEntriesForTx(this.txId);
        for (LocalQueueTxJournalEntry logEntry : logEntries) {
            boolean sameQueue = logEntry.getQueueName().equals(queue.getName());
            if (sameQueue && (logEntry.isAdd() || logEntry.isAddFirst())) {
                ++numberOfElementsAdded;
            }
        }
        return queue.getSize() + numberOfElementsAdded;
    }

    /**
     * Applies every journaled add ({@code putNow}) and add-first ({@code untake}) to its target
     * queue, in journal iteration order, and then records the commit in the journal.
     *
     * @throws ResourceManagerException wrapping any exception raised while applying the entries or
     *         logging the commit; if that exception is an {@link InterruptedException} the thread's
     *         interrupt status is restored first
     */
    @Override
    public void doCommit() throws ResourceManagerException {
        try {
            Collection<LocalQueueTxJournalEntry> logEntries = this.transactionJournal.getLogEntriesForTx(this.txId);
            for (LocalQueueTxJournalEntry entry : logEntries) {
                if (entry.isAdd()) {
                    this.queueProvider.getQueue(entry.getQueueName()).putNow(entry.getValue());
                } else if (entry.isAddFirst()) {
                    this.queueProvider.getQueue(entry.getQueueName()).untake(entry.getValue());
                }
            }
            this.transactionJournal.logCommit(this.txId);
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Wrapping would otherwise hide the interruption from code further up the stack.
                Thread.currentThread().interrupt();
            }
            throw new ResourceManagerException(e);
        }
    }

    /**
     * Puts every value this transaction removed back into its queue ({@code putNow}) and then
     * records the rollback in the journal. Journaled adds need no undo because they were never
     * applied to the queues.
     *
     * @throws ResourceManagerException if the thread is interrupted while restoring a value; the
     *         thread's interrupt status is restored before the exception is thrown
     */
    @Override
    public void doRollback() throws ResourceManagerException {
        Collection<LocalQueueTxJournalEntry> logEntries = this.transactionJournal.getLogEntriesForTx(this.txId);
        for (LocalQueueTxJournalEntry entry : logEntries) {
            if (!entry.isRemove()) {
                continue;
            }
            try {
                this.queueProvider.getQueue(entry.getQueueName()).putNow(entry.getValue());
            } catch (InterruptedException e) {
                // Wrapping would otherwise hide the interruption from code further up the stack.
                Thread.currentThread().interrupt();
                throw new ResourceManagerException(e);
            }
        }
        this.transactionJournal.logRollback(this.txId);
    }
}

