Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

机器学习项目tensorflow实战 #41

Open
iloveyou11 opened this issue May 17, 2021 · 0 comments
Open

机器学习项目tensorflow实战 #41

iloveyou11 opened this issue May 17, 2021 · 0 comments

Comments

@iloveyou11
Copy link
Owner

iloveyou11 commented May 17, 2021

闲暇时间利用tensorflow框架写一些小项目练手~ github项目地址

房价预测线性回归

第1步:进行数据处理

首先读取csv数据,再对x1 x2 ... xn y数据进行归一化处理,接下来添加单独的一列x0(值均为1,常数项)
回归模型1

import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from mpl_toolkits import mplot3d

# 加载数据
sns.set(context="notebook"style="whitegrid"palette="dark")
df0 = pd.read_csv('房价预测线性回归/data0.csv'names=['square''price'])
sns.lmplot('square''price'df0height=6fit_reg=True)

df1 = pd.read_csv('房价预测线性回归/data1.csv'names=['square''bedrooms''price'])
print(df1.head())

# 绘制3d散点图
fig = plt.figure()
ax = plt.axes(projection='3d')
ax.set_xlabel('square')
ax.set_ylabel('bedrooms')
ax.set_zlabel('price')
ax.scatter3D(df1['square'], df1['bedrooms'],
             df1['price'], c=df1['price'], cmap='Greens')

# 数据规范化
def normalize(df):
    return df.apply(lambda col: (col-col.mean())/col.std())


df = normalize(df1)
print(df.head())

# 绘制规范化数据后的3d散点图
fig = plt.figure()
ax = plt.axes(projection='3d')
ax.set_xlabel('square')
ax.set_ylabel('bedrooms')
ax.set_zlabel('price')
ax.scatter3D(df['square'], df['bedrooms'],
             df['price'], c=df['price'], cmap='Reds')
plt.show()

# 添加列
ones = pd.DataFrame({'ones': np.ones(len(df))})
df = pd.concat([onesdf], axis=1)
print(df.head())

回归模型2

回归模型3

经过归一化处理后的数据结构如下:

   ones    square  bedrooms     price
0   1.0  0.130010 -0.223675  0.475747
1   1.0 -0.504190 -0.223675 -0.084074
2   1.0  0.502476 -0.223675  0.228626
3   1.0 -0.735723 -1.537767 -0.867025
4   1.0  1.257476  1.090417  1.595389

第2步:训练模型

在第1步得到的数据基础上进行处理. 首先,需要拿到x y的数据,定义学习率learning_rate和训练次数epoch,分别输入x y,计算损失loss值,使用梯度下降优化器进行优化操作.GradientDescentOptimizer

import tensorflow as tf
import numpy as np
import pandas as pd

def normalize(df):
    return df.apply(lambda col: (col-col.mean())/col.std())

df = pd.read_csv('房价预测线性回归/data1.csv'names=['square''bedrooms''price'])
df = normalize(df)
ones = pd.DataFrame({'ones': np.ones(len(df))})
df = pd.concat([onesdf], axis=1)
# print(df.head())


# 数据处理
X_data = np.array(df[df.columns[0:3]])
y_data = np.array(df[df.columns[-1]]).reshape(len(df), 1)
print(X_data.shape, type(X_data))
print(y_data.shape, type(y_data))


# 创建显性回归模型
learning_rate = 0.01
epoch = 500
# 输入x y
X = tf.compat.v1.placeholder(tf.float32X_data.shape)
y = tf.compat.v1.placeholder(tf.float32y_data.shape)
W = tf.compat.v1.get_variable(
    "weights", (X_data.shape[1], 1), initializer=tf.constant_initializer())
y_pred = tf.matmul(XW)
loss_op = 1 / (2 * len(X_data)) * tf.matmul((y_pred - y),
                                            (y_pred - y), transpose_a=True)
opt = tf.train.GradientDescentOptimizer(learning_rate=learning_rate)
train_op = opt.minimize(loss_op)


# 创建会话
with tf.compat.v1.Session() as sess:
    sess.run(tf.compat.v1.global_variables_initializer())
    for e in range(1epoch+1):
        sess.run(train_opfeed_dict={X: X_datay: y_data})
        if e % 10 == 0:
            lossw = sess.run([loss_opW], feed_dict={X: X_datay: y_data})
            print("Epoch %d \t Loss=%.4g \t Model: y = %.4gx1 + %.4gx2 + %.4g" %
                  (elossw[1], w[2], w[0]))

模型训练好后的数据如下:

Epoch 10         Loss=0.4116     Model: y = 0.0791x1 + 0.03948x2 + 3.353e-10
Epoch 20         Loss=0.353      Model: y = 0.1489x1 + 0.07135x2 + -5.588e-11
Epoch 30         Loss=0.3087     Model: y = 0.2107x1 + 0.09676x2 + 3.912e-10
Epoch 40         Loss=0.2748     Model: y = 0.2655x1 + 0.1167x2 + -1.863e-11
Epoch 50         Loss=0.2489     Model: y = 0.3142x1 + 0.1321x2 + 1.77e-10
Epoch 60         Loss=0.2288     Model: y = 0.3576x1 + 0.1436x2 + -4.47e-10
Epoch 70         Loss=0.2131     Model: y = 0.3965x1 + 0.1519x2 + -8.103e-10
Epoch 80         Loss=0.2007     Model: y = 0.4313x1 + 0.1574x2 + -6.985e-10
Epoch 90         Loss=0.1908     Model: y = 0.4626x1 + 0.1607x2 + -4.936e-10
......
......
Epoch 420        Loss=0.1332     Model: y = 0.8076x1 + 0.02271x2 + 2.125e-09
Epoch 430        Loss=0.133      Model: y = 0.8109x1 + 0.01957x2 + 2.292e-09
Epoch 440        Loss=0.1328     Model: y = 0.8141x1 + 0.01655x2 + 2.913e-09
Epoch 450        Loss=0.1326     Model: y = 0.8171x1 + 0.01366x2 + 3.412e-09
Epoch 460        Loss=0.1325     Model: y = 0.82x1 + 0.01087x2 + 3.749e-09
Epoch 470        Loss=0.1323     Model: y = 0.8228x1 + 0.008204x2 + 3.499e-09
Epoch 480        Loss=0.1322     Model: y = 0.8254x1 + 0.005641x2 + 3.663e-09
Epoch 490        Loss=0.1321     Model: y = 0.828x1 + 0.003183x2 + 4.2e-09
Epoch 500        Loss=0.132      Model: y = 0.8304x1 + 0.0008239x2 + 4.138e-09

第3步:可视化流图

使用tensorboard可以可视化数据流图,可以方便我们查看训练的过程

import tensorflow as tf
import numpy as np
import pandas as pd

def normalize(df):
    return df.apply(lambda col: (col-col.mean())/col.std())

df = pd.read_csv('房价预测线性回归/data1.csv'names=['square''bedrooms''price'])
df = normalize(df)
ones = pd.DataFrame({'ones': np.ones(len(df))})
df = pd.concat([onesdf], axis=1)
# print(df.head())

# 数据处理
X_data = np.array(df[df.columns[0:3]])
y_data = np.array(df[df.columns[-1]]).reshape(len(df), 1)
print(X_data.shape, type(X_data))
print(y_data.shape, type(y_data))

# 创建显性回归模型
learning_rate = 0.01
epoch = 500
# 输入x y
with tf.name_scope('input'):
    X = tf.compat.v1.placeholder(tf.float32X_data.shape)
    y = tf.compat.v1.placeholder(tf.float32y_data.shape)
with tf.name_scope('hypothesis'):
    W = tf.compat.v1.get_variable(
        "weights", (X_data.shape[1], 1), initializer=tf.constant_initializer())
    y_pred = tf.matmul(XW)
with tf.name_scope('loss'):
    loss_op = 1 / (2 * len(X_data)) * tf.matmul((y_pred - y),
                                                (y_pred - y), transpose_a=True)
with tf.name_scope('train'):
    train_op = tf.train.GradientDescentOptimizer(
        learning_rate=learning_rate).minimize(loss_op)

# 创建会话
with tf.compat.v1.Session() as sess:
    sess.run(tf.compat.v1.global_variables_initializer())
    writer = tf.compat.v1.summary.FileWriter('./summary'sess.graph)
    for e in range(1epoch+1):
        sess.run(train_opfeed_dict={X: X_datay: y_data})
        if e % 10 == 0:
            lossw = sess.run([loss_opW], feed_dict={X: X_datay: y_data})
            print("Epoch %d \t Loss=%.4g \t Model: y = %.4gx1 + %.4gx2 + %.4g" %
                  (elossw[1], w[2], w[0]))
writer.close()

其中,定义的with tf.name_scope('xxx'):是为了将相关的部分视为整体展示在tensorboard中,可以更加方便地展开和隐藏,能够更有效地展示模型的结构.

第4步:可视化损失loss

import seaborn as sns
import matplotlib.pyplot as plt
import tensorflow as tf
import numpy as np
import pandas as pd

def normalize(df):
    return df.apply(lambda col: (col-col.mean())/col.std())

df = pd.read_csv('房价预测线性回归/data1.csv'names=['square''bedrooms''price'])
df = normalize(df)
ones = pd.DataFrame({'ones': np.ones(len(df))})
df = pd.concat([onesdf], axis=1)
# print(df.head())


# 数据处理
X_data = np.array(df[df.columns[0:3]])
y_data = np.array(df[df.columns[-1]]).reshape(len(df), 1)
print(X_data.shape, type(X_data))
print(y_data.shape, type(y_data))

# 创建显性回归模型
learning_rate = 0.01
epoch = 500
# 输入x y
with tf.name_scope('input'):
    X = tf.compat.v1.placeholder(tf.float32X_data.shape)
    y = tf.compat.v1.placeholder(tf.float32y_data.shape)
with tf.name_scope('hypothesis'):
    W = tf.compat.v1.get_variable(
        "weights", (X_data.shape[1], 1), initializer=tf.constant_initializer())
    y_pred = tf.matmul(XW)
with tf.name_scope('loss'):
    loss_op = 1 / (2 * len(X_data)) * tf.matmul((y_pred - y),
                                                (y_pred - y), transpose_a=True)
with tf.name_scope('train'):
    train_op = tf.train.GradientDescentOptimizer(
        learning_rate=learning_rate).minimize(loss_op)

# 创建会话
with tf.compat.v1.Session() as sess:
    sess.run(tf.compat.v1.global_variables_initializer())
    writer = tf.compat.v1.summary.FileWriter('./summary'sess.graph)
    # 记录所有损失值
    loss_data = []
    for e in range(1epoch+1):
        _lossw = sess.run([train_oploss_opW],
                              feed_dict={X: X_datay: y_data})
        loss_data.append(float(loss))
        if e % 100 == 0:
            log_str = "Epoch %d \t Loss=%.4g \t Model: y = %.4gx1 + %.4gx2 + %.4g"
            print(log_str % (elossw[1], w[2], w[0]))
writer.close()
# print(len(loss_data))

# 可视化损失值
sns.set(context="notebook"style="whitegrid"palette="dark")
ax = sns.lineplot(x='epoch'y='loss'data=pd.DataFrame(
    {'loss': loss_data'epoch': np.arange(epoch)}))
ax.set_xlabel('epoch')
ax.set_ylabel('loss')
plt.show()

每次迭代过程中,损失值的变化趋势如下:

回归模型4

由此可见,随着迭代次数的增多,损失值越来越小,最后趋于平稳,模型越来越优.

手写数字识别

第1步:加载数据集mnist

# 获取mnist数据集
import matplotlib.pyplot as plt
from keras.datasets import mnist

(train_xtrain_y), (test_xtest_y) = mnist.load_data()
print(train_x.shapetrain_y.shape)
print(test_x.shapetest_y.shape)

# 可视化数据集
fig = plt.figure()
for i in range(15):
    plt.subplot(35i+1)
    plt.tight_layout()
    plt.imshow(train_x[i], cmap='Greys')
    plt.title('label:{}'.format(train_y[i]))
    plt.xticks([])
    plt.yticks([])
plt.show()

最后得到的数据集如下:

手写数字1

第2步:利用softmax进行手写数字识别

具体流程:

  1. 统计训练数据中各标签数量,并可视化标签数量,保证各类数字数量差不多,这样可以保证接下来训练模型的可靠性
  2. 数据处理:one-hot 编码
  3. 使用 Keras sequential model 定义神经网络(在这里,简单地使用了Dense-Activation(relu)-Dense-Activation(relu)-Dense-Activation(softmax)的结构),通过softmax计算的概率值大小判断是哪个数字的可能性最大
  4. 编译模型(利用model.compile()进行模型的编译)
  5. 训练模型,并将指标保存到 history 中
  6. 保存模型(利用model.save()将模型保存到本地,下次可以直接使用此模型),官方解释如下:
You can use model.save(filepath) to save a Keras model into a single HDF5 file which will contain:

the architecture of the model, allowing to re-create the model
the weights of the model
the training configuration (loss, optimizer)
the state of the optimizer, allowing to resume training exactly where you left off.
You can then use keras.models.load_model(filepath) to reinstantiate your model. load_model will also take care of compiling the model using the saved training configuration (unless the model was never compiled in the first place).
import tensorflow as tf
import os
from keras.layers.core import DenseActivation
from keras.models import Sequential
from keras.utils import np_utils
import matplotlib.pyplot as plt
import numpy as np
from keras.datasets import mnist

# 获取数据集
(x_trainy_train), (x_testy_test) = mnist.load_data()

# 规范化
X_train = x_train.reshape(60000784)
X_test = x_test.reshape(10000784)
X_train = X_train.astype('float32')
X_test = X_test.astype('float32')
X_train /= 255
X_test /= 255

# 统计各标签数量
labelcount = np.unique(y_trainreturn_counts=True)
# print(label, count)

# 可视化标签数量
fig = plt.figure()
plt.bar(labelcountwidth=0.7align='center')
plt.title("Label Distribution")
plt.xlabel("Label")
plt.ylabel("Count")
plt.xticks(label)
plt.ylim(07500)
for labelcount in zip(labelcount):
    plt.text(labelcount'%d' % countha='center'va='bottom'fontsize=10)
# plt.show()

# one-hot编码
n_classes = 10
# print('before one-hot:', y_train.shape)
Y_train = np_utils.to_categorical(y_trainn_classes)
# print('after one-hot:', Y_train.shape)
Y_test = np_utils.to_categorical(y_testn_classes)

# 定义神经网络
model = Sequential()

model.add(Dense(512input_shape=(784,)))
model.add(Activation('relu'))

model.add(Dense(512))
model.add(Activation('relu'))

model.add(Dense(10))
model.add(Activation('softmax'))

# 编译模型
model.compile(loss='categorical_crossentropy'metrics=['accuracy'], optimizer='adam')
# 开始训练
history = model.fit(
    X_trainY_trainbatch_size=128epochs=5verbose=2validation_data=(X_testY_test)
)

# 可视化指标
# print(history.history)
fig = plt.figure()

plt.subplot(211)
plt.plot(history.history['accuracy'])
plt.plot(history.history['val_accuracy'])
plt.title('Model Accuracy')
plt.ylabel('accuracy')
plt.xlabel('epoch')
plt.legend(['train''test'], loc='lower right')

plt.subplot(212)
plt.plot(history.history['loss'])  # 损失
plt.plot(history.history['val_loss'])  # 测试集上的损失
plt.title('Model Loss')
plt.ylabel('loss')
plt.xlabel('epoch')
plt.legend(['train''test'], loc='upper right')
plt.tight_layout()
# plt.show()

# 保存模型
save_dir = "./mnist/model/"
if tf.io.gfile.exists(save_dir):
    tf.io.gfile.rmtree(save_dir)
tf.io.gfile.makedirs(save_dir)

model_name = 'keras_mnist.h5'
model_path = os.path.join(save_dirmodel_name)
model.save(model_path)
print('Saved trained model at %s ' % model_path)

训练的结果为:

Train on 60000 samples, validate on 10000 samples
Epoch 1/5
 - 10s - loss: 0.2186 - acc: 0.9353 - val_loss: 0.0935 - val_acc: 0.9709
Epoch 2/5
 - 11s - loss: 0.0788 - acc: 0.9754 - val_loss: 0.0757 - val_acc: 0.9749
Epoch 3/5
 - 11s - loss: 0.0494 - acc: 0.9837 - val_loss: 0.0636 - val_acc: 0.9807
Epoch 4/5
 - 12s - loss: 0.0353 - acc: 0.9888 - val_loss: 0.0762 - val_acc: 0.9769
Epoch 5/5
 - 11s - loss: 0.0265 - acc: 0.9910 - val_loss: 0.0724 - val_acc: 0.9786

可视化指标效果如下:
手写数字2

保存模型后,如果要实现模型的加载,则可以使用load_model函数,其中model_path是模型的路径.使用训练好的此模型统计测试集上的分类结果,具体代码如下:

from keras.models import load_model
import os
import numpy as np
from keras.utils import np_utils
from keras.datasets import mnist

(x_trainy_train), (x_testy_test) = mnist.load_data()
# 规范化
X_test = x_test.reshape(10000784)
X_test = X_test.astype('float32')
X_test /= 255

n_classes = 10
Y_test = np_utils.to_categorical(y_testn_classes)

save_dir = "./mnist/model/"
model_name = 'keras_mnist.h5'
model_path = os.path.join(save_dirmodel_name)
mnist_model = load_model(model_path)

loss_and_metrics = mnist_model.evaluate(X_testY_testverbose=2)
print("Test Loss: {}".format(loss_and_metrics[0]))
print("Test Accuracy: {}%".format(loss_and_metrics[1]*100))

predicted_classes = mnist_model.predict_classes(X_test)

correct = np.nonzero(predicted_classes == y_test)[0]
incorrect = np.nonzero(predicted_classes != y_test)[0]
print("Classified correctly count: {}".format(len(correct)))
print("Classified incorrectly count: {}".format(len(incorrect)))

得到的结果如下:

Test Loss: 0.07241645678399945
Test Accuracy: 97.86%
Classified correctly count: 9786
Classified incorrectly count: 214

由此可见,训练结果还是相当不错的,准确率达到了97.86%.

第3步:利用CNN进行手写数字识别

整体的流程和第2步是非常类似的,只是在模型训练的过程中,采用的是CNN卷积神经网络,加入了更多的隐藏层,增加了模型的复杂度,也提高了模型的准确性。具体的神经网络设计如下,分别为卷积层-卷积层-池化层-dropout层-flatten层-全连接层-dropout层-softmax全连接层,具体参数如下:

  1. 第1层卷积,32个3x3的卷积核 ,激活函数使用 relu
  2. 第2层卷积,64个3x3的卷积核,激活函数使用 relu
  3. 最大池化层,池化窗口 2x2
  4. Dropout 25% 的输入神经元
  5. 将 Pooled feature map 摊平后输入全连接网络
  6. 全联接层,激活函数使用 relu
  7. Dropout 50% 的输入神经元
  8. 使用 softmax 激活函数做多分类,输出各数字的概率

查看 MNIST CNN 模型网络结构:

Layer (type)                 Output Shape              Param #   
=================================================================
conv2d_1 (Conv2D)            (None, 26, 26, 32)        320       
_________________________________________________________________
conv2d_2 (Conv2D)            (None, 24, 24, 64)        18496     
_________________________________________________________________
max_pooling2d_1 (MaxPooling2 (None, 12, 12, 64)        0         
_________________________________________________________________
dropout_1 (Dropout)          (None, 12, 12, 64)        0         
_________________________________________________________________
flatten_1 (Flatten)          (None, 9216)              0         
_________________________________________________________________
dense_1 (Dense)              (None, 128)               1179776   
_________________________________________________________________
dropout_2 (Dropout)          (None, 128)               0         
_________________________________________________________________
dense_2 (Dense)              (None, 10)                1290      
=================================================================
Total params: 1,199,882
Trainable params: 1,199,882
Non-trainable params: 0

具体代码如下:

from keras.layers import DenseDropoutFlattenConv2DMaxPooling2D
from keras import backend as K
import tensorflow as tf
import os
from keras.models import Sequential
from keras.utils import np_utils
import matplotlib.pyplot as plt
import numpy as np
from keras.datasets import mnist

# 获取数据集
(x_trainy_train), (x_testy_test) = mnist.load_data()

# 规范化
img_rowsimg_cols = 2828
if K.image_data_format() == 'channels_first':
    x_train = x_train.reshape(x_train.shape[0], 1img_rowsimg_cols)
    x_test = x_test.reshape(x_test.shape[0], 1img_rowsimg_cols)
    input_shape = (1img_rowsimg_cols)
else:
    x_train = x_train.reshape(x_train.shape[0], img_rowsimg_cols1)
    x_test = x_test.reshape(x_test.shape[0], img_rowsimg_cols1)
    input_shape = (img_rowsimg_cols1)

X_train = x_train.astype('float32')
X_test = x_test.astype('float32')
X_train /= 255
X_test /= 255

# 统计各标签数量
labelcount = np.unique(y_trainreturn_counts=True)
# print(label, count)

# 可视化标签数量
fig = plt.figure()
plt.bar(labelcountwidth=0.7align='center')
plt.title("Label Distribution")
plt.xlabel("Label")
plt.ylabel("Count")
plt.xticks(label)
plt.ylim(07500)
for labelcount in zip(labelcount):
    plt.text(labelcount'%d' % countha='center'va='bottom'fontsize=10)
# plt.show()

# one-hot编码
n_classes = 10
# print('before one-hot:', y_train.shape)
Y_train = np_utils.to_categorical(y_trainn_classes)
# print('after one-hot:', Y_train.shape)
Y_test = np_utils.to_categorical(y_testn_classes)

# 使用 Keras sequential model 定义 MNIST CNN 网络
model = Sequential()
# 第1层卷积,32个3x3的卷积核 ,激活函数使用 relu
model.add(Conv2D(filters=32kernel_size=(33), activation='relu'input_shape=input_shape))

# 第2层卷积,64个3x3的卷积核,激活函数使用 relu
model.add(Conv2D(filters=64kernel_size=(33), activation='relu'))

# 最大池化层,池化窗口 2x2
model.add(MaxPooling2D(pool_size=(22)))

# Dropout 25% 的输入神经元
model.add(Dropout(0.25))

# 将 Pooled feature map 摊平后输入全连接网络
model.add(Flatten())

# 全联接层
model.add(Dense(128activation='relu'))

# Dropout 50% 的输入神经元
model.add(Dropout(0.5))

# 使用 softmax 激活函数做多分类,输出各数字的概率
model.add(Dense(n_classesactivation='softmax'))

model.summary()

for layer in model.layers:
    print(layer.get_output_at(0).get_shape().as_list())

# 编译模型
model.compile(loss='categorical_crossentropy'metrics=['accuracy'], optimizer='adam')
# 训练模型
history = model.fit(
    X_trainY_trainbatch_size=128epochs=5verbose=2validation_data=(X_testY_test)
)

# 可视化指标
# print(history.history)
fig = plt.figure()

plt.subplot(211)
plt.plot(history.history['accuracy'])
plt.plot(history.history['val_accuracy'])
plt.title('Model Accuracy')
plt.ylabel('accuracy')
plt.xlabel('epoch')
plt.legend(['train''test'], loc='lower right')

plt.subplot(212)
plt.plot(history.history['loss'])  # 损失
plt.plot(history.history['val_loss'])  # 测试集上的损失
plt.title('Model Loss')
plt.ylabel('loss')
plt.xlabel('epoch')
plt.legend(['train''test'], loc='upper right')
plt.tight_layout()
plt.show()

# 保存模型
save_dir = "./mnist/model/"
if tf.io.gfile.exists(save_dir):
    tf.io.gfile.rmtree(save_dir)
tf.io.gfile.makedirs(save_dir)

model_name = 'keras_mnist.h5'
model_path = os.path.join(save_dirmodel_name)
model.save(model_path)
print('Saved trained model at %s ' % model_path)

利用CNN卷积神经网络训练后,可视化指标如下:
手写数字3

验证码识别

第1步:创建验证码数据集

具体步骤如下:

  1. 引入第三方包ImageCaptcha
  2. 定义常量和字符集(验证码字符集(包括数字/大小字母/小写字母) 验证码参数(长度/高度/宽度) 数据集参数(训练数据集大小/测试数据集大小/训练数据集目录/测试数据集目录))
  3. 定义生成随机字符的方法
  4. 创建并保存验证码数据集的方法
  5. 创建并保存训练集
  6. 创建并保存测试集
  7. 生成并返回验证码数据集的方法
  8. 生成 100 张验证码图像和字符
from captcha.image import ImageCaptcha
import random
import numpy as np
import matplotlib.pyplot as plt
import PIL.Image as Image
import tensorflow as tf

# 定义常量
NUMBER = ['0''1''2''3''4''5''6''7''8''9']
LOWERCASE = ['a''b''c''d''e''f''g''h''i''j''k''l''m''n''o''p''q''r''s''t''u''v''w''x''y''z']
UPPERCASE = ['A''B''C''D''E''F''G''H''I''J''K''L''M''N''O''P''Q''R''S''T''U''V''W''X''Y''Z']

CAPTCHA_CHARSET = NUMBER   # 验证码字符集
CAPTCHA_LEN = 4            # 验证码长度
CAPTCHA_HEIGHT = 60        # 验证码高度
CAPTCHA_WIDTH = 160        # 验证码宽度

TRAIN_DATASET_SIZE = 5000     # 验证码数据集大小
TEST_DATASET_SIZE = 1000
TRAIN_DATA_DIR = './train-data/'  # 验证码数据集目录
TEST_DATA_DIR = './test-data/'


# 生成随机字符
def gen_random_text(charset=CAPTCHA_CHARSETlength=CAPTCHA_LEN):
    text = [random.choice(charset) for _ in range(length)]
    return ''.join(text)

# 创建并保存验证码数据集


def create_captcha_dataset(
        size=100data_dir='./data/'height=60width=160image_format='.png'):
    if tf.io.gfile.exists(data_dir):
        tf.io.gfile.rmtree(data_dir)
    tf.io.gfile.makedirs(data_dir)

    captcha = ImageCaptcha(width=widthheight=height)

    for _ in range(size):
        text = gen_random_text(CAPTCHA_CHARSETCAPTCHA_LEN)
        captcha.write(textdata_dir+text+image_format)

    return None


# 训练集
create_captcha_dataset(TRAIN_DATASET_SIZETRAIN_DATA_DIR)
# 测试集
create_captcha_dataset(TEST_DATASET_SIZETEST_DATA_DIR)


def gen_captcha_dataset(
        size=100height=60width=160image_format='.png'):
    captcha = ImageCaptcha(width=widthheight=height)
    imagestexts = [None]*size, [None]*size

    for i in range(size):
        texts[i] = gen_random_text(CAPTCHA_CHARSETCAPTCHA_LEN)
        images[i] = np.array(Image.open(captcha.generate(texts[i])))

    return imagestexts


# 生成100张验证码图像
imagestexts = gen_captcha_dataset()


# 可视化验证码前20张图片
plt.figure()
for i in range(20):
    plt.subplot(54i+1)
    plt.tight_layout()
    plt.imshow(images[i])
    plt.title("Label: {}".format(texts[i]))
    plt.xticks([])
    plt.yticks([])
plt.show()

最后生成 100 张验证码图像和字符如下:
验证码1

第2步:数据处理

具体步骤:

  1. 读取训练集前 100 张图片,并通过文件名解析验证码(标签)
  2. 数据可视化
  3. 将 RGB 验证码图像转为灰度图
  4. 数据规范化
  5. 适配 Keras 图像数据格式
  6. 对验证码中每个字符进行 one-hot 编码
  7. 将验证码向量解码为对应字符
from PIL import Image
from keras import backend as K
import random
import glob
import numpy as np
import matplotlib.pyplot as plt

NUMBER = ['0''1''2''3''4''5''6''7''8''9']
LOWERCASE = ['a''b''c''d''e''f''g''h''i''j''k''l''m''n''o''p''q''r''s''t''u''v''w''x''y''z']
UPPERCASE = ['A''B''C''D''E''F''G''H''I''J''K''L''M''N''O''P''Q''R''S''T''U''V''W''X''Y''Z']

CAPTCHA_CHARSET = NUMBER   # 验证码字符集
CAPTCHA_LEN = 4            # 验证码长度
CAPTCHA_HEIGHT = 60        # 验证码高度
CAPTCHA_WIDTH = 160        # 验证码宽度

TRAIN_DATA_DIR = './train-data/'  # 验证码数据集目录

# 读取训练集前100张图像
image = []
text = []
count = 0
for filename in glob.glob(TRAIN_DATA_DIR+'*.png'):
    image.append(np.array(Image.open(filename)))
    text.append(filename.lstrip(TRAIN_DATA_DIR).rstrip('.png')[1:])
    count += 1
    if count >= 100:
        break

# 数据可视化
# plt.figure()
# for i in range(20):
#     plt.subplot(5, 4, i+1)
#     plt.tight_layout()
#     plt.imshow(image[i])
#     plt.title("Label: {}".format(text[i]))
#     plt.xticks([])
#     plt.yticks([])
# plt.show()

image = np.array(imagedtype=np.float32)
# print(image.shape)  # (100, 60, 160, 3)

# 将RGB转化为灰度图

def rgb2grey(img):
    return np.dot(img[..., :3], [0.2990.5870.114])

image = rgb2grey(image)
# print(image.shape)  # (100, 60, 160)

# 数据可视化
# plt.figure()
# for i in range(20):
#     plt.subplot(5, 4, i+1)
#     plt.tight_layout()
#     plt.imshow(image[i], cmap='Greys')
#     plt.title("Label: {}".format(text[i]))
#     plt.xticks([])
#     plt.yticks([])
# plt.show()

# 数据规范化
image = image/255
# 适配keras图像数据格式


def fit_keras_channels(batchrows=CAPTCHA_HEIGHTcols=CAPTCHA_WIDTH):
    if K.image_data_format() == 'channels_first':
        batch = batch.reshape(batch.shape[0], 1rowscols)
        input_shape = (1rowscols)
    else:
        batch = batch.reshape(batch.shape[0], rowscols1)
        input_shape = (rowscols1)
    return batchinput_shape


imageinput_shape = fit_keras_channels(image)
# print(image.shape)  # (100, 60, 160, 1)
# print(input_shape)  # (60, 160, 1)


# 对验证码中每个字符进行 one-hot 编码
def text2vec(textlength=CAPTCHA_LENcharset=CAPTCHA_CHARSET):
    text_len = len(text)
    # 验证码长度校验
    if text_len != length:
        raise ValueError(
            'Error: length of captcha should be {}, but got {}'.format(lengthtext_len))

    # 生成一个形如(CAPTCHA_LEN*CAPTHA_CHARSET,) 的一维向量
    # 例如,4个纯数字的验证码生成形如(4*10,)的一维向量
    vec = np.zeros(length * len(charset))
    for i in range(length):
        # One-hot 编码验证码中的每个数字
        # 每个字符的热码 = 索引 + 偏移量
        vec[charset.index(text[i]) + i*len(charset)] = 1
    return vec


text = list(text)
vec = [None]*len(text)
for i in range(len(vec)):
    vec[i] = text2vec(text[i])

# 将验证码向量解码为对应字符
def vec2text(vector):
    if not isinstance(vectornp.ndarray):
        vector = np.asarray(vector)
    vector = np.reshape(vector, [CAPTCHA_LEN-1])
    text = ''
    for item in vector:
        text += CAPTCHA_CHARSET[np.argmax(item)]
    return text

最后生成的数字灰度图效果如下:
验证码2

第3步:训练模型

具体步骤(前6步准备工作已经做过了,主要是要进行7-15步的模型训练过程,16-17步是做对应的保存工作):

  1. 引入第三方包
  2. 定义超参数和字符集
  3. 将 RGB 验证码图像转为灰度图
  4. 对验证码中每个字符进行 one-hot 编码
  5. 将验证码向量解码为对应字符
  6. 适配 Keras 图像数据格式
  7. 读取训练集
  8. 处理训练集图像
  9. 处理训练集标签
  10. 读取测试集,处理对应图像和标签
  11. 创建验证码识别模型
  12. 查看模型摘要
  13. 模型可视化
  14. 训练模型
  15. 预测样例
  16. 保存模型
  17. 保存训练过程记录
from PIL import Image
from keras import backend as K
from keras.utils.vis_utils import plot_model
from keras.models import *
from keras.layers import *
import glob
import pickle
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf


NUMBER = ['0''1''2''3''4''5''6''7''8''9']
LOWERCASE = ['a''b''c''d''e''f''g''h''i''j''k''l''m''n''o''p''q''r''s''t''u''v''w''x''y''z']
UPPERCASE = ['A''B''C''D''E''F''G''H''I''J''K''L''M''N''O''P''Q''R''S''T''U''V''W''X''Y''Z']

CAPTCHA_CHARSET = NUMBER   # 验证码字符集
CAPTCHA_LEN = 4            # 验证码长度
CAPTCHA_HEIGHT = 60        # 验证码高度
CAPTCHA_WIDTH = 160        # 验证码宽度

TRAIN_DATA_DIR = './train-data/'  # 验证码数据集目录
TEST_DATA_DIR = './test-data/'

BATCH_SIZE = 100
EPOCHS = 10
OPT = 'adam'
LOSS = 'binary_crossentropy'

MODEL_DIR = './model/train_demo/'
MODEL_FORMAT = '.h5'
HISTORY_DIR = './history/train_demo/'
HISTORY_FORMAT = '.history'

filename_str = "{}captcha_{}_{}_bs_{}_epochs_{}{}"

# 模型网络结构文件
MODEL_VIS_FILE = 'captcha_classfication' + '.png'
# 模型文件
MODEL_FILE = filename_str.format(
    MODEL_DIROPTLOSSstr(BATCH_SIZE), str(EPOCHS), MODEL_FORMAT)
# 训练记录文件
HISTORY_FILE = filename_str.format(
    HISTORY_DIROPTLOSSstr(BATCH_SIZE), str(EPOCHS), HISTORY_FORMAT)


def rgb2gray(img):
    return np.dot(img[..., :3], [0.2990.5870.114])


# 对验证码中每个字符进行 one-hot 编码


def text2vec(textlength=CAPTCHA_LENcharset=CAPTCHA_CHARSET):
    text_len = len(text)
    # 验证码长度校验
    if text_len != length:
        raise ValueError(
            'Error: length of captcha should be {}, but got {}'.format(lengthtext_len))

    # 生成一个形如(CAPTCHA_LEN*CAPTHA_CHARSET,) 的一维向量
    # 例如,4个纯数字的验证码生成形如(4*10,)的一维向量
    vec = np.zeros(length * len(charset))
    for i in range(length):
        # One-hot 编码验证码中的每个数字
        # 每个字符的热码 = 索引 + 偏移量
        vec[charset.index(text[i]) + i*len(charset)] = 1
    return vec

# 将验证码向量解码为对应字符


def vec2text(vector):
    if not isinstance(vectornp.ndarray):
        vector = np.asarray(vector)
    vector = np.reshape(vector, [CAPTCHA_LEN-1])
    text = ''
    for item in vector:
        text += CAPTCHA_CHARSET[np.argmax(item)]
    return text


def fit_keras_channels(batchrows=CAPTCHA_HEIGHTcols=CAPTCHA_WIDTH):
    if K.image_data_format() == 'channels_first':
        batch = batch.reshape(batch.shape[0], 1rowscols)
        input_shape = (1rowscols)
    else:
        batch = batch.reshape(batch.shape[0], rowscols1)
        input_shape = (rowscols1)
    return batchinput_shape

# 读取训练集


X_train = []
Y_train = []
for filename in glob.glob(TRAIN_DATA_DIR + '*.png'):
    X_train.append(np.array(Image.open(filename)))
    Y_train.append(filename.lstrip(TRAIN_DATA_DIR).rstrip('.png')[1:])
X_train = np.array(X_traindtype=np.float32)
X_train = rgb2gray(X_train)
X_train = X_train / 255
X_traininput_shape = fit_keras_channels(X_train)
# (3948, 60, 160, 1) <class 'numpy.ndarray'>
print(X_train.shape, type(X_train))
print(input_shape)  # (60, 160, 1)

# 处理训练集标签
Y_train = list(Y_train)
for i in range(len(Y_train)):
    Y_train[i] = text2vec(Y_train[i])
Y_train = np.asarray(Y_train)
print(Y_train.shape, type(Y_train))


# 读取测试集,处理对应图像和标签
X_test = []
Y_test = []
for filename in glob.glob(TEST_DATA_DIR + '*.png'):
    X_test.append(np.array(Image.open(filename)))
    Y_test.append(filename.lstrip(TEST_DATA_DIR).rstrip('.png')[1:])
# list -> rgb -> gray -> normalization -> fit keras
X_test = np.array(X_testdtype=np.float32)
X_test = rgb2gray(X_test)
X_test = X_test / 255
X_test, _ = fit_keras_channels(X_test)
Y_test = list(Y_test)
for i in range(len(Y_test)):
    Y_test[i] = text2vec(Y_test[i])
Y_test = np.asarray(Y_test)
print(X_test.shape, type(X_test))
print(Y_test.shape, type(Y_test))


# 创建验证码识别模型
inputs = Input(shape=input_shapename="inputs")
# 第1层卷积
conv1 = Conv2D(32, (33), name="conv1")(inputs)
relu1 = Activation('relu'name="relu1")(conv1)

# 第2层卷积
conv2 = Conv2D(32, (33), name="conv2")(relu1)
relu2 = Activation('relu'name="relu2")(conv2)
pool2 = MaxPooling2D(pool_size=(22), padding='same'name="pool2")(relu2)

# 第3层卷积
conv3 = Conv2D(64, (33), name="conv3")(pool2)
relu3 = Activation('relu'name="relu3")(conv3)
pool3 = MaxPooling2D(pool_size=(22), padding='same'name="pool3")(relu3)

# 将 Pooled feature map 摊平后输入全连接网络
x = Flatten()(pool3)

# Dropout
x = Dropout(0.25)(x)

# 4个全连接层分别做10分类,分别对应4个字符。
x = [Dense(10activation='softmax'name='fc%d' % (i+1))(x) for i in range(4)]

# 4个字符向量拼接在一起,与标签向量形式一致,作为模型输出。
outs = Concatenate()(x)

# 定义模型的输入与输出
model = Model(inputs=inputsoutputs=outs)
model.compile(optimizer=OPTloss=LOSSmetrics=['accuracy'])

model.summary()
plot_model(modelto_file=MODEL_VIS_FILEshow_shapes=True)
history = model.fit(X_trainY_trainbatch_size=BATCH_SIZEepochs=EPOCHSverbose=2validation_data=(X_testY_test))

print(vec2text(Y_test[9]))
yy = model.predict(X_test[9].reshape(1601601))
print(vec2text(yy))

if not tf.io.gfile.exists(MODEL_DIR):
    tf.io.gfile.makedirs(MODEL_DIR)
model.save(MODEL_DIR)
print('Saved trained model at %s ' % MODEL_FILE)

训练模型得到的参数如下:


Train on 3956 samples, validate on 954 samples
Epoch 1/10
 - 149s - loss: 0.3270 - acc: 0.9000 - val_loss: 0.3247 - val_acc: 0.9000
Epoch 2/10
 - 122s - loss: 0.3229 - acc: 0.9000 - val_loss: 0.3195 - val_acc: 0.9000
Epoch 3/10
 - 114s - loss: 0.2987 - acc: 0.9004 - val_loss: 0.2726 - val_acc: 0.9028
Epoch 4/10
 - 106s - loss: 0.2257 - acc: 0.9164 - val_loss: 0.2303 - val_acc: 0.9171
Epoch 5/10
 - 103s - loss: 0.1799 - acc: 0.9337 - val_loss: 0.2171 - val_acc: 0.9209
Epoch 6/10
 - 113s - loss: 0.1523 - acc: 0.9447 - val_loss: 0.2062 - val_acc: 0.9254
Epoch 7/10
 - 112s - loss: 0.1383 - acc: 0.9498 - val_loss: 0.2048 - val_acc: 0.9260
Epoch 8/10
 - 127s - loss: 0.1251 - acc: 0.9550 - val_loss: 0.2052 - val_acc: 0.9260
Epoch 9/10
 - 161s - loss: 0.1144 - acc: 0.9587 - val_loss: 0.2013 - val_acc: 0.9285
Epoch 10/10
 - 159s - loss: 0.1063 - acc: 0.9618 - val_loss: 0.2045 - val_acc: 0.9268

实现词云

要实现词云效果,首先需要安装wordcloud库,pip install wordcloud进行安装即可。

1. 英文词云

实现基本的英文词云:

# -*- coding: utf-8 -*-

from wordcloud import WordCloud
import matplotlib.pyplot as plt

# 打开文本
text = open('./text/constitution.txt').read()
# 生成对象
wc = WordCloud().generate(text)

# 显示词云
plt.imshow(wcinterpolation='bilinear')
plt.axis('off')
plt.show()

# 保存到文件
wc.to_file('./img/word1.png')

效果展示:

word1

2. 中文词云

# -*- coding: utf-8 -*-

from wordcloud import WordCloud
import matplotlib.pyplot as plt

# 打开文本
text = open('./text/xyj.txt'encoding='UTF-8').read()
# 生成对象
wc = WordCloud(font_path='Hiragino.ttf'width=800height=600mode='RGBA'background_color=None).generate(text)

# 显示词云
plt.imshow(wcinterpolation='bilinear')
plt.axis('off')
plt.show()

# 保存到文件
wc.to_file('./img/word2.png')

其中读取文件使用text = open('xxx.txt',encoding='UTF-8').read(),注意这里要写encoding='UTF-8',否则无法正确读取内容。要额外引入字体文件,如上例引入了Hiragino.ttf字体。最后生成的效果如下,但是有个问题就是每个词并不是按照中文意思进行断开的,无实际意义,接下来我们处理中文分词问题。

word2

3. 中文词云+分词

# -*- coding: utf-8 -*-

from wordcloud import WordCloud
import matplotlib.pyplot as plt
import jieba

# 打开文本
text = open('./text/xyj.txt'encoding='UTF-8').read()

# 中文分词
text=' '.join(jieba.cut(text))

# 生成对象
wc = WordCloud(font_path='Hiragino.ttf'width=800height=600mode='RGBA'background_color=None).generate(text)

# 显示词云
plt.imshow(wcinterpolation='bilinear')
plt.axis('off')
plt.show()

# 保存到文件
wc.to_file('./img/word3.png')

因为英文文章中每个单词都是使用空格隔开的,因此不需要手动分词,而中文文章每个词是连在一起的,需要引入第三方包jieba进行中文分词操作。核心代码是text=' '.join(jieba.cut(text)),这样可以使得每个独立的词用空格隔开。最后生成的效果如下:

word3

4. 中文词云+分词+黑白蒙版

# -*- coding: utf-8 -*-

from wordcloud import WordCloud
import matplotlib.pyplot as plt
from PIL import Image
import numpy as np
import jieba

# 打开文本
text = open('./text/xyj.txt'encoding='UTF-8').read()

# 中文分词
text=' '.join(jieba.cut(text))

# 启用黑白蒙版
mask=np.array(Image.open('./mask/black_mask.png'))
wc = WordCloud(mask=maskfont_path='Hiragino.ttf'width=800height=600mode='RGBA'background_color=None).generate(text)

# 显示词云
plt.imshow(wcinterpolation='bilinear')
plt.axis('off')
plt.show()

# 保存到文件
wc.to_file('./img/word4.png')

这里使用了黑白图片作为蒙版,WordCloud()函数中传入mask参数,则可以启用对应的蒙版,这样生成的词云会与蒙版的图形相同。效果如下:

black_mask

word4

5. 中文词云+分词+彩色蒙版

# -*- coding: utf-8 -*-

from wordcloud import WordCloudImageColorGenerator
import matplotlib.pyplot as plt
from PIL import Image
import numpy as np
import jieba

# 打开文本
text = open('./text/xyj.txt'encoding='UTF-8').read()

# 中文分词
text=' '.join(jieba.cut(text))

# 启用彩色蒙版
mask=np.array(Image.open('./mask/color_mask.png'))
wc = WordCloud(mask=maskfont_path='Hiragino.ttf'width=800height=600mode='RGBA'background_color=None).generate(text)

# 从图片中生成颜色
image_colors=ImageColorGenerator(mask)
wc.recolor(color_func=image_colors)

# 显示词云
plt.imshow(wcinterpolation='bilinear')
plt.axis('off')
plt.show()

# 保存到文件
wc.to_file('./img/word5.png')

这里使用了彩色图片作为蒙版,从wordcloud引入ImageColorGenerator函数,从图片中生成颜色,这样生成的词云颜色和蒙版的颜色是相同的(每个部分的颜色都是大致对应的),效果如下:

color_mask

word5

6. 中文词云+分词+彩色蒙版+自定义颜色

# -*- coding: utf-8 -*-

from wordcloud import WordCloud
import matplotlib.pyplot as plt
from PIL import Image
import numpy as np
import random
import jieba

# 打开文本
text = open('./text/xyj.txt'encoding='UTF-8').read()

# 中文分词
text=' '.join(jieba.cut(text))

# 自定义颜色函数
def random_color(wordfont_sizepositionorientationfont_pathrandom_state):
	s = 'hsl(0, %d%%, %d%%)' % (random.randint(6080), random.randint(6080))
	return s

# 启用彩色蒙版
mask=np.array(Image.open('./mask/color_mask.png'))
wc = WordCloud(color_func=random_colormask=maskfont_path='Hiragino.ttf'width=800height=600mode='RGBA'background_color=None).generate(text)

# 显示词云
plt.imshow(wcinterpolation='bilinear')
plt.axis('off')
plt.show()

# 保存到文件
wc.to_file('./img/word6.png')

为了实现为词云自定义颜色,我们可以单独实现一个上色函数random_color,在WordCloud()函数中传入color_func参数即可,效果如下:
word6

7. 中文词云+分词+彩色蒙版+关键词权重

# -*- coding: utf-8 -*-

from wordcloud import WordCloudImageColorGenerator
import matplotlib.pyplot as plt
from PIL import Image
import numpy as np
import jieba.analyse

# 打开文本
text = open('./text/xyj.txt'encoding='UTF-8').read()

# 提取关键词和权重
freq=jieba.analyse.extract_tags(texttopK=200withWeight=True)
freq = {i[0]: i[1] for i in freq}

# 中文分词
text=' '.join(jieba.cut(text))

# 启用彩色蒙版
mask=np.array(Image.open('./mask/color_mask.png'))
wc = WordCloud(mask=maskfont_path='Hiragino.ttf'width=800height=600mode='RGBA'background_color=None).generate_from_frequencies(freq)

# 从图片中生成颜色
image_colors=ImageColorGenerator(mask)
wc.recolor(color_func=image_colors)

# 显示词云
plt.imshow(wcinterpolation='bilinear')
plt.axis('off')
plt.show()

# 保存到文件
wc.to_file('./img/word7.png')

为了在词云中凸显词语出现的频率,我们可以采用根据频率上色的方法,频率出现越高则着色越深。先提取关键词和权重,再通过WordCloud().generate_from_frequencies(freq)生成词云即可,注意这里的freq是词与频次关系的字典。最后生成的效果如下:
word7

自编码器图像去噪AE

自编码器深度学习中的一类无监督学习模型,由encoder和decoder两个部分组成。自编码器主要是一种思想,encoder和decoder可以由全连接层\CNN\RNN等模型实现。

第1步:完成模型训练,并保存模型

具体步骤:

  1. 获取训练集和测试集
  2. 随机添加噪声点
  3. 可视化噪声图
  4. 构建模型
  5. 训练模型
  6. 保存模型
# -*- coding: utf-8 -*-

from keras.datasets import mnist
import numpy as np
import matplotlib.pyplot as plt
from keras.layers import InputDenseConv2DMaxPooling2DUpSampling2D
from keras.models import Modelload_model

# 获取训练集和测试集
(x_train, _), (x_test, _) = mnist.load_data()
x_train = x_train.astype('float32') / 255.
x_test = x_test.astype('float32') / 255.
x_train = np.reshape(x_train, (len(x_train), 28281))
x_test = np.reshape(x_test, (len(x_test), 28281))

# 随机添加噪声点
noise_factor = 0.5
x_train_noisy = x_train + noise_factor * np.random.normal(loc=0.0scale=1.0size=x_train.shape) 
x_test_noisy = x_test + noise_factor * np.random.normal(loc=0.0scale=1.0size=x_test.shape) 
x_train_noisy = np.clip(x_train_noisy0.1.)
x_test_noisy = np.clip(x_test_noisy0.1.)

# 可视化噪声图
n = 10
plt.figure(figsize=(202))
for i in range(n):
    ax = plt.subplot(1ni + 1)
    plt.imshow(x_test_noisy[i].reshape(2828))
    plt.gray()
    ax.get_xaxis().set_visible(False)
    ax.get_yaxis().set_visible(False)
plt.show()

# 构建模型
input_img = Input(shape=(28281,)) # N * 28 * 28 * 1
x = Conv2D(32, (33), padding='same'activation='relu')(input_img) # 28 * 28 * 32
x = MaxPooling2D((22), padding='same')(x) # 14 * 14 * 32
x = Conv2D(32, (33), padding='same'activation='relu')(x) # 14 * 14 * 32
encoded = MaxPooling2D((22), padding='same')(x) # 7 * 7 * 32
x = Conv2D(32, (33), padding='same'activation='relu')(encoded) # 7 * 7 * 32
x = UpSampling2D((22))(x) # 14 * 14 * 32
x = Conv2D(32, (33), padding='same'activation='relu')(x) # 14 * 14 * 32
x = UpSampling2D((22))(x) # 28 * 28 * 32
decoded = Conv2D(1, (33), padding='same'activation='sigmoid')(x) # 28 * 28 * 1

# 训练模型
autoencoder = Model(input_imgdecoded)
autoencoder.compile(optimizer='adadelta'loss='binary_crossentropy')
autoencoder.fit(x_train_noisyx_trainepochs=100batch_size=128shuffle=Truevalidation_data=(x_test_noisyx_test))
# 保存模型
autoencoder.save('autoencoder.h5')

可视化噪声图片效果如下:
自编码1

第2步:加载训练好的模型,用来图像去噪

from keras.datasets import mnist
import numpy as np
import matplotlib.pyplot as plt
from keras.models import Modelload_model

# 加载训练好了的模型
autoencoder = load_model('autoencoder.h5')

# 获取训练集和测试集
(x_train, _), (x_test, _) = mnist.load_data()
x_train = x_train.astype('float32') / 255.
x_test = x_test.astype('float32') / 255.
x_train = np.reshape(x_train, (len(x_train), 28281))
x_test = np.reshape(x_test, (len(x_test), 28281))

# 随机添加噪声点
noise_factor = 0.5
x_train_noisy = x_train + noise_factor * np.random.normal(loc=0.0scale=1.0size=x_train.shape) 
x_test_noisy = x_test + noise_factor * np.random.normal(loc=0.0scale=1.0size=x_test.shape) 
x_train_noisy = np.clip(x_train_noisy0.1.)
x_test_noisy = np.clip(x_test_noisy0.1.)

decoded_imgs = autoencoder.predict(x_test_noisy)
n = 10
plt.figure(figsize=(204))
for i in range(n):
    # 展示原始噪声图
    ax = plt.subplot(2ni + 1)
    plt.imshow(x_test_noisy[i].reshape(2828))
    plt.gray()
    ax.get_xaxis().set_visible(False)
    ax.get_yaxis().set_visible(False)
 
    # 展示去噪后的图片
    ax = plt.subplot(2ni + 1 + n)
    plt.imshow(decoded_imgs[i].reshape(2828))
    plt.gray()
    ax.get_xaxis().set_visible(False)
    ax.get_yaxis().set_visible(False)
plt.show()

原始噪声图 vs 去噪后的图片 的效果如下:
自编码2

变分自编码器VAE

我们经常会有这样的需求:根据很多个样本,学会生成新的样本

以mnist为例,在看过成百上千张图片后,让计算机能够模仿生成一些类似的图片,这些图片在原始数据中并不存在,但是与原来图片看起来相似

简言之就是需要学会数据x的分布,根据数据分布产生新样本

VAE(变分自编码器)和AE(自编码器)的区别:

  1. AE中隐层表示的分布未知,而VAE中隐层变量服从高斯分布
  2. AE中学习的是encoder和decoder,VAE中还学习隐变量的分布,包括高斯分布的均值和方差
  3. AE只能从1个x得到对应的重构x
  4. VAE可以产生新的z,从而得到新的x,即生成新的样本(VAE是一种常见的生成式模型)

可以使用keras.datasets中的mnistfashion_mnist进行测试:

# -*- coding: utf-8 -*-

import numpy as np
import matplotlib.pyplot as plt

from keras.layers import InputDenseLambda
from keras.models import Model
from keras import backend as K
from keras import objectives
from keras.datasets import mnist
# from keras.datasets import fashion_mnist

# 定义常数
batch_size = 100
original_dim = 784 # 28*28
intermediate_dim = 256
latent_dim = 2
epochs = 50

x = Input(shape=(original_dim,)) # 维度28*28,数据数量不确定
h = Dense(intermediate_dimactivation='relu')(x) # 输出维度是256
z_mean = Dense(latent_dim)(h) # 全连接层
z_log_var = Dense(latent_dim)(h) # 全连接层

# 根据均值和方差生成z
def sampling(args):
    z_meanz_log_var = args
    epsilon = K.random_normal(shape=(batch_sizelatent_dim), mean=0.)
    return z_mean + K.exp(z_log_var / 2) * epsilon
# Lambda层不参与训练,只参与计算,用于后面产生新的z
z = Lambda(samplingoutput_shape=(latent_dim,))([z_meanz_log_var])

# decoder部分包含两个全连接层
decoder_h = Dense(intermediate_dimactivation='relu')
decoder_mean = Dense(original_dimactivation='sigmoid')
h_decoded = decoder_h(z)
x_decoded_mean = decoder_mean(h_decoded)

# 总的损失函数,计算交叉熵
def vae_loss(xx_decoded_mean):
    xent_loss = original_dim * objectives.binary_crossentropy(xx_decoded_mean)
    kl_loss = -0.5 * K.sum(1 + z_log_var - K.square(z_mean) - K.exp(z_log_var), axis=-1)
    return xent_loss + kl_loss

# 这里使用rmsprop优化算法和手写的loss函数vae_loss
vae = Model(xx_decoded_mean)
vae.compile(optimizer='rmsprop'loss=vae_loss)

# 加载并训练数据
(x_trainy_train), (x_testy_test) = mnist.load_data()
x_train = x_train.astype('float32') / 255.
x_test = x_test.astype('float32') / 255.
x_train = x_train.reshape((len(x_train), np.prod(x_train.shape[1:])))
x_test = x_test.reshape((len(x_test), np.prod(x_test.shape[1:])))

vae.fit(x_trainx_trainshuffle=Trueepochs=epochsbatch_size=batch_sizevalidation_data=(x_testx_test))


# 定义一个encoder,看看mnist中的数据在隐层中是怎么样的,前面将latent_dim定位为2,是为了输出二维平面化图
encoder = Model(xz_mean)

x_test_encoded = encoder.predict(x_testbatch_size=batch_size)
plt.figure(figsize=(66))
plt.scatter(x_test_encoded[:, 0], x_test_encoded[:, 1], c=y_test)
plt.colorbar()
plt.show()

# 定义生成器
decoder_input = Input(shape=(latent_dim,))
_h_decoded = decoder_h(decoder_input)
_x_decoded_mean = decoder_mean(_h_decoded)
generator = Model(decoder_input_x_decoded_mean)

# 验证生成器能生成什么样的图片
n = 20
digit_size = 28
figure = np.zeros((digit_size * ndigit_size * n))
grid_x = np.linspace(-44n)
grid_y = np.linspace(-44n)

for ixi in enumerate(grid_x):
    for jyi in enumerate(grid_y):
        z_sample = np.array([[yixi]])
        x_decoded = generator.predict(z_sample)
        digit = x_decoded[0].reshape(digit_sizedigit_size)
        figure[(n - i - 1) * digit_size: (n - i) * digit_sizej * digit_size: (j + 1) * digit_size] = digit

plt.figure(figsize=(1010))
plt.imshow(figure)
plt.show()

encoder的可视化效果(mnist中的数据在隐层中的形态):

变分自编码器1

验证生成器能生成什么样的图片:

变分自编码器2

生成式对抗网络GAN

除VAE以外,生成式对抗网络(GAN)也是一种非常流行的无监督生成式模型.

GAN中包括两个核心网络:

  1. 生成器(generator):记作G,通过对大量样本的学习,能够生成一些以假乱真的样本,和VAE类似
  2. 判别器(discriminator):记作D,接受真实样本和G生成的样本,并进行判别和区分
  3. G和D相互博弈,通过学习,G的生成能力和D的判别能力都逐渐增强并收敛

GAN的训练非常困难,有很多需要注意的细节,才能生成质量较高的图片:

  1. 恰当地使用BN(Batch Normalization) / LeakyReLU等
  2. 用strides为2的卷积代替池化
  3. 交替训练,避免一方过强

完整代码如下:

# -*- coding: utf-8 -*-

import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
import osimageio

# 加载手写数据集
from tensorflow.examples.tutorials.mnist import input_data
mnist = input_data.read_data_sets('MNIST_data')

# 定义参数
batch_size = 100 # 每次训练的样本数
z_dim = 100 # 输出大小
OUTPUT_DIR = 'samples' # 输出目录
if not os.path.exists(OUTPUT_DIR):
    os.mkdir(OUTPUT_DIR)

# 定义X、noise、is_training变量
X = tf.placeholder(dtype=tf.float32shape=[None28281], name='X')
noise = tf.placeholder(dtype=tf.float32shape=[Nonez_dim], name='noise')
is_training = tf.placeholder(dtype=tf.boolname='is_training')

# 激活函数leakyRelu
def lrelu(xleak=0.2):
    return tf.maximum(xleak * x)

# 损失函数
def sigmoid_cross_entropy_with_logits(xy):
    return tf.nn.sigmoid_cross_entropy_with_logits(logits=xlabels=y)


# 关键点---判别器/生成器的定义

# 1 判别器
def discriminator(imagereuse=Noneis_training=is_training):
    momentum = 0.9 # 动量
    
    with tf.variable_scope('discriminator'reuse=reuse):

        # 卷积开始,filters越来越多,图片越来越小
        # h0: -1,28,28,1
        # h1: -1,24,24,64
        # h2: -1,12,12,128
        # h3: -1,6,6,256
        # h4: -1,3,3,512
        # h4作为判别器输出
        
        h0 = lrelu(tf.layers.conv2d(imagekernel_size=5filters=64strides=2padding='same'))
        
        h1 = tf.layers.conv2d(h0kernel_size=5filters=128strides=2padding='same')
        # batch_norm转化为标准的高斯分布,指数加权滑动平均算法,decay是衰减系数
        h1 = lrelu(tf.contrib.layers.batch_norm(h1is_training=is_trainingdecay=momentum))
        
        h2 = tf.layers.conv2d(h1kernel_size=5filters=256strides=2padding='same')
        h2 = lrelu(tf.contrib.layers.batch_norm(h2is_training=is_trainingdecay=momentum))
        
        h3 = tf.layers.conv2d(h2kernel_size=5filters=512strides=2padding='same')
        h3 = lrelu(tf.contrib.layers.batch_norm(h3is_training=is_trainingdecay=momentum))
        
        h4 = tf.contrib.layers.flatten(h3)
        h4 = tf.layers.dense(h4units=1)
        # 返回经过sigmoid处理后的h4和未被激活的h4
        return tf.nn.sigmoid(h4), h4

# 2 生成器(输入的z是噪音,为二维tensor)
def generator(zis_training=is_training):
    momentum = 0.9
    with tf.variable_scope('generator'reuse=None):
        d = 3

        # 逆卷积开始,filters越来越少
        # h0: -1,3,3,512
        # h1: -1,6,6,256
        # h2: -1,12,12,128
        # h3: -1,24,24,64
        # h4: -1,28,28,1
        # h4作为生成器的输出

        h0 = tf.layers.dense(zunits=d * d * 512)
        h0 = tf.reshape(h0shape=[-1dd512])
        h0 = tf.nn.relu(tf.contrib.layers.batch_norm(h0is_training=is_trainingdecay=momentum))
        
        h1 = tf.layers.conv2d_transpose(h0kernel_size=5filters=256strides=2padding='same')
        h1 = tf.nn.relu(tf.contrib.layers.batch_norm(h1is_training=is_trainingdecay=momentum))
        
        h2 = tf.layers.conv2d_transpose(h1kernel_size=5filters=128strides=2padding='same')
        h2 = tf.nn.relu(tf.contrib.layers.batch_norm(h2is_training=is_trainingdecay=momentum))
        
        h3 = tf.layers.conv2d_transpose(h2kernel_size=5filters=64strides=2padding='same')
        h3 = tf.nn.relu(tf.contrib.layers.batch_norm(h3is_training=is_trainingdecay=momentum))
        
        h4 = tf.layers.conv2d_transpose(h3kernel_size=5filters=1strides=1padding='valid'activation=tf.nn.tanhname='g')
        return h4

g = generator(noise) # 生成的假图片
d_reald_real_logits = discriminator(X) # 真图片激活后h4和未激活h4的值
d_faked_fake_logits = discriminator(greuse=True) # 假图片激活后h4和未激活h4的值

vars_g = [var for var in tf.trainable_variables() if var.name.startswith('generator')] # 和generator相关的参数
vars_d = [var for var in tf.trainable_variables() if var.name.startswith('discriminator')] # 和discriminator相关的参数

loss_d_real = tf.reduce_mean(sigmoid_cross_entropy_with_logits(d_real_logitstf.ones_like(d_real))) # 真图片导致的判别器损失
loss_d_fake = tf.reduce_mean(sigmoid_cross_entropy_with_logits(d_fake_logitstf.zeros_like(d_fake))) # 假图片导致的判别器损失
loss_g = tf.reduce_mean(sigmoid_cross_entropy_with_logits(d_fake_logitstf.ones_like(d_fake))) # 生成器损失
loss_d = loss_d_real + loss_d_fake # 判别器损失(真图片+假图片)


# 优化函数
# 先完成update_ops的相关操作(如BN的参数更新),再完成后续的优化操作
update_ops = tf.get_collection(tf.GraphKeys.UPDATE_OPS)
with tf.control_dependencies(update_ops):
    optimizer_d = tf.train.AdamOptimizer(learning_rate=0.0002beta1=0.5).minimize(loss_dvar_list=vars_d)
    optimizer_g = tf.train.AdamOptimizer(learning_rate=0.0002beta1=0.5).minimize(loss_gvar_list=vars_g)

# 辅助函数,用于将多张图片以网格状拼接在一起
def montage(images):
    if isinstance(imageslist):
        images = np.array(images)
    img_h = images.shape[1]
    img_w = images.shape[2]
    n_plots = int(np.ceil(np.sqrt(images.shape[0])))
    m = np.ones((images.shape[1] * n_plots + n_plots + 1images.shape[2] * n_plots + n_plots + 1)) * 0.5
    for i in range(n_plots):
        for j in range(n_plots):
            this_filter = i * n_plots + j
            if this_filter < images.shape[0]:
                this_img = images[this_filter]
                m[1 + i + i * img_h:1 + i + (i + 1) * img_h1 + j + j * img_w:1 + j + (j + 1) * img_w] = this_img
    return m


# 开始训练(需要交替训练,如每次迭代训练G两次)
sess = tf.Session()
sess.run(tf.global_variables_initializer())
z_samples = np.random.uniform(-1.01.0, [batch_sizez_dim]).astype(np.float32)
samples = []
loss = {'d': [], 'g': []}

for i in range(60000):
    # 产生随机noise
    n = np.random.uniform(-1.01.0, [batch_sizez_dim]).astype(np.float32)
    # 依次取数据
    batch = mnist.train.next_batch(batch_size=batch_size)[0]
    batch = np.reshape(batch, [-128281])
    # batch是0~1(relu),我们要将它映射到-1~1(tanh的取值范围)
    batch = (batch - 0.5) * 2 
    
    d_lsg_ls = sess.run([loss_dloss_g], feed_dict={X: batchnoise: nis_training: True})
    loss['d'].append(d_ls)
    loss['g'].append(g_ls)
    
    #依次训练D-G-G(判别器训练1次,生成器训练2次)
    sess.run(optimizer_dfeed_dict={X: batchnoise: nis_training: True})
    sess.run(optimizer_gfeed_dict={X: batchnoise: nis_training: True})
    sess.run(optimizer_gfeed_dict={X: batchnoise: nis_training: True})
    
    # 每迭代1000轮,打印样本
    if i % 1000 == 0:
        print(id_lsg_ls)
        gen_imgs = sess.run(gfeed_dict={noise: z_samplesis_training: False})
        # -1~1转0~1
        gen_imgs = (gen_imgs + 1) / 2
        imgs = [img[:, :, 0] for img in gen_imgs]
        gen_imgs = montage(imgs)
        plt.axis('off')
        plt.imshow(gen_imgscmap='gray')
        plt.savefig(os.path.join(OUTPUT_DIR'sample_%d.jpg' % i))
        plt.show()
        samples.append(gen_imgs)

plt.plot(loss['d'], label='Discriminator')
plt.plot(loss['g'], label='Generator')
plt.legend(loc='upper right')
plt.savefig('Loss.png')
plt.show()
imageio.mimsave(os.path.join(OUTPUT_DIR'samples.gif'), samplesfps=5)

# 保存模型
saver = tf.train.Saver()
saver.save(sess'./mnist_dcgan'global_step=60000)

图片经过卷积的基本结构变化如下:

GAN1

生成的手写数字图片的动图效果为:

GAN2

训练好模型后,可直接加载模型,自动生成类似图片:

# -*- coding: utf-8 -*-

import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt

batch_size = 100
z_dim = 100

def montage(images):
    if isinstance(imageslist):
        images = np.array(images)
    img_h = images.shape[1]
    img_w = images.shape[2]
    n_plots = int(np.ceil(np.sqrt(images.shape[0])))
    m = np.ones((images.shape[1] * n_plots + n_plots + 1images.shape[2] * n_plots + n_plots + 1)) * 0.5
    for i in range(n_plots):
        for j in range(n_plots):
            this_filter = i * n_plots + j
            if this_filter < images.shape[0]:
                this_img = images[this_filter]
                m[1 + i + i * img_h:1 + i + (i + 1) * img_h1 + j + j * img_w:1 + j + (j + 1) * img_w] = this_img
    return m

sess = tf.Session()
sess.run(tf.global_variables_initializer())

saver = tf.train.import_meta_graph('./mnist_dcgan-60000.meta')
saver.restore(sesstf.train.latest_checkpoint('./'))

graph = tf.get_default_graph()
g = graph.get_tensor_by_name('generator/g/Tanh:0')
noise = graph.get_tensor_by_name('noise:0')
is_training = graph.get_tensor_by_name('is_training:0')

n = np.random.uniform(-1.01.0, [batch_sizez_dim]).astype(np.float32)
gen_imgs = sess.run(gfeed_dict={noise: nis_training: False})
gen_imgs = (gen_imgs + 1) / 2
imgs = [img[:, :, 0] for img in gen_imgs]
gen_imgs = montage(imgs)
plt.axis('off')
plt.imshow(gen_imgscmap='gray')
plt.show()

Inception-v3图片分类

Inception-v3是由Google提出,用于实现ImageNet大规模视觉识别任务的一种神经网络。Inception-v3反复使用了Inception Block,涉及大量的卷积和池化. 这里我们选择加载pre-trained的Inception-v3模型,来完成一些图片分类任务。

Inception-v3的模型结构如下:

Inception-v3

训练好的模型包括3个部分:

  1. classify_image_graph_def.pb: Inception-v3模型结构和参数
  2. imagenet_2012_challenge_label_map_proto.pbtxt: 从类别编码到类别字符串的对应关系
  3. imagenet_synset_to_human_label_map.txt: 从类别字符串到类别名的对应关系
# -*- coding: utf-8 -*-

import tensorflow as tf
import numpy as np

# 字符串id到name的映射字典
# n00004475	organism, being
# n00005787	benthos
# n00006024	heterotroph
# ...
uid_to_human = {}
for line in tf.gfile.GFile('imagenet_synset_to_human_label_map.txt').readlines():
	items = line.strip().split('\t')
	uid_to_human[items[0]] = items[1]

# node到字符串id的映射字典
# entry {
#   target_class: 449
#   target_class_string: "n01440764"
# }
# entry {
#   target_class: 450
#   target_class_string: "n01443537"
# }
node_id_to_uid = {}
for line in tf.gfile.GFile('imagenet_2012_challenge_label_map_proto.pbtxt').readlines():
	if line.startswith('  target_class:'):
		target_class = int(line.split(': ')[1])
	if line.startswith('  target_class_string:'):
		target_class_string = line.split(': ')[1].strip('\n').strip('\"')
		node_id_to_uid[target_class] = target_class_string


# node到name的映射字典(很具上述得到的node_id_to_uid和uid_to_human生成)
node_id_to_name = {}
for keyvalue in node_id_to_uid.items():
	node_id_to_name[key] = uid_to_human[value]


# 加载模型
def create_graph():
	with tf.gfile.FastGFile('classify_image_graph_def.pb''rb') as f:
		graph_def = tf.GraphDef()
		graph_def.ParseFromString(f.read())
		_ = tf.import_graph_def(graph_defname='')


# 分类图片的函数
def classify_image(imagetop_k=1):
	image_data = tf.gfile.FastGFile(image'rb').read()

	create_graph()

	with tf.Session() as sess:
		# 'softmax:0': A tensor containing the normalized prediction across 1000 labels
		# 'pool_3:0': A tensor containing the next-to-last layer containing 2048 float description of the image
		# 'DecodeJpeg/contents:0': A tensor containing a string providing JPEG encoding of the image
		softmax_tensor = sess.graph.get_tensor_by_name('softmax:0')
		predictions = sess.run(softmax_tensorfeed_dict={'DecodeJpeg/contents:0': image_data})
		predictions = np.squeeze(predictions)

		top_k = predictions.argsort()[-top_k:]
		for node_id in top_k:
			human_string = node_id_to_name[node_id]
			score = predictions[node_id]
			print('%s (score = %.5f)' % (human_stringscore))

classify_image('./img/test3.png')

定制分类任务

Inception-v3是针对ImageNet图片分类设计的,因此最后一层全连接层的神经元个数和分类标签的个数相同。如果需要特别定制分类任务的话,只需要使用自己的标注数据,然后替换掉最后一层全连接层即可。

最后一层全连接层的神经元个数等于定制分类任务的标签个数,模型只训练最后一层的参数,其他参数保持不变。这样的话保留了Inception-v3对于图像的理解和抽象能力,同时满足了定制的分类任务,属于迁移学习的一种典型应用场景。

@iloveyou11 iloveyou11 changed the title 机器学习项目实战 机器学习项目实战——tensorflow python框架 May 17, 2021
@iloveyou11 iloveyou11 changed the title 机器学习项目实战——tensorflow python框架 机器学习项目tensorflow实战 May 17, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant