# sql.py
from collections import Counter
import re
from urllib.parse import unquote
from gensim.models.word2vec import Word2Vec
import pandas as pd
import nltk
# GeneSeg tokenizes an input payload into a list of generalized tokens
def GeneSeg(payload):
    # Lowercase, fully URL-decode, and generalize digit runs to "0"
    payload = payload.lower()
    payload = unquote(unquote(payload))
    payload = re.sub(r"\d+", "0", payload)
    # Replace URLs with the placeholder http://u
    payload = re.sub(
        r"(http|https)://[a-zA-Z0-9\.@&/#!#\?]+", "http://u", payload
    )
    # Tokenization rules (raw string to avoid invalid-escape warnings; the
    # verbose flag (?x) must appear at the very start of the pattern)
    r = r"""(?x)
        [\w\.]+?\(
        |\)
        |"\w+?"
        |'\w+?'
        |http://\w
        |</\w+>
        |<\w+>
        |<\w+
        |\w+=
        |>
        |[\w\.]+
    """
    return nltk.regexp_tokenize(payload, r)
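# A quick hedged illustration of GeneSeg on a made-up payload (the string is
# illustrative, not from the dataset): digits are generalized to "0" and the
# payload splits into tokens such as "admin", "or", "0=", "0".
print(GeneSeg("admin' OR 1=1 --"))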
# Read the benign data into a "payload" column; frac=0.1 draws a random 10% sample
normal_data = pd.read_csv("sql-inject/benign.csv", names=["payload"]).sample(frac=0.1)
# Read the malicious data into a "payload" column; frac=0.1 draws a random 10% sample
sql_data = pd.read_table("sql-inject/malicious.csv", names=["payload"], sep="~").sample(
    frac=0.1
)
normal_data["label"] = 0
sql_data["label"] = 1
# Concatenate the benign rows and the SQL-injection rows
data = pd.concat([normal_data, sql_data])
# Apply GeneSeg to every payload and store the token lists in a "words" column
data["words"] = data["payload"].map(GeneSeg)
# Flatten the token lists of the malicious samples
all_words = [word for sublist in data[data["label"] == 1]["words"] for word in sublist]
# Count token frequencies
word_counts = Counter(all_words)
vocabulary_size = 3000
# Keep the 3000 most frequent tokens
top_words = [word for word, _ in word_counts.most_common(vocabulary_size)]
# Build the vocabulary: token -> index
vocab = {word: idx for idx, word in enumerate(top_words)}
# Replace out-of-vocabulary tokens with "UNK"
processed_words = [
    [word if word in vocab else "UNK" for word in sublist]
    for sublist in data[data["label"] == 1]["words"]
]
embedding_size = 300
num_sampled = 20
skip_window = 10
num_iter = 10
# Train Word2Vec embeddings on the malicious token lists
model = Word2Vec(
    processed_words,
    vector_size=embedding_size,
    window=skip_window,
    negative=num_sampled,
    epochs=num_iter,
)
embeddings = model.wv
import os
# Create the directory for saved models
save_dir = "sql-models"
os.makedirs(save_dir, exist_ok=True)
# Save the Word2Vec model (gensim's native format, despite the .h5 extension)
model.save(os.path.join(save_dir, "trained_w2v_model.h5"))
# embeddings.similar_by_word("select", 10)
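# A minimal sketch of querying the trained embeddings, assuming "select" made it
# into the Word2Vec vocabulary; the membership check guards against a KeyError:
if "select" in embeddings.key_to_index:
    print(embeddings.most_similar("select", topn=5))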
import numpy as np
import tensorflow as tf
# Prepare the data: fit a Keras Tokenizer on the token lists
from tensorflow.keras.preprocessing.sequence import pad_sequences
tokenizer = tf.keras.preprocessing.text.Tokenizer()
tokenizer.fit_on_texts(data["words"])
import pickle
# Persist the Tokenizer so the same token-to-index mapping can be reused at inference time
with open("sql-models/tokenizer.pickle", "wb") as file:
    pickle.dump(tokenizer, file)
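# A hedged sketch of restoring the Tokenizer in a separate inference process
# (left commented out here because the object is still in memory):
# with open("sql-models/tokenizer.pickle", "rb") as file:
#     tokenizer = pickle.load(file)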
word_index = tokenizer.word_index
X = tokenizer.texts_to_sequences(data["words"])
X = pad_sequences(X)
print("max length: " + str(X.shape[1]))
# Prepare the labels
Y = np.array(data["label"])
# Split into training and test sets
from sklearn.model_selection import train_test_split
X_train, X_test, Y_train, Y_test = train_test_split(
    X, Y, test_size=0.2, random_state=42
)
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Embedding, SpatialDropout1D, LSTM, Dense
# Build the LSTM model
lstm_model = Sequential()
lstm_model.add(Embedding(len(word_index) + 1, 128, input_length=X.shape[1]))
lstm_model.add(SpatialDropout1D(0.2))
lstm_model.add(LSTM(100))
# Sigmoid (not tanh) so the output is a probability in [0, 1], as binary cross-entropy expects
lstm_model.add(Dense(1, activation="sigmoid"))
# Compile the model
lstm_model.compile(loss="binary_crossentropy", optimizer="adam", metrics=["accuracy"])
# Train the model
lstm_model.fit(
    X_train, Y_train, epochs=3, batch_size=16, validation_data=(X_test, Y_test)
)
# Ensure the save directory exists
os.makedirs(save_dir, exist_ok=True)
# Save the model
lstm_model.save(os.path.join(save_dir, "trained_sqldec_model.h5"))
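# A minimal end-to-end inference sketch, not part of the original pipeline:
# classify_payload is a hypothetical helper that reuses the GeneSeg
# preprocessing, the fitted tokenizer, the padded length X.shape[1], and the
# same 0.5 threshold as the evaluation below.
from tensorflow.keras.models import load_model
def classify_payload(payload, tok, detector, maxlen):
    tokens = GeneSeg(payload)  # identical preprocessing to training
    seq = pad_sequences(tok.texts_to_sequences([tokens]), maxlen=maxlen)
    return float(detector.predict(seq)[0][0]) > 0.5  # True = flagged as injection
detector = load_model(os.path.join(save_dir, "trained_sqldec_model.h5"))
print(classify_payload("1' or '1'='1", tokenizer, detector, X.shape[1]))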
# Predict on the test set
y_pred = lstm_model.predict(X_test)
y_pred = (y_pred > 0.5).astype(int)
# Evaluate
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
accuracy = accuracy_score(Y_test, y_pred)
precision = precision_score(Y_test, y_pred)
recall = recall_score(Y_test, y_pred)
f1 = f1_score(Y_test, y_pred)
print(f"Accuracy: {accuracy}")
print(f"Precision: {precision}")
print(f"Recall: {recall}")
print(f"F1 score: {f1}")