-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrhadoop_test.R
150 lines (130 loc) · 4.61 KB
/
rhadoop_test.R
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
library(rhdfs)
library(rmr2)
# Initialise rhdfs before any HDFS operation.
hdfs.init()
#===================
# Smoke test: write 1:10 to HDFS, run a map-only job over it, and read the
# job's output back.
small.ints <- to.dfs(1:10)
# Keep the handle returned by mapreduce() rather than hard-coding a temporary
# output path, which is specific to a single run.
small.out <- mapreduce(input = small.ints, map = function(k, v) cbind(k, v))
from.dfs(small.out)
#===================
# Load the page-visit table, compute the overall visit total, and publish the
# table to HDFS for the jobs below.
webpages <- read.csv(file = 'webpages.csv', stringsAsFactors = FALSE)
total_visit <- sum(webpages$visits)
webpage.dfs <- to.dfs(webpages)
# Module-level placeholders: the key starts as NA, the value as NULL.
val <- NULL
key <- NA
# Map step 1: emit v[2] as the key and v[3] as the value.
# Presumably these are the page and visit columns of the webpages table --
# confirm against the layout of webpages.csv.
mapper_1 <- function(k, v) {
  keyval(v[2], v[3])
}
# Reduce step 1: express the summed values for key k as a percentage of
# total_visit and label it 'high' (> 67), 'median' (> 33 and <= 67) or 'low'.
# total_visit is read from the enclosing environment.
#
# k: the key being reduced.
# v: the numeric values collected for k.
# Returns keyval(k, label).
reducer_1 <- function(k, v) {
  per <- sum(v) / total_visit * 100
  # A plain scalar if/else chain: the upper bound of the middle band is implied
  # by the first branch having failed, so no elementwise `&` is needed.
  if (per > 67) {
    val <- 'high'
  } else if (per > 33) {
    val <- 'median'
  } else {
    val <- 'low'
  }
  keyval(k, val)
}
# Map step 2: invert each pair, so the incoming value becomes the key and the
# incoming key becomes the value.
mapper_2 <- function(k, v) {
  new_key <- v
  new_val <- k
  keyval(new_key, new_val)
}
# Reduce step 2: emit key k together with all of its values wrapped in a
# single list.
#
# The previous body consulted module-level `key`/`val` placeholders, but
# assignments inside a function are local, so the placeholder never left its
# initial NA and the "same key as last time" branches could not run. The
# result was always keyval(k, list(v)), which is now stated directly.
#
# k: the key being reduced.
# v: the values collected for k.
# Returns keyval(k, list(v)).
reducer_2 <- function(k, v) {
  keyval(k, list(v))
}
# Two chained jobs: label each key with its visit-share level (mapper_1 /
# reducer_1), then regroup by level (mapper_2 / reducer_2) and read the result
# back from HDFS.
# Written with intermediate handles because none of the packages this script
# loads (rhdfs, rmr2, plyr) is known to provide the %>% operator.
visit_levels <- mapreduce(input = webpage.dfs,
                          map = mapper_1,
                          reduce = reducer_1)
pages_by_level <- mapreduce(input = visit_levels,
                            map = mapper_2,
                            reduce = reducer_2,
                            combine = TRUE)
from.dfs(pages_by_level)
#====================
# Word-count mapper: split the input text on single spaces and emit every
# resulting token with a count of 1.
wc.map <- function(k, v) {
  tokens <- strsplit(v, ' ')
  words <- unlist(tokens)
  keyval(words, 1)
}
# Word-count reducer: emit the word with the sum of its counts.
wc.reduce <- function(word, counts) {
  total <- sum(counts)
  keyval(word, total)
}
# Run the word count over a one-line sample, with combine = TRUE.
test_input <- to.dfs(
  'this is a test, test rhadoop wordcount script, rhadoop is greate'
)
mapreduce(
  input = test_input,
  map = wc.map,
  reduce = wc.reduce,
  combine = TRUE
)
#====================
# Toy ratings table: 21 (user, item, perf) rows for users 1 to 5, laid out one
# user per line.
train <- data.frame(
  user = rep(c(1, 2, 3, 4, 5), times = c(3, 4, 4, 4, 6)),
  item = c(1, 2, 3,
           1, 2, 3, 4,
           1, 4, 5, 7,
           1, 3, 4, 6,
           1, 2, 3, 4, 5, 6),
  perf = c(5, 3, 2.5,
           2, 2.5, 5, 2,
           2, 4, 4.5, 5,
           5, 3, 4.5, 4,
           4, 3, 2, 4, 3.5, 4)
)
# Load dependencies with library() so a missing package stops the script here,
# instead of require() returning FALSE and the failure surfacing later.
library(rhdfs)
library(rmr2)
library(plyr)
hdfs.init()
# Run the jobs below on the Hadoop backend.
rmr.options(backend = 'hadoop')
# Store the ratings on HDFS, keyed by user.
train_hdfs <- to.dfs(keyval(train$user, train))
# Build all co-occurring item combinations.
train_mr <- mapreduce(
  input = train_hdfs,
  map = function(k, v) {
    # user -> item
    keyval(k, v$item)
  },
  reduce = function(k, v) {
    # merge() of the values with themselves yields columns x and y holding
    # every pairing of the user's items.
    pairs <- merge(v, v)
    keyval(pairs$x, pairs$y) # all item combinations
  }
)
# Count how often each item-item pair co-occurs.
# NOTE(review): the counting runs in the map phase with no reduce step, so each
# count covers only the pairs handed to one map call -- confirm this is
# sufficient when the input spans several splits.
step2.mr <- mapreduce(
  input = train_mr,
  map = function(k, v) {
    pairs <- data.frame(k, v)
    pair_freq <- ddply(pairs, .(k, v), count)
    keyval(pair_freq$k, pair_freq) # co-occurrence rows keyed by column k
  }
)
# Re-key the ratings by item, with (item, user, perf) rows as the value.
train2_mr <- mapreduce(
  input = train_hdfs,
  map = function(k, v) {
    ratings <- data.frame(
      item = v$item,
      user = v$user,
      perf = v$perf
    ) # item, user, perf
    keyval(v$item, ratings)
  }
)
# Join the item co-occurrence table with the item rating table.
# Both sides pass their key/value pairs through unchanged.
eq_hdfs <- equijoin(
  left.input = step2.mr,
  right.input = train2_mr,
  map.left = function(k, v) keyval(k, v),
  map.right = function(k, v) keyval(k, v),
  outer = 'left'
) # item1, item2, freq, item1, user, perf
# Group by user, item k and item v; the predicted score is the user's rating
# of k multiplied by the co-occurrence count of k and v.
# The .l / .r column suffixes presumably come from the equijoin output --
# confirm against eq_hdfs.
cal_mr <- mapreduce(
  input = eq_hdfs,
  map = function(k, v) {
    # Drop rows that have no user on the right-hand side of the join; the guard
    # avoids negative indexing with an empty index set.
    missing_user <- is.na(v$user.r)
    kept <- if (any(missing_user)) v[-which(missing_user), ] else v
    keyval(kept$k.l, kept)
  },
  reduce = function(k, v) {
    scored <- ddply(v, .(k.l, v.l, user.r), summarize, v = freq.l * perf.r)
    keyval(scored$k.l, scored) # item1, item2, user, perf
  }
)
# Extract user, item v and the predicted score, then sort by score.
result_mr <- mapreduce(
  input = cal_mr,
  map = function(k, v) {
    keyval(v$user.r, v)
  },
  reduce = function(k, v) {
    # Sum the partial scores per (user, item), highest total first.
    totals <- ddply(v, .(user.r, v.l), summarize, v = sum(v))
    ranked <- totals[order(totals$v, decreasing = TRUE), ]
    names(ranked) <- c('user', 'item', 'perf')
    keyval(ranked$user, ranked)
  }
)