Merge pull request !5860 from xiefangqi/fix_dataset_register_issuetags/v1.0.0
| @@ -221,6 +221,10 @@ Status ClueOp::LoadFile(const std::string &file, const int64_t start_offset, con | |||||
| Status ClueOp::operator()() { | Status ClueOp::operator()() { | ||||
| RETURN_IF_NOT_OK(CalculateNumRowsPerShard()); | RETURN_IF_NOT_OK(CalculateNumRowsPerShard()); | ||||
| // Move register to the front of launching thread, this will fix the problem | |||||
| // when thread exit unnormally register will failed occasionally. | |||||
| RETURN_IF_NOT_OK(io_block_queue_wait_post_.Register(tree_->AllTasks())); | |||||
| // launch one thread, responsible for filling IoBlockQueue | // launch one thread, responsible for filling IoBlockQueue | ||||
| RETURN_IF_NOT_OK(tree_->LaunchWorkers(1, std::bind(&ClueOp::WaitToFillIOBlockQueue, this))); | RETURN_IF_NOT_OK(tree_->LaunchWorkers(1, std::bind(&ClueOp::WaitToFillIOBlockQueue, this))); | ||||
| @@ -228,7 +232,6 @@ Status ClueOp::operator()() { | |||||
| // must be called after launching workers. | // must be called after launching workers. | ||||
| TaskManager::FindMe()->Post(); | TaskManager::FindMe()->Post(); | ||||
| RETURN_IF_NOT_OK(io_block_queue_wait_post_.Register(tree_->AllTasks())); | |||||
| NotifyToFillIOBlockQueue(); | NotifyToFillIOBlockQueue(); | ||||
| while (!finished_reading_dataset_) { | while (!finished_reading_dataset_) { | ||||
| @@ -518,6 +518,10 @@ Status CsvOp::LoadFile(const std::string &file, const int64_t start_offset, cons | |||||
| Status CsvOp::operator()() { | Status CsvOp::operator()() { | ||||
| RETURN_IF_NOT_OK(CalculateNumRowsPerShard()); | RETURN_IF_NOT_OK(CalculateNumRowsPerShard()); | ||||
| // Move register to the front of launching thread, this will fix the problem | |||||
| // when thread exit unnormally register will failed occasionally. | |||||
| RETURN_IF_NOT_OK(io_block_queue_wait_post_.Register(tree_->AllTasks())); | |||||
| // launch one thread, responsible for filling IoBlockQueue | // launch one thread, responsible for filling IoBlockQueue | ||||
| RETURN_IF_NOT_OK(tree_->LaunchWorkers(1, std::bind(&CsvOp::WaitToFillIOBlockQueue, this))); | RETURN_IF_NOT_OK(tree_->LaunchWorkers(1, std::bind(&CsvOp::WaitToFillIOBlockQueue, this))); | ||||
| @@ -525,7 +529,6 @@ Status CsvOp::operator()() { | |||||
| // must be called after launching workers. | // must be called after launching workers. | ||||
| TaskManager::FindMe()->Post(); | TaskManager::FindMe()->Post(); | ||||
| RETURN_IF_NOT_OK(io_block_queue_wait_post_.Register(tree_->AllTasks())); | |||||
| NotifyToFillIOBlockQueue(); | NotifyToFillIOBlockQueue(); | ||||
| while (!finished_reading_dataset_) { | while (!finished_reading_dataset_) { | ||||
| @@ -378,6 +378,10 @@ void TextFileOp::NotifyToFillIOBlockQueue() { io_block_queue_wait_post_.Set(); } | |||||
| Status TextFileOp::operator()() { | Status TextFileOp::operator()() { | ||||
| RETURN_IF_NOT_OK(CalculateNumRowsPerShard()); | RETURN_IF_NOT_OK(CalculateNumRowsPerShard()); | ||||
| // Move register to the front of launching thread, this will fix the problem | |||||
| // when thread exit unnormally register will failed occasionally. | |||||
| RETURN_IF_NOT_OK(io_block_queue_wait_post_.Register(tree_->AllTasks())); | |||||
| // launch one thread, responsible for filling IoBlockQueue | // launch one thread, responsible for filling IoBlockQueue | ||||
| RETURN_IF_NOT_OK(tree_->LaunchWorkers(1, std::bind(&TextFileOp::WaitToFillIOBlockQueue, this))); | RETURN_IF_NOT_OK(tree_->LaunchWorkers(1, std::bind(&TextFileOp::WaitToFillIOBlockQueue, this))); | ||||
| @@ -387,8 +391,6 @@ Status TextFileOp::operator()() { | |||||
| // must be called after launching workers. | // must be called after launching workers. | ||||
| TaskManager::FindMe()->Post(); | TaskManager::FindMe()->Post(); | ||||
| RETURN_IF_NOT_OK(io_block_queue_wait_post_.Register(tree_->AllTasks())); | |||||
| NotifyToFillIOBlockQueue(); | NotifyToFillIOBlockQueue(); | ||||
| while (!finished_reading_dataset_) { | while (!finished_reading_dataset_) { | ||||
| int64_t buffer_id = 0; | int64_t buffer_id = 0; | ||||