TensorFlow Learning Notes [3]: Implementing a Denoising Autoencoder

I.

An autoencoder has the same number of input and output nodes. Rather than simply copying each input node to the output, the goal is to reconstruct the input from a small number of sparse, higher-order features. In other words, an autoencoder encodes its input using features extracted from the data itself: it learns higher-order features and recombines them to reconstruct the original input, which amounts to learning the identity mapping:

$$y=x$$
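
Concretely, with a single hidden layer (the setting used in the implementation below), the encoder maps the input $x$ to a hidden representation $h$, and the decoder maps $h$ back to a reconstruction $\hat{x}$:

$$h = f(W_1 x + b_1), \qquad \hat{x} = W_2 h + b_2$$

where $f$ is the hidden-layer activation (softplus in the code below), and training minimizes the squared reconstruction error $\frac{1}{2}\lVert \hat{x} - x \rVert^2$.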

The noise most commonly used in a denoising autoencoder is additive Gaussian noise (AGN). The algorithm must learn the data's features in the presence of noise: only by discarding the irregular noise and learning the patterns and structure that recur in the data can it restore the data at the output.
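
In symbols, each training input $x$ is corrupted before being encoded:

$$\tilde{x} = x + \mathrm{scale} \cdot \varepsilon, \qquad \varepsilon \sim \mathcal{N}(0, I)$$

while the loss still compares the reconstruction against the clean $x$; this prevents the network from simply copying its input and forces it to capture the recurring structure.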

In practice, autoencoders are used not only as pre-training for supervised learning, but also directly for feature extraction and analysis.

II.

The Python implementation (TensorFlow 1.x) is as follows:

import numpy as np
import sklearn.preprocessing as prep
import tensorflow as tf
from tensorflow.examples.tutorials.mnist import input_data

# Xavier initialization: draw weights uniformly from
# [-sqrt(6 / (fan_in + fan_out)), sqrt(6 / (fan_in + fan_out))]
# so the scale of activations stays roughly constant across layers.
def xavier_init(fan_in, fan_out, constant=1):
    low = -constant * np.sqrt(6.0 / (fan_in + fan_out))
    high = constant * np.sqrt(6.0 / (fan_in + fan_out))
    return tf.random_uniform((fan_in, fan_out),
                             minval=low, maxval=high, dtype=tf.float32)

class AdditiveGaussianNoiseAutoencoder(object):

    def __init__(self, n_input, n_hidden, transfer_function=tf.nn.softplus,
                 optimizer=tf.train.AdamOptimizer(), scale=0.1):
        '''
        Constructor (a single hidden layer; how would you add more?).
        :param n_input: number of input units
        :param n_hidden: number of hidden units
        :param transfer_function: hidden-layer activation, softplus by default
        :param optimizer: the optimizer, Adam by default
        :param scale: Gaussian noise coefficient, 0.1 by default
        '''
        self.n_input = n_input
        self.n_hidden = n_hidden
        self.transfer = transfer_function
        self.scale = tf.placeholder(tf.float32)
        self.training_scale = scale
        network_weights = self._initialize_weights()
        self.weights = network_weights

        # Network structure: add Gaussian noise to the input, encode with
        # one hidden layer, then reconstruct with a linear output layer.
        # Note: the placeholder self.scale is used here (not the Python
        # argument scale), so the value passed via feed_dict takes effect.
        self.x = tf.placeholder(tf.float32, [None, self.n_input])
        self.hidden = self.transfer(tf.add(tf.matmul(
            self.x + self.scale * tf.random_normal((n_input,)),
            self.weights['w1']), self.weights['b1']))
        self.reconstruction = tf.add(tf.matmul(self.hidden,
            self.weights['w2']), self.weights['b2'])

        # Loss: squared reconstruction error
        self.cost = 0.5 * tf.reduce_sum(tf.pow(tf.subtract(
            self.reconstruction, self.x), 2.0))

        self.optimizer = optimizer.minimize(self.cost)

        init = tf.global_variables_initializer()
        self.sess = tf.Session()
        self.sess.run(init)

    def _initialize_weights(self):
        all_weights = dict()
        # w1 uses Xavier initialization; the remaining tensors can start at
        # zero because the output layer is linear.
        all_weights['w1'] = tf.Variable(xavier_init(self.n_input, self.n_hidden))
        all_weights['b1'] = tf.Variable(tf.zeros([self.n_hidden], dtype=tf.float32))
        all_weights['w2'] = tf.Variable(tf.zeros([self.n_hidden, self.n_input],
                                                 dtype=tf.float32))
        all_weights['b2'] = tf.Variable(tf.zeros([self.n_input], dtype=tf.float32))
        return all_weights

    def partial_fit(self, X):
        '''
        Train on one batch of data and return the current loss.
        :param X: a batch of input data
        :return: the loss on this batch
        '''
        cost, opt = self.sess.run((self.cost, self.optimizer),
            feed_dict={self.x: X, self.scale: self.training_scale})
        return cost

    def calc_total_cost(self, X):
        # Evaluate the loss without triggering a training step
        return self.sess.run(self.cost, feed_dict={self.x: X, self.scale: self.training_scale})

    # Commonly used member functions (each runs a subgraph of the model)
    def transform(self, X):
        # Return the hidden-layer representation, i.e. the learned features
        return self.sess.run(self.hidden, feed_dict={self.x: X, self.scale: self.training_scale})

    def generate(self, hidden=None):
        # Decode a hidden representation back into input space;
        # if none is given, sample one from a standard normal
        if hidden is None:
            hidden = np.random.normal(size=(1, self.n_hidden))
        return self.sess.run(self.reconstruction, feed_dict={self.hidden: hidden})

    def reconstruct(self, X):
        # Run the full autoencoder: transform followed by generate
        return self.sess.run(self.reconstruction, feed_dict={self.x: X, self.scale: self.training_scale})

    def getWeights(self):
        return self.sess.run(self.weights['w1'])

    def getBiases(self):
        return self.sess.run(self.weights['b1'])

mnist = input_data.read_data_sets('MNIST_data', one_hot=True)

def standard_scale(X_train, X_test):
    # Standardize to zero mean and unit variance, fitting the scaler on the
    # training set only and reusing it on the test set
    preprocessor = prep.StandardScaler().fit(X_train)
    X_train = preprocessor.transform(X_train)
    X_test = preprocessor.transform(X_test)
    return X_train, X_test

def get_random_block_from_data(data, batch_size):
    # Take a random contiguous block; this samples with replacement
    start_index = np.random.randint(0, len(data) - batch_size)
    return data[start_index:(start_index + batch_size)]

X_train, X_test = standard_scale(mnist.train.images, mnist.test.images)

n_samples = int(mnist.train.num_examples)
# Number of training epochs
training_epochs = 20
batch_size = 128
display_step = 1

autoencoder = AdditiveGaussianNoiseAutoencoder(n_input=784,
                                               n_hidden=200,
                                               transfer_function=tf.nn.softplus,
                                               optimizer=tf.train.AdamOptimizer(learning_rate=0.001),
                                               scale=0.01)

# Training loop
for epoch in range(training_epochs):
    avg_cost = 0
    total_batch = int(n_samples / batch_size)
    for i in range(total_batch):
        batch_xs = get_random_block_from_data(X_train, batch_size)

        cost = autoencoder.partial_fit(batch_xs)
        avg_cost += cost / n_samples * batch_size

    if epoch % display_step == 0:
        print("Epoch:", '%04d' % (epoch + 1), "cost=", "{:.9f}".format(avg_cost))

print("total cost: " + str(autoencoder.calc_total_cost(X_test)))