test_kafka_vm.py
#!/usr/bin/env python
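"""Round-trip Kafka test: read CDM records from a sample Avro file, produce
them to the Kafka topic named after the consumer group, consume them back,
and check that every record survives the trip unchanged.

Usage: test_kafka_vm.py --kafka-group <group> [--only-produce]
"""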
import argparse
import logging
import sys

from constants import *
from pykafka import KafkaClient
from pykafka.exceptions import OffsetOutOfRangeError, RequestTimedOut
from pykafka.partitioners import HashingPartitioner
from tc.schema.serialization import Utils
from tc.schema.serialization.kafka import KafkaAvroGenericSerializer, KafkaAvroGenericDeserializer

# Logger used by the consumer's error handlers below.
logging.basicConfig()
logger = logging.getLogger(__name__)

parser = argparse.ArgumentParser()
parser.add_argument('--kafka-group', help='Kafka consumer group', required=True)
parser.add_argument('--only-produce', help='Only produce messages',
                    required=False, action='store_true')
args = vars(parser.parse_args())
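
# Connect to the broker at KAFKA_URL (from constants) and produce to the topic
# named after the consumer group, using a synchronous hashing producer.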
kafka_client = KafkaClient(KAFKA_URL)
kafka_topic = kafka_client.topics[args['kafka_group']]
producer = kafka_topic.get_producer(
    partitioner=HashingPartitioner(),
    sync=True, linger_ms=1, ack_timeout_ms=30000, max_retries=0)
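
# Load the CDM schema and deserialize the sample records that will be produced.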
schema = Utils.load_schema(SCHEMA_FILE)
input_file = open('avro/infoleak_small_units.CDM13.avro', 'rb')
serializer = KafkaAvroGenericSerializer(schema)
deserializer = KafkaAvroGenericDeserializer(schema, input_file=input_file)
records = deserializer.deserialize_from_file()
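
# Produce every record, keeping a copy so the consumed messages can be
# compared against what was sent.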
i = 0
produced = []
for edge in records:
    # A constant key routes every message to the same partition; switch to
    # str(i).encode() to hash messages across partitions instead.
    kafka_key = b'0'
    produced.append(edge)
    message = serializer.serialize(args['kafka_group'], edge)
    producer.produce(message, kafka_key)
    i += 1
print('Pushed %d messages' % i)
producer.stop()
input_file.close()
if args['only_produce']:
    sys.exit(0)
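
# Consume the messages back with a managed balanced consumer in the same group.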
consumer = kafka_topic.get_balanced_consumer(
    consumer_group=args['kafka_group'], auto_commit_enable=True,
    auto_commit_interval_ms=1000, reset_offset_on_start=False,
    consumer_timeout_ms=100, fetch_wait_max_ms=0, managed=True)
j = 0
consumed = []
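
# Keep polling until as many messages have been consumed as were produced;
# consumer_timeout_ms makes each inner loop return after 100 ms of silence
# so the count can be re-checked.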
while True:
    if j >= i:
        break
    try:
        for kafka_message in consumer:
            if kafka_message.value is not None:
                message = deserializer.deserialize(args['kafka_group'],
                                                   kafka_message.value)
                consumed.append(message)
                j += 1
    except RequestTimedOut:
        logger.warning('Kafka consumer request timed out')
    except OffsetOutOfRangeError:
        logger.warning('Kafka consumer offset out of range')
print('Consumed %d messages' % j)
consumer.stop()
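
# Every consumed record should match the corresponding produced record, in order.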
for i in range(len(produced)):
    assert consumed[i] == produced[i]