package org.mule.test.http.functional;

import io.qameta.allure.Story;
import java.net.SocketException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.http.HttpResponse;
import org.apache.http.client.fluent.Executor;
import org.apache.http.client.fluent.Request;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.mule.functional.api.component.FunctionalTestProcessor;
import org.mule.runtime.api.util.concurrent.Latch;
import org.mule.runtime.core.api.util.IOUtils;
import org.mule.runtime.http.api.HttpConstants;
import org.mule.tck.junit4.rule.DynamicPort;

/**
 * Functional test that keeps sending GET requests to an HTTP listener while the
 * processors of the flow under test are held on a latch.
 *
 * <p>Every request is sent from its own thread. The test keeps adding requesters
 * until at least {@link #FINISHED_REQUESTERS_THRESHOLD} of them have finished, then
 * releases the held processors, waits for all requesters and verifies that
 * <ul>
 *   <li>no unexpected error was collected, and</li>
 *   <li>more than half of the requests reached the flow's test processor.</li>
 * </ul>
 *
 * <p>NOTE(review): the listener and flow definition live in
 * {@code http-listener-overload-config.xml}, which is not visible from this class;
 * presumably the held processors exhaust the available scheduler so further
 * requests are rejected with 503 - confirm against that file.
 */
@Story("Source overload handling")
public class HttpListenerSourceOverloadTestCase extends AbstractHttpTestCase {
    /** Limit for pooled client connections, applied both in total and per route. */
    private static final int MAX_CONNECTIONS = 512;

    /** Number of finished requesters required before the held processors are released. */
    private static final int FINISHED_REQUESTERS_THRESHOLD = 5;

    /** Connect timeout and socket timeout, in milliseconds, used for every client request. */
    private static final int REQUEST_TIMEOUT_MILLIS = 5000;

    /** Maximum number of collected errors that are listed in the failure message. */
    private static final int MAX_REPORTED_ERRORS = 10;

    /** Port exposed under the {@code port} name; must stay public for JUnit's {@code @Rule}. */
    @Rule
    public DynamicPort listenPort = new DynamicPort("port");

    /** Connection pool behind {@link #httpClientExecutor}; kept so the test can shut it down. */
    private PoolingHttpClientConnectionManager connectionManager;
    private Executor httpClientExecutor;

    /** Holds every invocation of the flow's test processor until the test releases it. */
    private Latch keepProcessorsActive;

    /** Number of times the flow's test processor was invoked. */
    private AtomicInteger numProcessedRequests;

    /**
     * Number of requester threads that have finished. Despite the name, it is
     * incremented for every outcome (503, 200, other status, exception), not only
     * for "service unavailable" responses.
     */
    private AtomicInteger numServiceBusy;

    /** Failures raised on requester threads or inside the processor callback. */
    private ConcurrentLinkedQueue<Throwable> accumulatedErrors;

    /**
     * Gets one permit each time a request enters the test processor and each time a
     * requester thread finishes; the test takes one permit before starting the next
     * requester.
     */
    private Semaphore waitForNextRequester;

    /**
     * Names the configuration resource used by this test.
     * Presumably a hook of the test base class (not visible here) - confirm before
     * annotating it with {@code @Override}.
     *
     * @return the configuration file name
     */
    protected String getConfigFile() {
        return "http-listener-overload-config.xml";
    }

    /** Builds the pooled HTTP client and resets all per-test synchronisation state. */
    @Before
    public void setup() {
        connectionManager = new PoolingHttpClientConnectionManager();
        connectionManager.setDefaultMaxPerRoute(MAX_CONNECTIONS);
        connectionManager.setMaxTotal(MAX_CONNECTIONS);
        httpClientExecutor = Executor.newInstance(
                HttpClientBuilder.create().setConnectionManager(connectionManager).build());

        keepProcessorsActive = new Latch();
        numProcessedRequests = new AtomicInteger(0);
        numServiceBusy = new AtomicInteger(0);
        accumulatedErrors = new ConcurrentLinkedQueue<>();
        waitForNextRequester = new Semaphore(0);
    }

    /**
     * Starts requesters one at a time until enough of them have finished, releases the
     * held processors, waits for every requester and then checks the collected results.
     *
     * @throws Exception if configuring the flow fails or the test thread is interrupted
     */
    @Test
    public void overloadScenario() throws Exception {
        String url = String.format("http://localhost:%s/", listenPort.getNumber());
        List<Thread> requesters = new ArrayList<>();
        configureTestComponent("testFlow");

        try {
            while (numServiceBusy.get() < FINISHED_REQUESTERS_THRESHOLD) {
                requesters.add(executeRequestInAnotherThread(url));
                // Proceed only once the request reached the processor or its requester finished.
                waitForNextRequester.acquire();
            }

            keepProcessorsActive.release();
            for (Thread requester : requesters) {
                requester.join();
            }
        } finally {
            // The pool is created per test in setup(); shut it down so its connections
            // do not outlive the test, even when the test is aborted early.
            connectionManager.shutdown();
        }

        if (!accumulatedErrors.isEmpty()) {
            Assert.fail("Errors encountered in test: \n" + accumulatedErrors.stream()
                    .limit(MAX_REPORTED_ERRORS)
                    .map(Throwable::toString)
                    .collect(Collectors.joining("\n")));
        }
        Assert.assertThat(numProcessedRequests.get(), Matchers.greaterThan(requesters.size() / 2));
    }

    /**
     * Sends one GET request on a new thread and validates the response there.
     *
     * <p>Accepted outcomes are a 503 with the expected reason phrase and the body
     * {@code "Scheduler unavailable"}, or a 200 with the body {@code "the result"}.
     * Any other status code, failed assertion or exception is recorded in
     * {@link #accumulatedErrors}, except {@link SocketException}, which is ignored.
     *
     * @param str the URL to request
     * @return the already started requester thread
     */
    private Thread executeRequestInAnotherThread(String str) {
        Thread requester = new Thread(() -> {
            try {
                HttpResponse response = httpClientExecutor
                        .execute(Request.Get(str)
                                .connectTimeout(REQUEST_TIMEOUT_MILLIS)
                                .socketTimeout(REQUEST_TIMEOUT_MILLIS))
                        .returnResponse();
                int statusCode = response.getStatusLine().getStatusCode();

                if (statusCode == HttpConstants.HttpStatus.SERVICE_UNAVAILABLE.getStatusCode()) {
                    Assert.assertThat(response.getStatusLine().getReasonPhrase(),
                            CoreMatchers.is(HttpConstants.HttpStatus.SERVICE_UNAVAILABLE.getReasonPhrase()));
                    Assert.assertThat(IOUtils.toString(response.getEntity().getContent()),
                            CoreMatchers.is("Scheduler unavailable"));
                } else if (statusCode == HttpConstants.HttpStatus.OK.getStatusCode()) {
                    Assert.assertThat(IOUtils.toString(response.getEntity().getContent()),
                            CoreMatchers.is("the result"));
                } else {
                    accumulatedErrors.add(new AssertionError("request returned invalid status code: " + statusCode));
                }
            } catch (SocketException ignored) {
                // Deliberately not reported as an error.
                // NOTE(review): presumably an overloaded server may drop connections - confirm.
            } catch (Throwable failure) {
                // Includes failed assertions: they must reach the test thread through the queue.
                accumulatedErrors.add(failure);
            } finally {
                // Runs for every outcome so the test thread is never left waiting for a permit.
                numServiceBusy.incrementAndGet();
                waitForNextRequester.release();
            }
        });
        requester.start();
        return requester;
    }

    /**
     * Installs a callback on the test processor of the given flow. The callback counts
     * the invocation, hands a permit to the test thread and then waits on
     * {@link #keepProcessorsActive}. Anything thrown while doing so is recorded in
     * {@link #accumulatedErrors} instead of being propagated.
     *
     * @param str the name of the flow that contains the test processor
     * @throws Exception if the processor cannot be obtained from the flow
     */
    private void configureTestComponent(String str) throws Exception {
        FunctionalTestProcessor.getFromFlow(locator, str).setEventCallback((coreEvent, obj, muleContext) -> {
            try {
                numProcessedRequests.incrementAndGet();
                waitForNextRequester.release();
                keepProcessorsActive.await();
            } catch (Throwable failure) {
                accumulatedErrors.add(failure);
            }
        });
    }
}
