-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathwechat_video.py
220 lines (195 loc) · 8.78 KB
/
wechat_video.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
from argparse import ArgumentParser
from math import log10
from os import chdir, path
from re import DOTALL, search, sub
from shutil import move
from time import sleep
from urllib import parse
from json5 import loads
from requests import RequestException, Session
# Seconds to sleep between retries and between consecutive article downloads
# (throttling to reduce the chance of triggering WeChat's anti-crawler checks).
INTERVAL = 5
# Maximum number of resume attempts per video download.
RETRIES = 8
# Bytes per chunk when streaming the video body to disk.
CHUNK_SIZE = 1024
# Browser-like request headers so mp.weixin.qq.com serves the real article
# page rather than an anti-crawler interstitial.
HEADERS = {
    "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6",
    "DNT": "1",
    "Sec-Ch-Ua": '"Not)A;Brand";v="99", "Microsoft Edge";v="127", "Chromium";v="127"',
    "Sec-Ch-Ua-Mobile": "?0",
    "Sec-Ch-Ua-Platform": '"Windows"',
    "Sec-Fetch-Dest": "document",
    "Sec-Fetch-Mode": "navigate",
    "Sec-Fetch-Site": "same-origin",
    "Sec-Fetch-User": "?1",
    "Upgrade-Insecure-Requests": "1",
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/127.0.0.0 Safari/537.36 Edg/127.0.0.0",
}
# Extra headers for requests to the video CDN (mpvideo.qpic.cn); the Range
# header is overridden per-request when resuming a partial download.
VIDEO_HEADERS = {
    "Host": "mpvideo.qpic.cn",
    "Origin": "https://mp.weixin.qq.com",
    "Referer": "https://mp.weixin.qq.com/",
    "Range": "bytes=0-",
}
# Shared HTTP session: reuses connections and sends HEADERS on every request.
x = Session()
x.headers.update(HEADERS)
def format_article_url(article_url: str) -> str:
    """Clean and format a WeChat article URL.

    Steps:
    1. Decode HTML-escaped ampersands (``&amp;`` -> ``&``) so the query
       string parses correctly (URLs scraped out of page HTML are escaped).
    2. Keep only the ``__biz``, ``mid``, ``idx`` and ``sn`` query
       parameters — the minimum needed to identify an article
       (not a strictly necessary step).
    3. Force the ``https://`` scheme.

    Example:
    https://mp.weixin.qq.com/s?__biz=Mzg5ODU0MjM2NA==&mid=2247483677&idx=1&sn=e299cc8de66a97041cb0832c282f94d4&chksm=...#rd
    => https://mp.weixin.qq.com/s?__biz=Mzg5ODU0MjM2NA==&mid=2247483677&idx=1&sn=e299cc8de66a97041cb0832c282f94d4
    """
    # 1. Decode HTML entities (the original replace was a no-op "&" -> "&").
    decoded = article_url.replace("&amp;", "&")
    # 2. Only preserve `__biz`, `mid`, `idx`, `sn` query parameters.
    url = parse.urlparse(decoded)
    query = parse.parse_qs(url.query)
    query = {k: v for k, v in query.items() if k in {"__biz", "mid", "idx", "sn"}}
    query_str = parse.urlencode(query, doseq=True)
    # Rebuilding the URL also drops the '#rd' fragment.
    cleaned = f"{url.scheme}://{url.netloc}{url.path}?{query_str}"
    # 3. Replace 'http://' with 'https://'.
    return cleaned.replace("http://", "https://")
def extract_video_info(html: str) -> list:
    """Extract the video information from the HTML.

    The article page embeds a JavaScript array `window.__mpVideoTransInfo`
    describing each available transcode (URL, quality level, file size).
    This function carves that array literal out of the page, strips the
    JavaScript-only expressions so it becomes valid JSON5, and parses it.

    Returns an empty list when the marker is not present (e.g. the page is
    not a video post or was served as an interstitial).
    """
    # Extract the __mpVideoTransInfo array string (non-greedy up to the
    # closing '];' so we stop at the end of the array literal).
    match = search(
        r"window\.__mpVideoTransInfo\s*=\s*(\[\s*\{.*?\},\s*\]);", html, DOTALL
    )
    if not match:
        return []
    json_str = match.group(1)
    # Remove '* 1 || 0' — a JS numeric-coercion idiom appended to fields,
    # which is not valid JSON5.
    json_str = sub(r"\s*\*\s*1\s*\|\|\s*0", "", json_str)
    # Only keep url in '(url).replace(/^http(s?):/, location.protocol)' —
    # drop the JS scheme-rewriting call and keep the quoted URL literal.
    json_str = sub(
        r"\(\s*(\'http[^\)]*)\)\.replace\(\s*/\^http\(s\?\):/, location\.protocol\s*\)",
        r"\1",
        json_str,
    )
    # json5 tolerates single quotes and trailing commas left in the literal.
    data = loads(json_str)
    return data
def best_quality(data: list) -> dict:
    """Return the metadata dict of the best-quality video in `data`.

    Ranks primarily by `video_quality_level`, breaking ties with
    `filesize`.  Both fields arrive as strings and may be empty, so each
    is defaulted to 0 *before* integer conversion — the original code's
    `int(x["filesize"]) or 0` raised ValueError on an empty string.

    Raises ValueError if `data` is empty (max of empty sequence).
    """
    # Lambda parameter renamed from `x` to avoid shadowing the
    # module-level Session object of the same name.
    item = max(
        data,
        key=lambda v: (int(v["video_quality_level"] or 0), int(v["filesize"] or 0)),
    )
    return item
def download_video(video_url: str, filename: str):
    """Download the video at `video_url` to `filename`, resuming from a
    partial `<filename>.tmp` file when one exists.

    Fixes over the original: progress messages now interpolate `filename`
    (the original f-strings had no placeholder and printed a literal
    "(unknown)"), and the size-probing response is closed so the streamed
    connection is not leaked.

    Returns True on a resume that was already complete, False on a
    corrupted .tmp file or retry exhaustion, otherwise None (unchanged
    control flow).
    """
    print(f" 🔍 Downloading {filename}...", end="\r")
    tmp_file_path = filename + ".tmp"
    # Guard clause: final file present and no partial download pending.
    if path.exists(filename) and not path.exists(tmp_file_path):
        print(f" ✅ Downloaded {filename} successfully.")
        return
    try:
        # Probe the remote size, then close so the streamed (unconsumed)
        # connection is released back to the pool.
        r = x.get(video_url, headers=VIDEO_HEADERS, stream=True)
        r.raise_for_status()  # Raise if the response is not 2xx
        total_size = int(r.headers["Content-Length"])
        r.close()  # Stop the connection to the server
        if path.exists(tmp_file_path):
            tmp_size = path.getsize(tmp_file_path)
            print(f" Already downloaded {tmp_size} Bytes out of {total_size} Bytes ({100 * tmp_size / total_size:.2f}%)")
            if tmp_size == total_size:
                # The previous run finished but never renamed the file.
                move(tmp_file_path, filename)
                print(f" ✅ Downloaded {filename} successfully.")
                return True
            elif tmp_size > total_size:
                print(" ❌ The downloaded .tmp file is larger than the remote file. It is likely corrupted.")
                return False
        else:
            tmp_size = 0
            print(f" File is {total_size} Bytes, downloading...")
        with open(tmp_file_path, "ab") as f:
            retries = 0
            while retries < RETRIES:
                try:
                    # Resume from the current offset via an HTTP Range request.
                    res = x.get(video_url, headers={**VIDEO_HEADERS, "Range": f"bytes={tmp_size}-"}, stream=True)
                    for chunk in res.iter_content(chunk_size=CHUNK_SIZE):
                        tmp_size += len(chunk)
                        f.write(chunk)
                        f.flush()
                        # 50-character progress bar.
                        done = int(50 * tmp_size / total_size)
                        print(f"\r [{'█' * done}{' ' * (50 - done)}] {100 * tmp_size / total_size:.0f}%", end="")
                    break
                except RequestException:
                    retries += 1
                    print(f"\n ⚠️ Retrying... ({retries}/{RETRIES})")
                    sleep(INTERVAL)
            else:
                # while-else: loop exhausted without a successful `break`.
                print(f"\n ❌ Failed to download {filename} after {RETRIES} retries.")
                return False
        if tmp_size == total_size:
            move(tmp_file_path, filename)
            print(f"\n ✅ Downloaded {filename} successfully.")
    except RequestException as e:
        # Log the error to a per-file log, then report failure (best-effort:
        # the caller continues with the next article).
        print(e)
        with open(filename + '_log.txt', 'a+', encoding='UTF-8') as f:
            f.write('%s, %s\n' % (video_url, e))
        print(f" ❌ Failed to download {filename}.")
def download_single(article_url: str, filename: str):
    """Download a single video from the given `article_url`, saving it as
    `<filename>.mp4`.

    Fixes over the original: the `filename` parameter is now actually used
    (the original wrote a literal "(unknown).mp4"), and the ampersand
    decoding is a real `&amp;` -> `&` replacement instead of a no-op.
    """
    # `article_url` be like: https://mp.weixin.qq.com/s?__biz=Mzg5ODU0MjM2NA%3D%3D&mid=2247483677&idx=1&sn=e299cc8de66a97041cb0832c282f94d4#rd
    assert article_url.startswith("https://mp.weixin.qq.com/s"), "Invalid article URL"
    print(f"Extracting video from {article_url}...")
    r = x.get(article_url)
    if "环境异常" in r.text:
        # WeChat served an anti-crawler interstitial instead of the article.
        print("Aborted due to detected environment exception.")
        return
    data = extract_video_info(r.text)
    if not data:
        print("No video found.")
        return
    item = best_quality(data)
    # Decode HTML-escaped ampersands so the CDN URL is valid.
    item["url"] = item["url"].replace("&amp;", "&")
    download_video(item["url"], f"{filename}.mp4")
def extract_album_info(html: str) -> list:
    """Parse the `videoList` array embedded in an album page's HTML.

    Returns the parsed list of entry dicts, or an empty list when the
    `videoList` assignment is not found in the page.
    """
    # Locate the JavaScript array literal assigned to `videoList`.
    found = search(r"var\s+videoList\s+=\s*(\[\s*\{.*?\}\s*\]);", html, DOTALL)
    if found is None:
        return []
    # Strip the ' * 1' numeric-coercion suffixes so the text is valid JSON5.
    cleaned = sub(r"\s+\*\s+1", "", found.group(1))
    return loads(cleaned)
def extract_article_urls(album_info: list) -> list:
    """Collect the cleaned article URL of every album entry that has one."""
    urls = []
    for entry in album_info:
        raw = entry["url"]
        if raw:  # skip entries whose URL is empty/falsy
            urls.append(format_article_url(raw))
    return urls
def fetch_article_urls(album_url: str) -> list:
    """Retrieve every article URL referenced by the album at `album_url`."""
    # Expected shape: https://mp.weixin.qq.com/mp/appmsgalbum?action=getalbum&album_id=1640869658155073541#wechat_redirect
    assert album_url.startswith(
        "https://mp.weixin.qq.com/mp/appmsgalbum?"
    ), "Invalid album URL"
    page = x.get(album_url)
    # Parse the embedded videoList, then normalize each entry's URL.
    return extract_article_urls(extract_album_info(page.text))
def download_album(album_url: str):
    """Download all videos from the given `album_url`.

    Each video is saved under a zero-padded sequential index (1-based) so
    filenames sort in album order.  Fixes over the original: an album with
    zero articles no longer crashes on `log10(0)` (math domain error).
    """
    print("Fetching article URLs...")
    article_urls = fetch_article_urls(album_url)
    print(f"Found {len(article_urls)} articles.")
    if not article_urls:
        # Nothing to download; also avoids log10(0) below.
        return
    # Pad indices to the width of the largest index (e.g. 001..120).
    width = int(log10(len(article_urls))) + 1
    for index, article_url in enumerate(article_urls, start=1):
        sleep(INTERVAL)  # throttle to reduce anti-crawler triggers
        download_single(article_url, f"{index:0{width}}")
if __name__ == "__main__":
parser = ArgumentParser()
parser.description = "Download videos from video posts by WeChat Official Accounts."
parser.add_argument("url", help="URL of the post or album")
parser.add_argument("--output-dir", "-O", help="Output directory", default=".")
parser.add_argument("--output", "-o", help="Output file name (without extension; in the case of a single post)", default="video")
args = parser.parse_args()
url = args.url
chdir(args.output_dir)
if url.startswith("https://mp.weixin.qq.com/mp/appmsgalbum?"):
download_album(url)
elif url.startswith("https://mp.weixin.qq.com/s"):
download_single(url, args.output)
elif url:
print("Invalid URL")
else:
print("No URL provided")