-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathsanitizer_utils.py
188 lines (159 loc) · 4.97 KB
/
sanitizer_utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
"""
break out sanitizer functions to here so that backmerging changes is less annoying between branches
"""
from typing import Tuple
from urllib.parse import urlparse
import requests
from urlextract import URLExtract
import consts
import util
# si (source identifier) is a tracking param but people kept whining
ALLOWED_PARAMS = [
"t",
"variant",
"sku",
"defaultSelectionIds",
"q",
"v",
"id",
"tk",
"topic",
"quality",
"size",
"width",
"height",
"feature",
"p",
"l",
"board",
"c",
"route",
"product",
"path",
"product_id",
"idx",
"list",
"page",
"sort",
"iframe_url_utf8",
"si",
"gcode",
"url",
"h",
"w",
"hash",
"m",
"dl",
"th",
"language",
"k",
"m",
"s",
"key",
]
DOMAINS_TO_FIX = {
# 'www.tiktok.com': 'proxitok.pussthecat.org',
# "www.tiktok.com": "vxtiktok.com",
"twitter.com": "fxtwitter.com",
"x.com": "fixupx.com",
"instagram.com": "ddinstagram.com",
"www.instagram.com": "ddinstagram.com",
}
WHITELISTED_DOMAINS = [
"youtube.com",
"www.youtube.com",
"youtu.be",
"open.spotify.com",
*DOMAINS_TO_FIX.values(),
]
DOMAINS_TO_REDIRECT = ["a.aliexpress.com", "a.co"] # "vm.tiktok.com",
REDIRECT_HEADERS = {
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8",
"Accept-Encoding": "gzip, deflate, br",
"Accept-Language": "en-US,en;q=0.5",
"Connection": "keep-alive",
"Dnt": "1",
"Sec-Fetch-Dest": "document",
"Sec-Fetch-Mode": "navigate",
"Sec-Fetch-Site": "none",
"Sec-Fetch-User": "?1",
"Upgrade-Insecure-Requests": "1",
"User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:109.0) Gecko/20100101 Firefox/117.0",
}
def handle_redirect(url: str) -> str:
"""redirect URLs that are hiding trackers in them"""
if urlparse(url).netloc in DOMAINS_TO_REDIRECT:
try:
req = requests.get(url, headers=REDIRECT_HEADERS, timeout=10)
if req.status_code == consts.HTTP_OK and not req.url.endswith("errors/500"):
return req.url
except Exception: # pylint: disable=broad-except
pass
return url
def proxy_if_necessary(url: str) -> Tuple[str, bool]:
"""
mostly fix embeds for discord
:return the sanitized url, bool implying whether or not to keep embed
"""
netloc = urlparse(url).netloc
if netloc in DOMAINS_TO_FIX.keys(): # pylint: disable=consider-iterating-dictionary
url = url.replace(netloc, DOMAINS_TO_FIX[netloc], 1)
return url, True
return url, False
def proxy_url(url: str) -> Tuple[str, bool]:
"""
just proxy a URL on demand
:return: sanitized url, bool implying whether or not to keep embed
"""
sanitized_url = handle_redirect(url)
sanitized_url, keep_embed = proxy_if_necessary(sanitized_url)
return sanitized_url if sanitized_url != url else url, keep_embed
def sanitize_message(message_content: str) -> Tuple[str, bool, bool]:
"""
:return: Response content, needs sanitizing bool, warning suffix bool
"""
needs_sanitizing = False
post_warning = False
sanitized_msg_word_list = []
for url in URLExtract().gen_urls(message_content):
if urlparse(url).netloc in WHITELISTED_DOMAINS:
continue
sanitized_url, keep_embed = proxy_url(url)
sanitized_url = sanitize_url(sanitized_url)
if sanitized_url != url:
# this was proxied, check for liveness
if keep_embed:
try:
req = requests.get(sanitized_url, timeout=10)
except requests.exceptions.ReadTimeout:
continue # he's dead jim
if "mp4" in req.text:
sanitized_msg_word_list.append(sanitized_url)
needs_sanitizing = True
post_warning = False
continue
else:
needs_sanitizing, post_warning = True, True
sanitized_msg_word_list.append(f"<{sanitized_url}>")
return "\n".join(sanitized_msg_word_list), needs_sanitizing, post_warning
def sanitize_url(url: str) -> str:
"""remove unnecessary url parameters from a url"""
new_word = url.split("?")[0]
# do not sanitize image embeds
if util.is_image(new_word):
return url
url_params = []
if len(url.split("?")) > 1:
url_params = url.split("?")[1].split("&")
if "amazon." in new_word:
new_word = new_word.split("ref=")[0]
url_params[:] = [param for param in url_params if valid_param(param)]
if len(url_params) > 0:
new_word = new_word + "?" + "&".join(url_params)
return url if url.endswith("?") else new_word
def valid_param(param: str) -> bool:
"""checks url query parameter against hard list of valid ones"""
for allowed_param in ALLOWED_PARAMS:
if param.startswith(f"{allowed_param}="):
return True
return False