| @@ -56,8 +56,10 @@ BatchOp::BatchOp(int32_t batch_size, bool drop, int32_t op_queue_size, int32_t n | |||||
| } | } | ||||
| Status BatchOp::operator()() { | Status BatchOp::operator()() { | ||||
| RETURN_IF_NOT_OK(LaunchThreadsAndInitOp()); | |||||
| Status rc = LaunchThreadsAndInitOp(); | |||||
| // Synchronize with TaskManager | |||||
| TaskManager::FindMe()->Post(); | TaskManager::FindMe()->Post(); | ||||
| RETURN_IF_NOT_OK(rc); | |||||
| int64_t epoch_num = 0, batch_num = 0, cnt = 0; | int64_t epoch_num = 0, batch_num = 0, cnt = 0; | ||||
| TensorRow new_row; | TensorRow new_row; | ||||
| std::unique_ptr<TensorQTable> table = std::make_unique<TensorQTable>(); | std::unique_ptr<TensorQTable> table = std::make_unique<TensorQTable>(); | ||||
| @@ -59,10 +59,10 @@ FilterOp::FilterOp(const std::vector<std::string> &in_col_names, int32_t num_wor | |||||
| : ParallelOp(num_workers, op_queue_size), predicate_func_(std::move(predicate_func)), in_columns_(in_col_names) {} | : ParallelOp(num_workers, op_queue_size), predicate_func_(std::move(predicate_func)), in_columns_(in_col_names) {} | ||||
| Status FilterOp::operator()() { | Status FilterOp::operator()() { | ||||
| // The operator class just starts off threads by calling the tree_ function. | |||||
| RETURN_UNEXPECTED_IF_NULL(tree_); | |||||
| // Synchronize with TaskManager. | // Synchronize with TaskManager. | ||||
| TaskManager::FindMe()->Post(); | TaskManager::FindMe()->Post(); | ||||
| // The operator class just starts off threads by calling the tree_ function. | |||||
| RETURN_UNEXPECTED_IF_NULL(tree_); | |||||
| filter_queues_.Init(num_workers_, oc_queue_size_); | filter_queues_.Init(num_workers_, oc_queue_size_); | ||||
| RETURN_IF_NOT_OK(filter_queues_.Register(tree_->AllTasks())); | RETURN_IF_NOT_OK(filter_queues_.Register(tree_->AllTasks())); | ||||
| RETURN_IF_NOT_OK(tree_->LaunchWorkers(num_workers_, std::bind(&FilterOp::WorkerEntry, this, std::placeholders::_1))); | RETURN_IF_NOT_OK(tree_->LaunchWorkers(num_workers_, std::bind(&FilterOp::WorkerEntry, this, std::placeholders::_1))); | ||||
| @@ -110,13 +110,18 @@ Status MapOp::operator()() { | |||||
| if (perf_mode_) { | if (perf_mode_) { | ||||
| // Create and register the local queues. | // Create and register the local queues. | ||||
| local_queues_.Init(num_workers_, oc_queue_size_); | local_queues_.Init(num_workers_, oc_queue_size_); | ||||
| RETURN_IF_NOT_OK(local_queues_.Register(tree_->AllTasks())); | |||||
| Status rc = local_queues_.Register(tree_->AllTasks()); | |||||
| if (rc.IsError()) { | |||||
| TaskManager::FindMe()->Post(); | |||||
| return rc; | |||||
| } | |||||
| } | } | ||||
| // The operator class just starts off threads by calling the tree_ function | // The operator class just starts off threads by calling the tree_ function | ||||
| RETURN_IF_NOT_OK(tree_->LaunchWorkers(num_workers_, std::bind(&MapOp::WorkerEntry, this, std::placeholders::_1))); | |||||
| Status rc = tree_->LaunchWorkers(num_workers_, std::bind(&MapOp::WorkerEntry, this, std::placeholders::_1)); | |||||
| // Synchronize with TaskManager | // Synchronize with TaskManager | ||||
| TaskManager::FindMe()->Post(); | TaskManager::FindMe()->Post(); | ||||
| RETURN_IF_NOT_OK(rc); | |||||
| if (perf_mode_) { | if (perf_mode_) { | ||||
| int64_t que_id = 0; | int64_t que_id = 0; | ||||