Browse Source

Add MindSpore communication framework network building and communication lib implementation.

tags/v1.6.0
ZPaC 4 years ago
parent
commit
6d236baaf2
11 changed files with 491 additions and 6 deletions
  1. +1
    -1
      mindspore/ccsrc/CMakeLists.txt
  2. +6
    -0
      mindspore/ccsrc/distributed/CMakeLists.txt
  3. +191
    -0
      mindspore/ccsrc/distributed/cluster/cluster_context.cc
  4. +101
    -0
      mindspore/ccsrc/distributed/cluster/cluster_context.h
  5. +39
    -0
      mindspore/ccsrc/distributed/cluster/dummy_cluster_context.cc
  6. +50
    -0
      mindspore/ccsrc/distributed/cluster/dummy_cluster_context.h
  7. +41
    -0
      mindspore/ccsrc/distributed/constants.h
  8. +46
    -0
      mindspore/ccsrc/runtime/hardware/cpu/ms_collective_comm_lib.cc
  9. +1
    -1
      mindspore/ccsrc/runtime/hardware/cpu/ms_collective_comm_lib.h
  10. +6
    -4
      mindspore/ccsrc/runtime/hardware/cpu/ms_communication_group.h
  11. +9
    -0
      mindspore/core/utils/ms_utils.h

+ 1
- 1
mindspore/ccsrc/CMakeLists.txt View File

@@ -221,7 +221,7 @@ set(SUB_COMP
frontend/operator
pipeline/jit
pipeline/pynative
common debug pybind_api utils vm profiler ps fl
common debug pybind_api utils vm profiler ps fl distributed
)

foreach(_comp ${SUB_COMP})


+ 6
- 0
mindspore/ccsrc/distributed/CMakeLists.txt View File

@@ -1,5 +1,11 @@
file(GLOB_RECURSE _DISTRIBUTED_SRC_FILES RELATIVE ${CMAKE_CURRENT_SOURCE_DIR} "*.cc")

# The real cluster context requires the CPU parameter-server core and is not built
# on Windows; all other configurations fall back to the dummy stub that exposes the
# same interface. Exactly one of the two implementations is compiled in.
if(NOT ENABLE_CPU OR WIN32)
list(REMOVE_ITEM _DISTRIBUTED_SRC_FILES "cluster/cluster_context.cc")
else()
list(REMOVE_ITEM _DISTRIBUTED_SRC_FILES "cluster/dummy_cluster_context.cc")
endif()

# Tag these sources with the SM_DISTRIBUTED submodule id so log lines are attributed
# to the distributed module.
set_property(SOURCE ${_DISTRIBUTED_SRC_FILES} PROPERTY COMPILE_DEFINITIONS
SUBMODULE_ID=mindspore::SubModuleId::SM_DISTRIBUTED)
add_library(_mindspore_distributed_obj OBJECT ${_DISTRIBUTED_SRC_FILES})

+ 191
- 0
mindspore/ccsrc/distributed/cluster/cluster_context.cc View File

@@ -0,0 +1,191 @@
/**
* Copyright 2021 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <vector>
#include "distributed/cluster/cluster_context.h"
#include "utils/ms_context.h"
#include "ps/ps_context.h"
#include "debug/common.h"

namespace mindspore {
namespace distributed {
namespace cluster {
// The context starts in the "finalized" state: finalized_ is true until Initialize()
// succeeds, so the destructor does not try to finalize a cluster that was never built.
ClusterContext::ClusterContext()
    : inited_(false),
      finalized_(true),
      node_num_each_role_({}),
      scheduler_host_(kLocalHost),
      scheduler_port_(kDefaultSchedPort),
      node_(nullptr),
      node_role_(kEnvRoleOfWorker),
      cluster_config_(nullptr) {}

ClusterContext::~ClusterContext() {
  // Best-effort cleanup: make sure this node leaves the cluster if the caller
  // never invoked Finalize() explicitly.
  if (!finalized_) {
    Finalize();
  }
}

// Returns the process-wide singleton instance.
// Construction relies on the C++11 "magic static" guarantee, which makes concurrent
// first calls thread-safe. The previous reset-on-null pattern checked and assigned the
// static pointer outside of its initialization, so two threads racing through the null
// check could both construct an instance.
std::shared_ptr<ClusterContext> ClusterContext::instance() {
  // std::make_shared cannot be used here because the constructor is private.
  static std::shared_ptr<ClusterContext> cluster_instance(new (std::nothrow) ClusterContext());
  MS_EXCEPTION_IF_NULL(cluster_instance);
  return cluster_instance;
}

// Initializes the cluster context exactly once: loads the configuration from
// environment variables, then blocks until the whole cluster networking is done.
void ClusterContext::Initialize() {
  if (inited_) {
    MS_LOG(INFO) << "The cluster has been initialized.";
    return;
  }

  // Step 1: Initialize cluster configuration.
  InitClusterConfig();

  // Step 2: Build network for this cluster. Every process will block in this method until networking is done.
  if (!BuildCluster()) {
    MS_LOG(EXCEPTION) << "Building networking for " << node_role_ << " failed.";
    // NOTE(review): MS_LOG(EXCEPTION) appears to throw, which would make this return
    // unreachable — kept as a defensive fallback; confirm before removing.
    return;
  }

  inited_ = true;
  finalized_ = false;
}

// Finalizes the cluster: asks the node to finish (timeouts are tolerated) and then
// stops it. Idempotent: subsequent calls after a successful finalize are no-ops.
void ClusterContext::Finalize() {
  if (finalized_) {
    return;
  }
  // In some cases, one node calls the Finish function while other nodes don't. So timeout is acceptable.
  if (!node_->Finish()) {
    MS_LOG(WARNING) << "Finishing node " << node_role_ << " timeout.";
  }
  if (!node_->Stop()) {
    MS_LOG(ERROR) << "Failed to stop node " << node_role_;
    // finalized_ stays false so a later retry remains possible.
    return;
  }
  finalized_ = true;
  // Wake up any thread blocked on the finish condition variable.
  wait_finish_cond_.notify_all();
}

// Returns the role of this process in the cluster (e.g. MS_WORKER, MS_SERVER or MS_SCHED).
std::string ClusterContext::node_role() const { return node_role_; }

// Loads the cluster configuration from environment variables and mirrors it into the
// parameter-server context so the PS core sees a consistent view of the cluster.
void ClusterContext::InitClusterConfig() {
  InitNodeRole();
  InitSchedulerIp();
  InitSchedulerPort();

  const uint32_t worker_num = node_num_each_role_[kEnvRoleOfWorker];
  const uint32_t server_num = node_num_each_role_[kEnvRoleOfServer];

  auto ps_context = ps::PSContext::instance();
  ps_context->set_worker_num(worker_num);
  ps_context->set_server_num(server_num);
  ps_context->set_scheduler_ip(scheduler_host_);
  ps_context->set_scheduler_port(scheduler_port_);

  auto &config = ps_context->cluster_config();
  config.initial_worker_num = worker_num;
  config.initial_server_num = server_num;
  config.scheduler_host = scheduler_host_;
  config.scheduler_port = scheduler_port_;
}

// Creates the node object matching this process's role and starts it. Start() blocks
// until the cluster networking is complete; callbacks must be registered beforehand.
bool ClusterContext::BuildCluster() {
  // Create node according to different role.
  if (node_role_ == kEnvRoleOfWorker) {
    node_ = std::make_shared<ps::core::WorkerNode>();
  } else if (node_role_ == kEnvRoleOfServer) {
    node_ = std::make_shared<ps::core::ServerNode>();
  } else if (node_role_ == kEnvRoleOfScheduler) {
    node_ = std::make_shared<ps::core::SchedulerNode>();
  } else {
    MS_LOG(EXCEPTION) << "The role " << node_role_ << " is invalid.";
    // NOTE(review): unreachable if MS_LOG(EXCEPTION) throws; kept as defensive fallback.
    return false;
  }
  MS_EXCEPTION_IF_NULL(node_);

  // Register timeout callbacks before starting so no cluster event is missed.
  RegisterEventCallback();
  if (!node_->Start()) {
    MS_LOG(EXCEPTION) << "Building network failed.";
    return false;
  }
  MS_LOG(INFO) << "Cluster is successfully initialized.";
  return true;
}

void ClusterContext::InitNodeRole() {
node_role_ = common::GetEnv(kEnvRole);
if (kValidRoleName.count(node_role_) == 0) {
MS_LOG(EXCEPTION) << "Role name " << node_role_ << " is invalid.";
return;
}

if (common::GetEnv(kEnvWorkerNum).empty()) {
node_num_each_role_[kEnvRoleOfWorker] = 0;
} else {
TRY_AND_CATCH_WITH_EXCEPTION(
(node_num_each_role_[kEnvRoleOfWorker] = IntToUint(std::stoi(common::GetEnv(kEnvWorkerNum)))),
"The environment variable MS_WORKER_NUM is invalid.");
}

if (common::GetEnv(kEnvServerNum).empty()) {
node_num_each_role_[kEnvRoleOfServer] = 0;
} else {
TRY_AND_CATCH_WITH_EXCEPTION(
(node_num_each_role_[kEnvRoleOfServer] = IntToUint(std::stoi(common::GetEnv(kEnvServerNum)))),
"The environment variable MS_SERVER_NUM is invalid.");
}
}

// Loads the scheduler host from MS_SCHED_HOST. Only the local host is supported
// by this implementation, so any other value is rejected with an exception.
void ClusterContext::InitSchedulerIp() {
  scheduler_host_ = common::GetEnv(kEnvSchedulerHost);
  if (scheduler_host_ != kLocalHost) {
    MS_LOG(EXCEPTION) << "Scheduler IP should be 127.0.0.1";
  }
}

// Loads the scheduler port from MS_SCHED_PORT and validates its range.
// The range must be checked on the parsed int BEFORE narrowing to uint16_t: the
// original code compared the already-truncated uint16_t against kMaxPort (65535),
// a condition that can never be true, so out-of-range values such as 65536
// silently wrapped around instead of being rejected.
void ClusterContext::InitSchedulerPort() {
  int port = 0;
  TRY_AND_CATCH_WITH_EXCEPTION((port = std::stoi(common::GetEnv(kEnvSchedulerPort))),
                               "The environment variable MS_SCHED_PORT is invalid.");
  if (port < 0 || port > kMaxPort) {
    MS_LOG(EXCEPTION) << "The port: " << port << " is invalid.";
  }
  scheduler_port_ = static_cast<uint16_t>(port);
}

// Registers callbacks for cluster failure events. Each callback finalizes this node and
// then records an exception via MsException so the failure can be re-raised on the main
// thread; the throw-inside-try pattern exists because these callbacks run on a
// communication thread where letting the exception escape would terminate the process.
void ClusterContext::RegisterEventCallback() {
  // Only nodes derived from AbstractNode support event callbacks; the dynamic cast
  // yields nullptr for other node kinds, in which case nothing is registered.
  auto abstract_node = std::dynamic_pointer_cast<ps::core::AbstractNode>(node_);
  if (abstract_node != nullptr) {
    abstract_node->RegisterEventCallback(ps::core::ClusterEvent::SCHEDULER_TIMEOUT, [this]() {
      MS_LOG(ERROR) << "Event SCHEDULER_TIMEOUT is captured.";
      Finalize();
      try {
        MS_LOG(EXCEPTION)
          << "Event SCHEDULER_TIMEOUT is captured. This is because scheduler node is finalized or crashed.";
      } catch (std::exception &) {
        // Stash the exception so the main thread can re-throw it later.
        MsException::Instance().SetException();
      }
    });

    abstract_node->RegisterEventCallback(ps::core::ClusterEvent::NODE_TIMEOUT, [this]() {
      MS_LOG(ERROR) << "Event NODE_TIMEOUT is captured.";
      Finalize();
      try {
        MS_LOG(EXCEPTION) << "Event NODE_TIMEOUT is captured. This is because some nodes are finalized or crashed.";
      } catch (std::exception &) {
        // Stash the exception so the main thread can re-throw it later.
        MsException::Instance().SetException();
      }
    });
  }
}
}
} // namespace cluster
} // namespace distributed
} // namespace mindspore

+ 101
- 0
mindspore/ccsrc/distributed/cluster/cluster_context.h View File

@@ -0,0 +1,101 @@
/**
* Copyright 2021 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#ifndef MINDSPORE_CCSRC_DISTRIBUTED_CLUSTER_CLUSTER_CONTEXT_H_
#define MINDSPORE_CCSRC_DISTRIBUTED_CLUSTER_CLUSTER_CONTEXT_H_

#include <map>
#include <set>
#include <string>
#include <memory>
#include <atomic>
#include <vector>
#include "distributed/constants.h"
#include "utils/log_adapter.h"
#include "utils/ms_utils.h"

#include "ps/core/cluster_config.h"
#include "ps/core/node.h"
#include "ps/core/worker_node.h"
#include "ps/core/server_node.h"
#include "ps/core/scheduler_node.h"

namespace mindspore {
namespace distributed {
namespace cluster {
// Node role based cluster built by MindSpore communication framework.
// Node role based cluster built by MindSpore communication framework.
// A process-wide singleton; obtain it via instance().
class ClusterContext {
 public:
  ~ClusterContext();
  DISABLE_COPY_AND_ASSIGN(ClusterContext)
  // Returns the process-wide singleton instance.
  static std::shared_ptr<ClusterContext> instance();

  // Initialize the cluster configuration and build network.
  void Initialize();

  // Finalize the cluster and process exits.
  void Finalize();

  // Returns the role of this process (e.g. MS_WORKER, MS_SERVER or MS_SCHED).
  std::string node_role() const;

 private:
  ClusterContext();

  // Initializes the cluster configurations. They can be set by environment variables,
  // the python API or a configuration file.
  void InitClusterConfig();

  // Build the cluster with other processes. This method will not return until the networking is done.
  bool BuildCluster();

  // Load the cluster configuration like worker number, server number and etc.
  void InitNodeRole();
  void InitSchedulerIp();
  void InitSchedulerPort();

  // Register event callbacks for NODE_TIMEOUT, SCHEDULER_TIMEOUT, etc.
  void RegisterEventCallback();

  // The flag that whether this cluster context instance is already initialized.
  std::atomic_bool inited_;

  // The flag that whether this cluster context instance is already finalized.
  std::atomic_bool finalized_;

  // The condition variable and mutex about exiting status of this node.
  std::mutex wait_finish_mutex_;
  std::condition_variable wait_finish_cond_;

  // Node role to role number map (e.g. worker count keyed by MS_WORKER).
  std::map<std::string, uint32_t> node_num_each_role_;

  // Scheduler information: host address and TCP port.
  std::string scheduler_host_;
  uint16_t scheduler_port_;

  // The node could be Worker, Server or Scheduler, etc.
  std::shared_ptr<ps::core::Node> node_;

  // The role of this process in the cluster.
  std::string node_role_;

  // The configuration of this cluster.
  std::unique_ptr<ps::core::ClusterConfig> cluster_config_;
};
} // namespace cluster
} // namespace distributed
} // namespace mindspore
#endif // MINDSPORE_CCSRC_DISTRIBUTED_CLUSTER_CLUSTER_CONTEXT_H_

+ 39
- 0
mindspore/ccsrc/distributed/cluster/dummy_cluster_context.cc View File

@@ -0,0 +1,39 @@
/**
* Copyright 2021 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <string>
#include "distributed/cluster/dummy_cluster_context.h"

namespace mindspore {
namespace distributed {
namespace cluster {
// Returns the process-wide singleton of the dummy cluster context.
// Construction relies on the C++11 "magic static" guarantee, making concurrent first
// calls thread-safe; the previous reset-on-null pattern could race when two threads
// passed the null check simultaneously.
std::shared_ptr<ClusterContext> ClusterContext::instance() {
  // std::make_shared cannot be used here because the constructor is private.
  static std::shared_ptr<ClusterContext> cluster_instance(new (std::nothrow) ClusterContext());
  MS_EXCEPTION_IF_NULL(cluster_instance);
  return cluster_instance;
}

// Stub implementations: on platforms without the MindSpore communication framework
// (e.g. Windows or CPU-disabled builds) the cluster context does nothing.
void ClusterContext::Initialize() const { return; }

void ClusterContext::Finalize() const { return; }

// The stub reports an empty role string.
std::string ClusterContext::node_role() const { return ""; }
} // namespace cluster
} // namespace distributed
} // namespace mindspore

+ 50
- 0
mindspore/ccsrc/distributed/cluster/dummy_cluster_context.h View File

@@ -0,0 +1,50 @@
/**
* Copyright 2021 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#ifndef MINDSPORE_CCSRC_DISTRIBUTED_CLUSTER_DUMMY_CLUSTER_CONTEXT_H_
#define MINDSPORE_CCSRC_DISTRIBUTED_CLUSTER_DUMMY_CLUSTER_CONTEXT_H_

#include <map>
#include <set>
#include <string>
#include <memory>
#include <atomic>
#include <vector>
#include "distributed/constants.h"
#include "utils/log_adapter.h"
#include "utils/ms_utils.h"

namespace mindspore {
namespace distributed {
namespace cluster {
// The dummy cluster context interface. This class is the stub for some test cases and windows compiling.
// The dummy cluster context interface. This class is the stub for some test cases and windows compiling.
// It mirrors the public interface of the real ClusterContext but performs no work.
class ClusterContext {
 public:
  ~ClusterContext() = default;
  DISABLE_COPY_AND_ASSIGN(ClusterContext)
  // Returns the process-wide singleton instance.
  static std::shared_ptr<ClusterContext> instance();

  // No-op stubs matching the real ClusterContext interface.
  void Initialize() const;
  void Finalize() const;
  // Returns an empty role string in the stub.
  std::string node_role() const;

 private:
  ClusterContext() = default;
};
} // namespace cluster
} // namespace distributed
} // namespace mindspore
#endif // MINDSPORE_CCSRC_DISTRIBUTED_CLUSTER_DUMMY_CLUSTER_CONTEXT_H_

+ 41
- 0
mindspore/ccsrc/distributed/constants.h View File

@@ -0,0 +1,41 @@
/**
* Copyright 2021 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#ifndef MINDSPORE_CCSRC_DISTRIBUTED_CONSTANTS_H_
#define MINDSPORE_CCSRC_DISTRIBUTED_CONSTANTS_H_

#include <set>
#include <string>

namespace mindspore {
namespace distributed {
// Environment variables configuring the cluster topology (node counts and scheduler address).
constexpr char kEnvServerNum[] = "MS_SERVER_NUM";
constexpr char kEnvWorkerNum[] = "MS_WORKER_NUM";
constexpr char kEnvSchedulerHost[] = "MS_SCHED_HOST";
constexpr char kEnvSchedulerPort[] = "MS_SCHED_PORT";

// Environment variable selecting this process's role, and its valid values.
constexpr char kEnvRole[] = "MS_ROLE";
constexpr char kEnvRoleOfServer[] = "MS_SERVER";
constexpr char kEnvRoleOfWorker[] = "MS_WORKER";
constexpr char kEnvRoleOfScheduler[] = "MS_SCHED";
const std::set<std::string> kValidRoleName = {kEnvRoleOfServer, kEnvRoleOfWorker, kEnvRoleOfScheduler};

// Default scheduler address/port and the maximum valid TCP port number.
constexpr char kLocalHost[] = "127.0.0.1";
const uint16_t kDefaultSchedPort = 6667;
const uint16_t kMaxPort = 65535;
} // namespace distributed
} // namespace mindspore
#endif // MINDSPORE_CCSRC_DISTRIBUTED_CONSTANTS_H_

+ 46
- 0
mindspore/ccsrc/runtime/hardware/cpu/ms_collective_comm_lib.cc View File

@@ -0,0 +1,46 @@
/**
* Copyright 2021 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "runtime/hardware/cpu/ms_collective_comm_lib.h"

namespace mindspore {
namespace device {
namespace cpu {
// Records this process's global rank and the total rank size.
// Returns false if the library was already initialized: repeated initialization is
// rejected rather than silently overwriting the rank information.
bool MsCollectiveCommLib::Initialize(uint32_t global_rank, uint32_t global_rank_size) {
  if (initialized_) {
    return false;
  }

  global_rank_id_ = global_rank;
  global_rank_size_ = global_rank_size;
  initialized_ = true;
  return true;
}

// Nothing to release for this library; finalization always succeeds.
bool MsCollectiveCommLib::Finalize() { return true; }

// Creates and registers a communication group with the given ranks.
// Fails (via CHECK_RET) when a group with the same name already exists.
bool MsCollectiveCommLib::CreateCommunicationGroup(const std::string &group_name,
                                                   const std::vector<uint32_t> &group_ranks) {
  CHECK_RET((groups_.count(group_name) == 0), true, "The group " + group_name + " has already existed.");

  MsCommunicationGroupPtr group = std::make_shared<MsCommunicationGroup>(group_name, group_ranks, global_rank_id_);
  CHECK_IF_NULL(group);
  groups_[group_name] = group;
  return true;
}
} // namespace cpu
} // namespace device
} // namespace mindspore

+ 1
- 1
mindspore/ccsrc/runtime/hardware/cpu/ms_collective_comm_lib.h View File

@@ -21,6 +21,7 @@
#include <vector>
#include <string>
#include "runtime/hardware/collective/collective_communication_lib.h"
#include "runtime/hardware/cpu/ms_communication_group.h"

namespace mindspore {
namespace device {
@@ -37,7 +38,6 @@ class MsCollectiveCommLib : public CollectiveCommunicationLib {
bool Finalize() override;

bool CreateCommunicationGroup(const std::string &group_name, const std::vector<uint32_t> &group_ranks) override;
bool DestroyCommunicationGroup(const std::string &group_name) override;

private:
MsCollectiveCommLib() {}


+ 6
- 4
mindspore/ccsrc/runtime/hardware/cpu/ms_communication_group.h View File

@@ -19,7 +19,8 @@

#include <string>
#include <vector>
#include "runtime/hardware/communication_group.h"
#include <memory>
#include "runtime/hardware/collective/communication_group.h"

namespace mindspore {
namespace device {
@@ -27,13 +28,14 @@ namespace cpu {
class MsCommunicationGroup : public CommunicationGroup {
public:
explicit MsCommunicationGroup(const std::string name, const std::vector<uint32_t> &group_ranks, uint32_t global_rank)
: MsCommunicationGroup(name, group_ranks, global_rank) {}
: CommunicationGroup(name, group_ranks, global_rank) {}

~MsCommunicationGroup() override = default;

bool Initialize(void *root_info) override;
bool Finalize() override;
bool Initialize(void *root_info) override { return true; }
bool Finalize() override { return true; }
};
using MsCommunicationGroupPtr = std::shared_ptr<MsCommunicationGroup>;
} // namespace cpu
} // namespace device
} // namespace mindspore


+ 9
- 0
mindspore/core/utils/ms_utils.h View File

@@ -27,6 +27,15 @@
ClassType(const ClassType &) = delete; \
ClassType &operator=(const ClassType &) = delete;

// Evaluates `expr`; if it throws std::exception, converts the failure into
// MS_LOG(EXCEPTION) carrying both the original exception text and `error_msg`.
#define TRY_AND_CATCH_WITH_EXCEPTION(expr, error_msg) \
do { \
try { \
(expr); \
} catch (const std::exception &e) { \
MS_LOG(EXCEPTION) << "Caught exception " << e.what() << ". " << error_msg; \
} \
} while (0)

namespace mindspore {
namespace common {
// TODO(jiaorui): delete


Loading…
Cancel
Save