package com.mulesoft.module.batch.engine.buffer;

import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import com.mulesoft.module.batch.engine.BatchEngine;
import com.mulesoft.module.batch.engine.BatchJobInstanceAdapter;
import com.mulesoft.module.batch.engine.BatchProcessingTemplate;
import com.mulesoft.module.batch.engine.BatchStepAdapter;
import com.mulesoft.module.batch.engine.queue.BatchQueueManager;
import com.mulesoft.module.batch.engine.transaction.BatchTransactionContext;
import com.mulesoft.module.batch.engine.transaction.BatchTransactionContextProvider;
import com.mulesoft.module.batch.engine.transaction.ManagedBatchTransactionContextProvider;
import com.mulesoft.module.batch.record.BatchUtils;
import com.mulesoft.module.batch.record.Record;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.mule.api.Closeable;
import org.mule.api.MuleContext;
import org.mule.api.MuleEvent;
import org.mule.api.MuleException;
import org.mule.api.MuleRuntimeException;
import org.mule.api.processor.MessageProcessor;
import org.mule.streaming.ProvidesTotalHint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:mule/lib/mule/mule-module-batch-ee-3.7.1.jar:com/mulesoft/module/batch/engine/buffer/StreamingCommitBuffer.class */
public class StreamingCommitBuffer extends CommitRecordBuffer {
    private static final Logger logger = LoggerFactory.getLogger(StreamingCommitBuffer.class);
    private final RecordBuffer inputQueueBuffer;
    private final RecordBuffer outputQueueBuffer;
    private BatchQueueManager queueManager;
    private Set<BatchJobInstanceAdapter> flushingInstances;
    private BatchTransactionContextProvider inputContexts;
    private BatchTransactionContextProvider outputContexts;
    private BatchTransactionContextProvider steppingContexts;

    /* loaded from: input_file:mule/lib/mule/mule-module-batch-ee-3.7.1.jar:com/mulesoft/module/batch/engine/buffer/StreamingCommitBuffer$IteratorWrapper.class */
    /**
     * Flattens an iterator of record blocks into an iterator of record payloads.
     *
     * <p>Every record handed out through {@link #next()} is also added to the
     * enclosing buffer's output queue buffer before its payload is returned.
     */
    private class IteratorWrapper implements Iterator<Object>, ProvidesTotalHint, Closeable {
        /** Source of record blocks; each element is one list of records. */
        private final Iterator<List<Record>> blocks;
        /** Job instance passed along whenever a record is re-buffered. */
        private final BatchJobInstanceAdapter owner;
        /** Cursor over the block being drained, or {@code null} between blocks. */
        private Iterator<Record> cursor;

        private IteratorWrapper(Iterator<List<Record>> it, BatchJobInstanceAdapter batchJobInstanceAdapter) {
            this.blocks = it;
            this.owner = batchJobInstanceAdapter;
            this.cursor = null;
        }

        /**
         * @return whether the current block has more records, or, when no block
         *         is open, whether the block source has more blocks
         */
        @Override
        public boolean hasNext() {
            if (cursor == null) {
                return blocks.hasNext();
            }
            return cursor.hasNext();
        }

        /**
         * Returns the payload of the next record, opening the next block first
         * when none is open, and copies the record into the output queue buffer.
         */
        @Override
        public Object next() {
            if (cursor == null) {
                cursor = blocks.next().iterator();
            }
            final Record record = cursor.next();
            // Drop the exhausted block so hasNext() consults the block source again.
            if (!cursor.hasNext()) {
                cursor = null;
            }
            final StreamingCommitBuffer outer = StreamingCommitBuffer.this;
            outer.addToBuffer(owner, outer.outputQueueBuffer, null, outer.outputContexts, record);
            return record.getPayload();
        }

        /** Closes the block source when it is itself {@link Closeable}. */
        @Override
        public void close() throws MuleException {
            if (!(blocks instanceof Closeable)) {
                return;
            }
            ((Closeable) blocks).close();
        }

        /** Delegates to the block source's own {@code remove()}. */
        @Override
        public void remove() {
            blocks.remove();
        }

        /**
         * @return the block source's size hint multiplied by the engine's block
         *         size, or {@code -1} when the source provides no hint
         */
        @Override
        public int size() {
            if (!(blocks instanceof ProvidesTotalHint)) {
                return -1;
            }
            final int blockCount = ((ProvidesTotalHint) blocks).size();
            return blockCount * StreamingCommitBuffer.this.batchEngine.getBlockSize();
        }

        /*
         * Accessor constructor emitted by the decompiler; the enclosing class
         * instantiates the wrapper through this signature. The first and last
         * arguments are ignored.
         */
        /* synthetic */ IteratorWrapper(StreamingCommitBuffer streamingCommitBuffer, Iterator it, BatchJobInstanceAdapter batchJobInstanceAdapter, IteratorWrapper iteratorWrapper) {
            this(it, batchJobInstanceAdapter);
        }
    }

    /**
     * Creates the streaming commit buffer for a batch step.
     *
     * <p>Builds the input and output queue buffers for the step, takes the
     * queue manager from the engine, and creates three independent transaction
     * context providers (input, output and stepping).
     *
     * @param batchEngine      engine that supplies the queue manager and block size
     * @param batchStepAdapter step this buffer belongs to; its name is embedded in the buffer name
     * @param messageProcessor processor handed to the parent buffer
     * @param muleContext      Mule context handed to the parent and to the queue buffers
     */
    public StreamingCommitBuffer(BatchEngine batchEngine, BatchStepAdapter batchStepAdapter, MessageProcessor messageProcessor, MuleContext muleContext) {
        super(String.format("batch-step-%s-streaming-commit-buffer", batchStepAdapter.getName()), 0, batchEngine, batchStepAdapter, messageProcessor, muleContext);
        // Explicit type arguments avoid the raw-type/unchecked conversion of a bare ConcurrentHashMap.
        this.flushingInstances = Collections.newSetFromMap(new ConcurrentHashMap<BatchJobInstanceAdapter, Boolean>());
        this.queueManager = batchEngine.getBatchQueueManager();
        this.inputQueueBuffer = new StreamingCommitInputQueueBuffer(batchEngine, batchStepAdapter, muleContext);
        this.outputQueueBuffer = new StreamingCommitOutputQueueBuffer(batchEngine, batchStepAdapter, muleContext);
        this.inputContexts = newProvider();
        this.outputContexts = newProvider();
        this.steppingContexts = newProvider();
    }

    /**
     * Adds a record to the input queue buffer using the input context provider.
     *
     * @param batchJobInstanceAdapter job instance the record belongs to
     * @param batchTransactionContext caller's context, forwarded to {@code addToBuffer}
     * @param record                  record to buffer
     * @return the value returned by {@code addToBuffer}
     */
    @Override
    public int add(BatchJobInstanceAdapter batchJobInstanceAdapter, BatchTransactionContext batchTransactionContext, Record record) {
        final RecordBuffer target = inputQueueBuffer;
        final BatchTransactionContextProvider contexts = inputContexts;
        return addToBuffer(batchJobInstanceAdapter, target, batchTransactionContext, contexts, record);
    }

    /**
     * Flushes and forgets the given job instance on the input queue buffer.
     *
     * @param batchJobInstanceAdapter job instance to flush and forget
     */
    @Override
    public void forget(BatchJobInstanceAdapter batchJobInstanceAdapter) {
        final RecordBuffer input = inputQueueBuffer;
        input.flushAndForget(batchJobInstanceAdapter);
    }

    /**
     * Equivalent to {@link #flush(BatchJobInstanceAdapter)} for this buffer.
     *
     * @param batchJobInstanceAdapter job instance to flush
     */
    @Override
    public void flushAndForget(BatchJobInstanceAdapter batchJobInstanceAdapter) {
        // flush() already calls forget() before flushing, so no extra step is needed here.
        this.flush(batchJobInstanceAdapter);
    }

    /**
     * Flushes the given job instance unless a flush for it is already running.
     *
     * <p>The instance is registered in {@code flushingInstances} for the
     * duration of the flush; a call that finds it already registered returns
     * without doing anything.
     *
     * @param batchJobInstanceAdapter job instance to flush
     */
    @Override
    public void flush(BatchJobInstanceAdapter batchJobInstanceAdapter) {
        final boolean registered = flushingInstances.add(batchJobInstanceAdapter);
        if (!registered) {
            // Another flush for this instance is in progress.
            return;
        }
        try {
            forget(batchJobInstanceAdapter);
            doFlush(batchJobInstanceAdapter, ArrayListMultimap.create());
        } finally {
            flushingInstances.remove(batchJobInstanceAdapter);
        }
    }

    /**
     * Computes the size reported for a job instance: the input queue buffer's
     * size plus the streaming commit input queue's size multiplied by the
     * engine's block size.
     *
     * @param batchJobInstanceAdapter job instance to measure
     * @return the combined size
     * @throws MuleRuntimeException wrapping any {@link MuleException} raised while measuring
     */
    @Override
    public synchronized long size(BatchJobInstanceAdapter batchJobInstanceAdapter) {
        try {
            final long buffered = inputQueueBuffer.size(batchJobInstanceAdapter);
            // The product is kept as a single expression so its arithmetic width is unchanged.
            return buffered
                    + (queueManager.streamingCommitInputQueue(batchJobInstanceAdapter, step).size(inputContexts.get(batchJobInstanceAdapter))
                            * batchEngine.getBlockSize());
        } catch (MuleException e) {
            throw new MuleRuntimeException(e);
        }
    }

    /**
     * Builds the payload for the commit chain: an {@code IteratorWrapper} over
     * the streaming commit input queue of the given job instance.
     *
     * @param batchJobInstanceAdapter job instance whose input queue is iterated
     * @param multimap                not used by this implementation
     * @return the wrapping iterator
     * @throws MuleException declared by the overridden method
     */
    @Override
    protected Object makeChainPayload(BatchJobInstanceAdapter batchJobInstanceAdapter, Multimap<BatchTransactionContext, Record> multimap) throws MuleException {
        final Iterator<List<Record>> blocks = queueManager
                .streamingCommitInputQueue(batchJobInstanceAdapter, step)
                .iterator(inputContexts.get(batchJobInstanceAdapter));
        return new IteratorWrapper(this, blocks, batchJobInstanceAdapter, null);
    }

    /**
     * Flushes and forgets the given job instance on the output queue buffer.
     *
     * @param batchJobInstanceAdapter job instance to flush
     */
    private void flushOutputBuffer(BatchJobInstanceAdapter batchJobInstanceAdapter) {
        final RecordBuffer output = outputQueueBuffer;
        output.flushAndForget(batchJobInstanceAdapter);
    }

    /**
     * Iterates every block in the streaming commit output queue of the given
     * job instance and passes each one to {@code updateStatisticsAndRoute}.
     *
     * <p>The iterator is closed exactly once, after the loop finishes or
     * fails. Closing it inside the loop body would close it after the first
     * block while the loop keeps using it, and would never close it when the
     * queue is empty.
     *
     * @param batchJobInstanceAdapter job instance whose output queue is drained
     * @param exc                     failure to mark on the records, or {@code null} on success
     * @throws MuleException if routing a block or closing the iterator fails
     */
    private void backToSteppingQueue(BatchJobInstanceAdapter batchJobInstanceAdapter, Exception exc) throws MuleException {
        Iterator<List<Record>> it = this.queueManager.streamingCommitOutputQueue(batchJobInstanceAdapter, this.step).iterator(this.outputContexts.get(batchJobInstanceAdapter));
        try {
            while (it.hasNext()) {
                updateStatisticsAndRoute(batchJobInstanceAdapter, it.next(), exc);
            }
        } finally {
            if (it instanceof Closeable) {
                ((Closeable) it).close();
            }
        }
    }

    /**
     * Marks the records as failed when an exception is supplied, hands them to
     * the engine's {@code updateStatisticsAndRoute} under the stepping context
     * for the job instance, and then empties the list.
     *
     * @param batchJobInstanceAdapter job instance the records belong to
     * @param list                    block of records; cleared before returning
     * @param exc                     failure to mark, or {@code null} when there was none
     * @throws MuleException if marking or routing fails
     */
    private void updateStatisticsAndRoute(BatchJobInstanceAdapter batchJobInstanceAdapter, List<Record> list, Exception exc) throws MuleException {
        final boolean failed = exc != null;
        if (failed) {
            markError(batchJobInstanceAdapter, list, exc);
        }
        batchEngine.updateStatisticsAndRoute(steppingContexts.get(batchJobInstanceAdapter), list);
        list.clear();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v5, types: [int] */
    public int addToBuffer(BatchJobInstanceAdapter batchJobInstanceAdapter, RecordBuffer recordBuffer, BatchTransactionContext batchTransactionContext, BatchTransactionContextProvider batchTransactionContextProvider, Record record) {
        ?? r0 = recordBuffer;
        synchronized (r0) {
            int add = recordBuffer.add(batchJobInstanceAdapter, batchTransactionContextProvider.get(batchJobInstanceAdapter), record);
            if (batchTransactionContext != null) {
                batchTransactionContext.ackSingleRecordProcessed();
                if (batchTransactionContext.isBlockCompleted()) {
                    BatchUtils.commit(batchTransactionContext);
                }
            }
            r0 = add;
        }
        return r0;
    }

    /**
     * Creates the processing template used for a streaming commit.
     *
     * <p>The template remembers the event payload seen in {@code process} and,
     * on both success and failure, passes it to
     * {@code route(BatchJobInstanceAdapter, Iterator, Exception)}. On failure
     * the exception is logged at debug level first.
     *
     * @param multimap         not used by this implementation
     * @param messageProcessor processor given to the template
     * @param muleContext      Mule context given to the template
     * @return the new template
     */
    @Override
    protected BatchProcessingTemplate makeProcessingTemplate(Multimap<BatchTransactionContext, Record> multimap, MessageProcessor messageProcessor, MuleContext muleContext) {
        return new BatchProcessingTemplate(messageProcessor, muleContext) {
            /** Payload captured when processing started. */
            private Iterator<List<Record>> originalPayload;

            @Override
            public MuleEvent process(BatchJobInstanceAdapter batchJobInstanceAdapter, MuleEvent muleEvent) throws MuleException {
                final Object payload = muleEvent.getMessage().getPayload();
                this.originalPayload = (Iterator) payload;
                return super.process(batchJobInstanceAdapter, muleEvent);
            }

            @Override
            protected void onSuccess(BatchJobInstanceAdapter batchJobInstanceAdapter, MuleEvent muleEvent, MuleEvent muleEvent2) throws MuleException {
                StreamingCommitBuffer.this.route(batchJobInstanceAdapter, this.originalPayload, null);
            }

            @Override
            protected void onException(BatchJobInstanceAdapter batchJobInstanceAdapter, Exception exc, MuleEvent muleEvent, MuleEvent muleEvent2) throws MuleException {
                final Logger log = StreamingCommitBuffer.logger;
                if (log.isDebugEnabled()) {
                    final String message = String.format("Exception was found processing streaming commit on step %s for instance %s of job %s", StreamingCommitBuffer.this.step.getName(), batchJobInstanceAdapter.getId(), batchJobInstanceAdapter.getOwnerJobName());
                    log.debug(message, exc);
                }
                StreamingCommitBuffer.this.route(batchJobInstanceAdapter, this.originalPayload, exc);
            }
        };
    }

    /**
     * Advances the iterator to its end, discarding every element returned.
     *
     * @param it iterator to exhaust
     */
    private void consumePayload(Iterator<List<Record>> it) {
        for (boolean pending = it.hasNext(); pending; pending = it.hasNext()) {
            // Only the act of calling next() matters here; the value is dropped.
            it.next();
        }
    }

    /**
     * Finishes a streaming commit in three steps: exhausts the payload
     * iterator, flushes the output queue buffer for the job instance, and then
     * sends the output queue's blocks through {@code backToSteppingQueue}.
     *
     * @param batchJobInstanceAdapter job instance being committed
     * @param it                      payload iterator to exhaust (presumably the one built by
     *                                {@code makeChainPayload} - confirm against the parent class)
     * @param exc                     failure to mark on the records, or {@code null} on success
     * @throws MuleException if sending the blocks back fails
     */
    public void route(BatchJobInstanceAdapter batchJobInstanceAdapter, Iterator<List<Record>> it, Exception exc) throws MuleException {
        // Step 1: pull whatever is left in the payload.
        while (it.hasNext()) {
            it.next();
        }
        // Step 2: flush the output queue buffer.
        outputQueueBuffer.flushAndForget(batchJobInstanceAdapter);
        // Step 3: route the output queue's blocks, marking them with exc if present.
        backToSteppingQueue(batchJobInstanceAdapter, exc);
    }

    /**
     * Creates a fresh {@link ManagedBatchTransactionContextProvider} bound to
     * this buffer's engine.
     *
     * @return a new provider instance
     */
    private BatchTransactionContextProvider newProvider() {
        // NOTE(review): the meaning of the boolean flag is not visible in this file - confirm against the provider.
        final BatchTransactionContextProvider provider = new ManagedBatchTransactionContextProvider(batchEngine, true);
        return provider;
    }
}
