This is a producer-consumer application built on Kafka. The producer streams tweets from Twitter into Kafka, and the consumer retrieves the data from Kafka and loads it into an Elasticsearch sink.
#Prerequisites
- Download ZooKeeper and configure the config/zookeeper.properties file. ZooKeeper is required for Kafka to run; it stores the Kafka cluster's configuration and broker metadata.
- Download Kafka and configure config/server.properties.
- Download Elasticsearch and run it on port 9200. Alternatively, you can use a hosted service such as Bonsai or the Elastic Cloud service, which offer a 3-node cluster on the free tier.
#ZooKeeper start
zookeeper-server-start.bat config/zookeeper.properties
ZooKeeper runs on port 2181 by default.
#Kafka start
kafka-server-start.bat config/server.properties
Kafka runs on port 9092 by default.
For a multi-broker setup, create multiple copies of server.properties (each with a unique broker.id, port, and log directory) and start one broker with each copy, as in the sketch below.
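For example, a second broker's copy of server.properties might change only these three properties (the values shown are illustrative):

```properties
broker.id=1
listeners=PLAINTEXT://:9093
log.dirs=/tmp/kafka-logs-1
```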
#Steps
- Create a topic named twitter_tweet (an example command follows this list)
- Configure the producer with bootstrapServerHost = "127.0.0.1:9092";
- Create an application under a Twitter Developer account and obtain the API credentials
- Configure the producer with the Twitter credentials to retrieve tweets from Twitter
- The producer will load the tweets from Twitter into the Kafka topic "twitter_tweet" (a producer sketch follows this list)
- Configure the consumer with bootstrapServerHost = "127.0.0.1:9092";
- Subscribe consumer to the topic "twitter_tweet"
- Create a RestClientBuilder to connect to Elasticsearch running locally on port 9200
- Run the producer application to publish the tweets to kafka
- Run the consumer application to consume the tweets from Kafka and load them into Elasticsearch using IndexRequest objects; batching them into a single BulkRequest is far more efficient (a consumer sketch follows this list)
- View the indexed tweets in Kibana and create dashboards
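A minimal topic-creation command, assuming Kafka 2.2+ (earlier versions take --zookeeper 127.0.0.1:2181 instead of --bootstrap-server); the partition and replication counts are illustrative:

```
kafka-topics.bat --bootstrap-server 127.0.0.1:9092 --create --topic twitter_tweet --partitions 3 --replication-factor 1
```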
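A minimal sketch of the producer, assuming Twitter's Hosebird Client (hbc-core) is used to read the stream; the class name, tracked term, and credential placeholders are illustrative:

```java
import com.twitter.hbc.ClientBuilder;
import com.twitter.hbc.core.Constants;
import com.twitter.hbc.core.endpoint.StatusesFilterEndpoint;
import com.twitter.hbc.core.processor.StringDelimitedProcessor;
import com.twitter.hbc.httpclient.BasicClient;
import com.twitter.hbc.httpclient.auth.OAuth1;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class TwitterProducer {
    public static void main(String[] args) throws InterruptedException {
        String bootstrapServerHost = "127.0.0.1:9092";

        // Kafka producer configuration
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServerHost);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        // Twitter hbc client: credentials come from your Twitter Developer application
        BlockingQueue<String> msgQueue = new LinkedBlockingQueue<>(1000);
        StatusesFilterEndpoint endpoint = new StatusesFilterEndpoint();
        endpoint.trackTerms(Arrays.asList("kafka")); // tracked term is illustrative
        OAuth1 auth = new OAuth1("consumerKey", "consumerSecret", "token", "tokenSecret");

        BasicClient client = new ClientBuilder()
                .hosts(Constants.STREAM_HOST)
                .endpoint(endpoint)
                .authentication(auth)
                .processor(new StringDelimitedProcessor(msgQueue))
                .build();
        client.connect();

        // Publish each tweet read off the stream to the twitter_tweet topic
        while (!client.isDone()) {
            String tweet = msgQueue.take();
            producer.send(new ProducerRecord<>("twitter_tweet", null, tweet));
        }
        client.stop();
        producer.close();
    }
}
```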
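And a minimal sketch of the consumer, assuming the Elasticsearch High Level REST Client (7.x); the group id and the index name "tweets" are illustrative:

```java
import org.apache.http.HttpHost;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;

import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class ElasticSearchConsumer {
    public static void main(String[] args) throws IOException {
        String bootstrapServerHost = "127.0.0.1:9092";

        // Kafka consumer configuration
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServerHost);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "elasticsearch-consumer-group"); // illustrative
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("twitter_tweet"));

        // RestClientBuilder pointing at the local Elasticsearch node
        RestHighLevelClient esClient = new RestHighLevelClient(
                RestClient.builder(new HttpHost("localhost", 9200, "http")));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            if (records.isEmpty()) continue;

            // Batch the whole poll into one BulkRequest instead of one round trip per IndexRequest
            BulkRequest bulk = new BulkRequest();
            for (ConsumerRecord<String, String> record : records) {
                bulk.add(new IndexRequest("tweets") // index name is illustrative
                        .source(record.value(), XContentType.JSON));
            }
            esClient.bulk(bulk, RequestOptions.DEFAULT);
        }
    }
}
```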
URL to list the indices in Elasticsearch:
http://localhost:9200/_cat/indices/?v
URL to search the data inside an index:
http://localhost:9200/elasticsearch/_search
#Kafka_Streams
- Kafka Streams is used to process data in real time
- In this application, Kafka Streams processes the tweets from the topic "twitter_tweet"
- It filters for tweets whose author has more than 10,000 followers
- It then writes the filtered tweets to the "important_tweets" topic (a topology sketch follows this list)
- Start a console consumer on the topic "important_tweets" to verify the data
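A minimal sketch of the topology, assuming the tweets are raw Twitter JSON with the follower count at user.followers_count, and using Gson (2.8.6+) for parsing; the application id is illustrative:

```java
import com.google.gson.JsonParser;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

import java.util.Properties;

public class TweetFilterStream {

    // Extract user.followers_count from the raw tweet JSON; 0 if the payload is malformed
    private static int followersCount(String tweetJson) {
        try {
            return JsonParser.parseString(tweetJson)
                    .getAsJsonObject()
                    .get("user").getAsJsonObject()
                    .get("followers_count").getAsInt();
        } catch (Exception e) {
            return 0;
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "tweet-filter-streams"); // illustrative
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        // Topology: twitter_tweet -> filter on follower count -> important_tweets
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> tweets = builder.stream("twitter_tweet");
        tweets.filter((key, tweet) -> followersCount(tweet) > 10000)
              .to("important_tweets");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
}
```

To verify, consume the filtered topic from the console:

```
kafka-console-consumer.bat --bootstrap-server 127.0.0.1:9092 --topic important_tweets --from-beginning
```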