Browse Source

Fix scheduler being unable to exit

tags/v1.2.0-rc1
chendongsheng 5 years ago
parent
commit
2d7c2fbc73
17 changed files with 6 additions and 25 deletions
  1. +0
    -1
      mindspore/ccsrc/ps/CMakeLists.txt
  2. +0
    -3
      mindspore/ccsrc/ps/core/cluster_metadata.cc
  3. +0
    -1
      mindspore/ccsrc/ps/core/cluster_metadata.h
  4. +0
    -1
      mindspore/ccsrc/ps/core/http_client.h
  5. +0
    -1
      mindspore/ccsrc/ps/core/scheduler_node.cc
  6. +0
    -1
      mindspore/ccsrc/ps/core/scheduler_node.h
  7. +0
    -3
      mindspore/ccsrc/ps/core/tcp_client.cc
  8. +1
    -1
      mindspore/ccsrc/ps/core/tcp_client.h
  9. +0
    -3
      mindspore/ccsrc/ps/core/tcp_server.cc
  10. +1
    -0
      mindspore/ccsrc/ps/core/tcp_server.h
  11. +1
    -0
      mindspore/ccsrc/ps/core/worker_node.cc
  12. +0
    -3
      mindspore/ccsrc/ps/parameter_server.cc
  13. +0
    -1
      mindspore/ccsrc/ps/parameter_server.h
  14. +1
    -0
      mindspore/ccsrc/ps/ps_context.cc
  15. +1
    -0
      mindspore/ccsrc/ps/ps_context.h
  16. +1
    -3
      mindspore/ccsrc/ps/scheduler.cc
  17. +0
    -3
      mindspore/ccsrc/ps/worker.cc

+ 0
- 1
mindspore/ccsrc/ps/CMakeLists.txt View File

@@ -12,7 +12,6 @@ if(NOT (ENABLE_CPU AND (ENABLE_D OR ENABLE_GPU)))
list(REMOVE_ITEM _PS_SRC_FILES "core/tcp_client.cc")
list(REMOVE_ITEM _PS_SRC_FILES "core/tcp_message_handler.cc")
list(REMOVE_ITEM _PS_SRC_FILES "core/tcp_server.cc")
list(REMOVE_ITEM _PS_SRC_FILES "core/cluster_metadata.cc")
list(REMOVE_ITEM _PS_SRC_FILES "core/node.cc")
list(REMOVE_ITEM _PS_SRC_FILES "core/node_manager.cc")
list(REMOVE_ITEM _PS_SRC_FILES "ps_cache/ps_cache_manager.cc")


+ 0
- 3
mindspore/ccsrc/ps/core/cluster_metadata.cc View File

@@ -32,9 +32,6 @@ void ClusterMetadata::Init(const uint32_t &worker_num, const uint32_t &server_nu
const uint16_t &scheduler_port) {
worker_num_ = worker_num;
server_num_ = server_num;
if (!CommUtil::CheckIp(scheduler_host)) {
MS_LOG(EXCEPTION) << "The scheduler_host:" << scheduler_host << " is illegal!";
}
scheduler_host_ = std::make_unique<std::string>(scheduler_host);
scheduler_port_ = scheduler_port;
}


+ 0
- 1
mindspore/ccsrc/ps/core/cluster_metadata.h View File

@@ -23,7 +23,6 @@
#include <utility>

#include "utils/log_adapter.h"
#include "ps/core/comm_util.h"

namespace mindspore {
namespace ps {


+ 0
- 1
mindspore/ccsrc/ps/core/http_client.h View File

@@ -45,7 +45,6 @@
namespace mindspore {
namespace ps {
namespace core {

enum class HttpMethod { HM_GET = 1 << 0, HM_POST = 1 << 1 };

enum class Status : int {


+ 0
- 1
mindspore/ccsrc/ps/core/scheduler_node.cc View File

@@ -19,7 +19,6 @@
namespace mindspore {
namespace ps {
namespace core {

SchedulerNode::~SchedulerNode() {
MS_LOG(INFO) << "Stop scheduler node!";
Stop();


+ 0
- 1
mindspore/ccsrc/ps/core/scheduler_node.h View File

@@ -36,7 +36,6 @@
namespace mindspore {
namespace ps {
namespace core {

class SchedulerNode : public Node {
public:
SchedulerNode() : server_(nullptr), scheduler_thread_(nullptr), update_state_thread_(nullptr) {}


+ 0
- 3
mindspore/ccsrc/ps/core/tcp_client.cc View File

@@ -30,8 +30,6 @@
#include <string>
#include <utility>

#include "ps/core/comm_util.h"

namespace mindspore {
namespace ps {
namespace core {
@@ -335,7 +333,6 @@ void TcpClient::StartTimer(const uint32_t &time) {
void TcpClient::set_timer_callback(const OnTimer &timer) { on_timer_callback_ = timer; }

const event_base &TcpClient::eventbase() { return *event_base_; }

} // namespace core
} // namespace ps
} // namespace mindspore

+ 1
- 1
mindspore/ccsrc/ps/core/tcp_client.h View File

@@ -34,6 +34,7 @@

#include "ps/core/cluster_metadata.h"
#include "utils/convert_utils_base.h"
#include "ps/core/comm_util.h"

namespace mindspore {
namespace ps {
@@ -99,7 +100,6 @@ class TcpClient {
std::atomic<bool> is_stop_;
std::atomic<bool> is_connected_;
};

} // namespace core
} // namespace ps
} // namespace mindspore


+ 0
- 3
mindspore/ccsrc/ps/core/tcp_server.cc View File

@@ -29,8 +29,6 @@
#include <csignal>
#include <utility>

#include "ps/core/comm_util.h"

namespace mindspore {
namespace ps {
namespace core {
@@ -445,7 +443,6 @@ int TcpServer::ConnectionNum() const { return connections_.size(); }
const std::map<evutil_socket_t, std::shared_ptr<TcpConnection>> &TcpServer::Connections() const { return connections_; }

void TcpServer::SetMessageCallback(const OnServerReceiveMessage &cb) { message_callback_ = cb; }

} // namespace core
} // namespace ps
} // namespace mindspore

+ 1
- 0
mindspore/ccsrc/ps/core/tcp_server.h View File

@@ -37,6 +37,7 @@
#include "ps/core/tcp_message_handler.h"
#include "ps/core/cluster_metadata.h"
#include "utils/convert_utils_base.h"
#include "ps/core/comm_util.h"

namespace mindspore {
namespace ps {


+ 1
- 0
mindspore/ccsrc/ps/core/worker_node.cc View File

@@ -15,6 +15,7 @@
*/

#include "ps/core/worker_node.h"
#include "ps/core/comm_util.h"

namespace mindspore {
namespace ps {


+ 0
- 3
mindspore/ccsrc/ps/parameter_server.cc View File

@@ -23,9 +23,6 @@ void ParameterServer::Run(const FuncGraphPtr &func_graph) {
MS_LOG(INFO) << "PServer starts connecting to scheduler and workers...";
server_node_ = std::make_shared<core::ServerNode>();

core::ClusterMetadata::instance()->Init(
PSContext::instance()->initial_worker_num(), PSContext::instance()->initial_server_num(),
PSContext::instance()->scheduler_host(), PSContext::instance()->scheduler_port());
MS_LOG(INFO) << "PServer connected successfully.";
if (!PSContext::instance()->is_server()) {
MS_LOG(INFO) << "This is not the Server node.";


+ 0
- 1
mindspore/ccsrc/ps/parameter_server.h View File

@@ -63,7 +63,6 @@

namespace mindspore {
namespace ps {

class ParameterServer {
public:
static ParameterServer &GetInstance() {


+ 1
- 0
mindspore/ccsrc/ps/ps_context.cc View File

@@ -52,6 +52,7 @@ void PSContext::SetPSEnable(bool enabled) {
server_num_ = std::strtol(common::GetEnv(kEnvPServerNum).c_str(), nullptr, 10);
scheduler_host_ = common::GetEnv(kEnvSchedulerHost);
scheduler_port_ = std::strtol(common::GetEnv(kEnvSchedulerPort).c_str(), nullptr, 10);
core::ClusterMetadata::instance()->Init(worker_num_, server_num_, scheduler_host_, scheduler_port_);
} else {
MS_LOG(INFO) << "PS mode is disabled.";
is_worker_ = false;


+ 1
- 0
mindspore/ccsrc/ps/ps_context.h View File

@@ -20,6 +20,7 @@
#include <string>
#include <memory>
#include "ps/constants.h"
#include "ps/core/cluster_metadata.h"

namespace mindspore {
namespace ps {


+ 1
- 3
mindspore/ccsrc/ps/scheduler.cc View File

@@ -19,9 +19,7 @@
namespace mindspore {
namespace ps {
void Scheduler::Run() {
core::ClusterMetadata::instance()->Init(
PSContext::instance()->initial_worker_num(), PSContext::instance()->initial_server_num(),
PSContext::instance()->scheduler_host(), PSContext::instance()->scheduler_port());
MS_LOG(INFO) << "Start scheduler.";
scheduler_node_.Start();
scheduler_node_.Finish();
scheduler_node_.Stop();


+ 0
- 3
mindspore/ccsrc/ps/worker.cc View File

@@ -22,9 +22,6 @@ namespace ps {
void Worker::Run() {
std::lock_guard<std::mutex> lock(running_mutex_);

core::ClusterMetadata::instance()->Init(
PSContext::instance()->initial_worker_num(), PSContext::instance()->initial_server_num(),
PSContext::instance()->scheduler_host(), PSContext::instance()->scheduler_port());
server_num_ = PSContext::instance()->initial_server_num();
if (running_) {
MS_LOG(INFO) << "'Worker is already running.";


Loading…
Cancel
Save