Browse Source

Rebase to f352f9a54a

tags/v1.1.0
Jamie Nisbet Lixia Chen 5 years ago
parent
commit
5664641651
7 changed files with 38 additions and 48 deletions
  1. +19
    -9
      mindspore/ccsrc/minddata/dataset/engine/cache/CMakeLists.txt
  2. +0
    -17
      mindspore/ccsrc/minddata/dataset/engine/cache/cache_admin.cc
  3. +14
    -10
      mindspore/ccsrc/minddata/dataset/engine/cache/cache_admin_arg.cc
  4. +1
    -1
      mindspore/ccsrc/minddata/dataset/engine/cache/cache_main.cc
  5. +1
    -0
      mindspore/ccsrc/minddata/dataset/engine/cache/cache_server.cc
  6. +2
    -7
      mindspore/dataset/engine/cache_client.py
  7. +1
    -4
      mindspore/dataset/engine/validators.py

+ 19
- 9
mindspore/ccsrc/minddata/dataset/engine/cache/CMakeLists.txt View File

@@ -6,16 +6,26 @@ ms_build_flatbuffers("de_tensor.fbs" ${CMAKE_CURRENT_SOURCE_DIR} generated_engin
file(GLOB_RECURSE _CURRENT_SRC_FILES RELATIVE ${CMAKE_CURRENT_SOURCE_DIR} "*.cc")
set_property(SOURCE ${_CURRENT_SRC_FILES} PROPERTY COMPILE_DEFINITIONS SUBMODULE_ID=mindspore::SubModuleId::SM_MD)

# Try to find numa header file and its library
find_file(NUMA_HDR NAMES "numa.h")

if (EXISTS ${NUMA_HDR})
ADD_DEFINITIONS(-DNUMA_ENABLED)
MESSAGE("Numa package found")
endif ()

if (${CMAKE_SYSTEM_NAME} MATCHES "Linux")
ADD_DEFINITIONS(-DCACHE_LOCAL_CLIENT)

# Try to find numa header file and its library
FIND_PATH( NUMA_INCLUDE_DIR numa.h )
MESSAGE( "Numa include dir is: ${NUMA_INCLUDE_DIR}" )

FIND_LIBRARY( NUMA_LIBRARY NAMES libnuma.so )
MESSAGE( "Numa library is: ${NUMA_LIBRARY}" )

FIND_PACKAGE_HANDLE_STANDARD_ARGS( NUMA DEFAULT_MSG
NUMA_INCLUDE_DIR
NUMA_LIBRARY
)
if ( NUMA_FOUND )
ADD_DEFINITIONS(-DNUMA_ENABLED)
MESSAGE("Numa package found")
else()
MESSAGE(FATAL_ERROR "Numa package not found, try 'sudo yum install numactl-devel' or 'sudo apt-get install libnuma-dev'")
endif()
endif ()

add_library(engine-cache-client OBJECT
@@ -71,7 +81,7 @@ if (ENABLE_CACHE)
target_link_libraries(cache_server mindspore::glog)
endif ()

if (EXISTS ${NUMA_HDR})
if (NUMA_FOUND)
target_link_libraries(cache_server numa)
endif ()



+ 0
- 17
mindspore/ccsrc/minddata/dataset/engine/cache/cache_admin.cc View File

@@ -32,23 +32,6 @@ int main(int argc, char **argv) {
google::InitGoogleLogging(argv[0]);
#endif

std::string warningMsg;
warningMsg.reserve(512);
warningMsg += "WARNING:\n";
warningMsg += "cache_admin and the cache server that it controls are currently only used for experimental research";
warningMsg += " purposes at this time.\n";
auto env_enable_cache = std::getenv("MS_ENABLE_CACHE");
if (env_enable_cache == nullptr || strcmp(env_enable_cache, "TRUE") != 0) {
// temporary disable cache feature in the current release
warningMsg += "This command is currently disabled. Quitting.\n";
std::cerr << warningMsg << std::endl;
return 0;
}
warningMsg += "It is not intended for general availability yet as it may not be stable. Use it at your own risk.\n";

// A warning message until the code is mature enough.
std::cerr << warningMsg << std::endl;

if (argc == 1) {
args.Help();
return 0;


+ 14
- 10
mindspore/ccsrc/minddata/dataset/engine/cache/cache_admin_arg.cc View File

@@ -343,7 +343,7 @@ Status CacheAdminArgHandler::RunCommand() {
if (rc.IsError()) {
msg.RemoveResourcesOnExit();
if (rc.IsNetWorkError()) {
std::string errMsg = "Server is not up or has been shutdown already.";
std::string errMsg = "Server on port " + std::to_string(port_) + " is not up or has been shutdown already.";
return Status(StatusCode::kNetWorkError, errMsg);
}
return rc;
@@ -355,9 +355,10 @@ Status CacheAdminArgHandler::RunCommand() {
// The server will remove the queue and we will then wake up. But on the safe
// side, we will also set up an alarm and kill this proocess if we hang on
// the message queue.
alarm(15);
alarm(30);
Status dummy_rc;
(void)msg.ReceiveStatus(&dummy_rc);
std::cout << "Cache server has been stopped." << std::endl;
break;
}
case CommandId::kCmdGenerateSession: {
@@ -384,6 +385,7 @@ Status CacheAdminArgHandler::RunCommand() {
CacheClientGreeter comm(hostname_, port_, 1);
RETURN_IF_NOT_OK(comm.ServiceStart());
auto rq = std::make_shared<ListSessionsRequest>();
std::cout << "Listing sessions for server on port " << port_ << "\n" << std::endl;
RETURN_IF_NOT_OK(comm.HandleRequest(rq));
RETURN_IF_NOT_OK(rq->Wait());
std::vector<SessionCacheInfo> session_info = rq->GetSessionCacheInfo();
@@ -481,12 +483,14 @@ Status CacheAdminArgHandler::StartServer(CommandId command_id) {
RETURN_STATUS_UNEXPECTED(err_msg);
}
msg.resize(n);
std::cout << msg << std::endl;
if (WIFEXITED(status)) {
auto exit_status = WEXITSTATUS(status);
if (exit_status) {
std::string errMsg = "Child exit status " + std::to_string(exit_status);
std::string errMsg = msg + "\nChild exit status " + std::to_string(exit_status);
return Status(StatusCode::kUnexpectedError, errMsg);
} else {
// Not an error, some info message goes to stdout
std::cout << msg << std::endl;
}
}
return Status::OK();
@@ -545,14 +549,14 @@ void CacheAdminArgHandler::Help() {
std::cerr << " [ [-l | --minloglevel] <log level> ]\n";
std::cerr << " Possible values are 0, 1, 2 and 3.\n";
std::cerr << " Default is 1 (info level).\n";
std::cerr << " [ --list_sessions ]\n";
std::cerr << " [--list_sessions]\n";
std::cerr << " [--help]" << std::endl;
// Do not expose these option to the user via help or documentation, but the options do exist to aid with
// development and tuning.
// std::cerr << " [ [-m | --shared_memory_size] <shared memory size> ]\n";
// std::cerr << " Default is " << kDefaultSharedMemorySizeInGB << " (Gb in unit).\n";
// std::cerr << " [ [-r | --memory_cap_ratio] <float percent value>]\n";
// std::cerr << " Default is " << kMemoryCapRatio << ".\n";
std::cerr << " [--help]" << std::endl;
// [ [-m | --shared_memory_size] <shared memory size> ]
// Default is: kDefaultSharedMemorySizeInGB (Gb in unit)
// [ [-r | --memory_cap_ratio] <float percent value>]
// Default is kMemoryCapRatio
}
} // namespace dataset
} // namespace mindspore

+ 1
- 1
mindspore/ccsrc/minddata/dataset/engine/cache/cache_main.cc View File

@@ -88,7 +88,7 @@ ds::Status StartServer(int argc, char **argv) {
if (child_rc.IsError()) {
return child_rc;
}
std::cerr << "cache server daemon process has been created as process id: " << pid
std::cerr << "cache server daemon has been created as process id " << pid << " and listening on port " << port
<< "\nCheck log file for any start up error" << std::endl;
signal(SIGCHLD, SIG_IGN); // ignore sig child signal.
return ds::Status::OK();


+ 1
- 0
mindspore/ccsrc/minddata/dataset/engine/cache/cache_server.cc View File

@@ -631,6 +631,7 @@ Status CacheServer::GetCacheMissKeys(CacheRequest *rq, CacheReply *reply) {

inline Status GenerateClientSessionID(session_id_type session_id, CacheReply *reply) {
reply->set_result(std::to_string(session_id));
MS_LOG(INFO) << "Server generated new session id " << session_id;
return Status::OK();
}



+ 2
- 7
mindspore/dataset/engine/cache_client.py View File

@@ -15,8 +15,8 @@
"""Cache client
"""

import os
import copy
from mindspore._c_dataengine import CacheClient

from ..core.validator_helpers import type_check, check_uint32, check_uint64

@@ -39,12 +39,7 @@ class DatasetCache:
self.port = port
self.prefetch_size = prefetch_size
self.num_connections = num_connections
if os.getenv('MS_ENABLE_CACHE') != 'TRUE':
# temporary disable cache feature in the current release
self.cache_client = None
else:
from mindspore._c_dataengine import CacheClient
self.cache_client = CacheClient(session_id, size, spilling, hostname, port, num_connections, prefetch_size)
self.cache_client = CacheClient(session_id, size, spilling, hostname, port, num_connections, prefetch_size)

def GetStat(self):
return self.cache_client.GetStat()


+ 1
- 4
mindspore/dataset/engine/validators.py View File

@@ -30,6 +30,7 @@ from ..core.validator_helpers import parse_user_args, type_check, type_check_lis

from . import datasets
from . import samplers
from . import cache_client


def check_imagefolderdataset(method):
@@ -1259,8 +1260,4 @@ def check_paddeddataset(method):
def check_cache_option(cache):
"""Sanity check for cache parameter"""
if cache is not None:
if os.getenv('MS_ENABLE_CACHE') != 'TRUE':
# temporary disable cache feature in the current release
raise ValueError("Caching is disabled in the current release.")
from . import cache_client
type_check(cache, (cache_client.DatasetCache,), "cache")

Loading…
Cancel
Save