| @@ -63,9 +63,10 @@ Status FilterOp::operator()() { | |||||
| RETURN_UNEXPECTED_IF_NULL(tree_); | RETURN_UNEXPECTED_IF_NULL(tree_); | ||||
| filter_queues_.Init(num_workers_, oc_queue_size_); | filter_queues_.Init(num_workers_, oc_queue_size_); | ||||
| RETURN_IF_NOT_OK(filter_queues_.Register(tree_->AllTasks())); | RETURN_IF_NOT_OK(filter_queues_.Register(tree_->AllTasks())); | ||||
| RETURN_IF_NOT_OK(tree_->LaunchWorkers(num_workers_, std::bind(&FilterOp::WorkerEntry, this, std::placeholders::_1))); | |||||
| Status rc = tree_->LaunchWorkers(num_workers_, std::bind(&FilterOp::WorkerEntry, this, std::placeholders::_1)); | |||||
| // Synchronize with TaskManager. | // Synchronize with TaskManager. | ||||
| TaskManager::FindMe()->Post(); | TaskManager::FindMe()->Post(); | ||||
| RETURN_IF_NOT_OK(rc); | |||||
| RETURN_IF_NOT_OK(Collector()); | RETURN_IF_NOT_OK(Collector()); | ||||
| return Status::OK(); | return Status::OK(); | ||||
| } | } | ||||
| @@ -62,7 +62,7 @@ class DbConnector : public Connector<std::unique_ptr<DataBuffer>> { | |||||
| "[ERROR] nullptr detected when getting data from db connector"); | "[ERROR] nullptr detected when getting data from db connector"); | ||||
| } else { | } else { | ||||
| std::unique_lock<std::mutex> lk(m_); | std::unique_lock<std::mutex> lk(m_); | ||||
| RETURN_IF_NOT_OK(cv_.Wait(&lk, [this, worker_id]() { return expect_consumer_ == worker_id; })); | |||||
| RETURN_IF_NOT_OK(cv_.Wait(&lk, [this, worker_id]() { return (expect_consumer_ == worker_id) || end_of_file_; })); | |||||
| // Once an EOF message is encountered this flag will be set and we can return early. | // Once an EOF message is encountered this flag will be set and we can return early. | ||||
| if (end_of_file_) { | if (end_of_file_) { | ||||
| *result = std::make_unique<DataBuffer>(0, DataBuffer::kDeBFlagEOF); | *result = std::make_unique<DataBuffer>(0, DataBuffer::kDeBFlagEOF); | ||||