-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathConsumer.py
32 lines (22 loc) · 863 Bytes
/
Consumer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
from kafka import KafkaConsumer
import json
from kafka.errors import KafkaError
def json_deserializer(data):
return json.loads(data)
# each consumer will be in a consumer group
consumer = KafkaConsumer(
bootstrap_servers=["localhost:9092","localhost:9093"],
auto_offset_reset='earliest',
group_id="C1",
key_deserializer=json_deserializer,
value_deserializer=json_deserializer)
consumer.subscribe(["Reg_User","Data_User"])
if __name__ == "__main__":
print("Starting")
for msg in consumer:
print("Key: ", msg.key)
print("Key: ", msg.value["price"])
print("Key: ", type(msg.value))
log1 = json.dumps(msg.value)
print(type(log1))
print("Reg_User: ", type(log1))