-
Notifications
You must be signed in to change notification settings - Fork 0
/
generate_events.py
47 lines (37 loc) · 1.17 KB
/
generate_events.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import json
import random
from datetime import datetime
import time
import requests
def send_events(data, token, host):
params = {
'name': 'landing_datasource',
'token': token,
'host': host
}
r = requests.post(f'{host}/v0/events', params=params, data=data)
print(data)
def create_events():
tenant_ids = list(range(50))
event_types = ['login', 'open', 'edit', 'save', 'delete', 'click', 'comment']
responses = [200, 400, 500]
response_weights = [95, 4, 1]
with open ('./.tinyb') as tinyb:
tb = json.load(tinyb)
token = tb['token']
host = tb['host']
while True:
events = []
for i in range(10):
message = {
'timestamp': datetime.utcnow().isoformat(),
'tenant_id': random.choice(tenant_ids),
'event_type': random.choice(event_types),
'response': random.choices(responses, response_weights)[0]
}
events.append(message)
data = '\n'.join(json.dumps(e) for e in events)
send_events(data, token, host)
time.sleep(0.1)
if __name__ == '__main__':
create_events()