|
|
|
@@ -403,7 +403,7 @@ Status CacheServer::BatchFetch(const std::shared_ptr<flatbuffers::FlatBufferBuil |
|
|
|
auto p = flatbuffers::GetRoot<BatchDataLocatorMsg>(fbb->GetBufferPointer()); |
|
|
|
const auto num_elements = p->rows()->size(); |
|
|
|
auto connection_id = p->connection_id(); |
|
|
|
BatchWait batch_wait = BatchWait(num_elements); |
|
|
|
auto batch_wait = std::make_unique<BatchWait>(num_elements); |
|
|
|
int64_t data_offset = (num_elements + 1) * sizeof(int64_t); |
|
|
|
auto *offset_array = reinterpret_cast<int64_t *>(out->GetMutablePointer()); |
|
|
|
offset_array[0] = data_offset; |
|
|
|
@@ -439,17 +439,17 @@ Status CacheServer::BatchFetch(const std::shared_ptr<flatbuffers::FlatBufferBuil |
|
|
|
auto offset = bld.Finish(); |
|
|
|
fb2.Finish(offset); |
|
|
|
cache_rq->rq_.add_buf_data(fb2.GetBufferPointer(), fb2.GetSize()); |
|
|
|
cache_rq->rq_.add_buf_data(std::to_string(reinterpret_cast<int64_t>(&batch_wait))); |
|
|
|
cache_rq->rq_.add_buf_data(std::to_string(reinterpret_cast<int64_t>(batch_wait.get()))); |
|
|
|
RETURN_IF_NOT_OK(PushRequest(worker_id, cache_rq)); |
|
|
|
} else { |
|
|
|
// Nothing to fetch but we still need to post something back into the wait area. |
|
|
|
RETURN_IF_NOT_OK(batch_wait.Set(Status::OK())); |
|
|
|
RETURN_IF_NOT_OK(batch_wait->Set(Status::OK())); |
|
|
|
} |
|
|
|
} |
|
|
|
// Now wait for all of them to come back. |
|
|
|
RETURN_IF_NOT_OK(batch_wait.Wait()); |
|
|
|
RETURN_IF_NOT_OK(batch_wait->Wait()); |
|
|
|
// Return the result |
|
|
|
return batch_wait.GetRc(); |
|
|
|
return batch_wait->GetRc(); |
|
|
|
} |
|
|
|
|
|
|
|
Status CacheServer::BatchFetchRows(CacheRequest *rq, CacheReply *reply) { |
|
|
|
@@ -733,7 +733,7 @@ Status CacheServer::BatchCacheRows(CacheRequest *rq) { |
|
|
|
offset_addr = strtoll(rq->buf_data(1).data(), nullptr, 10); |
|
|
|
auto p = reinterpret_cast<char *>(reinterpret_cast<int64_t>(base) + offset_addr); |
|
|
|
num_elem = strtol(rq->buf_data(2).data(), nullptr, 10); |
|
|
|
BatchWait batch_wait = BatchWait(num_elem); |
|
|
|
auto batch_wait = std::make_unique<BatchWait>(num_elem); |
|
|
|
// Get a set of free request and push into the queues. |
|
|
|
for (auto i = 0; i < num_elem; ++i) { |
|
|
|
auto start = reinterpret_cast<int64_t>(p); |
|
|
|
@@ -754,13 +754,13 @@ Status CacheServer::BatchCacheRows(CacheRequest *rq) { |
|
|
|
cache_rq->rq_.add_buf_data(cookie); |
|
|
|
cache_rq->rq_.add_buf_data(std::to_string(start - reinterpret_cast<int64_t>(base))); |
|
|
|
cache_rq->rq_.add_buf_data(std::to_string(reinterpret_cast<int64_t>(p - start))); |
|
|
|
cache_rq->rq_.add_buf_data(std::to_string(reinterpret_cast<int64_t>(&batch_wait))); |
|
|
|
cache_rq->rq_.add_buf_data(std::to_string(reinterpret_cast<int64_t>(batch_wait.get()))); |
|
|
|
RETURN_IF_NOT_OK(PushRequest(GetRandomWorker(), cache_rq)); |
|
|
|
} |
|
|
|
// Now wait for all of them to come back. |
|
|
|
RETURN_IF_NOT_OK(batch_wait.Wait()); |
|
|
|
RETURN_IF_NOT_OK(batch_wait->Wait()); |
|
|
|
// Return the result |
|
|
|
return batch_wait.GetRc(); |
|
|
|
return batch_wait->GetRc(); |
|
|
|
} catch (const std::exception &e) { |
|
|
|
RETURN_STATUS_UNEXPECTED(e.what()); |
|
|
|
} |
|
|
|
|