# Additive Gaussian noise autoencoder trained on MNIST (TensorFlow 1.x script).
import numpy as np
import sklearn.preprocessing as prep
import tensorflow as tf

import input_data
def xavier_init(fan_in, fan_out, constant=1):
    """Create a Xavier/Glorot uniformly initialised weight tensor.

    Values are drawn uniformly between ``-b`` and ``b`` where
    ``b = constant * sqrt(6 / (fan_in + fan_out))``.

    :param fan_in: number of input units (first dimension of the result)
    :param fan_out: number of output units (second dimension of the result)
    :param constant: multiplier applied to the sampling bound, 1 by default
    :return: float32 tensor of shape ``(fan_in, fan_out)``
    """
    # The interval is symmetric, so the bound only has to be computed once.
    bound = constant * np.sqrt(6.0 / (fan_in + fan_out))
    return tf.random_uniform(
        (fan_in, fan_out), minval=-bound, maxval=bound, dtype=tf.float32
    )
class AdditiveGaussianNoiseAutoencoder(object):
    """Single-hidden-layer denoising autoencoder with additive Gaussian noise.

    The input is corrupted with ``scale * N(0, 1)`` noise, encoded by one
    hidden layer and decoded by a linear output layer.  The loss is half the
    summed squared error between the reconstruction and the clean input.
    Constructing an instance builds the graph ops, creates a session and
    initialises all variables.
    """

    def __init__(self, n_input, n_hidden, transfer_function=tf.nn.softplus,
                 optimizer=None, scale=0.1):
        """Build the network, the training op and the session.

        Only one hidden layer is supported (the original author left adding
        more hidden layers as an open question).

        :param n_input: number of input variables
        :param n_hidden: number of hidden-layer units
        :param transfer_function: hidden-layer activation, softplus by default
        :param optimizer: object exposing ``minimize``; when ``None`` a fresh
            ``tf.train.AdamOptimizer()`` is created for this instance
        :param scale: Gaussian noise coefficient, 0.1 by default
        """
        # Build the default optimizer per instance rather than once at
        # function-definition time, so instances never share optimizer state.
        if optimizer is None:
            optimizer = tf.train.AdamOptimizer()

        self.n_input = n_input
        self.n_hidden = n_hidden
        self.transfer = transfer_function
        self.scale = tf.placeholder(tf.float32)
        self.training_scale = scale
        self.weights = self._initialize_weights()

        self.x = tf.placeholder(tf.float32, [None, self.n_input])
        # Use the fed ``self.scale`` placeholder (not the Python constant) so
        # the value passed through ``feed_dict`` actually controls the noise.
        # The noise has shape (n_input,), so one noise vector is broadcast
        # over every row of the batch.
        noisy_input = self.x + self.scale * tf.random_normal((n_input,))
        self.hidden = self.transfer(
            tf.add(tf.matmul(noisy_input, self.weights['w1']),
                   self.weights['b1']))
        self.reconstruction = tf.add(
            tf.matmul(self.hidden, self.weights['w2']), self.weights['b2'])

        # Half the sum of squared errors against the *clean* input.
        self.cost = 0.5 * tf.reduce_sum(
            tf.pow(tf.subtract(self.reconstruction, self.x), 2.0))
        self.optimizer = optimizer.minimize(self.cost)

        # Created after ``minimize`` so optimizer variables are initialised too.
        init = tf.global_variables_initializer()
        self.sess = tf.Session()
        self.sess.run(init)

    def _initialize_weights(self):
        """Create the encoder/decoder variables.

        ``w1`` uses Xavier initialisation; ``b1``, ``w2`` and ``b2`` start at
        zero.

        :return: dict with keys ``'w1'``, ``'b1'``, ``'w2'``, ``'b2'``
        """
        all_weights = dict()
        all_weights['w1'] = tf.Variable(
            xavier_init(self.n_input, self.n_hidden))
        all_weights['b1'] = tf.Variable(
            tf.zeros([self.n_hidden], dtype=tf.float32))
        all_weights['w2'] = tf.Variable(
            tf.zeros([self.n_hidden, self.n_input], dtype=tf.float32))
        all_weights['b2'] = tf.Variable(
            tf.zeros([self.n_input], dtype=tf.float32))
        return all_weights

    def _feed(self, X):
        """Return the feed dict binding the input batch and the noise scale."""
        return {self.x: X, self.scale: self.training_scale}

    def partial_fit(self, X):
        """Run one optimisation step on a batch and return its loss.

        :param X: batch of inputs with ``n_input`` columns
        :return: value of the cost evaluated in the same run as the update
        """
        cost, _ = self.sess.run((self.cost, self.optimizer),
                                feed_dict=self._feed(X))
        return cost

    def calc_total_cost(self, X):
        """Evaluate the loss on ``X`` without updating any weights."""
        return self.sess.run(self.cost, feed_dict=self._feed(X))

    def transform(self, X):
        """Return the hidden-layer activations for ``X``.

        :param X: batch of inputs with ``n_input`` columns
        """
        return self.sess.run(self.hidden, feed_dict=self._feed(X))

    def generate(self, hidden=None):
        """Decode hidden activations into input space.

        :param hidden: array with ``n_hidden`` columns; when ``None`` a single
            standard-normal sample of shape ``(1, n_hidden)`` is used
        :return: the reconstruction computed from ``hidden``
        """
        if hidden is None:
            # ``size`` must be a plain shape; 2-D so it can be matmul'ed by w2.
            hidden = np.random.normal(size=(1, self.n_hidden))
        return self.sess.run(self.reconstruction,
                             feed_dict={self.hidden: hidden})

    def reconstruct(self, X):
        """Encode and decode ``X``, returning the reconstruction."""
        return self.sess.run(self.reconstruction, feed_dict=self._feed(X))

    def getWeights(self):
        """Return the current value of the hidden-layer weights ``w1``."""
        return self.sess.run(self.weights['w1'])

    def getBiases(self):
        """Return the current value of the hidden-layer biases ``b1``."""
        return self.sess.run(self.weights['b1'])
# Load the MNIST dataset through the local ``input_data`` helper, using the
# 'MNIST_data' directory.  One-hot labels are requested, but the code below
# only reads the images and the example count.
# NOTE(review): presumably this downloads the data on first use — confirm
# against ``input_data.read_data_sets``.
mnist = input_data.read_data_sets('MNIST_data', one_hot=True)
def standard_scale(X_train, X_test):
    """Standardise both splits using statistics fitted on the training split.

    :param X_train: training data used to fit the scaler
    :param X_test: test data, transformed with the training-set statistics
    :return: tuple ``(scaled_train, scaled_test)``
    """
    scaler = prep.StandardScaler()
    # Fit on the training data only, then apply the same scaler to both splits.
    scaler.fit(X_train)
    scaled_train = scaler.transform(X_train)
    scaled_test = scaler.transform(X_test)
    return scaled_train, scaled_test
def get_random_block_from_data(data, batch_size):
    """Return a random contiguous block of ``batch_size`` items from ``data``.

    Every valid start position, including the one whose block ends at the
    last element, can be drawn.

    :param data: sliceable sequence supporting ``len``
    :param batch_size: number of consecutive items to return
    :return: ``data[start:start + batch_size]`` for a random ``start``
    :raises ValueError: if ``batch_size`` is larger than ``len(data)``
    """
    last_start = len(data) - batch_size
    if last_start < 0:
        raise ValueError(
            "batch_size (%d) exceeds the number of samples (%d)"
            % (batch_size, len(data)))
    # ``randint``'s upper bound is exclusive: +1 makes the final block
    # reachable and keeps ``len(data) == batch_size`` from raising.
    start_index = np.random.randint(0, last_start + 1)
    return data[start_index:start_index + batch_size]
# Standardise the train/test images with statistics from the training images.
X_train, X_test = standard_scale(mnist.train.images, mnist.test.images)
n_samples = int(mnist.train.num_examples)

# Training hyper-parameters.
training_epochs, batch_size, display_step = 20, 128, 1

autoencoder = AdditiveGaussianNoiseAutoencoder(
    n_input=784,
    n_hidden=200,
    transfer_function=tf.nn.softplus,
    optimizer=tf.train.AdamOptimizer(learning_rate=0.001),
    scale=0.01,
)

for epoch in range(training_epochs):
    avg_cost = 0
    total_batch = n_samples // batch_size
    for i in range(total_batch):
        # Batches are random contiguous blocks, so samples may repeat within
        # an epoch.
        batch_xs = get_random_block_from_data(X_train, batch_size)
        cost = autoencoder.partial_fit(batch_xs)
        # Accumulate each batch cost weighted by batch_size / n_samples.
        avg_cost += cost / n_samples * batch_size

    if epoch % display_step == 0:
        print("Epoch:", '%04d' % (epoch+1), "cost=", "{:.9f}".format(avg_cost))

# Final evaluation on the held-out test images.
print("total cost: " + str(autoencoder.calc_total_cost(X_test)))
|