#!/usr/bin/python
# -*- coding: utf-8 -*-
# version 0.1
from BeautifulSoup import BeautifulSoup
import urllib2, urllib, cookielib, urlparse
import os, sys
import re

# Install a global opener so every urllib2 request shares the same cookie jar.
cookieJar = cookielib.LWPCookieJar()
opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cookieJar))
urllib2.install_opener(opener)

defaultHeader = {'User-agent': 'Mozilla/10.0 (compatible; MSIE 5.5; Win7)'}


def setHeader(header):
    global defaultHeader
    defaultHeader = header


verbose = True


def _i(string):
    """Print string in verbose mode.

    Arguments:
    - `string`:
    """
    if verbose:
        print string


def hasWords(string, words):
    """Return True only if every word in `words` occurs in `string`."""
    for item in words:
        if item not in string:
            return False
    return True


def hasOneWords(string, words):
    """Return True if at least one word in `words` occurs in `string`."""
    for item in words:
        if item in string:
            return True
    return False


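# hasWords backs the ":attr[kw1&kw2]" advanced selectors further below (every
# keyword must match); hasOneWords is its any-match counterpart.  For example,
# hasWords("red button", ["red", "button"]) is True,
# hasWords("red button", ["red", "link"]) is False, and
# hasOneWords("red button", ["link", "red"]) is True.

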
# WARNINGTAG: don't forget BadMoveException
class Page(object):
    """Page holds page content so that DOM elements can be queried from it."""

    def _fetchHTML(self, url, referer="http://www.google.com/"):
        """Fetch HTML (assumed to be UTF-8 encoded).

        Arguments:
        - `url`:
        - `referer`: the default referer is google.com
        """
        global defaultHeader
        headers = defaultHeader.copy()
        if referer is not None:
            headers['Referer'] = referer
        req = urllib2.Request(url, headers=headers)
        print headers
        return urllib2.urlopen(req).read()

    def _postAndFetchHTML(self, url, postData, referer="http://www.google.com"):
        """POST the data and fetch the returned HTML.

        Arguments:
        - `self`:
        - `url`:
        - `postData`:
        - `referer`:
        """
        global defaultHeader
        headers = defaultHeader.copy()
        if referer is not None:
            headers["Referer"] = referer
        req = urllib2.Request(url, urllib.urlencode(postData), headers)
        return urllib2.urlopen(req).read()

    def __init__(self, URL=None, HTML=None, prevPage=None, nextPage=None):
        """
        Arguments:
        - `URL`: current URL
        - `HTML`: initial HTML
        - `prevPage`: the Page object this page was reached from
        - `nextPage`: the Page object this page comes back from
        """
        self._URL = URL
        self._HTML = HTML
        self._prevPage = prevPage
        self._nextPage = nextPage
        self._postData = None
        if prevPage is not None:
            self._referer = prevPage._URL
        else:
            self._referer = None
        # A URL without HTML means this page was created by goto() or from a
        # fresh URL, so fetch the HTML now.
        if self._HTML is None and self._URL is not None:
            self._HTML = self._fetchHTML(self._URL, self._referer)

    def goto(self, URL):
        """Go to another Page from this Page.

        Arguments:
        - `self`:
        - `URL`: where to go
        """
        URL = urlparse.urljoin(self._URL, URL)
        self._nextPage = Page(URL=URL, prevPage=self)
        return self._nextPage

    def post(self, URL, postData):
        """POST the parameters from this Page and return a new Page object
        holding the returned HTML.

        Arguments:
        - `self`:
        - `URL`:
        - `postData`: the POST data
        """
        URL = urlparse.urljoin(self._URL, URL)
        newPage = Page(URL=URL, prevPage=self,
                       HTML=self._postAndFetchHTML(URL, postData=postData, referer=self._referer))
        newPage._postData = postData
        return newPage

    def forward(self):
        """Go forward to the page this one was backed from; should raise
        BadMoveException when there is no further forward.

        Arguments:
        - `self`:
        """
        return self._nextPage

    def backward(self):
        """Go back to the page this one came from; should raise
        BadMoveException when there is no further backward.

        Arguments:
        - `self`:
        """
        return self._prevPage

    def refresh(self):
        """Refresh this page.

        Arguments:
        - `self`:
        """
        if self._postData is None:
            self._HTML = self._fetchHTML(self._URL)
        else:
            self._HTML = self._postAndFetchHTML(self._URL, self._postData, self._referer)

    def find(self, selectors):
        """Find the DOM set specified by the CSS selector.

        Arguments:
        - `self`:
        - `selectors`: basic CSS selector support
        """
        return Query(self, selectors)

    def text(self):
        """Return all the text contained in the page."""
        return "".join(BeautifulSoup(self._HTML).findAll(text=True))


class PageSet(list):
    """A collection of Page objects."""

    def __init__(self):
        pass

    def find(self, selectors):
        querySet = QuerySet()
        for page in self:
            querySet.append(page.find(selectors))
        return querySet

    def text(self):
        result = []
        for item in self:
            # Page.text() returns a single string, so append it rather than
            # extend (extending would add the string character by character).
            result.append(item.text())
        return result


class Query(object):
    """Filter the DOM, and perform actions indicated by the DOM or the page."""

    def _filterDOM(self, HTML, selectors):
        soup = BeautifulSoup(HTML)
        return self._filterSoup(soup, selectors)

    def _filterSoup(self, soup, selectors):
        # Split the selector string on the token characters "#", ".", ":" and
        # space, keeping each selector value together with the token that
        # introduced it.
        token = "#. :"
        filters = []
        i = 0
        j = 0
        for i in range(len(selectors)):
            if selectors[i] in token:
                continue
            elif i > 0 and selectors[i-1] in token:
                selectorType = selectors[i-1]
                j = i
                if len(selectors) == i+1 or selectors[i+1] in token:
                    filters.append({"selectorType": selectorType,
                                    "selectorValue": selectors[j:i+1]})
            elif len(selectors) == i+1 or selectors[i+1] in token:
                if j-1 >= 0:
                    selectorType = selectors[j-1]
                else:
                    selectorType = " "
                filters.append({"selectorType": selectorType,
                                "selectorValue": selectors[j:i+1]})
            else:
                continue
        typeMap = {"#": "id",
                   ".": "class",
                   }
        # Apply each filter in turn, narrowing the soup at every step.
        for item in filters:
            if item["selectorType"] == " ":
                soupResult = soup.findAll(item["selectorValue"])
            elif item["selectorType"] == ":":
                content = item["selectorValue"]
                goodSelectorType = content[0:content.index("[")]
                goodSelectorValue = content[content.index("[")+1:-1]
                soupResult = self._advancedFilter(soup, goodSelectorType, goodSelectorValue)
            else:
                t = item["selectorType"]
                v = item["selectorValue"]
                soupResult = soup.findAll(lambda tag: tag.has_key(typeMap[t]) and v in tag[typeMap[t]].split(" "))
            soup = BeautifulSoup(str(soupResult))
        return soupResult

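    # Selector parsing example: the string "div #main .item" splits into three
    # filters,
    #   {" ": "div"}   -> soup.findAll("div")                        (tag name)
    #   {"#": "main"}  -> tags whose id attribute contains "main"    (id)
    #   {".": "item"}  -> tags whose class attribute contains "item" (class)
    # applied one after another, each to the soup produced by the previous
    # filter.  ":" selectors such as ":text[foo&bar]" or ":href[zip]" are
    # handed to _advancedFilter below, which requires every "&"-separated
    # keyword to match.
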
    def _advancedFilter(self, soup, selectorType, selectorValue):
        keywords = selectorValue.split("&")
        if selectorType == "text":
            return soup.findAll(lambda tag: hasWords("".join(tag.findAll(text=True)), keywords))
        else:
            return soup.findAll(lambda tag: tag.has_key(selectorType) and hasWords(tag[selectorType], keywords))

    def _download(self, url, path=None, referer="www.google.com"):
        # Downloads are delegated to the external wget command, which must be
        # available on the PATH.
        if path is not None:
            para = "-O \"%s\"" % path
        else:
            para = ""
        cmd = """wget "%s" -nc %s --header="Referer:%s" """ % (url, para, referer)
        os.system(cmd)

    def __init__(self, Page, selectors, soup=None):
        """
        Arguments:
        - `Page`: the Page this Query comes from
        """
        self._Page = Page
        if soup is None:
            # created from a Page
            self._soupResult = self._filterDOM(self._Page._HTML, selectors)
        else:
            # created from another Query, e.g. when find() calls are chained
            self._soupResult = self._filterSoup(soup, selectors)

    def __len__(self):
        return len(self._soupResult)

    def __getitem__(self, i):
        return self._soupResult[i]

    def __iter__(self):
        return self._soupResult.__iter__()

    def find(self, selectors):
        return Query(self._Page, selectors, BeautifulSoup(str(self._soupResult)))

    def follow(self, keyword="*"):
        """Follow the matched <a> tags that carry an href attribute.

        Arguments:
        - `self`:
        - `keyword`:
        """
        soupResult = BeautifulSoup(str(self._soupResult)).findAll(
            lambda tag: tag.has_key("href") and (keyword == "*" or keyword in tag["href"]))
        baseURL = self._Page._URL
        pageSet = PageSet()
        for item in soupResult:
            targetURL = urlparse.urljoin(baseURL, item["href"])
            _i("follow page %s" % targetURL)
            pageSet.append(self._Page.goto(targetURL))
        return pageSet

    def download(self, keyword="*", fileNameGenerator=None, directAddress=None):
        """Download the matched tags' href targets using wget; if
        directAddress is given, download that address directly."""
        if directAddress is not None:
            try:
                self._download(directAddress, referer=self._Page._URL)
            except Exception, e:
                return 0
            return 1
        soupResult = BeautifulSoup(str(self._soupResult)).findAll(
            lambda tag: tag.has_key("href") and (keyword == "*" or keyword in tag["href"]))
        baseURL = self._Page._URL
        counter = 0
        for item in soupResult:
            targetURL = urlparse.urljoin(baseURL, item["href"])
            if fileNameGenerator is not None:
                name = fileNameGenerator.next()
            else:
                name = None
            self._download(targetURL, path=name, referer=self._Page._URL)
            counter += 1
        return counter

    def attr(self, attrName):
        """Get every value of the specified attribute."""
        results = BeautifulSoup(str(self._soupResult)).findAll(lambda tag: tag.has_key(attrName))
        return [item[attrName] for item in results]

    def text(self):
        result = []
        for item in self._soupResult:
            result.append("".join(BeautifulSoup(str(item)).findAll(text=True)))
        return result

    def downloadSrc(self, keyword="*", fileNameGenerator=None):
        """Download the matched tags' src targets (e.g. images) using wget."""
        soupResult = BeautifulSoup(str(self._soupResult)).findAll(
            lambda tag: tag.has_key("src") and (keyword == "*" or keyword in tag["src"]))
        baseURL = self._Page._URL
        counter = 0
        for item in soupResult:
            targetURL = urlparse.urljoin(baseURL, item["src"])
            if fileNameGenerator is not None:
                name = fileNameGenerator.next()
            else:
                name = None
            self._download(targetURL, path=name, referer=self._Page._URL)
            counter += 1
        return counter


class QuerySet(list):
    """A collection of Query objects."""

    def __init__(self):
        pass

    def __len__(self):
        counter = 0
        for item in self:
            counter += len(item)
        return counter

    def __getitem__(self, index):
        counter = 0
        for item in self:
            if index >= counter and index < len(item) + counter:
                return item[index - counter]
            counter += len(item)
        else:
            raise IndexError

    def attr(self, attrName):
        """Get every value of the specified attribute."""
        result = []
        for item in self:
            result.extend(item.attr(attrName))
        return result

    def text(self):
        """Get all the text inside the matched tags."""
        result = []
        for item in self:
            result.extend(item.text())
        return result

    def find(self, selectors):
        """Find matching DOM elements within the current queries.

        Arguments:
        - `self`:
        - `selectors`:
        """
        querySet = QuerySet()
        for item in self:
            querySet.append(item.find(selectors))
        return querySet

    def follow(self, keyword="*"):
        """Follow every link matching the keyword, or pass "*" to follow all
        links of the matched tags."""
        pageSet = PageSet()
        for item in self:
            pageSet.extend(item.follow(keyword))
        return pageSet

    def download(self, keyword="*", fileNameGenerator=None):
        """Use wget to download the matched tags' href targets;
        fileNameGenerator is a generator used to name each downloaded file."""
        counter = 0
        for item in self:
            counter += item.download(keyword, fileNameGenerator)
        return counter

    def downloadSrc(self, keyword="*", fileNameGenerator=None):
        """Download images (src attributes)."""
        counter = 0
        for item in self:
            counter += item.downloadSrc(keyword, fileNameGenerator)
        return counter
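

# Minimal usage sketch.  The URL and selectors below are hypothetical and only
# illustrate how Page, Query and PageSet are meant to be chained; they are not
# taken from a real project.
if __name__ == "__main__":
    page = Page(URL="http://example.com/")      # fetch the start page
    links = page.find("a")                      # tag-name selector
    print links.attr("href")                    # every href on the page
    print page.find("#content").text()          # text of the #content element
    # follow() fetches every matched link and returns a PageSet;
    # find() on that PageSet then queries every fetched page at once:
    # pages = page.find("div .post a").follow()
    # print pages.find("h1").text()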