Skip to content
This repository has been archived by the owner on May 5, 2022. It is now read-only.

Data synchronization using PyMySql and Websockets library #2

Merged
merged 25 commits into from
Feb 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
14269b1
refactor: logging config from file
Ecss11 Jan 25, 2022
15756ab
git: 无视日志和数据库配置
Ecss11 Jan 25, 2022
4884282
refactor: 优化 try 块和日志
Ecss11 Jan 25, 2022
534b34b
fix: 改为普通方法
Ecss11 Jan 26, 2022
648be10
refactor: 使用继承 Mysql 的方法优化扩展性
Ecss11 Jan 26, 2022
9ebddcc
fix: 修复已知 IO 问题
Ecss11 Jan 26, 2022
0ccc5f9
refactor: rename and separate function
Ecss11 Jan 30, 2022
07f5d3a
feat: alert event update
Ecss11 Jan 30, 2022
73742e2
fix: Fixed known IO issues
Ecss11 Jan 26, 2022
5ce7de6
refactor: rename and separate function
Ecss11 Jan 30, 2022
04d139d
feat: alert event update
Ecss11 Jan 30, 2022
030f555
Merge remote-tracking branch 'origin/develop' into develop
Ecss11 Jan 30, 2022
88d5d75
docs: how to use the application
Ecss11 Jan 30, 2022
08b5525
fix: no newline at end of line
Ecss11 Jan 30, 2022
1f8c720
fix: no newline at end of line
Ecss11 Jan 30, 2022
efec437
refactor: extract Mysql and its child class to module
Ecss11 Jan 30, 2022
66e784c
git: gitkeeper for log
Ecss11 Jan 30, 2022
c47df47
git: gitkeeper for test folder
Ecss11 Jan 30, 2022
9458e2b
git: rename config file
Ecss11 Jan 30, 2022
4632eb1
refactor: more flexible logging
Ecss11 Jan 30, 2022
5631713
refactor: rename match_event function
Ecss11 Jan 30, 2022
81c9d96
refactor: setup logging when call
Ecss11 Jan 30, 2022
00ac23c
refactor: create controller folder
Ecss11 Jan 30, 2022
30bf84b
refactor: json structure change
Ecss11 Jan 30, 2022
0338bcf
docs: README update for config
Ecss11 Jan 30, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
/sendlog.txt
# Ignore log files
/log/
/test/
34 changes: 34 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# PS2-DatabaseSync

> A tool based on the Planetside 2 API.

Get game events and sync to database.

## How to use

1. First you need a `config.json` file in config, for example:

```json
{
"database": {
"host": "localhost",
"port": 3306,
"database": "example",
"user": "user",
"password": "password"
}
}
```

2. You will need following third-party dependencies to run the application.

```requirements.txt
pymysql >= 1.0.2
websockets >= 10.1
```

3. Use the command below to start.

```shell
python -m update_database.py
```
2 changes: 2 additions & 0 deletions config/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# Ignore database config update
/config.json
9 changes: 9 additions & 0 deletions config/config.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"database": {
"host": "localhost",
"port": 3306,
"database": "example",
"user": "user",
"password": "password"
}
}
7 changes: 0 additions & 7 deletions config/database.json

This file was deleted.

32 changes: 32 additions & 0 deletions config/logging.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
{
"version": 1,
"disable_existing_loggers": false,
"root": {
"level": "DEBUG",
"handlers": [
"monitor",
"runtime"
]
},
"handlers": {
"monitor": {
"class": "logging.StreamHandler",
"formatter": "default",
"level": "DEBUG",
"stream": "ext://sys.stdout"
},
"runtime": {
"class": "logging.handlers.TimedRotatingFileHandler",
"formatter": "default",
"level": "INFO",
"filename": "log/runtime.log",
"when": "midnight",
"backupCount": "3"
}
},
"formatters": {
"default": {
"format": "%(asctime)s %(levelname)-8s %(message)s"
}
}
}
60 changes: 60 additions & 0 deletions controller/mysql_controller.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import json

import pymysql


class Mysql(object):
"""数据库相关操作

"""

def __init__(self):
with open("config/config.json") as config:
database = json.load(config)["database"]

self.conn = pymysql.connect(host=database["host"], port=database["port"], db=database["database"],
user=database["user"], password=database["password"])

async def update_event_in_background(self, payload):
pass


class DeathEventHandler(Mysql):
"""死亡事件

"""

async def update_event_in_background(self, payload):
"""击杀数据库更新

:param payload: Websocket 订阅数据,字典类。
"""
with self.conn.cursor() as cursor:
sql = "INSERT INTO ps2_death (attacker_character_id, attacker_fire_mode_id, attacker_loadout_id, " \
"attacker_vehicle_id, attacker_weapon_id, character_id, character_loadout_id, is_headshot, " \
"world_id, zone_id) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"
cursor.execute(sql, (payload["attacker_character_id"], payload["attacker_fire_mode_id"],
payload["attacker_loadout_id"], payload["attacker_vehicle_id"],
payload["attacker_weapon_id"], payload["character_id"],
payload["character_loadout_id"], payload["is_headshot"],
payload["world_id"], payload["zone_id"]))
self.conn.commit()


class AlertEventHandler(Mysql):
"""警报事件

"""

async def update_event_in_background(self, payload):
"""警报数据库更新

:param payload: Websocket 订阅数据,字典类。
"""
with self.conn.cursor() as cursor:
sql = "INSERT INTO ps2_jingbao (faction_vs, faction_tr, faction_nc, world_id, zone_id, " \
"metagame_event_id, metagame_event_state) VALUES (%s, %s, %s, %s, %s, %s, %s)"
cursor.execute(sql, (payload["faction_vs"], payload["faction_tr"], payload["faction_nc"],
payload["world_id"], payload["zone_id"], payload["metagame_event_id"],
payload["metagame_event_state"]))
self.conn.commit()
Empty file added log/.gitkeeper
Empty file.
Empty file added test/.gitkeeper
Empty file.
Empty file added test/config/.gitkeeper
Empty file.
118 changes: 57 additions & 61 deletions update_database.py
Original file line number Diff line number Diff line change
@@ -1,95 +1,91 @@
import asyncio
import json
import logging
import logging.config
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like import logging.config will automatically import logging methods.

import logging.handlers
import os

import pymysql
import websockets

logger = logging.getLogger()
logfile = 'test.log'
hdlr = logging.FileHandler('sendlog.txt')
formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
hdlr.setFormatter(formatter)
logger.addHandler(hdlr)
logger.setLevel(logging.NOTSET)
from controller import mysql_controller


class Mysql(object):
"""数据库相关操作
class Subscription(object):
"""同步订阅事件至数据库,使用 Websockets

"""

def __init__(self):
with open("config/database.json") as config:
database = json.load(config)
self.conn = pymysql.connect(host=database["host"], port=database["port"], db=database["database"],
user=database["user"], password=database["password"])
# Websocket API 订阅内容,http://census.daybreakgames.com/
self.ps_api = "wss://push.planetside2.com/streaming?environment=ps2&service-id=s:yinxue"
self.subscribe = '{"service":"event","action":"subscribe","characters":["all"],"eventNames":["Death", ' \
'"MetagameEvent"],"worlds":["1", "10", "13", "17", "40"],' \
'"logicalAndCharactersWithWorlds":true} '
self.death_handler = mysql_controller.DeathEventHandler()
self.alert_handler = mysql_controller.AlertEventHandler()

async def update_death_event(self, payload):
"""击杀数据库更新
async def connect_ps_api(self):
"""连接行星边际 API 接口,并调用更新方法同步数据

:param payload: Websocket 订阅数据,字典类。
"""
# 数据库更新
with self.conn.cursor() as cursor:
sql = "INSERT INTO ps2_death (attacker_character_id, attacker_fire_mode_id, attacker_loadout_id, " \
"attacker_vehicle_id, attacker_weapon_id, character_id, character_loadout_id, is_headshot, " \
"world_id, zone_id) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"
cursor.execute(sql, (payload["attacker_character_id"], payload["attacker_fire_mode_id"],
payload["attacker_loadout_id"], payload["attacker_vehicle_id"],
payload["attacker_weapon_id"], payload["character_id"],
payload["character_loadout_id"], payload["is_headshot"],
payload["world_id"], payload["zone_id"]))
self.conn.commit()

print(payload)

async def update_alert_event(self, payload):
""" 警报数据库更新

:param payload: Websocket 订阅数据,字典类。
async with websockets.connect(self.ps_api, ping_timeout=None) as ws:
logger.info("Connection established.")
await ws.send(self.subscribe)
while True:
message = await ws.recv()
data: dict = json.loads(message)

await self.sync_data_to_database(data)

async def sync_data_to_database(self, data):
"""同步数据至数据库

:param data: API 返回数据
"""
# 数据库更新
print(payload)
is_subscription_event = True and data.get("service") == "event" and data.get("type") == "serviceMessage"

if is_subscription_event:
await self.select_event_handler(data)

async def connect_websocket():
# Websocket API 订阅内容,http://census.daybreakgames.com/
ps_api = "wss://push.planetside2.com/streaming?environment=ps2&service-id=s:yinxue"
subscribe = '{"service":"event","action":"subscribe","characters":["all"],"eventNames":["Death", ' \
'"MetagameEvent"],"worlds":["1", "10", "13", "17", "40"],"logicalAndCharactersWithWorlds":true} '
# 连接数据库
mysql = Mysql()
async def select_event_handler(self, data):
"""匹配事件对应的数据库操作

:param data: API 返回数据
"""
payload: dict = data["payload"]

async with websockets.connect(ps_api, ping_timeout=None) as websocket:
await websocket.send(subscribe)
print("Pending for message...")
if payload.get("event_name") == "Death":
await self.death_handler.update_event_in_background(payload)

while True:
message = await websocket.recv()
data: dict = json.loads(message)
elif payload.get("event_name") == "MetagameEvent":
await self.alert_handler.update_event_in_background(payload)

# 是否为订阅事件
if not (True and data.get("service") == "event" and data.get("type") == "serviceMessage"):
continue

# 判断事件选择数据库操作
payload: dict = data["payload"]
def setup_logging():
with open("config/logging.json", "r") as logging_config_file:
logging_config = json.load(logging_config_file)

if payload.get("event_name") == "Death":
await mysql.update_death_event(payload)
logging.config.dictConfig(logging_config)

elif payload.get("event_name") == "MetagameEvent":
await mysql.update_alert_event(payload)
return logging.getLogger()


logger = setup_logging()


if __name__ == '__main__':
current_file_path = os.path.dirname(__file__)
os.chdir(current_file_path)

synchronize = Subscription()

while True:
try:
asyncio.run(connect_websocket())
asyncio.run(synchronize.connect_ps_api())

except KeyboardInterrupt:
logger.info("The program was closed by the user.")
break

except websockets.WebSocketException:
logger.warning("Connection failed, try to reconnect.")
continue