Kafka producer and consumer concepts using live Twitter tweets that match the term "Kafka": the producer channels them to the "twitter" topic, and a Kafka consumer in the consumer group "twitter_consumer_group" consumes them.
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// producer
Producer<String, String> producer = new KafkaProducer<String, String>(props);
if (msg != null) {
    ProducerRecord<String, String> record = new ProducerRecord<String, String>("twitter", null, msg);
    producer.send(record, new Callback() {
        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            if (exception != null) {
                log.error("some exception with producer", exception);
            }
        }
    });
}
// shutdown hook: runs when the application is stopped
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    log.info("stopping application ...");
    log.info("shutting down client from twitter");
    log.info("closing producer");
    // close() flushes any buffered records before the JVM exits
    producer.close();
}));
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
props.put("acks", "all");
props.put("retries", Integer.MAX_VALUE);
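The three idempotence settings above can be grouped in one helper so they are always applied together. This is a minimal sketch, not the project's own code: the class and method names (`SafeProducerConfig`, `withIdempotence`) are hypothetical, and it uses the plain string key `enable.idempotence` (the value of `ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG`) so that it compiles without the Kafka client on the classpath.

```java
import java.util.Properties;

// Hypothetical helper, for illustration only.
final class SafeProducerConfig {

    // Returns a copy of the given properties with the idempotent producer settings added.
    static Properties withIdempotence(Properties base) {
        Properties props = new Properties();
        props.putAll(base);
        // same key as ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG
        props.setProperty("enable.idempotence", "true");
        // wait for all in-sync replicas to acknowledge each write
        props.setProperty("acks", "all");
        // retry (practically) forever; Kafka parses the numeric string
        props.setProperty("retries", String.valueOf(Integer.MAX_VALUE));
        return props;
    }
}
```

The result can be passed to `new KafkaProducer<String, String>(...)` in place of the original `props`.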
- compression.type set to snappy compresses each batch of messages
- linger.ms delays sending so that more messages can join a batch of up to batch.size (16384 bytes, i.e. 16 KB); the batch is dispatched to the Kafka broker when the linger time elapses or the batch size is reached, whichever comes first
// High Throughput Settings
// 16 KB
props.put("batch.size", 16384);
// 20 milliseconds
props.put("linger.ms", 20);
// snappy compression
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
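The "time or size, whichever comes first" rule can be pictured with a small simulation. This is a simplified model under stated assumptions, not how the Kafka client is implemented: the class `BatchSketch` and its methods are hypothetical, time is passed in explicitly as milliseconds, and record size is taken as the UTF-8 length of the string.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical model of size-or-time batching, for illustration only.
final class BatchSketch {
    private final int batchSizeBytes; // plays the role of batch.size
    private final long lingerMs;      // plays the role of linger.ms
    private final List<String> pending = new ArrayList<>();
    private int pendingBytes = 0;
    private long firstAppendAtMs = -1;

    BatchSketch(int batchSizeBytes, long lingerMs) {
        this.batchSizeBytes = batchSizeBytes;
        this.lingerMs = lingerMs;
    }

    // Adds a record; returns the flushed batch if the size limit was reached, else an empty list.
    List<String> append(String record, long nowMs) {
        if (pending.isEmpty()) {
            firstAppendAtMs = nowMs;
        }
        pending.add(record);
        pendingBytes += record.getBytes(StandardCharsets.UTF_8).length;
        return pendingBytes >= batchSizeBytes ? flush() : Collections.<String>emptyList();
    }

    // Called periodically; flushes once the oldest pending record has waited lingerMs.
    List<String> tick(long nowMs) {
        if (!pending.isEmpty() && nowMs - firstAppendAtMs >= lingerMs) {
            return flush();
        }
        return Collections.emptyList();
    }

    private List<String> flush() {
        List<String> batch = new ArrayList<>(pending);
        pending.clear();
        pendingBytes = 0;
        firstAppendAtMs = -1;
        return batch;
    }
}
```

With `new BatchSketch(16384, 20)`, a trickle of small records is released by `tick` after 20 ms, while a burst that fills 16 KB is released by `append` straight away.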
Create an index from the console with a PUT request: PUT /twitter
Add docs to the index:
PUT /twitter/{type}/{id}
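The same two console requests can also be built from Java. This is a minimal sketch using the JDK's `java.net.http` API (Java 11 or later) and only constructs the requests; the base URL `http://localhost:9200`, the class name `IndexRequests`, and the method names are assumptions for illustration, and the `tweets` type is taken from the GET example in these notes.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical helper, for illustration only.
final class IndexRequests {
    // Base URL of a local Elasticsearch node; an assumption for this sketch.
    static final String BASE_URL = "http://localhost:9200";

    // PUT /{index} creates the index.
    static HttpRequest createIndex(String index) {
        return HttpRequest.newBuilder(URI.create(BASE_URL + "/" + index))
                .PUT(HttpRequest.BodyPublishers.noBody())
                .build();
    }

    // PUT /{index}/{type}/{id} adds (or replaces) one JSON document.
    static HttpRequest putDocument(String index, String type, String id, String json) {
        return HttpRequest.newBuilder(URI.create(BASE_URL + "/" + index + "/" + type + "/" + id))
                .header("Content-Type", "application/json")
                .PUT(HttpRequest.BodyPublishers.ofString(json))
                .build();
    }
}
```

A request built this way can be sent with `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())`, for example `IndexRequests.putDocument("twitter", "tweets", "1", "{\"foo\":\"bar\"}")`.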
public static RestHighLevelClient createClient() {
// https://vvwqq42n2r:3wuhgyc6o@kafka-course-3974031019.us-east-1.bonsaisearch.net:443
String hostname = "kafka-course-3974031019.us-east-1.bonsaisearch.net";
String username = "vvwqq42n2r";
String password = "3wuhgyc6o";
// needed only when you are not running a local Elasticsearch
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
RestClientBuilder builder = RestClient.builder(new HttpHost(hostname, 443, "https"))
        .setHttpClientConfigCallback(new HttpClientConfigCallback() {
            @Override
            public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
                return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
            }
        });
RestHighLevelClient client = new RestHighLevelClient(builder);
return client;
}
[main] INFO com.sri.kafka.consumer.ElasticSearchConsumer - D8_ZaHcBaCBESApmMUl1
GET /twitter/tweets/D8_ZaHcBaCBESApmMUl1
{
  "_index": "twitter",
  "_type": "tweets",
  "_id": "D8_ZaHcBaCBESApmMUl1",
  "_version": 1,
  "_seq_no": 1,
  "_primary_term": 1,
  "found": true,
  "_source": {
    "foo": "bar"
  }
}
Added the Kafka consumer config and used Elasticsearch to index the consumed records into the [twitter] index.
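while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        // index the raw tweet JSON into the "twitter" index
        IndexRequest indexRequest = new IndexRequest("twitter", "tweets").source(record.value(),
                XContentType.JSON);
        IndexResponse index = client.index(indexRequest, RequestOptions.DEFAULT);
    }
    try {
        // small delay before we consume next batch
        Thread.sleep(1000); // the delay length is an assumption; the original value is not shown
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // restore the interrupt flag
    }
}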
[main] INFO com.sri.kafka.consumer.ElasticSearchConsumer - idFromRecord -->1357036689613611010
// You can now find the tweet in Elasticsearch by the same id: GET /twitter/tweets/1357036689613611010
// returns true when the tweet's user has more than 1000 followers
private static boolean extractIdFrom(String value) {
    int followers = JsonParser.parseString(value).getAsJsonObject().get("user").getAsJsonObject().get("followers_count")
            .getAsInt();
    return followers > 1000;
}
Properties properties = new Properties();
properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// similar to consumer groups
properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "demo-kafka-streams");
properties.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());
properties.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());
// create a topology
StreamsBuilder streamsBuilder = new StreamsBuilder();
// input topic
KStream<String, String> stream = streamsBuilder.stream("twitter");
KStream<String, String> filteredStream = stream.filter((k, jsonTweet) -> {
    // keep only tweets whose user has more than 1000 followers, the threshold that extractIdFrom checks
    return extractIdFrom(jsonTweet);
});
// build the topology
KafkaStreams kafkaStream = new KafkaStreams(streamsBuilder.build(), properties);
// start our streams application
kafkaStream.start();
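The filter step keeps a record only when the predicate returns true, and the comparison is strict, so a user with exactly 1000 followers is dropped. The sketch below shows that boundary with plain Java streams instead of Kafka Streams, so it runs without any Kafka or Gson dependency; the `Tweet` class, the `FollowerFilterSketch` class, and the sample values are hypothetical.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Hypothetical stand-in for the KStream filter, for illustration only.
final class FollowerFilterSketch {
    static final int MIN_FOLLOWERS = 1000;

    // Minimal tweet model holding only the fields this sketch needs.
    static final class Tweet {
        final String text;
        final int followersCount;

        Tweet(String text, int followersCount) {
            this.text = text;
            this.followersCount = followersCount;
        }
    }

    // Same rule as the stream filter: strictly more than MIN_FOLLOWERS.
    static final Predicate<Tweet> POPULAR = t -> t.followersCount > MIN_FOLLOWERS;

    // Keeps only the tweets for which the predicate returns true.
    static List<Tweet> keepPopular(List<Tweet> tweets) {
        return tweets.stream().filter(POPULAR).collect(Collectors.toList());
    }
}
```

Given tweets from users with 999, 1000, and 1001 followers, `keepPopular` returns only the last one.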