public class KafkaSample extends java.lang.Object
Connectors are used to create a bridge between topology streams and a Kafka cluster:
KafkaConsumer - subscribe to Kafka topics and create streams of messages.KafkaProducer - publish streams of messages to Kafka topics.The sample publishes some messages to a Kafka topic. It also subscribes to the topic and reports the messages received. The messages received may include messages from prior runs of the sample.
The sample requires a running Kafka cluster with the following characteristics:
${KAFKA_HOME}/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kafkaSampleTopic
localhost:2181localhost:9092Required IBM Streams environment variables:
DISTRIBUTED
DISTRIBUTED
See the Apache Kafka link above for information about setting up a Kafka cluster and creating a topic.
This may be executed from the samples/java/functional directory as:
ant run.kafka.distributed - Using Apache Ant, this will run in distributed mode.ant run.kafka - Using Apache Ant, this will run in standalone mode.java -cp functionalsamples.jar:../../../com.ibm.streamsx.topology/lib/com.ibm.streamsx.topology.jar:$STREAMS_INSTALL/lib/com.ibm.streams.operator.samples.jar
kafka.KafkaSample CONTEXT_TYPE
- Run directly from the command line.
DISTRIBUTED - Run as an IBM Streams distributed application.STANDALONE - Run as an IBM Streams standalone application.BUNDLE - Create an IBM Streams application bundle.TOOLKIT - Create an IBM Streams application toolkit.| Constructor and Description |
|---|
KafkaSample() |
| Modifier and Type | Method and Description |
|---|---|
static void |
main(java.lang.String[] args) |
void |
publishSubscribe(java.lang.String contextType)
Publish some messages to a topic, scribe to the topic and report
received messages.
|
public KafkaSample()
public static void main(java.lang.String[] args) throws java.lang.Exception
java.lang.Exceptionpublic void publishSubscribe(java.lang.String contextType) throws java.lang.Exception
contextType - string value of a StreamsContext.Typejava.lang.Exception