# Copyright 2018 The TensorFlow Authors All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
from builtins import object
from builtins import range

import numpy as np
import tensorflow as tf


class FeedForwardNet(object):
  """Custom feed-forward network layer."""

  def __init__(self, name, in_size, out_shape, layers=1, hidden_dim=32,
               final_nonlinearity=None, get_uncertainty=False):
    self.name = name
    self.in_size = in_size
    self.out_shape = out_shape
    self.out_size = np.prod(out_shape)
    self.layers = layers
    self.hidden_dim = hidden_dim
    self.final_nonlinearity = (lambda x: x) if final_nonlinearity is None else final_nonlinearity
    self.get_uncertainty = get_uncertainty
    self.weights = [None] * layers
    self.biases = [None] * layers
    self.params_list = []
    with tf.variable_scope(name):
      for layer_i in range(self.layers):
        in_size = self.hidden_dim
        out_size = self.hidden_dim
        if layer_i == 0: in_size = self.in_size
        if layer_i == self.layers - 1: out_size = self.out_size
        self.weights[layer_i] = tf.get_variable(
            "weights%d" % layer_i, [in_size, out_size],
            initializer=tf.contrib.layers.xavier_initializer())
        self.biases[layer_i] = tf.get_variable(
            "bias%d" % layer_i, [1, out_size],
            initializer=tf.constant_initializer(0.0))
        self.params_list += [self.weights[layer_i], self.biases[layer_i]]

  def __call__(self, x, stop_params_gradient=False, is_eval=True,
               ensemble_idxs=None, pre_expanded=None, reduce_mode="none"):
    original_shape = tf.shape(x)
    h = tf.reshape(x, [-1, self.in_size])
    for layer_i in range(self.layers):
      # ReLU on hidden layers, the configured nonlinearity on the output layer.
      nonlinearity = tf.nn.relu if layer_i + 1 < self.layers else self.final_nonlinearity
      if stop_params_gradient:
        h = nonlinearity(tf.matmul(h, tf.stop_gradient(self.weights[layer_i]))
                         + tf.stop_gradient(self.biases[layer_i]))
      else:
        h = nonlinearity(tf.matmul(h, self.weights[layer_i]) + self.biases[layer_i])
    # Restore the leading batch dimensions and the requested output shape.
    if len(self.out_shape) > 0:
      h = tf.reshape(h, tf.concat([original_shape[:-1], tf.constant(self.out_shape)], -1))
    else:
      h = tf.reshape(h, original_shape[:-1])
    if pre_expanded is None:
      pre_expanded = ensemble_idxs is not None
    if reduce_mode == "none" and not pre_expanded and self.get_uncertainty:
      # Add a singleton sample axis so downstream code can treat the output
      # like a one-member ensemble.
      if len(self.out_shape) > 0:
        h = tf.expand_dims(h, -2)
      else:
        h = tf.expand_dims(h, -1)
    return h

  def l2_loss(self):
    return tf.add_n([tf.reduce_sum(.5 * tf.square(mu)) for mu in self.params_list])
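

# --- Illustrative usage sketch (not part of the original module) -------------
# A minimal example, assuming TF 1.x graph mode; the name, sizes, and weight-
# decay coefficient below are arbitrary choices for illustration only.
def _example_feedforward_usage():
  x = tf.placeholder(tf.float32, [None, 8])
  net = FeedForwardNet("example_ffn", in_size=8, out_shape=[4], layers=2, hidden_dim=32)
  y = net(x)  # shape [batch, 4]
  reg = 1e-4 * net.l2_loss()  # optional weight-decay term
  with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    return sess.run([y, reg], feed_dict={x: np.random.randn(16, 8)})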


class BayesianDropoutFeedForwardNet(FeedForwardNet):
  """Custom feed-forward network layer, with dropout as a Bayesian approximation."""

  def __init__(self, name, in_size, out_shape, layers=1, hidden_dim=32,
               final_nonlinearity=None, get_uncertainty=False, keep_prob=.5,
               eval_sample_count=2, consistent_random_seed=False):
    super(BayesianDropoutFeedForwardNet, self).__init__(
        name, in_size, out_shape, layers=layers, hidden_dim=hidden_dim,
        final_nonlinearity=final_nonlinearity, get_uncertainty=get_uncertainty)
    self.keep_prob = keep_prob
    self.eval_sample_count = eval_sample_count
    if eval_sample_count < 2:
      raise Exception("eval_sample_count must be at least 2 to estimate uncertainty")
    self.dropout_seed = (tf.random_uniform([layers], maxval=1e18, dtype=tf.int64)
                         if consistent_random_seed else [None] * layers)

  def __call__(self, x, stop_params_gradient=False, is_eval=True,
               pre_expanded=False, ensemble_idxs=None, reduce_mode="none"):
    if is_eval:
      # Monte Carlo dropout: tile the batch so each copy sees its own dropout mask.
      x = tf.tile(tf.expand_dims(x, 0),
                  tf.concat([tf.constant([self.eval_sample_count]),
                             tf.ones_like(tf.shape(x))], 0))
    original_shape = tf.shape(x)
    h = tf.reshape(x, [-1, self.in_size])
    for layer_i in range(self.layers):
      nonlinearity = tf.nn.relu if layer_i + 1 < self.layers else self.final_nonlinearity
      if layer_i > 0:
        h = tf.nn.dropout(h, keep_prob=self.keep_prob, seed=self.dropout_seed[layer_i])
      if stop_params_gradient:
        h = nonlinearity(tf.matmul(h, tf.stop_gradient(self.weights[layer_i]))
                         + tf.stop_gradient(self.biases[layer_i]))
      else:
        h = nonlinearity(tf.matmul(h, self.weights[layer_i]) + self.biases[layer_i])
    if len(self.out_shape) > 0:
      h = tf.reshape(h, tf.concat([original_shape[:-1], tf.constant(self.out_shape)], -1))
    else:
      h = tf.reshape(h, original_shape[:-1])
    if is_eval:
      # Mean over dropout samples is the prediction; their variance is the uncertainty.
      h, uncertainty = tf.nn.moments(h, 0)
      if self.get_uncertainty:
        return h, uncertainty
      else:
        return h
    else:
      return h
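

# --- Illustrative usage sketch (not part of the original module) -------------
# A minimal MC-dropout example, assuming TF 1.x graph mode; the name, sizes,
# and sample counts below are arbitrary choices for illustration only.
def _example_dropout_uncertainty():
  x = tf.placeholder(tf.float32, [None, 8])
  net = BayesianDropoutFeedForwardNet(
      "example_dropout", in_size=8, out_shape=[4], layers=3, hidden_dim=32,
      get_uncertainty=True, keep_prob=0.8, eval_sample_count=10)
  # At eval time the input is tiled eval_sample_count times, each copy gets an
  # independent dropout mask, and the per-sample variance is the uncertainty.
  mean, variance = net(x, is_eval=True)  # each of shape [batch, 4]
  return mean, variance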


class EnsembleFeedForwardNet(FeedForwardNet):
  """Custom feed-forward network layer with an ensemble."""

  def __init__(self, name, in_size, out_shape, layers=1, hidden_dim=32,
               final_nonlinearity=None, get_uncertainty=False, ensemble_size=2,
               train_sample_count=2, eval_sample_count=2):
    if train_sample_count > ensemble_size:
      raise Exception("train_sample_count cannot be larger than ensemble size")
    if eval_sample_count > ensemble_size:
      raise Exception("eval_sample_count cannot be larger than ensemble size")
    self.name = name
    self.in_size = in_size
    self.out_shape = out_shape
    self.out_size = np.prod(out_shape)
    self.layers = layers
    self.hidden_dim = hidden_dim
    self.final_nonlinearity = (lambda x: x) if final_nonlinearity is None else final_nonlinearity
    self.get_uncertainty = get_uncertainty
    self.ensemble_size = ensemble_size
    self.train_sample_count = train_sample_count
    self.eval_sample_count = eval_sample_count
    self.weights = [None] * layers
    self.biases = [None] * layers
    self.params_list = []
    with tf.variable_scope(name):
      for layer_i in range(self.layers):
        in_size = self.hidden_dim
        out_size = self.hidden_dim
        if layer_i == 0: in_size = self.in_size
        if layer_i == self.layers - 1: out_size = self.out_size
        # Each ensemble member gets its own weight matrix and bias vector.
        self.weights[layer_i] = tf.get_variable(
            "weights%d" % layer_i, [ensemble_size, in_size, out_size],
            initializer=tf.contrib.layers.xavier_initializer())
        self.biases[layer_i] = tf.get_variable(
            "bias%d" % layer_i, [ensemble_size, out_size],
            initializer=tf.constant_initializer(0.0))
        self.params_list += [self.weights[layer_i], self.biases[layer_i]]

  def __call__(self, x, stop_params_gradient=False, is_eval=True,
               ensemble_idxs=None, pre_expanded=None, reduce_mode="none"):
    if pre_expanded is None:
      pre_expanded = ensemble_idxs is not None
    if ensemble_idxs is None:
      # Sample a random subset of ensemble members for this forward pass.
      ensemble_idxs = tf.random_shuffle(tf.range(self.ensemble_size))
      ensemble_sample_n = self.eval_sample_count if is_eval else self.train_sample_count
      ensemble_idxs = ensemble_idxs[:ensemble_sample_n]
    else:
      ensemble_sample_n = tf.shape(ensemble_idxs)[0]
    weights = [tf.gather(w, ensemble_idxs, axis=0) for w in self.weights]
    biases = [tf.expand_dims(tf.gather(b, ensemble_idxs, axis=0), 0) for b in self.biases]
    original_shape = tf.shape(x)
    if pre_expanded:
      # The input already carries a per-member axis: [..., ensemble_sample_n, in_size].
      h = tf.reshape(x, [-1, ensemble_sample_n, self.in_size])
    else:
      # Broadcast the same input to every sampled member.
      h = tf.tile(tf.reshape(x, [-1, 1, self.in_size]), [1, ensemble_sample_n, 1])
    for layer_i in range(self.layers):
      nonlinearity = tf.nn.relu if layer_i + 1 < self.layers else self.final_nonlinearity
      if stop_params_gradient:
        h = nonlinearity(tf.einsum('bri,rij->brj', h, tf.stop_gradient(weights[layer_i]))
                         + tf.stop_gradient(biases[layer_i]))
      else:
        h = nonlinearity(tf.einsum('bri,rij->brj', h, weights[layer_i]) + biases[layer_i])
    if pre_expanded:
      if len(self.out_shape) > 0:
        h = tf.reshape(h, tf.concat([original_shape[:-1], tf.constant(self.out_shape)], -1))
      else:
        h = tf.reshape(h, original_shape[:-1])
    else:
      if len(self.out_shape) > 0:
        h = tf.reshape(h, tf.concat([original_shape[:-1], tf.constant([ensemble_sample_n]),
                                     tf.constant(self.out_shape)], -1))
      else:
        h = tf.reshape(h, tf.concat([original_shape[:-1], tf.constant([ensemble_sample_n])], -1))
    if reduce_mode == "none":
      # Keep the per-member axis.
      pass
    elif reduce_mode == "random":
      # Pick one member's prediction per element of the leading batch dimension,
      # chosen uniformly at random.
      if len(self.out_shape) > 0:
        h = tf.reduce_sum(
            h * tf.reshape(
                tf.one_hot(tf.random_uniform([tf.shape(h)[0]], 0, ensemble_sample_n,
                                             dtype=tf.int64), ensemble_sample_n),
                tf.concat([tf.shape(h)[:1], tf.ones_like(tf.shape(h)[1:-2]),
                           tf.constant([ensemble_sample_n]), tf.constant([1])], 0)),
            -2)
      else:
        h = tf.reduce_sum(
            h * tf.reshape(
                tf.one_hot(tf.random_uniform([tf.shape(h)[0]], 0, ensemble_sample_n,
                                             dtype=tf.int64), ensemble_sample_n),
                tf.concat([tf.shape(h)[:1], tf.ones_like(tf.shape(h)[1:-1]),
                           tf.constant([ensemble_sample_n])], 0)),
            -1)
    elif reduce_mode == "mean":
      # Average the predictions of the sampled members.
      if len(self.out_shape) > 0:
        h = tf.reduce_mean(h, -2)
      else:
        h = tf.reduce_mean(h, -1)
    else:
      raise Exception("use a valid reduce mode: none, random, or mean")
    return h
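

# --- Illustrative usage sketch (not part of the original module) -------------
# A minimal ensemble example, assuming TF 1.x graph mode; the name, sizes, and
# sample counts below are arbitrary choices for illustration only. Each call
# samples its own random member order.
def _example_ensemble_usage():
  x = tf.placeholder(tf.float32, [None, 8])
  net = EnsembleFeedForwardNet(
      "example_ensemble", in_size=8, out_shape=[4], layers=2, hidden_dim=32,
      ensemble_size=5, train_sample_count=3, eval_sample_count=5)
  per_member = net(x, reduce_mode="none")  # shape [batch, 5, 4], one slice per member
  averaged = net(x, reduce_mode="mean")    # shape [batch, 4], mean over members
  return per_member, averaged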


class ReparamNormal(object):
  """Wrapper to make a feedforward network that outputs both mu and logsigma,
  for use in the reparameterization trick."""

  def __init__(self, base_net, name, in_size, out_shape, layers=2, hidden_dim=32,
               final_nonlinearity=None, ls_start_bias=0.0, final_net=FeedForwardNet,
               logsigma_min=-5., logsigma_max=2., **kwargs):
    assert layers > 1
    self.main_encoder = base_net(name + "_base", in_size, [hidden_dim], layers, hidden_dim,
                                 final_nonlinearity=tf.nn.relu, **kwargs)
    self.mu = final_net(name + "_mu", hidden_dim, out_shape, layers=1,
                        final_nonlinearity=final_nonlinearity, **kwargs)
    self.logsigma = final_net(name + "_logsigma", hidden_dim, out_shape, layers=1,
                              final_nonlinearity=None, **kwargs)
    self.ls_start_bias = ls_start_bias
    self.params_list = self.main_encoder.params_list + self.mu.params_list + self.logsigma.params_list
    self.logsigma_min = logsigma_min
    self.logsigma_max = logsigma_max

  def __call__(self, x):
    encoded = self.main_encoder(x)
    mu = self.mu(encoded)
    logsigma = tf.clip_by_value(self.logsigma(encoded) + self.ls_start_bias,
                                self.logsigma_min, self.logsigma_max)
    return mu, logsigma

  def l2_loss(self):
    return self.main_encoder.l2_loss() + self.mu.l2_loss() + self.logsigma.l2_loss()
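

# --- Illustrative usage sketch (not part of the original module) -------------
# A minimal reparameterization-trick example, assuming TF 1.x graph mode; the
# name and sizes below are arbitrary choices for illustration only.
def _example_reparam_normal_usage():
  x = tf.placeholder(tf.float32, [None, 8])
  dist = ReparamNormal(FeedForwardNet, "example_reparam", in_size=8, out_shape=[4],
                       layers=2, hidden_dim=32)
  mu, logsigma = dist(x)
  # Sample z ~ N(mu, sigma^2) while keeping gradients w.r.t. mu and logsigma.
  eps = tf.random_normal(tf.shape(mu))
  z = mu + tf.exp(logsigma) * eps
  return z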