package com.mulesoft.mule.runtime.module.batch.internal.engine;

import com.mulesoft.mule.runtime.module.batch.api.extension.structure.BatchJobInstance;
import com.mulesoft.mule.runtime.module.batch.engine.BatchJobInstanceAdapter;
import com.mulesoft.mule.runtime.module.batch.exception.BatchException;
import com.mulesoft.mule.runtime.module.batch.internal.exception.BatchFailureHandlingException;
import java.io.ByteArrayOutputStream;
import java.util.Optional;
import org.mule.runtime.api.component.Component;
import org.mule.runtime.api.component.location.ComponentLocation;
import org.mule.runtime.api.exception.DefaultMuleException;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.message.Error;
import org.mule.runtime.api.message.Message;
import org.mule.runtime.api.notification.EnrichedNotificationInfo;
import org.mule.runtime.core.api.context.notification.FlowTraceManager;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.message.OutputHandler;
import org.mule.runtime.core.api.processor.ReactiveProcessor;
import org.mule.runtime.core.api.util.StreamCloserService;
import org.mule.runtime.core.internal.util.rx.ReactorTransactionUtils;
import org.mule.runtime.core.privileged.exception.MessagingException;
import org.mule.runtime.core.privileged.processor.MessageProcessors;
import org.mule.runtime.tracer.api.EventTracer;
import reactor.core.publisher.Mono;

/* loaded from: input_file:com/mulesoft/mule/runtime/module/batch/internal/engine/BatchProcessingTemplate.class */
/**
 * Template for running a {@link ReactiveProcessor} on behalf of a batch job instance.
 *
 * <p>{@link #process} drives the sequence: {@link #before}, the processor itself, then either
 * {@link #handleSuccess} or {@link #handleException}, and finally {@link #onFinally}. Subclasses
 * supply the outcome callbacks {@link #onSuccess} and {@link #onException}.
 */
public abstract class BatchProcessingTemplate {
    private final ReactiveProcessor messageProcessor;
    private final StreamCloserService closer;
    private final FlowTraceManager flowTraceManager;

    /**
     * Creates the template around the given processor.
     *
     * @param reactiveProcessor   processor executed by {@link #process}
     * @param componentLocation   location whose {@code getLocation()} value is handed to
     *                            {@code ReactorTransactionUtils.pushTxToSubscriberContext}
     * @param flowTraceManager    notified in {@link #before} and {@link #onFinally}
     * @param streamCloserService used in {@link #onFinally} to close the result payload
     */
    public BatchProcessingTemplate(ReactiveProcessor reactiveProcessor, ComponentLocation componentLocation, FlowTraceManager flowTraceManager, StreamCloserService streamCloserService) {
        // The delegate is wrapped between the pop/push subscriber-context operators of
        // ReactorTransactionUtils; the operator order is kept exactly as it was.
        this.messageProcessor = publisher -> Mono.from(publisher)
                .subscriberContext(ReactorTransactionUtils.popTxFromSubscriberContext())
                .transform(reactiveProcessor)
                .subscriberContext(ReactorTransactionUtils.pushTxToSubscriberContext(componentLocation.getLocation()));
        this.flowTraceManager = flowTraceManager;
        this.closer = streamCloserService;
    }

    /**
     * Runs the processor for {@code coreEvent} and dispatches the outcome.
     *
     * <p>A result that carries an error is routed to {@link #handleException}; any other result is
     * routed to {@link #handleSuccess}. Exceptions thrown along the way are routed to
     * {@link #handleException} as well, except when their cause is an
     * {@link InterruptedException}, which is rethrown wrapped in a {@link DefaultMuleException}.
     * Cleanup ({@link #onFinally}, completing the result's context, ending the current span) runs
     * exactly once on every path.
     *
     * @param batchJobInstanceAdapter job instance the event belongs to
     * @param coreEvent               event to process
     * @param eventTracer             tracer whose current span for {@code coreEvent} is ended
     * @return the processor's result, {@code coreEvent} itself when the processor returned
     *         {@code null}, or {@code null} when processing failed before a result was available
     * @throws MuleException if the thread was interrupted, or if a callback or cleanup step fails
     */
    public CoreEvent process(BatchJobInstanceAdapter batchJobInstanceAdapter, CoreEvent coreEvent, EventTracer<CoreEvent> eventTracer) throws MuleException {
        // Event handed to the failure handler when an exception escapes processing.
        CoreEvent.Builder failureEventBuilder = CoreEvent.builder(coreEvent);
        CoreEvent result = null;
        try {
            before(batchJobInstanceAdapter, coreEvent);
            try {
                result = MessageProcessors.processToApply(coreEvent, this.messageProcessor);
                if (result == null) {
                    result = coreEvent;
                }
                CoreEvent.Builder resultBuilder = CoreEvent.builder(result);
                Optional<Error> error = result.getError();
                if (error.isPresent()) {
                    Throwable cause = error.get().getCause();
                    // Only Exceptions can be reported as-is; any other Throwable is wrapped.
                    Exception failure = cause instanceof Exception
                            ? (Exception) cause
                            : new BatchException(cause, (BatchJobInstance) batchJobInstanceAdapter);
                    handleException(batchJobInstanceAdapter, failure, coreEvent);
                } else {
                    handleSuccess(batchJobInstanceAdapter, result, resultBuilder);
                }
            } catch (MessagingException e) {
                // Report the failure against the event carried by the exception. Previously the
                // builder created here was discarded and the input event was always used.
                failureEventBuilder = CoreEvent.builder(e.getEvent());
                throw e;
            }
        } catch (Exception e) {
            if (e.getCause() instanceof InterruptedException) {
                throw new DefaultMuleException(e.getCause());
            }
            handleException(batchJobInstanceAdapter, e, failureEventBuilder.build());
        } finally {
            // Pass the real result (possibly null) so its payload stream is closed and its
            // context completed even when outcome handling threw.
            onFinally(batchJobInstanceAdapter, coreEvent, result);
            if (result != null) {
                result.getContext().success();
            }
            eventTracer.endCurrentSpan(coreEvent);
        }
        return result;
    }

    /**
     * Resolves an {@link OutputHandler} payload (if any) into {@code builder} and reports the
     * built event through {@link #onSuccess}.
     *
     * @param batchJobInstanceAdapter job instance the event belongs to
     * @param coreEvent               successful result event
     * @param builder                 builder seeded from {@code coreEvent}
     * @throws Exception if resolving the payload or {@link #onSuccess} fails
     */
    protected void handleSuccess(BatchJobInstanceAdapter batchJobInstanceAdapter, CoreEvent coreEvent, CoreEvent.Builder builder) throws Exception {
        resolveOutputHandlerPayload(coreEvent, builder);
        onSuccess(batchJobInstanceAdapter, builder.build());
    }

    /**
     * Reports a failure through {@link #onException}, invoking it exactly once.
     *
     * <p>The event's {@link OutputHandler} payload (if any) is resolved first. When that resolution
     * fails, the reported exception becomes a {@link BatchFailureHandlingException} built from the
     * original exception and the resolution failure.
     *
     * @param batchJobInstanceAdapter job instance the event belongs to
     * @param exc                     failure to report
     * @param coreEvent               event associated with the failure
     * @throws MuleException if {@link #onException} fails
     */
    protected void handleException(BatchJobInstanceAdapter batchJobInstanceAdapter, Exception exc, CoreEvent coreEvent) throws MuleException {
        Exception reported = exc;
        CoreEvent.Builder builder = CoreEvent.builder(coreEvent);
        try {
            resolveOutputHandlerPayload(coreEvent, builder);
        } catch (Exception e) {
            reported = new BatchFailureHandlingException(exc, e, batchJobInstanceAdapter);
        } finally {
            // Runs once on every path; a failure of onException itself propagates to the caller
            // instead of being mistaken for a payload-resolution failure and reported again.
            onException(batchJobInstanceAdapter, reported, builder.build());
        }
    }

    /**
     * When the event's payload is an {@link OutputHandler}, writes it into an in-memory buffer and
     * sets the resulting bytes as the message value on {@code builder}. Other payloads leave
     * {@code builder} untouched.
     *
     * @param coreEvent event whose payload is inspected
     * @param builder   builder that receives the resolved message
     * @throws Exception if the handler fails to write; {@code builder} then carries a message with
     *                   a null value
     */
    protected void resolveOutputHandlerPayload(CoreEvent coreEvent, CoreEvent.Builder builder) throws Exception {
        Message message = coreEvent.getMessage();
        Object payload = message.getPayload().getValue();
        if (!(payload instanceof OutputHandler)) {
            return;
        }
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            ((OutputHandler) payload).write(coreEvent, buffer);
            builder.message(Message.builder(message).value(buffer.toByteArray()).build());
        } catch (Exception e) {
            builder.message(Message.builder(message).nullValue().build());
            throw e;
        }
    }

    /**
     * Hook executed before the processor runs; notifies the flow trace manager of a flow start
     * labelled with the owner job name and the job instance id.
     *
     * @param batchJobInstanceAdapter job instance the event belongs to
     * @param coreEvent               event about to be processed
     * @throws MuleException declared for overriding implementations
     */
    public void before(BatchJobInstanceAdapter batchJobInstanceAdapter, CoreEvent coreEvent) throws MuleException {
        String flowLabel = String.format("%s [jobId: %s]", batchJobInstanceAdapter.getOwnerJobName(), batchJobInstanceAdapter.getId());
        this.flowTraceManager.onFlowStart(EnrichedNotificationInfo.createInfo(coreEvent, (Exception) null, (Component) null), flowLabel);
    }

    /**
     * Hook executed after processing; closes the result payload when a result exists and notifies
     * the flow trace manager of flow completion.
     *
     * @param batchJobInstanceAdapter job instance the event belongs to
     * @param coreEvent               event that was submitted for processing
     * @param coreEvent2              processing result, or {@code null} when none was produced
     * @throws MuleException declared for overriding implementations
     */
    public void onFinally(BatchJobInstanceAdapter batchJobInstanceAdapter, CoreEvent coreEvent, CoreEvent coreEvent2) throws MuleException {
        if (coreEvent2 != null) {
            this.closer.closeStream(coreEvent2.getMessage().getPayload().getValue());
        }
        // The completion notification is built from the result when available, else the input.
        CoreEvent completedEvent = coreEvent2 != null ? coreEvent2 : coreEvent;
        this.flowTraceManager.onFlowComplete(EnrichedNotificationInfo.createInfo(completedEvent, (Exception) null, (Component) null));
    }

    /**
     * Callback invoked with the failure to report and its associated event.
     *
     * @param batchJobInstanceAdapter job instance the event belongs to
     * @param exc                     failure being reported
     * @param coreEvent               event associated with the failure
     * @throws MuleException if the failure cannot be handled
     */
    protected abstract void onException(BatchJobInstanceAdapter batchJobInstanceAdapter, Exception exc, CoreEvent coreEvent) throws MuleException;

    /**
     * Callback invoked with the successfully processed event.
     *
     * @param batchJobInstanceAdapter job instance the event belongs to
     * @param coreEvent               resulting event, with any {@link OutputHandler} payload resolved
     * @throws MuleException if the success cannot be handled
     */
    protected abstract void onSuccess(BatchJobInstanceAdapter batchJobInstanceAdapter, CoreEvent coreEvent) throws MuleException;
}
