# cd /root/sh/git/ && nohup python3 /root/sh/git/git.py &
# coding:UTF-8
import json
import time
import os
import requests
import random
import requests.packages.urllib3 as urllib3
import pandas as pd
# Keywords to monitor; separate multiple keywords with ASCII commas.
# "auto" expands to the latest CVE prefix: if the current year is 2022,
# "auto" is interpreted as CVE-2022.
search_key = "auto,rce"
# Server酱 (ServerChan) send key (obtain one at https://sct.ftqq.com/)
send_key = "SCT63558ToWsvMZ0vUbc76WIhc5Dz7JOZ"
# Print the full details of each update
print_detail_enter = True
headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) " +
                  "Chrome / 70.0.3538.25 Safari / 537.36 Core / 1.70.3741.400 QQBrowser / 10.5.3863.400"
}
save_dir = os.path.join(os.path.split(os.path.realpath(__file__))[0], "data")
send_api = "https://sctapi.ftqq.com/%s.send" % send_key
# Suppress the InsecureRequestWarning triggered by verify=False below
urllib3.disable_warnings()
def push(params):
    # POST the notification to the Server酱 API, retrying once on a connection error
    response = None
    try:
        response = requests.post(send_api, data=params, headers=headers, timeout=10, verify=False)
    except requests.exceptions.ConnectionError:
        try:
            time.sleep(1)
            response = requests.post(send_api, data=params, headers=headers, timeout=10, verify=False)
        except requests.exceptions.ConnectionError:
            print("The request was rejected! Failed to push to the phone.")
    if response is None or response.text.find("SUCCESS") == -1:
        print("Failed to push to the phone.")
    else:
        print("Pushed to the phone successfully.")
def crawling(category):
    index = 0
    save_path = os.path.join(save_dir, category + ".csv")
    data = []
    new_data = []
    try:
        if os.path.exists(save_path):
            # A result file already exists for this keyword: load the saved
            # (name, url, description) columns and only fetch the 10 most
            # recently updated repositories.
            df = pd.read_csv(save_path, header=None)
            df = df[df.columns[1:4]]
            # Convert nan values in the loaded data to None so that rows
            # compare equal to the freshly crawled ones.
            data = df.where(df.notnull(), None).values.tolist()
            response = requests.get(
                url="https://api.github.com/search/repositories?q=%s&sort=updated&per_page=10" % category,
                headers=headers, verify=False)
        else:
            # No result file yet: fetch the full first page of results.
            response = requests.get(
                url="https://api.github.com/search/repositories?q=%s&sort=updated&order=desc&page=1" % category,
                headers=headers, verify=False)
    except Exception as e:
        print("Please check the network connection!")
        print(e)
        return 1
    try:
        # The search endpoint returns JSON of the form (abridged):
        # {"total_count": ..., "items": [{"name": ..., "html_url": ..., "description": ...}, ...]}
        data_list: list = json.loads(response.text)["items"]
    except (json.decoder.JSONDecodeError, KeyError):
        # A rate-limited or error response either fails to parse or has no "items" key.
        print("Failed to parse the data, please check the network connection!")
        return 1
    content = ""
    for i in data_list:
        name = i['name']
        url = i['html_url']
        description = i['description']
        s1 = [name, url, description]
        if s1 not in data:
            content = content + "\n%d. New result: %s \nLink: %s \nDescription: %s\n" % (index + 1, name, url, description)
            s1.insert(0, time.strftime("%Y-%m-%d %H:%M", time.localtime()))
            new_data.append(s1)
            index += 1
    title = "%s: updated with %d new entries." % (category, index)
    print(title)
    if index:
        if print_detail_enter:
            print(content)
        push({
            "text": title,
            "desp": content
        })
    print()
    if index:
        try:
            if len(data):
                # Re-read the full file (including timestamps) before appending.
                df = pd.read_csv(save_path, header=None)
                data = df.where(df.notnull(), None).values.tolist()
            if not os.path.exists(save_dir):
                os.mkdir(save_dir)
            pd.DataFrame(data + new_data).to_csv(save_path, header=None, index=False)
            print("Data saved to %s\n——————————————————————————" % save_path)
        except PermissionError:
            print("%s is in use by another process! Failed to save the data.\n——————————————————————————" % save_path)
def get_newest():
    # time.localtime()[0] is the current year, so this yields e.g. "CVE-2022".
    return 'CVE-%d' % time.localtime(time.time())[0]
if __name__ == '__main__':
    if search_key == "":
        crawling(get_newest())
    else:
        search_list: list = search_key.split(',')
        random.shuffle(search_list)
        for j in search_list:
            c = crawling(get_newest() if j == "auto" else j)
            if c:
                # After a network failure, wait 10 seconds and retry once.
                time.sleep(10)
                crawling(get_newest() if j == "auto" else j)
            time.sleep(2)
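# The script walks the keyword list once and then exits, so periodic polling
# needs an external scheduler. One option (an assumption, not part of the
# original deployment) is a crontab entry such as:
#
#   */10 * * * * cd /root/sh/git/ && python3 /root/sh/git/monitor.py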