import numpy as np
import ctypes
import hetu as ht
from . import ndarray
from . import gpu_links as gpu_op
from .lr_scheduler import FixedScheduler
from .gpu_ops.Node import Op
from .gpu_ops.EmbeddingLookUp import EmbeddingLookUp_Gradient
from .gpu_ops.ParameterServerCommunicate import ParameterServerCommunicateOp
from .gpu_ops.Variable import PlaceholderOp


class Optimizer(object):
    """Base class for all optimizers.

    Holds the learning-rate schedule, the optional L2 regularizer
    coefficient, and (after ``initiate_states``) the list of trainable
    parameter nodes together with their backing tensors.
    """

    def __init__(self, learning_rate, l2reg=0):
        """Create an optimizer.

        Parameters
        ----------
        learning_rate : float or FixedScheduler
            Either a non-negative constant learning rate or a scheduler
            object that yields the rate via ``get()``.
        l2reg : float
            L2 regularization coefficient; 0 disables regularization.
        """
        if isinstance(learning_rate, FixedScheduler):
            self.lr_sched = learning_rate
        else:
            assert learning_rate >= 0, \
                "learning rate must be non-negative"
            self.lr_sched = FixedScheduler(learning_rate)
        # now we don't support l2 regularizer for sparse updates
        # TODO: support l2 regularizer for sparse updates
        # now we don't support l2 regularizer for PS mode parameters
        # TODO: support l2 regularizer for PS mode parameters
        # (after PS mode has optimizer on Servers)
        assert l2reg >= 0, 'L2 regularizer should be positive or 0.'
        self.l2reg = l2reg
        self.params = None
        self.tensors = None
        self.initiated = False

    @property
    def learning_rate(self):
        """Current learning rate as given by the schedule."""
        return self.lr_sched.get()

    @staticmethod
    def get_var_list(loss):
        """Collect all trainable placeholder nodes reachable from ``loss``.

        Performs a depth-first traversal of the computation graph; a node
        is collected when it is a trainable ``PlaceholderOp``.

        Parameters
        ----------
        loss : Op or list of Op
            Loss node(s) whose ancestors are searched.

        Returns
        -------
        list of PlaceholderOp
        """
        def topo_sort_dfs(node, visited, var_list):
            if node in visited:
                return
            visited.add(node)
            if isinstance(node, PlaceholderOp) and node.trainable:
                var_list.append(node)
                return
            for n in node.inputs:
                topo_sort_dfs(n, visited, var_list)

        visited = set()
        trainable_vars = []
        if isinstance(loss, list):
            for l in loss:
                topo_sort_dfs(l, visited, trainable_vars)
        else:
            topo_sort_dfs(loss, visited, trainable_vars)
        return trainable_vars

    def initiate_states(self, config):
        """Bind parameter nodes to their backing tensors from ``config``.

        Subclasses extend this to allocate per-parameter state (momentum,
        accumulators, ...). Must be called exactly once.
        """
        assert not self.initiated, "Optimizer already initiated."
        self.tensors = [config.placeholder_to_arr_map[node]
                        for node in self.params]
        self.initiated = True

    def minimize(self, loss, var_list=None):
        """Return an optimizer op to update parameters.

        Parameters
        ----------
        loss: loss node that we are minimizing.
        var_list: list of nodes that we are taking derivative wrt.

        Returns
        -------
        An optimizer node.
        """
        if not var_list:
            var_list = self.get_var_list(loss)
        self.params = var_list
        grads = ht.gradients(loss, self.params)
        optimizer_node = OptimizerOp(grads, self)
        return optimizer_node


class OptimizerOp(Op):
    """Graph node that applies an optimizer's update using gradient inputs."""

    def __init__(self, grads, optimizer):
        super().__init__(OptimizerOp, grads, None)
        self.name = "Optimizer_%s" % (optimizer.name)
        self.optimizer = optimizer

    def compute(self, input_vals, output_val, stream_handle=None):
        """Apply one update step; produces no output value.

        In PS mode the update happens on the servers, so no local update
        is performed (and ``input_vals`` may be None for PS ops).
        """
        assert output_val is None
        # For PS op, this input_vals is None
        # PS mode doesn't need local update
        if self.comm_mode != 'PS':
            self.optimizer.update(input_vals, stream_handle)

    def gradient(self, output_grad):
        raise NotImplementedError

    def infer_shape(self, input_shapes):
        # The optimizer node has no output tensor.
        return None

    def forward_hook(self, config):
        # disable inplace if not lazy execution
        # previously we use array reshape lazy callback to do this,
        # which is deprecated (not efficient)
        for node in self.inputs:
            node.inplace = False

        self.optimizer.initiate_states(config)
        self.on_cpu = self.on_gpu = None
        self.comm_mode = config.comm_mode
        # some things todo.
        if self.comm_mode != 'PS':
            for i in range(len(self.inputs)):
                # Though the gradients for transfer ops are well defined,
                # we called gradients in optimizer op before transfer ops
                # are added. So here we also add tranfer ops for gradients
                # update. Could be optimized later.
                if not isinstance(self.inputs[i], ParameterServerCommunicateOp):
                    paramctx = self.optimizer.params[i].ctx
                    self.inputs[i] = super().add_transfer_op(
                        self.inputs[i], paramctx, config.h2d_ops, config.d2h_ops)

    def backward_hook(self, config):
        """Wrap each gradient input with the communication op demanded by the
        per-parameter strategy (AllReduce, PS, or Hybrid)."""
        self.comm_mode = config.comm_mode
        new_inputs = []
        for i, node in enumerate(self.inputs):
            current_strategy = config.node_strategy.get(
                self.optimizer.params[i], self.comm_mode)
            if current_strategy == 'AllReduce' or \
                    (current_strategy == 'Hybrid' and
                     not isinstance(node, EmbeddingLookUp_Gradient)):
                new_inputs.append(ht.allreduceCommunicate_op(
                    node, config.param_allreduce_group.get(
                        self.optimizer.params[i], config.nccl_comm)))
            elif current_strategy == 'PS' or \
                    (current_strategy == 'Hybrid' and
                     isinstance(node, EmbeddingLookUp_Gradient)):
                new_inputs.append(ht.parameterServerCommunicate_op(
                    node, self.optimizer.params[i],
                    self.optimizer.get_config()))
            else:
                new_inputs.append(node)
        self.inputs = new_inputs


class SGDOptimizer(Optimizer):
    """Stochastic gradient descent: ``param -= lr * grad``."""

    def __init__(self, learning_rate=0.01, l2reg=0):
        super(SGDOptimizer, self).__init__(learning_rate, l2reg)
        self.name = 'SGD'

    def get_config(self):
        """Return (optimizer id, float config array, config length) for PS."""
        return (ctypes.c_int(0),
                (ctypes.c_float * 1)(self.learning_rate),
                ctypes.c_int(1))

    def initiate_states(self, config):
        # SGD keeps no extra state beyond the parameter tensors.
        super().initiate_states(config)

    def update(self, grads, stream_handle=None):
        """Apply one SGD step to every parameter.

        Parameters
        ----------
        grads : list of NDArray / IndexedSlices / None
            One gradient per parameter; ``None`` entries are skipped.
        stream_handle : optional GPU stream for the kernel launches.
        """
        assert self.initiated is True
        params_size = len(self.params)
        assert params_size == len(grads)
        for i in range(params_size):
            # fix: identity comparison with None ('== None' may not return a
            # plain bool for array-like gradients)
            if grads[i] is None:
                continue
            if self.params[i].on_gpu:
                assert isinstance(self.tensors[i], ndarray.NDArray)
                assert isinstance(
                    grads[i], (ndarray.NDArray, ndarray.IndexedSlices))
                if self.l2reg > 0:
                    gpu_op.add_l2_regularization(
                        self.tensors[i], grads[i], self.l2reg, stream_handle)
                gpu_op.sgd_update(
                    self.tensors[i], grads[i], self.learning_rate,
                    stream_handle)
            else:
                from ._base import DNNL_LIB
                if isinstance(grads[i], ndarray.IndexedSlices):
                    if DNNL_LIB['cpu_SGDOptimizerSparseUpdate']:
                        from .cpu_links import sgd_update_sparse \
                            as cpu_sgd_update_sparse
                        cpu_sgd_update_sparse(
                            self.tensors[i], grads[i].indices,
                            grads[i].values, self.learning_rate)
                    else:
                        grads[i].cpu_deduplicate()
                        np_tensor = self.tensors[i].asnumpy()
                        # fix: np.int was removed in NumPy 1.24; use a
                        # concrete integer dtype for the index array
                        np_tensor[grads[i].indices.asnumpy().astype(
                            np.int64)] -= \
                            self.learning_rate * grads[i].values.asnumpy()
                        self.tensors[i][:] = np_tensor
                        grads[i].free_deduplicate()
                else:
                    if DNNL_LIB['cpu_SGDOptimizerUpdate']:
                        from .cpu_links import sgd_update as cpu_sgd_update
                        if self.l2reg > 0:
                            from .cpu_links import add_l2_regularization \
                                as cpu_add_l2_regularization
                            cpu_add_l2_regularization(
                                self.tensors[i], grads[i], self.l2reg)
                        cpu_sgd_update(
                            self.tensors[i], grads[i], self.learning_rate)
                    else:
                        prev_param = self.tensors[i].asnumpy()
                        grad = grads[i].asnumpy() + self.l2reg * prev_param \
                            if self.l2reg > 0 else grads[i].asnumpy()
                        self.tensors[i][:] = prev_param - \
                            self.learning_rate * grad


class MomentumOptimizer(Optimizer):
    """SGD with (optionally Nesterov) momentum."""

    def __init__(self, learning_rate=0.01, momentum=0.9, nesterov=False,
                 l2reg=0):
        super(MomentumOptimizer, self).__init__(learning_rate, l2reg)
        self.momentum = momentum
        self.nesterov = nesterov
        self.name = "Momentum"

    def get_config(self):
        """Return (optimizer id, float config array, config length) for PS.

        Id 1 encodes plain momentum, 2 encodes Nesterov.
        """
        return (ctypes.c_int(self.nesterov + 1),
                (ctypes.c_float * 2)(self.learning_rate, self.momentum),
                ctypes.c_int(2))

    def initiate_states(self, config):
        # One velocity buffer per parameter, zero-initialized.
        super().initiate_states(config)
        self.velocity = []
        for t in self.tensors:
            self.velocity.append(None if t is None else ndarray.array(
                np.zeros(t.shape, dtype=np.float32), t.ctx))

    def update(self, grads, stream_handle=None):
        """Apply one momentum step to every parameter.

        ``None`` gradients are skipped; sparse CPU updates are not
        implemented.
        """
        assert self.initiated is True
        params_size = len(self.params)
        assert params_size == len(grads)
        for i in range(params_size):
            if grads[i] is None:
                continue
            if self.params[i].on_gpu:
                assert isinstance(self.tensors[i], ndarray.NDArray)
                assert isinstance(
                    grads[i], (ndarray.NDArray, ndarray.IndexedSlices))
                assert isinstance(self.velocity[i], ndarray.NDArray)
                if self.l2reg > 0:
                    gpu_op.add_l2_regularization(
                        self.tensors[i], grads[i], self.l2reg, stream_handle)
                gpu_op.momentum_update(
                    self.tensors[i], grads[i], self.velocity[i],
                    self.learning_rate, self.momentum, self.nesterov,
                    stream_handle)
            else:
                if isinstance(grads[i], ndarray.IndexedSlices):
                    raise NotImplementedError
                else:
                    from ._base import DNNL_LIB
                    if DNNL_LIB['cpu_MomentumOptimizerUpdate']:
                        from .cpu_links import momentum_update \
                            as cpu_momentum_update
                        if self.l2reg > 0:
                            from .cpu_links import add_l2_regularization \
                                as cpu_add_l2_regularization
                            cpu_add_l2_regularization(
                                self.tensors[i], grads[i], self.l2reg)
                        cpu_momentum_update(
                            self.tensors[i], grads[i], self.velocity[i],
                            self.learning_rate, self.momentum, self.nesterov)
                    else:
                        prev_param = self.tensors[i].asnumpy()
                        grad = grads[i].asnumpy() + self.l2reg * prev_param \
                            if self.l2reg > 0 else grads[i].asnumpy()
                        velo = self.velocity[i].asnumpy()
                        if self.nesterov:
                            lr_grads = -self.learning_rate * grad
                            self.velocity[i][:] = self.momentum * \
                                (velo + lr_grads)
                            self.tensors[i][:] = prev_param + velo + lr_grads
                        else:
                            self.velocity[i][:] = self.momentum * \
                                velo - self.learning_rate * grad
                            self.tensors[i][:] = prev_param + velo


class AdaGradOptimizer(Optimizer):
    """AdaGrad: per-coordinate learning rates from accumulated squared
    gradients."""

    def __init__(self, learning_rate=0.01, initial_accumulator_value=0.0,
                 eps=1e-7, l2reg=0):
        assert initial_accumulator_value >= 0.0, \
            "initial accumulator value must be non-negative"
        assert eps > 0.0, \
            "epsilon must be positive"
        super(AdaGradOptimizer, self).__init__(learning_rate, l2reg)
        self.initial_accumulator_value = initial_accumulator_value
        self.eps = eps
        self.name = "AdaGrad"

    def get_config(self):
        """Return (optimizer id, float config array, config length) for PS."""
        return (ctypes.c_int(3),
                (ctypes.c_float * 3)(self.learning_rate,
                                     self.initial_accumulator_value,
                                     self.eps),
                ctypes.c_int(3))

    def initiate_states(self, config):
        # One squared-gradient accumulator per parameter.
        # dtype aligned with MomentumOptimizer's state buffers (float32);
        # NOTE(review): assumes ndarray.array accepts/needs float32 — confirm.
        super().initiate_states(config)
        self.accumulator_value = []
        for t in self.tensors:
            self.accumulator_value.append(None if t is None else ndarray.array(
                np.full(t.shape, self.initial_accumulator_value,
                        dtype=np.float32), t.ctx))

    def update(self, grads, stream_handle=None):
        """Apply one AdaGrad step to every parameter.

        ``None`` gradients are skipped; sparse CPU updates are not
        implemented.
        """
        assert self.initiated is True
        params_size = len(self.params)
        assert params_size == len(grads)
        for i in range(params_size):
            if grads[i] is None:
                continue
            if self.params[i].on_gpu:
                assert isinstance(self.tensors[i], ndarray.NDArray)
                assert isinstance(
                    grads[i], (ndarray.NDArray, ndarray.IndexedSlices))
                if self.l2reg > 0:
                    gpu_op.add_l2_regularization(
                        self.tensors[i], grads[i], self.l2reg, stream_handle)
                gpu_op.adagrad_update(
                    self.tensors[i], grads[i], self.accumulator_value[i],
                    self.learning_rate, self.eps, stream_handle)
            else:
                if isinstance(grads[i], ndarray.IndexedSlices):
                    raise NotImplementedError
                else:
                    from ._base import DNNL_LIB
                    if DNNL_LIB['cpu_AdaGradOptimizerUpdate']:
                        from .cpu_links import adagrad_update \
                            as cpu_adagrad_update
                        if self.l2reg > 0:
                            from .cpu_links import add_l2_regularization \
                                as cpu_add_l2_regularization
                            cpu_add_l2_regularization(
                                self.tensors[i], grads[i], self.l2reg)
                        cpu_adagrad_update(
                            self.tensors[i], grads[i],
                            self.accumulator_value[i], self.learning_rate,
                            self.eps)
                    else:
                        prev_param = self.tensors[i].asnumpy()
                        grad = grads[i].asnumpy() + self.l2reg * prev_param \
                            if self.l2reg > 0 else grads[i].asnumpy()
                        self.accumulator_value[i][:] = \
                            self.accumulator_value[i].asnumpy() + \
                            np.power(grad, 2)
                        self.tensors[i][:] = \
                            prev_param - self.learning_rate * grad / \
                            (np.sqrt(
                                self.accumulator_value[i].asnumpy()) +
                                self.eps)


class AdamOptimizer(Optimizer):
    """Adam: adaptive moment estimation with bias correction."""

    def __init__(self, learning_rate=0.01, beta1=0.9, beta2=0.999,
                 epsilon=1e-7, l2reg=0):
        super(AdamOptimizer, self).__init__(learning_rate, l2reg)
        self.beta1 = beta1
        self.beta1_t = 1.0  # running beta1 ** step, for bias correction
        self.beta2 = beta2
        self.beta2_t = 1.0  # running beta2 ** step, for bias correction
        self.epsilon = epsilon
        self.name = "Adam"

    def get_config(self):
        """Return (optimizer id, float config array, config length) for PS."""
        return (ctypes.c_int(4),
                (ctypes.c_float * 4)(self.learning_rate, self.beta1,
                                     self.beta2, self.epsilon),
                ctypes.c_int(4))

    def initiate_states(self, config):
        # First (m) and second (v) moment estimates, one pair per parameter.
        # dtype aligned with MomentumOptimizer's state buffers (float32);
        # NOTE(review): assumes ndarray.array accepts/needs float32 — confirm.
        super().initiate_states(config)
        self.m = []
        self.v = []
        for t in self.tensors:
            self.m.append(None if t is None else ndarray.array(
                np.zeros(t.shape, dtype=np.float32), t.ctx))
            self.v.append(None if t is None else ndarray.array(
                np.zeros(t.shape, dtype=np.float32), t.ctx))

    def update(self, grads, stream_handle=None):
        """Apply one Adam step to every parameter.

        Advances the bias-correction factors once per call, then updates
        each parameter. ``None`` gradients are skipped; sparse CPU updates
        are not implemented.
        """
        assert self.initiated is True
        # consistency fix: size the loop from self.params like the other
        # optimizers (same length as self.tensors after initiate_states)
        params_size = len(self.params)
        assert params_size == len(grads)
        self.beta1_t *= self.beta1
        self.beta2_t *= self.beta2
        for i in range(params_size):
            if grads[i] is None:
                continue
            if self.params[i].on_gpu:
                assert isinstance(self.tensors[i], ndarray.NDArray)
                assert isinstance(
                    grads[i], (ndarray.NDArray, ndarray.IndexedSlices))
                assert isinstance(self.m[i], ndarray.NDArray)
                assert isinstance(self.v[i], ndarray.NDArray)
                if self.l2reg > 0:
                    gpu_op.add_l2_regularization(
                        self.tensors[i], grads[i], self.l2reg, stream_handle)
                gpu_op.adam_update(
                    self.tensors[i], grads[i], self.m[i], self.v[i],
                    self.learning_rate, self.beta1, self.beta2,
                    self.beta1_t, self.beta2_t, self.epsilon, stream_handle)
            else:
                if isinstance(grads[i], ndarray.IndexedSlices):
                    raise NotImplementedError
                else:
                    from ._base import DNNL_LIB
                    if DNNL_LIB['cpu_AdamOptimizerUpdate']:
                        from .cpu_links import adam_update as cpu_adam_update
                        if self.l2reg > 0:
                            from .cpu_links import add_l2_regularization \
                                as cpu_add_l2_regularization
                            cpu_add_l2_regularization(
                                self.tensors[i], grads[i], self.l2reg)
                        cpu_adam_update(
                            self.tensors[i], grads[i], self.m[i], self.v[i],
                            self.learning_rate, self.beta1, self.beta2,
                            self.beta1_t, self.beta2_t, self.epsilon)
                    else:
                        prev_param = self.tensors[i].asnumpy()
                        grad = grads[i].asnumpy() + self.l2reg * prev_param \
                            if self.l2reg > 0 else grads[i].asnumpy()
                        self.m[i][:] = self.beta1 * \
                            self.m[i].asnumpy() + (1 - self.beta1) * grad
                        self.v[i][:] = self.beta2 * self.v[i].asnumpy() + \
                            (1 - self.beta2) * grad * grad
                        mc = self.m[i].asnumpy() / (1 - self.beta1_t)
                        vc = self.v[i].asnumpy() / (1 - self.beta2_t)
                        self.tensors[i][:] = prev_param - \
                            self.learning_rate * mc / \
                            (np.sqrt(vc) + self.epsilon)