This repository has been archived by the owner on Jun 10, 2022. It is now read-only.

Commit: change kafka consumer
137-rick committed Apr 2, 2018
1 parent 3c1bbaf commit 7c30338
Showing 4 changed files with 168 additions and 51 deletions.
server/src/main/java/org/weiboad/ragnar/server/kafka/Consumer.java (131 additions & 45 deletions)
@@ -6,6 +6,9 @@
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.WakeupException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.DisposableBean;
@@ -19,7 +22,7 @@
 import org.weiboad.ragnar.server.struct.MetaLog;
 
 import javax.annotation.PostConstruct;
-import java.util.Arrays;
+import java.util.*;
 
 @Component
 @Scope("singleton")
@@ -41,83 +44,166 @@ public class Consumer implements DisposableBean, Runnable {
 
     private Logger log = LoggerFactory.getLogger(Consumer.class);
 
-    private Long offset = 0L;
-
-    public Long getOffset() {
-        return offset;
-    }
+    private Map<Integer, Long> parationOffset = new HashMap<>();
+
+    private KafkaConsumer<String, String> consumer;
+
+    private boolean isDestroy = false;
 
     @PostConstruct
     public void start() {
         if (fieryConfig.getKafkaenable()) {
             log.info("start kafka Consumer...");
-            this.thread = new Thread(this);
-            this.thread.start();
+            this.startConsumer(false);
         }
     }
 
+    public boolean startConsumer(boolean startWithOffset) {
+        if (this.thread == null || !this.thread.isAlive()) {
+            log.info("start kafka Log Consumer Thread...");
+
+            this.isDestroy = false;
+            this.thread = new Thread(this);
+            this.thread.start();
+            return true;
+        }
+
+        log.error("kafka consumer thread is alive...");
+        return false;
+    }
+
+
+    /**
+     * get paration offset map
+     *
+     * @return Map
+     */
+    public Map<Integer, Long> getParationOffset() {
+        return this.parationOffset;
+    }
+
     @Override
     public void run() {
         if (fieryConfig.getKafkaenable()) {
-            KafkaConsumer<String, String> consumer = KafkaUtil.getConsumer(fieryConfig.getKafkaserver(), fieryConfig.getKafkagroupid());
-            consumer.subscribe(Arrays.asList(fieryConfig.getKafkatopic().split(",")));
-
-            while (true) {
-                ConsumerRecords<String, String> records = consumer.poll(1000);
-
-                for (ConsumerRecord<String, String> record : records) {
-
-                    this.offset = record.offset();
-                    //log.info("fetched from partition " + record.partition() + ", offset: " + record.offset() + ", message: " + record.value());
-                    String content = record.value();
-
-                    if (content.trim().length() == 0) {
-                        continue;
-                    }
-
-                    //is meta log?
-                    if (!content.substring(0, 1).equals("[")) {
-                        //metalog
-                        Gson gsonHelper = new Gson();
-
-                        //base64 decode
-                        try {
-                            sun.misc.BASE64Decoder decoder = new sun.misc.BASE64Decoder();
-                            content = new String(decoder.decodeBuffer(content));
-                            String[] metalogPack = content.trim().split("\n");
-
-                            //remove the es info
-                            if (metalogPack.length == 2) {
-                                MetaLog metalog = gsonHelper.fromJson(metalogPack[1], MetaLog.class);
-                                indexHelper.insertProcessQueue(metalog);
-                                metaLogProcessor.insertDataQueue(metalog);
-                            }
-
-                        } catch (Exception e) {
-                            log.error("parser json:" + content);
-                            e.printStackTrace();
-                        }
-
-                    } else {
-                        //common log
-                        JsonParser valueParse = new JsonParser();
-
-                        try {
-                            JsonArray valueArr = (JsonArray) valueParse.parse(content);
-                            bizLogProcessor.insertDataQueue(valueArr);
-                        } catch (Exception e) {
-                            e.printStackTrace();
-                            log.error("kafka parser json wrong:" + content);
-                        }
-                    }
-                }
-            }
+            try {
+
+                List<String> topicList = new ArrayList<>();
+                topicList.add(fieryConfig.getKafkatopic());
+
+                consumer = KafkaUtil.getConsumer(fieryConfig.getKafkaserver(), fieryConfig.getKafkagroupid());
+                consumer.subscribe(topicList);
+
+                //when the queue is full pause the queue
+                boolean isPause = false;
+
+                while (!this.isDestroy) {
+
+                    List<PartitionInfo> parationList = consumer.partitionsFor(fieryConfig.getKafkatopic());
+
+                    //not avalible pause poll
+                    if ((!bizLogProcessor.checkAvalible() || !metaLogProcessor.checkAvalible()) && !isPause) {
+                        for (PartitionInfo partitionInfo : parationList) {
+                            int partitionid = partitionInfo.partition();
+                            TopicPartition partition = new TopicPartition(fieryConfig.getKafkatopic(), partitionid);
+                            consumer.pause(partition);
+                        }
+                        isPause = true;
+                    }
+
+                    //avalible on queue resume poll
+                    if ((bizLogProcessor.checkAvalible() && metaLogProcessor.checkAvalible()) && isPause) {
+                        for (PartitionInfo partitionInfo : parationList) {
+                            int partitionid = partitionInfo.partition();
+                            TopicPartition partition = new TopicPartition(fieryConfig.getKafkatopic(), partitionid);
+                            consumer.resume(partition);
+                        }
+                        isPause = false;
+                    }
+
+                    ConsumerRecords<String, String> records = consumer.poll(1000);
+
+                    for (ConsumerRecord<String, String> record : records) {
+
+                        //partion offset record
+                        parationOffset.put(record.partition(), record.offset());
+
+                        //log.info("fetched from partition " + record.partition() + ", offset: " + record.offset() + ", message: " + record.value());
+                        String content = record.value();
+
+                        if (content.trim().length() == 0) {
+                            continue;
+                        }
+
+                        //is meta log?
+                        if (!content.substring(0, 1).equals("[")) {
+                            //metalog
+                            Gson gsonHelper = new Gson();
+
+                            //base64 decode
+                            try {
+                                sun.misc.BASE64Decoder decoder = new sun.misc.BASE64Decoder();
+                                content = new String(decoder.decodeBuffer(content));
+                                String[] metalogPack = content.trim().split("\n");
+
+                                //remove the es info
+                                if (metalogPack.length == 2) {
+                                    MetaLog metalog = gsonHelper.fromJson(metalogPack[1], MetaLog.class);
+                                    indexHelper.insertProcessQueue(metalog);
+                                    metaLogProcessor.insertDataQueue(metalog);
+                                }
+
+                            } catch (Exception e) {
+                                log.error("parser json:" + content);
+                                e.printStackTrace();
+                            }
+
+                        } else {
+                            //common log
+                            JsonParser valueParse = new JsonParser();
+
+                            try {
+                                JsonArray valueArr = (JsonArray) valueParse.parse(content);
+                                bizLogProcessor.insertDataQueue(valueArr);
+                            } catch (Exception e) {
+                                e.printStackTrace();
+                                log.error("kafka parser json wrong:" + content);
+                            }
+                        }
+                    }
+
+                }
+            } catch (WakeupException e) {
+                // Ignore exception if closing
+                if (!this.isDestroy) {
+                    throw e;
+                }
+            } catch (Exception e) {
+                e.printStackTrace();
+            } finally {
+                consumer.close();
+                this.consumer = null;
+                KafkaUtil.cleanConsumer();
+            }
         }
     }
 
+
+    /**
+     * stop consumer
+     */
+    public void shutdown() {
+        log.info("shutdown the consumer threat");
+        this.isDestroy = true;
+        consumer.wakeup();
+        try {
+            this.thread.join();
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
     @Override
     public void destroy() {
 
+        this.shutdown();
     }
 
 }
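The rewritten run() is a poll-loop backpressure pattern: before each poll the consumer checks whether the downstream queues can accept more work, pauses every partition while they are saturated, resumes once they drain, and shuts down through a flag plus wakeup() so a blocking poll() exits promptly. A condensed, standalone sketch of the same pattern, assuming the Collection-based pause/resume and Duration-based poll of kafka-clients 2.x (the class name and downstreamReady() are illustrative stand-ins, not part of this repository):

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

public class PauseResumeConsumer implements Runnable {
    private final KafkaConsumer<String, String> consumer;
    private volatile boolean destroyed = false;

    public PauseResumeConsumer(Properties props, String topic) {
        this.consumer = new KafkaConsumer<>(props);
        this.consumer.subscribe(Collections.singletonList(topic));
    }

    // Stand-in for bizLogProcessor.checkAvalible() && metaLogProcessor.checkAvalible()
    private boolean downstreamReady() {
        return true;
    }

    @Override
    public void run() {
        boolean paused = false;
        try {
            while (!destroyed) {
                if (!downstreamReady() && !paused) {
                    // Stop fetching but stay in the consumer group.
                    consumer.pause(consumer.assignment());
                    paused = true;
                } else if (downstreamReady() && paused) {
                    consumer.resume(consumer.assignment());
                    paused = false;
                }

                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    // hand each record off to a bounded queue here
                }
            }
        } catch (WakeupException e) {
            // wakeup() is expected during shutdown; rethrow otherwise
            if (!destroyed) {
                throw e;
            }
        } finally {
            consumer.close();
        }
    }

    public void shutdown() {
        destroyed = true;
        consumer.wakeup(); // unblocks a sleeping poll()
    }
}

Pausing instead of sleeping keeps the loop polling, which keeps the consumer's group membership and partition assignment alive while the processors catch up.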
KafkaUtil.java
@@ -23,4 +23,8 @@ public static KafkaConsumer<String, String> getConsumer(String serverList, Strin
 
         return kc;
     }
+    public static void cleanConsumer() {
+        kc = null;
+    }
+
 }
BizLogProcessor.java
@@ -18,12 +18,15 @@
 import org.weiboad.ragnar.server.util.DateTimeHelper;
 
-import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 
 @Component
 @Scope("singleton")
 public class BizLogProcessor {
 
-    private ConcurrentLinkedQueue<JsonArray> BizLogQueue = new ConcurrentLinkedQueue<>();
+    private int maxQueueLength = 20000;
+
+    private LinkedBlockingQueue<JsonArray> BizLogQueue = new LinkedBlockingQueue<>();
 
     //log obj
     private Logger log = LoggerFactory.getLogger(BizLogProcessor.class);
@@ -48,10 +51,21 @@ public Integer getQueueLen() {
         return BizLogQueue.size();
     }
 
+    public boolean checkAvalible() {
+        if (this.BizLogQueue.size() >= this.maxQueueLength / 2) {
+            return false;
+        }
+        return true;
+    }
+
     //main process struct
     public void insertDataQueue(JsonArray data) {
         if (data != null) {
-            BizLogQueue.add(data);
+            try {
+                this.BizLogQueue.put(data);
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
        }
     }
 
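The processor side of the handshake is the switch from ConcurrentLinkedQueue to LinkedBlockingQueue plus the new checkAvalible() test, which reports false once the queue reaches half of maxQueueLength and so drives the consumer's pause/resume above. Note that the queue is constructed without a capacity, so the 20000 limit is enforced only indirectly through that check; a bounded variant would make put() itself block at the hard limit. A sketch under that assumption (not the committed code; the class name is illustrative):

import com.google.gson.JsonArray;
import java.util.concurrent.LinkedBlockingQueue;

public class BoundedBizLogQueue {
    private final int maxQueueLength = 20000;

    // A capacity passed to the constructor makes put() block at the hard limit.
    private final LinkedBlockingQueue<JsonArray> queue =
            new LinkedBlockingQueue<>(maxQueueLength);

    public boolean checkAvalible() {
        // Ask the consumer to pause once the queue is half full.
        return queue.size() < maxQueueLength / 2;
    }

    public void insertDataQueue(JsonArray data) throws InterruptedException {
        if (data != null) {
            queue.put(data); // blocks instead of growing without bound
        }
    }
}

Checking against half capacity leaves headroom for records already in flight from the poll that returns while the pause takes effect.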
MetaLogProcessor.java
@@ -11,13 +11,15 @@
 import org.weiboad.ragnar.server.statistics.api.APIStatisticTimeSet;
 import org.weiboad.ragnar.server.search.IndexService;
 
-import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 
 @Component
 @Scope("singleton")
 public class MetaLogProcessor {
 
-    private ConcurrentLinkedQueue<MetaLog> metaLogQueue = new ConcurrentLinkedQueue<>();
+    private int maxQueueLength = 20000;
+
+    private LinkedBlockingQueue<MetaLog> metaLogQueue = new LinkedBlockingQueue<>();
 
     //log obj
     private Logger log = LoggerFactory.getLogger(BizLogProcessor.class);
@@ -35,8 +37,19 @@ public Integer getQueueLen() {
     //main process struct
     public void insertDataQueue(MetaLog data) {
         if (data != null) {
-            metaLogQueue.add(data);
+            try {
+                this.metaLogQueue.put(data);
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
         }
     }
 
+    public boolean checkAvalible() {
+        if (this.metaLogQueue.size() >= this.maxQueueLength / 2) {
+            return false;
+        }
+        return true;
+    }
+
     @Async
@@ -53,7 +66,7 @@ public void processData() {
 
         totalProcess++;
 
-        if (totalProcess > 5000) {
+        if (totalProcess > this.maxQueueLength) {
             break;
         }
 
