package org.mule.test.http.functional;

import io.qameta.allure.Feature;
import io.qameta.allure.Story;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.http.HttpResponse;
import org.apache.http.concurrent.FutureCallback;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.apache.http.impl.nio.client.HttpAsyncClients;
import org.apache.http.nio.client.methods.HttpAsyncMethods;
import org.apache.http.nio.protocol.BasicAsyncResponseConsumer;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.mule.extension.http.api.HttpResponseAttributes;
import org.mule.functional.junit4.matchers.MessageMatchers;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.message.Message;
import org.mule.runtime.api.streaming.bytes.CursorStreamProvider;
import org.mule.runtime.core.api.Event;
import org.mule.runtime.core.api.processor.Processor;
import org.mule.tck.junit4.rule.DynamicPort;
import org.mule.test.http.AllureConstants;

/**
 * Functional tests for HTTP streaming, run against the flows declared in
 * {@code http-streaming-config.xml}.
 *
 * <p>The tests rely on a payload that never ends on its own: {@link StoppableInputStreamProcessor}
 * produces an {@link InputStream} that keeps returning data until the shared {@link #stop} flag is
 * raised. Each test raises the flag once the interaction under test has been started so the
 * stream can finish.
 */
@Story(AllureConstants.HttpFeature.HttpStory.STREAMING)
@Feature(AllureConstants.HttpFeature.HTTP_EXTENSION)
public class HttpStreamingTestCase extends AbstractHttpTestCase {
    /**
     * Shared switch read by the stream created in {@link StoppableInputStreamProcessor}: while
     * {@code false} the stream keeps producing bytes, once {@code true} it reports end of stream.
     * Re-created before every test by {@link #setUp()}.
     */
    protected static AtomicBoolean stop;

    /** Port exposed under the {@code httpPort} name; used to build the listener URL. */
    @Rule
    public DynamicPort httpPort = new DynamicPort("httpPort");

    /**
     * Processor that replaces the event's message with one whose payload is an unbounded
     * {@link InputStream} controlled by {@link HttpStreamingTestCase#stop}.
     *
     * <p>NOTE(review): presumably referenced by class name from the test configuration file —
     * it is not instantiated anywhere in this class.
     */
    protected static class StoppableInputStreamProcessor implements Processor {
        protected StoppableInputStreamProcessor() {
        }

        /**
         * Builds a copy of {@code event} carrying a message whose payload is the stoppable stream.
         *
         * @param event the incoming event, used as the template for the result
         * @return a new event with the streaming payload
         * @throws MuleException declared by the processor contract; not thrown by this body
         */
        public Event process(Event event) throws MuleException {
            final InputStream endlessUntilStopped = new InputStream() {
                @Override
                public int read() throws IOException {
                    // Byte value 1 while streaming; -1 (end of stream) as soon as the flag is set.
                    if (HttpStreamingTestCase.stop.get()) {
                        return -1;
                    }
                    return 1;
                }
            };
            return Event.builder(event).message(Message.of(endlessUntilStopped)).build();
        }
    }

    /**
     * @return the name of the configuration file holding the flows used by these tests
     */
    protected String getConfigFile() {
        return "http-streaming-config.xml";
    }

    /** Installs a fresh flag in the "keep streaming" state before each test. */
    @Before
    public void setUp() {
        stop = new AtomicBoolean(false);
    }

    /**
     * Runs the {@code client} flow and checks that the result carries HTTP response attributes and
     * a payload exposed as a {@link CursorStreamProvider}.
     *
     * @throws Exception if running the flow fails
     */
    @Test
    public void requesterStreams() throws Exception {
        final Event response;
        try {
            response = flowRunner("client").run();
        } finally {
            // Raise the flag even when the flow run fails, so the endless payload is always told
            // to finish instead of being left producing bytes after the test has ended.
            stop.set(true);
        }
        Assert.assertThat(response.getMessage(),
                MessageMatchers.hasAttributes(Matchers.instanceOf(HttpResponseAttributes.class)));
        Assert.assertThat(response.getMessage().getPayload().getValue(),
                Matchers.instanceOf(CursorStreamProvider.class));
    }

    /**
     * Issues an asynchronous GET against the listener on {@link #httpPort} and checks that the
     * response is sent with chunked transfer encoding.
     *
     * @throws Exception if the request cannot be executed or the client cannot be closed
     */
    @Test
    public void listenerStreams() throws Exception {
        final String url = String.format("http://localhost:%s/", this.httpPort.getNumber());
        // try-with-resources closes the client on every path and keeps the original failure as the
        // primary exception (a failing close() is attached as suppressed rather than masking it).
        try (CloseableHttpAsyncClient httpClient = HttpAsyncClients.createDefault()) {
            httpClient.start();
            final Future<HttpResponse> pendingResponse = httpClient.execute(
                    HttpAsyncMethods.createGet(url),
                    new BasicAsyncResponseConsumer(),
                    (FutureCallback<HttpResponse>) null);
            // The request is in flight; let the payload stream end so the response can complete.
            stop.set(true);
            final HttpResponse response = pendingResponse.get();
            // Fail with a descriptive assertion, not a NullPointerException, if the header is absent.
            Assert.assertThat(response.getFirstHeader("Transfer-Encoding"), Matchers.notNullValue());
            Assert.assertThat(response.getFirstHeader("Transfer-Encoding").getValue(),
                    Matchers.containsString("chunked"));
        }
    }
}
