-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathBigram.py
286 lines (251 loc) · 9.62 KB
/
Bigram.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
from oneGram import Dictionary, DictionarySmooth
from process import calcuBigramWordDistri, getPunciton
from collections import defaultdict
from math import log10
class BigramDictionary(Dictionary):
    """Unsmoothed bigram dictionary.

    Extends Dictionary (the unigram word dict) with raw bigram counts and a
    simple back-off estimate for pairs that were never seen in training.
    """

    def __init__(self):
        # Initialise the unigram base class, then load the bigram counts.
        Dictionary.__init__(self)
        self.Bigram = calcuBigramWordDistri()

    def getCount(self, front_word, word):
        # Raw co-occurrence count of the pair (front_word, word).
        return self.Bigram[front_word][word]

    def getPValue(self, front_word, word):
        """Return P(word | front_word).

        Seen pairs use the maximum-likelihood estimate; unseen pairs back
        off to a scaled product of the two unigram probabilities.
        """
        backoff_weight = 0.2
        if self.inGram(front_word, word):
            pair_total = sum(self.Bigram[front_word].values())
            return self.getCount(front_word, word) / pair_total
        unigram_front = Dictionary.getPValue(self, front_word)
        unigram_word = Dictionary.getPValue(self, word)
        return backoff_weight * unigram_front * unigram_word

    def inGram(self, front_word, word):
        # True when the pair (front_word, word) has an entry in the table.
        return front_word in self.Bigram and word in self.Bigram[front_word]
class BigramDictionarySmooth(DictionarySmooth):
    """Bigram dictionary smoothed with Witten-Bell discounting.

    Unseen bigrams collected from a test file are first inserted with count
    0 (unknowWordsSetZero) and then assigned a small probability mass by
    wittenBellSmoothing().
    """

    def __init__(self):
        # Initialise the (smoothed) unigram base class first.
        DictionarySmooth.__init__(self)
        self.Bigram = calcuBigramWordDistri()
        self.unknowWordsSetZero()
        # Fallback probability returned for pairs absent from the table.
        self.alpha = self.wittenBellSmoothing()

    def getPValue(self, front_word, word):
        '''
        Return the smoothed bigram probability P(word | front_word).

        :param front_word: preceding word
        :param word: current word
        :return: P(Wi|Wi-1); self.alpha when the pair is unknown
        '''
        if not self.inGram(front_word, word):
            return self.alpha
        return self.Bigram[front_word][word]

    def inGram(self, front_word, word):
        '''
        Check whether the pair (front_word, word) exists in the table.

        :param front_word: preceding word
        :param word: following word
        :return: True or False
        '''
        if front_word in self.Bigram and word in self.Bigram[front_word]:
            return True
        return False

    def unknowWordsSetZero(self, file_name='test.txt'):
        '''
        Collect bigrams that occur in the test file but not in the training
        table and insert them with count 0 so smoothing can give them mass.

        :param file_name: path of the test file
        :return: None
        '''
        test_word_dict = calcuBigramWordDistri(file_name)
        unknow_word_cnt = 0
        for front_word in test_word_dict:
            for word in test_word_dict[front_word]:
                if not self.inGram(front_word, word):
                    self.Bigram[front_word][word] = 0.
                    unknow_word_cnt += 1
        print('unknow words count number: {0}, set unknow word value = {1}'.format(
            unknow_word_cnt, 0.0))

    def wittenBellSmoothing(self):
        '''
        Witten-Bell smoothing:
            Pi = T / ((N + T) * Z)   if Ci == 0
            Pi = Ci / (N + T)        if Ci != 0
        where N is the total bigram count mass, T the number of distinct
        seen bigrams and Z the number of zero-count (unseen) bigrams.

        :return: smoothing fallback value (the zero-count probability ** 4)
        '''
        # Z: unseen bigrams, N: total count mass, T: distinct seen bigrams.
        Z, N, T = 0, 0, 0
        for front_word in self.Bigram.keys():
            for word in self.Bigram[front_word]:
                if self.Bigram[front_word][word] == 0.:
                    Z += 1
                else:
                    N += self.Bigram[front_word][word]
                    T += 1
        # BUG FIX: the original also incremented T for every seen bigram
        # inside the rewrite loop below, so entries processed later were
        # normalised with an inconsistent, ever-growing T and the printed /
        # returned smooth value used a roughly doubled T. T is already fully
        # counted above; compute the smoothed values once, before rewriting.
        if Z == 0 or (N + T) == 0:
            # No unseen pairs (or empty table): nothing to redistribute.
            smooth_value = 0.0
        else:
            smooth_value = T / ((N + T) * Z)
        for front_word in self.Bigram.keys():
            for word in self.Bigram[front_word]:
                if self.Bigram[front_word][word] == 0.:
                    self.Bigram[front_word][word] = smooth_value
                else:
                    self.Bigram[front_word][word] = self.Bigram[
                        front_word][word] / (N + T)
        print(
            'successfully witten-Bell smoothing! smooth_value:{0}'.format(smooth_value))
        return smooth_value**4
class Bigram(BigramDictionarySmooth):
    '''
    Bigram segmenter: chooses the split of a sentence that maximises the
    product (sum of log10) of smoothed bigram probabilities.
    '''

    def __init__(self):
        BigramDictionarySmooth.__init__(self)
        # NOTE: self.DICT holds the *class*, so lookups below are made as
        # unbound calls: self.DICT.method(self, ...).
        self.DICT = BigramDictionarySmooth
        self.PUNC = getPunciton()

    def _forwardSplitSentence(self, sentence, word_max_len=5):
        '''
        Forward segmentation: recursively enumerate candidate splits.

        :param sentence: text to split
        :param word_max_len: maximum word length considered
        :return: list of candidate splits (each a list of words)
        '''
        # All candidate splits of this (sub)sentence.
        split_groups = []
        sentence = sentence.strip()
        sentence_len = len(sentence)
        if sentence_len < 2:
            return [[sentence]]
        range_max = min(sentence_len, word_max_len)
        # Binary (prefix, rest) splits of the current string.
        current_groups = []
        single_cut = True
        for i in range(range_max - 1, 0, -1):
            # If a multi-char prefix is a dictionary word, cut after it.
            if self.DICT.inDict(self, sentence[:i]) and i != 1:
                current_groups.append([sentence[:i], sentence[i:]])
                single_cut = False
        if single_cut or self.DICT.inDict(self, sentence[1:3]):
            current_groups.append([sentence[:1], sentence[1:]])
        # Short strings (len <= 3) may themselves be unknown words; keep
        # the "whole string is one word" candidate too.
        if sentence_len <= 3:
            current_groups.append([sentence])
        # Recursively expand the tail of each binary split.
        for one_group in current_groups:
            if len(one_group) == 1:
                split_groups.append(one_group)
                continue
            for child_group in self._forwardSplitSentence(one_group[1]):
                child_group.insert(0, one_group[0])
                split_groups.append(child_group)
        return split_groups

    def backwardSplitSentence(self, sentence):
        # Todo
        '''
        Backward segmentation (not implemented).

        :param sentence: text to split
        :return: list of candidate splits
        '''
        pass

    def queryBigram(self, split_words_group):
        '''
        Query the bigram probability of every adjacent pair in every split.

        :param split_words_group: list of candidate splits
        :return: nested dict mapping front_word -> word -> probability
        '''
        gram_p = {}
        gram_p[u'<start>'] = defaultdict(float)
        for split_words in split_words_group[::-1]:
            try:
                for i in range(len(split_words)):
                    if i == 0:
                        # First word is conditioned on the <start> marker.
                        gram_p[u'<start>'][split_words[0]] = self.DICT.getPValue(
                            self, u'<start>', split_words[0])
                        continue
                    front_word = split_words[i - 1]
                    word = split_words[i]
                    if front_word not in gram_p:
                        gram_p[front_word] = defaultdict(float)
                    gram_p[front_word][word] = self.DICT.getPValue(self,
                                                                   front_word, word)
            except ValueError:
                print('Failed to query Bigram.')
        return gram_p

    def _maxP(self, sentence, word_max_len=5):
        '''
        Find the split with the maximum total log-probability.

        :param sentence: sentence (or punctuation-free fragment) to split
        :param word_max_len: maximum word length
        :return: best split as a list of words
        '''
        split_words_group = self._forwardSplitSentence(
            sentence, word_max_len=word_max_len)
        max_p = -99999
        best_split = []
        # Memo of already-computed log-probabilities, keyed by word pair.
        value_dict = {}
        value_dict[u'<start>'] = defaultdict(float)
        for split_words in split_words_group[::-1]:
            words_p = 0
            try:
                for i in range(len(split_words)):
                    word = split_words[i]
                    if i == 0:
                        if word not in value_dict[u'<start>']:
                            value_dict[u'<start>'][word] = log10(
                                self.DICT.getPValue(self, u'<start>', word))
                        words_p += value_dict[u'<start>'][word]
                        continue
                    front_word = split_words[i - 1]
                    if front_word not in value_dict:
                        value_dict[front_word] = defaultdict(float)
                    if word not in value_dict[front_word]:
                        value_dict[front_word][word] = log10(
                            self.DICT.getPValue(self, front_word, word))
                    words_p += value_dict[front_word][word]
                    # Prune: this split can no longer beat the current best.
                    if words_p < max_p:
                        break
            except ValueError:
                # log10 raised on a non-positive probability value.
                print("Failed to calculate maxP.")
            if words_p > max_p:
                max_p = words_p
                best_split = split_words
        return best_split

    def segment(self, sentence):
        '''
        Segmentation entry point.

        :param sentence: sentence to split
        :return: list of segmented words
        '''
        words = []
        sentences = []
        # Split the input on punctuation; punctuation marks are kept as
        # their own one-character segments.
        start = -1
        for i in range(len(sentence)):
            if sentence[i] in self.PUNC:
                sentences.append(sentence[start + 1:i])
                sentences.append(sentence[i])
                start = i
        # BUG FIX: the original dropped any text after the last punctuation
        # mark (e.g. "ab,cd" lost "cd"); keep the trailing segment.
        if start + 1 < len(sentence):
            sentences.append(sentence[start + 1:])
        if not sentences:
            sentences.append(sentence)
        for sent in sentences:
            words.extend(self._maxP(sent))
        return words
if __name__ == '__main__':
    # Demo: build the model and segment a sample sentence.
    bigram_model = Bigram()
    sample = '党的十五大依据邓小平理论和党的基本路线提出的党在社会主义初级阶段经济'
    # sample = u'不能结婚的和尚啊'
    print(bigram_model.segment(sample))