
comm_ops.py 21 kB

# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
  15. """comm_ops"""
  16. from ..._checkparam import Validator as validator
  17. from ..._checkparam import Rel
  18. from ...communication.management import get_rank, get_group_size, GlobalComm, _get_group
  19. from ...common import dtype as mstype
  20. from ..primitive import PrimitiveWithInfer, prim_attr_register
class ReduceOp:
    """
    Operation options for reducing tensors.

    There are four kinds of operation options: "SUM", "MAX", "MIN" and "PROD".

    - SUM: Take the sum.
    - MAX: Take the maximum.
    - MIN: Take the minimum.
    - PROD: Take the product.
    """
    SUM = "sum"
    MAX = "max"
    MIN = "min"
    PROD = "prod"


target_dtypes = (mstype.int8, mstype.int32, mstype.float16, mstype.float32)

class AllReduce(PrimitiveWithInfer):
    """
    Reduces the tensor data across all devices in such a way that all devices will get the same final result.

    Note:
        The operation of AllReduce does not support "prod" currently.
        Tensors must have the same shape and format in all processes participating in the collective.

    Args:
        op (str): Specifies an operation used for element-wise reductions,
            like sum, max, min. Default: ReduceOp.SUM.
        group (str): The communication group to work on. Default: "hccl_world_group".

    Raises:
        TypeError: If any of op and group is not a string,
            fusion is not an integer, or the input's dtype is bool.
        ValueError: If op is "prod".

    Inputs:
        - **input_x** (Tensor) - The shape of tensor is :math:`(x_1, x_2, ..., x_R)`.

    Outputs:
        Tensor, has the same shape as the input, i.e., :math:`(x_1, x_2, ..., x_R)`.
        The contents depend on the specified operation.

    Examples:
        >>> import numpy as np
        >>> from mindspore.communication import init
        >>> from mindspore import Tensor
        >>> from mindspore.ops.operations.comm_ops import ReduceOp
        >>> import mindspore.nn as nn
        >>> import mindspore.ops.operations as P
        >>>
        >>> init('nccl')
        >>> class Net(nn.Cell):
        >>>     def __init__(self):
        >>>         super(Net, self).__init__()
        >>>         self.allreduce_sum = P.AllReduce(ReduceOp.SUM, group="nccl_world_group")
        >>>
        >>>     def construct(self, x):
        >>>         return self.allreduce_sum(x)
        >>>
        >>> input_ = Tensor(np.ones([2, 8]).astype(np.float32))
        >>> net = Net()
        >>> output = net(input_)
    """

    @prim_attr_register
    def __init__(self, op=ReduceOp.SUM, group=GlobalComm.WORLD_COMM_GROUP):
        if not isinstance(op, type(ReduceOp.SUM)):
            raise TypeError("The operation of AllReduce should be str.")
        if op == ReduceOp.PROD:
            raise RuntimeError("The operation of AllReduce 'prod' is not supported yet.")
        if not isinstance(_get_group(group), str):
            raise TypeError("The group of AllReduce should be str.")
        self.op = op
        self.add_prim_attr('group', _get_group(group))
        self.add_prim_attr('fusion', 0)

    def vm_impl(self, x):
        """Implement by vm mode."""
        x = x.asnumpy()
        return Tensor(x)

    def infer_shape(self, x_shape):
        return x_shape

    def infer_dtype(self, x_dtype):
        validator.check_tensor_type_same({'x': x_dtype}, target_dtypes, self.name)
        return x_dtype
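
# Shape/dtype sketch (illustrative, assuming a 2-device group): each device passes
# a (2, 8) float32 tensor into AllReduce(ReduceOp.SUM); infer_shape and infer_dtype
# above leave both unchanged, so each device receives a (2, 8) float32 tensor
# holding the element-wise sum across the group.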


class AllGather(PrimitiveWithInfer):
    """
    Gathers tensors from the specified communication group.

    Note:
        Tensors must have the same shape and format in all processes participating in the collective.

    Args:
        group (str): The communication group to work on. Default: "hccl_world_group".

    Raises:
        TypeError: If group is not a string.
        ValueError: If the local rank id of the calling process in the group
            is larger than the group's rank size.

    Inputs:
        - **input_x** (Tensor) - The shape of tensor is :math:`(x_1, x_2, ..., x_R)`.

    Outputs:
        Tensor. If the number of devices in the group is N,
        then the shape of output is :math:`(N*x_1, x_2, ..., x_R)`.

    Examples:
        >>> import numpy as np
        >>> import mindspore.ops.operations as P
        >>> import mindspore.nn as nn
        >>> from mindspore.communication import init
        >>> from mindspore import Tensor
        >>>
        >>> init('nccl')
        >>> class Net(nn.Cell):
        >>>     def __init__(self):
        >>>         super(Net, self).__init__()
        >>>         self.allgather = P.AllGather(group="nccl_world_group")
        >>>
        >>>     def construct(self, x):
        >>>         return self.allgather(x)
        >>>
        >>> input_ = Tensor(np.ones([2, 8]).astype(np.float32))
        >>> net = Net()
        >>> output = net(input_)
    """

    @prim_attr_register
    def __init__(self, group=GlobalComm.WORLD_COMM_GROUP):
        validator.check_value_type('group', _get_group(group), (str,), self.name)
        self.rank = get_rank(_get_group(group))
        self.rank_size = get_group_size(_get_group(group))
        validator.check('rank', self.rank, 'rank_size', self.rank_size, Rel.LT, self.name)
        self.add_prim_attr('rank_size', self.rank_size)
        self.add_prim_attr('group', _get_group(group))

    def infer_shape(self, x_shape):
        validator.check_integer("x shape", len(x_shape), 0, Rel.GT, self.name)
        x_shape[0] = x_shape[0] * self.rank_size
        return x_shape

    def infer_dtype(self, x_dtype):
        validator.check_tensor_type_same({'x': x_dtype}, target_dtypes, self.name)
        return x_dtype

    def __call__(self, tensor):
        raise NotImplementedError
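
# Shape sketch (illustrative, assuming rank_size == 4): infer_shape above scales
# the first dimension by rank_size, so a per-device (2, 8) input yields an (8, 8)
# output, i.e. the per-device tensors concatenated along dimension 0.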


class HostAllGather(PrimitiveWithInfer):
    """
    Gathers tensors from the specified communication group on host.

    Note:
        Tensors must have the same shape and format in all processes participating in the collective.
        HostAllGather is a host-side operator. It depends on OpenMPI and the build option -M on
        must be used to enable it. Use the mpirun command to run it:
        mpirun -output-filename log -merge-stderr-to-stdout -np 3 python test_host_all_gather.py

    Args:
        group (Union[tuple[int], list[int]]): The rank_ids of the communication group to work on.

    Raises:
        TypeError: If group is not a list nor tuple, or elements of group are not int.
        ValueError: If group is not set, or a rank_id from group is not in [0, 7].

    Inputs:
        - **input_x** (Tensor) - The shape of tensor is :math:`(x_1, x_2, ..., x_R)`.

    Outputs:
        Tensor. If the number of devices in the group is N,
        then the shape of output is :math:`(N*x_1, x_2, ..., x_R)`.

    Examples:
        >>> import numpy as np
        >>> import mindspore.nn as nn
        >>> import mindspore.context as context
        >>> import mindspore.ops.operations as P
        >>> from mindspore import Tensor
        >>>
        >>> context.set_context(mode=context.GRAPH_MODE, device_target='CPU')
        >>> context.set_mpi_config(enable_mpi=True)
        >>>
        >>> class Net(nn.Cell):
        >>>     def __init__(self):
        >>>         super(Net, self).__init__()
        >>>         self.hostallgather = P.HostAllGather(group=(0, 1, 2, 3))
        >>>
        >>>     def construct(self, x):
        >>>         return self.hostallgather(x)
        >>>
        >>> input_ = Tensor(np.ones([2, 8]).astype(np.float32))
        >>> net = Net()
        >>> output = net(input_)
    """

    @prim_attr_register
    def __init__(self, group=None):
        if group is None:
            raise ValueError(f"For '{self.name}' group must be set.")
        validator.check_value_type('group', group, (tuple, list), self.name)
        validator.check_integer("group size", len(group), 2, Rel.GE, self.name)
        for r in group:
            validator.check_int_range("rank_id", r, 0, 7, Rel.INC_BOTH, self.name)
            validator.check_value_type("rank_id", r, (int,), self.name)
        self.group_size = len(group)
        self.add_prim_attr('group', group)

    def infer_shape(self, x_shape):
        validator.check_integer("x shape", len(x_shape), 0, Rel.GT, self.name)
        x_shape[0] = x_shape[0] * self.group_size
        return x_shape

    def infer_dtype(self, x_dtype):
        validator.check_tensor_type_same({'x': x_dtype}, target_dtypes, self.name)
        return x_dtype

    def __call__(self, tensor):
        raise NotImplementedError
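
# Shape sketch (illustrative): same first-dimension scaling as AllGather above,
# but group_size comes from len(group); e.g. group=(0, 1, 2, 3) turns a (2, 8)
# per-process input into an (8, 8) output.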


class ReduceScatter(PrimitiveWithInfer):
    """
    Reduces and scatters tensors from the specified communication group.

    Note:
        The back propagation of the op is not supported yet. Stay tuned for more.
        Tensors must have the same shape and format in all processes participating in the collective.

    Args:
        op (str): Specifies an operation used for element-wise reductions,
            like sum, max, avg. Default: ReduceOp.SUM.
        group (str): The communication group to work on. Default: "hccl_world_group".

    Raises:
        TypeError: If any of op and group is not a string.
        ValueError: If the first dimension of the input cannot be divided by the rank size.

    Examples:
        >>> import numpy as np
        >>> from mindspore import Tensor
        >>> from mindspore.communication import init
        >>> from mindspore.ops.operations.comm_ops import ReduceOp
        >>> import mindspore.nn as nn
        >>> import mindspore.ops.operations as P
        >>>
        >>> init('nccl')
        >>> class Net(nn.Cell):
        >>>     def __init__(self):
        >>>         super(Net, self).__init__()
        >>>         self.reducescatter = P.ReduceScatter(ReduceOp.SUM, group="nccl_world_group")
        >>>
        >>>     def construct(self, x):
        >>>         return self.reducescatter(x)
        >>>
        >>> input_ = Tensor(np.ones([8, 8]).astype(np.float32))
        >>> net = Net()
        >>> output = net(input_)
    """

    @prim_attr_register
    def __init__(self, op=ReduceOp.SUM, group=GlobalComm.WORLD_COMM_GROUP):
        validator.check_value_type('op', op, (type(ReduceOp.SUM),), self.name)
        validator.check_value_type('group', _get_group(group), (str,), self.name)
        self.op = op
        self.rank_size = get_group_size(_get_group(group))
        self.add_prim_attr('rank_size', self.rank_size)
        self.add_prim_attr('group', _get_group(group))

    def infer_shape(self, x_shape):
        if x_shape[0] % self.rank_size != 0:
            raise ValueError(f"For '{self.name}' the first dimension of x should be divisible by rank_size.")
        x_shape[0] = int(x_shape[0] / self.rank_size)
        return x_shape

    def infer_dtype(self, x_dtype):
        validator.check_tensor_type_same({'x': x_dtype}, target_dtypes, self.name)
        return x_dtype

    def __call__(self, tensor):
        raise NotImplementedError
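
# Shape sketch (illustrative, assuming rank_size == 4): infer_shape above divides
# the first dimension by rank_size, so an (8, 8) input yields a (2, 8) output on
# every device; a first dimension not divisible by rank_size raises ValueError.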


class HostReduceScatter(PrimitiveWithInfer):
    """
    Reduces and scatters tensors from the specified communication group on host.

    Note:
        Tensors must have the same shape and format in all processes participating in the collective.
        HostReduceScatter is a host-side operator. It depends on OpenMPI and the build option
        -M on must be used to enable it. Use the mpirun command to run it:
        mpirun -output-filename log -merge-stderr-to-stdout -np 3 python test_host_reduce_scatter.py

    Args:
        op (str): Specifies an operation used for element-wise reductions,
            like sum, max, avg. Default: ReduceOp.SUM.
        group (Union[tuple[int], list[int]]): The rank_ids of the communication group to work on.

    Raises:
        TypeError: If op is not a string, group is not a list nor tuple,
            or elements of group are not int.
        ValueError: If the first dimension of the input cannot be divided by the group size,
            or group is not set, or a rank_id is not in [0, 7].

    Examples:
        >>> import numpy as np
        >>> import mindspore.nn as nn
        >>> import mindspore.context as context
        >>> import mindspore.ops.operations as P
        >>> from mindspore import Tensor
        >>> from mindspore.ops.operations.comm_ops import ReduceOp
        >>>
        >>> context.set_context(mode=context.GRAPH_MODE, device_target='CPU')
        >>> context.set_mpi_config(enable_mpi=True)
        >>>
        >>> class Net(nn.Cell):
        >>>     def __init__(self):
        >>>         super(Net, self).__init__()
        >>>         self.hostreducescatter = P.HostReduceScatter(ReduceOp.SUM, group=[0, 1, 2, 3])
        >>>
        >>>     def construct(self, x):
        >>>         return self.hostreducescatter(x)
        >>>
        >>> input_ = Tensor(np.ones([8, 8]).astype(np.float32))
        >>> net = Net()
        >>> output = net(input_)
    """

    @prim_attr_register
    def __init__(self, op=ReduceOp.SUM, group=None):
        if group is None:
            raise ValueError(f"For '{self.name}' group must be set.")
        validator.check_value_type('op', op, (type(ReduceOp.SUM),), self.name)
        validator.check_value_type('group', group, (tuple, list), self.name)
        validator.check_integer("group size", len(group), 2, Rel.GE, self.name)
        for r in group:
            validator.check_int_range("rank_id", r, 0, 7, Rel.INC_BOTH, self.name)
            validator.check_value_type("rank_id", r, (int,), self.name)
        self.op = op
        self.group_size = len(group)
        self.add_prim_attr('group', group)

    def infer_shape(self, x_shape):
        if x_shape[0] % self.group_size != 0:
            raise ValueError(f"For '{self.name}' the first dimension of x should be divisible by group_size.")
        x_shape[0] = int(x_shape[0] / self.group_size)
        return x_shape

    def infer_dtype(self, x_dtype):
        validator.check_tensor_type_same({'x': x_dtype}, target_dtypes, self.name)
        return x_dtype

    def __call__(self, tensor):
        raise NotImplementedError
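
# Shape sketch (illustrative): same first-dimension rule as ReduceScatter above,
# but group_size comes from len(group); e.g. group=[0, 1, 2, 3] gives group_size 4,
# so an (8, 8) input becomes a (2, 8) output on every process.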


class Broadcast(PrimitiveWithInfer):
    """
    Broadcasts the tensor to the whole group.

    Note:
        Tensors must have the same shape and format in all processes participating in the collective.

    Args:
        root_rank (int): Source rank. Required in all processes except the one
            that is sending the data.
        group (str): The communication group to work on. Default: "hccl_world_group".

    Inputs:
        - **input_x** (Tensor) - The shape of tensor is :math:`(x_1, x_2, ..., x_R)`.

    Outputs:
        Tensor, has the same shape as the input, i.e., :math:`(x_1, x_2, ..., x_R)`.
        The contents depend on the data of the `root_rank` device.

    Raises:
        TypeError: If root_rank is not an integer or group is not a string.

    Examples:
        >>> import numpy as np
        >>> from mindspore import Tensor
        >>> from mindspore.communication import init
        >>> import mindspore.nn as nn
        >>> import mindspore.ops.operations as P
        >>>
        >>> init('nccl')
        >>> class Net(nn.Cell):
        >>>     def __init__(self):
        >>>         super(Net, self).__init__()
        >>>         self.broadcast = P.Broadcast(1)
        >>>
        >>>     def construct(self, x):
        >>>         return self.broadcast((x,))
        >>>
        >>> input_ = Tensor(np.ones([2, 8]).astype(np.float32))
        >>> net = Net()
        >>> output = net(input_)
    """

    @prim_attr_register
    def __init__(self, root_rank, group=GlobalComm.WORLD_COMM_GROUP):
        validator.check_value_type('root_rank', root_rank, (int,), self.name)
        validator.check_value_type('group', _get_group(group), (str,), self.name)
        self.add_prim_attr('group', _get_group(group))

    def infer_shape(self, x_shape):
        return x_shape

    def infer_dtype(self, x_dtype):
        if not isinstance(x_dtype, tuple):
            raise TypeError(f"{self.name}'s input should be a tuple!")
        for _ele in x_dtype:
            validator.check_tensor_type_same({'x': _ele}, target_dtypes, self.name)
        return x_dtype
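
# Usage sketch (illustrative): Broadcast takes a tuple of tensors, e.g. a single
# (2, 8) tensor passed as (x,); infer_shape and infer_dtype pass shapes and dtypes
# through unchanged, and at runtime every device ends up with the data held by
# the `root_rank` device.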


class _AlltoAll(PrimitiveWithInfer):
    """
    AlltoAll is a collective operation.

    AlltoAll sends data from all processes to all processes in the specified group. It has two phases:

    - The scatter phase: On each process, the operand is split into split_count blocks along
      split_dim, and the blocks are scattered to all processes, e.g., the ith block is sent to the ith process.
    - The gather phase: Each process concatenates the received blocks along concat_dim.

    Note:
        Tensors must have the same shape and format in all processes participating in the collective.

    Args:
        split_count (int): On each process, divide the blocks into split_count parts.
        split_dim (int): On each process, split the blocks along split_dim.
        concat_dim (int): On each process, gather the received blocks along concat_dim.
        group (str): The communication group to work on. Default: "hccl_world_group".

    Raises:
        TypeError: If group is not a string.
    """

    @prim_attr_register
    def __init__(self, split_count, split_dim, concat_dim, group=GlobalComm.WORLD_COMM_GROUP):
        """init AlltoAll"""
        validator.check_value_type('group', _get_group(group), (str,), self.name)
        self.split_count = split_count
        self.split_dim = split_dim
        self.concat_dim = concat_dim
        self.add_prim_attr('group', _get_group(group))

    def infer_shape(self, x_shape):
        x_shape[self.concat_dim] = x_shape[self.concat_dim] * self.split_count
        x_shape[self.split_dim] = int(x_shape[self.split_dim] / self.split_count)
        return x_shape

    def infer_dtype(self, x_dtype):
        validator.check_tensor_type_same({'x': x_dtype}, target_dtypes, self.name)
        return x_dtype

    def __call__(self, tensor):
        return
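
# Shape sketch (illustrative, assuming split_count=2, split_dim=1, concat_dim=0):
# infer_shape above multiplies the concat dimension and divides the split dimension
# by split_count, so a (4, 6) input maps to an (8, 3) output.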


class _MirrorOperator(PrimitiveWithInfer):
    """
    Auto parallel virtual operator. Do nothing in forward, do all reduce and mean in backward. It is only for
    internal use of parallel modules and cannot be called by users.

    Args:
        group (str): The communication group to work on. Default: None.
        dev_num (int): The device number of the group. Default: None.
        mean_flag (bool): Whether to use mean in backward. Default: None.
    """

    @prim_attr_register
    def __init__(self, group=None, dev_num=None, mean_flag=None):
        self.group = group
        self.dev_num = dev_num
        self.mean_flag = mean_flag

    def infer_shape(self, x_shape):
        return x_shape

    def infer_dtype(self, x_dtype):
        return x_dtype


mirror = _MirrorOperator()


class _VirtualDiv(PrimitiveWithInfer):
    """
    Auto parallel virtual operator. Do nothing in forward, do Div in backward.

    Args:
        divisor (float32): The divisor used by Div in backward. Default: None.
    """

    @prim_attr_register
    def __init__(self, divisor=None):
        self.divisor = divisor

    def infer_shape(self, x_shape):
        return x_shape

    def infer_dtype(self, x_dtype):
        return x_dtype


virtual_div = _VirtualDiv()


class _VirtualDataset(PrimitiveWithInfer):
    """
    Auto parallel virtual dataset operator.

    It would insert a Broadcast operator in forward computation and be deleted before backward computation.
    """

    @prim_attr_register
    def __init__(self):
        """init"""

    def infer_shape(self, *args):
        if len(args) == 1:
            return args[0]
        return args

    def infer_dtype(self, *args):
        if len(args) == 1:
            return args[0]
        return args


virtual_dataset = _VirtualDataset()


class _GetTensorSlice(PrimitiveWithInfer):
    """
    Gets tensor slice by device matrix and tensor map.

    Args:
        dev_mat (tuple): The device matrix of the slice tensor.
        tensor_map (tuple): The tensor map of the slice tensor.
    """

    @prim_attr_register
    def __init__(self):
        """init ChunkTensor"""

    def infer_value(self, x, dev_mat, tensor_map):
        from mindspore.parallel._tensor import _load_tensor
        validator.check_value_type("dev_mat", dev_mat, [tuple], self.name)
        validator.check_value_type("tensor_map", tensor_map, [tuple], self.name)
        return _load_tensor(x, dev_mat, tensor_map)