diff --git a/mindspore/ccsrc/dataset/engine/datasetops/batch_op.cc b/mindspore/ccsrc/dataset/engine/datasetops/batch_op.cc index ad8b95b625..a86633e5b4 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/batch_op.cc +++ b/mindspore/ccsrc/dataset/engine/datasetops/batch_op.cc @@ -56,8 +56,10 @@ BatchOp::BatchOp(int32_t batch_size, bool drop, int32_t op_queue_size, int32_t n } Status BatchOp::operator()() { - RETURN_IF_NOT_OK(LaunchThreadsAndInitOp()); + Status rc = LaunchThreadsAndInitOp(); + // Synchronize with TaskManager TaskManager::FindMe()->Post(); + RETURN_IF_NOT_OK(rc); int64_t epoch_num = 0, batch_num = 0, cnt = 0; TensorRow new_row; std::unique_ptr table = std::make_unique(); diff --git a/mindspore/ccsrc/dataset/engine/datasetops/filter_op.cc b/mindspore/ccsrc/dataset/engine/datasetops/filter_op.cc index 5ede8ad6f4..238c58bbb0 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/filter_op.cc +++ b/mindspore/ccsrc/dataset/engine/datasetops/filter_op.cc @@ -59,10 +59,10 @@ FilterOp::FilterOp(const std::vector &in_col_names, int32_t num_wor : ParallelOp(num_workers, op_queue_size), predicate_func_(std::move(predicate_func)), in_columns_(in_col_names) {} Status FilterOp::operator()() { - // The operator class just starts off threads by calling the tree_ function. - RETURN_UNEXPECTED_IF_NULL(tree_); // Synchronize with TaskManager. TaskManager::FindMe()->Post(); + // The operator class just starts off threads by calling the tree_ function. + RETURN_UNEXPECTED_IF_NULL(tree_); filter_queues_.Init(num_workers_, oc_queue_size_); RETURN_IF_NOT_OK(filter_queues_.Register(tree_->AllTasks())); RETURN_IF_NOT_OK(tree_->LaunchWorkers(num_workers_, std::bind(&FilterOp::WorkerEntry, this, std::placeholders::_1))); diff --git a/mindspore/ccsrc/dataset/engine/datasetops/map_op.cc b/mindspore/ccsrc/dataset/engine/datasetops/map_op.cc index 4cbe2ac603..9c5ecddf72 100644 --- a/mindspore/ccsrc/dataset/engine/datasetops/map_op.cc +++ b/mindspore/ccsrc/dataset/engine/datasetops/map_op.cc @@ -110,13 +110,18 @@ Status MapOp::operator()() { if (perf_mode_) { // Create and register the local queues. local_queues_.Init(num_workers_, oc_queue_size_); - RETURN_IF_NOT_OK(local_queues_.Register(tree_->AllTasks())); + Status rc = local_queues_.Register(tree_->AllTasks()); + if (rc.IsError()) { + TaskManager::FindMe()->Post(); + return rc; + } } // The operator class just starts off threads by calling the tree_ function - RETURN_IF_NOT_OK(tree_->LaunchWorkers(num_workers_, std::bind(&MapOp::WorkerEntry, this, std::placeholders::_1))); + Status rc = tree_->LaunchWorkers(num_workers_, std::bind(&MapOp::WorkerEntry, this, std::placeholders::_1)); // Synchronize with TaskManager TaskManager::FindMe()->Post(); + RETURN_IF_NOT_OK(rc); if (perf_mode_) { int64_t que_id = 0;