package org.mule.test.module.extension.notification;

import io.qameta.allure.Story;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Stream;
import javax.inject.Inject;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;
import org.mule.runtime.api.notification.ExtensionNotification;
import org.mule.runtime.api.notification.ExtensionNotificationListener;
import org.mule.runtime.api.notification.NotificationListenerRegistry;
import org.mule.runtime.api.util.MultiMap;
import org.mule.runtime.api.util.Reference;
import org.mule.runtime.api.util.concurrent.Latch;
import org.mule.runtime.core.api.construct.Flow;
import org.mule.tck.probe.JUnitLambdaProbe;
import org.mule.tck.probe.PollingProber;
import org.mule.tck.probe.Probe;
import org.mule.test.heisenberg.extension.model.PersonalInfo;
import org.mule.test.heisenberg.extension.model.SimpleKnockeableDoor;
import org.mule.test.module.extension.AbstractExtensionFunctionalTestCase;

/* loaded from: input_file:org/mule/test/module/extension/notification/ExtensionNotificationsTestCase.class */
public class ExtensionNotificationsTestCase extends AbstractExtensionFunctionalTestCase {
    public static final int POLL_DELAY_MILLIS = 300;
    public static final int FLOW_STOP_TIMEOUT = 2000;
    private static final String HEISENBERG = "Heisenberg".toUpperCase();
    private static final String NEW_BATCH = "NEW_BATCH";
    private static final String NEXT_BATCH = "NEXT_BATCH";
    private static final String BATCH_TERMINATED = "BATCH_TERMINATED";
    private static final String BATCH_DELIVERY_FAILED = "BATCH_DELIVERY_FAILED";
    private static final String BATCH_DELIVERED = "BATCH_DELIVERED";
    private static final String BATCH_FAILED = "BATCH_FAILED";
    private static final String KNOCKING_DOOR = "KNOCKING_DOOR";
    private static final String KNOCKED_DOOR = "KNOCKED_DOOR";
    private TestExtensionNotificationListener listener = null;

    @Inject
    private NotificationListenerRegistry notificationListenerRegistry;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/mule/test/module/extension/notification/ExtensionNotificationsTestCase$TestExtensionNotificationListener.class */
    public class TestExtensionNotificationListener implements ExtensionNotificationListener {
        private final Consumer<ExtensionNotification> onNotification;
        private final MultiMap<String, ExtensionNotification> notifications = new MultiMap<>();
        private Map<String, Integer> correlationCount;

        public TestExtensionNotificationListener(Consumer<ExtensionNotification> consumer, boolean z) {
            this.onNotification = consumer;
            if (z) {
                this.correlationCount = new HashMap();
            }
        }

        public void onNotification(ExtensionNotification extensionNotification) {
            synchronized (this.notifications) {
                this.notifications.put(extensionNotification.getAction().getIdentifier(), extensionNotification);
            }
            if (this.correlationCount != null) {
                synchronized (this.correlationCount) {
                    String correlationId = extensionNotification.getEvent().getCorrelationId();
                    this.correlationCount.put(correlationId, Integer.valueOf(this.correlationCount.computeIfAbsent(correlationId, str -> {
                        return 0;
                    }).intValue() + 1));
                }
            }
            this.onNotification.accept(extensionNotification);
        }

        public MultiMap<String, ExtensionNotification> getNotifications() {
            MultiMap<String, ExtensionNotification> immutableMultiMap;
            synchronized (this.notifications) {
                immutableMultiMap = this.notifications.toImmutableMultiMap();
            }
            return immutableMultiMap;
        }

        public synchronized Integer getCorrelationCount(String str) {
            Integer num;
            synchronized (this.correlationCount) {
                num = this.correlationCount.get(str);
            }
            return num;
        }
    }

    protected String getConfigFile() {
        return "notifications-config.xml";
    }

    @Test
    public void operationFiresNotificationsWithCustomData() throws Exception {
        Latch latch = new Latch();
        setUpListener(extensionNotification -> {
            checkIfDone(latch, 2);
        }, false);
        String correlationId = flowRunner("operationNotification").run().getCorrelationId();
        Assert.assertThat("Expected notifications not received.", Boolean.valueOf(latch.await(6000L, TimeUnit.MILLISECONDS)), Matchers.is(true));
        MultiMap<String, ExtensionNotification> notifications = this.listener.getNotifications();
        Set keySet = notifications.keySet();
        Assert.assertThat(keySet, Matchers.hasItem(KNOCKING_DOOR));
        Assert.assertThat(keySet, Matchers.hasItem(KNOCKED_DOOR));
        ExtensionNotification extensionNotification2 = (ExtensionNotification) notifications.get(KNOCKING_DOOR);
        Assert.assertThat(extensionNotification2, Matchers.is(Matchers.notNullValue()));
        Assert.assertThat(extensionNotification2.getAction().getNamespace(), Matchers.is(HEISENBERG));
        Assert.assertThat(extensionNotification2.getData().getValue(), Matchers.instanceOf(SimpleKnockeableDoor.class));
        Assert.assertThat(((SimpleKnockeableDoor) extensionNotification2.getData().getValue()).getSimpleName(), Matchers.is("Top Level Skyler @ 308 Negra Arroyo Lane"));
        Assert.assertThat(extensionNotification2.getEvent().getCorrelationId(), Matchers.is(correlationId));
        ExtensionNotification extensionNotification3 = (ExtensionNotification) notifications.get(KNOCKED_DOOR);
        Assert.assertThat(extensionNotification3, Matchers.is(Matchers.notNullValue()));
        Assert.assertThat(extensionNotification3.getAction().getNamespace(), Matchers.is(HEISENBERG));
        Assert.assertThat(extensionNotification3.getData().getValue(), Matchers.instanceOf(SimpleKnockeableDoor.class));
        Assert.assertThat(((SimpleKnockeableDoor) extensionNotification3.getData().getValue()).getSimpleName(), Matchers.is("Top Level Skyler @ 308 Negra Arroyo Lane"));
        Assert.assertThat(extensionNotification3.getEvent().getCorrelationId(), Matchers.is(correlationId));
    }

    @Test
    public void sourceFiresNotificationsOnSuccess() throws Exception {
        Latch latch = new Latch();
        setUpListener(extensionNotification -> {
            checkIfDone(latch, 4);
        }, false);
        getFlowConstruct("sourceNotifications").start();
        Assert.assertThat("Expected notifications not received.", Boolean.valueOf(latch.await(6000L, TimeUnit.MILLISECONDS)), Matchers.is(true));
        MultiMap<String, ExtensionNotification> notifications = this.listener.getNotifications();
        Set keySet = notifications.keySet();
        Assert.assertThat(keySet, Matchers.hasItem(NEW_BATCH));
        Assert.assertThat(keySet, Matchers.hasItem(NEXT_BATCH));
        Assert.assertThat(keySet, Matchers.hasItem(BATCH_DELIVERED));
        Assert.assertThat(keySet, Matchers.hasItem(BATCH_TERMINATED));
        String correlationId = verifyNotificationAndValue((ExtensionNotification) notifications.get(NEW_BATCH), 1).getEvent().getCorrelationId();
        Assert.assertThat(verifyNotificationAndValue((ExtensionNotification) notifications.get(NEXT_BATCH), 1000000L).getEvent().getCorrelationId(), Matchers.is(correlationId));
        Assert.assertThat(verifyNotificationAndValue((ExtensionNotification) notifications.get(BATCH_DELIVERED), 100L).getEvent().getCorrelationId(), Matchers.is(correlationId));
        Assert.assertThat(verifyNotificationAndValue((ExtensionNotification) notifications.get(BATCH_TERMINATED), 1).getEvent().getCorrelationId(), Matchers.is(correlationId));
        requestFlowToStopAndWait("sourceNotifications");
    }

    @Test
    public void sourceFiresNotificationsOnError() throws Exception {
        Latch latch = new Latch();
        setUpListener(extensionNotification -> {
            checkIfDone(latch, 4);
        }, false);
        getFlowConstruct("sourceNotificationsError").start();
        Assert.assertThat("Expected notifications not received", Boolean.valueOf(latch.await(6000L, TimeUnit.MILLISECONDS)), Matchers.is(true));
        MultiMap<String, ExtensionNotification> notifications = this.listener.getNotifications();
        Set keySet = notifications.keySet();
        Assert.assertThat(keySet, Matchers.hasItem(NEW_BATCH));
        Assert.assertThat(keySet, Matchers.hasItem(NEXT_BATCH));
        Assert.assertThat(keySet, Matchers.hasItem(BATCH_DELIVERY_FAILED));
        Assert.assertThat(keySet, Matchers.hasItem(BATCH_TERMINATED));
        String correlationId = verifyNotificationAndValue((ExtensionNotification) notifications.get(NEW_BATCH), 1).getEvent().getCorrelationId();
        Assert.assertThat(verifyNotificationAndValue((ExtensionNotification) notifications.get(NEXT_BATCH), 1000000L).getEvent().getCorrelationId(), Matchers.is(correlationId));
        ExtensionNotification extensionNotification2 = (ExtensionNotification) notifications.get(BATCH_DELIVERY_FAILED);
        Assert.assertThat(extensionNotification2, Matchers.is(Matchers.notNullValue()));
        Assert.assertThat(extensionNotification2.getAction().getNamespace(), Matchers.is(HEISENBERG));
        Assert.assertThat(extensionNotification2.getData().getValue(), Matchers.instanceOf(PersonalInfo.class));
        Assert.assertThat(((PersonalInfo) extensionNotification2.getData().getValue()).getAge(), Matchers.is(27));
        Assert.assertThat(extensionNotification2.getEvent().getCorrelationId(), Matchers.is(correlationId));
        Assert.assertThat(verifyNotificationAndValue((ExtensionNotification) notifications.get(BATCH_TERMINATED), 1).getEvent().getCorrelationId(), Matchers.is(correlationId));
        requestFlowToStopAndWait("sourceNotificationsError");
    }

    @Test
    @Story("Backpressure")
    public void sourceFiresNotificationsOnBackPressure() throws Exception {
        Latch latch = new Latch();
        Reference reference = new Reference();
        setUpListener(extensionNotification -> {
            if (BATCH_FAILED.equals(extensionNotification.getAction().getIdentifier())) {
                reference.set(extensionNotification);
                latch.release();
            }
        }, true);
        requestFlowToStartAndWait("sourceNotificationsBackPressure");
        Assert.assertThat("Batch failure notification not received.", Boolean.valueOf(latch.await(10000L, TimeUnit.MILLISECONDS)), Matchers.is(true));
        ExtensionNotification extensionNotification2 = (ExtensionNotification) reference.get();
        Assert.assertThat(extensionNotification2, Matchers.is(Matchers.notNullValue()));
        final String correlationId = extensionNotification2.getEvent().getCorrelationId();
        new PollingProber(10000L, 200L).check(new Probe() { // from class: org.mule.test.module.extension.notification.ExtensionNotificationsTestCase.1
            public boolean isSatisfied() {
                Stream stream = ExtensionNotificationsTestCase.this.listener.getNotifications().getAll(ExtensionNotificationsTestCase.BATCH_FAILED).stream();
                String str = correlationId;
                return stream.anyMatch(extensionNotification3 -> {
                    return extensionNotification3.getEvent().getCorrelationId().equals(str);
                });
            }

            public String describeFailure() {
                return "Expected notifications not found.";
            }
        });
        requestFlowToStopAndWait("sourceNotificationsBackPressure");
        MultiMap<String, ExtensionNotification> notifications = this.listener.getNotifications();
        Set keySet = notifications.keySet();
        Assert.assertThat(keySet, Matchers.hasItem(NEW_BATCH));
        Assert.assertThat(keySet, Matchers.hasItem(NEXT_BATCH));
        Assert.assertThat(keySet, Matchers.hasItem(BATCH_FAILED));
        Assert.assertThat(keySet, Matchers.hasItem(BATCH_TERMINATED));
        int intValue = ((Integer) extensionNotification2.getData().getValue()).intValue();
        verifyNotificationAndValue(extensionNotification2, Integer.valueOf(intValue));
        verifyNotificationAndValue(getNotificationMatch(notifications, correlationId, BATCH_TERMINATED), Integer.valueOf(intValue));
    }

    private void checkIfDone(Latch latch, int i) {
        if (this.listener.getNotifications().keySet().size() == i) {
            latch.release();
        }
    }

    private ExtensionNotification getNotificationMatch(MultiMap<String, ExtensionNotification> multiMap, String str, String str2) {
        return (ExtensionNotification) multiMap.getAll(str2).stream().filter(extensionNotification -> {
            return str.equals(extensionNotification.getEvent().getCorrelationId());
        }).findAny().orElse(null);
    }

    private <T> ExtensionNotification verifyNotificationAndValue(ExtensionNotification extensionNotification, T t) {
        Assert.assertThat(extensionNotification.getAction().getNamespace(), Matchers.is(HEISENBERG));
        Assert.assertThat(extensionNotification.getData().getValue(), Matchers.instanceOf(t.getClass()));
        Assert.assertThat(extensionNotification.getData().getValue(), Matchers.is(t));
        return extensionNotification;
    }

    private void setUpListener(Consumer<ExtensionNotification> consumer, boolean z) {
        this.listener = new TestExtensionNotificationListener(consumer, z);
        this.notificationListenerRegistry.registerListener(this.listener);
    }

    protected void requestFlowToStartAndWait(String str) throws Exception {
        startFlow(str);
        checkFlowIsStarted(str);
    }

    protected void startFlow(String str) throws Exception {
        getFlowConstruct(str).start();
    }

    protected void stopFlow(String str) throws Exception {
        getFlowConstruct(str).stop();
    }

    protected void requestFlowToStopAndWait(String str) throws Exception {
        stopFlow(str);
        checkFlowIsStopped(str);
    }

    protected void checkFlowIsStopped(String str) throws Exception {
        Flow flowConstruct = getFlowConstruct(str);
        new PollingProber(2000L, 300L).check(new JUnitLambdaProbe(() -> {
            return Boolean.valueOf(flowConstruct.getLifecycleState().isStopped());
        }, "The flow did not stop in a reasonable amount of time"));
    }

    private void checkFlowIsStarted(String str) throws Exception {
        Flow flowConstruct = getFlowConstruct(str);
        new PollingProber(2000L, 300L).check(new JUnitLambdaProbe(() -> {
            return Boolean.valueOf(flowConstruct.getLifecycleState().isStarted());
        }, "The flow did not start in a reasonable amount of time"));
    }
}
