@@ -15,8 +15,8 @@
  */
 #include "backend/session/executor.h"
 #include <exception>
-#include "runtime/device/kernel_runtime_manager.h"
 #include "backend/session/executor_manager.h"
+#include "runtime/device/kernel_runtime_manager.h"
 #include "utils/comm_manager.h"
 #include "utils/scoped_long_running.h"
@@ -52,6 +52,19 @@ void UpdateOutputTensors(const VectorRef *outputs,
         tensor->set_device_address(nullptr);
         tensor->set_sync_status(kNeedSyncHostToDevice);
       }
+    }
+  }
+}
+
+void NotifyOutputTensors(const VectorRef *outputs) {
+  MS_EXCEPTION_IF_NULL(outputs);
+  for (auto item : *outputs) {
+    if (utils::isa<VectorRefPtr>(item)) {
+      auto vector_ref = utils::cast<VectorRef>(item);
+      NotifyOutputTensors(&vector_ref);
+    } else if (utils::isa<tensor::TensorPtr>(item)) {
+      auto tensor = utils::cast<tensor::TensorPtr>(item);
+      MS_EXCEPTION_IF_NULL(tensor);
       tensor->SetNeedWait(false);
     }
   }
@@ -92,10 +105,12 @@ void RunGraphTask::Run() {
   MS_EXCEPTION_IF_NULL(session_);
   try {
     session_->RunGraphImpl(graph_id_, input_tensors_, &outputs_);
-    UpdateOutputTensors(&outputs_, tensor_to_node_);
   } catch (const std::exception &e) {
     MsException::GetInstance().SetException();
  }
+  UpdateOutputTensors(&outputs_, tensor_to_node_);
+  NotifyOutputTensors(&outputs_);
   for (auto &tensor : input_need_lock_tensors_) {
     tensor->SetNeedWait(false);
   }
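
With this hunk, UpdateOutputTensors and NotifyOutputTensors run after the try/catch rather than inside it, so output tensors are refreshed and waiting consumers are released even when RunGraphImpl throws; the error itself is stashed through MsException and surfaces later. The standalone sketch below is not MindSpore code: WaitFlag, the saved exception_ptr, and the thread setup are illustrative stand-ins for the same "capture the error, always notify" pattern.

#include <condition_variable>
#include <exception>
#include <iostream>
#include <mutex>
#include <stdexcept>
#include <thread>

// Minimal stand-in for a tensor's need-wait handshake.
struct WaitFlag {
  std::mutex mu;
  std::condition_variable cv;
  bool need_wait{true};
  void Notify() {                       // like Tensor::SetNeedWait(false)
    { std::lock_guard<std::mutex> lock(mu); need_wait = false; }
    cv.notify_all();
  }
  void Wait() {                         // like Tensor::Wait()
    std::unique_lock<std::mutex> lock(mu);
    cv.wait(lock, [this] { return !need_wait; });
  }
};

int main() {
  WaitFlag output_ready;
  std::exception_ptr saved;             // stands in for MsException::SetException()

  std::thread worker([&] {
    try {
      throw std::runtime_error("RunGraphImpl failed");   // simulated failure
    } catch (...) {
      saved = std::current_exception(); // record, do not propagate here
    }
    output_ready.Notify();              // released even on failure, like NotifyOutputTensors
  });

  output_ready.Wait();                  // consumer never blocks forever
  worker.join();
  if (saved) {
    try { std::rethrow_exception(saved); }
    catch (const std::exception &e) { std::cout << "deferred error: " << e.what() << "\n"; }
  }
  return 0;
}
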
@@ -252,19 +267,19 @@ void Executor::RunGraphAsync(const SessionPtr &session, const GraphId &graph_id,
                              const std::vector<tensor::TensorPtr> &inputs, VectorRef *outputs) {
   MS_EXCEPTION_IF_NULL(session);
   MS_EXCEPTION_IF_NULL(outputs);
-  if (session != nullptr) {
-    RunGraph(session, graph_id, inputs, outputs);
-    return;
-  }
   auto task = std::make_shared<RunGraphTask>();
   task->session_ = session;
   task->graph_id_ = graph_id;
   task->input_tensors_ = inputs;
   task->input_need_lock_tensors_ = session->GetNeedLockInputTensors(graph_id, inputs);
-  // lock inputs
   for (auto &tensor : inputs) {
     if (tensor->NeedWait()) {
-      task->input_need_wait_tensors_.emplace_back(tensor);
+      if (tensor->IsGraphOutput()) {
+        task->input_need_wait_tensors_.emplace_back(tensor);
+      } else {
+        mindspore::ScopedLongRunning long_running;
+        tensor->Wait();
+      }
     }
   }
   for (auto &tensor : task->input_need_lock_tensors_) {
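
The new branch changes how pending inputs are handled before the task is queued: an input that is still pending because it is the output of a previously launched graph is handed to the task so the worker thread waits on it, while any other pending input is waited on immediately on the calling thread, under ScopedLongRunning (which, judging by its name and header, presumably flags a potentially long blocking wait to the front end). A rough model of that dispatch policy, with made-up FakeTensor/FakeTask types and under that reading of the code:

#include <memory>
#include <vector>

struct FakeTensor {
  bool need_wait{false};
  bool graph_output{false};
  bool NeedWait() const { return need_wait; }
  bool IsGraphOutput() const { return graph_output; }
  void Wait() { /* block until the producer clears need_wait */ }
};
using FakeTensorPtr = std::shared_ptr<FakeTensor>;

struct FakeTask {
  std::vector<FakeTensorPtr> input_need_wait_tensors;  // waited on by the worker thread
};

void Dispatch(const std::vector<FakeTensorPtr> &inputs, FakeTask *task) {
  for (const auto &tensor : inputs) {
    if (!tensor->NeedWait()) {
      continue;  // already materialized, nothing to do
    }
    if (tensor->IsGraphOutput()) {
      // Produced by an earlier async graph launch: let the worker wait,
      // so the caller is not blocked behind the previous graph.
      task->input_need_wait_tensors.emplace_back(tensor);
    } else {
      // Pending for some other reason: wait here, on the caller's thread.
      tensor->Wait();
    }
  }
}
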
@@ -78,6 +78,7 @@ tensor::TensorPtr CreateCNodeOutputTensor(const session::KernelWithIndex &node_o
   tensor->set_padding_type(AnfAlgo::GetOutputReshapeType(node, output_index));
   tensor->set_sync_status(kNoNeedSync);
   tensor->SetNeedWait(true);
+  tensor->SetIsGraphOutput();
   return tensor;
 }
@@ -102,6 +103,7 @@ tensor::TensorPtr CreateCNodeOutputTensor(const session::KernelWithIndex &node_o
     tensor->set_sync_status(kNeedSyncDeviceToHost);
   }
   tensor->SetNeedWait(true);
+  tensor->SetIsGraphOutput();
   return tensor;
 }
@@ -1041,7 +1043,7 @@ std::vector<tensor::TensorPtr> SessionBasic::GetNeedLockInputTensors(const Graph
   }
   std::vector<tensor::TensorPtr> result;
   for (auto &tensor : inputs) {
-    if (!tensor->NeedWait()) {
+    if (!tensor->IsGraphOutput()) {
       result.emplace_back(tensor);
     }
   }
@@ -258,6 +258,10 @@ py::tuple TensorPy::GetPyTupleShape(const Tensor &tensor) {
 }
 
 py::array TensorPy::SyncAsNumpy(const Tensor &tensor) {
+  if (tensor.NeedWait()) {
+    py::gil_scoped_release gil_release;
+    tensor.Wait();
+  }
   tensor.data_sync();
   return AsNumpy(tensor);
 }
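
SyncAsNumpy now blocks until the producing graph clears the tensor's need-wait flag, and it drops the GIL while doing so, so other Python threads are not stalled behind the wait. The toy pybind11 module below shows the same shape of that pattern only; SlowProduce, WaitThenRead, and gil_wait_demo are invented names for illustration, not part of MindSpore.

#include <pybind11/pybind11.h>
#include <chrono>
#include <thread>

namespace py = pybind11;

// Hypothetical blocking call standing in for Tensor::Wait().
static void SlowProduce() {
  std::this_thread::sleep_for(std::chrono::milliseconds(50));
}

static int WaitThenRead() {
  {
    py::gil_scoped_release release;  // let other Python threads run while we block
    SlowProduce();                   // wait without holding the GIL
  }                                  // GIL is re-acquired here
  return 42;                         // safe to build/return Python objects again
}

PYBIND11_MODULE(gil_wait_demo, m) {
  m.def("wait_then_read", &WaitThenRead, "Blocks without the GIL, then returns.");
}
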
@@ -176,6 +176,7 @@ tensor::TensorPtr CPUKernelRuntime::CreatTensorForOutput(session::KernelGraph *k
     (void)bound_addresses_.insert(address);
   }
   tensor->SetNeedWait(true);
+  tensor->SetIsGraphOutput();
   return tensor;
 }
@@ -174,6 +174,7 @@ constexpr auto kStridedReadOpName = "StridedRead";
 constexpr auto kStridedWriteOpName = "StridedWrite";
 constexpr auto kFusedAdamWeightDecayName = "FusedAdamWeightDecay";
 constexpr auto kFusedAdamName = "FusedAdam";
+constexpr auto kFusedSparseAdamName = "FusedSparseAdam";
 constexpr auto kApplyAdagradV2OpName = "ApplyAdagradV2";
 constexpr auto kSparseApplyAdagradV2OpName = "SparseApplyAdagradV2";
 constexpr auto kSparseApplyFtrlOpName = "SparseApplyFtrl";
@@ -385,6 +386,7 @@ const std::set<std::string> kOptOperatorSet = {
   kApplyRMSPropOpName,
   kFusedAdamWeightDecayName,
   kFusedAdamName,
+  kFusedSparseAdamName,
   kFusedWeightScaleApplyMomentum,
   kFusedScaleApplyMomentum,
   kPullOpName,
@@ -306,12 +306,16 @@ class Tensor : public MetaTensor {
   bool NeedSyncHostToDevice() const { return sync_status_ == kNeedSyncHostToDevice; }
 
+  bool IsGraphOutput() { return graph_output_; }
+  void SetIsGraphOutput() { graph_output_ = true; }
+
  private:
   bool init_flag_{false};
   TensorDataPtr data_{nullptr};
   std::string id_{""};
   mutable std::shared_ptr<WaitEvent> event_{nullptr};
   mutable TensorSyncStatus sync_status_{kNeedSyncHostToDevice};
+  bool graph_output_{false};
   DeviceSyncPtr device_sync_{nullptr};
   std::vector<Axis> padding_type_;
   TypePtr cast_dtype_{nullptr};