-
Notifications
You must be signed in to change notification settings - Fork 0
/
_db.py
291 lines (245 loc) · 10.8 KB
/
_db.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
import datetime
import logging
import os
from enum import Enum as PyEnum
from uuid import uuid4
import dotenv
from sqlalchemy import Boolean, Column, DateTime, Float, ForeignKey, Integer, String, Text, select, text
from sqlalchemy.exc import OperationalError
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship, sessionmaker
# Pull variables from a local .env file into the process environment
# before any of them are read below.
dotenv.load_dotenv()

logger = logging.getLogger(__name__)

# === DATABASE Configuration ===
# The connection string is mandatory; fail at import time if it is absent.
DATABASE_URL = os.getenv("MYSQL_CONN_STRING")
if DATABASE_URL is None or DATABASE_URL == "":
    raise ValueError("MYSQL_CONN_STRING is not set")

# Rewrite a plain "mysql://" scheme so the URL selects the asyncmy driver.
if DATABASE_URL.startswith("mysql://"):
    DATABASE_URL = "mysql+asyncmy://" + DATABASE_URL[len("mysql://"):]

# === DATABASE ===
# Set up SQLAlchemy
# Declarative base class: every ORM model in this module inherits from it.
Base = declarative_base()

# Async engine bound to the configured URL (SQL echo disabled).
engine = create_async_engine(DATABASE_URL, echo=False)

# Factory producing AsyncSession objects bound to the engine.
# noinspection PyTypeChecker
SessionLocal = sessionmaker(
    bind=engine,
    class_=AsyncSession,
    autoflush=False,
    autocommit=False,
)
# Enum type definitions
class SubChannelEnum(PyEnum):
    """Channel identifiers stored as strings in the ``*_channel`` columns.

    The models in this module use ``SubChannelEnum.OLE_VOD.value`` as the
    default for their channel columns.
    """

    OLE_VOD = "ole_vod"
class User(Base):
    """ORM model for the ``users`` table.

    ``userId`` is the primary key; ``id``, ``username`` and ``primaryEmail``
    are additional unique, indexed identifiers.
    """

    __tablename__ = "users"

    # Column defaults must be callables: a plain value such as
    # ``uuid4().hex`` is evaluated once at import time, so every row would
    # share the same primary key / timestamp.
    userId = Column(String(36), primary_key=True, index=True, default=lambda: uuid4().hex)
    id = Column(String(12), index=True, unique=True)
    username = Column(String(32), index=True, unique=True)
    primaryEmail = Column(String(64), index=True, unique=True)
    primaryPhone = Column(String(16), default="")
    name = Column(String(32), default="")
    avatar = Column(String(256), default="")
    customData = Column(String(256), default='{}')
    identities = Column(Text(), default='[]')
    profile = Column(String(256), default="")
    applicationId = Column(String(21), default="")

    # Timestamps are stored in Integer columns, so the float returned by
    # ``timestamp()`` is truncated to whole seconds, computed per INSERT.
    lastSignInAt = Column(Integer(), default=lambda: int(datetime.datetime.now().timestamp()))
    createdAt = Column(Integer(), default=lambda: int(datetime.datetime.now().timestamp()))
    updatedAt = Column(Integer(), default=lambda: int(datetime.datetime.now().timestamp()))

    vod_subs = relationship("VodSub", back_populates="user")
    # Reverse side of PushLog.user
    push_logs = relationship("PushLog", back_populates="user")
class VodSub(Base):
    """ORM model for the ``vod_subs`` table: one subscription to a VOD entry."""

    __tablename__ = "vod_subs"

    id = Column(Integer(), primary_key=True, index=True, autoincrement=True)
    sub_id = Column(String(32), index=True, unique=True)
    sub_by = Column(String(36), ForeignKey('users.id'))
    # The enum is persisted as its string value.
    sub_channel = Column(String(32), default=SubChannelEnum.OLE_VOD.value)
    # UTC timestamp. The default is a callable so it is evaluated for every
    # INSERT instead of being frozen at import time.
    sub_at = Column(DateTime, default=lambda: datetime.datetime.now(datetime.timezone.utc))
    # Per the original author's note: the year from the search result is
    # used to decide whether this subscription needs syncing.
    sub_needSync = Column(Boolean, default=False)

    # Foreign key to the vod_info table
    vod_info_id = Column(Integer, ForeignKey('vod_info.id'))

    vod_info = relationship("VodInfo", back_populates="subs")
    user = relationship("User", back_populates="vod_subs")

    def to_dict(self):
        """Return the subscription as a plain dict.

        :return: dict with ``sub_id``, ``sub_by``, ``sub_channel``,
            ``sub_at`` and ``vod_info_id`` (``sub_needSync`` is not included).
        """
        return {
            "sub_id": self.sub_id,
            "sub_by": self.sub_by,
            "sub_channel": self.sub_channel,
            "sub_at": self.sub_at,
            "vod_info_id": self.vod_info_id
        }
class VodInfo(Base):
    """ORM model for the ``vod_info`` table: cached metadata of one VOD entry."""

    __tablename__ = "vod_info"

    id = Column(Integer(), primary_key=True, index=True, autoincrement=True, unique=True)
    vod_id = Column(String(32), index=True, unique=True, nullable=False)
    vod_name = Column(String(32), index=True, default="")
    vod_typeId = Column(Integer(), index=True, default=0)
    vod_typeId1 = Column(Integer(), index=True, default=0)
    # Remarks or status of the VOD (e.g., "完结" means "completed")
    vod_remarks = Column(String(24), default="")
    vod_is_vip = Column(Boolean, default=False)
    vod_episodes = Column(Integer(), default=0)
    vod_urls = Column(String(256), default="")
    vod_new = Column(Boolean, default=False)
    vod_version = Column(String(16), default="未知")
    vod_score = Column(Float(), default=0.0)
    vod_year = Column(Integer(), default=0)

    # Reverse side of VodSub.vod_info
    subs = relationship("VodSub", back_populates="vod_info")

    def to_dict(self):
        """Return every table column of this row as a ``name -> value`` dict.

        :return: dict keyed by column name; a column named ``subs`` would be
            skipped.
        """
        row = {}
        # noinspection PyTypeChecker
        for col in self.__table__.columns:
            key = col.name
            if key == 'subs':
                # Exclude the 'subs' relationship
                continue
            row[key] = getattr(self, key)
        return row
class PushLog(Base):
    """
    Push log entry (``push_logs`` table).

    :param push_id: push ID
    :param push_receiver: receiver ID
    :param push_channel: push channel
    :param push_at: time of the push
    :param push_by: name of the system that pushed
    :param push_result: push result
    :param push_message: push message
    :param push_server: push server
    :param user: User object
    """

    __tablename__ = "push_logs"

    id = Column(Integer(), primary_key=True, index=True, autoincrement=True, unique=True)
    push_id = Column(String(32), index=True, unique=True)
    push_receiver = Column(String(36))
    # The enum is persisted as its string value.
    push_channel = Column(String(32), default=SubChannelEnum.OLE_VOD.value)
    # UTC timestamp. The default is a callable so it is evaluated for every
    # INSERT instead of being frozen at import time.
    push_at = Column(DateTime, default=lambda: datetime.datetime.now(datetime.timezone.utc))
    push_by = Column(String(36))
    push_result = Column(Boolean, default=False)
    push_message = Column(String(256), default="")
    push_server = Column(String(32), default="")
    user_id = Column(String(36), ForeignKey('users.userId'))

    user = relationship("User", back_populates="push_logs")

    def to_dict(self):
        """Return the log entry as a plain dict.

        :return: dict with ``push_id``, ``push_by``, ``push_channel``,
            ``push_at``, ``push_result`` and ``push_message``.
        """
        return {
            "push_id": self.push_id,
            "push_by": self.push_by,
            "push_channel": self.push_channel,
            "push_at": self.push_at,
            "push_result": self.push_result,
            "push_message": self.push_message
        }
# Create the tables in the database
async def init_db():
    """Create all tables declared on ``Base.metadata``.

    The first attempt issues ``CREATE TABLE`` for every table without
    checking for existence. If that raises ``OperationalError``, the reason
    is logged and ``create_all`` is retried with its default existence
    check, which only creates the tables that are missing. Existing tables
    are never dropped.

    :raises RuntimeError: if the first attempt fails with anything other
        than ``OperationalError``; the original exception is chained as the
        cause.
    """
    async with engine.begin() as conn:
        try:
            await conn.run_sync(Base.metadata.create_all, checkfirst=False)
        except OperationalError as e:
            # Use the module logger so the record carries this module's name.
            logger.info("重建数据库表, 原因: %s", str(e))
            await conn.run_sync(Base.metadata.create_all)
        except Exception as e:
            raise RuntimeError(f"Database initialization failed: {str(e)}") from e
async def test_db_connection():
    """Check database connectivity by running ``SELECT 1``.

    :return: ``True`` when the query succeeds and yields ``1``.
    :raises ConnectionError: if the session cannot be opened, the query
        fails, or the result is not ``1``; the underlying exception is
        chained as the cause.
    """
    try:
        async with SessionLocal() as session:
            async with session.begin():
                # Run a trivial query
                result = await session.execute(text("SELECT 1"))
                # Explicit check instead of ``assert``: assertions are
                # stripped when Python runs with -O.
                if result.scalar() != 1:
                    raise RuntimeError("unexpected result for SELECT 1")
        return True
    except Exception as e:
        raise ConnectionError(f"Database connection failed: {str(e)}") from e
def _vod_column_values(item):
    """Map one VOD item of the payload to ``VodInfo`` column values (without ``vod_id``)."""
    raw_episodes = item.get("episodes")
    if isinstance(raw_episodes, (list, tuple)):
        episode_count = len(raw_episodes)
        if raw_episodes:
            # Kept for backward compatibility: a non-empty episode list in
            # the caller's payload is replaced by its length.
            item["episodes"] = episode_count
    else:
        # Already a count (e.g. the payload was processed before) or absent.
        episode_count = raw_episodes or 0

    return {
        "vod_name": item["name"],
        "vod_typeId": item["typeId"],
        "vod_typeId1": item["typeId1"],
        "vod_remarks": item["remarks"],
        "vod_is_vip": item["vip"],
        "vod_episodes": episode_count,
        "vod_urls": item.get("pic", ""),
        "vod_new": item.get("new", False),
        "vod_version": item.get("version", "未知"),
        "vod_score": item.get("score", 0.0),
        "vod_year": item.get("year", 0),
    }


async def cache_vod_data(data):
    """Insert or update ``VodInfo`` rows from a payload.

    Walks ``data["data"]["data"]``, considers only groups whose ``"type"``
    is ``"vod"``, and for every entry of the group's ``"list"`` either
    updates the row whose ``vod_id`` equals ``str(item["id"])`` or adds a new
    one. All changes are committed together after the loops. The session is
    opened as a context manager so it is closed even when a statement
    raises.

    :param data: nested dict as described above; each item must provide
        ``id``, ``name``, ``typeId``, ``typeId1``, ``remarks`` and ``vip``.
        ``episodes`` may be a list (its length is stored) or a number.
    :raises KeyError: if a required key is missing from the payload.
    """
    async with SessionLocal() as db:
        for group in data["data"]["data"]:
            if group["type"] != "vod":
                continue
            for item in group["list"]:
                vod_id = str(item["id"])
                # Look for an existing row with the same vod_id
                # noinspection PyTypeChecker
                stmt = select(VodInfo).where(VodInfo.vod_id == vod_id)
                db_vod = (await db.execute(stmt)).scalar_one_or_none()

                values = _vod_column_values(item)
                if db_vod is None:
                    # Insert a new row
                    db.add(VodInfo(vod_id=vod_id, **values))
                else:
                    # Update the existing row
                    for column_name, value in values.items():
                        setattr(db_vod, column_name, value)
        await db.commit()
class requestUpdate(Base):
    """ORM model for the ``request_update`` table: one update request for a VOD."""

    __tablename__ = "request_update"

    id = Column(Integer(), primary_key=True, index=True, autoincrement=True, unique=True)
    # Callable default: a plain ``uuid4().hex`` would be evaluated once at
    # import time and collide on the unique index from the second row on.
    request_id = Column(String(32), index=True, unique=True, default=lambda: uuid4().hex)
    # NOTE(review): the "Anonymous" default is subject to the foreign key on
    # users.id — confirm such a user row exists or that the FK is not enforced.
    request_by = Column(String(36), ForeignKey('users.id'), default="Anonymous")
    # The enum is persisted as its string value.
    request_channel = Column(String(32), default=SubChannelEnum.OLE_VOD.value)
    # UTC timestamp, evaluated for every INSERT.
    request_at = Column(DateTime, default=lambda: datetime.datetime.now(datetime.timezone.utc))
    request_result = Column(Boolean, default=False)
    request_vod = Column(String(256), default="")
    request_vod_channel = Column(String(32), default="", nullable=True)

    def to_dict(self):
        """Return the request as a plain dict.

        ``request_by`` is returned as stored; no username lookup is done here.

        :return: dict with ``request_id``, ``request_by``,
            ``request_channel``, ``request_at``, ``request_result``,
            ``request_vod`` and ``request_vod_channel``.
        """
        return {
            "request_id": self.request_id,
            "request_by": self.request_by,
            "request_channel": self.request_channel,
            "request_at": self.request_at,
            "request_result": self.request_result,
            "request_vod": self.request_vod,
            "request_vod_channel": self.request_vod_channel
        }
class WebHookStorage(Base):
    """ORM model for the ``webhook_storage`` table: one stored webhook event."""

    __tablename__ = 'webhook_storage'

    # Callable defaults: plain values would be evaluated once at import time,
    # giving every row the same primary key and creation time.
    id = Column(String(32), primary_key=True, index=True, default=lambda: uuid4().hex)
    hook_id = Column(String(32))
    event = Column(String(24))
    created_at = Column(DateTime, default=lambda: datetime.datetime.now(datetime.timezone.utc))
    session_id = Column(String(32))
    user_agent = Column(String(256), default="")
    user_ip = Column(String(128), default="")
    user_id = Column(String(32), default="")
    sessionId = Column(String(32), default="")
    # NOTE(review): this is a bare type object, not a Column, so it is not
    # mapped and nothing assigned to it is persisted. It was presumably meant
    # to be ``Column(Text())``; left unchanged because mapping it alters the
    # table schema and needs a migration for existing databases.
    application = Text()

    def __repr__(self):
        return f"<WebHookStorage(hook_id='{self.hook_id}', event='{self.event}')>"