Browse Source

Fix long-running segmentation fault

tags/v1.2.0-rc1
kswang 4 years ago
parent
commit
93b3055a47
2 changed files with 23 additions and 24 deletions
  1. +22
    -22
      mindspore/ccsrc/backend/session/executor.cc
  2. +1
    -2
      mindspore/ccsrc/backend/session/executor.h

+ 22
- 22
mindspore/ccsrc/backend/session/executor.cc View File

@@ -277,34 +277,39 @@ void Executor::ClearDoneTasks() {
done_tasks_.clear(); done_tasks_.clear();
} }


void Executor::RunTask(const std::shared_ptr<Task> &task, bool sync) {
void Executor::RunTask(const std::shared_ptr<Task> &task, bool sync, bool long_run) {
{ {
std::lock_guard<std::mutex> lock(task_mutex_); std::lock_guard<std::mutex> lock(task_mutex_);
ready_tasks_.push(task); ready_tasks_.push(task);
} }
sync_run_task_finished_ = false; sync_run_task_finished_ = false;
task_cond_var_.notify_all(); task_cond_var_.notify_all();
ClearDoneTasks();
if (sync && !sync_run_task_finished_) { if (sync && !sync_run_task_finished_) {
std::unique_lock<std::mutex> lock(task_mutex_); std::unique_lock<std::mutex> lock(task_mutex_);
sync_cond_var_.wait(lock, [this] {
bool finished = sync_run_task_finished_;
return finished;
});
if (long_run) {
mindspore::ScopedLongRunning long_running;
sync_cond_var_.wait(lock, [this] {
bool finished = sync_run_task_finished_;
return finished;
});
} else {
sync_cond_var_.wait(lock, [this] {
bool finished = sync_run_task_finished_;
return finished;
});
}
} }
ClearDoneTasks(); ClearDoneTasks();
MsException::Instance().CheckException(); MsException::Instance().CheckException();
} }


void Executor::SyncRunTask(const std::shared_ptr<Task> &task) { RunTask(task, true); }

GraphId Executor::CompileGraph(const SessionPtr &session, const GraphSegmentPtr &segment, GraphId Executor::CompileGraph(const SessionPtr &session, const GraphSegmentPtr &segment,
const AnfNodePtrList &outputs) { const AnfNodePtrList &outputs) {
auto task = std::make_shared<CompileNodesTask>(); auto task = std::make_shared<CompileNodesTask>();
task->session_ = session; task->session_ = session;
task->segment_ = segment; task->segment_ = segment;
task->output_nodes_ = outputs; task->output_nodes_ = outputs;
SyncRunTask(task);
RunTask(task, true);
return task->graph_id_; return task->graph_id_;
} }


@@ -312,7 +317,7 @@ GraphId Executor::CompileGraph(const SessionPtr &session, NotNull<FuncGraphPtr>
auto task = std::make_shared<CompileGraphTask>(); auto task = std::make_shared<CompileGraphTask>();
task->session_ = session; task->session_ = session;
task->func_graph_ = func_graph.get(); task->func_graph_ = func_graph.get();
SyncRunTask(task);
RunTask(task, true);
return task->graph_id_; return task->graph_id_;
} }


@@ -320,7 +325,7 @@ void Executor::BuildGraph(const SessionPtr &session, GraphId graphId) {
auto task = std::make_shared<BuildGraphTask>(); auto task = std::make_shared<BuildGraphTask>();
task->session_ = session; task->session_ = session;
task->graph_id_ = graphId; task->graph_id_ = graphId;
SyncRunTask(task);
RunTask(task, true);
} }


void Executor::RunGraph(const SessionPtr &session, const GraphId &graph_id, void Executor::RunGraph(const SessionPtr &session, const GraphId &graph_id,
@@ -334,8 +339,7 @@ void Executor::RunGraph(const SessionPtr &session, const GraphId &graph_id,
session->CreateOutputTensors(graph_id, inputs, outputs, &task->tensor_to_node_); session->CreateOutputTensors(graph_id, inputs, outputs, &task->tensor_to_node_);
task->outputs_ = *outputs; task->outputs_ = *outputs;
task->sync_run_ = true; task->sync_run_ = true;
mindspore::ScopedLongRunning long_running;
SyncRunTask(task);
RunTask(task, true, true);
} }


void Executor::WaitTaskGraphAvailable(const SessionPtr &session, const std::shared_ptr<RunGraphTask> &task) { void Executor::WaitTaskGraphAvailable(const SessionPtr &session, const std::shared_ptr<RunGraphTask> &task) {
@@ -350,7 +354,6 @@ void Executor::WaitTaskGraphAvailable(const SessionPtr &session, const std::shar
} }
} }
if (need_lock) { if (need_lock) {
ClearDoneTasks();
mindspore::ScopedLongRunning long_running; mindspore::ScopedLongRunning long_running;
for (auto &tensor : task->input_tensors_) { for (auto &tensor : task->input_tensors_) {
if (tensor->NeedWait() && !tensor->IsGraphOutput()) { if (tensor->NeedWait() && !tensor->IsGraphOutput()) {
@@ -365,7 +368,6 @@ void Executor::WaitTaskGraphAvailable(const SessionPtr &session, const std::shar
} }
auto graph = session->GetGraph(task->graph_id_); auto graph = session->GetGraph(task->graph_id_);
if (graph != nullptr && !graph->IsPostGraphFinished()) { if (graph != nullptr && !graph->IsPostGraphFinished()) {
ClearDoneTasks();
mindspore::ScopedLongRunning long_running; mindspore::ScopedLongRunning long_running;
std::unique_lock<std::mutex> lock(reenter_mutex_); std::unique_lock<std::mutex> lock(reenter_mutex_);
reenter_cond_var_.wait(lock, [&graph] { return graph->IsPostGraphFinished(); }); reenter_cond_var_.wait(lock, [&graph] { return graph->IsPostGraphFinished(); });
@@ -388,8 +390,7 @@ void Executor::RunGraphAsync(const SessionPtr &session, const GraphId &graph_id,
// sync run graph without output tensor(int dataset graph) // sync run graph without output tensor(int dataset graph)
if (!TensorInVector(outputs)) { if (!TensorInVector(outputs)) {
task->sync_run_ = true; task->sync_run_ = true;
mindspore::ScopedLongRunning long_running;
SyncRunTask(task);
RunTask(task, true, true);
return; return;
} }
WaitTaskGraphAvailable(session, task); WaitTaskGraphAvailable(session, task);
@@ -415,8 +416,7 @@ void Executor::RunOp(const SessionPtr &session, OpRunInfo *op_run_info, const Gr
tensor->Wait(); tensor->Wait();
} }
} }
mindspore::ScopedLongRunning long_running;
SyncRunTask(task);
RunTask(task, true, true);
*outputs = task->outputs_; *outputs = task->outputs_;
} }


@@ -428,7 +428,7 @@ void Executor::RunOpsInGraph(const SessionPtr &session, const GraphId &graph_id,
task->session_ = session; task->session_ = session;
task->graph_id_ = graph_id; task->graph_id_ = graph_id;
task->input_tensors_ = inputs; task->input_tensors_ = inputs;
SyncRunTask(task);
RunTask(task, true);
*outputs = task->outputs_; *outputs = task->outputs_;
} }


@@ -436,14 +436,14 @@ bool Executor::CreateCommGroup(const std::string &group_name, std::vector<uint32
auto task = std::make_shared<CreateCommGroupTask>(); auto task = std::make_shared<CreateCommGroupTask>();
task->group_name_ = group_name; task->group_name_ = group_name;
task->ranks_ = ranks; task->ranks_ = ranks;
SyncRunTask(task);
RunTask(task, true);
return task->result_; return task->result_;
} }


bool Executor::DestroyCommGroup(const std::string &group_name) { bool Executor::DestroyCommGroup(const std::string &group_name) {
auto task = std::make_shared<DestroyCommGroupTask>(); auto task = std::make_shared<DestroyCommGroupTask>();
task->group_name_ = group_name; task->group_name_ = group_name;
SyncRunTask(task);
RunTask(task, true);
return task->result_; return task->result_;
} }




+ 1
- 2
mindspore/ccsrc/backend/session/executor.h View File

@@ -172,8 +172,7 @@ class Executor {
void OnEvent(const ExecutorEvent &event); void OnEvent(const ExecutorEvent &event);


private: private:
void RunTask(const std::shared_ptr<Task> &task, bool sync);
void SyncRunTask(const std::shared_ptr<Task> &task);
void RunTask(const std::shared_ptr<Task> &task, bool sync, bool long_run = false);
void UpdateOutputTensors(VectorRef *outputs, void UpdateOutputTensors(VectorRef *outputs,
const std::map<tensor::TensorPtr, session::KernelWithIndex> &tensor_to_node); const std::map<tensor::TensorPtr, session::KernelWithIndex> &tensor_to_node);
std::vector<std::shared_ptr<RunGraphTask>> GetNewReadyTasks(); std::vector<std::shared_ptr<RunGraphTask>> GetNewReadyTasks();


Loading…
Cancel
Save