package org.dna.mqtt.bechnmark;

import java.io.BufferedWriter;
import java.io.ByteArrayOutputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.URISyntaxException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.dna.mqtt.bechnmark.mina.MQTTDropServer;
import org.fusesource.mqtt.client.Future;
import org.fusesource.mqtt.client.FutureConnection;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/dna/mqtt/bechnmark/ConsumerFuture.class */
/**
 * Benchmark consumer: connects to the MQTT server on localhost, receives a fixed
 * number of messages through a {@link FutureConnection} and records, for each
 * message, the numeric ID found at the end of its payload together with the
 * {@link System#nanoTime()} at which it was processed.
 *
 * <p>Samples are buffered in memory while receiving and are written to
 * {@code consumer_bechmark.txt} only after every expected message has arrived.
 * If receiving fails part-way, the error is logged, the connection is still
 * disconnected, and no benchmark file is written.
 */
public class ConsumerFuture implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(ConsumerFuture.class);
    private static final String BENCHMARK_FILE = "consumer_bechmark.txt";

    /**
     * Number of messages to receive before the run is complete.
     * NOTE(review): presumably meant to equal Producer.PUB_LOOP, which is what the
     * summary log line reports - confirm against Producer.
     */
    private static final int MESSAGES_TO_RECEIVE = 100000;

    /** Payloads must end with {@code !!<digits>}; the digits are the message ID. */
    private static final Pattern MESSAGE_ID_PATTERN = Pattern.compile(".*!!(\\d+)");

    private final String m_clientID;
    // In-memory sink for the samples, so no file I/O happens while messages are being received.
    private final ByteArrayOutputStream m_baos = new ByteArrayOutputStream(1048576);
    private final PrintWriter m_benchMarkOut = new PrintWriter(new BufferedWriter(new OutputStreamWriter(this.m_baos)));

    /**
     * Creates a consumer and writes the CSV header line to the in-memory benchmark buffer.
     *
     * @param str the MQTT client ID used when connecting
     */
    public ConsumerFuture(String str) {
        this.m_clientID = str;
        this.m_benchMarkOut.println("msg ID, ns");
    }

    /**
     * Connects, receives the expected messages, disconnects and, when the run was
     * complete, writes the collected samples to the benchmark file. All failures
     * are logged; nothing is thrown to the caller for the handled error paths.
     */
    @Override // java.lang.Runnable
    public void run() {
        MQTT mqtt = new MQTT();
        try {
            mqtt.setHost("localhost", MQTTDropServer.PORT);
        } catch (URISyntaxException e) {
            LOG.error("Invalid MQTT server address", e);
            return;
        }
        mqtt.setClientId(this.m_clientID);

        FutureConnection connection = mqtt.futureConnection();
        try {
            connection.connect().await();
        } catch (Exception e) {
            LOG.error("Cant't CONNECT to the server", e);
            return;
        }

        try {
            // NOTE(review): no subscribe call is visible in this class even though the
            // message below says so - confirm the server delivers without a subscription.
            LOG.info("Subscribed to topic");
            long startMillis = System.currentTimeMillis();
            boolean completed = receiveMessages(connection);
            if (completed) {
                LOG.info(String.format("Consumer %s received %d messages in %d ms", Thread.currentThread().getName(), Integer.valueOf(Producer.PUB_LOOP), Long.valueOf(System.currentTimeMillis() - startMillis)));
            }
            // Disconnect whether or not receiving succeeded, so a failed run does not leak the connection.
            disconnect(connection);
            this.m_benchMarkOut.close();
            if (completed) {
                writeBenchmarkFile();
            }
        } catch (Exception e) {
            LOG.error("Cant't PUSBLISH to the server", e);
        }
    }

    /**
     * Receives {@link #MESSAGES_TO_RECEIVE} messages and appends one "id, nanoTime"
     * sample per message to the in-memory benchmark buffer.
     *
     * @return {@code true} when every message was received and recorded;
     *         {@code false} when receiving failed or a payload carried no message ID
     */
    private boolean receiveMessages(FutureConnection connection) {
        for (int i = 0; i < MESSAGES_TO_RECEIVE; i++) {
            try {
                Message message = (Message) connection.receive().await();
                String payload = new String(message.getPayload());
                Matcher matcher = MESSAGE_ID_PATTERN.matcher(payload);
                if (!matcher.matches()) {
                    // Checked explicitly: group(1) would otherwise throw an uninformative IllegalStateException.
                    LOG.error("Payload of message {} does not end with '!!<id>': {}", Integer.valueOf(i), payload);
                    return false;
                }
                this.m_benchMarkOut.println(String.format("%s, %d", matcher.group(1), Long.valueOf(System.nanoTime())));
                // Parameterized form avoids building the string when debug logging is disabled.
                LOG.debug("Topic: {}, payload: {}", message.getTopic(), payload);
            } catch (Exception e) {
                LOG.error("Failed to receive message " + i + " of " + MESSAGES_TO_RECEIVE, e);
                return false;
            }
        }
        return true;
    }

    /** Requests a disconnect and waits for it to complete, logging any failure. */
    private void disconnect(FutureConnection connection) {
        try {
            LOG.info("Disconneting");
            connection.disconnect().await();
            LOG.info("Disconnected");
        } catch (Exception e) {
            LOG.error("Cant't DISCONNECT to the server", e);
        }
    }

    /** Writes the buffered samples to the benchmark file, always closing the file stream. */
    private void writeBenchmarkFile() {
        FileOutputStream out = null;
        try {
            out = new FileOutputStream(BENCHMARK_FILE);
            // writeTo streams the internal buffer directly, avoiding the copy made by toByteArray().
            this.m_baos.writeTo(out);
        } catch (IOException e) {
            LOG.error("Can't write benchmark results to " + BENCHMARK_FILE, e);
        } finally {
            if (out != null) {
                try {
                    out.close();
                } catch (IOException e) {
                    LOG.error("Can't close benchmark file " + BENCHMARK_FILE, e);
                }
            }
        }
    }
}
