diff --git a/mindspore/ccsrc/runtime/framework/actor/actor_common.h b/mindspore/ccsrc/runtime/framework/actor/actor_common.h index b7835f3301..6882ec9d86 100644 --- a/mindspore/ccsrc/runtime/framework/actor/actor_common.h +++ b/mindspore/ccsrc/runtime/framework/actor/actor_common.h @@ -66,7 +66,10 @@ enum class KernelTransformType { kGatherActor, kEntranceActor, kExitActor, - kStackActor + kStackActor, + // RPC actor type. + kSendActor, + kRecvActor }; #define SET_OPCONTEXT_FAIL_RET_WITH_ERROR(op_context, message) \ diff --git a/mindspore/ccsrc/runtime/framework/actor/actor_set.h b/mindspore/ccsrc/runtime/framework/actor/actor_set.h index 32cc950939..c466a40221 100644 --- a/mindspore/ccsrc/runtime/framework/actor/actor_set.h +++ b/mindspore/ccsrc/runtime/framework/actor/actor_set.h @@ -30,6 +30,8 @@ #include "runtime/framework/actor/loop_count_actor.h" #include "runtime/framework/actor/kernel_actor.h" #include "runtime/framework/actor/custom_actor.h" +#include "runtime/framework/actor/rpc/send_actor.h" +#include "runtime/framework/actor/rpc/recv_actor.h" #include "runtime/framework/actor/super_kernel_actor.h" #include "runtime/framework/actor/output_actor.h" #include "runtime/framework/actor/copy_actor.h" @@ -61,6 +63,16 @@ struct ControlActorSet { }; using ControlActorSetPtr = std::shared_ptr; +// Rpc actor set is a series of actors implemented to communicate with other processes. In distributed execution mode, +// the graph could be considered as partitioned to different processes, which are connected by these rpc actors. Send +// actors are in charge of sending data to other processes. Recv actors are in charge of receiving data from other +// processes. +struct RpcActorSet { + std::vector send_actors_; + std::vector recv_actors_; +}; +using RpcActorSetPtr = std::shared_ptr; + // The actor set generated by graph transformer is the execution unit of actor runtime. 
// It includes data source actor, kernel actor, switch actor, copy actor, loop count actor and output actor. // The data prepare actor is used to prepare data for device tensor store and host tensor queue to represent the begin diff --git a/mindspore/ccsrc/runtime/framework/actor/rpc/recv_actor.h b/mindspore/ccsrc/runtime/framework/actor/rpc/recv_actor.h new file mode 100644 index 0000000000..a77f66b826 --- /dev/null +++ b/mindspore/ccsrc/runtime/framework/actor/rpc/recv_actor.h @@ -0,0 +1,56 @@ +/** + * Copyright 2022 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef MINDSPORE_CCSRC_RUNTIME_FRAMEWORK_ACTOR_RPC_RECV_ACTOR_H_ +#define MINDSPORE_CCSRC_RUNTIME_FRAMEWORK_ACTOR_RPC_RECV_ACTOR_H_ + +#include +#include +#include +#include "runtime/framework/actor/rpc/rpc_actor.h" + +namespace mindspore { +namespace runtime { +// RecvActor inherits from RpcActor and it's used to receive data from other processes. +class RecvActor : public RpcActor { + public: + RecvActor(const std::string &name, const CNodePtr &kernel, const DeviceContext *device_context, + const AID &memory_manager_aid, const AID *debug_aid, const AID *recorder_aid) + : RpcActor(name, KernelTransformType::kRecvActor, kernel, device_context, memory_manager_aid, debug_aid, + recorder_aid) {} + ~RecvActor() override = default; + + void Init() override; + + // The memory related operation interface. 
+ void SendMemoryAllocReq(OpContext *const context) override; + void SendMemoryFreeReq(OpContext *const context) override; + // The callback after memory alloc finished. + void OnMemoryAllocFinish(OpContext *const context) override; + + protected: + void Run(OpContext *const context) override; + void SendRecorderInfo(OpContext *const context) const override; + + private: + friend class GraphScheduler; +}; + +using RecvActorPtr = std::shared_ptr; +} // namespace runtime +} // namespace mindspore + +#endif // MINDSPORE_CCSRC_RUNTIME_FRAMEWORK_ACTOR_RPC_RECV_ACTOR_H_ diff --git a/mindspore/ccsrc/runtime/framework/actor/rpc/rpc_actor.h b/mindspore/ccsrc/runtime/framework/actor/rpc/rpc_actor.h new file mode 100644 index 0000000000..f98dc32665 --- /dev/null +++ b/mindspore/ccsrc/runtime/framework/actor/rpc/rpc_actor.h @@ -0,0 +1,63 @@ +/** + * Copyright 2022 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef MINDSPORE_CCSRC_RUNTIME_FRAMEWORK_ACTOR_RPC_RPC_ACTOR_H_ +#define MINDSPORE_CCSRC_RUNTIME_FRAMEWORK_ACTOR_RPC_RPC_ACTOR_H_ + +#include +#include +#include +#include +#include "runtime/framework/actor/debug_aware_actor.h" + +namespace mindspore { +namespace runtime { +using mindspore::device::KernelInfo; + +// RpcActor is used to do rpc with other processes in distributed execution. 
+// Besides data arrows and controlling arrows, RpcActor also has inter-process arrows which are in charge of remote +// communication with other processes. It supports both sync and async communication. +class RpcActor : public DebugAwareActor { + public: + RpcActor(const std::string &name, KernelTransformType type, const CNodePtr &kernel, + const DeviceContext *device_context, const AID &memory_manager_aid, const AID *debug_aid, + const AID *recorder_aid) + : DebugAwareActor(name, type, recorder_aid, memory_manager_aid, debug_aid), + rpc_kernel_(kernel), + kernel_info_(nullptr) { + (void)device_contexts_.emplace_back(device_context); + } + ~RpcActor() override = default; + + const CNodePtr &kernel() const { return rpc_kernel_; } + + protected: + // The arrows represent inter-process communication. + std::vector iter_process_input_arrows_; + std::vector iter_process_output_arrows_; + + private: + friend class GraphScheduler; + + CNodePtr rpc_kernel_; + KernelInfo *kernel_info_; +}; + +using RpcActorPtr = std::shared_ptr; +} // namespace runtime +} // namespace mindspore + +#endif // MINDSPORE_CCSRC_RUNTIME_FRAMEWORK_ACTOR_RPC_RPC_ACTOR_H_ diff --git a/mindspore/ccsrc/runtime/framework/actor/rpc/send_actor.h b/mindspore/ccsrc/runtime/framework/actor/rpc/send_actor.h new file mode 100644 index 0000000000..da43debb14 --- /dev/null +++ b/mindspore/ccsrc/runtime/framework/actor/rpc/send_actor.h @@ -0,0 +1,56 @@ +/** + * Copyright 2022 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef MINDSPORE_CCSRC_RUNTIME_FRAMEWORK_ACTOR_RPC_SEND_ACTOR_H_ +#define MINDSPORE_CCSRC_RUNTIME_FRAMEWORK_ACTOR_RPC_SEND_ACTOR_H_ + +#include +#include +#include +#include "runtime/framework/actor/rpc/rpc_actor.h" + +namespace mindspore { +namespace runtime { +// SendActor inherits from RpcActor and it's used to send data to other processes. +class SendActor : public RpcActor { + public: + SendActor(const std::string &name, const CNodePtr &kernel, const DeviceContext *device_context, + const AID &memory_manager_aid, const AID *debug_aid, const AID *recorder_aid) + : RpcActor(name, KernelTransformType::kSendActor, kernel, device_context, memory_manager_aid, debug_aid, + recorder_aid) {} + ~SendActor() override = default; + + void Init() override; + + // The memory related operation interface. + void SendMemoryAllocReq(OpContext *const context) override; + void SendMemoryFreeReq(OpContext *const context) override; + // The callback after memory alloc finished. + void OnMemoryAllocFinish(OpContext *const context) override; + + protected: + void Run(OpContext *const context) override; + void SendRecorderInfo(OpContext *const context) const override; + + private: + friend class GraphScheduler; +}; + +using SendActorPtr = std::shared_ptr; +} // namespace runtime +} // namespace mindspore + +#endif // MINDSPORE_CCSRC_RUNTIME_FRAMEWORK_ACTOR_RPC_SEND_ACTOR_H_