- from __future__ import absolute_import
-
- from ._base import _LIB, check_call, c_array
- import ctypes
- import numpy as np
- import scipy.sparse
- import socket
-
-
- class DLContext(ctypes.Structure):
- """DL context strucure."""
- _fields_ = [("device_id", ctypes.c_int),
- ("device_type", ctypes.c_int)]
-
- MASK2STR = {
- 1: 'cpu',
- 2: 'gpu',
- }
-
- def __init__(self, device_id, device_type, hostname='localhost'):
- super(DLContext, self).__init__()
- self.device_id = device_id
- self.device_type = device_type
- if hostname in ('localhost', socket.gethostname()):
- self.hostname = 'localhost'
- self.local = True
- else:
- self.hostname = hostname
- self.local = False
-
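- # NOTE: contexts materialized from the C side (e.g. handle.contents.ctx)
- # never run __init__, so the Python-only 'hostname'/'local' attributes
- # may be missing; such contexts are treated as local below.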
- def __repr__(self):
- if not hasattr(self, 'local') or self.local:
- return "%s(%d)" % (
- DLContext.MASK2STR[self.device_type], self.device_id)
- else:
- return "%s:%s(%d)" % (
- self.hostname, DLContext.MASK2STR[self.device_type], self.device_id)
-
- def __hash__(self):
- if not hasattr(self, 'local') or self.local:
- return hash((self.device_type, self.device_id))
- else:
- return hash((self.hostname, self.device_type, self.device_id))
-
- def __eq__(self, other):
- # compare the identifying fields directly rather than hashes,
- # which could collide for distinct contexts
- if not isinstance(other, DLContext):
- return False
- return (getattr(self, 'hostname', 'localhost'), self.device_type,
-         self.device_id) == (getattr(other, 'hostname', 'localhost'),
-         other.device_type, other.device_id)
-
- def __ne__(self, other):
- return not self.__eq__(other)
-
-
- class DLArray(ctypes.Structure):
- """DLArray in C API"""
- _fields_ = [("data", ctypes.c_void_p),
- ("ctx", DLContext),
- ("ndim", ctypes.c_int),
- ("shape", ctypes.POINTER(ctypes.c_int64)),
- ("stride", ctypes.POINTER(ctypes.c_int64))]
-
-
- DLArrayHandle = ctypes.POINTER(DLArray)
-
-
- def cpu(dev_id=0):
- """Construct a CPU device
- Parameters
- ----------
- dev_id : int, optional
- The integer device id
- """
- return DLContext(dev_id, 1)
-
-
- def gpu(dev_id=0):
- """Construct a GPU device
- Parameters
- ----------
- dev_id : int, optional
- The integer device id
- """
- return DLContext(dev_id, 2)
-
-
- def rcpu(hostname, dev_id=0):
- """Construct a remote CPU device
- Parameters
- ----------
- hostname: str
- The hostname of device
- dev_id : int, optional
- The integer device id
- """
- return DLContext(dev_id, 1, hostname=hostname)
-
-
- def rgpu(hostname, dev_id=0):
- """Construct a remote GPU device
- Parameters
- ----------
- hostname: str
- The hostname of device
- dev_id : int, optional
- The integer device id
- """
- return DLContext(dev_id, 2, hostname=hostname)
-
-
- def is_gpu_ctx(ctx):
- """Return whether the context is a GPU context.
- Parameters
- ----------
- ctx : DLContext
- The query context
- """
- return ctx is not None and ctx.device_type == 2
-
-
- def shape_to_stride(shape):
- """Return the stride.
- Parameters
- ----------
- shape : tuple(int)
- The shape tuple
- """
- ndim = len(shape)
- stride = [1] * ndim
- for i in range(ndim-1, 0, -1):
- stride[i-1] = stride[i] * shape[i]
- return tuple(stride)
-
-
- class NDArray(object):
- """Lightweight NDArray class of DL runtime.
- Strictly this is only an Array Container(a buffer object)
- No arthimetic operations are defined.
- """
- __slots__ = ["handle", "no_free"]
-
- def __init__(self, handle):
- """Initialize the function with handle
- Parameters
- ----------
- handle : DLArrayHandle
- the handle to the underlying C++ DLArray
- """
- self.handle = handle
- self.no_free = False
-
- def __del__(self):
- if self.no_free:
- return
- check_call(_LIB.DLArrayFree(self.handle))
-
- @property
- def shape(self):
- """Shape of this array"""
- return tuple(self.handle.contents.shape[i]
- for i in range(self.handle.contents.ndim))
-
- @property
- def stride(self):
- """Stride of this array"""
- return tuple(self.handle.contents.stride[i]
- for i in range(self.handle.contents.ndim))
-
- @property
- def lazy(self):
- """Whether this array is lazy"""
- return not self.stride == shape_to_stride(self.shape)
-
- @property
- def ctx(self):
- """context of this array"""
- return self.handle.contents.ctx
-
- def __setitem__(self, in_slice, value):
- """Set ndarray value"""
- if (not isinstance(in_slice, slice) or
- in_slice.start is not None
- or in_slice.stop is not None):
- raise ValueError('NDArray only supports whole-array assignment (arr[:] = value)')
- if isinstance(value, NDArray):
- if value.handle is not self.handle:
- value.copyto(self)
- elif isinstance(value, (np.ndarray, np.generic)):
- self._sync_copyfrom(value)
- else:
- raise TypeError('type %s not supported' % str(type(value)))
-
- def _sync_copyfrom(self, source_array, data_type=np.float32):
- """Perform a synchronous copy from the array.
- Parameters
- ----------
- source_array : array_like
- The data source we would like to copy from.
- """
- if not isinstance(source_array, np.ndarray):
- try:
- source_array = np.array(source_array, dtype=data_type)
- except (TypeError, ValueError):
- raise TypeError('array must be an array_like data, '
-                 'type %s is not supported'
-                 % str(type(source_array)))
- source_array = np.ascontiguousarray(source_array, dtype=data_type)
- if source_array.shape != self.shape:
- raise ValueError('array shape does not match the shape of NDArray')
- source_arr, shape, stride = NDArray._numpyasarray(source_array)
- check_call(_LIB.DLArrayCopyFromTo(
- ctypes.byref(source_arr), self.handle, None))
- # keep `shape` and `stride` referenced until the copy has completed
- _ = shape
- _ = stride
-
- def _async_copyfrom(self, source_array, stream_handle, event_handle=None):
- """Perform an asynchronous copy from the array.
- Parameters
- ----------
- source_array : NDArray
- The data source we would like to copy from.
- """
- check_call(_LIB.DLArrayCopyFromTo(
- source_array.handle, self.handle, stream_handle.handle))
- if event_handle is not None:
- check_call(_LIB.DLEventRecord(
- stream_handle.handle, event_handle.handle))
-
- def async_h2d(self, source_array, stream_handle, event_handle=None):
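- """Asynchronously copy `source_array` from host (CPU) to this GPU array."""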
- if isinstance(source_array, np.ndarray):
- source_array = array(source_array, cpu(0))
- assert self.handle.contents.ctx.device_type == 2
- assert source_array.handle.contents.ctx.device_type == 1
- assert stream_handle
- self._async_copyfrom(source_array, stream_handle, event_handle)
-
- def async_d2h(self, source_array, stream_handle, event_handle=None):
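- """Asynchronously copy `source_array` from GPU to this host (CPU) array."""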
- assert self.handle.contents.ctx.device_type == 1
- assert source_array.handle.contents.ctx.device_type == 2
- assert stream_handle
- self._async_copyfrom(source_array, stream_handle, event_handle)
-
- @staticmethod
- def _numpyasarray(np_data):
- """Return a DLArray representation of a numpy array."""
- data = np_data
- assert data.flags['C_CONTIGUOUS']
- arr = DLArray()
- shape = c_array(ctypes.c_int64, data.shape)
- stride = c_array(ctypes.c_int64, shape_to_stride(data.shape))
- arr.data = data.ctypes.data_as(ctypes.c_void_p)
- arr.shape = shape
- arr.stride = stride
- arr.ndim = data.ndim
- # CPU device
- arr.ctx = cpu(0)
- return arr, shape, stride
-
- def asnumpy(self):
- """Convert this array to numpy array
- Returns
- -------
- np_arr : numpy.ndarray
- The corresponding numpy array.
- """
- self.wrapped_lazy_callback()
- np_arr = np.empty(self.shape, dtype=np.float32)
- arr, shape, stride = NDArray._numpyasarray(np_arr)
- check_call(_LIB.DLArrayCopyFromTo(
- self.handle, ctypes.byref(arr), None))
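- # keep `shape` and `stride` referenced until the copy has completed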
- _ = shape
- _ = stride
- return np_arr
-
- def copyto(self, target):
- """Copy array to target
- Parameters
- ----------
- target : NDArray or DLContext
- The target array to copy into (must have the same shape as this
- array), or a context on which a new array is allocated.
- """
- self.wrapped_lazy_callback()
- if isinstance(target, DLContext):
- target = empty(self.shape, target)
- if isinstance(target, NDArray):
- check_call(_LIB.DLArrayCopyFromTo(
- self.handle, target.handle, None))
- else:
- raise ValueError("Unsupported target type %s" % str(type(target)))
- return target
-
- def reshape(self, shape, target):
- """Reshape the array to target array.
- Parameters
- ----------
- shape : tuple (int)
- The target shape.
- target : NDArray
- The target array.
- """
- self.wrapped_lazy_callback()
- arr = DLArray()
- arr.data = self.handle.contents.data
- arr.ctx = self.handle.contents.ctx
- arr.ndim = len(shape)
- arr.shape = c_array(ctypes.c_int64, shape)
- arr.stride = c_array(ctypes.c_int64, shape_to_stride(shape))
- target.handle = ctypes.pointer(arr)
- target.no_free = True
-
- def broadcast_to(self, shape, target, add_axes=None):
- """Broadcast the array to target array (lazy).
- Parameters
- ----------
- shape : tuple (int)
- The target shape.
- target : NDArray
- The target array.
- add_axes : list (int), optional
- Indices into the `shape` parameter for axes to be newly added.
- Used by the gradient of reduce_sum_op when keepdims == False.
- """
- if add_axes is None:
- add_axes = []
- arr_ndim = len(shape)
- self_ndim = len(self.shape) + len(add_axes)
- ori_self_shape = list(self.shape)
- ori_self_stride = list(self.stride)
- if self_ndim > arr_ndim:
- assert self_ndim == arr_ndim + 1 and tuple(self.shape) == (1,)
- ori_self_shape = []
- ori_self_stride = []
- self_ndim = len(ori_self_shape)
- self_shape = [1] * arr_ndim
- self_stride = [0] * arr_ndim
- idx = self_ndim - 1
- target_stride = [0] * arr_ndim
- rule = True
- for i in range(arr_ndim):
- pos = arr_ndim - 1 - i
- if pos not in add_axes and idx >= 0:
- self_shape[pos] = ori_self_shape[idx]
- self_stride[pos] = ori_self_stride[idx]
- idx -= 1
- if self_shape[pos] == shape[pos]:
- target_stride[pos] = self_stride[pos]
- elif self_shape[pos] != 1:
- rule = False
- break
- assert rule
- arr = DLArray()
- arr.data = self.handle.contents.data
- arr.ctx = self.handle.contents.ctx
- arr.ndim = arr_ndim
- arr.shape = c_array(ctypes.c_int64, tuple(shape))
- arr.stride = c_array(ctypes.c_int64, tuple(target_stride))
- target.handle = ctypes.pointer(arr)
- target.no_free = True
-
- def lazy_callback(self, stream=None):
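- """Materialize this lazy (strided) GPU array into a contiguous buffer."""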
- assert self.handle.contents.ctx.device_type == 2
- assert self.lazy
- shape = c_array(ctypes.c_int64, self.shape)
- stride = c_array(ctypes.c_int64, shape_to_stride(self.shape))
- ndim = ctypes.c_int(len(self.shape))
- handle = DLArrayHandle()
- check_call(_LIB.DLArrayAlloc(shape, stride, ndim,
- self.handle.contents.ctx, ctypes.byref(handle)))
- check_call(_LIB.DLGpuArrayLazyCallback(
- self.handle, handle, stream.handle if stream else None))
- self.handle = handle
-
- def wrapped_lazy_callback(self, stream=None):
- # TODO: reshape / copyto / asnumpy may have more efficient implementations;
- # this is just a workaround.
- if self.lazy:
- # laziness is now resolved in forward hooks, so reaching this
- # branch indicates a bug
- assert False
- self.lazy_callback(stream)
-
-
- def array(arr, ctx, data_type=np.float32):
- """Create an array from source arr.
- Parameters
- ----------
- arr : array_like
- The source array to be copied from
- ctx : DLContext
- The device context to create the array on
- Returns
- -------
- ret : NDArray
- The created array
- """
- if not isinstance(arr, np.ndarray):
- arr = np.array(arr, dtype=data_type)
- ret = empty(arr.shape, ctx)
- ret._sync_copyfrom(arr, data_type=data_type)
- return ret
-
-
- def empty(shape, ctx=cpu(0)):
- """Create an empty array given shape and device
- Parameters
- ----------
- shape : tuple of int
- The shape of the array
- ctx : DLContext
- The context of the array
- Returns
- -------
- arr : NDArray
- The allocated array.
- """
- stride = c_array(ctypes.c_int64, shape_to_stride(shape))
- ndim = ctypes.c_int(len(shape))
- shape = c_array(ctypes.c_int64, shape)
- handle = DLArrayHandle()
- check_call(_LIB.DLArrayAlloc(
- shape, stride, ndim, ctx, ctypes.byref(handle)))
- return NDArray(handle)
-
-
- def numpyasdlarrayhandle(data):
- """Return a DLArray view of a numpy array (copying only if needed).
- The caller must keep the returned DLArray referenced while it is used.
- """
- if not data.flags['C_CONTIGUOUS']:
- data = np.ascontiguousarray(data)
- arr = DLArray()
- shape = c_array(ctypes.c_int64, data.shape)
- arr.data = data.ctypes.data_as(ctypes.c_void_p)
- arr.shape = shape
- arr.stride = c_array(ctypes.c_int64, shape_to_stride(data.shape))
- arr.ndim = data.ndim
- arr.ctx = cpu(0)
- # pin the (possibly copied) numpy buffer so arr.data cannot dangle
- arr._np_data = data
- return arr
-
-
- class ND_Sparse_Array(object):
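- """Sparse matrix container in CSR format: `row` holds the row pointer
- (indptr), `col` the column indices, and `data` the nonzero values."""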
- __slots__ = ["data", "row", "col", "nrow", "ncol", "lazy"]
-
- def __init__(self, data, row, col, nrow, ncol):
- self.data = data
- self.row = row
- self.col = col
- self.nrow = nrow
- self.ncol = ncol
- self.lazy = False
-
- @property
- def shape(self):
- """Shape of this array"""
- return (self.nrow, self.ncol)
-
-
- def sparse_array(values, indices, shape, ctx=cpu(0)):
- """Create an sparse array from source arrs.
- ----------
- values : numpy.ndarray
- The value array to be copied from
- indices : tuple(numpy.ndarray, numpy.ndarray)
- The (row, col) index arrays to be copied from
- ctx : DLContext, optional
- The device context to create the array
- Returns
- -------
- ret : ND_Sparse_Array
- The created array
- """
- assert isinstance(indices, tuple)
- assert len(shape) == len(indices) == 2
- assert len(values) == len(indices[0]) == len(indices[1])
- mat = scipy.sparse.csr_matrix((values, indices), shape)
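- # scipy converts the COO-style (values, (row, col)) input to CSR;
- # mat.indptr is the row pointer and mat.indices the column indices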
- values = mat.data
- rows = mat.indptr
- cols = mat.indices
- values_ret = empty(values.shape, ctx)
- values_ret._sync_copyfrom(values)
- row_ret = empty(rows.shape, ctx)
- row_ret._sync_copyfrom(rows, np.int32)
- col_ret = empty(cols.shape, ctx)
- col_ret._sync_copyfrom(cols, np.int32)
- return ND_Sparse_Array(values_ret, row_ret, col_ret, shape[0], shape[1])
-
-
- class IndexedSlices(object):
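- """Sparse gradient representation: row `values[i]` is the update for
- dense row `indices[i]`; duplicate indices may appear until deduplicated."""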
- __slots__ = ["indices", "values", "dense_shape", "deduplicated", "lazy"]
-
- def __init__(self, indices=None, values=None, dense_shape=None):
- self.indices = indices
- self.values = values
- self.dense_shape = dense_shape
- self.deduplicated = False
- self.lazy = False
-
- def get_dense_shape(self):
- assert self.dense_shape is not None
- return self.dense_shape
-
- def get_sparse_shape(self):
- assert isinstance(self.values, NDArray)
- return self.values.shape
-
- def update(self, indices, values, dense_shape):
- self.indices = indices
- self.values = values
- if self.dense_shape is not None:
- assert tuple(self.dense_shape) == tuple(dense_shape)
- else:
- self.dense_shape = dense_shape
-
- def deduplicate(self, stream):
- """Merge duplicate indices on GPU; values of equal indices are summed."""
- assert is_gpu_ctx(self.indices.ctx)
- np_indices = self.indices.asnumpy()
- unique_indices, inverse = np.unique(np_indices, return_inverse=True)
- indices_on_ctx = array(unique_indices, ctx=self.indices.ctx)
- self.indices = indices_on_ctx
- inverse_on_ctx = array(inverse, ctx=self.indices.ctx)
- new_value_shape = list(unique_indices.shape)
- new_value_shape.append(self.values.shape[-1])
- new_values = empty(new_value_shape, ctx=self.values.ctx)
- check_call(_LIB.DLGpuArraySet(
- new_values.handle, ctypes.c_float(0),
- stream.handle if stream else None))
- check_call(_LIB.DeduplicateIndexedSlices(
- self.values.handle, inverse_on_ctx.handle, new_values.handle,
- stream.handle if stream else None))
- self.values = new_values
- self.deduplicated = True
-
- def cpu_deduplicate(self):
- """Merge duplicate indices on CPU; values of equal indices are summed."""
- assert not is_gpu_ctx(self.indices.ctx)
- np_indices = self.indices.asnumpy()
- unique_indices, inverse = np.unique(np_indices, return_inverse=True)
- new_value_shape = list(unique_indices.shape)
- last_dim = self.values.shape[-1]
- new_value_shape.append(last_dim)
- new_values = np.zeros(new_value_shape).astype(np.float32)
- flatten = self.values.asnumpy().reshape((-1, last_dim))
- # accumulate rows that share the same (deduplicated) index
- for i, ind in enumerate(inverse):
- new_values[ind] += flatten[i]
- self.values = array(new_values, cpu(0))
- self.indices = array(unique_indices, cpu(0))
- self.deduplicated = True
-
- def free_deduplicate(self):
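- """Drop the deduplicated index/value buffers and reset the flag."""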
- if self.deduplicated:
- del self.indices
- del self.values
- self.indices = None
- self.values = None
- self.deduplicated = False