Browse Source

Fix issues with displaying the configuration of the cache server

tags/v1.2.0-rc1
tinazhang66 TinaMengtingZhang 4 years ago
parent
commit
5b7bdb9493
4 changed files with 72 additions and 32 deletions
  1. +8
    -8
      mindspore/ccsrc/minddata/dataset/engine/cache/cache_admin_arg.cc
  2. +12
    -3
      mindspore/ccsrc/minddata/dataset/engine/cache/cache_main.cc
  3. +17
    -13
      mindspore/ccsrc/minddata/dataset/engine/cache/cache_server.cc
  4. +35
    -8
      mindspore/ccsrc/minddata/dataset/engine/cache/cache_server.h

+ 8
- 8
mindspore/ccsrc/minddata/dataset/engine/cache/cache_admin_arg.cc View File

@@ -339,7 +339,8 @@ Status CacheAdminArgHandler::Validate() {

// Additional checks here
auto max_num_workers = std::max<int32_t>(std::thread::hardware_concurrency(), 100);
if (num_workers_ < 1 || num_workers_ > max_num_workers)
if (used_args_[ArgValue::kArgNumWorkers] && (num_workers_ < 1 || num_workers_ > max_num_workers))
// Check the value of num_workers only if it's provided by users.
return Status(StatusCode::kMDSyntaxError,
"Number of workers must be in range of 1 and " + std::to_string(max_num_workers) + ".");
if (log_level_ < 0 || log_level_ > 4) return Status(StatusCode::kMDSyntaxError, "Log level must be in range (0..4).");
@@ -448,6 +449,7 @@ Status CacheAdminArgHandler::ShowServerInfo() {
int32_t num_workers = server_cfg_info.num_workers;
int8_t log_level = server_cfg_info.log_level;
std::string spill_dir = server_cfg_info.spill_dir;
if (spill_dir.empty()) spill_dir = "None";

int name_w = 20;
int value_w = 15;
@@ -460,11 +462,7 @@ Status CacheAdminArgHandler::ShowServerInfo() {
std::cout << std::setw(name_w) << "number of workers" << std::setw(value_w) << std::to_string(num_workers)
<< std::endl;
std::cout << std::setw(name_w) << "log level" << std::setw(value_w) << std::to_string(log_level) << std::endl;
if (spill_dir.empty()) {
std::cout << std::setw(name_w) << "spill" << std::setw(value_w) << "disabled" << std::endl;
} else {
std::cout << std::setw(name_w) << "spill dir" << std::setw(value_w) << spill_dir << std::endl;
}
std::cout << std::setw(name_w) << "spill dir" << std::setw(value_w) << spill_dir << std::endl;
std::cout << "----------------------------------------" << std::endl;

std::cout << "Active sessions: " << std::endl;
@@ -472,7 +470,7 @@ Status CacheAdminArgHandler::ShowServerInfo() {
for (auto session_id : session_ids) {
std::cout << session_id << " ";
}
std::cout << std::endl << "(Please use 'cache_admin --list_session' to get detailed info of sessions.)\n";
std::cout << std::endl << "(Please use 'cache_admin --list_sessions' to get detailed info of sessions.)\n";
} else {
std::cout << "No active sessions." << std::endl;
}
@@ -582,8 +580,10 @@ Status CacheAdminArgHandler::StartServer(CommandId command_id) {
close(0);
close(fd[1]);
// exec the cache server binary in this process
// If the user did not provide the value of num_workers, we pass -1 to cache server to allow it assign the default.
// So that the server knows if the number is provided by users or by default.
std::string workers_string = used_args_[ArgValue::kArgNumWorkers] ? std::to_string(num_workers_) : "-1";
std::string port_string = std::to_string(port_);
std::string workers_string = std::to_string(num_workers_);
std::string shared_memory_string = std::to_string(shm_mem_sz_);
std::string minloglevel_string = std::to_string(log_level_);
std::string daemonize_string = "true";


+ 12
- 3
mindspore/ccsrc/minddata/dataset/engine/cache/cache_main.cc View File

@@ -86,15 +86,19 @@ ms::Status StartServer(int argc, char **argv) {
std::this_thread::sleep_for(std::chrono::seconds(1));
ms::Status child_rc;
rc = msg.ReceiveStatus(&child_rc);
std::string warning_string;
if (rc.IsError()) {
return rc;
}
if (child_rc.IsError()) {
return child_rc;
} else {
warning_string = child_rc.ToString();
}
std::cout << "Cache server startup completed successfully!\n";
std::cout << "The cache server daemon has been created as process id " << pid << " and listening on port " << port
<< std::endl;
<< ".\n";
if (!warning_string.empty()) std::cout << "WARNING: " << warning_string;
std::cout << "\nRecommendation:\nSince the server is detached into its own daemon process, monitor the server "
"logs (under "
<< ds::DefaultLogDir() << ") for any issues that may happen after startup\n";
@@ -116,12 +120,17 @@ ms::Status StartServer(int argc, char **argv) {
}
}

// Create the instance with some sanity checks built in
rc = builder.Build();
// Dump the summary
MS_LOG(INFO) << "Cache server has started successfully and is listening on port " << port << std::endl;
MS_LOG(INFO) << builder << std::endl;
// Create the instance with some sanity checks built in
rc = builder.Build();
if (rc.IsOk()) {
if (daemonize && !rc.ToString().empty()) {
// If we have adjusted the number of workers provided by users, use the message queue to send the warning
// message if this is the child daemon.
msg.SendStatus(rc);
}
// If all goes well, kick off the threads. Loop forever and never return unless error.
ds::CacheServer &cs = ds::CacheServer::GetInstance();
rc = cs.Run(msg.GetMsgQueueId());


+ 17
- 13
mindspore/ccsrc/minddata/dataset/engine/cache/cache_server.cc View File

@@ -44,7 +44,6 @@ Status CacheServer::DoServiceStart() {
MS_LOG(INFO) << "CacheServer will use disk folder: " << top_;
}
RETURN_IF_NOT_OK(vg_.ServiceStart());
RETURN_IF_NOT_OK(hw_info_->GetNumaNodeInfo());
auto num_numa_nodes = GetNumaNodeCount();
// If we link with numa library. Set default memory policy.
// If we don't pin thread to cpu, then use up all memory controllers to maximize
@@ -53,15 +52,6 @@ Status CacheServer::DoServiceStart() {
CacheServerHW::SetDefaultMemoryPolicy(numa_affinity_ ? CachePoolPolicy::kLocal : CachePoolPolicy::kInterleave));
auto my_node = hw_info_->GetMyNode();
MS_LOG(DEBUG) << "Cache server is running on numa node " << my_node;
// Bump up num_workers_ to at least the number of numa nodes
num_workers_ = std::max(num_numa_nodes, num_workers_);
// But also it shouldn't be too many more than the hardware concurrency
auto num_cpus = hw_info_->GetCpuCount();
num_workers_ = std::min(2 * num_cpus, num_workers_);
// Round up num_workers to a multiple of numa nodes.
auto remainder = num_workers_ % num_numa_nodes;
if (remainder > 0) num_workers_ += (num_numa_nodes - remainder);
MS_LOG(INFO) << "Re-adjusting the number of workers to " << num_workers_;
// There will be some threads working on the grpc queue and
// some number of threads working on the CacheServerRequest queue.
// Like a connector object we will set up the same number of queues but
@@ -993,7 +983,8 @@ session_id_type CacheServer::GetSessionID(connection_id_type connection_id) cons
}

CacheServer::CacheServer(const std::string &spill_path, int32_t num_workers, int32_t port,
int32_t shared_meory_sz_in_gb, float memory_cap_ratio, int8_t log_level)
int32_t shared_meory_sz_in_gb, float memory_cap_ratio, int8_t log_level,
std::shared_ptr<CacheServerHW> hw_info)
: top_(spill_path),
num_workers_(num_workers),
num_grpc_workers_(num_workers_),
@@ -1002,8 +993,8 @@ CacheServer::CacheServer(const std::string &spill_path, int32_t num_workers, int
global_shutdown_(false),
memory_cap_ratio_(memory_cap_ratio),
numa_affinity_(true),
log_level_(log_level) {
hw_info_ = std::make_shared<CacheServerHW>();
log_level_(log_level),
hw_info_(std::move(hw_info)) {
// If we are not linked with numa library (i.e. NUMA_ENABLED is false), turn off cpu
// affinity which can make performance worse.
if (!CacheServerHW::numa_enabled()) {
@@ -1284,6 +1275,19 @@ Status CacheServer::Builder::SanityCheck() {
return Status::OK();
}

/// \brief Clamp and align a requested worker count to the machine's NUMA layout.
/// \param num_workers Requested number of workers (may be a user value or a default).
/// \return The adjusted worker count: at least one per NUMA node, at most twice the
///         CPU count, rounded up to a multiple of the NUMA node count.
int32_t CacheServer::Builder::AdjustNumWorkers(int32_t num_workers) {
  const int32_t numa_count = hw_info_->GetNumaNodeCount();
  // Never run fewer workers than there are numa nodes.
  int32_t adjusted = (num_workers < numa_count) ? numa_count : num_workers;
  // But don't go far beyond the hardware concurrency either.
  const int32_t cpu_cap = 2 * hw_info_->GetCpuCount();
  if (adjusted > cpu_cap) {
    adjusted = cpu_cap;
  }
  // Pad upward so the workers divide evenly across the numa nodes.
  const int32_t leftover = adjusted % numa_count;
  return (leftover == 0) ? adjusted : adjusted + (numa_count - leftover);
}

CacheServer::Builder::Builder()
: top_(""),
num_workers_(std::thread::hardware_concurrency() / 2),


+ 35
- 8
mindspore/ccsrc/minddata/dataset/engine/cache/cache_server.h View File

@@ -106,7 +106,7 @@ class CacheServer : public Service {
<< "Tcp/ip port: " << GetPort() << "\n"
<< "Shared memory size (in GB): " << GetSharedMemorySzInGb() << "\n"
<< "Memory cap ratio: " << GetMemoryCapRatio() << "\n"
<< "Log level: " << GetLogLevel();
<< "Log level: " << std::to_string(GetLogLevel());
}

friend std::ostream &operator<<(std::ostream &out, const Builder &bld) {
@@ -115,12 +115,34 @@ class CacheServer : public Service {
}

// Builds the cache server instance: resolves NUMA topology, settles the final
// worker count, sanity-checks the configuration, and creates the singleton.
// Returns kSuccess; the status message carries a warning string when the
// user-supplied worker count was re-adjusted for the NUMA architecture.
Status Build() {
// Get information of numa architecture and adjust num_workers_ based on numa count
hw_info_ = std::make_shared<CacheServerHW>();
RETURN_IF_NOT_OK(hw_info_->GetNumaNodeInfo());
std::string warning_string;
if (num_workers_ == -1) {
// if the user did not provide a value for num_workers, set it to half of num_cpu as default and adjust it if
// the default is not the optimal.
int32_t dft_num_workers = std::thread::hardware_concurrency() > 2 ? std::thread::hardware_concurrency() / 2 : 1;
num_workers_ = AdjustNumWorkers(dft_num_workers);
} else {
// if the users have given their own value, adjust it and provide a warning if it got changed.
int32_t num_workers_new = AdjustNumWorkers(num_workers_);
if (num_workers_ != num_workers_new) {
warning_string =
"The configuration of workers on the cache server is dependent on the NUMA architecture of the server. "
"The current setting is not the optimal for the NUMA architecture. Re-adjusting the number of workers "
"to optimal setting of " +
std::to_string(num_workers_new) + ".\n";
MS_LOG(INFO) << warning_string;
}
num_workers_ = num_workers_new;
}
RETURN_IF_NOT_OK(SanityCheck());
// We need to bring up the Task Manager by bringing up the Services singleton.
RETURN_IF_NOT_OK(Services::CreateInstance());
// NOTE(review): the four lines below appear to be the pre-change AND post-change
// versions of the same tail fused together by the diff view (the statements after
// the first `return` are unreachable). The intended final code is presumably only
// the CreateInstance overload taking hw_info_ plus the kSuccess return — confirm
// against the actual file before relying on this block.
RETURN_IF_NOT_OK(
CacheServer::CreateInstance(top_, num_workers_, port_, shared_memory_sz_in_gb_, memory_cap_ratio_, log_level_));
return Status::OK();
RETURN_IF_NOT_OK(CacheServer::CreateInstance(top_, num_workers_, port_, shared_memory_sz_in_gb_,
memory_cap_ratio_, log_level_, std::move(hw_info_)));
return Status(StatusCode::kSuccess, warning_string);
}

private:
@@ -130,10 +152,14 @@ class CacheServer : public Service {
int32_t shared_memory_sz_in_gb_;
float memory_cap_ratio_;
int8_t log_level_;
std::shared_ptr<CacheServerHW> hw_info_;

/// \brief Sanity checks on the shared memory.
/// \return Status object
Status IpcResourceCleanup();

/// \brief Adjust the value of num_workers if it's not the optimal to NUMA architecture.
int32_t AdjustNumWorkers(int32_t num_workers);
};

CacheServer(const CacheServer &) = delete;
@@ -145,11 +171,12 @@ class CacheServer : public Service {
~CacheServer() override { (void)ServiceStop(); }

static Status CreateInstance(const std::string &spill_path, int32_t num_workers, int32_t port,
int32_t shared_memory_sz, float memory_cap_ratio, int8_t log_level) {
int32_t shared_memory_sz, float memory_cap_ratio, int8_t log_level,
std::shared_ptr<CacheServerHW> hw_info) {
std::call_once(init_instance_flag_, [&]() -> Status {
auto &SvcManager = Services::GetInstance();
RETURN_IF_NOT_OK(
SvcManager.AddHook(&instance_, spill_path, num_workers, port, shared_memory_sz, memory_cap_ratio, log_level));
RETURN_IF_NOT_OK(SvcManager.AddHook(&instance_, spill_path, num_workers, port, shared_memory_sz, memory_cap_ratio,
log_level, hw_info));
return Status::OK();
});
return Status::OK();
@@ -274,7 +301,7 @@ class CacheServer : public Service {
/// \param spill_path Top directory for spilling buffers to.
/// \param num_workers Number of threads for handling requests.
explicit CacheServer(const std::string &spill_path, int32_t num_workers, int32_t port, int32_t share_memory_sz_in_gb,
float memory_cap_ratio, int8_t log_level);
float memory_cap_ratio, int8_t log_level, std::shared_ptr<CacheServerHW> hw_info);

/// \brief Locate a cache service from connection id.
/// \return Pointer to cache service. Null if not found


Loading…
Cancel
Save