-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrs_pySpark.py
113 lines (54 loc) · 2.29 KB
/
rs_pySpark.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
from pyspark.mllib.recommendation import ALS
from pyspark import SparkContext
import sqlalchemy
import json
import pandas as pd
# Input file: crawled user inventories, read line by line further down.
path_user_inventory = 'steam_user_inventory.txt'
# Spark entry point used to load and transform the input file.
sc = SparkContext()
# parse the crawled data set
def parse_new_string(raw_string):
    """Parse one crawled JSON record into ``(user_id, games)``.

    The record is a JSON object whose first key is the user id. The value
    under that key holds a ``response`` object carrying ``game_count`` and
    ``games``.

    Args:
        raw_string: JSON text of a single record.

    Returns:
        A ``(user_id, games)`` tuple. ``games`` is the record's ``games``
        value when ``game_count`` is non-zero and ``games`` is present.
        Otherwise it is an empty dict (empty or missing ``response``, zero
        or missing ``game_count``, or missing ``games``).

    Raises:
        ValueError: if ``raw_string`` is not valid JSON.
        IndexError: if the JSON object has no keys.
    """
    record = json.loads(raw_string)
    user_id = list(record)[0]
    # Tolerant lookups: an incomplete record is treated as "no games"
    # instead of raising KeyError and failing the whole Spark job.
    response = record[user_id].get('response') or {}
    if response.get('game_count', 0) != 0 and 'games' in response:
        return user_id, response['games']
    # The empty dict is the "no games" marker the rest of the script checks.
    return user_id, {}
# Parse every input line and pair each (user_id, inventory) record with a
# running index; that index is used in place of the user id from here on.
user_inventory_rdd = (
    sc.textFile(path_user_inventory)
    .map(parse_new_string)
    .zipWithIndex()
)
# map each row index back to the user id it stands for
def id_index(x):
    """Turn a ``((user_id, inventory), index)`` row into ``(index, user_id)``."""
    keyed_record, position = x
    steam_user, _inventory = keyed_record
    return position, steam_user
# Collect the index -> user_id pairs into a plain dict on the driver.
dict_id_index = (
    user_inventory_rdd
    .map(id_index)
    .collectAsMap()
)
# create training rdd => format in (index, appid, playtime_forever)
def create_tuple(x):
    """Extract the played games of one indexed inventory row.

    Args:
        x: a ``((user_id, lst_inventory), index)`` row. ``lst_inventory`` is
            an iterable of game dicts, or an empty container when the user
            has no games.

    Returns:
        ``(index, pairs)`` where ``pairs`` lists ``(appid, playtime_forever)``
        for every game that has an ``appid`` and a positive playtime.
    """
    (_user_id, lst_inventory), index = x
    if not lst_inventory:
        return index, []
    played = []
    for game in lst_inventory:
        app_id = game.get('appid')
        # A missing playtime counts as zero; comparing None > 0 would raise
        # TypeError on Python 3.
        playtime = game.get('playtime_forever') or 0
        # Entries without an appid cannot be turned into a rating, skip them.
        if app_id is not None and playtime > 0:
            played.append((app_id, playtime))
    return index, played
# Flatten to one (index, appid, playtime_forever) triple per played game.
training_rdd = (
    user_inventory_rdd
    .map(create_tuple)
    .flatMapValues(lambda pairs: pairs)
    .map(lambda row: (row[0], row[1][0], row[1][1]))
)
# 5 is the rank, i.e. the number of latent features of the ALS model.
model = ALS.train(training_rdd, rank=5)
# make the recommendation
# dic_recommend maps 'g0'..'g9' (position in the top-10 list) to a
# {user_id: app_id} dict.
dic_recommend = {'g%s' % slot: {} for slot in range(10)}
n_skipped = 0
for index, user_id in dict_id_index.items():
    if index % 200 == 0:
        print('working on ', index)
    # Only the model call is guarded, so unrelated bugs are not hidden.
    try:
        recommendations = model.recommendProducts(index, 10)
    except Exception:
        # Best effort: a user the model cannot score is skipped, not fatal.
        # NOTE(review): presumably these are users with no training rows
        # (no played games) - confirm. KeyboardInterrupt/SystemExit are no
        # longer swallowed.
        n_skipped += 1
        continue
    for rank, rating in enumerate(recommendations):
        dic_recommend['g%s' % rank][user_id] = rating.product
print('users skipped without recommendations: ', n_skipped)
# output to the mySQL database
engine = sqlalchemy.create_engine('mysql+pymysql://@127.0.0.1/game_recommendation?charset=utf8mb4')
# One row per user: the inner-dict keys (user ids) become the index, which is
# named 'user_id' and then turned into a regular column next to g0..g9.
df = pd.DataFrame(dic_recommend).rename_axis('user_id').reset_index()
# The table is dropped and recreated on every run (if_exists='replace').
# NOTE(review): to_sql also writes the fresh 0..n-1 index as an 'index'
# column by default; pass index=False if that column is not wanted.
df.to_sql('tbl_recommend_games', engine, if_exists='replace')