-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathticket.py
395 lines (373 loc) · 15.8 KB
/
ticket.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
import requests
import time
import random
import yaml
from config import global_config
from logger import logger
from timer import Timer
from datetime import datetime
import os
from collections import defaultdict
import qrcode
from utils import get_random_user_agent, send_bark
# API base URL and public site URL for each deployment environment. The
# environment key ("dev" / "prod") is looked up with the ``env`` config value.
HOST_CONFIG = {
    "dev": {
        "base_api": "https://admin-api-dev.pyiffestival.com/app/api/v1",
        "url": "https://www-dev.pyiffestival.com",
    },
    "prod": {
        "base_api": "https://api.pyiffestival.com/app/api/v1",
        "url": "https://www.pyiffestival.com",
    },
}

# Number of failed HTTP requests tolerated before the purchase loop gives up.
MAX_RETRY = 5

# ``stateTypeId`` value that marks a seat as available for purchase.
# NOTE: the lower-case "l" in the name is a historical typo; it is kept because
# other code in this module refers to the constant by this exact name.
AVAIlABLE_SEAT_TYPE_ID = "09ffe644-40cd-42fe-889c-46e0b940f8bc"
class TicketHelper(object):
    """Automated ticket buyer for the festival ticketing API.

    Settings come from ``global_config`` (``activity_id``, ``env``, ``token``
    and a ``movies`` list). ``start`` is the entry point: it installs the
    bearer token and runs ``buy_ticket``, which keeps polling until the
    configured orders are placed or a terminal error occurs.
    """

    def __init__(self):
        """Create the HTTP session and resolve the endpoints for ``env``."""
        self.config = global_config
        self.activity_id = self.config["activity_id"]
        self.session = requests.Session()
        host = HOST_CONFIG[self.config["env"]]
        headers = {
            "User-Agent": get_random_user_agent(),
            "Origin": host["url"],
            "Referer": host["url"],
        }
        self.api_url = host["base_api"]
        self.session.headers.update(headers)
        self.timer = Timer()

    def start(self):
        """Install the bearer token and start buying.

        When no token is configured only a notification is sent.
        """
        if "token" not in self.config or not self.config["token"]:
            send_bark("请先登录", "请先登录并配置token")
            return
        self.session.headers.update(
            {"Authorization": f'Bearer {self.config["token"]}'}
        )
        self.buy_ticket()

    def wait_some_time(self):
        """Sleep for a random 3.2 to 7.4 seconds between attempts."""
        time.sleep(random.randint(3200, 7400) / 1000)

    def validate_user(self, activity_id):
        """Call the ``/TenantUser`` endpoint for ``activity_id``.

        Raises:
            requests.exceptions.RequestException: if the status is not
                200/201 (see ``handle_response``).
        """
        res = self.session.get(
            url=self.api_url + "/TenantUser",
            params={"activityId": activity_id},
        )
        self.handle_response("验证用户", res)

    def get_categories(self):
        """Return the film category tree, preferring the local cache.

        ``categories.yaml`` is returned as-is when it exists; otherwise the
        categories are fetched from the API and reduced to the fields used
        by this module.

        Raises:
            LookupError: if the API returns an empty result.
        """
        if os.path.exists("categories.yaml"):
            # Context manager so the cache file handle is always closed.
            # NOTE(review): FullLoader is kept because the cache is written by
            # this tool itself; switch to a safe loader if that ever changes.
            with open("categories.yaml", "r") as f:
                return yaml.load(f, Loader=yaml.FullLoader)
        res = self.session.get(
            url=self.api_url
            + f"/Activity/{self.activity_id}/ActivityFilmCategories",
        )
        result = self.handle_response("获取分类", res)
        if not result:
            raise LookupError("获取分类失败")
        categories = []
        for item in result:
            children = [
                {
                    "id": sub_cat["id"],
                    "name": sub_cat["categoryName"],
                    "nameEn": sub_cat["categoryNameEN"],
                    "activityId": sub_cat["activityId"],
                    "projectId": sub_cat["projectFilmCategoryId"],
                }
                for sub_cat in item["children"]
            ]
            categories.append(
                {
                    "id": item["id"],
                    "name": item["categoryName"],
                    "nameEn": item["categoryNameEN"],
                    "activityId": item["activityId"],
                    "children": children,
                }
            )
        return categories

    def get_movie_list(
        self,
        category_id="d4c82f14-4dfc-4fb6-a91e-9fd1876956ea",
        date="",
        search_text="",
        pageSize=10,
        pageIndex=1,
    ):
        """Fetch one page of films for a category and return the parsed JSON."""
        params = {
            "ActivityFilmCategoryId": category_id,
            "Language": 0,
            "pageIndex": pageIndex,
            "pageSize": pageSize,
            "Date": date,
            "SearchText": search_text,
        }
        res = self.session.get(
            url=self.api_url + f"/Activity/{self.activity_id}/ActivityFilms",
            params=params,
        )
        return self.handle_response("获取电影列表", res)

    def get_movie_detail(self, movie_id):
        """Fetch the detail record of a film and return the parsed JSON."""
        res = self.session.get(
            url=self.api_url + f"/ActivityFilm/{movie_id}",
        )
        return self.handle_response("获取电影详情", res)

    def create_order(self, seat_ids=None, plan_id=""):
        """Place an order for ``seat_ids`` on showing ``plan_id``.

        Args:
            seat_ids: iterable of seat ids; ``None`` means no seats.
            plan_id: id of the showing (film plan) to order for.

        Returns:
            The parsed JSON response of the order endpoint.
        """
        self.validate_user(self.activity_id)
        payload = {
            "activityFilmPlanSeats": list(seat_ids) if seat_ids else [],
        }
        # ``json=`` serialises to real JSON and sets the Content-Type for this
        # request only. ``data=str(payload)`` sent a Python repr (single
        # quotes), which is not valid JSON.
        res = self.session.post(
            f"{self.api_url}/ActivityFilmPlan/{plan_id}/ActivityFilmPlanOrder",
            json=payload,
        )
        return self.handle_response("创建订单", res)

    def get_film_plan_detail(self, film_plan_id):
        """Fetch the detail record of a showing and return the parsed JSON."""
        res = self.session.get(
            url=self.api_url + f"/ActivityFilmPlan/{film_plan_id}",
        )
        return self.handle_response("获取场次详情", res)

    def get_seats_for_film_plan(self, film_plan_id):
        """Fetch the seat list of a showing and return the parsed JSON."""
        res = self.session.get(
            url=self.api_url
            + f"/ActivityFilmPlan/{film_plan_id}/ActivityFilmPlanSeats",
        )
        return self.handle_response("获取座位", res)

    def create_pay_qr_code(self, order_ids):
        """Request a payment URL per order and save it as ``pay/<id>.png``.

        Orders whose response carries no ``codeUrl`` are logged and skipped.
        """
        for order_id in order_ids:
            res = self.session.post(
                url=self.api_url
                + f"/ActivityFilmPlanOrder/{order_id}/InitiatePayPc",
                json={"couponCode": ""},
            )
            result = self.handle_response("生成支付二维码", res)
            if not result or "codeUrl" not in result:
                logger.error(f"生成支付二维码失败:订单{order_id}未返回codeUrl")
                continue
            qr = qrcode.QRCode(
                version=1,
                error_correction=qrcode.constants.ERROR_CORRECT_H,
                box_size=5,
                border=4,
            )
            qr.add_data(result["codeUrl"])
            qr.make(fit=True)
            img = qr.make_image(fill_color="black", back_color="white")
            os.makedirs("pay", exist_ok=True)
            with open(f"pay/{order_id}.png", "wb") as f:
                img.save(f)

    def handle_response(self, func_name, res):
        """Return ``res.json()`` for a 200/201 response.

        Args:
            func_name: label used as the prefix of the error message.
            res: response object with ``status_code``, ``reason`` and
                ``json()``.

        Raises:
            requests.exceptions.RequestException: for any other status code.
        """
        if res.status_code in (200, 201):
            return res.json()
        raise requests.exceptions.RequestException(
            f"{func_name}失败:{res.status_code},{res.reason}"
        )

    @staticmethod
    def _plan_summary(plan, with_availability=False):
        """Reduce an API showing record to the fields this module uses."""
        summary = {
            "id": plan["id"],
            "cinemaHallId": plan["activityCinemaHallId"],
            "cinemaHallName": plan["activityCinemaHall"],
            "date": plan["date"],
            "startTime": plan["startTime"],
            "endTime": plan["endTime"],
            "price": plan["price"],
        }
        if with_availability:
            summary["canSell"] = plan["canSell"]
            summary["hasTickets"] = plan["hasTickets"]
        return summary

    def find_all_the_films(self):
        """Build ``{sub-category name: [movie, ...]}`` for "FILM SCREENING".

        The category tree is cached to ``categories.yaml`` on first use.
        Only the first 20 films of each sub-category are requested.

        Raises:
            LookupError: if the "FILM SCREENING" category is missing or no
                films are listed yet.
        """
        self.validate_user(self.activity_id)
        categories = self.get_categories()
        if not os.path.exists("categories.yaml"):
            with open("categories.yaml", "w") as f:
                yaml.dump(categories, f, allow_unicode=True)
        screening = next(
            (
                category
                for category in categories
                if category["nameEn"] == "FILM SCREENING"
            ),
            None,
        )
        if screening is None:
            # Previously an IndexError from ``[0]``; report it the same way as
            # an empty schedule instead.
            raise LookupError("排片表未出来,请稍后再试")
        result = {}
        movie_count = 0
        for category in screening["children"]:
            movie_list = self.get_movie_list(category["id"], pageSize=20)
            items = movie_list["items"]
            movie_count += len(items)
            movies = []
            for movie in items:
                movie_detail = self.get_movie_detail(movie["id"])
                movies.append(
                    {
                        "id": movie["id"],
                        "name": movie["activityFilmName"],
                        "category": movie["activityFilmCategoryName"],
                        "hasTickets": movie["hasTickets"],
                        "plans": [
                            self._plan_summary(plan)
                            for plan in movie_detail["activityFilmPlans"]
                        ],
                    }
                )
            result[category["name"]] = movies
        if movie_count == 0:
            raise LookupError("排片表未出来,请稍后再试")
        return result

    @staticmethod
    def _parse_config_date(movie):
        """Parse the ``date`` of a config entry (``YYYY-MM-DD``) to a datetime.

        ``str()`` lets an unquoted YAML date (a ``datetime.date``) work too.
        """
        try:
            return datetime.strptime(str(movie["date"]), "%Y-%m-%d")
        except ValueError:
            raise Exception(
                f"配置错误:日期格式应为YYYY-MM-DD,当前为{movie['date']}"
            ) from None

    @staticmethod
    def _validate_movie_config(movie):
        """Raise a "配置错误" exception unless the entry is complete."""
        if "category" not in movie:
            raise Exception("配置错误:请配置分类")
        if "name" not in movie:
            raise Exception("配置错误:请配置电影名称")
        if "date" not in movie:
            raise Exception("配置错误:请配置日期")
        if "count" not in movie:
            raise Exception("配置错误:请配置购票数量")
        if movie["count"] > 5:
            raise Exception("配置错误:购票数量不能超过5张")
        TicketHelper._parse_config_date(movie)

    @staticmethod
    def _find_plan_for_date(plans, target_day):
        """Return the first plan whose ``date`` parses to ``target_day``.

        Returns ``None`` when no plan matches.
        """
        for plan in plans:
            plan_day = datetime.strptime(plan["date"], "%Y-%m-%d %H:%M:%S")
            if plan_day == target_day:
                return plan
        return None

    @staticmethod
    def _ensure_sellable(plan, movie):
        """Raise if the showing cannot be sold or has no tickets left."""
        if not plan["canSell"]:
            raise Exception(
                f"「{movie['name']}」指定时间{movie['date']}的场次不可售"
            )
        if not plan["hasTickets"]:
            raise Exception(
                f"「{movie['name']}」指定时间{movie['date']}的场次已售罄"
            )

    def _order_movie(self, all_movies, movie):
        """Find the configured showing of ``movie``, pick seats and order.

        Returns the id of the created order.
        """
        target_day = self._parse_config_date(movie)
        candidates = (all_movies or {}).get(movie["category"]) or []
        # ``next`` takes the first match, so duplicate names no longer break
        # the lookup with an unpacking ValueError.
        found_movie = next(
            (item for item in candidates if item["name"] == movie["name"]),
            None,
        )
        if not found_movie:
            raise LookupError(f"未找到「{movie['name']}」")
        movie_detail = self.get_movie_detail(found_movie["id"])
        plans = [
            self._plan_summary(plan, with_availability=True)
            for plan in movie_detail["activityFilmPlans"]
        ]
        # Match against the freshly fetched plans only. The old ``[{}]``
        # fallback was truthy, so the "not found" error below was skipped and
        # the code failed later with a KeyError.
        plan = self._find_plan_for_date(plans, target_day)
        if plan is None:
            raise LookupError(
                f"未找到「{movie['name']}」指定时间{movie['date']}的场次"
            )
        self._ensure_sellable(plan, movie)
        plan_detail = self.get_film_plan_detail(plan["id"])
        self._ensure_sellable(plan_detail, movie)
        seats = self.get_seats_for_film_plan(plan["id"])
        seat_ids = self.choose_seat(seats, movie)
        if not seat_ids:
            raise Exception(
                f"「{movie['name']}」指定时间{movie['date']}的场次未找到可用座位"
            )
        result = self.create_order(seat_ids, plan["id"])
        if "id" not in result:
            raise Exception(
                f"{movie['name']}指定时间{movie['date']}的场次下单失败"
            )
        return result["id"]

    def search_movie_and_place_order(self):
        """Place one order per entry in ``config["movies"]``.

        The film catalogue is read from ``all_movies.yaml`` when present,
        otherwise fetched and cached there. Every entry is validated before
        the first order is placed.

        Returns:
            List of created order ids, in configuration order.

        Raises:
            LookupError: if a film or its showing cannot be found.
            Exception: for configuration errors ("配置错误"), unsellable or
                sold-out showings, missing seats, or a failed order.
        """
        if os.path.exists("all_movies.yaml"):
            with open("all_movies.yaml", "r") as f:
                all_movies = yaml.load(f, Loader=yaml.FullLoader)
        else:
            all_movies = self.find_all_the_films()
            with open("all_movies.yaml", "w") as f:
                yaml.dump(all_movies, f, allow_unicode=True)
        if "movies" not in self.config or not self.config["movies"]:
            raise Exception("配置错误:请配置电影")
        movies = self.config["movies"]
        for movie in movies:
            self._validate_movie_config(movie)
        # NOTE(review): if a later entry fails after an earlier one was
        # ordered, the ids collected so far are lost and a retry orders the
        # earlier entries again.
        return [self._order_movie(all_movies, movie) for movie in movies]

    @staticmethod
    def _pick_best_seats(available_by_row, numbers_by_row, count):
        """Pick the ``count`` available seats closest to the hall centre.

        Args:
            available_by_row: ``{row: [{"number", "id"}, ...]}`` with each row
                sorted by seat number.
            numbers_by_row: ``{row: [seat numbers]}`` for every seat of the
                hall; used to locate the middle row and middle seat.
            count: number of seats wanted.

        Returns:
            ``(seat_ids, [(row, number), ...])``; both are empty when no row
            beyond the first two has ``count`` available seats.
        """
        mid_row = max(numbers_by_row) // 2
        best_score = float("inf")
        best_ids = []
        best_positions = []
        for row, row_seats in available_by_row.items():
            # Rows 1 and 2 are never chosen as "best" seats.
            if row <= 2 or len(row_seats) < count:
                continue
            mid_point = max(numbers_by_row[row]) // 2
            for start in range(len(row_seats) - count + 1):
                window = row_seats[start : start + count]
                # Lower is better: distance to the middle seat plus distance
                # to the middle row.
                score = sum(
                    abs(mid_point - seat["number"]) for seat in window
                ) + abs(row - mid_row)
                if score < best_score:
                    best_score = score
                    best_ids = [seat["id"] for seat in window]
                    best_positions = [
                        (row, seat["number"]) for seat in window
                    ]
        return best_ids, best_positions

    @staticmethod
    def _pick_fallback_seats(available_by_row, count):
        """Pick seats when no "best" seats exist.

        Returns the first ``count`` seats of the lowest row that has enough
        of them. If no row does, returns whatever the lowest row offers.
        """
        rows = sorted(available_by_row)
        if not rows:
            return []
        # Check every row before settling for a partial result; the old loop
        # returned from the first row unconditionally.
        for row in rows:
            row_seats = available_by_row[row]
            if len(row_seats) >= count:
                return [seat["id"] for seat in row_seats[:count]]
        return [seat["id"] for seat in available_by_row[rows[0]]]

    def choose_seat(self, seats, movie_config):
        """Choose seat ids for one showing.

        Args:
            seats: seat records with ``row``, ``number``, ``id``,
                ``stateTypeId`` and, when an area filter is configured,
                ``area``.
            movie_config: config entry providing ``count`` and optionally
                ``area``.

        Returns:
            List of seat ids; empty when no seat is available.
        """
        num_seats_needed = movie_config["count"]
        filter_by_area = "area" in movie_config
        seats_by_row = defaultdict(list)
        seats_number_by_row = defaultdict(list)
        for seat in seats:
            seats_number_by_row[seat["row"]].append(seat["number"])
            if filter_by_area and seat["area"] != movie_config["area"]:
                continue
            if seat["stateTypeId"] == AVAIlABLE_SEAT_TYPE_ID:
                seats_by_row[seat["row"]].append(
                    {"number": seat["number"], "id": seat["id"]}
                )
        if not seats_by_row:
            # Nothing available (or an empty seat list, which used to crash
            # in ``max()``).
            return []
        for row_seats in seats_by_row.values():
            row_seats.sort(key=lambda x: x["number"])
        best_seats, best_seat_row_number = self._pick_best_seats(
            seats_by_row, seats_number_by_row, num_seats_needed
        )
        logger.info(f"最优座位:{best_seat_row_number}")
        if best_seats:
            return best_seats
        return self._pick_fallback_seats(seats_by_row, num_seats_needed)

    def _announce_success(self, order_ids):
        """Log, notify and generate payment QR codes for placed orders.

        Failures here are only logged: the orders already exist, so they must
        not send the caller back into its ordering loop.
        """
        joined = ",".join(str(order_id) for order_id in order_ids)
        logger.info(f"购票成功:{joined}")
        try:
            send_bark("购票成功,请马上付款", joined)
        except Exception as e:
            logger.error("发送通知失败:" + str(e))
        try:
            self.create_pay_qr_code(order_ids)
        except Exception as e:
            logger.error("生成支付二维码失败:" + str(e))

    def buy_ticket(self):
        """Keep trying to place the configured orders until done or fatal.

        The loop stops on success, on more than ``MAX_RETRY`` failed
        requests, on a user-validation failure, on a missing showing or
        schedule, or on a configuration error. Any other failure waits a few
        seconds and retries.
        """
        self.timer.start()
        retries = 0
        while True:
            try:
                self.validate_user(self.activity_id)
                order_ids = self.search_movie_and_place_order()
            except requests.exceptions.RequestException as e:
                message = str(e)
                logger.error(e)
                retries += 1
                send_bark("请求失败", message)
                if retries > MAX_RETRY:
                    logger.error("重试次数过多,退出")
                    send_bark("重试次数过多,退出", "")
                    break
                if "验证用户失败" in message:
                    send_bark("验证用户失败", "请检查token是否正确")
                    break
            except LookupError as e:
                message = str(e)
                logger.error(e)
                # Each substring needs its own ``in`` test: ``"a" or "b" in s``
                # is always true.
                if "指定时间" in message or "排片表" in message:
                    send_bark("指定时间无场次", message)
                    break
            except Exception as e:
                message = str(e)
                logger.error("购票失败:" + message)
                if "配置错误" in message:
                    send_bark("配置错误", message)
                    break
            else:
                if order_ids:
                    self._announce_success(order_ids)
                    break
            self.wait_some_time()