You cannot select more than 25 topics. Topics must start with a Chinese character, a letter or a number; they can include dashes ('-') and can be up to 35 characters long.

optimizer.py 18 kB

4 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393
  1. import numpy as np
  2. import ctypes
  3. import hetu as ht
  4. from . import ndarray
  5. from . import gpu_links as gpu_op
  6. from .lr_scheduler import FixedScheduler
  7. from .gpu_ops.Node import Op
  8. from .gpu_ops.EmbeddingLookUp import EmbeddingLookUp_Gradient
  9. from .gpu_ops.ParameterServerCommunicate import ParameterServerCommunicateOp
  10. from .gpu_ops.Variable import PlaceholderOp
  11. class Optimizer(object):
  12. """Optimizers."""
  13. def __init__(self, learning_rate, l2reg=0):
  14. if isinstance(learning_rate, FixedScheduler):
  15. self.lr_sched = learning_rate
  16. else:
  17. assert learning_rate >= 0, \
  18. "learning rate must be non-negative"
  19. self.lr_sched = FixedScheduler(learning_rate)
  20. # now we don't support l2 regularizer for sparse updates
  21. # TODO: support l2 regularizer for sparse updates
  22. # now we don't support l2 regularizer for PS mode parameters
  23. # TODO: support l2 regularizer for PS mode parameters (after PS mode has optimizer on Servers)
  24. assert l2reg >= 0, 'L2 regularizer should be positive or 0.'
  25. self.l2reg = l2reg
  26. self.params = None
  27. self.tensors = None
  28. self.initiated = False
  29. @property
  30. def learning_rate(self):
  31. return self.lr_sched.get()
  32. @staticmethod
  33. def get_var_list(loss):
  34. def topo_sort_dfs(node, visited, var_list):
  35. if node in visited:
  36. return
  37. visited.add(node)
  38. if isinstance(node, PlaceholderOp) and node.trainable:
  39. var_list.append(node)
  40. return
  41. for n in node.inputs:
  42. topo_sort_dfs(n, visited, var_list)
  43. visited = set()
  44. trainable_vars = []
  45. if isinstance(loss, list):
  46. for l in loss:
  47. topo_sort_dfs(l, visited, trainable_vars)
  48. else:
  49. topo_sort_dfs(loss, visited, trainable_vars)
  50. return trainable_vars
  51. def initiate_states(self, config):
  52. assert not self.initiated, "Optimizer already initiated."
  53. self.tensors = [config.placeholder_to_arr_map[node]
  54. for node in self.params]
  55. self.initiated = True
  56. def minimize(self, loss, var_list=None):
  57. """Return an optimizer op to update parameters.
  58. Parameters
  59. ----------
  60. loss: loss node that we are minimizing.
  61. var_list: list of nodes that we are taking derivative wrt.
  62. Returns
  63. -------
  64. An optimizer node.
  65. """
  66. if not var_list:
  67. var_list = self.get_var_list(loss)
  68. self.params = var_list
  69. grads = ht.gradients(loss, self.params)
  70. optimizer_node = OptimizerOp(grads, self)
  71. return optimizer_node
class OptimizerOp(Op):
    """Terminal graph op that applies the optimizer's update rule.

    Its inputs are the gradient nodes of all trainable parameters
    (aligned index-for-index with ``optimizer.params``); it produces
    no output tensor.
    """

    def __init__(self, grads, optimizer):
        super().__init__(OptimizerOp, grads, None)
        self.name = "Optimizer_%s" % (optimizer.name)
        self.optimizer = optimizer

    def compute(self, input_vals, output_val, stream_handle=None):
        # This op has no output value by construction.
        assert output_val is None
        # For PS op, this input_vals is None
        # PS mode doesn't need local update
        if self.comm_mode != 'PS':
            self.optimizer.update(input_vals, stream_handle)

    def gradient(self, output_grad):
        # Differentiating through a parameter update is not supported.
        raise NotImplementedError

    def infer_shape(self, input_shapes):
        # No output tensor, hence no shape.
        return None

    def forward_hook(self, config):
        # disable inplace if not lazy execution
        # previously we use array reshape lazy callback to do this, which is deprecated (not efficient)
        for node in self.inputs:
            node.inplace = False
        self.optimizer.initiate_states(config)
        self.on_cpu = self.on_gpu = None
        self.comm_mode = config.comm_mode
        # some things todo.
        if self.comm_mode != 'PS':
            for i in range(len(self.inputs)):
                # Though the gradients for transfer ops are well defined,
                # we called gradients in optimizer op before transfer ops are added.
                # So here we also add transfer ops for gradients update.
                # Could be optimized later.
                if not isinstance(self.inputs[i], ParameterServerCommunicateOp):
                    paramctx = self.optimizer.params[i].ctx
                    self.inputs[i] = super().add_transfer_op(
                        self.inputs[i], paramctx, config.h2d_ops, config.d2h_ops)

    def backward_hook(self, config):
        # Rewrite each gradient input according to the communication
        # strategy chosen for its parameter: wrap it in an AllReduce or
        # a ParameterServer communicate op, or leave it as-is (local).
        self.comm_mode = config.comm_mode
        new_inputs = []
        for i, node in enumerate(self.inputs):
            # Per-parameter strategy override; falls back to the global mode.
            current_strategy = config.node_strategy.get(
                self.optimizer.params[i], self.comm_mode)
            # Hybrid: dense grads go through AllReduce, sparse embedding
            # grads (EmbeddingLookUp_Gradient) go through the PS path.
            if current_strategy == 'AllReduce' or (current_strategy == 'Hybrid' and not isinstance(node, EmbeddingLookUp_Gradient)):
                new_inputs.append(ht.allreduceCommunicate_op(
                    node, config.param_allreduce_group.get(self.optimizer.params[i], config.nccl_comm)))
            elif current_strategy == 'PS' or (current_strategy == 'Hybrid' and isinstance(node, EmbeddingLookUp_Gradient)):
                new_inputs.append(ht.parameterServerCommunicate_op(
                    node, self.optimizer.params[i], self.optimizer.get_config()))
            else:
                new_inputs.append(node)
        self.inputs = new_inputs
  121. class SGDOptimizer(Optimizer):
  122. def __init__(self, learning_rate=0.01, l2reg=0):
  123. super(SGDOptimizer, self).__init__(learning_rate, l2reg)
  124. self.name = 'SGD'
  125. def get_config(self):
  126. return (ctypes.c_int(0), (ctypes.c_float * 1)(self.learning_rate), ctypes.c_int(1))
  127. def initiate_states(self, config):
  128. super().initiate_states(config)
  129. def update(self, grads, stream_handle=None):
  130. assert self.initiated is True
  131. params_size = len(self.params)
  132. assert params_size == len(grads)
  133. for i in range(params_size):
  134. if grads[i] == None:
  135. continue
  136. if self.params[i].on_gpu:
  137. assert isinstance(self.tensors[i], ndarray.NDArray)
  138. assert isinstance(
  139. grads[i], (ndarray.NDArray, ndarray.IndexedSlices))
  140. if self.l2reg > 0:
  141. gpu_op.add_l2_regularization(
  142. self.tensors[i], grads[i], self.l2reg, stream_handle)
  143. gpu_op.sgd_update(
  144. self.tensors[i], grads[i], self.learning_rate, stream_handle)
  145. else:
  146. from ._base import DNNL_LIB
  147. if isinstance(grads[i], ndarray.IndexedSlices):
  148. if DNNL_LIB['cpu_SGDOptimizerSparseUpdate']:
  149. from .cpu_links import sgd_update_sparse as cpu_sgd_update_sparse
  150. cpu_sgd_update_sparse(
  151. self.tensors[i], grads[i].indices, grads[i].values, self.learning_rate)
  152. else:
  153. grads[i].cpu_deduplicate()
  154. np_tensor = self.tensors[i].asnumpy()
  155. np_tensor[grads[i].indices.asnumpy().astype(
  156. np.int)] -= self.learning_rate * grads[i].values.asnumpy()
  157. self.tensors[i][:] = np_tensor
  158. grads[i].free_deduplicate()
  159. else:
  160. if DNNL_LIB['cpu_SGDOptimizerUpdate']:
  161. from .cpu_links import sgd_update as cpu_sgd_update
  162. if self.l2reg > 0:
  163. from .cpu_links import add_l2_regularization as cpu_add_l2_regularization
  164. cpu_add_l2_regularization(
  165. self.tensors[i], grads[i], self.l2reg)
  166. cpu_sgd_update(
  167. self.tensors[i], grads[i], self.learning_rate)
  168. else:
  169. prev_param = self.tensors[i].asnumpy()
  170. grad = grads[i].asnumpy(
  171. ) + self.l2reg * prev_param if self.l2reg > 0 else grads[i].asnumpy()
  172. self.tensors[i][:] = prev_param - \
  173. self.learning_rate * grad
  174. class MomentumOptimizer(Optimizer):
  175. def __init__(self, learning_rate=0.01, momentum=0.9, nesterov=False, l2reg=0):
  176. super(MomentumOptimizer, self).__init__(learning_rate, l2reg)
  177. self.momentum = momentum
  178. self.nesterov = nesterov
  179. self.name = "Momentum"
  180. def get_config(self):
  181. return (ctypes.c_int(self.nesterov + 1), (ctypes.c_float * 2)(self.learning_rate, self.momentum), ctypes.c_int(2))
  182. def initiate_states(self, config):
  183. super().initiate_states(config)
  184. self.velocity = []
  185. for t in self.tensors:
  186. self.velocity.append(None if t is None else ndarray.array(
  187. np.zeros(t.shape, dtype=np.float32), t.ctx))
  188. def update(self, grads, stream_handle=None):
  189. assert self.initiated is True
  190. params_size = len(self.params)
  191. assert params_size == len(grads)
  192. for i in range(params_size):
  193. if grads[i] == None:
  194. continue
  195. if self.params[i].on_gpu:
  196. assert isinstance(self.tensors[i], ndarray.NDArray)
  197. assert isinstance(
  198. grads[i], (ndarray.NDArray, ndarray.IndexedSlices))
  199. assert isinstance(self.velocity[i], ndarray.NDArray)
  200. if self.l2reg > 0:
  201. gpu_op.add_l2_regularization(
  202. self.tensors[i], grads[i], self.l2reg, stream_handle)
  203. gpu_op.momentum_update(self.tensors[i], grads[i], self.velocity[i], self.learning_rate, self.momentum,
  204. self.nesterov, stream_handle)
  205. else:
  206. if isinstance(grads[i], ndarray.IndexedSlices):
  207. raise NotImplementedError
  208. else:
  209. from ._base import DNNL_LIB
  210. if DNNL_LIB['cpu_MomentumOptimizerUpdate']:
  211. from .cpu_links import momentum_update as cpu_momentum_update
  212. if self.l2reg > 0:
  213. from .cpu_links import add_l2_regularization as cpu_add_l2_regularization
  214. cpu_add_l2_regularization(
  215. self.tensors[i], grads[i], self.l2reg)
  216. cpu_momentum_update(self.tensors[i], grads[i], self.velocity[i], self.learning_rate, self.momentum,
  217. self.nesterov)
  218. else:
  219. prev_param = self.tensors[i].asnumpy()
  220. grad = grads[i].asnumpy(
  221. ) + self.l2reg * prev_param if self.l2reg > 0 else grads[i].asnumpy()
  222. velo = self.velocity[i].asnumpy()
  223. if self.nesterov:
  224. lr_grads = -self.learning_rate * grad
  225. self.velocity[i][:] = self.momentum * \
  226. (velo + lr_grads)
  227. self.tensors[i][:] = prev_param + velo + lr_grads
  228. else:
  229. self.velocity[i][:] = self.momentum * \
  230. velo - self.learning_rate * grad
  231. self.tensors[i][:] = prev_param + velo
  232. class AdaGradOptimizer(Optimizer):
  233. def __init__(self, learning_rate=0.01, initial_accumulator_value=0.0, eps=1e-7, l2reg=0):
  234. assert initial_accumulator_value >= 0.0, \
  235. "initial accumulator value must be non-negative"
  236. assert eps > 0.0, \
  237. "epsilon must be positive"
  238. super(AdaGradOptimizer, self).__init__(learning_rate, l2reg)
  239. self.initial_accumulator_value = initial_accumulator_value
  240. self.eps = eps
  241. self.name = "AdaGrad"
  242. def get_config(self):
  243. return (ctypes.c_int(3), (ctypes.c_float * 3)(self.learning_rate, self.initial_accumulator_value, self.eps), ctypes.c_int(3))
  244. def initiate_states(self, config):
  245. super().initiate_states(config)
  246. self.accumulator_value = []
  247. for t in self.tensors:
  248. self.accumulator_value.append(None if t is None else ndarray.array(
  249. np.full(t.shape, self.initial_accumulator_value), t.ctx))
  250. def update(self, grads, stream_handle=None):
  251. assert self.initiated is True
  252. params_size = len(self.params)
  253. assert params_size == len(grads)
  254. for i in range(params_size):
  255. if grads[i] == None:
  256. continue
  257. if self.params[i].on_gpu:
  258. assert isinstance(self.tensors[i], ndarray.NDArray)
  259. assert isinstance(
  260. grads[i], (ndarray.NDArray, ndarray.IndexedSlices))
  261. if self.l2reg > 0:
  262. gpu_op.add_l2_regularization(
  263. self.tensors[i], grads[i], self.l2reg, stream_handle)
  264. gpu_op.adagrad_update(self.tensors[i], grads[i], self.accumulator_value[i], self.learning_rate, self.eps,
  265. stream_handle)
  266. else:
  267. if isinstance(grads[i], ndarray.IndexedSlices):
  268. raise NotImplementedError
  269. else:
  270. from ._base import DNNL_LIB
  271. if DNNL_LIB['cpu_AdaGradOptimizerUpdate']:
  272. from .cpu_links import adagrad_update as cpu_adagrad_update
  273. if self.l2reg > 0:
  274. from .cpu_links import add_l2_regularization as cpu_add_l2_regularization
  275. cpu_add_l2_regularization(
  276. self.tensors[i], grads[i], self.l2reg)
  277. cpu_adagrad_update(
  278. self.tensors[i], grads[i], self.accumulator_value[i], self.learning_rate, self.eps)
  279. else:
  280. prev_param = self.tensors[i].asnumpy()
  281. grad = grads[i].asnumpy(
  282. ) + self.l2reg * prev_param if self.l2reg > 0 else grads[i].asnumpy()
  283. self.accumulator_value[i][:] = self.accumulator_value[i].asnumpy(
  284. ) + np.power(grad, 2)
  285. self.tensors[i][:] = \
  286. prev_param - self.learning_rate * grad / \
  287. (np.sqrt(
  288. self.accumulator_value[i].asnumpy()) + self.eps)
  289. class AdamOptimizer(Optimizer):
  290. def __init__(self, learning_rate=0.01, beta1=0.9, beta2=0.999, epsilon=1e-7, l2reg=0):
  291. super(AdamOptimizer, self).__init__(learning_rate, l2reg)
  292. self.beta1 = beta1
  293. self.beta1_t = 1.0
  294. self.beta2 = beta2
  295. self.beta2_t = 1.0
  296. self.epsilon = epsilon
  297. self.name = "Adam"
  298. def get_config(self):
  299. return (ctypes.c_int(4), (ctypes.c_float * 4)(self.learning_rate, self.beta1, self.beta2, self.epsilon), ctypes.c_int(4))
  300. def initiate_states(self, config):
  301. super().initiate_states(config)
  302. self.m = []
  303. self.v = []
  304. for t in self.tensors:
  305. self.m.append(None if t is None else ndarray.array(
  306. np.zeros(t.shape), t.ctx))
  307. self.v.append(None if t is None else ndarray.array(
  308. np.zeros(t.shape), t.ctx))
  309. def update(self, grads, stream_handle=None):
  310. assert self.initiated is True
  311. params_size = len(self.tensors)
  312. assert params_size == len(grads)
  313. self.beta1_t *= self.beta1
  314. self.beta2_t *= self.beta2
  315. for i in range(params_size):
  316. if grads[i] == None:
  317. continue
  318. if self.params[i].on_gpu:
  319. assert isinstance(self.tensors[i], ndarray.NDArray)
  320. assert isinstance(
  321. grads[i], (ndarray.NDArray, ndarray.IndexedSlices))
  322. assert isinstance(self.m[i], ndarray.NDArray)
  323. assert isinstance(self.v[i], ndarray.NDArray)
  324. if self.l2reg > 0:
  325. gpu_op.add_l2_regularization(
  326. self.tensors[i], grads[i], self.l2reg, stream_handle)
  327. gpu_op.adam_update(self.tensors[i], grads[i], self.m[i], self.v[i], self.learning_rate, self.beta1,
  328. self.beta2, self.beta1_t, self.beta2_t, self.epsilon, stream_handle)
  329. else:
  330. if isinstance(grads[i], ndarray.IndexedSlices):
  331. raise NotImplementedError
  332. else:
  333. from ._base import DNNL_LIB
  334. if DNNL_LIB['cpu_AdamOptimizerUpdate']:
  335. from .cpu_links import adam_update as cpu_adam_update
  336. if self.l2reg > 0:
  337. from .cpu_links import add_l2_regularization as cpu_add_l2_regularization
  338. cpu_add_l2_regularization(
  339. self.tensors[i], grads[i], self.l2reg)
  340. cpu_adam_update(self.tensors[i], grads[i], self.m[i], self.v[i], self.learning_rate, self.beta1,
  341. self.beta2, self.beta1_t, self.beta2_t, self.epsilon)
  342. else:
  343. prev_param = self.tensors[i].asnumpy()
  344. grad = grads[i].asnumpy(
  345. ) + self.l2reg * prev_param if self.l2reg > 0 else grads[i].asnumpy()
  346. self.m[i][:] = self.beta1 * \
  347. self.m[i].asnumpy() + (1 - self.beta1) * grad
  348. self.v[i][:] = self.beta2 * self.v[i].asnumpy() + \
  349. (1 - self.beta2) * grad * grad
  350. mc = self.m[i].asnumpy() / (1 - self.beta1_t)
  351. vc = self.v[i].asnumpy() / (1 - self.beta2_t)
  352. self.tensors[i][:] = prev_param - \
  353. self.learning_rate * mc / \
  354. (np.sqrt(vc) + self.epsilon)