From fba1dd8f2fff21158170e96a07fde149272e2c57 Mon Sep 17 00:00:00 2001 From: limingqi107 Date: Fri, 23 Apr 2021 17:16:16 +0800 Subject: [PATCH] add the continue memory alloc of communication kernel for actor runtime --- .../ccsrc/backend/session/kernel_graph.h | 6 ++ mindspore/ccsrc/pipeline/jit/pipeline.cc | 5 +- .../runtime/device/gpu/gpu_memory_manager.cc | 1 + .../ccsrc/runtime/device/memory_manager.cc | 1 + .../framework/actor/data_source_actor.cc | 21 +++-- .../runtime/framework/actor/kernel_actor.cc | 23 ++++- .../framework/actor/loop_count_actor.cc | 9 +- .../ccsrc/runtime/framework/graph_compiler.cc | 6 ++ .../runtime/framework/graph_scheduler.cc | 84 +++++++++++++++++++ .../ccsrc/runtime/framework/graph_scheduler.h | 1 + .../ccsrc/runtime/hardware/device_context.h | 2 +- .../hardware/gpu/gpu_device_context.cc | 39 ++++++--- .../runtime/hardware/gpu/gpu_device_context.h | 8 +- mindspore/ccsrc/vm/backend.cc | 2 + 14 files changed, 179 insertions(+), 29 deletions(-) diff --git a/mindspore/ccsrc/backend/session/kernel_graph.h b/mindspore/ccsrc/backend/session/kernel_graph.h index b17d2a59fd..5dfa87824f 100644 --- a/mindspore/ccsrc/backend/session/kernel_graph.h +++ b/mindspore/ccsrc/backend/session/kernel_graph.h @@ -292,6 +292,9 @@ class KernelGraph : public FuncGraph { // set flag to indicate whether has multi-call. void set_subgraph_multi_call(bool flag) { has_subgraph_multicall_ = flag; } + bool is_all_nop_node() const { return is_all_nop_node_; } + void set_is_all_nop_node(bool is_all_nop_node) { is_all_nop_node_ = is_all_nop_node; } + private: // remove value node form graph bool RemoveValueNodeFromGraph(const ValueNodePtr &value_node); @@ -381,6 +384,9 @@ class KernelGraph : public FuncGraph { // Number of labels. This is also the 'batch_num' for DavinciModel, // It should be 1 if no labels used for control flow. uint32_t label_num_ = 1; + + // If all the nodes of graph is the nop node. + bool is_all_nop_node_{false}; }; } // namespace session using KernelGraphPtr = std::shared_ptr; diff --git a/mindspore/ccsrc/pipeline/jit/pipeline.cc b/mindspore/ccsrc/pipeline/jit/pipeline.cc index 66e365f567..0767ab07b7 100644 --- a/mindspore/ccsrc/pipeline/jit/pipeline.cc +++ b/mindspore/ccsrc/pipeline/jit/pipeline.cc @@ -987,12 +987,13 @@ bool InitExecDatasetVm(const std::string &queue_name, int64_t size, int64_t batc // AbstractNone indicates there is no output for this apply node. auto abstract_none = std::make_shared(); app_init->set_abstract(abstract_none); + // Before the graph compiling, need reset the iter num. + ConfigManager::GetInstance().ResetIterNum(); auto backend = compile::CreateBackend(); MS_EXCEPTION_IF_NULL(backend); // The data set graph compiling and running of mindRT. if (compile::IsMindRTUsed()) { - ConfigManager::GetInstance().set_iter_num(size); const auto &mindrt_backend = std::dynamic_pointer_cast(backend); MS_EXCEPTION_IF_NULL(mindrt_backend); auto graph_id = mindrt_backend->CompileGraphs(func_graph); @@ -1000,13 +1001,13 @@ bool InitExecDatasetVm(const std::string &queue_name, int64_t size, int64_t batc if (need_run) { (void)mindrt_backend->RunGraph(graph_id, args); } + ConfigManager::GetInstance().set_iter_num(size); return true; } auto convert_fn = backend->convert_fn(); MS_EXCEPTION_IF_NULL(convert_fn); // Convert CNodeList to LinConvertResult. - ConfigManager::GetInstance().set_iter_num(1); auto segment = std::make_shared(std::vector{app_init}, false); auto runner = convert_fn(segment, ""); if (MsContext::GetInstance()->get_param(MS_CTX_EXECUTION_MODE) != kPynativeMode) { diff --git a/mindspore/ccsrc/runtime/device/gpu/gpu_memory_manager.cc b/mindspore/ccsrc/runtime/device/gpu/gpu_memory_manager.cc index 20e3323544..c3d5f230ca 100644 --- a/mindspore/ccsrc/runtime/device/gpu/gpu_memory_manager.cc +++ b/mindspore/ccsrc/runtime/device/gpu/gpu_memory_manager.cc @@ -61,6 +61,7 @@ bool GPUMemoryManager::MallocContinuousMemFromMemPool(const DeviceAddressPtrList FreeMemFromMemPool(old_addr); } addr_list[i]->ptr_ = new_addr; + addr_list[i]->size_ = size_list[i]; addr_list[i]->from_mem_pool_ = true; } if (need_sync_stream) { diff --git a/mindspore/ccsrc/runtime/device/memory_manager.cc b/mindspore/ccsrc/runtime/device/memory_manager.cc index 29919eab68..5c6d702e0a 100644 --- a/mindspore/ccsrc/runtime/device/memory_manager.cc +++ b/mindspore/ccsrc/runtime/device/memory_manager.cc @@ -214,6 +214,7 @@ bool MemoryManager::MallocContinuousMemFromMemPool(const DeviceAddressPtrList ad MS_EXCEPTION_IF_NULL(device_ptr_list[i]); MS_EXCEPTION_IF_NULL(addr_list[i]); addr_list[i]->ptr_ = device_ptr_list[i]; + addr_list[i]->size_ = size_list[i]; addr_list[i]->from_mem_pool_ = true; } return true; diff --git a/mindspore/ccsrc/runtime/framework/actor/data_source_actor.cc b/mindspore/ccsrc/runtime/framework/actor/data_source_actor.cc index 4118b922b4..c396aafcd0 100644 --- a/mindspore/ccsrc/runtime/framework/actor/data_source_actor.cc +++ b/mindspore/ccsrc/runtime/framework/actor/data_source_actor.cc @@ -27,9 +27,12 @@ void DataSourceActor::FetchData(OpContext *context) { MS_LOG(INFO) << "Data source actor(" << GetAID().Name() << ") fetches data."; MS_EXCEPTION_IF_NULL(context); if (buffers_.size() == buffer_capacity_) { - // Send output to trigger computing and free memory. - SendOutput(context); + // Note that FreeMemory must be before SendOutput, because SendOutput will trigger AllocateMemory of the next actor + // and the actor is asynchronous execution. So it is necessary to ensure that FreeMemory of the current actor is + // before AllocateMemory of the next actor. One is to reuse the memory more fully, the other is to ensure the + // execution order and avoid the illegal memory timing problem. FreeMemory(context); + SendOutput(context); buffers_.pop(); return; } @@ -110,9 +113,12 @@ void DeviceQueueDataSourceActor::OnMemoryAllocFinish(OpContext *co SET_OPCONTEXT_FAIL_RET_WITH_ERROR((*context), error_info); } - // Send output to trigger computing and free memory. - SendOutput(context); + // Note that FreeMemory must be in front of SendOutput, because SendOutput will trigger AllocateMemory of the next + // actor and the actor is asynchronous execution. So it is necessary to ensure that FreeMemory of the current actor + // is in front of AllocateMemory of the next actor. One is to reuse the memory more fully, the other is to ensure + // the execution order and avoid the illegal memory timing problem. FreeMemory(context); + SendOutput(context); buffers_.pop(); } @@ -156,9 +162,12 @@ void HostQueueDataSourceActor::OnMemoryAllocFinish(OpContext *cont } } - // Send output to trigger computing and free memory. - SendOutput(context); + // Note that FreeMemory must be in front of SendOutput, because SendOutput will trigger AllocateMemory of the next + // actor and the actor is asynchronous execution. So it is necessary to ensure that FreeMemory of the current actor + // is in front of AllocateMemory of the next actor. One is to reuse the memory more fully, the other is to ensure + // the execution order and avoid the illegal memory timing problem. FreeMemory(context); + SendOutput(context); buffers_.pop(); } } // namespace runtime diff --git a/mindspore/ccsrc/runtime/framework/actor/kernel_actor.cc b/mindspore/ccsrc/runtime/framework/actor/kernel_actor.cc index 8c82adf6f5..37101c783a 100644 --- a/mindspore/ccsrc/runtime/framework/actor/kernel_actor.cc +++ b/mindspore/ccsrc/runtime/framework/actor/kernel_actor.cc @@ -74,9 +74,16 @@ void KernelActor::OnMemoryAllocFinish(OpContext *context) { std::string error_info = "Launch kernel failed: " + kernel_->ToString(); SET_OPCONTEXT_FAIL_RET_WITH_ERROR((*context), error_info); } - SendOutput(context); - FreeMemory(context); + + // The input is invalid and needs to be erased when finish kernel launch. EraseInput(context); + + // Note that FreeMemory must be in front of SendOutput, because SendOutput will trigger AllocateMemory of the next + // actor and the actor is asynchronous execution. So it is necessary to ensure that FreeMemory of the current actor + // is in front of AllocateMemory of the next actor. One is to reuse the memory more fully, the other is to ensure + // the execution order and avoid the illegal memory timing problem. + FreeMemory(context); + SendOutput(context); } bool KernelActor::CheckLaunchCondition(OpContext *context) const { @@ -188,11 +195,19 @@ void KernelActor::SendOutput(OpContext *context) const { void KernelActor::EraseInput(OpContext *context) { MS_EXCEPTION_IF_NULL(context); if (input_datas_num_ != 0) { - (void)input_op_datas_.erase(context->sequential_num_); + auto ret = input_op_datas_.erase(context->sequential_num_); + if (ret == 0) { + std::string error_info = "Erase input data failed: " + GetAID().Name(); + SET_OPCONTEXT_FAIL_RET_WITH_ERROR((*context), error_info); + } } if (input_controls_num_ != 0) { - (void)input_op_controls_.erase(context->sequential_num_); + auto ret = input_op_controls_.erase(context->sequential_num_); + if (ret == 0) { + std::string error_info = "Erase input controls failed: " + GetAID().Name(); + SET_OPCONTEXT_FAIL_RET_WITH_ERROR((*context), error_info); + } } } diff --git a/mindspore/ccsrc/runtime/framework/actor/loop_count_actor.cc b/mindspore/ccsrc/runtime/framework/actor/loop_count_actor.cc index 615b7677a1..f24a42952a 100644 --- a/mindspore/ccsrc/runtime/framework/actor/loop_count_actor.cc +++ b/mindspore/ccsrc/runtime/framework/actor/loop_count_actor.cc @@ -27,9 +27,14 @@ void LoopCountActor::RunOpControl(AID *input_control, OpContext *c auto sequential_num = context->sequential_num_; input_op_controls_[sequential_num].emplace_back(input_control); if (input_op_controls_[sequential_num].size() == input_controls_num_) { + auto ret = input_op_controls_.erase(sequential_num); + if (ret == 0) { + std::string error_info = "Erase input controls failed: " + GetAID().Name(); + SET_OPCONTEXT_FAIL_RET_WITH_ERROR((*context), error_info); + } + current_count_++; - (void)input_op_controls_.erase(sequential_num); - MS_LOG(INFO) << "Loop count actor(" << GetAID().Name() << ") runs op control, loop count: " << loop_count_ + MS_LOG(INFO) << "Loop count actor(" << GetAID().Name() << ") running, loop count: " << loop_count_ << ", current count: " << current_count_; if (current_count_ == loop_count_) { current_count_ = 0; diff --git a/mindspore/ccsrc/runtime/framework/graph_compiler.cc b/mindspore/ccsrc/runtime/framework/graph_compiler.cc index 829b6107c6..ebb080bec9 100644 --- a/mindspore/ccsrc/runtime/framework/graph_compiler.cc +++ b/mindspore/ccsrc/runtime/framework/graph_compiler.cc @@ -22,6 +22,7 @@ #include "common/trans.h" #include "utils/convert_utils.h" #include "ir/tensor.h" +#include "backend/optimizer/common/helper.h" namespace mindspore { namespace runtime { @@ -226,6 +227,8 @@ GraphId GraphCompiler::CompileGraphImpl(const KernelGraphPtr &graph) const { // Create device address for all anf nodes of graph. CreateDeviceAddress(graph); + graph->set_is_all_nop_node(opt::IsAllNopNode(graph.get())); + return graph->graph_id(); } @@ -257,6 +260,9 @@ GraphId GraphCompiler::CompileGraph(session::OpRunInfo *op_run_info, const Graph // Create device address for all anf nodes of graph. CreateDeviceAddress(graph); + + graph->set_is_all_nop_node(opt::IsAllNopNode(graph.get())); + // Transform graph to actor DAG, contains build and link. GraphScheduler::GetInstance().Transform({graph}, {device_context_}, input_tensors, nullptr, GraphExecutionStrategy::kStep); diff --git a/mindspore/ccsrc/runtime/framework/graph_scheduler.cc b/mindspore/ccsrc/runtime/framework/graph_scheduler.cc index dd970d3be2..9e457408c0 100644 --- a/mindspore/ccsrc/runtime/framework/graph_scheduler.cc +++ b/mindspore/ccsrc/runtime/framework/graph_scheduler.cc @@ -283,6 +283,82 @@ BaseRef CreateOutputTensors(const AnfNodePtr &output_node, const KernelGraphPtr return CreateOutputTensor(item_with_index, graph, input_tensors); } + +void AllocateContinuousMemoryForInput(const AnfNodePtr &kernel, const DeviceContext *device_context, + bool is_all_nop_node) { + MS_EXCEPTION_IF_NULL(kernel); + MS_EXCEPTION_IF_NULL(device_context); + bool is_need_alloc_memory = false; + size_t total_size = 0; + std::vector size_list; + std::vector addr_list; + + const auto &kernel_mod = AnfAlgo::GetKernelMod(kernel); + MS_EXCEPTION_IF_NULL(kernel_mod); + const auto &intput_sizes = kernel_mod->GetInputSizeList(); + for (size_t i = 0; i < intput_sizes.size(); ++i) { + DeviceTensorPtr device_tensor; + if (is_all_nop_node) { + // Graph may be all nop nodes and not remove nop node, so this can not skip nop node. + device_tensor = AnfAlgo::GetPrevNodeMutableOutputAddr(kernel, i, false); + } else { + device_tensor = AnfAlgo::GetPrevNodeMutableOutputAddr(kernel, i, true); + } + MS_EXCEPTION_IF_NULL(device_tensor); + // In the scene of communication op and computing op parallel multi stream, the input address of communication op + // can't be reused, so set the max reference count. + device_tensor->set_ref_count(SIZE_MAX); + device_tensor->ResetRefCountUsed(); + + if (device_tensor->GetPtr() == nullptr) { + is_need_alloc_memory = true; + } + total_size += intput_sizes[i]; + size_list.emplace_back(intput_sizes[i]); + addr_list.emplace_back(device_tensor); + } + + if (is_need_alloc_memory) { + auto ret = device_context->AllocateContinuousMemory(addr_list, total_size, size_list); + if (!ret) { + MS_LOG(EXCEPTION) << "Malloc device memory failed."; + } + } +} + +void AllocateContinuousMemoryForOutput(const AnfNodePtr &kernel, const DeviceContext *device_context) { + MS_EXCEPTION_IF_NULL(kernel); + MS_EXCEPTION_IF_NULL(device_context); + bool is_need_alloc_memory = false; + size_t total_size = 0; + std::vector size_list; + std::vector addr_list; + + const auto &kernel_mod = AnfAlgo::GetKernelMod(kernel); + MS_EXCEPTION_IF_NULL(kernel_mod); + const auto &output_sizes = kernel_mod->GetOutputSizeList(); + for (size_t i = 0; i < output_sizes.size(); ++i) { + const auto &device_tensor = AnfAlgo::GetMutableOutputAddr(kernel, i, false); + MS_EXCEPTION_IF_NULL(device_tensor); + // One time application for continuous memory, so set the max reference count. + device_tensor->set_ref_count(SIZE_MAX); + device_tensor->ResetRefCountUsed(); + + if (device_tensor->GetPtr() == nullptr) { + is_need_alloc_memory = true; + } + total_size += output_sizes[i]; + size_list.emplace_back(output_sizes[i]); + addr_list.emplace_back(device_tensor); + } + + if (is_need_alloc_memory) { + auto ret = device_context->AllocateContinuousMemory(addr_list, total_size, size_list); + if (!ret) { + MS_LOG(EXCEPTION) << "Malloc device memory failed."; + } + } +} } // namespace void GraphScheduler::Initialize() { @@ -409,6 +485,14 @@ void GraphScheduler::PrepareRun(const KernelGraphPtr &graph, const std::vectorfullname_with_scope(); outputs->emplace_back(CreateOutputTensors(output_node, graph, *input_tensors)); } + + // 4.Prepare the continuous memory for communication kernel. + for (const auto &kernel : graph->execution_order()) { + if (AnfAlgo::IsCommunicationOp(kernel)) { + AllocateContinuousMemoryForInput(kernel, device_context, graph->is_all_nop_node()); + AllocateContinuousMemoryForOutput(kernel, device_context); + } + } } bool GraphScheduler::Run(const ActorSet *actor_set, GraphExecutionStrategy strategy) { diff --git a/mindspore/ccsrc/runtime/framework/graph_scheduler.h b/mindspore/ccsrc/runtime/framework/graph_scheduler.h index d8886366e0..d2aa36ff27 100644 --- a/mindspore/ccsrc/runtime/framework/graph_scheduler.h +++ b/mindspore/ccsrc/runtime/framework/graph_scheduler.h @@ -82,6 +82,7 @@ class GraphScheduler { // 1. Prepare the data of device tensor store(such as weights and value nodes of graph). // 2. Prepare the data of host tensor queue(such as non weighted parameters of graph). // 3. Prepare the output tensor of graph. + // 4.Prepare the continuous memory for communication kernel. void PrepareRun(const KernelGraphPtr &graph, const std::vector *input_tensors, VectorRef *const &outputs); // The processing entry of actors running. diff --git a/mindspore/ccsrc/runtime/hardware/device_context.h b/mindspore/ccsrc/runtime/hardware/device_context.h index f25f89ad40..6cfdd75186 100644 --- a/mindspore/ccsrc/runtime/hardware/device_context.h +++ b/mindspore/ccsrc/runtime/hardware/device_context.h @@ -58,7 +58,7 @@ class DeviceContext { // Allocate continuous device memory end to end into 'addr_list'. // Communication operators may need continuous memory for input and output // to optimize the communication performance. - virtual bool AllocateContinuousMemory(const std::vector &addr_list, size_t total_size, + virtual bool AllocateContinuousMemory(const std::vector &addr_list, size_t total_size, const std::vector &size_list) const { return true; } diff --git a/mindspore/ccsrc/runtime/hardware/gpu/gpu_device_context.cc b/mindspore/ccsrc/runtime/hardware/gpu/gpu_device_context.cc index f120047014..33eb6d65b1 100644 --- a/mindspore/ccsrc/runtime/hardware/gpu/gpu_device_context.cc +++ b/mindspore/ccsrc/runtime/hardware/gpu/gpu_device_context.cc @@ -34,6 +34,8 @@ namespace mindspore { namespace device { namespace gpu { +static thread_local bool cur_thread_device_inited{false}; + bool GPUDeviceContext::Initialize() { if (initialized_ == true) { CHECK_OP_RET_WITH_EXCEPT(CudaDriver::SetDevice(UintToInt(device_context_key_.device_id_)), @@ -130,6 +132,9 @@ void GPUDeviceContext::Destroy() { bool GPUDeviceContext::AllocateMemory(DeviceAddress *const &address, size_t size) const { MS_EXCEPTION_IF_NULL(address); + if (!BindDeviceToCurrentThread()) { + return false; + } auto device_ptr = mem_manager_->MallocMemFromMemPool(size); if (!device_ptr) { return false; @@ -147,22 +152,12 @@ void GPUDeviceContext::FreeMemory(DeviceAddress *const &address) const { address->ptr_ = nullptr; } -bool GPUDeviceContext::AllocateContinuousMemory(const std::vector &addr_list, size_t total_size, +bool GPUDeviceContext::AllocateContinuousMemory(const std::vector &addr_list, size_t total_size, const std::vector &size_list) const { - auto device_ptr_list = mem_manager_->MallocContinuousMemFromMemPool(total_size, size_list); - if (device_ptr_list.size() == 0) { + if (!BindDeviceToCurrentThread()) { return false; } - if (addr_list.size() != device_ptr_list.size()) { - MS_LOG(EXCEPTION) << "The size of device list is not equal to the size of address list."; - } - for (size_t i = 0; i < addr_list.size(); i++) { - MS_EXCEPTION_IF_NULL(device_ptr_list[i]); - MS_EXCEPTION_IF_NULL(addr_list[i]); - addr_list[i]->ptr_ = device_ptr_list[i]; - addr_list[i]->from_mem_pool_ = true; - } - return true; + return mem_manager_->MallocContinuousMemFromMemPool(addr_list, total_size, size_list); } DeviceAddressPtr GPUDeviceContext::CreateDeviceAddress(void *device_ptr, size_t device_size, const string &format, @@ -265,6 +260,10 @@ bool GPUDeviceContext::LaunchKernel(KernelMod *kernel_mod, const std::vector &workspace, const std::vector &outputs) const { MS_EXCEPTION_IF_NULL(kernel_mod); + if (!BindDeviceToCurrentThread()) { + return false; + } + std::lock_guard locker(launch_mutex_); return kernel_mod->Launch(inputs, workspace, outputs, streams_.front()); } @@ -275,6 +274,20 @@ bool GPUDeviceContext::SyncStream(size_t stream_id) const { return GPUDeviceManager::GetInstance().SyncStream(streams_[stream_id]); } +bool GPUDeviceContext::BindDeviceToCurrentThread() const { + if (cur_thread_device_inited) { + return true; + } + + if (!CudaDriver::SetDevice(UintToInt(device_context_key_.device_id_))) { + MS_LOG(ERROR) << "Failed to set device id: " << device_context_key_.device_id_; + return false; + } + + cur_thread_device_inited = true; + return true; +} + MS_REGISTER_DEVICE(kGPUDevice, GPUDeviceContext); } // namespace gpu } // namespace device diff --git a/mindspore/ccsrc/runtime/hardware/gpu/gpu_device_context.h b/mindspore/ccsrc/runtime/hardware/gpu/gpu_device_context.h index 37963b79bd..7e69bbaa5a 100644 --- a/mindspore/ccsrc/runtime/hardware/gpu/gpu_device_context.h +++ b/mindspore/ccsrc/runtime/hardware/gpu/gpu_device_context.h @@ -41,7 +41,7 @@ class GPUDeviceContext : public DeviceContext { bool AllocateMemory(DeviceAddress *const &address, size_t size) const override; void FreeMemory(DeviceAddress *const &address) const override; - bool AllocateContinuousMemory(const std::vector &addr_list, size_t total_size, + bool AllocateContinuousMemory(const std::vector &addr_list, size_t total_size, const std::vector &size_list) const override; DeviceAddressPtr CreateDeviceAddress(void *device_ptr, size_t device_size, const string &format, @@ -73,6 +73,12 @@ class GPUDeviceContext : public DeviceContext { // Update Graph Dynamic Shape Attr. void UpdateGraphDynamicShapeAttr(const NotNull &graph) const; + bool BindDeviceToCurrentThread() const; + + // The cublas handle is not thread safety specifically, it is not recommended that multiple threads access the same + // cublas handle at the same time, so need the launch mutex when multiple threads launch the cublas kernels. + mutable std::mutex launch_mutex_; + std::shared_ptr mem_manager_; std::vector streams_; bool initialized_; diff --git a/mindspore/ccsrc/vm/backend.cc b/mindspore/ccsrc/vm/backend.cc index 53bc9fbe8f..c7b9c53e59 100644 --- a/mindspore/ccsrc/vm/backend.cc +++ b/mindspore/ccsrc/vm/backend.cc @@ -30,6 +30,7 @@ #include "runtime/hardware/device_context_manager.h" #include "runtime/framework/graph_compiler.h" #include "runtime/framework/graph_scheduler.h" +#include "utils/scoped_long_running.h" #ifdef ENABLE_GE #include "utils/callbacks_ge.h" #endif @@ -345,6 +346,7 @@ VectorRef MindRTBackend::RunGraph(GraphId graph_id, const VectorRef &args) { MS_EXCEPTION_IF_NULL(actor_set); // Run actor DAG. + mindspore::ScopedLongRunning long_running; VectorRef outputs; runtime::GraphScheduler::GetInstance().PrepareRun(kernel_graph, &inputs, &outputs); if (!runtime::GraphScheduler::GetInstance().Run(actor_set)) {