package com.mulesoft.mule.runtime.module.batch.internal.engine.buffer;

import com.google.common.collect.ImmutableSetMultimap;
import com.mulesoft.mule.runtime.module.batch.api.record.Record;
import com.mulesoft.mule.runtime.module.batch.engine.BatchEngine;
import com.mulesoft.mule.runtime.module.batch.engine.BatchJobInstanceAdapter;
import com.mulesoft.mule.runtime.module.batch.engine.BatchStepAdapter;
import com.mulesoft.mule.runtime.module.batch.engine.queue.BatchQueueDelegate;
import com.mulesoft.mule.runtime.module.batch.engine.queue.BatchQueueManager;
import com.mulesoft.mule.runtime.module.batch.engine.transaction.BatchTransactionContext;
import com.mulesoft.mule.runtime.module.batch.internal.engine.transaction.BatchTransactionContextProvider;
import io.qameta.allure.Description;
import io.qameta.allure.Issue;
import java.util.ArrayList;
import java.util.Spliterator;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.message.Message;
import org.mule.runtime.api.metadata.DataType;
import org.mule.runtime.api.metadata.TypedValue;
import org.mule.runtime.core.api.MuleContext;
import org.mule.runtime.core.api.context.notification.FlowTraceManager;
import org.mule.runtime.core.internal.profiling.InternalProfilingService;
import org.mule.runtime.core.internal.profiling.NoOpProfilingService;
import org.mule.runtime.dsl.api.component.config.DefaultComponentLocation;
import org.mule.tck.junit4.AbstractMuleContextTestCase;
import org.mule.tck.util.MuleContextUtils;

/* loaded from: input_file:com/mulesoft/mule/runtime/module/batch/internal/engine/buffer/StreamingAggregatorBufferTestCase.class */
public class StreamingAggregatorBufferTestCase extends AbstractMuleContextTestCase {
    private static final int BLOCKS = 3;
    private BatchEngine engine;
    private BatchQueueManager queueManager;
    private BatchJobInstanceAdapter jobInstanceAdapter;
    private BatchStepAdapter stepAdapter;
    private final InternalProfilingService profilingService = new NoOpProfilingService();

    @Before
    public void before() {
        muleContext = (MuleContext) Mockito.spy(muleContext);
        Mockito.when(muleContext.getFlowTraceManager()).thenReturn((FlowTraceManager) Mockito.mock(FlowTraceManager.class));
        this.stepAdapter = (BatchStepAdapter) Mockito.mock(BatchStepAdapter.class);
        Mockito.when(this.stepAdapter.getLocation()).thenReturn(DefaultComponentLocation.fromSingleComponent("mockStep"));
        this.jobInstanceAdapter = (BatchJobInstanceAdapter) Mockito.mock(BatchJobInstanceAdapter.class);
        this.queueManager = (BatchQueueManager) Mockito.mock(BatchQueueManager.class);
        this.engine = (BatchEngine) Mockito.mock(BatchEngine.class);
        Mockito.when(this.engine.getBatchQueueManager()).thenReturn(this.queueManager);
    }

    @Test
    @Description("Before this change, the reading from the queue to send the records to the following step was done in a single large transaction. For large data sets, this caused the transaction log of such transaction to grow really huge, effectively causing OOM errors in small machines for large datasets.")
    @Issue("EE-7243")
    public void backToSteppingQueueDoesTxPerBlock() throws MuleException {
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < BLOCKS; i++) {
            ArrayList arrayList2 = new ArrayList();
            arrayList2.add(new Record(new TypedValue("Block #" + i, DataType.STRING)));
            arrayList.add(arrayList2);
        }
        Spliterator spliterator = arrayList.spliterator();
        ((BatchQueueManager) Mockito.doAnswer(invocationOnMock -> {
            BatchQueueDelegate batchQueueDelegate = (BatchQueueDelegate) Mockito.mock(BatchQueueDelegate.class);
            Mockito.when(Long.valueOf(batchQueueDelegate.size((BatchTransactionContext) ArgumentMatchers.any(BatchTransactionContext.class)))).thenReturn(Long.valueOf(spliterator.estimateSize()));
            Mockito.when(batchQueueDelegate.poll((BatchTransactionContext) ArgumentMatchers.any(BatchTransactionContext.class))).thenAnswer(invocationOnMock -> {
                AtomicReference atomicReference = new AtomicReference();
                spliterator.tryAdvance(list -> {
                    atomicReference.set(list);
                });
                return atomicReference.get();
            });
            return batchQueueDelegate;
        }).when(this.queueManager)).streamingAggregatorOutputQueue(this.jobInstanceAdapter, this.stepAdapter);
        BatchTransactionContext batchTransactionContext = (BatchTransactionContext) Mockito.mock(BatchTransactionContext.class);
        Mockito.when(this.engine.createTransactionContext(this.jobInstanceAdapter)).thenReturn(batchTransactionContext);
        new StreamingAggregatorBuffer(this.engine, this.stepAdapter, coreEvent -> {
            return coreEvent;
        }, MuleContextUtils.getNotificationDispatcher(muleContext), muleContext, this.profilingService.getCoreEventTracer()).makeProcessingTemplate(ImmutableSetMultimap.builder().build(), coreEvent2 -> {
            return coreEvent2;
        }, muleContext).process(this.jobInstanceAdapter, getEventBuilder().message(Message.of(new StreamingAggregatorIteratorWrapper(arrayList.iterator(), (BatchTransactionContextProvider) Mockito.mock(BatchTransactionContextProvider.class), (BatchTransactionContext) Mockito.mock(BatchTransactionContext.class), this.jobInstanceAdapter, (RecordBuffer) Mockito.mock(RecordBuffer.class), 5))).build(), this.profilingService.getCoreEventTracer());
        ((BatchEngine) Mockito.verify(this.engine, Mockito.times(BLOCKS))).updateStatisticsAndRoute((BatchTransactionContext) ArgumentMatchers.any(BatchTransactionContext.class), ArgumentMatchers.anyList());
        ((BatchQueueManager) Mockito.verify(this.queueManager, Mockito.times(4))).streamingAggregatorOutputQueue(this.jobInstanceAdapter, this.stepAdapter);
        ((BatchTransactionContext) Mockito.verify(batchTransactionContext, Mockito.times(4))).commit();
    }
}
