-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathKafkaProducer.py
145 lines (115 loc) · 4.65 KB
/
KafkaProducer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
#!/usr/bin/env python
"""
author : Marcelo Sanches
doc name : Kafka Producer
purpose : to produce messages to a Kafka topic for a recommender system
date : 05.06.2019
version : 3.7.2
"""
# Import modules
import os
import time
import ujson
import pandas as pd
import requests
#import urllib2
from retrying import retry
from confluent_kafka.admin import AdminClient, NewTopic
from confluent_kafka import Producer
from concurrent.futures import wait
# Setup functions
def delete_recommender_system_topic(admin_client=None, topic=None,
                                    retry_interval=1.0):
    """Delete the recommender system topic if it exists.

    Keeps issuing delete requests until the topic no longer appears in the
    broker's topic listing. A failed request is printed and retried; there
    is no attempt limit, so this blocks until the topic is gone.

    Args:
        admin_client: AdminClient used for the requests. Defaults to the
            module-level ``admin`` created in the ``__main__`` block.
        topic: Name of the topic to delete. Defaults to the module-level
            ``recommender_system_topic``.
        retry_interval: Seconds to wait after each attempt before listing
            topics again, so a failing or not-yet-visible deletion is not
            retried in a tight loop against the broker.
    """
    if admin_client is None:
        admin_client = admin
    if topic is None:
        topic = recommender_system_topic
    topics = admin_client.list_topics(timeout=10).topics.keys()
    # delete topic if exists
    while topic in topics:
        print("Trying to delete " + topic)
        status = admin_client.delete_topics([topic])
        try:
            # block until the broker answers this delete request
            status[topic].result()
        except Exception as e:
            # best effort: report the failure; the loop retries below
            print(e)
        # pause before re-checking so retries do not spin against the broker
        time.sleep(retry_interval)
        topics = admin_client.list_topics(timeout=10).topics.keys()
def create_recommender_system_topic(admin_client=None, topic=None,
                                    num_partitions=3, replication_factor=1,
                                    retry_interval=1.0):
    """Create the recommender system topic if it doesn't already exist.

    Keeps issuing create requests until the topic appears in the broker's
    topic listing, then prints the listed topic names. A failed request is
    printed and retried; there is no attempt limit.

    Args:
        admin_client: AdminClient used for the requests. Defaults to the
            module-level ``admin`` created in the ``__main__`` block.
        topic: Name of the topic to create. Defaults to the module-level
            ``recommender_system_topic``.
        num_partitions: Partition count for the new topic.
        replication_factor: Replication factor for the new topic.
        retry_interval: Seconds to wait after each attempt before listing
            topics again, so a failing request is not retried in a tight
            loop against the broker.
    """
    if admin_client is None:
        admin_client = admin
    if topic is None:
        topic = recommender_system_topic
    topics = admin_client.list_topics(timeout=10).topics.keys()
    # create topic if doesn't already exist
    while topic not in topics:
        print("Trying to create " + topic)
        new_topic = NewTopic(topic,
                             num_partitions=num_partitions,
                             replication_factor=replication_factor)
        status = admin_client.create_topics([new_topic])
        try:
            # block until the broker answers this create request
            status[topic].result()
        except Exception as e:
            # best effort: report the failure; the loop retries below
            print(e)
        # pause before re-checking so retries do not spin against the broker
        time.sleep(retry_interval)
        topics = admin_client.list_topics(timeout=10).topics.keys()
    print(topics)
def load_retail_data(csv_path='Online_Retail.csv',
                     xlsx_path='Online Retail.xlsx'):
    """Return the Online Retail dataset, read from a cached CSV file.

    If ``csv_path`` does not exist yet, it is first built from the Excel
    workbook at ``xlsx_path``. Rows with any missing value are dropped,
    CustomerID is cast to int, and the result is saved without the index.
    The returned frame is always read back from the CSV, so the first run
    and later runs see identically parsed data.

    Args:
        csv_path: Path of the cached CSV. The same name is used both for
            the existence check and for writing, so the conversion only
            happens once.
        xlsx_path: Path of the source Excel workbook.

    Returns:
        pandas.DataFrame with the cleaned dataset.
    """
    if not os.path.isfile(csv_path):
        raw = pd.read_excel(xlsx_path)
        # filter out missing data (~25% CustomerIDs are NA)
        raw = raw.dropna()
        # an int column cannot hold NaN, so CustomerID is not int until the
        # NAs are dropped; restore integer ids before saving
        raw['CustomerID'] = raw['CustomerID'].astype(int)
        raw.to_csv(csv_path, index=False)
    return pd.read_csv(csv_path)


def build_basket(invoice):
    """Build the message payload (basket) for one invoice's rows.

    Args:
        invoice: DataFrame holding every row of a single invoice, with
            InvoiceNo, CustomerID, InvoiceDate, Country, StockCode,
            Description, Quantity and UnitPrice columns.

    Returns:
        dict with the per-invoice fields taken from the last row, plus one
        list per line-item column, in row order. Values are plain Python
        scalars (via ``tolist``), ready for JSON encoding.
    """
    basket = {}
    # these fields are the same for each row of an invoice
    for field in ('InvoiceNo', 'CustomerID', 'InvoiceDate', 'Country'):
        basket[field] = invoice[field].tolist()[-1]
    # these fields differ per row, so each becomes a list
    basket['StockCodes'] = invoice['StockCode'].tolist()
    basket['Descriptions'] = invoice['Description'].tolist()
    basket['Quantities'] = invoice['Quantity'].tolist()
    basket['UnitPrices'] = invoice['UnitPrice'].tolist()
    return basket


# Run
if __name__ == '__main__':
    # Basic setup. These names stay module-level globals because the topic
    # setup functions read them.
    recommender_system_topic = 'recommender.system.1'
    brokers = 'kafka-1:9092'
    admin = AdminClient({'bootstrap.servers': brokers})
    # delete and re-create the topic so every run starts from an empty topic
    delete_recommender_system_topic()
    create_recommender_system_topic()
    # setup producer against the same brokers as the admin client
    p = Producer({'bootstrap.servers': brokers})
    # TODO: ideally hit an endpoint and stream instead of reading a local
    # file, e.g.
    # https://archive.ics.uci.edu/ml/machine-learning-databases/00352/Online%20Retail.xlsx
    # The data does not change between passes, so load and group it once.
    invoice_groups = load_retail_data().groupby('InvoiceNo')
    while True:
        # produce one message (basket) per invoice
        for invoice_name, invoice in invoice_groups:
            time.sleep(0.1)  # remove or reduce value, added to sync with slower consumer
            msgbytes = ujson.dumps(build_basket(invoice)).encode('utf-8')
            p.produce(recommender_system_topic, msgbytes)
            # serve delivery reports so they do not accumulate in the queue
            p.poll(0)
            print("produced basket " + str(invoice_name))
        p.flush()
        print("Done flushing")
        # each pass replays the whole dataset; kill the producer during this
        # pause so as not to duplicate data
        time.sleep(15)