package com.mulesoft.mule.runtime.module.batch.internal;

import com.mulesoft.mule.runtime.module.batch.api.BatchStep;
import com.mulesoft.mule.runtime.module.batch.api.extension.record.AcceptRecordPolicy;
import com.mulesoft.mule.runtime.module.batch.api.extension.structure.BatchJobInstance;
import com.mulesoft.mule.runtime.module.batch.api.extension.structure.BatchJobInstanceStatus;
import com.mulesoft.mule.runtime.module.batch.api.notification.BatchNotification;
import com.mulesoft.mule.runtime.module.batch.api.record.Record;
import com.mulesoft.mule.runtime.module.batch.engine.BatchEngine;
import com.mulesoft.mule.runtime.module.batch.engine.BatchJobAdapter;
import com.mulesoft.mule.runtime.module.batch.engine.BatchJobInstanceAdapter;
import com.mulesoft.mule.runtime.module.batch.engine.BatchStepAdapter;
import com.mulesoft.mule.runtime.module.batch.engine.transaction.BatchTransactionContext;
import com.mulesoft.mule.runtime.module.batch.exception.ExceptionHistory;
import com.mulesoft.mule.runtime.module.batch.internal.engine.BatchProcessingTemplate;
import com.mulesoft.mule.runtime.module.batch.internal.engine.RecordCompletableFuture;
import com.mulesoft.mule.runtime.module.batch.internal.record.RecordFilteringStrategy;
import com.mulesoft.mule.runtime.module.batch.reporting.StepExceptionReporter;
import com.mulesoft.mule.runtime.module.batch.util.BatchUtils;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import javax.inject.Inject;
import org.mule.runtime.api.component.Component;
import org.mule.runtime.api.component.location.ConfigurationComponentLocator;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.exception.MuleRuntimeException;
import org.mule.runtime.api.lifecycle.InitialisationException;
import org.mule.runtime.api.notification.EnrichedNotificationInfo;
import org.mule.runtime.api.notification.NotificationDispatcher;
import org.mule.runtime.api.profiling.tracing.Span;
import org.mule.runtime.core.api.context.notification.FlowTraceManager;
import org.mule.runtime.core.api.el.ExpressionManager;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.lifecycle.LifecycleUtils;
import org.mule.runtime.core.api.processor.AbstractMessageProcessorOwner;
import org.mule.runtime.core.api.processor.Processor;
import org.mule.runtime.core.internal.profiling.InternalProfilingService;
import org.mule.runtime.core.privileged.processor.MessageProcessors;
import org.mule.runtime.core.privileged.processor.chain.MessageProcessorChain;
import org.mule.runtime.tracer.api.EventTracer;
import org.mule.runtime.tracer.api.component.ComponentTracerFactory;
import org.mule.runtime.tracer.api.context.getter.MapDistributedTraceContextGetter;
import org.mule.runtime.tracer.customization.api.InitialSpanInfoProvider;
import org.mule.runtime.tracer.impl.span.InternalSpan;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/mulesoft/mule/runtime/module/batch/internal/DefaultBatchStep.class */
public class DefaultBatchStep extends AbstractMessageProcessorOwner implements BatchStepAdapter {
    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultBatchStep.class);
    public static final String BATCH_STEP_SPAN_NAME = "batch-step";
    public static final String BATCH_STEP_RECORD_SPAN_NAME = "batch-step-record";

    @Inject
    private ExpressionManager expressionManager;

    @Inject
    private ConfigurationComponentLocator locator;

    @Inject
    private NotificationDispatcher notificationDispatcher;

    @Inject
    private InternalProfilingService internalProfilingService;

    @Inject
    private ComponentTracerFactory componentTracerFactory;

    @Inject
    private InitialSpanInfoProvider initialSpanInfoProvider;
    private String name;
    private DefaultBatchStepAggregator batchStepAggregator;
    private String acceptExpression;
    private BatchStep nextStep;
    private BatchProcessingTemplate processingTemplate;
    private MessageProcessorChain messageProcessorChain;
    private RecordFilteringStrategy recordFilteringStrategy;
    private BatchEngine batchEngine;
    private BatchJobAdapter job;
    private EventTracer<CoreEvent> coreEventTracer;
    private Span stepSpan;
    private AcceptRecordPolicy acceptPolicy = AcceptRecordPolicy.NO_FAILURES;
    private StepExceptionReporter exceptionReporter = new StepExceptionReporter(this, LOGGER);
    private boolean last = false;
    private List<Processor> messageProcessors = Collections.emptyList();
    private Map<String, String> serializedBatchStepSpan = Collections.emptyMap();
    private final Boolean lock = false;
    private String processingBatchJobInstanceId = null;

    public void initialise() throws InitialisationException {
        this.messageProcessorChain = MessageProcessors.newChain(Optional.empty(), Collections.emptyList(), this.componentTracerFactory.fromComponent(this, BATCH_STEP_RECORD_SPAN_NAME, ""));
        try {
            this.coreEventTracer = this.internalProfilingService.getCoreEventTracer();
            this.messageProcessorChain = MessageProcessors.newChain(MessageProcessors.getProcessingStrategy(this.locator, this), this.messageProcessors, this.componentTracerFactory.fromComponent(this, BATCH_STEP_RECORD_SPAN_NAME, ""));
            super.initialise();
            LifecycleUtils.initialiseIfNeeded(this.expressionManager);
            this.recordFilteringStrategy = createFilteringStrategy();
            initStepAggregator();
            this.batchEngine.addBatchProcessingListener(this.job, new BaseBatchProcessingListener() { // from class: com.mulesoft.mule.runtime.module.batch.internal.DefaultBatchStep.1
                @Override // com.mulesoft.mule.runtime.module.batch.internal.BaseBatchProcessingListener, com.mulesoft.mule.runtime.module.batch.BatchProcessingListener
                public void onJobStopped(BatchJobInstanceAdapter batchJobInstanceAdapter) {
                    DefaultBatchStep.this.releaseResources(batchJobInstanceAdapter);
                }

                @Override // com.mulesoft.mule.runtime.module.batch.internal.BaseBatchProcessingListener, com.mulesoft.mule.runtime.module.batch.BatchProcessingListener
                public void onJobFinished(BatchJobInstanceAdapter batchJobInstanceAdapter) {
                    DefaultBatchStep.this.exceptionReporter.reset(batchJobInstanceAdapter);
                }
            });
            this.processingTemplate = createProcessingTemplate();
        } catch (Exception e) {
            throw new InitialisationException(e, this);
        }
    }

    private BatchProcessingTemplate createProcessingTemplate() {
        final FlowTraceManager flowTraceManager = this.muleContext.getFlowTraceManager();
        return new BatchProcessingTemplate(this.messageProcessorChain, getLocation(), flowTraceManager, this.muleContext.getStreamCloserService()) { // from class: com.mulesoft.mule.runtime.module.batch.internal.DefaultBatchStep.2
            /* JADX INFO: Access modifiers changed from: protected */
            @Override // com.mulesoft.mule.runtime.module.batch.internal.engine.BatchProcessingTemplate
            public void before(BatchJobInstanceAdapter batchJobInstanceAdapter, CoreEvent coreEvent) throws MuleException {
                super.before(batchJobInstanceAdapter, coreEvent);
                flowTraceManager.onFlowStart(EnrichedNotificationInfo.createInfo(coreEvent, (Exception) null, (Component) null), DefaultBatchStep.this.getName());
            }

            @Override // com.mulesoft.mule.runtime.module.batch.internal.engine.BatchProcessingTemplate
            protected void onSuccess(BatchJobInstanceAdapter batchJobInstanceAdapter, CoreEvent coreEvent) throws MuleException {
                BatchUtils.getRecord(coreEvent).updateWith(coreEvent);
            }

            @Override // com.mulesoft.mule.runtime.module.batch.internal.engine.BatchProcessingTemplate
            protected void onException(BatchJobInstanceAdapter batchJobInstanceAdapter, Exception exc, CoreEvent coreEvent) throws MuleException {
                DefaultBatchStep.this.exceptionReporter.report(batchJobInstanceAdapter, exc);
                Record record = BatchUtils.getRecord(coreEvent);
                record.updateWith(coreEvent);
                MuleException batchException = BatchUtils.toBatchException(exc, batchJobInstanceAdapter);
                DefaultBatchStep.this.notificationDispatcher.dispatch(new BatchNotification(batchJobInstanceAdapter, DefaultBatchStep.this, record, batchException, BatchNotification.STEP_RECORD_FAILED));
                throw batchException;
            }

            /* JADX INFO: Access modifiers changed from: protected */
            @Override // com.mulesoft.mule.runtime.module.batch.internal.engine.BatchProcessingTemplate
            public void onFinally(BatchJobInstanceAdapter batchJobInstanceAdapter, CoreEvent coreEvent, CoreEvent coreEvent2) throws MuleException {
                super.onFinally(batchJobInstanceAdapter, coreEvent, coreEvent2);
                flowTraceManager.onFlowComplete(EnrichedNotificationInfo.createInfo(coreEvent, (Exception) null, (Component) null));
            }
        };
    }

    private void initStepAggregator() throws InitialisationException {
        if (this.batchStepAggregator != null) {
            this.batchStepAggregator.setMuleContext(this.muleContext);
            this.batchStepAggregator.setStep(this);
            this.batchStepAggregator.setBatchEngine(this.batchEngine);
            this.batchStepAggregator.setExceptionReporter(this.exceptionReporter);
            this.batchStepAggregator.initialise();
        }
    }

    @Override // com.mulesoft.mule.runtime.module.batch.engine.BatchStepAdapter
    public synchronized void releaseResources(BatchJobInstanceAdapter batchJobInstanceAdapter) {
        if (this.batchStepAggregator != null) {
            this.batchStepAggregator.releaseResources(batchJobInstanceAdapter);
        }
        this.exceptionReporter.reset(batchJobInstanceAdapter);
    }

    private RecordFilteringStrategy createFilteringStrategy() {
        return new RecordFilteringStrategy(this.acceptPolicy, this.acceptExpression, this.expressionManager);
    }

    public void start() throws MuleException {
        if (getOwnedObjects() != null) {
            super.start();
        }
        if (this.batchStepAggregator != null) {
            this.batchStepAggregator.start();
        }
    }

    public void stop() throws MuleException {
        super.stop();
        if (this.batchStepAggregator != null) {
            this.batchStepAggregator.stop();
        }
    }

    public void dispose() {
        super.dispose();
        if (this.batchStepAggregator != null) {
            this.batchStepAggregator.dispose();
        }
    }

    @Override // com.mulesoft.mule.runtime.module.batch.engine.BatchStepAdapter
    public Record onRecord(Record record, BatchTransactionContext batchTransactionContext) throws MuleException {
        BatchJobInstanceAdapter jobInstance = batchTransactionContext.getJobInstance();
        try {
            if (processRecord(record, jobInstance, batchTransactionContext) != null && this.batchStepAggregator != null) {
                try {
                    this.batchStepAggregator.add(record, batchTransactionContext);
                    return null;
                } catch (Exception e) {
                    handleRecordException(record, jobInstance, e);
                }
            }
        } catch (Exception e2) {
            handleRecordException(record, jobInstance, e2);
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug(String.format("Exception found in step %s processing record for instance %s of job %s. Record has been marked as failed in the step", this.name, jobInstance.getId(), jobInstance.getOwnerJobName()), e2);
            }
        }
        return record;
    }

    private void handleRecordException(Record record, BatchJobInstanceAdapter batchJobInstanceAdapter, Exception exc) {
        if (exc.getCause() instanceof InterruptedException) {
            throw new MuleRuntimeException(exc.getCause());
        }
        record.markAsFailedInStep(this, exc);
        this.exceptionReporter.report(batchJobInstanceAdapter, exc);
    }

    @Override // com.mulesoft.mule.runtime.module.batch.engine.BatchStepAdapter
    public boolean finishIfCompleted(BatchJobInstanceAdapter batchJobInstanceAdapter, BatchTransactionContext batchTransactionContext) {
        boolean isCompletedFor = isCompletedFor(batchJobInstanceAdapter, this.batchStepAggregator != null ? this.batchStepAggregator.size(batchJobInstanceAdapter) : 0L);
        if (isCompletedFor) {
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info(String.format("Step %s finished processing all records for instance %s of job %s", this.name, batchJobInstanceAdapter.getId(), batchJobInstanceAdapter.getOwnerJobName()));
            }
            this.notificationDispatcher.dispatch(new BatchNotification(batchJobInstanceAdapter, this, BatchNotification.STEP_JOB_END));
            this.exceptionReporter.reset(batchJobInstanceAdapter);
            flushAggregator(batchJobInstanceAdapter);
            if (!batchJobInstanceAdapter.getStatus().isExecutable() && !batchJobInstanceAdapter.getStatus().equals(BatchJobInstanceStatus.STOPPED)) {
                this.batchEngine.getBatchQueueManager().disposeQueues(batchTransactionContext);
            }
        }
        return isCompletedFor;
    }

    private CoreEvent processRecord(Record record, BatchJobInstanceAdapter batchJobInstanceAdapter, BatchTransactionContext batchTransactionContext) throws Exception {
        record.markAsProcessedByStep(this);
        this.notificationDispatcher.dispatch(new BatchNotification(batchJobInstanceAdapter, this, record, BatchNotification.STEP_RECORD_START));
        RecordCompletableFuture recordCompletableFuture = new RecordCompletableFuture(batchTransactionContext.getResolutionCallback());
        record.setCompletionCallback(recordCompletableFuture);
        CoreEvent resolveBatchJobStepRecordCoreEvent = resolveBatchJobStepRecordCoreEvent(record, batchJobInstanceAdapter, batchTransactionContext, recordCompletableFuture);
        CoreEvent coreEvent = null;
        if (this.recordFilteringStrategy.acceptRecord(resolveBatchJobStepRecordCoreEvent, record)) {
            coreEvent = this.processingTemplate.process(batchJobInstanceAdapter, resolveBatchJobStepRecordCoreEvent, this.coreEventTracer);
        }
        this.notificationDispatcher.dispatch(new BatchNotification(batchJobInstanceAdapter, this, record, BatchNotification.STEP_RECORD_END));
        return coreEvent;
    }

    private CoreEvent resolveBatchJobStepRecordCoreEvent(Record record, BatchJobInstanceAdapter batchJobInstanceAdapter, BatchTransactionContext batchTransactionContext, RecordCompletableFuture recordCompletableFuture) {
        CoreEvent build;
        synchronized (this.lock) {
            if (isNewBatchJobInstanceToProcess(batchTransactionContext.getJobInstance().getId())) {
                build = this.batchEngine.createEventBuilder(record, batchJobInstanceAdapter, recordCompletableFuture).build();
                this.coreEventTracer.injectDistributedTraceContext(build.getContext(), new MapDistributedTraceContextGetter(this.serializedBatchStepSpan));
            } else {
                this.processingBatchJobInstanceId = batchTransactionContext.getJobInstance().getId();
                build = this.batchEngine.createEventBuilder(record, batchJobInstanceAdapter, recordCompletableFuture).build();
                this.coreEventTracer.injectDistributedTraceContext(build.getContext(), new MapDistributedTraceContextGetter(batchTransactionContext.getJobInstance().getBatchEvent().getSerializedBatchJobInstanceSpan()));
                this.stepSpan = (Span) this.coreEventTracer.startSpan(build, this.initialSpanInfoProvider.getInitialSpanInfo(this, BATCH_STEP_SPAN_NAME, "")).orElse(null);
                this.serializedBatchStepSpan = this.coreEventTracer.getDistributedTraceContextMap(build);
            }
        }
        return build;
    }

    private boolean isNewBatchJobInstanceToProcess(String str) {
        return str.equals(this.processingBatchJobInstanceId);
    }

    private void flushAggregator(BatchJobInstanceAdapter batchJobInstanceAdapter) {
        if (this.batchStepAggregator != null) {
            this.batchStepAggregator.flush(batchJobInstanceAdapter);
        }
    }

    private boolean isCompletedFor(BatchJobInstance batchJobInstance, long j) {
        return batchJobInstance.getResult().getResultForStep(this.name).getReceivedRecords() + j >= batchJobInstance.getRecordCount();
    }

    protected List<Processor> getOwnedMessageProcessors() {
        return Collections.singletonList(this.messageProcessorChain);
    }

    public void setMessageProcessors(List<Processor> list) {
        this.messageProcessors = list;
    }

    public void setBatchStepAggregator(DefaultBatchStepAggregator defaultBatchStepAggregator) {
        this.batchStepAggregator = defaultBatchStepAggregator;
    }

    public void setAcceptPolicy(AcceptRecordPolicy acceptRecordPolicy) {
        this.acceptPolicy = acceptRecordPolicy;
    }

    public void setAcceptExpression(String str) {
        this.acceptExpression = str;
    }

    @Override // com.mulesoft.mule.runtime.module.batch.api.BatchStep
    public String getName() {
        return this.name;
    }

    public void setName(String str) {
        this.name = str;
    }

    @Inject
    public void setBatchEngine(BatchEngine batchEngine) {
        this.batchEngine = batchEngine;
    }

    public void setLast(boolean z) {
        this.last = z;
    }

    @Override // com.mulesoft.mule.runtime.module.batch.engine.BatchStepAdapter
    public boolean isLast() {
        return this.last;
    }

    public void setNextStep(BatchStep batchStep) {
        this.nextStep = batchStep;
    }

    @Override // com.mulesoft.mule.runtime.module.batch.engine.BatchStepAdapter
    public BatchStep getNextStep() {
        return this.nextStep;
    }

    @Override // com.mulesoft.mule.runtime.module.batch.engine.BatchStepAdapter
    public ExceptionHistory getExceptionHistory(BatchJobInstanceAdapter batchJobInstanceAdapter) {
        return this.exceptionReporter.getExceptionHistory(batchJobInstanceAdapter);
    }

    @Override // com.mulesoft.mule.runtime.module.batch.engine.BatchStepAdapter
    public DefaultBatchStepAggregator getBatchStepAggregator() {
        return this.batchStepAggregator;
    }

    public BatchEngine getBatchEngine() {
        return this.batchEngine;
    }

    public void setJob(BatchJobAdapter batchJobAdapter) {
        this.job = batchJobAdapter;
    }

    public void setExceptionReporter(StepExceptionReporter stepExceptionReporter) {
        this.exceptionReporter = stepExceptionReporter;
    }

    public boolean equals(Object obj) {
        if (obj instanceof BatchStep) {
            return this.name.equals(((BatchStep) obj).getName());
        }
        return false;
    }

    public int hashCode() {
        return this.name.hashCode();
    }

    public void setExpressionManager(ExpressionManager expressionManager) {
        this.expressionManager = expressionManager;
    }

    public void setNotificationDispatcher(NotificationDispatcher notificationDispatcher) {
        this.notificationDispatcher = notificationDispatcher;
    }

    public void endStepSpan() {
        synchronized (this.lock) {
            this.serializedBatchStepSpan = Collections.emptyMap();
            if (this.stepSpan != null) {
                InternalSpan.getAsInternalSpan(this.stepSpan).end();
                InternalSpan.getAsInternalSpan(this.stepSpan.getParent()).end();
            }
            this.stepSpan = null;
        }
    }

    @Override // com.mulesoft.mule.runtime.module.batch.engine.BatchStepAdapter
    public Map<String, String> getSerializedBatchStepSpan() {
        return this.serializedBatchStepSpan;
    }
}
