diff --git a/mindspore/ccsrc/minddata/dataset/engine/cache/cache_admin_arg.cc b/mindspore/ccsrc/minddata/dataset/engine/cache/cache_admin_arg.cc index 9e73b757e9..407716cd06 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/cache/cache_admin_arg.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/cache/cache_admin_arg.cc @@ -339,7 +339,8 @@ Status CacheAdminArgHandler::Validate() { // Additional checks here auto max_num_workers = std::max<int32_t>(std::thread::hardware_concurrency(), 100); - if (num_workers_ < 1 || num_workers_ > max_num_workers) + if (used_args_[ArgValue::kArgNumWorkers] && (num_workers_ < 1 || num_workers_ > max_num_workers)) + // Check the value of num_workers only if it's provided by users. return Status(StatusCode::kMDSyntaxError, "Number of workers must be in range of 1 and " + std::to_string(max_num_workers) + "."); if (log_level_ < 0 || log_level_ > 4) return Status(StatusCode::kMDSyntaxError, "Log level must be in range (0..4)."); @@ -448,6 +449,7 @@ Status CacheAdminArgHandler::ShowServerInfo() { int32_t num_workers = server_cfg_info.num_workers; int8_t log_level = server_cfg_info.log_level; std::string spill_dir = server_cfg_info.spill_dir; + if (spill_dir.empty()) spill_dir = "None"; int name_w = 20; int value_w = 15; @@ -460,11 +462,7 @@ Status CacheAdminArgHandler::ShowServerInfo() { std::cout << std::setw(name_w) << "number of workers" << std::setw(value_w) << std::to_string(num_workers) << std::endl; std::cout << std::setw(name_w) << "log level" << std::setw(value_w) << std::to_string(log_level) << std::endl; - if (spill_dir.empty()) { - std::cout << std::setw(name_w) << "spill" << std::setw(value_w) << "disabled" << std::endl; - } else { - std::cout << std::setw(name_w) << "spill dir" << std::setw(value_w) << spill_dir << std::endl; - } + std::cout << std::setw(name_w) << "spill dir" << std::setw(value_w) << spill_dir << std::endl; std::cout << "----------------------------------------" << std::endl; std::cout << "Active sessions: " << 
std::endl; @@ -472,7 +470,7 @@ Status CacheAdminArgHandler::ShowServerInfo() { for (auto session_id : session_ids) { std::cout << session_id << " "; } - std::cout << std::endl << "(Please use 'cache_admin --list_session' to get detailed info of sessions.)\n"; + std::cout << std::endl << "(Please use 'cache_admin --list_sessions' to get detailed info of sessions.)\n"; } else { std::cout << "No active sessions." << std::endl; } @@ -582,8 +580,10 @@ Status CacheAdminArgHandler::StartServer(CommandId command_id) { close(0); close(fd[1]); // exec the cache server binary in this process + // If the user did not provide the value of num_workers, we pass -1 to cache server to allow it assign the default. + // So that the server knows if the number is provided by users or by default. + std::string workers_string = used_args_[ArgValue::kArgNumWorkers] ? std::to_string(num_workers_) : "-1"; std::string port_string = std::to_string(port_); - std::string workers_string = std::to_string(num_workers_); std::string shared_memory_string = std::to_string(shm_mem_sz_); std::string minloglevel_string = std::to_string(log_level_); std::string daemonize_string = "true"; diff --git a/mindspore/ccsrc/minddata/dataset/engine/cache/cache_main.cc b/mindspore/ccsrc/minddata/dataset/engine/cache/cache_main.cc index 67d3c7ff7f..4e7379137d 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/cache/cache_main.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/cache/cache_main.cc @@ -86,15 +86,19 @@ ms::Status StartServer(int argc, char **argv) { std::this_thread::sleep_for(std::chrono::seconds(1)); ms::Status child_rc; rc = msg.ReceiveStatus(&child_rc); + std::string warning_string; if (rc.IsError()) { return rc; } if (child_rc.IsError()) { return child_rc; + } else { + warning_string = child_rc.ToString(); } std::cout << "Cache server startup completed successfully!\n"; std::cout << "The cache server daemon has been created as process id " << pid << " and listening on port " << port - << 
std::endl; + << ".\n"; + if (!warning_string.empty()) std::cout << "WARNING: " << warning_string; std::cout << "\nRecommendation:\nSince the server is detached into its own daemon process, monitor the server " "logs (under " << ds::DefaultLogDir() << ") for any issues that may happen after startup\n"; @@ -116,12 +120,17 @@ ms::Status StartServer(int argc, char **argv) { } } + // Create the instance with some sanity checks built in + rc = builder.Build(); // Dump the summary MS_LOG(INFO) << "Cache server has started successfully and is listening on port " << port << std::endl; MS_LOG(INFO) << builder << std::endl; - // Create the instance with some sanity checks built in - rc = builder.Build(); if (rc.IsOk()) { + if (daemonize && !rc.ToString().empty()) { + // If we have adjusted the number of workers provided by users, use the message queue to send the warning + // message if this is the child daemon. + msg.SendStatus(rc); + } // If all goes well, kick off the threads. Loop forever and never return unless error. ds::CacheServer &cs = ds::CacheServer::GetInstance(); rc = cs.Run(msg.GetMsgQueueId()); diff --git a/mindspore/ccsrc/minddata/dataset/engine/cache/cache_server.cc b/mindspore/ccsrc/minddata/dataset/engine/cache/cache_server.cc index 7efa94f263..54159ca14f 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/cache/cache_server.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/cache/cache_server.cc @@ -44,7 +44,6 @@ Status CacheServer::DoServiceStart() { MS_LOG(INFO) << "CacheServer will use disk folder: " << top_; } RETURN_IF_NOT_OK(vg_.ServiceStart()); - RETURN_IF_NOT_OK(hw_info_->GetNumaNodeInfo()); auto num_numa_nodes = GetNumaNodeCount(); // If we link with numa library. Set default memory policy. // If we don't pin thread to cpu, then use up all memory controllers to maximize @@ -53,15 +52,6 @@ Status CacheServer::DoServiceStart() { CacheServerHW::SetDefaultMemoryPolicy(numa_affinity_ ? 
CachePoolPolicy::kLocal : CachePoolPolicy::kInterleave)); auto my_node = hw_info_->GetMyNode(); MS_LOG(DEBUG) << "Cache server is running on numa node " << my_node; - // Bump up num_workers_ to at least the number of numa nodes - num_workers_ = std::max(num_numa_nodes, num_workers_); - // But also it shouldn't be too many more than the hardware concurrency - auto num_cpus = hw_info_->GetCpuCount(); - num_workers_ = std::min(2 * num_cpus, num_workers_); - // Round up num_workers to a multiple of numa nodes. - auto remainder = num_workers_ % num_numa_nodes; - if (remainder > 0) num_workers_ += (num_numa_nodes - remainder); - MS_LOG(INFO) << "Re-adjusting the number of workers to " << num_workers_; // There will be some threads working on the grpc queue and // some number of threads working on the CacheServerRequest queue. // Like a connector object we will set up the same number of queues but @@ -993,7 +983,8 @@ session_id_type CacheServer::GetSessionID(connection_id_type connection_id) cons } CacheServer::CacheServer(const std::string &spill_path, int32_t num_workers, int32_t port, - int32_t shared_meory_sz_in_gb, float memory_cap_ratio, int8_t log_level) + int32_t shared_meory_sz_in_gb, float memory_cap_ratio, int8_t log_level, + std::shared_ptr<CacheServerHW> hw_info) : top_(spill_path), num_workers_(num_workers), num_grpc_workers_(num_workers_), @@ -1002,8 +993,8 @@ CacheServer::CacheServer(const std::string &spill_path, int32_t num_workers, int global_shutdown_(false), memory_cap_ratio_(memory_cap_ratio), numa_affinity_(true), - log_level_(log_level) { - hw_info_ = std::make_shared<CacheServerHW>(); + log_level_(log_level), + hw_info_(std::move(hw_info)) { // If we are not linked with numa library (i.e. NUMA_ENABLED is false), turn off cpu // affinity which can make performance worse. 
if (!CacheServerHW::numa_enabled()) { @@ -1284,6 +1275,19 @@ Status CacheServer::Builder::SanityCheck() { return Status::OK(); } +int32_t CacheServer::Builder::AdjustNumWorkers(int32_t num_workers) { + int32_t num_numa_nodes = hw_info_->GetNumaNodeCount(); + // Bump up num_workers_ to at least the number of numa nodes + num_workers = std::max(num_numa_nodes, num_workers); + // But also it shouldn't be too many more than the hardware concurrency + int32_t num_cpus = hw_info_->GetCpuCount(); + num_workers = std::min(2 * num_cpus, num_workers); + // Round up num_workers to a multiple of numa nodes. + auto remainder = num_workers % num_numa_nodes; + if (remainder > 0) num_workers += (num_numa_nodes - remainder); + return num_workers; +} + CacheServer::Builder::Builder() : top_(""), num_workers_(std::thread::hardware_concurrency() / 2), diff --git a/mindspore/ccsrc/minddata/dataset/engine/cache/cache_server.h b/mindspore/ccsrc/minddata/dataset/engine/cache/cache_server.h index 156f4b242e..3b251681ae 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/cache/cache_server.h +++ b/mindspore/ccsrc/minddata/dataset/engine/cache/cache_server.h @@ -106,7 +106,7 @@ class CacheServer : public Service { << "Tcp/ip port: " << GetPort() << "\n" << "Shared memory size (in GB): " << GetSharedMemorySzInGb() << "\n" << "Memory cap ratio: " << GetMemoryCapRatio() << "\n" - << "Log level: " << GetLogLevel(); + << "Log level: " << std::to_string(GetLogLevel()); } friend std::ostream &operator<<(std::ostream &out, const Builder &bld) { @@ -115,12 +115,34 @@ } Status Build() { + // Get information of numa architecture and adjust num_workers_ based on numa count + hw_info_ = std::make_shared<CacheServerHW>(); + RETURN_IF_NOT_OK(hw_info_->GetNumaNodeInfo()); + std::string warning_string; + if (num_workers_ == -1) { + // if the user did not provide a value for num_workers, set it to half of num_cpu as default and adjust it if + // the default is not the optimal. 
+ int32_t dft_num_workers = std::thread::hardware_concurrency() > 2 ? std::thread::hardware_concurrency() / 2 : 1; + num_workers_ = AdjustNumWorkers(dft_num_workers); + } else { + // if the users have given their own value, adjust it and provide a warning if it got changed. + int32_t num_workers_new = AdjustNumWorkers(num_workers_); + if (num_workers_ != num_workers_new) { + warning_string = + "The configuration of workers on the cache server is dependent on the NUMA architecture of the server. " + "The current setting is not the optimal for the NUMA architecture. Re-adjusting the number of workers " + "to optimal setting of " + + std::to_string(num_workers_new) + ".\n"; + MS_LOG(INFO) << warning_string; + } + num_workers_ = num_workers_new; + } RETURN_IF_NOT_OK(SanityCheck()); // We need to bring up the Task Manager by bringing up the Services singleton. RETURN_IF_NOT_OK(Services::CreateInstance()); - RETURN_IF_NOT_OK( - CacheServer::CreateInstance(top_, num_workers_, port_, shared_memory_sz_in_gb_, memory_cap_ratio_, log_level_)); - return Status::OK(); + RETURN_IF_NOT_OK(CacheServer::CreateInstance(top_, num_workers_, port_, shared_memory_sz_in_gb_, + memory_cap_ratio_, log_level_, std::move(hw_info_))); + return Status(StatusCode::kSuccess, warning_string); } private: @@ -130,10 +152,14 @@ int32_t shared_memory_sz_in_gb_; float memory_cap_ratio_; int8_t log_level_; + std::shared_ptr<CacheServerHW> hw_info_; /// \brief Sanity checks on the shared memory. /// \return Status object Status IpcResourceCleanup(); + + /// \brief Adjust the value of num_workers if it's not the optimal to NUMA architecture. 
+ int32_t AdjustNumWorkers(int32_t num_workers); }; CacheServer(const CacheServer &) = delete; @@ -145,11 +171,12 @@ ~CacheServer() override { (void)ServiceStop(); } static Status CreateInstance(const std::string &spill_path, int32_t num_workers, int32_t port, - int32_t shared_memory_sz, float memory_cap_ratio, int8_t log_level) { + int32_t shared_memory_sz, float memory_cap_ratio, int8_t log_level, + std::shared_ptr<CacheServerHW> hw_info) { std::call_once(init_instance_flag_, [&]() -> Status { auto &SvcManager = Services::GetInstance(); - RETURN_IF_NOT_OK( - SvcManager.AddHook(&instance_, spill_path, num_workers, port, shared_memory_sz, memory_cap_ratio, log_level)); + RETURN_IF_NOT_OK(SvcManager.AddHook(&instance_, spill_path, num_workers, port, shared_memory_sz, memory_cap_ratio, + log_level, hw_info)); return Status::OK(); }); return Status::OK(); } @@ -274,7 +301,7 @@ /// \param spill_path Top directory for spilling buffers to. /// \param num_workers Number of threads for handling requests. explicit CacheServer(const std::string &spill_path, int32_t num_workers, int32_t port, int32_t share_memory_sz_in_gb, - float memory_cap_ratio, int8_t log_level); + float memory_cap_ratio, int8_t log_level, std::shared_ptr<CacheServerHW> hw_info); /// \brief Locate a cache service from connection id. /// \return Pointer to cache service. Null if not found