package org.mule.runtime.core.internal.policy;

import com.google.common.collect.ImmutableMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import org.mule.runtime.api.i18n.I18nMessageFactory;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.functional.Either;
import org.mule.runtime.core.api.policy.SourcePolicyParametersTransformer;
import org.mule.runtime.core.api.util.concurrent.FunctionalReadWriteLock;
import org.mule.runtime.core.internal.event.EventQuickCopy;
import org.mule.runtime.core.internal.exception.DefaultErrorTypeRepository;
import org.mule.runtime.core.internal.exception.MessagingException;
import org.mule.runtime.core.internal.message.ErrorBuilder;
import org.mule.runtime.core.internal.message.InternalEvent;
import org.mule.runtime.core.internal.util.rx.FluxSinkSupplier;
import org.mule.runtime.core.internal.util.rx.RoundRobinFluxSinkSupplier;
import org.mule.runtime.core.internal.util.rx.TransactionAwareFluxSinkSupplier;
import org.mule.runtime.core.privileged.event.BaseEventContext;
import org.reactivestreams.Publisher;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/mule/runtime/core/internal/policy/CommonSourcePolicy.class */
/**
 * Plumbing shared by source policy implementations: it pushes incoming events into a policy sink and later
 * completes the caller's {@link MonoSink} with the policy result.
 * <p>
 * Each dispatched event is a quick copy of the source event carrying two internal parameters, keyed by
 * {@link #POLICY_SOURCE_PARAMETERS_PROCESSOR} and {@link #POLICY_SOURCE_CALLER_SINK}. Those same keys are read
 * back by {@link #getResponseParamsProcessor(CoreEvent)} and the {@code finishFlowProcessing} overloads.
 * <p>
 * {@link #process} checks the disposed flag while holding the read lock and {@link #dispose()} flips it while
 * holding the write lock.
 */
public class CommonSourcePolicy {

    /** Internal-parameter key under which the {@link MessageSourceResponseParametersProcessor} is stored. */
    public static final String POLICY_SOURCE_PARAMETERS_PROCESSOR = "policy.source.parametersProcessor";

    /** Internal-parameter key under which the caller's {@link MonoSink} is stored. */
    public static final String POLICY_SOURCE_CALLER_SINK = "policy.source.callerSink";

    private final FluxSinkSupplier<CoreEvent> policySink;
    private final AtomicBoolean disposed;
    private final FunctionalReadWriteLock readWriteLock;
    private final Optional<SourcePolicyParametersTransformer> sourcePolicyParametersTransformer;
    private final DefaultErrorTypeRepository errorTypeRepository;

    /**
     * Creates a policy without a parameters transformer.
     *
     * @param supplier factory of the sinks that events are pushed into
     */
    public CommonSourcePolicy(Supplier<FluxSink<CoreEvent>> supplier) {
        this(supplier, Optional.empty());
    }

    /**
     * Creates a policy whose sinks are obtained through a transaction-aware supplier that delegates to a
     * round-robin supplier sized to the number of available processors.
     *
     * @param supplier factory of the sinks that events are pushed into
     * @param optional transformer used to build error response parameters when present; when empty, the
     *                 response parameters processor passed to {@link #process} is used instead
     */
    public CommonSourcePolicy(Supplier<FluxSink<CoreEvent>> supplier, Optional<SourcePolicyParametersTransformer> optional) {
        this.policySink = new TransactionAwareFluxSinkSupplier<>(supplier,
                new RoundRobinFluxSinkSupplier<>(Runtime.getRuntime().availableProcessors(), supplier));
        this.sourcePolicyParametersTransformer = optional;
        this.readWriteLock = FunctionalReadWriteLock.readWriteLock();
        this.disposed = new AtomicBoolean(false);
        this.errorTypeRepository = new DefaultErrorTypeRepository();
    }

    /**
     * Hands the event to the policy sink, or produces a failure result if this policy was already disposed.
     * <p>
     * The disposed flag is read under the read lock when this method is called; the actual work of either
     * branch runs when the returned publisher is subscribed.
     *
     * @param coreEvent the source event to process
     * @param messageSourceResponseParametersProcessor processor stored alongside the event and used as the
     *        fallback source of error response parameters
     * @return a publisher that emits the result passed to {@code finishFlowProcessing}, or a left
     *         {@link SourcePolicyFailureResult} when the policy is disposed
     */
    public Publisher<Either<SourcePolicyFailureResult, SourcePolicySuccessResult>> process(CoreEvent coreEvent, MessageSourceResponseParametersProcessor messageSourceResponseParametersProcessor) {
        return this.readWriteLock.withReadLock(lockReleaser -> {
            if (this.disposed.get()) {
                return rejectAsDisposed(coreEvent, messageSourceResponseParametersProcessor);
            }
            return dispatchToPolicy(coreEvent, messageSourceResponseParametersProcessor);
        });
    }

    /** Pushes a copy of the event, tagged with the processor and the caller sink, into the policy sink. */
    private Mono<Either<SourcePolicyFailureResult, SourcePolicySuccessResult>> dispatchToPolicy(CoreEvent coreEvent, MessageSourceResponseParametersProcessor processor) {
        // The processor is stored as-is. Casting it to MonoSink (as the decompiled code did) would throw
        // ClassCastException for any processor that is not also a MonoSink.
        return Mono.create(callerSink -> this.policySink.get().next(EventQuickCopy.quickCopy(coreEvent,
                ImmutableMap.<String, Object>of(
                        POLICY_SOURCE_PARAMETERS_PROCESSOR, processor,
                        POLICY_SOURCE_CALLER_SINK, callerSink))));
    }

    /** Builds, lazily on subscription, the publisher emitted when the policy has already been disposed. */
    private Mono<Either<SourcePolicyFailureResult, SourcePolicySuccessResult>> rejectAsDisposed(CoreEvent coreEvent, MessageSourceResponseParametersProcessor processor) {
        return Mono.just(coreEvent).map(event -> buildDisposedFailure(event, processor));
    }

    /** Creates the "already disposed" failure, fails the event context with it and wraps it as a left result. */
    private Either<SourcePolicyFailureResult, SourcePolicySuccessResult> buildDisposedFailure(CoreEvent coreEvent, MessageSourceResponseParametersProcessor processor) {
        MessagingException messagingException =
                new MessagingException(I18nMessageFactory.createStaticMessage("Source policy already disposed"), coreEvent);
        messagingException.setProcessedEvent(CoreEvent.builder(coreEvent)
                .error(ErrorBuilder.builder(messagingException)
                        .errorType(this.errorTypeRepository.getAnyErrorType())
                        .build())
                .build());

        // Both suppliers are lazy: the parameters are only computed if the failure result asks for them.
        Supplier<Map<String, Object>> errorParameters = this.sourcePolicyParametersTransformer.isPresent()
                ? () -> this.sourcePolicyParametersTransformer.get().fromMessageToErrorResponseParameters(coreEvent.getMessage())
                : () -> processor.getFailedExecutionResponseParametersFunction().apply(coreEvent);

        ((BaseEventContext) coreEvent.getContext()).error(messagingException);
        return Either.left(new SourcePolicyFailureResult(messagingException, errorParameters));
    }

    /**
     * Reads back the response parameters processor stored on the event under
     * {@link #POLICY_SOURCE_PARAMETERS_PROCESSOR}.
     *
     * @param coreEvent an event that must be an {@link InternalEvent}
     * @return the stored processor, as returned by the event's internal parameter lookup
     */
    public MessageSourceResponseParametersProcessor getResponseParamsProcessor(CoreEvent coreEvent) {
        return (MessageSourceResponseParametersProcessor) ((InternalEvent) coreEvent).getInternalParameter(POLICY_SOURCE_PARAMETERS_PROCESSOR);
    }

    /**
     * Marks the event context as successful if it is not yet complete, then completes the caller sink with
     * the given result.
     *
     * @param coreEvent the event carrying the caller sink as an internal parameter
     * @param either the result to deliver to the caller
     */
    public void finishFlowProcessing(CoreEvent coreEvent, Either<SourcePolicyFailureResult, SourcePolicySuccessResult> either) {
        BaseEventContext context = (BaseEventContext) coreEvent.getContext();
        if (!context.isComplete()) {
            context.success(coreEvent);
        }
        callerSinkOf(coreEvent).success(either);
    }

    /**
     * Marks the event context as failed with {@code th} if it is not yet complete, then completes the caller
     * sink with the given result. The sink is completed through {@code success}, not {@code error}: the
     * failure travels inside {@code either}.
     *
     * @param coreEvent the event carrying the caller sink as an internal parameter
     * @param either the result to deliver to the caller
     * @param th the error used to fail the event context
     */
    public void finishFlowProcessing(CoreEvent coreEvent, Either<SourcePolicyFailureResult, SourcePolicySuccessResult> either, Throwable th) {
        BaseEventContext context = (BaseEventContext) coreEvent.getContext();
        if (!context.isComplete()) {
            context.error(th);
        }
        callerSinkOf(coreEvent).success(either);
    }

    /** Looks up the caller sink stored on the event under {@link #POLICY_SOURCE_CALLER_SINK}. */
    @SuppressWarnings("unchecked") // the only value stored under this key is the sink created in dispatchToPolicy
    private static MonoSink<Either<SourcePolicyFailureResult, SourcePolicySuccessResult>> callerSinkOf(CoreEvent coreEvent) {
        return (MonoSink<Either<SourcePolicyFailureResult, SourcePolicySuccessResult>>) ((InternalEvent) coreEvent).getInternalParameter(POLICY_SOURCE_CALLER_SINK);
    }

    /**
     * Disposes the policy sink and flags this policy as disposed, both under the write lock. Later calls to
     * {@link #process} yield a failure result instead of dispatching the event.
     */
    public void dispose() {
        this.readWriteLock.withWriteLock(() -> {
            this.policySink.dispose();
            this.disposed.set(true);
        });
    }
}
