forked from alirizakeles/ab-2018
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathschedule_notify.py
67 lines (48 loc) · 2.29 KB
/
schedule_notify.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
import redis
import json
import random
import pika
r = redis.Redis()
def on_request(ch, method, properties, body):
user = json.loads(body.decode())
user_suggestion_key = 'User:Suggestions:{}'.format(user['email'])
git_keys = ["Github:Stars:{}".format(sub) for sub in user['subscriptions']['github']]
git_keys.append("Gitlab:Stars:{}".format(sub) for sub in user['subscriptions']['gitlab'])
if check_empty_set(user_suggestion_key):
r.sunionstore(dest=user_suggestion_key, keys=git_keys)
prepare_queue_works(user, user_suggestion_key,user['repoCount'], git_keys)
def check_empty_set(user_suggestion_key):
if len(r.smembers(user_suggestion_key)) == 0:
flag = True
else:
flag = False
return flag
def prepare_queue_works(user, user_suggestion_key, count, git_keys):
chosen_repo_list = []
ready_repos_list = []
if len(r.smembers(user_suggestion_key)) != 0 :
while count > 0:
if check_empty_set(user_suggestion_key) :
r.sunionstore(dest=user_suggestion_key, keys=git_keys)
count -= 1
chosen_repo_list.append(r.spop(user_suggestion_key))
for repo in chosen_repo_list:
repo_key = "Github:Repos:{}".format(repo.decode())
repo_info = r.hgetall(repo_key)
decoded_repo_info = {k.decode(): v.decode() for k, v in repo_info.items()}
ready_repos_list.append(decoded_repo_info)
email_dict = {'to':'{}'.format(user['email']),'repos':ready_repos_list}
if user['telegramId'] != None :
telegram_dict = {'to':'{}'.format(user['telegramId']),'repos':ready_repos_list}
work_telegram = json.dumps(telegram_dict)
r.lpush('telegram_queue',work_telegram)
work_email = json.dumps(email_dict)
r.lpush('email_queue',work_email)
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='work',
exchange_type='direct')
channel.queue_declare(queue='schedule_notifier_queue')
channel.queue_bind(exchange='work', queue='schedule_notifier_queue',routing_key='schedule_notifier')
channel.basic_consume(on_request, queue='schedule_notifier_queue', no_ack=True)
channel.start_consuming()