|
|
|
@@ -101,8 +101,9 @@ Status BatchOp::operator()() { |
|
|
|
table->emplace_back(new_row); |
|
|
|
// if # of rows is enough to make 1 batch (1 batch is buffer), send it to worker_queue |
|
|
|
if (table->size() == static_cast<size_t>(cur_batch_size)) { |
|
|
|
RETURN_IF_NOT_OK(worker_queues_[cnt++ % num_workers_]->EmplaceBack( |
|
|
|
std::make_pair(std::move(table), CBatchInfo(epoch_num, batch_num++, cnt - epoch_num)))); |
|
|
|
RETURN_IF_NOT_OK(worker_queues_[cnt % num_workers_]->EmplaceBack( |
|
|
|
std::make_pair(std::move(table), CBatchInfo(epoch_num, batch_num++, cnt + 1 - epoch_num)))); |
|
|
|
cnt++; |
|
|
|
table = std::make_unique<TensorQTable>(); |
|
|
|
RETURN_IF_NOT_OK(GetBatchSize(&cur_batch_size, CBatchInfo(epoch_num, batch_num, cnt - epoch_num))); |
|
|
|
} |
|
|
|
@@ -110,8 +111,9 @@ Status BatchOp::operator()() { |
|
|
|
} |
|
|
|
// Reminder logic, execute only when there is a remainder (table is non empty) and don't drop |
|
|
|
if (drop_ == false && table->empty() == false) { |
|
|
|
RETURN_IF_NOT_OK(worker_queues_[cnt++ % num_workers_]->EmplaceBack( |
|
|
|
std::make_pair(std::move(table), CBatchInfo(epoch_num, batch_num++, cnt - epoch_num)))); |
|
|
|
RETURN_IF_NOT_OK(worker_queues_[cnt % num_workers_]->EmplaceBack( |
|
|
|
std::make_pair(std::move(table), CBatchInfo(epoch_num, batch_num++, cnt + 1 - epoch_num)))); |
|
|
|
cnt++; |
|
|
|
} |
|
|
|
table = std::make_unique<TensorQTable>(); // this drops when drop == true |
|
|
|
// end of the current epoch, batch_num should start from 0 again |
|
|
|
|