-
Notifications
You must be signed in to change notification settings - Fork 0
/
final_data_rolling_retention.py
102 lines (63 loc) · 2.82 KB
/
final_data_rolling_retention.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
import json
import os
from datetime import datetime, timedelta
import pytz
import calculate_daily_transaction_data
import config
import calculate_rolling_retention_data
from manage_transactions import get_first_transaction_timestamp
from util import logging
# Directory that final_data_rolling_retention() creates (if missing) and into
# which it writes one '<YYYY-MM-DD>.json' file per processed day.
STORE_FINAL_DATA_USER = '/terra-data/v2/final/rolling_retention'
# Module-level logger obtained from the project's logging helper;
# config.LOG_LEVEL is passed as its second argument (presumably the log level).
log = logging.get_custom_logger(__name__, config.LOG_LEVEL)
def final_data_rolling_retention():
    """Write one JSON file per day with per-currency user statistics.

    Days are walked one at a time, starting at the value returned by
    ``get_first_transaction_timestamp()`` and stopping before the current
    day's UTC midnight.  For every day the per-currency user records are
    loaded through ``calculate_daily_transaction_data.get_user`` and the
    running set of known addresses per currency is updated.  This happens
    even for days whose output file already exists, because the running
    totals of later days depend on it.

    A file ``<YYYY-MM-DD>.json`` is written to ``STORE_FINAL_DATA_USER`` only
    when it does not exist yet and the day has at least one currency entry.
    For each currency seen so far it holds:

    * ``total``  - number of distinct addresses seen up to and including
      the day,
    * ``new``    - addresses seen for the first time on that day,
    * ``daily``  - number of user records for that day,
    * ``rolling_retention`` - whatever
      ``calculate_rolling_retention_data.get_retention_for_date`` returns.

    Returns:
        None.
    """
    os.makedirs(STORE_FINAL_DATA_USER, exist_ok=True)

    # Upper bound (exclusive): start of the current day in UTC.
    today_midnight = datetime.utcnow().replace(
        hour=0, minute=0, second=0, microsecond=0, tzinfo=pytz.UTC
    )
    current_day = get_first_transaction_timestamp()

    log.debug('generate final data: rolling retention')

    if current_day >= today_midnight:
        return

    # currency -> every address seen on any day processed so far
    seen_addresses = {}
    # Used for currencies that are known but had no records on a given day.
    no_activity = {'new': 0, 'daily': 0}

    while current_day < today_midnight:
        day_label = current_day.strftime('%Y-%m-%d')
        log.debug('creating final rolling retention data for ' + day_label)

        target_file = os.path.join(STORE_FINAL_DATA_USER, day_label + '.json')
        daily_users = calculate_daily_transaction_data.get_user(current_day)

        # currency -> {'new': ..., 'daily': ...} for this day only
        day_stats = {}
        for currency, records in daily_users.items():
            known = seen_addresses.setdefault(currency, set())
            todays_addresses = {record['address'] for record in records}

            day_stats[currency] = {
                'new': len(todays_addresses - known),
                'daily': len(records),
            }
            known.update(todays_addresses)

        # NOTE(review): the write is assumed to be nested inside the
        # "file does not exist" branch - confirm against the original layout.
        if not os.path.isfile(target_file):
            report = {}
            for currency, known in seen_addresses.items():
                retention = calculate_rolling_retention_data.get_retention_for_date(
                    current_day, currency
                )
                stats = day_stats.get(currency, no_activity)
                report[currency] = {
                    'total': len(known),
                    'new': stats['new'],
                    'daily': stats['daily'],
                    'rolling_retention': retention,
                }

            # Days without any user data produce no file.
            if len(daily_users) > 0:
                with open(target_file, 'w') as out:
                    out.write(json.dumps(report))

        current_day += timedelta(days=1)