Browse Source

!30016 Add distributed meta store module

Merge pull request !30016 from ZPaC/metadata-store-module
feature/build-system-rewrite
i-robot Gitee 4 years ago
parent
commit
2decc54375
No known key found for this signature in database GPG Key ID: 173E9B9CA92EEF8F
10 changed files with 321 additions and 1 deletions
  1. +1
    -0
      mindspore/ccsrc/distributed/CMakeLists.txt
  2. +30
    -0
      mindspore/ccsrc/distributed/cluster/actor_route_table_proxy.cc
  3. +53
    -0
      mindspore/ccsrc/distributed/cluster/actor_route_table_proxy.h
  4. +34
    -0
      mindspore/ccsrc/distributed/cluster/actor_route_table_service.cc
  5. +62
    -0
      mindspore/ccsrc/distributed/cluster/actor_route_table_service.h
  6. +41
    -0
      mindspore/ccsrc/distributed/cluster/dummy_actor_route_table_proxy.h
  7. +17
    -0
      mindspore/ccsrc/ps/core/protos/comm.proto
  8. +56
    -0
      mindspore/ccsrc/ps/core/scheduler_node.cc
  9. +26
    -1
      mindspore/ccsrc/ps/core/scheduler_node.h
  10. +1
    -0
      tests/ut/cpp/CMakeLists.txt

+ 1
- 0
mindspore/ccsrc/distributed/CMakeLists.txt View File

@@ -2,6 +2,7 @@ file(GLOB_RECURSE _DISTRIBUTED_SRC_FILES RELATIVE ${CMAKE_CURRENT_SOURCE_DIR} "*

if(NOT ENABLE_CPU OR WIN32)
list(REMOVE_ITEM _DISTRIBUTED_SRC_FILES "cluster/cluster_context.cc")
list(REMOVE_ITEM _DISTRIBUTED_SRC_FILES "cluster/actor_route_table_proxy.cc")
else()
list(REMOVE_ITEM _DISTRIBUTED_SRC_FILES "cluster/dummy_cluster_context.cc")
endif()


+ 30
- 0
mindspore/ccsrc/distributed/cluster/actor_route_table_proxy.cc View File

@@ -0,0 +1,30 @@
/**
* Copyright 2022 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <string>
#include "distributed/cluster/actor_route_table_proxy.h"

namespace mindspore {
namespace distributed {
namespace cluster {
bool ActorRouteTableProxy::RegisterRoute(const std::string &actor_id, const ActorAddress &actor_addr) { return true; }

bool ActorRouteTableProxy::DeleteRoute(const std::string &actor_id) { return true; }

ActorAddress ActorRouteTableProxy::LookupRoute(const std::string &actor_id) const { return {}; }
} // namespace cluster
} // namespace distributed
} // namespace mindspore

+ 53
- 0
mindspore/ccsrc/distributed/cluster/actor_route_table_proxy.h View File

@@ -0,0 +1,53 @@
/**
* Copyright 2022 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#ifndef MINDSPORE_CCSRC_DISTRIBUTED_CLUSTER_ACTOR_ROUTE_TABLE_PROXY_H_
#define MINDSPORE_CCSRC_DISTRIBUTED_CLUSTER_ACTOR_ROUTE_TABLE_PROXY_H_

#include <string>
#include <memory>
#include "proto/comm.pb.h"
#include "ps/core/node.h"
#include "distributed/constants.h"

namespace mindspore {
namespace distributed {
namespace cluster {
using ps::core::ActorAddress;
// Actor route table proxy for nodes like workers and server. This class helps update actor route table in scheduler
// across the network.
class ActorRouteTableProxy {
public:
explicit ActorRouteTableProxy(const std::shared_ptr<ps::core::Node> &node) : node_(node) {}
~ActorRouteTableProxy() = default;

// Register actor address to the route table stored in scheduler.
bool RegisterRoute(const std::string &actor_id, const ActorAddress &actor_addr);

// Delete the actor address of the specified actor_id from the route table stored in scheduler.
bool DeleteRoute(const std::string &actor_id);

// Get the actor address for the specified actor_id from the route table stored in scheduler.
ActorAddress LookupRoute(const std::string &actor_id) const;

private:
// The node variable helps proxy to communicate with scheduler, e.g., SendMessage.
std::shared_ptr<ps::core::Node> node_;
};
} // namespace cluster
} // namespace distributed
} // namespace mindspore
#endif // MINDSPORE_CCSRC_DISTRIBUTED_CLUSTER_ACTOR_ROUTE_TABLE_PROXY_H_

+ 34
- 0
mindspore/ccsrc/distributed/cluster/actor_route_table_service.cc View File

@@ -0,0 +1,34 @@
/**
* Copyright 2022 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "distributed/cluster/actor_route_table_service.h"

namespace mindspore {
namespace distributed {
namespace cluster {
bool ActorRouteTableService::Initialize() { return true; }

bool ActorRouteTableService::RegisterRoute(const std::string &actor_id, const ActorAddress &actor_addr,
std::string *error) {
return true;
}

bool ActorRouteTableService::DeleteRoute(const std::string &actor_id, std::string *error) { return true; }

ActorAddress ActorRouteTableService::LookupRoute(const std::string &actor_id, std::string *error) { return {}; }
} // namespace cluster
} // namespace distributed
} // namespace mindspore

+ 62
- 0
mindspore/ccsrc/distributed/cluster/actor_route_table_service.h View File

@@ -0,0 +1,62 @@
/**
* Copyright 2022 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#ifndef MINDSPORE_CCSRC_DISTRIBUTED_CLUSTER_ACTOR_ROUTE_TABLE_SERVICE_H_
#define MINDSPORE_CCSRC_DISTRIBUTED_CLUSTER_ACTOR_ROUTE_TABLE_SERVICE_H_

#include <map>
#include <string>
#include <memory>
#include <shared_mutex>
#include "proto/comm.pb.h"
#include "distributed/constants.h"

namespace mindspore {
namespace distributed {
namespace cluster {
using ps::core::ActorAddress;
// Metadata of actor's route table is physically stored in scheduler node. It receives requests from other nodes like
// workers and servers to update the actor route table.
class ActorRouteTableService {
public:
ActorRouteTableService() = default;
~ActorRouteTableService() = default;

bool Initialize();

// Register actor address to the route table. Parameter 'error' represents the failure information if this operation
// failed.
bool RegisterRoute(const std::string &actor_id, const ActorAddress &actor_addr, std::string *error);

// Delete the actor address of the specified actor_id. Parameter 'error' represents the failure information if this
// operation failed.
bool DeleteRoute(const std::string &actor_id, std::string *error);

// Get the actor address for the specified actor_id. Parameter 'error' represents the failure information if this
// operation failed.
ActorAddress LookupRoute(const std::string &actor_id, std::string *error);

private:
// Metadata of actor address which will used in rpc actors' inter-process communication as 'actor route table'.
std::map<std::string, ActorAddress> actor_addresses_;

// Read/write lock for the actor route table.
std::shared_mutex mtx_;
};
} // namespace cluster
} // namespace distributed
} // namespace mindspore
#endif // MINDSPORE_CCSRC_DISTRIBUTED_CLUSTER_ACTOR_ROUTE_TABLE_SERVICE_H_

+ 41
- 0
mindspore/ccsrc/distributed/cluster/dummy_actor_route_table_proxy.h View File

@@ -0,0 +1,41 @@
/**
* Copyright 2022 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#ifndef MINDSPORE_CCSRC_DISTRIBUTED_CLUSTER_DUMMY_ACTOR_ROUTE_TABLE_PROXY_H_
#define MINDSPORE_CCSRC_DISTRIBUTED_CLUSTER_DUMMY_ACTOR_ROUTE_TABLE_PROXY_H_

#include <string>
#include "proto/comm.pb.h"

namespace mindspore {
namespace distributed {
namespace cluster {
using ps::core::ActorAddress;
// The dummy ActorRouteTableProxy interface. This class is for ut test and windows compiling so the implementation is
// empty.
class ActorRouteTableProxy {
public:
ActorRouteTableProxy() = default;
~ActorRouteTableProxy() = default;

bool RegisterRoute(const std::string &, const ActorAddress &) { return true; }
bool DeleteRoute(const std::string &) { return true; }
ActorAddress LookupRoute(const std::string &) const { return {}; }
};
} // namespace cluster
} // namespace distributed
} // namespace mindspore
#endif // MINDSPORE_CCSRC_DISTRIBUTED_CLUSTER_DUMMY_ACTOR_ROUTE_TABLE_PROXY_H_

+ 17
- 0
mindspore/ccsrc/ps/core/protos/comm.proto View File

@@ -43,6 +43,12 @@ enum NodeCommand {
SCHEDULER_RECOVERY = 13;
// This command is used to send prepare building network msg.
PREPARE_BUILDING_NETWORK = 14;
// Register address for actor's route table.
REGISTER_ACTOR_ROUTE = 15;
// Delete address of actor.
DELETE_ACTOR_ROUTE = 16;
// Lookup address of the actor.
LOOKUP_ACTOR_ROUTE = 17;
}

enum NodeRole {
@@ -208,3 +214,14 @@ message EventRespMessage {
message ScaleInFinishMessage {
bool is_all_nodes_registered = 1;
}

message GeneralResponseMsg {
bool is_success = 1;
string error = 2;
}

message ActorAddress {
string actor_id = 1;
string ip = 2;
uint32 port = 3;
}

+ 56
- 0
mindspore/ccsrc/ps/core/scheduler_node.cc View File

@@ -61,6 +61,8 @@ bool SchedulerNode::Start(const uint32_t &timeout) {
StartUpdatePersistentCommandTimer();
MS_LOG(INFO) << "[Scheduler start]: 4. Successfully start scheduler, there are " << node_manager_.worker_num()
<< " workers and " << node_manager_.server_num() << " servers registered.";

InitializeActorRouteTableService();
return true;
}

@@ -201,6 +203,13 @@ void SchedulerNode::InitCommandHandler() {
handlers_[NodeCommand::SCALE_OUT_DONE] = &SchedulerNode::ProcessScaleOutDone;
handlers_[NodeCommand::SCALE_IN_DONE] = &SchedulerNode::ProcessScaleInDone;
handlers_[NodeCommand::SEND_EVENT] = &SchedulerNode::ProcessSendEvent;
RegisterActorRouteTableServiceHandler();
}

void SchedulerNode::RegisterActorRouteTableServiceHandler() {
handlers_[NodeCommand::REGISTER_ACTOR_ROUTE] = &SchedulerNode::ProcessRegisterActorRoute;
handlers_[NodeCommand::DELETE_ACTOR_ROUTE] = &SchedulerNode::ProcessDeleteActorRoute;
handlers_[NodeCommand::LOOKUP_ACTOR_ROUTE] = &SchedulerNode::ProcessLookupActorRoute;
}

void SchedulerNode::CreateTcpServer() {
@@ -476,6 +485,28 @@ void SchedulerNode::ProcessSendEvent(const std::shared_ptr<TcpServer> &server,
}
}

void SchedulerNode::ProcessRegisterActorRoute(const std::shared_ptr<TcpServer> &server,
const std::shared_ptr<TcpConnection> &conn,
const std::shared_ptr<MessageMeta> &meta, const void *data, size_t size) {
MS_ERROR_IF_NULL_WO_RET_VAL(data);
MS_ERROR_IF_NULL_WO_RET_VAL(actor_route_table_service_);
ActorAddress actor_address;
actor_address.ParseFromArray(data, SizeToInt(size));
std::string actor_id = actor_address.actor_id();

std::string error = "";
bool ret = actor_route_table_service_->RegisterRoute(actor_id, actor_address, &error);
GeneralResponse(server, conn, meta, ret, error);
}

void SchedulerNode::ProcessDeleteActorRoute(const std::shared_ptr<TcpServer> &server,
const std::shared_ptr<TcpConnection> &conn,
const std::shared_ptr<MessageMeta> &meta, const void *data, size_t size) {}

void SchedulerNode::ProcessLookupActorRoute(const std::shared_ptr<TcpServer> &server,
const std::shared_ptr<TcpConnection> &conn,
const std::shared_ptr<MessageMeta> &meta, const void *data, size_t size) {}

bool SchedulerNode::SendPrepareBuildingNetwork(const std::unordered_map<std::string, NodeInfo> &node_infos) {
uint64_t request_id = AddMessageTrack(node_infos.size());
for (const auto &kvs : node_infos) {
@@ -636,6 +667,11 @@ void SchedulerNode::StartUpdatePersistentCommandTimer() {
MS_EXCEPTION_IF_NULL(update_persistent_cmd_thread_);
}

void SchedulerNode::InitializeActorRouteTableService() {
actor_route_table_service_ = std::make_unique<ActorRouteTableService>();
MS_EXCEPTION_IF_NULL(actor_route_table_service_);
}

const std::shared_ptr<TcpClient> &SchedulerNode::GetOrCreateClient(const NodeInfo &node_info) {
std::lock_guard<std::mutex> lock(client_mutex_);
if (connected_nodes_.count(node_info.node_id_)) {
@@ -1383,6 +1419,26 @@ void SchedulerNode::SetRegisterConnectionFd(const std::shared_ptr<TcpConnection>
MS_LOG(INFO) << "register client fd:" << fd << ", register client id:" << node_id;
register_connection_fd_[fd] = node_id;
}

void SchedulerNode::GeneralResponse(const std::shared_ptr<TcpServer> &server,
const std::shared_ptr<TcpConnection> &conn,
const std::shared_ptr<MessageMeta> &meta, bool is_success,
const std::string &error) {
MS_ERROR_IF_NULL_WO_RET_VAL(server);
MS_ERROR_IF_NULL_WO_RET_VAL(conn);
MS_ERROR_IF_NULL_WO_RET_VAL(meta);

GeneralResponseMsg general_response_message;
general_response_message.set_is_success(is_success);
general_response_message.set_error(error);

if (!server->SendMessage(conn, meta, Protos::PROTOBUF, general_response_message.SerializeAsString().data(),
general_response_message.ByteSizeLong())) {
MS_LOG(ERROR) << "Scheduler failed to respond message.";
return;
}
return;
}
} // namespace core
} // namespace ps
} // namespace mindspore

+ 26
- 1
mindspore/ccsrc/ps/core/scheduler_node.h View File

@@ -42,10 +42,12 @@
#include "ps/core/leader_scaler.h"
#include "ps/core/recovery_base.h"
#include "ps/core/instance_manager.h"
#include "distributed/cluster/actor_route_table_service.h"

namespace mindspore {
namespace ps {
namespace core {
using distributed::cluster::ActorRouteTableService;
class SchedulerNode : public Node {
public:
SchedulerNode()
@@ -60,7 +62,8 @@ class SchedulerNode : public Node {
leader_scaler_(nullptr),
scheduler_recovery_(nullptr),
persistent_cmd_(PersistentCommand::DEFAULT),
is_worker_timeout_(false) {}
is_worker_timeout_(false),
actor_route_table_service_(nullptr) {}
~SchedulerNode() override;

typedef void (SchedulerNode::*ResponseHandler)(const std::shared_ptr<TcpServer> &server,
@@ -81,6 +84,10 @@ class SchedulerNode : public Node {
// Persistent timer, periodically trigger persistent behavior.
void StartUpdatePersistentCommandTimer();

// Register and initialize the actor route table service.
void RegisterActorRouteTableServiceHandler();
void InitializeActorRouteTableService();

const std::shared_ptr<TcpClient> &GetOrCreateClient(const NodeInfo &node_info);

void ProcessHeartbeat(const std::shared_ptr<TcpServer> &server, const std::shared_ptr<TcpConnection> &conn,
@@ -103,6 +110,18 @@ class SchedulerNode : public Node {
void ProcessSendEvent(const std::shared_ptr<TcpServer> &server, const std::shared_ptr<TcpConnection> &conn,
const std::shared_ptr<MessageMeta> &meta, const void *data, size_t size);

// Process register actor route messages from other nodes.
void ProcessRegisterActorRoute(const std::shared_ptr<TcpServer> &server, const std::shared_ptr<TcpConnection> &conn,
const std::shared_ptr<MessageMeta> &meta, const void *data, size_t size);

// Process delete actor route messages from other nodes.
void ProcessDeleteActorRoute(const std::shared_ptr<TcpServer> &server, const std::shared_ptr<TcpConnection> &conn,
const std::shared_ptr<MessageMeta> &meta, const void *data, size_t size);

// Process lookup actor route messages from other nodes.
void ProcessLookupActorRoute(const std::shared_ptr<TcpServer> &server, const std::shared_ptr<TcpConnection> &conn,
const std::shared_ptr<MessageMeta> &meta, const void *data, size_t size);

// Determine whether the registration request of the node should be rejected, the registration of the
// alive node should be rejected.
virtual bool NeedRejectRegister(const NodeInfo &node_info) { return false; }
@@ -176,6 +195,10 @@ class SchedulerNode : public Node {

bool SendPrepareBuildingNetwork(const std::unordered_map<std::string, NodeInfo> &node_infos);

// Responding peer with the general response message.
void GeneralResponse(const std::shared_ptr<TcpServer> &server, const std::shared_ptr<TcpConnection> &conn,
const std::shared_ptr<MessageMeta> &meta, bool is_success, const std::string &error);

std::shared_ptr<TcpServer> server_;
std::unique_ptr<std::thread> scheduler_thread_;
std::unique_ptr<std::thread> update_state_thread_;
@@ -214,6 +237,8 @@ class SchedulerNode : public Node {
std::atomic<bool> is_worker_timeout_;
// This is a map of register connection fd to client node id
std::unordered_map<int, std::string> register_connection_fd_;

std::unique_ptr<ActorRouteTableService> actor_route_table_service_;
};
} // namespace core
} // namespace ps


+ 1
- 0
tests/ut/cpp/CMakeLists.txt View File

@@ -178,6 +178,7 @@ file(GLOB_RECURSE MINDSPORE_SRC_LIST RELATIVE ${CMAKE_CURRENT_SOURCE_DIR}
"../../../mindspore/ccsrc/transform/graph_ir/op_declare/*.cc"
"../../../mindspore/ccsrc/ps/*.cc"
"../../../mindspore/ccsrc/fl/*.cc"
"../../../mindspore/ccsrc/distributed/cluster/actor_route_table_service.cc"
"../../../mindspore/ccsrc/distributed/persistent/*.cc"
"../../../mindspore/ccsrc/distributed/rpc/tcp/*.cc"
"../../../mindspore/ccsrc/profiler/device/ascend/*.cc"


Loading…
Cancel
Save