Package com.networknt.tram.test
Class KafkaTestBase
- java.lang.Object
-
- com.networknt.tram.test.KafkaTestBase
-
public abstract class KafkaTestBase extends Object
The base for the Kafka tests. It brings up:- A ZooKeeper mini cluster
- Three Kafka Brokers (mini clusters)
Code in this test is based on the following GitHub repository: https://github.com/sakserv/hadoop-mini-clusters (ASL licensed), as per commit bc6b2b2d5f6424d5f377aa6c0871e82a956462ef
-
-
Field Summary
Fields Modifier and Type Field Description protected static StringbrokerConnectionStringsprotected static KafkaTestEnvironmentkafkaServerprotected static org.slf4j.LoggerLOGprotected static intNUMBER_OF_KAFKA_SERVERSprotected static PropertiessecurePropsprotected static PropertiesstandardPropsstatic org.junit.rules.TemporaryFoldertempFolderprotected static scala.concurrent.duration.FiniteDurationtimeout
-
Constructor Summary
Constructors Constructor Description KafkaTestBase()
-
Method Summary
All Methods Static Methods Instance Methods Concrete Methods Modifier and Type Method Description protected voidassertAtLeastOnceForTopic(Properties properties, String topic, int partition, Set<Integer> expectedElements, long timeoutMillis)We manually handle the timeout instead of using JUnit's timeout to return failure instead of timeout error.protected voidassertExactlyOnceForTopic(Properties properties, String topic, int partition, List<Integer> expectedElements)protected voidassertExactlyOnceForTopic(Properties properties, String topic, int partition, List<Integer> expectedElements, long timeoutMillis)We manually handle the timeout instead of using JUnit's timeout to return failure instead of timeout error.protected static voidcreateTestTopic(String topic, int numberOfPartitions, short replicationFactor)protected static voiddeleteTestTopic(String topic)static voidprepare()static voidprepare(boolean hideKafkaBehindProxy)protected static voidshutdownClusters()static voidshutDownServices()protected static voidstartClusters(boolean secureMode, boolean hideKafkaBehindProxy)
-
-
-
Field Detail
-
LOG
protected static final org.slf4j.Logger LOG
-
NUMBER_OF_KAFKA_SERVERS
protected static final int NUMBER_OF_KAFKA_SERVERS
- See Also:
- Constant Field Values
-
brokerConnectionStrings
protected static String brokerConnectionStrings
-
standardProps
protected static Properties standardProps
-
timeout
protected static scala.concurrent.duration.FiniteDuration timeout
-
kafkaServer
protected static KafkaTestEnvironment kafkaServer
-
tempFolder
public static org.junit.rules.TemporaryFolder tempFolder
-
secureProps
protected static Properties secureProps
-
-
Method Detail
-
prepare
public static void prepare() throws ClassNotFoundException- Throws:
ClassNotFoundException
-
prepare
public static void prepare(boolean hideKafkaBehindProxy) throws ClassNotFoundException- Throws:
ClassNotFoundException
-
startClusters
protected static void startClusters(boolean secureMode, boolean hideKafkaBehindProxy) throws ClassNotFoundException- Throws:
ClassNotFoundException
-
createTestTopic
protected static void createTestTopic(String topic, int numberOfPartitions, short replicationFactor)
-
deleteTestTopic
protected static void deleteTestTopic(String topic)
-
assertAtLeastOnceForTopic
protected void assertAtLeastOnceForTopic(Properties properties, String topic, int partition, Set<Integer> expectedElements, long timeoutMillis)
We manually handle the timeout instead of using JUnit's timeout to return failure instead of timeout error. After timeout we assume that there are missing records and there is a bug, not that the test has run out of time.- Parameters:
properties- Propertiestopic- String topicpartition- int partitionexpectedElements- SettimeoutMillis- long timeout
-
assertExactlyOnceForTopic
protected void assertExactlyOnceForTopic(Properties properties, String topic, int partition, List<Integer> expectedElements)
-
assertExactlyOnceForTopic
protected void assertExactlyOnceForTopic(Properties properties, String topic, int partition, List<Integer> expectedElements, long timeoutMillis)
We manually handle the timeout instead of using JUnit's timeout to return failure instead of timeout error. After timeout we assume that there are missing records and there is a bug, not that the test has run out of time.- Parameters:
properties- Propertiestopic- String topicpartition- int partitionexpectedElements- ListtimeoutMillis- long timeout
-
-