package org.mule.runtime.core.processor;

import java.beans.ExceptionListener;
import java.util.concurrent.TimeUnit;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.core.DefaultEventContext;
import org.mule.runtime.core.api.DefaultMuleException;
import org.mule.runtime.core.api.Event;
import org.mule.runtime.core.api.construct.Flow;
import org.mule.runtime.core.api.construct.FlowConstruct;
import org.mule.runtime.core.api.message.InternalMessage;
import org.mule.runtime.core.api.processor.Processor;
import org.mule.runtime.core.util.concurrent.Latch;
import org.mule.tck.MuleTestUtils;
import org.mule.tck.junit4.AbstractReactiveProcessorTestCase;

/**
 * Tests for {@link AsyncInterceptingMessageProcessor}.
 *
 * <p>The happy-path tests send an event through the processor and verify that the configured
 * listener receives that same event on a thread other than the test thread. The failure test
 * replaces the listener with one that always throws and expects the exception to surface from
 * {@code process(...)}.
 *
 * <p>The class also implements {@link ExceptionListener} so that an exception handed to
 * {@link #exceptionThrown(Exception)} is recorded in {@link #exceptionThrown}.
 * NOTE(review): nothing visible in this class registers the test as a listener, so that field may
 * never be populated - confirm against the base class / processor wiring.
 */
public class AsyncInterceptingMessageProcessorTestCase extends AbstractReactiveProcessorTestCase implements ExceptionListener {
    /** Payload used for every event built by these tests. */
    private static final String TEST_MESSAGE = "Test Message";

    /** Maximum time, in milliseconds, to wait for the listener to be invoked. */
    private static final long LATCH_TIMEOUT_MS = 10000L;

    /** Processor under test; created in {@link #doSetUp()}. */
    protected AsyncInterceptingMessageProcessor messageProcessor;

    /** Listener that records the event and thread it was invoked with. */
    protected TestListener target = new TestListener();

    /** Released by {@link TestListener#process(Event)} once the listener has been invoked. */
    protected Latch latch = new Latch();

    /** Last exception reported through {@link #exceptionThrown(Exception)}, if any. */
    protected Exception exceptionThrown;

    /** Listener that fails unconditionally; used by {@link #testException()}. */
    private final Processor failingProcessor = event -> {
        throw new DefaultMuleException("failure");
    };

    @Rule
    public ExpectedException expectedException = ExpectedException.none();

    /**
     * Listener that captures the event it receives and the thread it was called on, then counts
     * down the enclosing test's latch so the test thread can stop waiting.
     */
    public class TestListener implements Processor {
        /** Event passed to the most recent {@link #process(Event)} call. */
        Event sensedEvent;

        /** Thread on which the most recent {@link #process(Event)} call ran. */
        Thread thread;

        TestListener() {
        }

        /**
         * Records the calling thread and the event, signals the latch and returns the event
         * unchanged.
         *
         * @param event the event delivered to this listener
         * @return the same {@code event} instance
         * @throws MuleException declared by the signature; never thrown by this implementation
         */
        public Event process(Event event) throws MuleException {
            this.thread = Thread.currentThread();
            this.sensedEvent = event;
            // Count down last so both fields are written before the waiting test thread resumes.
            AsyncInterceptingMessageProcessorTestCase.this.latch.countDown();
            return event;
        }
    }

    /**
     * @param mode the processing mode forwarded to the parameterized base class
     */
    public AsyncInterceptingMessageProcessorTestCase(AbstractReactiveProcessorTestCase.Mode mode) {
        super(mode);
        setStartContext(true);
    }

    /** Runs the base set-up and then builds the processor under test around {@link #target}. */
    protected void doSetUp() throws Exception {
        super.doSetUp();
        this.messageProcessor = createAsyncInterceptingMessageProcessor(this.target);
    }

    // NOTE(review): testProcessOneWay and testProcessRequestResponse have identical bodies; confirm
    // whether they were meant to exercise different exchange patterns.
    @Test
    public void testProcessOneWay() throws Exception {
        assertAsync(this.messageProcessor, newTestEvent());
    }

    @Test
    public void testProcessRequestResponse() throws Exception {
        assertAsync(this.messageProcessor, newTestEvent());
    }

    /**
     * Swaps in a listener that always throws and expects a {@link DefaultMuleException} to be
     * raised when the event is processed.
     */
    @Test
    public void testException() throws Exception {
        FlowConstruct flow = Flow.builder("flow", muleContext).build();
        initialiseObject(flow);
        Event event = Event.builder(DefaultEventContext.create(flow, "test"))
            .message(InternalMessage.of(TEST_MESSAGE))
            .build();

        this.messageProcessor.setListener(this.failingProcessor);
        this.messageProcessor.setMuleContext(muleContext);
        this.messageProcessor.setFlowConstruct(flow);

        this.expectedException.expect(DefaultMuleException.class);
        process(this.messageProcessor, event);
    }

    /**
     * Processes {@code event} with {@code processor} and asserts that {@link #target} was invoked
     * with that same event on a different thread, that the returned event is the input event and
     * that no exception was recorded.
     *
     * @param processor the processor to invoke
     * @param event the event to send through it
     * @throws Exception if processing fails or the wait is interrupted
     */
    protected void assertAsync(Processor processor, Event event) throws Exception {
        Event result = process(processor, event);

        // Fail with an explicit message on timeout instead of falling through to a bare
        // assertNotNull failure that hides the real cause.
        Assert.assertTrue("Listener was not invoked within " + LATCH_TIMEOUT_MS + " ms",
            this.latch.await(LATCH_TIMEOUT_MS, TimeUnit.MILLISECONDS));

        Assert.assertNotNull(this.target.sensedEvent);
        Assert.assertSame(event, this.target.sensedEvent);
        Assert.assertEquals(event.getMessageAsString(muleContext), this.target.sensedEvent.getMessageAsString(muleContext));
        Assert.assertNotSame(Thread.currentThread(), this.target.thread);
        Assert.assertSame(event, result);
        Assert.assertNull(this.exceptionThrown);
    }

    /**
     * Builds an {@link AsyncInterceptingMessageProcessor} that obtains its scheduler from this
     * test, is bound to the test Mule context and test flow, and forwards to {@code processor}.
     *
     * @param processor the listener to attach
     * @return the configured processor
     * @throws Exception if the test flow cannot be obtained
     */
    protected AsyncInterceptingMessageProcessor createAsyncInterceptingMessageProcessor(Processor processor) throws Exception {
        AsyncInterceptingMessageProcessor asyncProcessor = new AsyncInterceptingMessageProcessor(() -> this.scheduler);
        asyncProcessor.setMuleContext(muleContext);
        asyncProcessor.setFlowConstruct(MuleTestUtils.getTestFlow(muleContext));
        asyncProcessor.setListener(processor);
        return asyncProcessor;
    }

    /**
     * Records the reported exception so that {@link #assertAsync(Processor, Event)} can verify
     * none occurred.
     *
     * @param exc the exception being reported
     */
    public void exceptionThrown(Exception exc) {
        this.exceptionThrown = exc;
    }

    /** Builds a fresh event carrying {@link #TEST_MESSAGE} via the inherited event builder. */
    private Event newTestEvent() throws Exception {
        return eventBuilder().message(InternalMessage.of(TEST_MESSAGE)).build();
    }
}
