- """
- Adopted from https://github.com/uranusx86/BinaryNet-on-tensorflow
- """
- # coding=UTF-8
- import tensorflow as tf
- from tensorflow import keras
- from tensorflow.python.framework import tensor_shape, ops
- from tensorflow.python.ops import standard_ops, nn, variable_scope, math_ops, control_flow_ops
- from tensorflow.python.eager import context
- from tensorflow.python.training import optimizer, training_ops
- import numpy as np
- # Warning: if you have a @property getter/setter function in a class, must inherit from object class
- all_layers = []
- def hard_sigmoid(x):
- return tf.clip_by_value((x + 1.) / 2., 0, 1)
def round_through(x):
    """
    Element-wise rounding to the closest integer with full gradient propagation.
    A trick from [Sergey Ioffe](http://stackoverflow.com/a/36480182):
    an op that behaves as f(x) in the forward pass,
    but as g(x) (here, the identity) in the backward pass.
    """
    rounded = tf.round(x)
    return x + tf.stop_gradient(rounded - x)

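# Illustrative sketch (added for clarity; not part of the adapted repository):
# the forward value is rounded, while the gradient is that of the identity,
# i.e. the straight-through estimator.
def _example_round_through():
    x = tf.constant([0.3, -0.7, 1.2])
    y = round_through(x)       # forward values: [0., -1., 1.]
    g = tf.gradients(y, x)[0]  # straight-through gradient: [1., 1., 1.]
    with tf.Session() as sess:
        return sess.run([y, g])
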
# Binarization function for the neurons' activations.
# It behaves like the sign function during forward propagation, and like
#     hard_tanh(x) = 2 * hard_sigmoid(x) - 1
# during back-propagation.
def binary_tanh_unit(x):
    return 2. * round_through(hard_sigmoid(x)) - 1.


def binary_sigmoid_unit(x):
    return round_through(hard_sigmoid(x))

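# Illustrative sketch (added for clarity; not part of the adapted repository):
# binary_tanh_unit maps real-valued pre-activations to {-1, +1} in the forward
# pass while back-propagating the hard-tanh gradient.
def _example_binary_tanh_unit():
    x = tf.constant([-2.0, -0.1, 0.3, 2.0])
    y = binary_tanh_unit(x)  # forward values: [-1., -1., 1., 1.]
    with tf.Session() as sess:
        return sess.run(y)
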
# Binarization function for the weights,
# adapted from the BinaryConnect repository
# (made available by its authors).
def binarization(W, H, binary=True, deterministic=False, stochastic=False, srng=None):
    dim = W.get_shape().as_list()  # kept from the original code; currently unused
    # (deterministic == True) <-> test time <-> inference time
    if not binary or (deterministic and stochastic):
        # print("not binary")
        Wb = W
    else:
        # [-1, 1] -> [0, 1]
        # Wb = hard_sigmoid(W / H)
        # Wb = T.clip(W / H, -1, 1)

        # Stochastic BinaryConnect
        # if stochastic:
        #     # print("stoch")
        #     Wb = tf.cast(srng.binomial(n=1, p=Wb, size=tf.shape(Wb)), tf.float32)

        # Deterministic BinaryConnect (round to nearest)
        # else:
        #     # print("det")
        #     Wb = tf.round(Wb)

        # 0 or 1 -> -1 or 1
        # Wb = tf.where(tf.equal(Wb, 1.0), tf.ones_like(W), -tf.ones_like(W))  # not differentiable

        # Straight-through deterministic binarization used here:
        Wb = H * binary_tanh_unit(W / H)
    return Wb

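# Illustrative sketch (added for clarity; not part of the adapted repository):
# deterministic binarization maps a real-valued kernel to {-H, +H} in the
# forward pass while keeping a usable gradient for the real-valued weights.
def _example_binarization():
    W = tf.constant([[0.4, -0.1], [-0.9, 0.7]])
    Wb = binarization(W, H=1.0)  # forward values: [[1., -1.], [-1., 1.]]
    with tf.Session() as sess:
        return sess.run(Wb)
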
class DenseBinaryLayer(keras.layers.Dense):
    def __init__(self, output_dim,
                 activation=None,
                 use_bias=True,
                 binary=True, stochastic=True, H=1., W_LR_scale="Glorot",
                 kernel_initializer=tf.glorot_normal_initializer(),
                 bias_initializer=tf.zeros_initializer(),
                 kernel_regularizer=None,
                 bias_regularizer=None,
                 activity_regularizer=None,
                 kernel_constraint=None,
                 bias_constraint=None,
                 trainable=True,
                 name=None,
                 **kwargs):
        super(DenseBinaryLayer, self).__init__(
            units=output_dim,
            activation=activation,
            use_bias=use_bias,
            kernel_initializer=kernel_initializer,
            bias_initializer=bias_initializer,
            kernel_regularizer=kernel_regularizer,
            bias_regularizer=bias_regularizer,
            activity_regularizer=activity_regularizer,
            kernel_constraint=kernel_constraint,
            bias_constraint=bias_constraint,
            trainable=trainable,
            name=name,
            **kwargs)
        self.binary = binary
        self.stochastic = stochastic
        self.H = H
        self.W_LR_scale = W_LR_scale
        all_layers.append(self)
    def build(self, input_shape):
        num_inputs = tensor_shape.TensorShape(input_shape).as_list()[-1]
        num_units = self.units
        print(num_units)

        # Weight-initialization range (BinaryConnect "Glorot" scaling).
        if self.H == "Glorot":
            self.H = np.float32(np.sqrt(1.5 / (num_inputs + num_units)))
        # Per-layer learning-rate scale.
        if self.W_LR_scale == "Glorot":
            self.W_LR_scale = np.float32(1. / np.sqrt(1.5 / (num_inputs + num_units)))
        print("H = ", self.H)
        print("LR scale = ", self.W_LR_scale)

        self.kernel_initializer = tf.random_uniform_initializer(-self.H, self.H)
        self.kernel_constraint = lambda w: tf.clip_by_value(w, -self.H, self.H)

        # add_variable must execute before calling the parent build().
        self.b_kernel = self.add_variable('binary_weight',
                                          shape=[input_shape[-1], self.units],
                                          initializer=tf.random_uniform_initializer(-self.H, self.H),
                                          regularizer=None,
                                          constraint=None,
                                          dtype=self.dtype,
                                          trainable=False)

        super(DenseBinaryLayer, self).build(input_shape)

        # tf.add_to_collection('real', self.trainable_variables)
        tf.add_to_collection(self.name + '_binary', self.kernel)  # layer-wise group, used by compute_grads()
        tf.add_to_collection('binary', self.kernel)               # global group
    def call(self, inputs):
        inputs = ops.convert_to_tensor(inputs, dtype=self.dtype)
        shape = inputs.get_shape().as_list()

        # Binarize the real-valued kernel for the forward pass.
        self.b_kernel = binarization(self.kernel, self.H)
        # r_kernel = self.kernel
        # self.kernel = self.b_kernel

        print("shape: ", len(shape))
        if len(shape) > 2:
            # Broadcasting is required for the inputs.
            outputs = standard_ops.tensordot(inputs, self.b_kernel, [[len(shape) - 1], [0]])
            # Reshape the output back to the original ndim of the input.
            if context.in_graph_mode():
                output_shape = shape[:-1] + [self.units]
                outputs.set_shape(output_shape)
        else:
            outputs = standard_ops.matmul(inputs, self.b_kernel)

        # restore the real-valued weight
        # self.kernel = r_kernel

        if self.use_bias:
            outputs = nn.bias_add(outputs, self.bias)
        if self.activation is not None:
            return self.activation(outputs)
        return outputs

# Functional interface for the DenseBinaryLayer class.
def dense_binary(
        inputs, units,
        activation=None,
        use_bias=True,
        binary=True, stochastic=True, H=1., W_LR_scale="Glorot",
        kernel_initializer=tf.glorot_normal_initializer(),
        bias_initializer=tf.zeros_initializer(),
        kernel_regularizer=None,
        bias_regularizer=None,
        activity_regularizer=None,
        kernel_constraint=None,
        bias_constraint=None,
        trainable=True,
        name=None,
        reuse=None):
    layer = DenseBinaryLayer(units,
                             activation=activation,
                             use_bias=use_bias,
                             binary=binary, stochastic=stochastic, H=H, W_LR_scale=W_LR_scale,
                             kernel_initializer=kernel_initializer,
                             bias_initializer=bias_initializer,
                             kernel_regularizer=kernel_regularizer,
                             bias_regularizer=bias_regularizer,
                             activity_regularizer=activity_regularizer,
                             kernel_constraint=kernel_constraint,
                             bias_constraint=bias_constraint,
                             trainable=trainable,
                             name=name,
                             dtype=inputs.dtype.base_dtype,
                             _scope=name,
                             _reuse=reuse)
    return layer.apply(inputs)

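# Illustrative sketch (added for clarity; not part of the adapted repository):
# dense_binary is used like tf.layers.dense; the layer keeps a real-valued
# kernel for the optimizer and a binarized copy for the forward pass. The
# placeholder shape and layer name below are illustrative assumptions.
def _example_dense_binary():
    x = tf.placeholder(tf.float32, [None, 784])
    h = dense_binary(x, 512, activation=binary_tanh_unit, name="bin_dense_example")
    return h
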
# Not yet binarized.
class BatchNormalization(keras.layers.BatchNormalization):
    def __init__(self,
                 axis=-1,
                 momentum=0.99,
                 epsilon=1e-3,
                 center=True,
                 scale=True,
                 beta_initializer=tf.zeros_initializer(),
                 gamma_initializer=tf.ones_initializer(),
                 moving_mean_initializer=tf.zeros_initializer(),
                 moving_variance_initializer=tf.ones_initializer(),
                 beta_regularizer=None,
                 gamma_regularizer=None,
                 beta_constraint=None,
                 gamma_constraint=None,
                 renorm=False,
                 renorm_clipping=None,
                 renorm_momentum=0.99,
                 fused=None,
                 trainable=True,
                 name=None,
                 **kwargs):
        super(BatchNormalization, self).__init__(
            axis=axis,
            momentum=momentum,
            epsilon=epsilon,
            center=center,
            scale=scale,
            beta_initializer=beta_initializer,
            gamma_initializer=gamma_initializer,
            moving_mean_initializer=moving_mean_initializer,
            moving_variance_initializer=moving_variance_initializer,
            beta_regularizer=beta_regularizer,
            gamma_regularizer=gamma_regularizer,
            beta_constraint=beta_constraint,
            gamma_constraint=gamma_constraint,
            renorm=renorm,
            renorm_clipping=renorm_clipping,
            renorm_momentum=renorm_momentum,
            fused=fused,
            trainable=trainable,
            name=name,
            **kwargs)
        # all_layers.append(self)
    def build(self, input_shape):
        super(BatchNormalization, self).build(input_shape)
        self.W_LR_scale = np.float32(1.)

# Functional interface for the batch normalization layer.
def batch_normalization(
        inputs,
        axis=-1,
        momentum=0.99,
        epsilon=1e-3,
        center=True,
        scale=True,
        beta_initializer=tf.zeros_initializer(),
        gamma_initializer=tf.ones_initializer(),
        moving_mean_initializer=tf.zeros_initializer(),
        moving_variance_initializer=tf.ones_initializer(),
        beta_regularizer=None,
        gamma_regularizer=None,
        beta_constraint=None,
        gamma_constraint=None,
        training=False,
        trainable=True,
        name=None,
        reuse=None,
        renorm=False,
        renorm_clipping=None,
        renorm_momentum=0.99,
        fused=None):
    layer = BatchNormalization(
        axis=axis,
        momentum=momentum,
        epsilon=epsilon,
        center=center,
        scale=scale,
        beta_initializer=beta_initializer,
        gamma_initializer=gamma_initializer,
        moving_mean_initializer=moving_mean_initializer,
        moving_variance_initializer=moving_variance_initializer,
        beta_regularizer=beta_regularizer,
        gamma_regularizer=gamma_regularizer,
        beta_constraint=beta_constraint,
        gamma_constraint=gamma_constraint,
        renorm=renorm,
        renorm_clipping=renorm_clipping,
        renorm_momentum=renorm_momentum,
        fused=fused,
        trainable=trainable,
        name=name,
        dtype=inputs.dtype.base_dtype,
        _reuse=reuse,
        _scope=name)
    return layer.apply(inputs, training=training)

class AdamOptimizer(optimizer.Optimizer):
    """Optimizer that implements the Adam algorithm with per-variable weight scaling.

    See [Kingma et al., 2014](http://arxiv.org/abs/1412.6980)
    ([pdf](http://arxiv.org/pdf/1412.6980.pdf)).
    """

    def __init__(self, weight_scale, learning_rate=0.001, beta1=0.9, beta2=0.999,
                 epsilon=1e-8, use_locking=False, name="Adam"):
        super(AdamOptimizer, self).__init__(use_locking, name)
        self._lr = learning_rate
        self._beta1 = beta1
        self._beta2 = beta2
        self._epsilon = epsilon
        # BNN per-variable weight-scale factors (e.g. from get_all_LR_scale()).
        self._weight_scale = weight_scale

        # Tensor versions of the constructor arguments, created in _prepare().
        self._lr_t = None
        self._beta1_t = None
        self._beta2_t = None
        self._epsilon_t = None

        # Variables to accumulate the powers of the beta parameters.
        # Created in _create_slots when we know the variables to optimize.
        self._beta1_power = None
        self._beta2_power = None

        # Created in SparseApply if needed.
        self._updated_lr = None

    def _get_beta_accumulators(self):
        return self._beta1_power, self._beta2_power

    def _non_slot_variables(self):
        return self._get_beta_accumulators()

    def _create_slots(self, var_list):
        first_var = min(var_list, key=lambda x: x.name)

        create_new = self._beta1_power is None
        if not create_new and context.in_graph_mode():
            create_new = (self._beta1_power.graph is not first_var.graph)

        if create_new:
            with ops.colocate_with(first_var):
                self._beta1_power = variable_scope.variable(self._beta1,
                                                            name="beta1_power",
                                                            trainable=False)
                self._beta2_power = variable_scope.variable(self._beta2,
                                                            name="beta2_power",
                                                            trainable=False)
        # Create slots for the first and second moments.
        for v in var_list:
            self._zeros_slot(v, "m", self._name)
            self._zeros_slot(v, "v", self._name)

    def _prepare(self):
        self._lr_t = ops.convert_to_tensor(self._lr, name="learning_rate")
        self._beta1_t = ops.convert_to_tensor(self._beta1, name="beta1")
        self._beta2_t = ops.convert_to_tensor(self._beta2, name="beta2")
        self._epsilon_t = ops.convert_to_tensor(self._epsilon, name="epsilon")
    def _apply_dense(self, grad, var):
        m = self.get_slot(var, "m")
        v = self.get_slot(var, "v")
        # For the BNN kernel:
        # the original BinaryConnect weight-clipping update is
        #     new_w = old_w + scale * (new_w - old_w)
        # and the Adam update is
        #     new_w = old_w - lr_t * m_t / (sqrt(v_t) + epsilon),
        # so substituting the Adam update into the clipping rule gives
        #     new_w = old_w - (scale * lr_t * m_t) / (sqrt(v_t) + epsilon),
        # i.e. the scale can be folded into the learning rate.
        scale = self._weight_scale[var.name] / 4
        return training_ops.apply_adam(
            var, m, v,
            math_ops.cast(self._beta1_power, var.dtype.base_dtype),
            math_ops.cast(self._beta2_power, var.dtype.base_dtype),
            math_ops.cast(self._lr_t * scale, var.dtype.base_dtype),
            math_ops.cast(self._beta1_t, var.dtype.base_dtype),
            math_ops.cast(self._beta2_t, var.dtype.base_dtype),
            math_ops.cast(self._epsilon_t, var.dtype.base_dtype),
            grad, use_locking=self._use_locking).op
    def _resource_apply_dense(self, grad, var):
        m = self.get_slot(var, "m")
        v = self.get_slot(var, "v")
        return training_ops.resource_apply_adam(
            var.handle, m.handle, v.handle,
            math_ops.cast(self._beta1_power, grad.dtype.base_dtype),
            math_ops.cast(self._beta2_power, grad.dtype.base_dtype),
            math_ops.cast(self._lr_t, grad.dtype.base_dtype),
            math_ops.cast(self._beta1_t, grad.dtype.base_dtype),
            math_ops.cast(self._beta2_t, grad.dtype.base_dtype),
            math_ops.cast(self._epsilon_t, grad.dtype.base_dtype),
            grad, use_locking=self._use_locking)
    def _apply_sparse_shared(self, grad, var, indices, scatter_add):
        beta1_power = math_ops.cast(self._beta1_power, var.dtype.base_dtype)
        beta2_power = math_ops.cast(self._beta2_power, var.dtype.base_dtype)
        lr_t = math_ops.cast(self._lr_t, var.dtype.base_dtype)
        beta1_t = math_ops.cast(self._beta1_t, var.dtype.base_dtype)
        beta2_t = math_ops.cast(self._beta2_t, var.dtype.base_dtype)
        epsilon_t = math_ops.cast(self._epsilon_t, var.dtype.base_dtype)
        lr = (lr_t * math_ops.sqrt(1 - beta2_power) / (1 - beta1_power))
        # m_t = beta1 * m + (1 - beta1) * g_t
        m = self.get_slot(var, "m")
        m_scaled_g_values = grad * (1 - beta1_t)
        m_t = state_ops.assign(m, m * beta1_t,
                               use_locking=self._use_locking)
        with ops.control_dependencies([m_t]):
            m_t = scatter_add(m, indices, m_scaled_g_values)
        # v_t = beta2 * v + (1 - beta2) * (g_t * g_t)
        v = self.get_slot(var, "v")
        v_scaled_g_values = (grad * grad) * (1 - beta2_t)
        v_t = state_ops.assign(v, v * beta2_t, use_locking=self._use_locking)
        with ops.control_dependencies([v_t]):
            v_t = scatter_add(v, indices, v_scaled_g_values)
        v_sqrt = math_ops.sqrt(v_t)
        var_update = state_ops.assign_sub(var,
                                          lr * m_t / (v_sqrt + epsilon_t),
                                          use_locking=self._use_locking)
        return control_flow_ops.group(*[var_update, m_t, v_t])

    def _apply_sparse(self, grad, var):
        return self._apply_sparse_shared(
            grad.values, var, grad.indices,
            lambda x, i, v: state_ops.scatter_add(  # pylint: disable=g-long-lambda
                x, i, v, use_locking=self._use_locking))

    def _resource_scatter_add(self, x, i, v):
        with ops.control_dependencies(
                [resource_variable_ops.resource_scatter_add(x.handle, i, v)]):
            return x.value()

    def _resource_apply_sparse(self, grad, var, indices):
        return self._apply_sparse_shared(
            grad, var, indices, self._resource_scatter_add)
    def _finish(self, update_ops, name_scope):
        # Update the power accumulators.
        with ops.control_dependencies(update_ops):
            with ops.colocate_with(self._beta1_power):
                update_beta1 = self._beta1_power.assign(
                    self._beta1_power * self._beta1_t,
                    use_locking=self._use_locking)
                update_beta2 = self._beta2_power.assign(
                    self._beta2_power * self._beta2_t,
                    use_locking=self._use_locking)
        return control_flow_ops.group(*update_ops + [update_beta1, update_beta2],
                                      name=name_scope)

def get_all_layers():
    return all_layers


def get_all_LR_scale():
    return {layer.kernel.name: layer.W_LR_scale for layer in get_all_layers()}

# This function computes the gradients for the binary layers' weights and
# pairs each gradient with the weight variable it should update.
def compute_grads(loss, opt):
    layers = get_all_layers()
    grads_list = []
    update_weights = []
    for layer in layers:
        # In the Lasagne/Theano BinaryConnect code this corresponds to
        # self.params[self.W] = set(['binary']); the parameter list can be
        # filtered by tags, e.g. ``trainable=True`` returns only trainable
        # parameters and ``regularizable=True`` only those that can be
        # regularized (for a dense layer this returns e.g. [W, b]).
        params = tf.get_collection(layer.name + "_binary")
        if params:
            # print(params[0].name)
            # theano.grad(cost, wrt) -> d(cost)/d(wrt),
            # where wrt is the variable the gradient is taken with respect to.
            # http://blog.csdn.net/shouhuxianjian/article/details/46517143
            # http://blog.csdn.net/qq_33232071/article/details/52806630
            # grad = opt.compute_gradients(loss, layer.b_kernel)  # original version
            grad = opt.compute_gradients(loss, params[0])  # modified
            print("grad: ", grad)
            grads_list.append(grad[0][0])
            update_weights.extend(params)
    print(grads_list)
    print(update_weights)
    return zip(grads_list, update_weights)

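# Minimal end-to-end sketch (added for clarity; not part of the adapted
# repository). Layer sizes, names and hyper-parameters below are illustrative
# assumptions; only the graph is built here, no session is run.
if __name__ == "__main__":
    x = tf.placeholder(tf.float32, [None, 784])
    y = tf.placeholder(tf.float32, [None, 10])

    h = dense_binary(x, 256, activation=binary_tanh_unit, name="bin_dense_1")
    h = batch_normalization(h, training=True, name="bn_1")
    logits = dense_binary(h, 10, name="bin_dense_2")

    loss = tf.losses.softmax_cross_entropy(onehot_labels=y, logits=logits)

    # The per-layer LR scales from get_all_LR_scale() feed the scaled Adam update;
    # biases and batch-norm parameters would need a separate optimizer in a full setup.
    opt = AdamOptimizer(get_all_LR_scale(), learning_rate=1e-3)
    train_op = opt.apply_gradients(compute_grads(loss, opt))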