Class KafkaTestBase


  • public abstract class KafkaTestBase
    extends Object
    The base for the Kafka tests. It brings up:
    • A ZooKeeper mini cluster
    • Three Kafka Brokers (mini clusters)

    Code in this test is based on the following GitHub repository: https://github.com/sakserv/hadoop-mini-clusters (ASL licensed), as per commit bc6b2b2d5f6424d5f377aa6c0871e82a956462ef

    • Field Detail

      • LOG

        protected static final org.slf4j.Logger LOG
      • NUMBER_OF_KAFKA_SERVERS

        protected static final int NUMBER_OF_KAFKA_SERVERS
        See Also:
        Constant Field Values
      • brokerConnectionStrings

        protected static String brokerConnectionStrings
      • standardProps

        protected static Properties standardProps
      • timeout

        protected static scala.concurrent.duration.FiniteDuration timeout
      • tempFolder

        public static org.junit.rules.TemporaryFolder tempFolder
      • secureProps

        protected static Properties secureProps
    • Constructor Detail

      • KafkaTestBase

        public KafkaTestBase()
    • Method Detail

      • shutDownServices

        public static void shutDownServices()
                                     throws Exception
        Throws:
        Exception
      • shutdownClusters

        protected static void shutdownClusters()
                                        throws Exception
        Throws:
        Exception
      • createTestTopic

        protected static void createTestTopic​(String topic,
                                              int numberOfPartitions,
                                              short replicationFactor)
      • deleteTestTopic

        protected static void deleteTestTopic​(String topic)
      • assertAtLeastOnceForTopic

        protected void assertAtLeastOnceForTopic​(Properties properties,
                                                 String topic,
                                                 int partition,
                                                 Set<Integer> expectedElements,
                                                 long timeoutMillis)
        We manually handle the timeout instead of using JUnit's timeout to return failure instead of timeout error. After timeout we assume that there are missing records and there is a bug, not that the test has run out of time.
        Parameters:
        properties - Properties
        topic - String topic
        partition - int partition
        expectedElements - Set
        timeoutMillis - long timeout
      • assertExactlyOnceForTopic

        protected void assertExactlyOnceForTopic​(Properties properties,
                                                 String topic,
                                                 int partition,
                                                 List<Integer> expectedElements)
      • assertExactlyOnceForTopic

        protected void assertExactlyOnceForTopic​(Properties properties,
                                                 String topic,
                                                 int partition,
                                                 List<Integer> expectedElements,
                                                 long timeoutMillis)
        We manually handle the timeout instead of using JUnit's timeout to return failure instead of timeout error. After timeout we assume that there are missing records and there is a bug, not that the test has run out of time.
        Parameters:
        properties - Properties
        topic - String topic
        partition - int partition
        expectedElements - List
        timeoutMillis - long timeout