-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathJSR223 code Sample.txt
41 lines (38 loc) · 1.9 KB
/
JSR223 code Sample.txt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-group");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
org.apache.kafka.clients.consumer.KafkaConsumer<String, String> consumer = new org.apache.kafka.clients.consumer.KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList("invoices-service-invoice-database-events"));
System.out.println("Subscribed to topic " + "first_topic");
int i = 0;
long t = System.currentTimeMillis();
long end = t + 5000;
f = new FileOutputStream(".\\data.csv", true);
p = new PrintStream(f);
while (System.currentTimeMillis()<end)
{
org.apache.kafka.clients.consumer.ConsumerRecords<String, String> records = consumer.poll(100);
for (org.apache.kafka.clients.consumer.ConsumerRecord<String, String> record : records)
{
System.out.println("offset = " + record.offset() +" value = " + record.value( ));
p.println( "offset = " + record.offset() +" value = " + record.value());
}
consumer.commitSync();
}
consumer.close();
p.close();
f.close();