Kafka-with-twitter
In this project, Kafka is used with the Twitter API.
The main idea behind this workflow is to first fetch the required data (tweets) with the help of the Twitter API and then store that data in Elasticsearch. Kafka is used as the messaging system: it collects data from Twitter and stores it in Elasticsearch. Let's try to make things a bit clearer with the help of a flow diagram.
Steps:
- **Create Twitter Client**
We don't want all the tweets, just selected ones. In this case we only take tweets containing the keywords "bitcoin" and "kafka"; we can track as many terms as we want. Declare the host you want to connect to, the endpoint, and the authentication. To connect to the Twitter API you first need to create a Twitter Developer account and create an application; this gives you the credentials required for authentication. Connect to the client with `client.connect()` and we are done.
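The real client comes from Twitter's streaming client library, which isn't shown here. As a stdlib-only sketch of the idea, the hypothetical class below filters incoming tweets by the tracked terms and hands matches to a `BlockingQueue` (the `msgQueue` mentioned above); all names here are illustrative, not the project's actual code.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for the Twitter streaming client: it tracks a
// list of terms and pushes matching tweets into the shared message queue.
class SimpleTwitterClient {
    private final List<String> trackedTerms;
    private final BlockingQueue<String> msgQueue;

    SimpleTwitterClient(List<String> trackedTerms, BlockingQueue<String> msgQueue) {
        this.trackedTerms = trackedTerms;
        this.msgQueue = msgQueue;
    }

    // In the real client this would arrive over the streaming connection;
    // here tweets are pushed in directly for illustration.
    void onTweet(String tweet) {
        String lower = tweet.toLowerCase();
        for (String term : trackedTerms) {
            if (lower.contains(term)) {
                msgQueue.offer(tweet); // keep only tweets matching a tracked term
                return;
            }
        }
    }
}

public class TwitterClientSketch {
    public static void main(String[] args) {
        BlockingQueue<String> msgQueue = new LinkedBlockingQueue<>(1000);
        SimpleTwitterClient client =
            new SimpleTwitterClient(List.of("bitcoin", "kafka"), msgQueue);

        client.onTweet("Kafka makes streaming easy");
        client.onTweet("nothing to see here");
        client.onTweet("bitcoin hits a new high");

        // Only the two tweets matching a tracked term end up in the queue.
        System.out.println(msgQueue.size());
    }
}
```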
- **Create Kafka Producer**
After the client, let's create the producer. The client fetches the desired tweets from Twitter and stores them in a message queue (`msgQueue`). For the producer, first define the Kafka topic. We need to create this topic manually with `kafka-topics --zookeeper localhost:2181 --create --topic twitter-tweets --partitions 6 --replication-factor 1`. My topic name is `twitter-tweets`. Each message (`msgQueue.poll()`) is taken from the queue to be produced to Kafka. To learn more, refer to the link.
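A minimal sketch of the producer configuration, assuming a local broker on the default port. The lines that actually create and use a `KafkaProducer` are commented out because they need the `kafka-clients` dependency on the classpath.

```java
import java.util.Properties;

public class ProducerConfigSketch {
    public static void main(String[] args) {
        String bootstrapServers = "127.0.0.1:9092"; // assumption: local broker

        // Standard producer properties: where the broker is and how to
        // serialize the (String) keys and values.
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("key.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");

        // With kafka-clients available, the producer loop would poll the
        // queue and send each tweet to the topic:
        //
        // KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        // String msg = msgQueue.poll(5, TimeUnit.SECONDS);
        // if (msg != null) {
        //     producer.send(new ProducerRecord<>("twitter-tweets", null, msg));
        // }

        System.out.println(props.getProperty("bootstrap.servers"));
    }
}
```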
Here is the output format from the producer; it will show the desired tweets.
- **Create Elasticsearch Client**
For a free Elasticsearch cluster, refer to app.bonsai.io. Sign up and create a free cluster with 3 nodes; this gives you your own Elasticsearch cluster. You need to provide credentials here as well, which are available in the Access section of Bonsai. This client will allow us to insert data into Elasticsearch.
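To show roughly what the client does, here is a stdlib-only sketch (using `java.net.http`, available since Java 11) that assembles an indexing request for the `twitter-tweets` index. The host and credentials are hypothetical placeholders — copy the real ones from Bonsai's Access section — and the request is only built, never sent.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.util.Base64;

public class EsClientSketch {
    public static void main(String[] args) {
        // Hypothetical values: substitute the host and credentials shown
        // in the Access section of your own Bonsai cluster.
        String host = "https://my-cluster.eu-west-1.bonsaisearch.net";
        String user = "user";
        String pass = "secret";

        // Bonsai uses HTTP basic auth, so encode user:pass for the header.
        String auth = Base64.getEncoder()
            .encodeToString((user + ":" + pass).getBytes());

        // POST one tweet document into the twitter-tweets index.
        HttpRequest request = HttpRequest.newBuilder()
            .uri(URI.create(host + "/twitter-tweets/tweets"))
            .header("Authorization", "Basic " + auth)
            .header("Content-Type", "application/json")
            .POST(HttpRequest.BodyPublishers.ofString("{\"tweet\":\"hello kafka\"}"))
            .build();

        // HttpClient.newHttpClient().send(request, ...) would actually
        // submit it; here we only show how the request is assembled.
        System.out.println(request.uri() + " " + request.method());
    }
}
```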
- **Create Kafka Consumer**
For the consumer, first create the properties, then create the consumer, and then subscribe it to the Kafka topic, i.e. `twitter-tweets`. For now we display the number of received tweets and then forward them to Elasticsearch. An ID is generated through which you can access the exact tweet in Elasticsearch via `GET /twitter-tweets/tweets/ID`.
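The consumer setup can be sketched the same way: standard consumer properties, with the `KafkaConsumer` subscribe-and-poll loop shown as comments since it requires the `kafka-clients` dependency. The group name is an assumption, not necessarily the project's.

```java
import java.util.Properties;

public class ConsumerConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "127.0.0.1:9092"); // assumption: local broker
        props.setProperty("key.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("group.id", "kafka-elasticsearch"); // assumed group name
        props.setProperty("auto.offset.reset", "earliest");   // read the topic from the start

        // With kafka-clients on the classpath, the consumer would subscribe
        // to the topic, report how many tweets arrived, and forward them:
        //
        // KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // consumer.subscribe(Collections.singleton("twitter-tweets"));
        // ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        // System.out.println("Received " + records.count() + " tweets");
        // // ...index each record into Elasticsearch...

        System.out.println(props.getProperty("group.id"));
    }
}
```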
Note - Before trying to run the producer and consumer, there are two things to make sure of first.
(1) Zookeeper should be running. To start Zookeeper: `zookeeper-server-start config/zookeeper.properties`.
(2) The Kafka server should be running. To start a Kafka broker: `kafka-server-start config/server.properties`.
Upcoming changes
In the future, I am also planning to add monitoring with the help of tools like Grafana or Prometheus. So stay tuned!
To learn more about Kafka, refer to the official documentation.