package com.mulesoft.module.batch.engine.threading;

import com.mulesoft.module.batch.api.BatchJobInstanceStatus;
import com.mulesoft.module.batch.engine.BatchEngine;
import com.mulesoft.module.batch.engine.BatchJobAdapter;
import com.mulesoft.module.batch.engine.BatchJobInstanceAdapter;
import com.mulesoft.module.batch.engine.BatchStepAdapter;
import com.mulesoft.module.batch.engine.transaction.BatchTransactionContext;
import com.mulesoft.module.batch.record.BatchUtils;
import com.mulesoft.module.batch.record.Record;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import javax.resource.spi.work.Work;
import org.mule.util.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:mule/lib/mule/mule-module-batch-ee-3.7.1.jar:com/mulesoft/module/batch/engine/threading/BatchRecordWork.class */
/**
 * Unit of {@link Work} that polls one block of records from the stepping queue of a
 * batch job instance, hands every record to the step it currently belongs to and then
 * asks the {@link BatchEngine} to update statistics and route the processed records.
 *
 * <p>All of this happens after {@link BatchTransactionContext#beginTransaction()} has been
 * called on the supplied context. Unless the whole block is processed without an
 * exception, the context is rolled back through {@link BatchUtils#rollback}.
 */
public class BatchRecordWork implements Work {
    private static final Logger logger = LoggerFactory.getLogger(BatchRecordWork.class);
    private final BatchEngine batchEngine;
    private final BatchJobAdapter job;
    private final BatchTransactionContext ctx;

    /**
     * Creates a work item bound to one transaction context.
     *
     * @param batchEngine engine used to reach the stepping queue and to update statistics and route records
     * @param batchJobAdapter job whose steps are looked up by each record's current step id
     * @param batchTransactionContext context that is begun, passed to every collaborator and rolled back on failure
     */
    public BatchRecordWork(BatchEngine batchEngine, BatchJobAdapter batchJobAdapter, BatchTransactionContext batchTransactionContext) {
        this.batchEngine = batchEngine;
        this.job = batchJobAdapter;
        this.ctx = batchTransactionContext;
    }

    /**
     * Polls a block of records and processes it.
     *
     * <p>If the stepping queue yields no records the context is rolled back and the method
     * returns. Any {@link Throwable} raised while processing is logged and not propagated;
     * the context is rolled back in that case as well. A failure raised by the logging or by
     * the rollback itself does propagate to the caller.
     */
    @Override // java.lang.Runnable
    public void run() {
        // Stays true until the block has been fully processed, so every early exit
        // (empty poll, exception) ends in a rollback performed by the finally clause.
        boolean shouldRollback = true;
        try {
            this.ctx.beginTransaction();
            List<Record> records = this.batchEngine.getBatchQueueManager().steppingQueue(this.ctx.getJobInstance()).poll(this.ctx);
            if (CollectionUtils.isEmpty(records)) {
                if (logger.isDebugEnabled()) {
                    logger.debug(String.format("Stepping queue found empty for instance '%s' of job '%s'", this.ctx.getJobInstance().getId(), this.ctx.getJob().getName()));
                }
                // Nothing to process: the finally clause rolls the context back.
                return;
            }
            // The guard matters here: building the message queries the queue size.
            if (logger.isDebugEnabled()) {
                logger.debug(String.format("Just polled %d records from stepping queue of instance '%s' of job '%s'. %d blocks still remaining", records.size(), this.ctx.getJobInstance().getId(), this.ctx.getJob().getName(), this.batchEngine.getBatchQueueManager().steppingQueue(this.ctx.getJobInstance()).size(this.ctx)));
            }
            this.ctx.setBlockSize(records.size());

            List<Record> processedRecords = new ArrayList<Record>(records.size());
            // Distinct steps touched by this block; each one is checked for completion below.
            HashSet<BatchStepAdapter> visitedSteps = new HashSet<BatchStepAdapter>();
            for (Record record : records) {
                BatchStepAdapter step = this.job.getStepById(record.getCurrentStepId());
                visitedSteps.add(step);
                Record processed = step.onRecord(record, this.ctx);
                // A step may return null for a record; such records are not routed.
                if (processed != null) {
                    processedRecords.add(processed);
                }
            }

            // NOTE(review): no explicit commit is visible in this class; presumably
            // updateStatisticsAndRoute completes the transaction - confirm against BatchEngine.
            BatchJobInstanceAdapter jobInstance = this.batchEngine.updateStatisticsAndRoute(this.ctx, processedRecords);
            for (BatchStepAdapter step : visitedSteps) {
                // Steps that did not finish still release their resources when the
                // job instance has failed or has been stopped.
                if (!step.finishIfCompleted(jobInstance) && (jobInstance.getStatus().isFailure() || jobInstance.getStatus() == BatchJobInstanceStatus.STOPPED)) {
                    step.releaseResources(jobInstance);
                }
            }
            shouldRollback = false;
        } catch (OutOfMemoryError e) {
            logger.error(String.format("Ran out of memory while processing instance of batch job '%s'. Increase heap memory or reduce job's thread count", this.job.getName()), e);
        } catch (Throwable th) {
            logger.error(String.format("Exception found while processing block '%s' for instance '%s' of batch job '%s'. Records will be queued back", this.ctx.getId(), this.ctx.getJobInstance().getId(), this.job.getName()), th);
        } finally {
            if (shouldRollback) {
                BatchUtils.rollback(this.ctx);
            }
        }
    }

    /**
     * No-op: this implementation does nothing when asked to release.
     */
    @Override // javax.resource.spi.work.Work
    public void release() {
    }
}
