-
Notifications
You must be signed in to change notification settings - Fork 4.5k
/
naive_bayes.py
171 lines (128 loc) · 5.93 KB
/
naive_bayes.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
from typing import Set
import re
def tokenize(text: str) -> Set[str]:
text = text.lower() # Convert to lowercase,
all_words = re.findall("[a-z0-9']+", text) # extract the words, and
return set(all_words) # remove duplicates.
assert tokenize("Data Science is science") == {"data", "science", "is"}
from typing import NamedTuple
class Message(NamedTuple):
text: str
is_spam: bool
from typing import List, Tuple, Dict, Iterable
import math
from collections import defaultdict
class NaiveBayesClassifier:
def __init__(self, k: float = 0.5) -> None:
self.k = k # smoothing factor
self.tokens: Set[str] = set()
self.token_spam_counts: Dict[str, int] = defaultdict(int)
self.token_ham_counts: Dict[str, int] = defaultdict(int)
self.spam_messages = self.ham_messages = 0
def train(self, messages: Iterable[Message]) -> None:
for message in messages:
# Increment message counts
if message.is_spam:
self.spam_messages += 1
else:
self.ham_messages += 1
# Increment word counts
for token in tokenize(message.text):
self.tokens.add(token)
if message.is_spam:
self.token_spam_counts[token] += 1
else:
self.token_ham_counts[token] += 1
def _probabilities(self, token: str) -> Tuple[float, float]:
"""returns P(token | spam) and P(token | not spam)"""
spam = self.token_spam_counts[token]
ham = self.token_ham_counts[token]
p_token_spam = (spam + self.k) / (self.spam_messages + 2 * self.k)
p_token_ham = (ham + self.k) / (self.ham_messages + 2 * self.k)
return p_token_spam, p_token_ham
def predict(self, text: str) -> float:
text_tokens = tokenize(text)
log_prob_if_spam = log_prob_if_ham = 0.0
# Iterate through each word in our vocabulary.
for token in self.tokens:
prob_if_spam, prob_if_ham = self._probabilities(token)
# If *token* appears in the message,
# add the log probability of seeing it;
if token in text_tokens:
log_prob_if_spam += math.log(prob_if_spam)
log_prob_if_ham += math.log(prob_if_ham)
# otherwise add the log probability of _not_ seeing it
# which is log(1 - probability of seeing it)
else:
log_prob_if_spam += math.log(1.0 - prob_if_spam)
log_prob_if_ham += math.log(1.0 - prob_if_ham)
prob_if_spam = math.exp(log_prob_if_spam)
prob_if_ham = math.exp(log_prob_if_ham)
return prob_if_spam / (prob_if_spam + prob_if_ham)
messages = [Message("spam rules", is_spam=True),
Message("ham rules", is_spam=False),
Message("hello ham", is_spam=False)]
model = NaiveBayesClassifier(k=0.5)
model.train(messages)
assert model.tokens == {"spam", "ham", "rules", "hello"}
assert model.spam_messages == 1
assert model.ham_messages == 2
assert model.token_spam_counts == {"spam": 1, "rules": 1}
assert model.token_ham_counts == {"ham": 2, "rules": 1, "hello": 1}
text = "hello spam"
probs_if_spam = [
(1 + 0.5) / (1 + 2 * 0.5), # "spam" (present)
1 - (0 + 0.5) / (1 + 2 * 0.5), # "ham" (not present)
1 - (1 + 0.5) / (1 + 2 * 0.5), # "rules" (not present)
(0 + 0.5) / (1 + 2 * 0.5) # "hello" (present)
]
probs_if_ham = [
(0 + 0.5) / (2 + 2 * 0.5), # "spam" (present)
1 - (2 + 0.5) / (2 + 2 * 0.5), # "ham" (not present)
1 - (1 + 0.5) / (2 + 2 * 0.5), # "rules" (not present)
(1 + 0.5) / (2 + 2 * 0.5), # "hello" (present)
]
p_if_spam = math.exp(sum(math.log(p) for p in probs_if_spam))
p_if_ham = math.exp(sum(math.log(p) for p in probs_if_ham))
# Should be about 0.83
assert model.predict(text) == p_if_spam / (p_if_spam + p_if_ham)
def drop_final_s(word):
return re.sub("s$", "", word)
def main():
import glob, re
# modify the path to wherever you've put the files
path = 'spam_data/*/*'
data: List[Message] = []
# glob.glob returns every filename that matches the wildcarded path
for filename in glob.glob(path):
is_spam = "ham" not in filename
# There are some garbage characters in the emails, the errors='ignore'
# skips them instead of raising an exception.
with open(filename, errors='ignore') as email_file:
for line in email_file:
if line.startswith("Subject:"):
subject = line.lstrip("Subject: ")
data.append(Message(subject, is_spam))
break # done with this file
import random
from scratch.machine_learning import split_data
random.seed(0) # just so you get the same answers as me
train_messages, test_messages = split_data(data, 0.75)
model = NaiveBayesClassifier()
model.train(train_messages)
from collections import Counter
predictions = [(message, model.predict(message.text))
for message in test_messages]
# Assume that spam_probability > 0.5 corresponds to spam prediction
# and count the combinations of (actual is_spam, predicted is_spam)
confusion_matrix = Counter((message.is_spam, spam_probability > 0.5)
for message, spam_probability in predictions)
print(confusion_matrix)
def p_spam_given_token(token: str, model: NaiveBayesClassifier) -> float:
# We probably shouldn't call private methods, but it's for a good cause.
prob_if_spam, prob_if_ham = model._probabilities(token)
return prob_if_spam / (prob_if_spam + prob_if_ham)
words = sorted(model.tokens, key=lambda t: p_spam_given_token(t, model))
print("spammiest_words", words[-10:])
print("hammiest_words", words[:10])
if __name__ == "__main__": main()