forked from diaoyb/qiandao
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathunicom.py
269 lines (249 loc) · 14.9 KB
/
unicom.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
#!/usr/bin/python3
# -- coding: utf-8 --
# -------------------------------
# @Author : github@limoruirui https://github.com/limoruirui
# @Time : 2022/8/10 13:23
# -------------------------------
# cron "30 9 * * *" script-path=xxx.py,tag=匹配cron用
# const $ = new Env('某通阅读');
"""
联通app抽奖 入口:联通app 搜索 阅读专区 进入话费派送中
1. 脚本仅供学习交流使用, 请在下载后24h内删除
2. 需要第三方库 pycryptodome 支持 命令行安装 pip3 install pycryptodome或者根据自己环境自行安装
3. 环境变量说明 PHONE_NUM(必需) UNICOM_LOTTER(选填) UNICOM_USERAGENT(选填) 自行新建环境变量添加
PHONE_NUM 为你的手机号
UNICOM_LOTTER 默认自动抽奖, 若不需要 则添加环境变量值为 False
UNICOM_USERAGENT 联通app的useragent 随便一个数据包的请求头里应该都有 建议自己抓一个填上 不填也能跑 数据内容参考 line 44 双引号的内容
推送通知的变量同青龙 只写了tgbot(支持反代api)和pushplus
"""
"""updateTime: 2023.1.1 log: 更新aes加密key
updateTime: 2022.12.1 log: 活动重新上架 改用 pycryptodome 替代 cryptography 进行aes加密
updateTime: 2022.9.1 log: 每个月活动id改变更新
"""
from requests import post, get
from time import sleep, time
from datetime import datetime
from hashlib import md5 as md5Encode
from random import randint, uniform, choice
from os import environ
from sys import stdout, exit
from base64 import b64encode
from json import dumps
from tools.encrypt_symmetric import Crypt
import tools.notify as notify
from tools.tool import get_environ, random_sleep
# random_sleep(0, 1600)
"""主类"""
class China_Unicom:
def __init__(self, phone_num):
self.phone_num = phone_num
default_ua = f"Mozilla/5.0 (Linux; Android {randint(8, 13)}; SM-S908U Build/TP1A.220810.014; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/{randint(95, 108)}.0.5359.128 Mobile Safari/537.36; unicom{{version:android@9.0{randint(0,6)}00,desmobile:{self.phone_num}}};devicetype{{deviceBrand:,deviceModel:}};{{yw_code:}}"
run_ua = get_environ(key="UNICOM_USERAGENT", default=default_ua, output=False)
self.headers = {
"Host": "10010.woread.com.cn",
"Accept": "application/json, text/plain, */*",
"Accept-Language": "zh-CN,zh-Hans;q=0.9",
"Accept-Encoding": "gzip, deflate, br",
"Content-Type": "application/json;charset=utf-8",
"Origin": "https://10010.woread.com.cn",
"User-Agent": run_ua,
"Connection": "keep-alive",
"Referer": "https://10010.woread.com.cn/ng_woread/",
}
self.fail_num = 0
def timestamp(self):
return round(time() * 1000)
def print_now(self, content):
print(content)
stdout.flush()
def md5(self, str):
m = md5Encode(str.encode(encoding='utf-8'))
return m.hexdigest()
def req(self, url, crypt_text, retry_num=5):
while retry_num > 0:
body = {
"sign": b64encode(Crypt(crypt_type="AES", key="update!@#1234567", iv="16-Bytes--String", mode="CBC").encrypt(crypt_text).encode()).decode()
}
self.headers["Content-Length"] = str(len(dumps(body).replace(" ", "")))
try:
res = post(url, headers=self.headers, json=body)
data = res.json()
return data
except Exception as e:
print(f"本次请求失败, 正在重新发送请求 剩余次数{retry_num}")
print(f"本次请求失败原因------{e}")
retry_num -= 1
sleep(5)
return self.req(url, crypt_text, retry_num)
def referer_login(self):
date = datetime.today().__format__("%Y%m%d%H%M%S")
timestamp = self.timestamp()
url = f"https://10010.woread.com.cn/ng_woread_service/rest/app/auth/10000002/{timestamp}/{self.md5(f'100000027k1HcDL8RKvc{timestamp}')}"
crypt_text = f'{{"timestamp":"{date}"}}'
body = {
"sign": b64encode(Crypt(crypt_type="AES", key="1234567890abcdef").encrypt(crypt_text).encode()).decode()
}
self.headers["Content-Length"] = str(len(str(body)) - 1)
data = post(url, headers=self.headers, json=body).json()
if data["code"] == "0000":
self.headers["accesstoken"] = data["data"]["accesstoken"]
else:
self.print_now(f"设备登录失败,日志为{data}")
exit(0)
def get_userinfo(self):
date = datetime.today().__format__("%Y%m%d%H%M%S")
url = "https://10010.woread.com.cn/ng_woread_service/rest/account/login"
crypt_text = f'{{"phone":"{self.phone_num}","timestamp":"{date}"}}'
data = self.req(url, crypt_text)
if data["code"] == "0000":
self.userinfo = data["data"]
else:
self.print_now(f"手机号登录失败, 日志为{data}")
exit(0)
def watch_video(self):
self.print_now("看广告获取积分任务: ")
url = "https://10010.woread.com.cn/ng_woread_service/rest/activity/yearEnd/obtainScoreByAd"
date = datetime.today().__format__("%Y%m%d%H%M%S")
crypt_text = f'{{"value":"947728124","timestamp":"{date}","token":"{self.userinfo["token"]}","userId":"{self.userinfo["userid"]}","userIndex":{self.userinfo["userindex"]},"userAccount":"{self.userinfo["phone"]}","verifyCode":"{self.userinfo["verifycode"]}"}}'
for i in range(3):
data = self.req(url, crypt_text)
self.print_now(data)
if self.fail_num == 3:
self.print_now("当前任务出现异常 且错误次数达到3次 请手动检查")
notify.send("某通阅读", "当前任务出现异常 且错误次数达到3次 请手动检查")
exit(0)
if data["code"] == "9999":
self.print_now("当前任务出现异常 正在重新执行")
self.fail_num += 1
self.main()
return False
sleep(uniform(2, 8))
return True
def read_novel(self):
self.get_cardid()
self.get_cntindex()
self.get_chapterallindex()
self.print_now(f"{self.phone_num} :正在执行观看300次小说, 此过程较久, 最大时长为300 * 8s = 40min")
for i in range(300):
date = datetime.today().__format__("%Y%m%d%H%M%S")
chapterAllIndex = choice(self.chapterallindex_list)
url = f"https://10010.woread.com.cn/ng_woread_service/rest/cnt/wordsDetail?catid={self.catid}&pageIndex={self.pageIndex}&cardid={randint(10000, 99999)}&cntindex={self.cntindex}&chapterallindex={chapterAllIndex}&chapterseno=3"
crypt_text = f'{{"chapterAllIndex":{chapterAllIndex},"cntIndex":{self.cntindex},"cntTypeFlag":"1","timestamp":"{date}","token":"{self.userinfo["token"]}","userId":"{self.userinfo["userid"]}","userIndex":{self.userinfo["userindex"]},"userAccount":"{self.userinfo["phone"]}","verifyCode":"{self.userinfo["verifycode"]}"}}'
data = self.req(url, crypt_text)
if self.fail_num == 3:
self.print_now("当前任务出现异常 且错误次数达到3次 请手动检查")
notify.send("某通阅读", "阅读任务出现异常 且错误次数达到3次 请手动检查")
exit(0)
if data.get("code") != "0000":
self.print_now("阅读小说发生异常, 正在重新登录执行, 接口返回")
self.print_now(data)
return self.main()
sleep(uniform(2, 4))
def query_score(self):
url = "https://10010.woread.com.cn/ng_woread_service/rest/activity/yearEnd/queryUserScore"
date = datetime.today().__format__("%Y%m%d%H%M%S")
crypt_text = f'{{"activeIndex":{self.activeIndex},"timestamp":"{date}","token":"{self.userinfo["token"]}","userId":"{self.userinfo["userid"]}","userIndex":{self.userinfo["userindex"]},"userAccount":"{self.userinfo["phone"]}","verifyCode":"{self.userinfo["verifycode"]}"}}'
data = self.req(url, crypt_text)
total_score = data["data"]["validScore"]
self.lotter_num = int(total_score / 50)
self.print_now(f"{self.phone_num} 当前有积分{total_score}, 可以抽奖{self.lotter_num}次")
def get_activetion_id(self):
url = "https://10010.woread.com.cn/ng_woread_service/rest/activity/yearEnd/queryActiveInfo"
date = datetime.today().__format__("%Y%m%d%H%M%S")
crypt_text = f'{{"timestamp":"{date}","token":"{self.userinfo["token"]}","userId":"{self.userinfo["userid"]}","userIndex":{self.userinfo["userindex"]},"userAccount":"{self.userinfo["phone"]}","verifyCode":"{self.userinfo["verifycode"]}"}}'
data = self.req(url, crypt_text)
if data["code"] == "0000":
self.activeIndex = data["data"]["activeindex"]
else:
self.print_now(f"活动id获取失败 将影响抽奖和查询积分")
def get_cardid(self):
"""
获取cardid
:return:
"""
url = "https://10010.woread.com.cn/ng_woread_service/rest/basics/getIntellectRecommend"
date = datetime.today().__format__("%Y%m%d%H%M%S")
crypt_text = f'{{"cntsize":8,"recommendsize":5,"recommendid":0,"timestamp":"{date}","token":"{self.userinfo["token"]}","userId":"{self.userinfo["userid"]}","userIndex":{self.userinfo["userindex"]},"userAccount":"{self.userinfo["phone"]}","verifyCode":"{self.userinfo["verifycode"]}"}}'
data = self.req(url, crypt_text)
# print(data)
self.pageIndex = data["data"]["recommendindex"] if "recommendindex" in data["data"] else "10725"
self.cardid = data["data"]["catindex"] if "catindex" in data["data"] else "119056"
def get_cntindex(self):
url = "https://10010.woread.com.cn/ng_woread_service/rest/basics/recommposdetail/12279"
self.headers.pop("Content-Length", "no")
data = get(url, headers=self.headers).json()
booklist = data["data"]["booklist"]["message"]
book_num = len(booklist)
self.catid = booklist[0]["catindex"] if "catindex" in booklist[0] else "119411"
self.cntindex = booklist[randint(0, book_num - 1)]["cntindex"]
def get_chapterallindex(self):
url = f"https://10010.woread.com.cn/ng_woread_service/rest/cnt/chalist?catid=119411&pageIndex=10725&cardid=12279&cntindex={self.cntindex}"
date = datetime.today().__format__("%Y%m%d%H%M%S")
crypt_text = f'{{"curPage":1,"limit":30,"index":"{self.cntindex}","sort":0,"finishFlag":1,"timestamp":"{date}","token":"{self.userinfo["token"]}","userId":"{self.userinfo["userid"]}","userIndex":{self.userinfo["userindex"]},"userAccount":"{self.userinfo["phone"]}","verifyCode":"{self.userinfo["verifycode"]}"}}'
data = self.req(url, crypt_text)
chapterallindexlist = data["list"][0]["charptercontent"]
chapterallindex_num = len(chapterallindexlist)
self.chapterallindex_list = [0] * chapterallindex_num
i = 0
while i < chapterallindex_num:
self.chapterallindex_list[i] = chapterallindexlist[i]["chapterallindex"]
i += 1
def lotter(self):
url = "https://10010.woread.com.cn/ng_woread_service/rest/activity/yearEnd/handleDrawLottery"
date = datetime.today().__format__("%Y%m%d%H%M%S")
crypt_text = f'{{"activeIndex":{self.activeIndex},"timestamp":"{date}","token":"{self.userinfo["token"]}","userId":"{self.userinfo["userid"]}","userIndex":{self.userinfo["userindex"]},"userAccount":"{self.userinfo["phone"]}","verifyCode":"{self.userinfo["verifycode"]}"}}'
data = self.req(url, crypt_text)
if data["code"] == "0000":
self.print_now(f"抽奖成功, 获得{data['data']['prizename']}")
else:
self.print_now(f"抽奖失败, 日志为{data}")
def watch_ad(self):
self.print_now("观看广告得话费红包: ")
url = "https://10010.woread.com.cn/ng_woread_service/rest/activity/userTakeActive"
date = datetime.today().__format__("%Y%m%d%H%M%S")
crypt_text = f'{{"activeIndex":6880,"timestamp":"{date}","token":"{self.userinfo["token"]}","userId":"{self.userinfo["userid"]}","userIndex":{self.userinfo["userindex"]},"userAccount":"{self.userinfo["phone"]}","verifyCode":"{self.userinfo["verifycode"]}"}}'
data = self.req(url, crypt_text)
self.print_now(data)
def exchange(self):
# ticketValue activeid来自于https://10010.woread.com.cn/ng_woread_service/rest/phone/vouchers/getSysConfig get请求
# {"ticketValue":"300","activeid":"61yd210901","timestamp":"20220816213709","token":"","userId":"","userIndex":,"userAccount":"","verifyCode":""}
url = "https://10010.woread.com.cn/ng_woread_service/rest/phone/vouchers/exchange"
date = datetime.today().__format__("%Y%m%d%H%M%S")
crypt_text = f'{{"ticketValue":"300","activeid":"61yd210901","timestamp":"{date}","token":"{self.userinfo["token"]}","userId":"{self.userinfo["userid"]}","userIndex":{self.userinfo["userindex"]},"userAccount":"{self.userinfo["phone"]}","verifyCode":"{self.userinfo["verifycode"]}"}}'
data = self.req(url, crypt_text)
print(data)
def query_red(self):
url = "https://10010.woread.com.cn/ng_woread_service/rest/phone/vouchers/queryTicketAccount"
date = datetime.today().__format__("%Y%m%d%H%M%S")
crypt_text = f'{{"timestamp":"{date}","token":"{self.userinfo["token"]}","userId":"{self.userinfo["userid"]}","userIndex":{self.userinfo["userindex"]},"userAccount":"{self.userinfo["phone"]}","verifyCode":"{self.userinfo["verifycode"]}"}}'
data = self.req(url, crypt_text)
if data["code"] == "0000":
can_use_red = data["data"]["usableNum"] / 100
if can_use_red >= 3:
self.print_now(f"查询成功 {self.phone_num} 当前有话费红包{can_use_red} 可以去兑换了")
notify.send("某通阅读", f"查询成功 {self.phone_num} 当前有话费红包{can_use_red} 可以去兑换了")
else:
self.print_now(f"查询成功 {self.phone_num} 当前有话费红包{can_use_red} 不足兑换的最低额度")
notify.send("某通阅读", f"查询成功 {self.phone_num} 当前有话费红包{can_use_red} 不足兑换的最低额度")
def main(self):
self.referer_login()
self.get_userinfo()
if not self.watch_video():
return
self.get_activetion_id()
self.read_novel()
self.query_score()
self.watch_ad()
if unicom_lotter:
for i in range(self.lotter_num):
self.lotter()
sleep(2)
self.query_score()
self.query_red()
if __name__ == "__main__":
"""读取环境变量"""
phone_num = get_environ("PHONE_NUM")
unicom_lotter = get_environ("UNICOM_LOTTER", default=True)
if phone_num == "":
exit(0)
China_Unicom(phone_num).main()