-
Notifications
You must be signed in to change notification settings - Fork 0
/
_cronjobs.py
112 lines (100 loc) · 4.64 KB
/
_cronjobs.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
import json
import logging
from datetime import datetime
from fastapi_utils.tasks import repeat_every
from httpx import AsyncClient
from sqlalchemy.exc import OperationalError
from _db import PushLog, SessionLocal
from _redis import delete_key, get_key, get_keys_by_pattern
logger = logging.getLogger(__name__)
async def logPushTask(taskId: str, data: dict):
"""
记录推送任务
:param taskId: str
:param data: dict
:return: Boolean
:example: {'data': {'baseURL': 'https://api.day.app/uKeSrwm3ainGgn5SAmRyg9/', 'msg': 'You have a new notification!', 'push_receiver': 'yuki', 'icon': 'https://static.olelive.com/snap/fa77502e442ee6bbd39be20b2a2810ee.jpg?_n=202409290554', 'click_url': 'https://example.com', 'is_passive': False, 'headers': {'Authorization': 'Bearer your_token_here', 'Content-Type': 'application/json'}, 'log_data': {'push_id': '12345', 'push_receiver': 'user@example.com', 'push_by': 'system'}}, 'result': 'success'}
"""
async with SessionLocal() as session:
async with session.begin():
push_result = True if data['result'] == 'success' else False
pushLog = PushLog(
push_id=taskId,
push_receiver=data['data']['log_data']['push_receiver'],
push_channel="bark",
push_at=datetime.now(),
push_by=data['data']['log_data']['push_by'] if 'push_by' in data['data']['log_data'] else 'system',
push_result=push_result,
push_message=data['data']['msg'],
push_server='bark',
user_id=data['data']['log_data']['user_id'] if 'user_id' in data['data']['log_data'] else None
)
session.add(pushLog)
try:
await session.commit()
return True
except OperationalError as e:
async with session.begin():
session.rollback()
pushLog = PushLog(
push_id=taskId,
push_receiver=data['data']['log_data']['push_receiver'],
push_channel="bark",
push_at=datetime.now(),
push_by=data['data']['log_data']['push_by'] if 'push_by' in data['data'][
'log_data'] else 'system',
push_result=push_result,
push_message=data['data']['msg'],
push_server='bark',
user_id=data['data']['log_data']['user_id'] if 'user_id' in data['data']['log_data'] else None
)
session.add(pushLog)
await session.commit()
return True
except Exception as e:
logger.error(f"Failed to log push task: {e}", exc_info=True)
return False
@repeat_every(seconds=5)
async def pushTaskExecQueue() -> bool:
"""
Process push tasks from the Redis keys matching 'pushTask:*' pattern.
:return: Boolean indicating success.
"""
try:
all_keys = await get_keys_by_pattern('pushTask:*')
if not all_keys:
return False
logger.info(f"Found {len(all_keys)} push tasks in the queue.")
async with AsyncClient() as client:
for key in all_keys:
value = await get_key(key)
if not value:
continue
data = json.loads(value)
logger.info(f"Processing push task: {data}")
url = (
f"{data['baseURL']}{data['msg']}?"
f"icon={data['icon']}&"
f"url={data['click_url']}&"
f"passive={data['is_passive']}"
)
url.replace("//", "/").replace("https:/", "https://")
response = await client.post(url)
if response.status_code == 200:
await delete_key(key)
data['result'] = 'success'
logger.info(f"Push task successful: {data}")
else:
data['result'] = 'failed'
logger.error(f"Failed to push task: {data}")
try:
# taskID 取 pushTask: 后面的字符串
taskID = key.split(":")[1]
print(taskID)
await logPushTask(taskID, data)
except Exception as e:
logger.error(f"Failed to log push task: {e}", exc_info=True)
return True
except Exception as e:
logger.error(f"Error in pushTaskExecQueue: {e}", exc_info=True)
return False