# channels.py
from urllib.parse import quote, urlparse

from bs4 import BeautifulSoup
from requests_html import HTMLSession

from common import logger

# Base search URLs; the URL-encoded query is appended to each.
TELEGAGO_BASE_URL = 'https://cse.google.com/cse?q=+&cx=006368593537057042503:efxu7xprihg#gsc.tab=0&gsc.ref=more%3Apublic&gsc.q='
LYZEM_BASE_URL = 'https://lyzem.com/search?f=channels&l=%3Aen&per-page=100&q='
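# For example, the query 'python' produces the Lyzem request URL:
#     https://lyzem.com/search?f=channels&l=%3Aen&per-page=100&q=python
# For Telegago the query lands after the '#', so the results are injected
# client-side and the page must be rendered before parsing (see extract_html).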
# Fetches the HTML of a URL using the requests_html library, optionally
# rendering JavaScript first.
def extract_html(url, javascript_enabled=False):
    session = HTMLSession()
    response = session.get(url)
    if javascript_enabled:
        # render the page in a headless browser so JS-generated markup is included
        response.html.render()
    return response.html.html
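# A minimal usage sketch (the URL is a placeholder). Note that render()
# downloads a headless Chromium build via pyppeteer on first use, so the
# JavaScript-enabled path is noticeably slower:
#
#     static_html = extract_html('https://example.com')
#     rendered_html = extract_html('https://example.com', javascript_enabled=True)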
# Parses the HTML of a Lyzem results page and returns the channel names found.
def parse_lyzem_page(html):
    soup = BeautifulSoup(html, "lxml")
    links = soup.find_all('li', attrs={'class': 'result'})
    channels = []
    for link in links:
        try:
            element_classes = link['class']
            # results carrying the 'ann' class are advertisements; skip them
            if 'ann' in element_classes:
                continue
            path_url = link['data-url']
            # the channel name is the last path segment, minus any query string
            channel_name = path_url.split('?')[0].split('/')[-1]
            if channel_name not in channels:
                channels.append(channel_name)
        except KeyError:
            # the element lacks a 'class' or 'data-url' attribute; not a result
            continue
    return channels
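# A minimal illustration of parse_lyzem_page. The markup below is a guess
# inferred from the selectors above, not a captured Lyzem response, and the
# helper name is ours:
def _demo_parse_lyzem_page():
    sample = (
        '<ul>'
        '<li class="result" data-url="https://lyzem.com/some_channel?ref=1"></li>'
        '<li class="result ann" data-url="https://lyzem.com/sponsored"></li>'
        '</ul>'
    )
    # the advertisement ('ann') entry is filtered out
    assert parse_lyzem_page(sample) == ['some_channel']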
def search_channels_lyzem(query, limit=100):
    initial_request_url = LYZEM_BASE_URL + quote(query)
    logger.debug("Lyzem request url {}".format(initial_request_url))
    # extract channels from the first page
    source_html = extract_html(initial_request_url, javascript_enabled=False)
    all_channels = parse_lyzem_page(source_html)
    # if we already reached the limit, return the channels
    if len(all_channels) >= limit:
        return all_channels[:limit]
    # otherwise find the number of result pages from the pagination nav
    soup = BeautifulSoup(source_html, "lxml")
    cursor_div = soup.find_all('nav', {'class': 'pages'})
    try:
        num_pages = len(cursor_div[0].find_all('li'))
    except IndexError:
        # no pagination nav: there is only a single page of results
        num_pages = 0
    # then iterate over the remaining pages (page 1 was already fetched above)
    for page in range(2, num_pages + 1):
        request_url = initial_request_url + '&p=' + str(page)
        logger.debug("Lyzem request url {}".format(request_url))
        source_html = extract_html(request_url, javascript_enabled=False)
        page_channels = parse_lyzem_page(source_html)
        for channel in page_channels:
            if channel not in all_channels:
                all_channels.append(channel)
            if len(all_channels) >= limit:
                return all_channels[:limit]
    return all_channels
# Parses the HTML of a Telegago (Google CSE) results page and returns the
# channel names found.
def parse_telegago_page(html):
    soup = BeautifulSoup(html, "lxml")
    links = soup.find_all('a', attrs={'class': 'gs-title'})
    channels = []
    for link in links:
        try:
            path_url = urlparse(link['href']).path
            if path_url.startswith('/s/'):
                # preview links: /s/<channel> or /s/<channel>/<post-id>
                if path_url.count('/') == 2:
                    channel_name = path_url.split('/')[-1]
                else:
                    channel_name = path_url.split('/')[-2]
            else:
                # direct links: /<channel> (first path segment)
                channel_name = path_url.split('/')[1]
            if channel_name not in channels:
                channels.append(channel_name)
        except KeyError:
            # anchor without an href attribute; skip it
            continue
    return channels
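# A minimal illustration of parse_telegago_page covering the three link shapes
# handled above; the markup is a guess, not a captured CSE response, and the
# helper name is ours:
def _demo_parse_telegago_page():
    sample = (
        '<div>'
        '<a class="gs-title" href="https://t.me/s/channel_a">a</a>'      # /s/<channel>
        '<a class="gs-title" href="https://t.me/s/channel_b/42">b</a>'   # /s/<channel>/<post-id>
        '<a class="gs-title" href="https://t.me/channel_c">c</a>'       # /<channel>
        '</div>'
    )
    assert parse_telegago_page(sample) == ['channel_a', 'channel_b', 'channel_c']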
def search_channels_telegago(query, limit=100):
    initial_request_url = TELEGAGO_BASE_URL + quote(query)
    logger.debug("Telegago request url {}".format(initial_request_url))
    # extract channels from the first page (JS rendering is required here)
    source_html = extract_html(initial_request_url, javascript_enabled=True)
    all_channels = parse_telegago_page(source_html)
    # if we already reached the limit, return the channels
    if len(all_channels) >= limit:
        return all_channels[:limit]
    # otherwise find the number of result pages from the cursor element
    soup = BeautifulSoup(source_html, "lxml")
    cursor_div = soup.find_all('div', {'class': 'gsc-cursor'})
    try:
        num_pages = len(cursor_div[0].find_all('div'))
    except IndexError:
        # no cursor element: there is only a single page of results
        num_pages = 0
    # then iterate over the remaining pages (page 1 was already fetched above)
    for page in range(2, num_pages + 1):
        request_url = initial_request_url + '&gsc.page=' + str(page)
        logger.debug("Telegago request url {}".format(request_url))
        source_html = extract_html(request_url, javascript_enabled=True)
        page_channels = parse_telegago_page(source_html)
        for channel in page_channels:
            if channel not in all_channels:
                all_channels.append(channel)
            if len(all_channels) >= limit:
                return all_channels[:limit]
    return all_channels
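# A minimal sketch of how the two search functions might be combined. The query
# is arbitrary and both engines are scraped live, so results will vary:
if __name__ == '__main__':
    query = 'privacy'
    lyzem_channels = search_channels_lyzem(query, limit=20)
    telegago_channels = search_channels_telegago(query, limit=20)
    # merge the two result lists, preserving order and dropping duplicates
    merged = list(dict.fromkeys(lyzem_channels + telegago_channels))
    print(merged)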