You cannot select more than 25 topics. Topics must start with a Chinese character, a letter or a number, can include dashes ('-') and can be up to 35 characters long.

ndarray.py 18 kB

4 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547
  1. from __future__ import absolute_import
  2. from ._base import _LIB, check_call, c_array
  3. import ctypes
  4. import numpy as np
  5. import scipy.sparse
  6. import socket
  7. class DLContext(ctypes.Structure):
  8. """DL context strucure."""
  9. _fields_ = [("device_id", ctypes.c_int),
  10. ("device_type", ctypes.c_int)]
  11. MASK2STR = {
  12. 1: 'cpu',
  13. 2: 'gpu',
  14. }
  15. def __init__(self, device_id, device_type, hostname='localhost'):
  16. super(DLContext, self).__init__()
  17. self.device_id = device_id
  18. self.device_type = device_type
  19. if hostname in ('localhost', socket.gethostname()):
  20. self.hostname = 'localhost'
  21. self.local = True
  22. else:
  23. self.hostname = hostname
  24. self.local = False
  25. def __repr__(self):
  26. if not hasattr(self, 'local') or self.local:
  27. return "%s(%d)" % (
  28. DLContext.MASK2STR[self.device_type], self.device_id)
  29. else:
  30. return "%s:%s(%d)" % (
  31. self.hostname, DLContext.MASK2STR[self.device_type], self.device_id)
  32. def __hash__(self):
  33. if not hasattr(self, 'local') or self.local:
  34. return hash((self.device_type, self.device_id))
  35. else:
  36. return hash((self.hostname, self.device_type, self.device_id))
  37. def __eq__(self, other):
  38. return hash(self) == hash(other)
  39. def __ne__(self, other):
  40. return hash(self) != hash(other)
class DLArray(ctypes.Structure):
    """DLArray in C API: mirrors the C-side array descriptor — a raw data
    pointer, owning context, rank, and int64 shape/stride arrays."""
    _fields_ = [("data", ctypes.c_void_p),
                ("ctx", DLContext),
                ("ndim", ctypes.c_int),
                ("shape", ctypes.POINTER(ctypes.c_int64)),
                ("stride", ctypes.POINTER(ctypes.c_int64))]


# Pointer-to-DLArray type passed across the C API boundary.
DLArrayHandle = ctypes.POINTER(DLArray)
  49. def cpu(dev_id=0):
  50. """Construct a CPU device
  51. Parameters
  52. ----------
  53. dev_id : int, optional
  54. The integer device id
  55. """
  56. return DLContext(dev_id, 1)
  57. def gpu(dev_id=0):
  58. """Construct a GPU device
  59. Parameters
  60. ----------
  61. dev_id : int, optional
  62. The integer device id
  63. """
  64. return DLContext(dev_id, 2)
  65. def rcpu(hostname, dev_id=0):
  66. """Construct a remote CPU device
  67. Parameters
  68. ----------
  69. hostname: str
  70. The hostname of device
  71. dev_id : int, optional
  72. The integer device id
  73. """
  74. return DLContext(dev_id, 1, hostname=hostname)
  75. def rgpu(hostname, dev_id=0):
  76. """Construct a remote GPU device
  77. Parameters
  78. ----------
  79. hostname: str
  80. The hostname of device
  81. dev_id : int, optional
  82. The integer device id
  83. """
  84. return DLContext(dev_id, 2, hostname=hostname)
  85. def is_gpu_ctx(ctx):
  86. """Return if context is GPU context.
  87. Parameters
  88. ----------
  89. ctx : DLContext
  90. The query context
  91. """
  92. return ctx and ctx.device_type == 2
  93. def shape_to_stride(shape):
  94. """Return the stride.
  95. Parameters
  96. ----------
  97. shape : tuple(int)
  98. The shape tuple
  99. """
  100. ndim = len(shape)
  101. stride = [1] * ndim
  102. for i in range(ndim-1, 0, -1):
  103. stride[i-1] = stride[i] * shape[i]
  104. return tuple(stride)
  105. class NDArray(object):
  106. """Lightweight NDArray class of DL runtime.
  107. Strictly this is only an Array Container(a buffer object)
  108. No arthimetic operations are defined.
  109. """
  110. __slots__ = ["handle", "no_free"]
  111. def __init__(self, handle):
  112. """Initialize the function with handle
  113. Parameters
  114. ----------
  115. handle : DLArrayHandle
  116. the handle to the underlying C++ DLArray
  117. """
  118. self.handle = handle
  119. self.no_free = False
  120. def __del__(self):
  121. if self.no_free:
  122. return
  123. check_call(_LIB.DLArrayFree(self.handle))
  124. @property
  125. def shape(self):
  126. """Shape of this array"""
  127. return tuple(self.handle.contents.shape[i]
  128. for i in range(self.handle.contents.ndim))
  129. @property
  130. def stride(self):
  131. """Stride of this array"""
  132. return tuple(self.handle.contents.stride[i]
  133. for i in range(self.handle.contents.ndim))
  134. @property
  135. def lazy(self):
  136. """Whether this array is lazy"""
  137. return not self.stride == shape_to_stride(self.shape)
  138. @property
  139. def ctx(self):
  140. """context of this array"""
  141. return self.handle.contents.ctx
  142. def __setitem__(self, in_slice, value):
  143. """Set ndarray value"""
  144. if (not isinstance(in_slice, slice) or
  145. in_slice.start is not None
  146. or in_slice.stop is not None):
  147. raise ValueError('Array only support set from numpy array')
  148. if isinstance(value, NDArray):
  149. if value.handle is not self.handle:
  150. value.copyto(self)
  151. elif isinstance(value, (np.ndarray, np.generic)):
  152. self._sync_copyfrom(value)
  153. else:
  154. raise TypeError('type %s not supported' % str(type(value)))
  155. def _sync_copyfrom(self, source_array, data_type=np.float32):
  156. """Peform an synchronize copy from the array.
  157. Parameters
  158. ----------
  159. source_array : array_like
  160. The data source we should like to copy from.
  161. """
  162. if not isinstance(source_array, np.ndarray):
  163. try:
  164. source_array = np.array(source_array, dtype=data_type)
  165. except:
  166. raise TypeError('array must be an array_like data,' +
  167. 'type %s is not supported'
  168. % str(type(source_array)))
  169. source_array = np.ascontiguousarray(source_array, dtype=data_type)
  170. if source_array.shape != self.shape:
  171. raise ValueError('array shape do not match the shape of NDArray')
  172. source_arr, shape, stride = NDArray._numpyasarray(source_array)
  173. check_call(_LIB.DLArrayCopyFromTo(
  174. ctypes.byref(source_arr), self.handle, None))
  175. # de-allocate shape until now
  176. _ = shape
  177. _ = stride
  178. def _async_copyfrom(self, source_array, stream_handle, event_handle=None):
  179. """Peform an asynchronize copy from the array.
  180. Parameters
  181. ----------
  182. source_array : array_like
  183. The data source we should like to copy from.
  184. """
  185. check_call(_LIB.DLArrayCopyFromTo(
  186. source_array.handle, self.handle, stream_handle.handle))
  187. if not event_handle is None:
  188. check_call(_LIB.DLEventRecord(
  189. stream_handle.handle, event_handle.handle))
  190. def async_h2d(self, source_array, stream_handle, event_handle=None):
  191. if isinstance(source_array, np.ndarray):
  192. source_array = array(source_array, cpu(0))
  193. assert self.handle.contents.ctx.device_type == 2
  194. assert source_array.handle.contents.ctx.device_type == 1
  195. assert stream_handle
  196. self._async_copyfrom(source_array, stream_handle, event_handle)
  197. def async_d2h(self, source_array, stream_handle, event_handle=None):
  198. assert self.handle.contents.ctx.device_type == 1
  199. assert source_array.handle.contents.ctx.device_type == 2
  200. assert stream_handle
  201. self._async_copyfrom(source_array, stream_handle, event_handle)
  202. @staticmethod
  203. def _numpyasarray(np_data):
  204. """Return a DLArray representation of a numpy array."""
  205. data = np_data
  206. assert data.flags['C_CONTIGUOUS']
  207. arr = DLArray()
  208. shape = c_array(ctypes.c_int64, data.shape)
  209. stride = c_array(ctypes.c_int64, shape_to_stride(data.shape))
  210. arr.data = data.ctypes.data_as(ctypes.c_void_p)
  211. arr.shape = shape
  212. arr.stride = stride
  213. arr.ndim = data.ndim
  214. # CPU device
  215. arr.ctx = cpu(0)
  216. return arr, shape, stride
  217. def asnumpy(self):
  218. """Convert this array to numpy array
  219. Returns
  220. -------
  221. np_arr : numpy.ndarray
  222. The corresponding numpy array.
  223. """
  224. self.wrapped_lazy_callback()
  225. np_arr = np.empty(self.shape, dtype=np.float32)
  226. arr, shape, stride = NDArray._numpyasarray(np_arr)
  227. check_call(_LIB.DLArrayCopyFromTo(
  228. self.handle, ctypes.byref(arr), None))
  229. _ = shape
  230. _ = stride
  231. return np_arr
  232. def copyto(self, target):
  233. """Copy array to target
  234. Parameters
  235. ----------
  236. target : NDArray
  237. The target array to be copied, must have same shape as this array.
  238. """
  239. self.wrapped_lazy_callback()
  240. if isinstance(target, DLContext):
  241. target = empty(self.shape, target)
  242. if isinstance(target, NDArray):
  243. check_call(_LIB.DLArrayCopyFromTo(
  244. self.handle, target.handle, None))
  245. else:
  246. raise ValueError("Unsupported target type %s" % str(type(target)))
  247. return target
  248. def reshape(self, shape, target):
  249. """Reshape the array to target array.
  250. Parameters
  251. ----------
  252. shape : tuple (int)
  253. The target shape.
  254. target : NDArray
  255. The target array.
  256. """
  257. self.wrapped_lazy_callback()
  258. arr = DLArray()
  259. arr.data = self.handle.contents.data
  260. arr.ctx = self.handle.contents.ctx
  261. arr.ndim = len(shape)
  262. arr.shape = c_array(ctypes.c_int64, shape)
  263. arr.stride = c_array(ctypes.c_int64, shape_to_stride(shape))
  264. target.handle = ctypes.pointer(arr)
  265. target.no_free = True
  266. def broadcast_to(self, shape, target, add_axes=None):
  267. """Broadcast the array to target array (lazy).
  268. Parameters
  269. ----------
  270. shape : tuple (int)
  271. The target shape.
  272. target : NDArray
  273. The target array.
  274. add_axes(Optional): list (int)
  275. Add axes if needed, using index of shape parameter.
  276. This is for gradient node of reduce_sum_op when there exists keepdims == False.
  277. """
  278. if add_axes is None:
  279. add_axes = []
  280. arr_ndim = len(shape)
  281. self_ndim = len(self.shape) + len(add_axes)
  282. ori_self_shape = list(self.shape)
  283. ori_self_stride = list(self.stride)
  284. if self_ndim > arr_ndim:
  285. assert self_ndim == arr_ndim + 1 and tuple(self.shape) == (1,)
  286. ori_self_shape = []
  287. ori_self_stride = []
  288. self_ndim = len(ori_self_shape)
  289. self_shape = [1] * arr_ndim
  290. self_stride = [0] * arr_ndim
  291. idx = self_ndim - 1
  292. target_stride = [0] * arr_ndim
  293. rule = True
  294. for i in range(arr_ndim):
  295. pos = arr_ndim - 1 - i
  296. if pos not in add_axes and idx >= 0:
  297. self_shape[pos] = ori_self_shape[idx]
  298. self_stride[pos] = ori_self_stride[idx]
  299. idx -= 1
  300. if self_shape[pos] == shape[pos]:
  301. target_stride[pos] = self_stride[pos]
  302. elif self_shape[pos] != 1:
  303. rule = False
  304. break
  305. assert rule
  306. arr = DLArray()
  307. arr.data = self.handle.contents.data
  308. arr.ctx = self.handle.contents.ctx
  309. arr.ndim = arr_ndim
  310. arr.shape = c_array(ctypes.c_int64, tuple(shape))
  311. arr.stride = c_array(ctypes.c_int64, tuple(target_stride))
  312. target.handle = ctypes.pointer(arr)
  313. target.no_free = True
  314. def lazy_callback(self, stream=None):
  315. assert self.handle.contents.ctx.device_type == 2
  316. assert self.lazy
  317. shape = c_array(ctypes.c_int64, self.shape)
  318. stride = c_array(ctypes.c_int64, shape_to_stride(self.shape))
  319. ndim = ctypes.c_int(len(self.shape))
  320. handle = DLArrayHandle()
  321. check_call(_LIB.DLArrayAlloc(shape, stride, ndim,
  322. self.handle.contents.ctx, ctypes.byref(handle)))
  323. check_call(_LIB.DLGpuArrayLazyCallback(
  324. self.handle, handle, stream.handle if stream else None))
  325. self.handle = handle
  326. def wrapped_lazy_callback(self, stream=None):
  327. # TODO: reshape / copyto / asnumpy may have more efficient implementation
  328. # This is just a workaround.
  329. if self.lazy:
  330. # here we move the judgement for lazy into forward hooks, shouldn't have callbacks.
  331. assert False
  332. self.lazy_callback(stream)
  333. def array(arr, ctx, data_type=np.float32):
  334. """Create an array from source arr.
  335. Parameters
  336. ----------
  337. arr : numpy.ndarray
  338. The array to be copied from
  339. ctx : DLContext, optional
  340. The device context to create the array
  341. Returns
  342. -------
  343. ret : NDArray
  344. The created array
  345. """
  346. if not isinstance(arr, np.ndarray):
  347. arr = np.array(arr, dtype=data_type)
  348. ret = empty(arr.shape, ctx)
  349. ret._sync_copyfrom(arr, data_type=data_type)
  350. return ret
  351. def empty(shape, ctx=cpu(0)):
  352. """Create an empty array given shape and device
  353. Parameters
  354. ----------
  355. shape : tuple of int
  356. The shape of the array
  357. ctx : DLContext
  358. The context of the array
  359. Returns
  360. -------
  361. arr : ndarray
  362. The array hetusys supported.
  363. """
  364. shape = c_array(ctypes.c_int64, shape)
  365. stride = c_array(ctypes.c_int64, shape_to_stride(shape))
  366. ndim = ctypes.c_int(len(shape))
  367. handle = DLArrayHandle()
  368. check_call(_LIB.DLArrayAlloc(
  369. shape, stride, ndim, ctx, ctypes.byref(handle)))
  370. return NDArray(handle)
  371. def numpyasdlarrayhandle(data):
  372. if not data.flags['C_CONTIGUOUS']:
  373. data = np.ascontiguousarray(data)
  374. arr = DLArray()
  375. shape = c_array(ctypes.c_int64, data.shape)
  376. arr.data = data.ctypes.data_as(ctypes.c_void_p)
  377. arr.shape = shape
  378. arr.stride = c_array(ctypes.c_int64, shape_to_stride(data.shape))
  379. arr.ndim = data.ndim
  380. arr.ctx = cpu(0)
  381. return arr
  382. class ND_Sparse_Array(object):
  383. __slots__ = ["data", "row", "col", "nrow", "ncol", "lazy"]
  384. def __init__(self, data, row, col, nrow, ncol):
  385. self.data = data
  386. self.row = row
  387. self.col = col
  388. self.nrow = nrow
  389. self.ncol = ncol
  390. self.lazy = False
  391. @property
  392. def shape(self):
  393. """Shape of this array"""
  394. return tuple((self.nrow, self.ncol))
  395. def sparse_array(values, indices, shape, ctx=cpu(0)):
  396. """Create an sparse array from source arrs.
  397. ----------
  398. values : numpy.ndarray
  399. The value array to be copied from
  400. indices : tuple(numpy.ndarray, numpy.ndarray)
  401. The index array to be copied from
  402. ctx : DLContext, optional
  403. The device context to create the array
  404. Returns
  405. -------
  406. ret : NDArray
  407. The created array
  408. """
  409. assert len(shape) == len(indices) == 2
  410. assert len(values) == len(indices[0]) == len(indices[1])
  411. assert isinstance(indices, tuple)
  412. mat = scipy.sparse.csr_matrix((values, indices), shape)
  413. values = mat.data
  414. rows = mat.indptr
  415. cols = mat.indices
  416. values_ret = empty(values.shape, ctx)
  417. values_ret._sync_copyfrom(values)
  418. row_ret = empty(rows.shape, ctx)
  419. row_ret._sync_copyfrom(rows, np.int32)
  420. col_ret = empty(cols.shape, ctx)
  421. col_ret._sync_copyfrom(cols, np.int32)
  422. return ND_Sparse_Array(values_ret, row_ret, col_ret, shape[0], shape[1])
  423. class IndexedSlices(object):
  424. __slots__ = ["indices", "values", "dense_shape", "deduplicated", "lazy"]
  425. def __init__(self, indices=None, values=None, dense_shape=None):
  426. self.indices = indices
  427. self.values = values
  428. self.dense_shape = dense_shape
  429. self.deduplicated = False
  430. self.lazy = False
  431. def get_dense_shape(self):
  432. assert self.dense_shape is not None
  433. return self.dense_shape
  434. def get_sparse_shape(self):
  435. assert isinstance(self.values, NDArray)
  436. return self.values.shape
  437. def update(self, indices, values, dense_shape):
  438. self.indices = indices
  439. self.values = values
  440. if self.dense_shape is not None:
  441. assert tuple(self.dense_shape) == tuple(dense_shape)
  442. else:
  443. self.dense_shape = dense_shape
  444. def deduplicate(self, stream):
  445. assert is_gpu_ctx(self.indices.ctx)
  446. np_indices = self.indices.asnumpy()
  447. unique_indices, inverse = np.unique(np_indices, return_inverse=True)
  448. indices_on_ctx = array(unique_indices, ctx=self.indices.ctx)
  449. self.indices = indices_on_ctx
  450. inverse_on_ctx = array(inverse, ctx=self.indices.ctx)
  451. new_value_shape = list(unique_indices.shape)
  452. new_value_shape.append(self.values.shape[-1])
  453. new_values = empty(new_value_shape, ctx=self.values.ctx)
  454. _LIB.DLGpuArraySet(new_values.handle, ctypes.c_float(
  455. 0), stream.handle if stream else None)
  456. _LIB.DeduplicateIndexedSlices(
  457. self.values.handle, inverse_on_ctx.handle, new_values.handle, stream.handle if stream else None)
  458. self.values = new_values
  459. self.deduplicated = True
  460. def cpu_deduplicate(self):
  461. assert not is_gpu_ctx(self.indices.ctx)
  462. np_indices = self.indices.asnumpy()
  463. unique_indices, inverse = np.unique(np_indices, return_inverse=True)
  464. new_value_shape = list(unique_indices.shape)
  465. last_dim = self.values.shape[-1]
  466. new_value_shape.append(last_dim)
  467. new_values = np.zeros(new_value_shape).astype(np.float32)
  468. flatten_ind = np_indices.reshape(-1)
  469. flatten = self.values.asnumpy().reshape((-1, last_dim))
  470. for i, ind in enumerate(inverse):
  471. new_values[ind] += flatten[i]
  472. self.values = array(new_values, cpu(0))
  473. self.indices = array(unique_indices, cpu(0))
  474. self.deduplicated = True
  475. def free_deduplicate(self):
  476. if self.deduplicated:
  477. del self.indices
  478. del self.values
  479. self.indices = None
  480. self.values = None
  481. self.deduplicated = False