|
|
|
@@ -65,9 +65,6 @@ MapOp::MapOp(const std::vector<std::string> &in_col_names, const std::vector<std |
|
|
|
tfuncs_(std::move(tensor_funcs)), |
|
|
|
in_columns_(in_col_names), |
|
|
|
out_columns_(out_col_names), |
|
|
|
#if defined(_WIN32) || defined(_WIN64) |
|
|
|
eof_worker_id_(0), |
|
|
|
#endif |
|
|
|
perf_mode_(perf_mode) { |
|
|
|
// If caller didn't specify the out_col_names, assume they are same as the in_columns. |
|
|
|
if (out_columns_.empty() || out_columns_[0].empty()) { |
|
|
|
@@ -123,17 +120,6 @@ Status MapOp::operator()() { |
|
|
|
RETURN_IF_NOT_OK(child_[0]->GetNextBuffer(&buff, 0)); |
|
|
|
is_eof = buff->eof(); |
|
|
|
RETURN_IF_NOT_OK(local_queues_[que_id]->Add(std::move(buff))); |
|
|
|
#if defined(_WIN32) || defined(_WIN64) |
|
|
|
if (is_eof) { |
|
|
|
eof_worker_id_ = que_id; |
|
|
|
for (int32_t id = 0; id < num_workers_; id++) { |
|
|
|
if (id != eof_worker_id_) { |
|
|
|
auto eof_buffer = std::make_unique<DataBuffer>(0, DataBuffer::kDeBFlagEOF); |
|
|
|
RETURN_IF_NOT_OK(local_queues_[id]->Add(std::move(eof_buffer))); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
#endif |
|
|
|
que_id = (que_id + 1) % num_workers_; |
|
|
|
} |
|
|
|
} |
|
|
|
@@ -173,14 +159,6 @@ Status MapOp::WorkerEntry(int32_t worker_id) { |
|
|
|
continue; |
|
|
|
} else if (in_buffer->eof()) { |
|
|
|
// Calling base class EofReceived to forward eof buffer. |
|
|
|
#if defined(_WIN32) || defined(_Win64) |
|
|
|
if (perf_mode_) { |
|
|
|
if (eof_worker_id_ == worker_id) { |
|
|
|
RETURN_IF_NOT_OK(EofReceived(worker_id)); |
|
|
|
} |
|
|
|
break; |
|
|
|
} |
|
|
|
#endif |
|
|
|
RETURN_IF_NOT_OK(EofReceived(worker_id)); |
|
|
|
break; |
|
|
|
} |
|
|
|
|