-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathp1.py
61 lines (53 loc) · 1.78 KB
/
p1.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
from mrjob.job import MRJob
from mrjob.step import MRStep
from mrjob.protocol import JSONValueProtocol
import re
import itertools
import time
class UsersCount(MRJob):
    """Find the review that contains the most words used by no other review.

    Each input line is decoded with ``JSONValueProtocol``; the mapper reads
    the ``review_id`` and ``text`` entries of the decoded record.

    The job runs in three steps:

    1. Emit ``(word, review_id)`` pairs and keep only the words whose
       occurrences all belong to a single review.
    2. Sum, per review, how many such words it owns.
    3. Pick the review with the highest count.
    """

    INPUT_PROTOCOL = JSONValueProtocol

    # Characters removed from every token before it is treated as a word.
    _STRIPPED_CHARS = '!,.?":;0123456789'

    def createWordList(self, line):
        """Split *line* on whitespace and strip punctuation/digits per token.

        Returns one entry per whitespace-separated token, in order. A token
        made up solely of stripped characters becomes an empty string, so the
        result has exactly as many entries as ``line.split()``.
        """
        stripped = self._STRIPPED_CHARS
        return [
            "".join(char for char in token if char not in stripped)
            for token in line.split()
        ]

    def mapper_separate_text(self, _, line):
        """Yield ``(word, review_id)`` for every cleaned word of a review.

        Tokens that are empty after cleaning (pure digits/punctuation) are
        skipped: they are not words and must not count towards uniqueness.
        """
        review = line['review_id']
        for word in self.createWordList(line['text']):
            if word:
                yield word, review

    def unique_words_reducer(self, key, values):
        """Yield ``(review_id, 1)`` when the word *key* occurs in one review only.

        The mapper emits one pair per occurrence, so a word repeated inside a
        single review arrives here several times with the same review id.
        Comparing the ids (rather than counting the pairs) keeps such a word
        unique to its review. Iteration stops at the first differing id, so
        very common words are not materialised in memory.
        """
        reviews = iter(values)
        try:
            only_review = next(reviews)
        except StopIteration:
            # No occurrences at all: nothing to report.
            return
        if all(review == only_review for review in reviews):
            yield only_review, 1

    def count_unique_words_per_review(self, key, values):
        """Sum the unique-word markers of review *key*.

        The result is emitted under the constant key ``"Max"`` so that every
        ``[review_id, count]`` pair is grouped together for the final step.
        """
        yield "Max", [key, sum(values)]

    def get_most_original_review(self, key, values):
        """Yield the ``[review_id, count]`` pair with the highest count."""
        yield key, max(values, key=lambda item: item[1])

    def steps(self):
        """Return the three chained steps described in the class docstring."""
        return [
            MRStep(
                mapper=self.mapper_separate_text,
                reducer=self.unique_words_reducer,
            ),
            MRStep(
                reducer=self.count_unique_words_per_review,
            ),
            MRStep(
                reducer=self.get_most_original_review,
            ),
        ]
if __name__ == '__main__':
    # Single-argument print(...) calls behave identically on Python 2 and
    # Python 3, whereas the bare `print "..."` statement is a SyntaxError on
    # Python 3.
    # NOTE(review): these messages are written to stdout, which mrjob also
    # uses for job output -- confirm this is acceptable for the runners in use.
    print("Begin...")
    time_init = time.time()
    UsersCount.run()
    # Wall-clock seconds spent inside the job run.
    duration = time.time() - time_init
    print("End!")
    print("________________________________")
    print("Query duration: {0}".format(duration))