diff --git a/mindspore/ccsrc/minddata/dataset/engine/cache/cache_admin.cc b/mindspore/ccsrc/minddata/dataset/engine/cache/cache_admin.cc index 7da6c66c2e..8feaf2cf3b 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/cache/cache_admin.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/cache/cache_admin.cc @@ -19,6 +19,8 @@ #include #endif #include "minddata/dataset/engine/cache/cache_admin_arg.h" +#include "minddata/dataset/engine/cache/cache_common.h" +#include "minddata/dataset/util/path.h" namespace ds = mindspore::dataset; @@ -28,10 +30,26 @@ int main(int argc, char **argv) { std::stringstream arg_stream; #ifdef USE_GLOG - FLAGS_log_dir = "/tmp"; + FLAGS_minloglevel = google::WARNING; + FLAGS_log_dir = ds::DefaultLogDir(); + // Create default log dir + ds::Path log_dir = ds::Path(FLAGS_log_dir); + rc = log_dir.CreateDirectories(); + if (!rc.IsOk()) { + std::cerr << rc.ToString() << std::endl; + return 1; + } google::InitGoogleLogging(argv[0]); #endif + // Create default spilling dir + ds::Path spill_dir = ds::Path(ds::DefaultSpillDir()); + rc = spill_dir.CreateDirectories(); + if (!rc.IsOk()) { + std::cerr << rc.ToString() << std::endl; + return 1; + } + if (argc == 1) { args.Help(); return 0; diff --git a/mindspore/ccsrc/minddata/dataset/engine/cache/cache_admin_arg.cc b/mindspore/ccsrc/minddata/dataset/engine/cache/cache_admin_arg.cc index ad203e393f..6fa579df2c 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/cache/cache_admin_arg.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/cache/cache_admin_arg.cc @@ -39,7 +39,6 @@ const int32_t CacheAdminArgHandler::kDefaultNumWorkers = std::thread::hardware_c ? std::thread::hardware_concurrency() / 2 : 1; const char CacheAdminArgHandler::kServerBinary[] = "cache_server"; -const char CacheAdminArgHandler::kDefaultSpillDir[] = "/tmp"; CacheAdminArgHandler::CacheAdminArgHandler() : port_(kCfgDefaultCachePort), @@ -49,7 +48,7 @@ CacheAdminArgHandler::CacheAdminArgHandler() log_level_(kDefaultLogLevel), memory_cap_ratio_(kMemoryCapRatio), hostname_(kCfgDefaultCacheHost), - spill_dir_(kDefaultSpillDir), + spill_dir_(DefaultSpillDir()), command_id_(CommandId::kCmdUnknown) { // Initialize the command mappings arg_map_["-h"] = ArgValue::kArgHost; @@ -70,7 +69,7 @@ CacheAdminArgHandler::CacheAdminArgHandler() arg_map_["-m"] = ArgValue::kArgSharedMemorySize; arg_map_["--shared_memory_size"] = ArgValue::kArgSharedMemorySize; arg_map_["-l"] = ArgValue::kArgLogLevel; - arg_map_["--minloglevel"] = ArgValue::kArgLogLevel; + arg_map_["--loglevel"] = ArgValue::kArgLogLevel; arg_map_["-r"] = ArgValue::kArgMemoryCapRatio; arg_map_["--memory_cap_ratio"] = ArgValue::kArgMemoryCapRatio; arg_map_["--list_sessions"] = ArgValue::kArgListSessions; @@ -306,6 +305,7 @@ Status CacheAdminArgHandler::Validate() { // run. if (command_id_ == CommandId::kCmdUnknown) { std::string err_msg = "No command provided"; + err_msg += "\nPlease try `cache_admin --help` for more information"; return Status(StatusCode::kSyntaxError, err_msg); } @@ -353,12 +353,12 @@ Status CacheAdminArgHandler::RunCommand() { // the comm layer as soon as the request is received, and we need to wait // on the message queue instead. // The server will remove the queue and we will then wake up. But on the safe - // side, we will also set up an alarm and kill this proocess if we hang on + // side, we will also set up an alarm and kill this process if we hang on // the message queue. alarm(30); Status dummy_rc; (void)msg.ReceiveStatus(&dummy_rc); - std::cout << "Cache server has been stopped." << std::endl; + std::cout << "Cache server on port " << std::to_string(port_) << " has been stopped successfully." << std::endl; break; } case CommandId::kCmdGenerateSession: { @@ -367,7 +367,8 @@ Status CacheAdminArgHandler::RunCommand() { auto rq = std::make_shared(); RETURN_IF_NOT_OK(comm.HandleRequest(rq)); RETURN_IF_NOT_OK(rq->Wait()); - std::cout << "Session: " << rq->GetSessionId() << std::endl; + std::cout << "Session created for server on port " << std::to_string(port_) << ": " << rq->GetSessionId() + << std::endl; break; } case CommandId::kCmdDestroySession: { @@ -378,7 +379,7 @@ Status CacheAdminArgHandler::RunCommand() { auto rq = std::make_shared(cinfo); RETURN_IF_NOT_OK(comm.HandleRequest(rq)); RETURN_IF_NOT_OK(rq->Wait()); - std::cout << "Drop session successful" << std::endl; + std::cout << "Drop session successfully for server on port " << std::to_string(port_) << std::endl; break; } case CommandId::kCmdListSessions: { @@ -545,8 +546,8 @@ void CacheAdminArgHandler::Help() { std::cerr << " Possible values are in range [1...max(100, Number of CPU)].\n"; std::cerr << " Default is " << kDefaultNumWorkers << ".\n"; std::cerr << " [ [-s | --spilldir] ]\n"; - std::cerr << " Default is " << kDefaultSpillDir << ".\n"; - std::cerr << " [ [-l | --minloglevel] ]\n"; + std::cerr << " Default is " << DefaultSpillDir() << ".\n"; + std::cerr << " [ [-l | --loglevel] ]\n"; std::cerr << " Possible values are 0, 1, 2 and 3.\n"; std::cerr << " Default is 1 (info level).\n"; std::cerr << " [--list_sessions]\n"; diff --git a/mindspore/ccsrc/minddata/dataset/engine/cache/cache_admin_arg.h b/mindspore/ccsrc/minddata/dataset/engine/cache/cache_admin_arg.h index f3ba121a8d..e06eb07fcd 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/cache/cache_admin_arg.h +++ b/mindspore/ccsrc/minddata/dataset/engine/cache/cache_admin_arg.h @@ -35,7 +35,6 @@ class CacheAdminArgHandler { static constexpr int32_t kDefaultLogLevel = 1; static constexpr float kMemoryCapRatio = 0.8; static const char kServerBinary[]; - static const char kDefaultSpillDir[]; // These are the actual command types to execute enum class CommandId : int16_t { diff --git a/mindspore/ccsrc/minddata/dataset/engine/cache/cache_common.h b/mindspore/ccsrc/minddata/dataset/engine/cache/cache_common.h index d2f7287305..101b15d369 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/cache/cache_common.h +++ b/mindspore/ccsrc/minddata/dataset/engine/cache/cache_common.h @@ -48,6 +48,8 @@ constexpr static uint32_t kLocalClientSupport = 1; constexpr static uint32_t kDataIsInSharedMemory = 2; /// \brief Size of each message used in message queue. constexpr static int32_t kSharedMessageSize = 2048; +/// \brief Prefix for default cache spilling path and log path +const char kDefaultPathPrefix[] = "/tmp/mindspore/cache"; /// \brief State of CacheService at the server. enum class CacheServiceState : int8_t { @@ -70,7 +72,9 @@ inline void Status2CacheReply(const Status &rc, CacheReply *reply) { /// \brief Generate the unix socket file we use on both client/server side given a tcp/ip port number /// \param port /// \return unix socket url -inline std::string PortToUnixSocketPath(int port) { return "/tmp/cache_server_p" + std::to_string(port); } +inline std::string PortToUnixSocketPath(int port) { + return kDefaultPathPrefix + std::string("/cache_server_p") + std::to_string(port); +} /// \brief Round up to the next 4k inline int64_t round_up_4K(int64_t sz) { @@ -87,6 +91,12 @@ using worker_id_t = int32_t; using numa_id_t = int32_t; using cpu_id_t = int32_t; +/// Return the default spill dir for cache +inline std::string DefaultSpillDir() { return kDefaultPathPrefix; } + +/// Return the default log dir for cache +inline std::string DefaultLogDir() { return kDefaultPathPrefix + std::string("/log"); } + } // namespace dataset } // namespace mindspore #endif // MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_CACHE_COMMON_H_ diff --git a/mindspore/ccsrc/minddata/dataset/engine/cache/cache_grpc_client.cc b/mindspore/ccsrc/minddata/dataset/engine/cache/cache_grpc_client.cc index 2f5cbe6d63..aa451069a4 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/cache/cache_grpc_client.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/cache/cache_grpc_client.cc @@ -25,6 +25,7 @@ CacheClientGreeter::CacheClientGreeter(const std::string &hostname, int32_t port // We need to bump up the message size to unlimited. The default receiving // message limit is 4MB which is not big enough. args.SetMaxReceiveMessageSize(-1); + MS_LOG(INFO) << "Hostname: " << hostname << "."; #if CACHE_LOCAL_CLIENT // Try connect locally to the unix_socket first as the first preference // Need to resolve hostname to ip address rather than to do a string compare diff --git a/mindspore/ccsrc/minddata/dataset/engine/cache/cache_main.cc b/mindspore/ccsrc/minddata/dataset/engine/cache/cache_main.cc index 14ae5fc7f2..d67ba30efc 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/cache/cache_main.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/cache/cache_main.cc @@ -44,10 +44,6 @@ ds::Status StartServer(int argc, char **argv) { .SetSharedMemorySizeInGB(strtol(argv[4], nullptr, 10)) .SetMemoryCapRatio(strtof(argv[7], nullptr)); -#ifdef USE_GLOG - FLAGS_minloglevel = strtol(argv[5], nullptr, 10); -#endif - auto daemonize_string = argv[6]; bool daemonize = strcmp(daemonize_string, "true") == 0 || strcmp(daemonize_string, "TRUE") == 0 || strcmp(daemonize_string, "t") == 0 || strcmp(daemonize_string, "T") == 0; @@ -63,7 +59,16 @@ ds::Status StartServer(int argc, char **argv) { ds::SharedMessage msg; if (daemonize) { #ifdef USE_GLOG - FLAGS_log_dir = "/tmp"; + // temporary setting log level to WARNING to avoid logging when creating dir + FLAGS_minloglevel = google::WARNING; + FLAGS_log_dir = ds::DefaultLogDir(); + // Create cache server default log dir + ds::Path log_dir = ds::Path(FLAGS_log_dir); + rc = log_dir.CreateDirectories(); + if (rc.IsError()) { + return rc; + } + FLAGS_minloglevel = strtol(argv[5], nullptr, 10); google::InitGoogleLogging(argv[0]); #endif rc = msg.Create(); @@ -88,8 +93,12 @@ ds::Status StartServer(int argc, char **argv) { if (child_rc.IsError()) { return child_rc; } - std::cerr << "cache server daemon has been created as process id " << pid << " and listening on port " << port - << "\nCheck log file for any start up error" << std::endl; + std::cout << "Cache server startup completed successfully!\n"; + std::cout << "The cache server daemon has been created as process id " << pid << " and listening on port " << port + << std::endl; + std::cout << "\nRecommendation:\nSince the server is detached into its own daemon process, monitor the server " + "logs (under " + << ds::DefaultLogDir() << ") for any issues that may happen after startup\n"; signal(SIGCHLD, SIG_IGN); // ignore sig child signal. return ds::Status::OK(); } else { @@ -109,6 +118,7 @@ ds::Status StartServer(int argc, char **argv) { } // Dump the summary + MS_LOG(INFO) << "Logging services started with log level: " << argv[5]; MS_LOG(INFO) << builder << std::endl; // Create the instance with some sanity checks built in rc = builder.Build(); diff --git a/mindspore/ccsrc/minddata/dataset/engine/cache/cache_server.cc b/mindspore/ccsrc/minddata/dataset/engine/cache/cache_server.cc index 0329d5f46a..3515d495de 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/cache/cache_server.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/cache/cache_server.cc @@ -1041,7 +1041,8 @@ Status CacheServer::DestroySession(CacheRequest *rq) { "A destroy cache request has been completed but it had a stale session id " + std::to_string(drop_session_id); RETURN_STATUS_UNEXPECTED(errMsg); } else { - std::string errMsg = "Session id " + std::to_string(drop_session_id) + " not found."; + std::string errMsg = + "Session id " + std::to_string(drop_session_id) + " not found in server on port " + std::to_string(port_) + "."; return Status(StatusCode::kFileNotExist, errMsg); } } @@ -1234,7 +1235,7 @@ Status CacheServer::Builder::SanityCheck() { } CacheServer::Builder::Builder() - : top_("/tmp"), + : top_(DefaultSpillDir()), num_workers_(std::thread::hardware_concurrency() / 2), port_(50052), shared_memory_sz_in_gb_(kDefaultSharedMemorySize), diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/cache_base_op.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/cache_base_op.cc index 9a31d35b36..65208dd752 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/cache_base_op.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/cache_base_op.cc @@ -84,9 +84,9 @@ Status CacheBase::FetchSamplesToWorkers() { // Instead of sending sampler id to WorkerEntry, we send them to the Prefetcher which will redirect them // to the WorkerEntry. do { - if (AllowCacheMiss() && wait_cnt > 0) { - MS_LOG(WARNING) << "Epoch: " << wait_cnt << " Cache Miss : " << num_cache_miss_ - << " Total number of rows : " << row_cnt_; + if (AllowCacheMiss() && wait_cnt > 0 && wait_cnt % op_num_repeats_per_epoch() == 0) { + MS_LOG(INFO) << "Epoch: " << op_current_epochs_ << " Cache Miss : " << num_cache_miss_ + << " Total number of rows : " << row_cnt_; } num_cache_miss_ = 0; row_cnt_ = 0; @@ -167,8 +167,8 @@ Status CacheBase::FetchSamplesToWorkers() { } // Dump the last epoch result (approximately) without waiting for the worker threads to come back. if (AllowCacheMiss()) { - MS_LOG(WARNING) << "Epoch: " << wait_cnt << " Cache Miss : " << num_cache_miss_ - << " Total number of rows : " << row_cnt_; + MS_LOG(INFO) << "Epoch: " << wait_cnt / op_num_repeats_per_epoch() << " Cache Miss : " << num_cache_miss_ + << " Total number of rows : " << row_cnt_; } return Status::OK(); }