TensorFlow监督学习实战笔记:图像分类、文本分类和模型保存

TensorFlow监督学习实战笔记

记录用TensorFlow做监督学习的实战经验,包括图像分类、文本分类、模型保存和超参数调优。

回归 vs 分类

类型 应用场景 输出类型 常见算法
回归 房价预测、股价预测 连续值 线性回归、神经网络回归
分类 图像识别、垃圾邮件检测 离散类别 CNN、RNN、Transformer

核心概念:

  • 训练数据:包含输入特征和对应标签
  • 特征(Feature):用于预测的输入变量
  • 标签(Label):需要预测的目标值
  • 损失函数:衡量预测值与真实值差距
  • 优化器:调整模型参数以最小化损失

图像分类:Fashion MNIST

Fashion MNIST是经典的图像分类数据集,10类服装的灰度图像(28x28像素)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
import os

# Allow duplicate OpenMP runtimes (common macOS/conda workaround).
os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE"

# Load Fashion MNIST: 60k train / 10k test grayscale 28x28 images, 10 classes.
fashion_mnist = tf.keras.datasets.fashion_mnist
(train_images, train_labels), (test_images, test_labels) = fashion_mnist.load_data()

# Human-readable names for the 10 clothing classes (index == label value).
class_names = ['T-shirt/top', 'Trouser', 'Pullover', 'Dress', 'Coat',
               'Sandal', 'Shirt', 'Sneaker', 'Bag', 'Ankle boot']

print(f"训练集形状: {train_images.shape}")
print(f"训练标签数量: {len(train_labels)}")

# Preprocess: scale pixel values from [0, 255] down to [0, 1].
train_images = train_images / 255.0
test_images = test_images / 255.0

# Visualize the first 25 training samples in a 5x5 grid.
plt.figure(figsize=(10, 10))
for i in range(25):
    plt.subplot(5, 5, i + 1)
    plt.xticks([])
    plt.yticks([])
    plt.grid(False)
    plt.imshow(train_images[i], cmap=plt.cm.binary)
    plt.xlabel(class_names[train_labels[i]])
plt.show()

构建CNN模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# Build the classifier.
# NOTE(review): despite the section title, this is a plain fully-connected
# network (Flatten + Dense), not a convolutional one.
model = tf.keras.Sequential([
    # Flatten: turn each 28x28 image into a 784-dim vector.
    tf.keras.layers.Flatten(input_shape=(28, 28)),
    # Hidden layer: 128 units with ReLU activation.
    tf.keras.layers.Dense(128, activation='relu'),
    # Output layer: one raw logit per class (no softmax here).
    tf.keras.layers.Dense(10)
])

# Compile: Adam optimizer; the loss consumes raw logits (from_logits=True)
# and integer class labels (sparse categorical crossentropy).
model.compile(
    optimizer='adam',
    loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    metrics=['accuracy']
)

# Print the layer-by-layer structure and parameter counts.
model.summary()

训练与评估

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# Train for 10 epochs, holding out 20% of the training data for validation.
history = model.fit(
    train_images, train_labels,
    epochs=10,
    validation_split=0.2
)

# Measure generalization on the held-out test set.
test_loss, test_acc = model.evaluate(test_images, test_labels, verbose=2)
print(f'\n测试准确率: {test_acc:.4f}')

# Wrap the trained model with a Softmax layer so predictions come out
# as class probabilities instead of raw logits.
probability_model = tf.keras.Sequential([
    model,
    tf.keras.layers.Softmax()
])

predictions = probability_model.predict(test_images)

# Inspect the prediction for the first test sample.
print(f"第一个样本的预测概率分布: {predictions[0]}")
print(f"预测类别: {np.argmax(predictions[0])}")
print(f"真实类别: {test_labels[0]}")

预测结果可视化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
def plot_image(i, predictions_array, true_label, img):
    """Draw test image *i* captioned with predicted class, confidence and truth.

    The caption is blue when the prediction matches the true label, red
    otherwise. ``true_label`` and ``img`` are the full label/image arrays;
    ``predictions_array`` is the probability vector for sample *i* only.
    """
    true_label, img = true_label[i], img[i]
    plt.grid(False)
    plt.xticks([])
    plt.yticks([])
    plt.imshow(img, cmap=plt.cm.binary)

    predicted_label = np.argmax(predictions_array)
    if predicted_label == true_label:
        color = 'blue'
    else:
        color = 'red'

    plt.xlabel("{} {:2.0f}% ({})".format(
        class_names[predicted_label],
        100 * np.max(predictions_array),
        class_names[true_label]),
        color=color)

def plot_value_array(i, predictions_array, true_label):
    """Draw the 10-class probability bar chart for sample *i*.

    The predicted class bar is colored red, the true class bar blue
    (blue overwrites red when the prediction is correct).
    """
    true_label = true_label[i]
    plt.grid(False)
    plt.xticks(range(10))
    plt.yticks([])
    thisplot = plt.bar(range(10), predictions_array, color="#777777")
    plt.ylim([0, 1])
    predicted_label = np.argmax(predictions_array)

    thisplot[predicted_label].set_color('red')
    thisplot[true_label].set_color('blue')

# Visualize the first 15 test samples in a 5x3 grid of pairs:
# the image on the left, its probability bar chart on the right.
num_rows = 5
num_cols = 3
num_images = num_rows * num_cols
plt.figure(figsize=(2 * 2 * num_cols, 2 * num_rows))
for i in range(num_images):
    plt.subplot(num_rows, 2 * num_cols, 2 * i + 1)
    plot_image(i, predictions[i], test_labels, test_images)
    plt.subplot(num_rows, 2 * num_cols, 2 * i + 2)
    plot_value_array(i, predictions[i], test_labels)
plt.tight_layout()
plt.show()

文本分类:IMDB情感分析

IMDB数据集包含50,000条电影评论,分为正面和负面两类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import tensorflow as tf
from tensorflow import keras
import numpy as np
import matplotlib.pyplot as plt
import os

# Allow duplicate OpenMP runtimes (common macOS/conda workaround).
os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE"

# Load IMDB reviews, keeping only the 10,000 most frequent words.
# Each review is a variable-length list of integer word indices.
imdb = keras.datasets.imdb
(train_data, train_labels), (test_data, test_labels) = imdb.load_data(
    num_words=10000
)

print(f"训练样本数: {len(train_data)}")
print(f"测试样本数: {len(test_labels)}")
print(f"第一条评论的词索引序列: {train_data[0]}")
print(f"第一条评论的长度: {len(train_data[0])}")
print(f"第二条评论的长度: {len(train_data[1])}")

数据预处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# Fetch the word -> index dictionary used to encode the reviews.
word_index = imdb.get_word_index()

# Shift every index up by 3 to reserve slots 0-3 for special tokens.
word_index = {k: (v + 3) for k, v in word_index.items()}
word_index["<PAD>"] = 0
word_index["<START>"] = 1
word_index["<UNK>"] = 2
word_index["<UNUSED>"] = 3

# Reverse mapping (index -> word) for decoding encoded reviews.
reverse_word_index = dict([(value, key) for (key, value) in word_index.items()])

def decode_review(text):
    """Decode a sequence of word indices back into a readable string.

    Indices missing from the vocabulary are rendered as '?'.
    """
    return ' '.join([reverse_word_index.get(i, '?') for i in text])

# Show the first training review as text.
print(decode_review(train_data[0]))

序列填充与截断

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# Pad (or truncate) every review to exactly 256 tokens so all inputs share
# one shape; <PAD> tokens are appended at the end ('post').
train_data = keras.preprocessing.sequence.pad_sequences(
    train_data,
    value=word_index["<PAD>"],
    padding='post',
    maxlen=256
)

test_data = keras.preprocessing.sequence.pad_sequences(
    test_data,
    value=word_index["<PAD>"],
    padding='post',
    maxlen=256
)

print(f"填充后第一条评论的长度: {len(train_data[0])}")
print(f"填充后第二条评论的长度: {len(train_data[1])}")

构建Embedding模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
vocab_size = 10000

model = keras.Sequential()
# Embedding: map each word index to a learned 16-dim vector.
model.add(keras.layers.Embedding(vocab_size, 16))
# Average over the sequence dimension -> one fixed-size vector per review.
model.add(keras.layers.GlobalAveragePooling1D())
# Hidden layer.
model.add(keras.layers.Dense(16, activation='relu'))
# Output: single sigmoid unit for binary sentiment (positive/negative).
model.add(keras.layers.Dense(1, activation='sigmoid'))

model.summary()

# Binary crossentropy matches the single sigmoid output.
model.compile(
    optimizer='adam',
    loss='binary_crossentropy',
    metrics=['accuracy']
)

训练与验证

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# Hold out the first 10,000 training reviews as a validation set.
x_val = train_data[:10000]
partial_x_train = train_data[10000:]

y_val = train_labels[:10000]
partial_y_train = train_labels[10000:]

# Train for 40 epochs in large batches, tracking validation metrics.
history = model.fit(
    partial_x_train,
    partial_y_train,
    epochs=40,
    batch_size=512,
    validation_data=(x_val, y_val),
    verbose=1
)

# Final evaluation on the test set; results == [loss, accuracy].
results = model.evaluate(test_data, test_labels, verbose=2)
print(f"\n测试损失: {results[0]:.4f}")
print(f"测试准确率: {results[1]:.4f}")

训练过程可视化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# Extract the per-epoch metrics recorded by model.fit().
history_dict = history.history
acc = history_dict['accuracy']
val_acc = history_dict['val_accuracy']
loss = history_dict['loss']
val_loss = history_dict['val_loss']

epochs = range(1, len(acc) + 1)

# Left panel: training ('bo' = blue dots) vs validation ('b' = solid) loss.
# NOTE(review): the Chinese labels/titles need a CJK-capable matplotlib
# font to render correctly — confirm the runtime font configuration.
plt.figure(figsize=(12, 4))
plt.subplot(1, 2, 1)
plt.plot(epochs, loss, 'bo', label='训练损失')
plt.plot(epochs, val_loss, 'b', label='验证损失')
plt.title('训练和验证损失')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()

# Right panel: training vs validation accuracy.
plt.subplot(1, 2, 2)
plt.plot(epochs, acc, 'bo', label='训练准确率')
plt.plot(epochs, val_acc, 'b', label='验证准确率')
plt.title('训练和验证准确率')
plt.xlabel('Epochs')
plt.ylabel('Accuracy')
plt.legend()

plt.tight_layout()
plt.show()

模型保存与恢复

检查点回调保存

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
import tensorflow as tf
from tensorflow import keras
import numpy as np
import os

# Allow duplicate OpenMP runtimes (common macOS/conda workaround).
os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE"

def create_model():
    """Build and compile a small MLP classifier over 784-dim flattened images."""
    model = keras.Sequential([
        keras.layers.Dense(512, activation='relu', input_shape=(784,)),
        keras.layers.Dropout(0.2),
        keras.layers.Dense(10)
    ])
    model.compile(
        optimizer='adam',
        loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        metrics=['accuracy']
    )
    return model

# Load MNIST; keep only 1,000 samples each side to make the demo fast.
# Images are flattened to 784-dim vectors and scaled to [0, 1].
(train_images, train_labels), (test_images, test_labels) = keras.datasets.mnist.load_data()
train_images = train_images[:1000].reshape(-1, 28 * 28) / 255.0
test_images = test_images[:1000].reshape(-1, 28 * 28) / 255.0
train_labels = train_labels[:1000]
test_labels = test_labels[:1000]

# Create the model.
model = create_model()

# Checkpoint location (directory is created by the callback on first save).
checkpoint_path = "training_1/cp.ckpt"
checkpoint_dir = os.path.dirname(checkpoint_path)

# Callback that saves only the weights (not the architecture) each epoch.
cp_callback = tf.keras.callbacks.ModelCheckpoint(
    filepath=checkpoint_path,
    save_weights_only=True,
    verbose=1
)

# Train while writing checkpoints after every epoch.
model.fit(
    train_images, train_labels,
    epochs=10,
    validation_data=(test_images, test_labels),
    callbacks=[cp_callback]
)

# List what the checkpoint callback wrote to disk.
print(f"检查点文件: {os.listdir(checkpoint_dir)}")

从检查点恢复模型

1
2
3
4
5
6
7
8
9
10
11
12
13
# Create a fresh, untrained model instance with the same architecture.
model = create_model()

# Evaluate before loading weights: accuracy should be near chance (~10%).
loss, acc = model.evaluate(test_images, test_labels, verbose=2)
print(f"未训练模型准确率: {100*acc:.2f}%")

# Restore the trained weights from the checkpoint on disk.
model.load_weights(checkpoint_path)

# Re-evaluate: accuracy should match the previously trained model.
loss, acc = model.evaluate(test_images, test_labels, verbose=2)
print(f"恢复后模型准确率: {100*acc:.2f}%")

定期保存检查点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# Checkpoint path template; {epoch:04d} is filled in at save time.
checkpoint_path = "training_2/cp-{epoch:04d}.ckpt"
checkpoint_dir = os.path.dirname(checkpoint_path)

batch_size = 32

# Save weights periodically.
# NOTE(review): save_freq counts *batches*, not epochs. With 1,000 training
# samples and batch_size=32 there are ~32 steps per epoch, so 5*batch_size
# (160 batches) works out to roughly every 5 epochs — confirm this math if
# the dataset size or batch size changes.
cp_callback = tf.keras.callbacks.ModelCheckpoint(
    filepath=checkpoint_path,
    verbose=1,
    save_weights_only=True,
    save_freq=5*batch_size
)

# Save the initial (epoch 0) weights before training starts.
model.save_weights(checkpoint_path.format(epoch=0))

# Train for 50 epochs, silently, writing periodic checkpoints.
model.fit(
    train_images, train_labels,
    epochs=50,
    batch_size=batch_size,
    callbacks=[cp_callback],
    validation_data=(test_images, test_labels),
    verbose=0
)

# Locate the most recent checkpoint in the directory.
latest = tf.train.latest_checkpoint(checkpoint_dir)
print(f"最新检查点: {latest}")

保存整个模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# Option 1: SavedModel format — a directory holding graph, weights and
# optimizer state.
# NOTE(review): extensionless SavedModel paths work with TF 2.x Keras;
# Keras 3 requires a .keras (or .h5) filename — confirm against the
# installed TensorFlow version.
model.save('saved_model/my_model')

# Load the SavedModel back: architecture + weights + training config.
new_model = tf.keras.models.load_model('saved_model/my_model')

# Verify the restored architecture.
new_model.summary()

# Evaluate the restored model.
loss, acc = new_model.evaluate(test_images, test_labels, verbose=2)
print(f"SavedModel准确率: {100*acc:.2f}%")

# Option 2: legacy single-file HDF5 format.
model.save('my_model.h5')

# Load the HDF5 model.
new_model_h5 = tf.keras.models.load_model('my_model.h5')

# Evaluate the HDF5-restored model.
loss, acc = new_model_h5.evaluate(test_images, test_labels, verbose=2)
print(f"HDF5模型准确率: {100*acc:.2f}%")

超参数自动调优(Keras Tuner)

安装与基本概念

1
pip install keras-tuner -i https://pypi.tuna.tsinghua.edu.cn/simple

超参数类型

  • 模型超参数:隐藏层数量、每层的神经元数量等
  • 算法超参数:学习率、批量大小、优化器等

定义可调模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import tensorflow as tf
from tensorflow import keras
import keras_tuner as kt
import os

# Allow duplicate OpenMP runtimes (common macOS/conda workaround).
os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE"

# Load Fashion MNIST and scale pixel values to [0, 1].
(img_train, label_train), (img_test, label_test) = keras.datasets.fashion_mnist.load_data()
img_train = img_train.astype('float32') / 255.0
img_test = img_test.astype('float32') / 255.0

def model_builder(hp):
    """Build a compiled model for Keras Tuner.

    Tunable hyperparameters:
      - 'units': width of the hidden Dense layer (32..512 in steps of 32)
      - 'learning_rate': Adam learning rate (1e-2, 1e-3 or 1e-4)
    """
    model = keras.Sequential()
    model.add(keras.layers.Flatten(input_shape=(28, 28)))

    # Hidden-layer width is a searched hyperparameter.
    hp_units = hp.Int('units', min_value=32, max_value=512, step=32)
    model.add(keras.layers.Dense(units=hp_units, activation='relu'))
    model.add(keras.layers.Dense(10))

    # Learning rate is chosen from three candidate values.
    hp_learning_rate = hp.Choice('learning_rate', values=[1e-2, 1e-3, 1e-4])

    model.compile(
        optimizer=keras.optimizers.Adam(learning_rate=hp_learning_rate),
        loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        metrics=['accuracy']
    )

    return model

配置调优器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# Hyperband tuner: adaptively allocates training epochs to promising
# hyperparameter configurations, maximizing validation accuracy.
tuner = kt.Hyperband(
    model_builder,
    objective='val_accuracy',
    max_epochs=10,
    factor=3,
    directory='my_dir',
    project_name='intro_to_kt'
)

# Early stopping applied inside each search trial.
stop_early = tf.keras.callbacks.EarlyStopping(
    monitor='val_loss',
    patience=5
)

执行超参数搜索

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# Run the search; Hyperband decides how many epochs each trial actually gets.
tuner.search(
    img_train, label_train,
    epochs=50,
    validation_split=0.2,
    callbacks=[stop_early]
)

# Retrieve the best hyperparameter combination found.
best_hps = tuner.get_best_hyperparameters(num_trials=1)[0]

print(f"""
超参数搜索完成。
最优第一层神经元数量: {best_hps.get('units')}
最优学习率: {best_hps.get('learning_rate')}
""")

训练最优模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# Build a model from the best hyperparameters.
model = tuner.hypermodel.build(best_hps)

# First pass: train long enough to see where validation accuracy peaks.
history = model.fit(
    img_train, label_train,
    epochs=50,
    validation_split=0.2
)

# The best epoch is where validation accuracy was highest (1-indexed).
val_acc_per_epoch = history.history['val_accuracy']
best_epoch = val_acc_per_epoch.index(max(val_acc_per_epoch)) + 1
print(f"最优epoch: {best_epoch}")

# Second pass: retrain a fresh model for exactly that many epochs.
hypermodel = tuner.hypermodel.build(best_hps)
hypermodel.fit(
    img_train, label_train,
    epochs=best_epoch,
    validation_split=0.2
)

# Final evaluation on the held-out test set.
eval_result = hypermodel.evaluate(img_test, label_test)
print(f"[测试损失, 测试准确率]: {eval_result}")

防止过拟合

Dropout正则化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
from tensorflow.keras import layers

# A deeper MLP with 50% dropout after every hidden layer.
dropout_model = keras.Sequential([
    layers.Dense(512, activation='elu', input_shape=(784,)),
    layers.Dropout(0.5),
    layers.Dense(512, activation='elu'),
    layers.Dropout(0.5),
    layers.Dense(512, activation='elu'),
    layers.Dropout(0.5),
    layers.Dense(1)
])

def compile_and_fit(model, name, max_epochs=1000):
    """Compile *model* with MSE loss and train it with early stopping.

    NOTE(review): MSE loss plus a single output unit is the regression form
    of this tutorial; feeding the integer class labels from the earlier
    sections treats them as regression targets — verify that is intended.
    """
    model.compile(
        optimizer='adam',
        loss='mse',
        metrics=['mae', 'mse']
    )
    return model.fit(
        train_images, train_labels,
        epochs=max_epochs,
        validation_split=0.2,
        callbacks=[
            # Generous patience; roll back to the best weights at the end.
            keras.callbacks.EarlyStopping(
                monitor='val_loss',
                patience=50,
                restore_best_weights=True
            )
        ]
    )

# Train the dropout-regularized model and record its history.
regularizer_histories = {}
regularizer_histories['dropout'] = compile_and_fit(dropout_model, "regularizers/dropout")

Dropout原理

  • 随机将一部分神经元的输出设为0
  • 迫使网络不依赖特定神经元,提高泛化能力
  • 常用Dropout率:0.2-0.5

早停法(Early Stopping)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# Early stopping: halt when validation loss has not improved for 10 epochs
# and roll the model back to the best weights seen so far.
early_stopping = keras.callbacks.EarlyStopping(
    monitor='val_loss',
    patience=10,
    restore_best_weights=True
)

# Train with a generous epoch budget; early stopping decides when to quit.
model.fit(
    train_images, train_labels,
    epochs=100,
    validation_split=0.2,
    callbacks=[early_stopping]
)

推荐配置

数据预处理

步骤 说明 代码示例
归一化 缩放到[0,1] x = x / 255.0
标准化 均值为0,标准差为1 x = (x - mean) / std
序列填充 统一文本序列长度 pad_sequences()
数据增强 扩充训练数据 ImageDataGenerator

模型构建

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# Recommended structure: two dropout-regularized hidden layers plus a
# softmax output over 10 classes.
model = keras.Sequential([
    keras.layers.Input(shape=(784,)),
    keras.layers.Dense(512, activation='relu'),
    keras.layers.Dropout(0.3),
    keras.layers.Dense(256, activation='relu'),
    keras.layers.Dropout(0.3),
    keras.layers.Dense(10, activation='softmax')
])

# Recommended optimizer configuration.
# NOTE(review): 'categorical_crossentropy' expects one-hot encoded labels;
# use 'sparse_categorical_crossentropy' if labels are integer class ids.
model.compile(
    optimizer=keras.optimizers.Adam(learning_rate=0.001),
    loss='categorical_crossentropy',
    metrics=['accuracy']
)

训练配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# Full training setup: early stopping + LR reduction + best-model checkpoint.
history = model.fit(
    train_data, train_labels,
    batch_size=32,
    epochs=100,
    validation_split=0.2,
    callbacks=[
        # Stop when val_loss stalls for 10 epochs; keep the best weights.
        keras.callbacks.EarlyStopping(
            monitor='val_loss',
            patience=10,
            restore_best_weights=True
        ),
        # Halve the learning rate after 5 stagnant epochs.
        keras.callbacks.ReduceLROnPlateau(
            monitor='val_loss',
            factor=0.5,
            patience=5
        ),
        # Persist only the checkpoint with the best validation accuracy.
        keras.callbacks.ModelCheckpoint(
            'best_model.h5',
            save_best_only=True,
            monitor='val_accuracy'
        )
    ]
)

图像分类用全连接神经网络(进阶可换CNN)、文本分类用Embedding、模型保存有三种方式、超参数用Keras Tuner自动搜。这些都是实际跑过的代码,可以直接用。

参考