|
|
@@ -362,7 +362,7 @@ Status CacheServer::FastCacheRow(CacheRequest *rq, CacheReply *reply) { |
|
|
CacheService *cs = GetService(connection_id); |
|
|
CacheService *cs = GetService(connection_id); |
|
|
auto *base = SharedMemoryBaseAddr(); |
|
|
auto *base = SharedMemoryBaseAddr(); |
|
|
// Ensure we got 3 pieces of data coming in |
|
|
// Ensure we got 3 pieces of data coming in |
|
|
CHECK_FAIL_RETURN_UNEXPECTED(rq->buf_data_size() == 3, "Incomplete data"); |
|
|
|
|
|
|
|
|
CHECK_FAIL_RETURN_UNEXPECTED(rq->buf_data_size() >= 3, "Incomplete data"); |
|
|
// First piece of data is the cookie and is required |
|
|
// First piece of data is the cookie and is required |
|
|
auto &cookie = rq->buf_data(0); |
|
|
auto &cookie = rq->buf_data(0); |
|
|
// Second piece of data is the address where we can find the serialized data |
|
|
// Second piece of data is the address where we can find the serialized data |
|
|
@@ -400,11 +400,10 @@ Status CacheServer::BatchFetch(const std::shared_ptr<flatbuffers::FlatBufferBuil |
|
|
auto rng = GetRandomDevice(); |
|
|
auto rng = GetRandomDevice(); |
|
|
std::uniform_int_distribution<session_id_type> distribution(0, numQ - 1); |
|
|
std::uniform_int_distribution<session_id_type> distribution(0, numQ - 1); |
|
|
int32_t qID = distribution(rng); |
|
|
int32_t qID = distribution(rng); |
|
|
std::vector<CacheServerRequest *> cache_rq_list; |
|
|
|
|
|
auto p = flatbuffers::GetRoot<BatchDataLocatorMsg>(fbb->GetBufferPointer()); |
|
|
auto p = flatbuffers::GetRoot<BatchDataLocatorMsg>(fbb->GetBufferPointer()); |
|
|
const auto num_elements = p->rows()->size(); |
|
|
const auto num_elements = p->rows()->size(); |
|
|
auto connection_id = p->connection_id(); |
|
|
auto connection_id = p->connection_id(); |
|
|
cache_rq_list.reserve(num_elements); |
|
|
|
|
|
|
|
|
BatchWait batch_wait = BatchWait(num_elements); |
|
|
int64_t data_offset = (num_elements + 1) * sizeof(int64_t); |
|
|
int64_t data_offset = (num_elements + 1) * sizeof(int64_t); |
|
|
auto *offset_array = reinterpret_cast<int64_t *>(out->GetMutablePointer()); |
|
|
auto *offset_array = reinterpret_cast<int64_t *>(out->GetMutablePointer()); |
|
|
offset_array[0] = data_offset; |
|
|
offset_array[0] = data_offset; |
|
|
@@ -425,7 +424,6 @@ Status CacheServer::BatchFetch(const std::shared_ptr<flatbuffers::FlatBufferBuil |
|
|
worker_id_t worker_id = IsNumaAffinityOn() ? GetWorkerByNumaId(node_id) : GetRandomWorker(); |
|
|
worker_id_t worker_id = IsNumaAffinityOn() ? GetWorkerByNumaId(node_id) : GetRandomWorker(); |
|
|
CacheServerRequest *cache_rq; |
|
|
CacheServerRequest *cache_rq; |
|
|
RETURN_IF_NOT_OK(GetFreeRequestTag(qID++ % numQ, &cache_rq)); |
|
|
RETURN_IF_NOT_OK(GetFreeRequestTag(qID++ % numQ, &cache_rq)); |
|
|
cache_rq_list.push_back(cache_rq); |
|
|
|
|
|
// Set up all the necessarily field. |
|
|
// Set up all the necessarily field. |
|
|
cache_rq->type_ = BaseRequest::RequestType::kInternalFetchRow; |
|
|
cache_rq->type_ = BaseRequest::RequestType::kInternalFetchRow; |
|
|
cache_rq->st_ = CacheServerRequest::STATE::PROCESS; |
|
|
cache_rq->st_ = CacheServerRequest::STATE::PROCESS; |
|
|
@@ -441,19 +439,17 @@ Status CacheServer::BatchFetch(const std::shared_ptr<flatbuffers::FlatBufferBuil |
|
|
auto offset = bld.Finish(); |
|
|
auto offset = bld.Finish(); |
|
|
fb2.Finish(offset); |
|
|
fb2.Finish(offset); |
|
|
cache_rq->rq_.add_buf_data(fb2.GetBufferPointer(), fb2.GetSize()); |
|
|
cache_rq->rq_.add_buf_data(fb2.GetBufferPointer(), fb2.GetSize()); |
|
|
|
|
|
cache_rq->rq_.add_buf_data(std::to_string(reinterpret_cast<int64_t>(&batch_wait))); |
|
|
RETURN_IF_NOT_OK(PushRequest(worker_id, cache_rq)); |
|
|
RETURN_IF_NOT_OK(PushRequest(worker_id, cache_rq)); |
|
|
|
|
|
} else { |
|
|
|
|
|
// Nothing to fetch but we still need to post something back into the wait area. |
|
|
|
|
|
RETURN_IF_NOT_OK(batch_wait.Set(Status::OK())); |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
// Now wait for all of them to come back. |
|
|
// Now wait for all of them to come back. |
|
|
Status rc; |
|
|
|
|
|
for (CacheServerRequest *rq : cache_rq_list) { |
|
|
|
|
|
RETURN_IF_NOT_OK(rq->Wait()); |
|
|
|
|
|
if (rq->rc_.IsError() && !rq->rc_.IsInterrupted() && rc.IsOk()) { |
|
|
|
|
|
rc = rq->rc_; |
|
|
|
|
|
} |
|
|
|
|
|
RETURN_IF_NOT_OK(ReturnRequestTag(rq)); |
|
|
|
|
|
} |
|
|
|
|
|
return rc; |
|
|
|
|
|
|
|
|
RETURN_IF_NOT_OK(batch_wait.Wait()); |
|
|
|
|
|
// Return the result |
|
|
|
|
|
return batch_wait.GetRc(); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
Status CacheServer::BatchFetchRows(CacheRequest *rq, CacheReply *reply) { |
|
|
Status CacheServer::BatchFetchRows(CacheRequest *rq, CacheReply *reply) { |
|
|
@@ -727,7 +723,6 @@ Status CacheServer::BatchCacheRows(CacheRequest *rq) { |
|
|
auto rng = GetRandomDevice(); |
|
|
auto rng = GetRandomDevice(); |
|
|
std::uniform_int_distribution<session_id_type> distribution(0, numQ - 1); |
|
|
std::uniform_int_distribution<session_id_type> distribution(0, numQ - 1); |
|
|
int32_t qID = distribution(rng); |
|
|
int32_t qID = distribution(rng); |
|
|
std::vector<CacheServerRequest *> cache_rq_list; |
|
|
|
|
|
try { |
|
|
try { |
|
|
auto &cookie = rq->buf_data(0); |
|
|
auto &cookie = rq->buf_data(0); |
|
|
auto connection_id = rq->connection_id(); |
|
|
auto connection_id = rq->connection_id(); |
|
|
@@ -738,7 +733,7 @@ Status CacheServer::BatchCacheRows(CacheRequest *rq) { |
|
|
offset_addr = strtoll(rq->buf_data(1).data(), nullptr, 10); |
|
|
offset_addr = strtoll(rq->buf_data(1).data(), nullptr, 10); |
|
|
auto p = reinterpret_cast<char *>(reinterpret_cast<int64_t>(base) + offset_addr); |
|
|
auto p = reinterpret_cast<char *>(reinterpret_cast<int64_t>(base) + offset_addr); |
|
|
num_elem = strtol(rq->buf_data(2).data(), nullptr, 10); |
|
|
num_elem = strtol(rq->buf_data(2).data(), nullptr, 10); |
|
|
cache_rq_list.reserve(num_elem); |
|
|
|
|
|
|
|
|
BatchWait batch_wait = BatchWait(num_elem); |
|
|
// Get a set of free request and push into the queues. |
|
|
// Get a set of free request and push into the queues. |
|
|
for (auto i = 0; i < num_elem; ++i) { |
|
|
for (auto i = 0; i < num_elem; ++i) { |
|
|
auto start = reinterpret_cast<int64_t>(p); |
|
|
auto start = reinterpret_cast<int64_t>(p); |
|
|
@@ -749,7 +744,6 @@ Status CacheServer::BatchCacheRows(CacheRequest *rq) { |
|
|
} |
|
|
} |
|
|
CacheServerRequest *cache_rq; |
|
|
CacheServerRequest *cache_rq; |
|
|
RETURN_IF_NOT_OK(GetFreeRequestTag(qID++ % numQ, &cache_rq)); |
|
|
RETURN_IF_NOT_OK(GetFreeRequestTag(qID++ % numQ, &cache_rq)); |
|
|
cache_rq_list.push_back(cache_rq); |
|
|
|
|
|
// Fill in details. |
|
|
// Fill in details. |
|
|
cache_rq->type_ = BaseRequest::RequestType::kInternalCacheRow; |
|
|
cache_rq->type_ = BaseRequest::RequestType::kInternalCacheRow; |
|
|
cache_rq->st_ = CacheServerRequest::STATE::PROCESS; |
|
|
cache_rq->st_ = CacheServerRequest::STATE::PROCESS; |
|
|
@@ -760,25 +754,20 @@ Status CacheServer::BatchCacheRows(CacheRequest *rq) { |
|
|
cache_rq->rq_.add_buf_data(cookie); |
|
|
cache_rq->rq_.add_buf_data(cookie); |
|
|
cache_rq->rq_.add_buf_data(std::to_string(start - reinterpret_cast<int64_t>(base))); |
|
|
cache_rq->rq_.add_buf_data(std::to_string(start - reinterpret_cast<int64_t>(base))); |
|
|
cache_rq->rq_.add_buf_data(std::to_string(reinterpret_cast<int64_t>(p - start))); |
|
|
cache_rq->rq_.add_buf_data(std::to_string(reinterpret_cast<int64_t>(p - start))); |
|
|
|
|
|
cache_rq->rq_.add_buf_data(std::to_string(reinterpret_cast<int64_t>(&batch_wait))); |
|
|
RETURN_IF_NOT_OK(PushRequest(GetRandomWorker(), cache_rq)); |
|
|
RETURN_IF_NOT_OK(PushRequest(GetRandomWorker(), cache_rq)); |
|
|
} |
|
|
} |
|
|
// Now wait for all of them to come back. |
|
|
// Now wait for all of them to come back. |
|
|
Status rc; |
|
|
|
|
|
for (CacheServerRequest *cache_rq : cache_rq_list) { |
|
|
|
|
|
RETURN_IF_NOT_OK(cache_rq->Wait()); |
|
|
|
|
|
if (cache_rq->rc_.IsError() && !cache_rq->rc_.IsInterrupted() && rc.IsOk()) { |
|
|
|
|
|
rc = cache_rq->rc_; |
|
|
|
|
|
} |
|
|
|
|
|
RETURN_IF_NOT_OK(ReturnRequestTag(cache_rq)); |
|
|
|
|
|
} |
|
|
|
|
|
return rc; |
|
|
|
|
|
|
|
|
RETURN_IF_NOT_OK(batch_wait.Wait()); |
|
|
|
|
|
// Return the result |
|
|
|
|
|
return batch_wait.GetRc(); |
|
|
} catch (const std::exception &e) { |
|
|
} catch (const std::exception &e) { |
|
|
RETURN_STATUS_UNEXPECTED(e.what()); |
|
|
RETURN_STATUS_UNEXPECTED(e.what()); |
|
|
} |
|
|
} |
|
|
return Status::OK(); |
|
|
return Status::OK(); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
void CacheServer::ProcessRequest(CacheServerRequest *cache_req) { |
|
|
|
|
|
|
|
|
Status CacheServer::ProcessRequest(CacheServerRequest *cache_req) { |
|
|
bool internal_request = false; |
|
|
bool internal_request = false; |
|
|
auto &rq = cache_req->rq_; |
|
|
auto &rq = cache_req->rq_; |
|
|
auto &reply = cache_req->reply_; |
|
|
auto &reply = cache_req->reply_; |
|
|
@@ -792,6 +781,17 @@ void CacheServer::ProcessRequest(CacheServerRequest *cache_req) { |
|
|
if (BitTest(flag, kDataIsInSharedMemory)) { |
|
|
if (BitTest(flag, kDataIsInSharedMemory)) { |
|
|
cache_req->rc_ = FastCacheRow(&rq, &reply); |
|
|
cache_req->rc_ = FastCacheRow(&rq, &reply); |
|
|
internal_request = (cache_req->type_ == BaseRequest::RequestType::kInternalCacheRow); |
|
|
internal_request = (cache_req->type_ == BaseRequest::RequestType::kInternalCacheRow); |
|
|
|
|
|
if (internal_request) { |
|
|
|
|
|
// This is an internal request and is not tied to rpc. But need to post because there |
|
|
|
|
|
// is a thread waiting on the completion of this request. |
|
|
|
|
|
try { |
|
|
|
|
|
int64_t addr = strtol(rq.buf_data(3).data(), nullptr, 10); |
|
|
|
|
|
auto *bw = reinterpret_cast<BatchWait *>(addr); |
|
|
|
|
|
RETURN_IF_NOT_OK(bw->Set(std::move(cache_req->rc_))); |
|
|
|
|
|
} catch (const std::exception &e) { |
|
|
|
|
|
RETURN_STATUS_UNEXPECTED(e.what()); |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
} else { |
|
|
} else { |
|
|
cache_req->rc_ = CacheRow(&rq, &reply); |
|
|
cache_req->rc_ = CacheRow(&rq, &reply); |
|
|
} |
|
|
} |
|
|
@@ -815,6 +815,15 @@ void CacheServer::ProcessRequest(CacheServerRequest *cache_req) { |
|
|
cache_req->rc_ = Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, errMsg); |
|
|
cache_req->rc_ = Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, errMsg); |
|
|
} else { |
|
|
} else { |
|
|
cache_req->rc_ = cs->InternalFetchRow(flatbuffers::GetRoot<FetchRowMsg>(rq.buf_data(0).data())); |
|
|
cache_req->rc_ = cs->InternalFetchRow(flatbuffers::GetRoot<FetchRowMsg>(rq.buf_data(0).data())); |
|
|
|
|
|
// This is an internal request and is not tied to rpc. But need to post because there |
|
|
|
|
|
// is a thread waiting on the completion of this request. |
|
|
|
|
|
try { |
|
|
|
|
|
int64_t addr = strtol(rq.buf_data(1).data(), nullptr, 10); |
|
|
|
|
|
auto *bw = reinterpret_cast<BatchWait *>(addr); |
|
|
|
|
|
RETURN_IF_NOT_OK(bw->Set(std::move(cache_req->rc_))); |
|
|
|
|
|
} catch (const std::exception &e) { |
|
|
|
|
|
RETURN_STATUS_UNEXPECTED(e.what()); |
|
|
|
|
|
} |
|
|
} |
|
|
} |
|
|
break; |
|
|
break; |
|
|
} |
|
|
} |
|
|
@@ -912,10 +921,10 @@ void CacheServer::ProcessRequest(CacheServerRequest *cache_req) { |
|
|
if (!internal_request) { |
|
|
if (!internal_request) { |
|
|
cache_req->responder_.Finish(reply, grpc::Status::OK, cache_req); |
|
|
cache_req->responder_.Finish(reply, grpc::Status::OK, cache_req); |
|
|
} else { |
|
|
} else { |
|
|
// This is an internal request and is not tied to rpc. But need to post because there |
|
|
|
|
|
// is a thread waiting on the completion of this request. |
|
|
|
|
|
cache_req->wp_.Set(); |
|
|
|
|
|
|
|
|
// We can free up the request now. |
|
|
|
|
|
RETURN_IF_NOT_OK(ReturnRequestTag(cache_req)); |
|
|
} |
|
|
} |
|
|
|
|
|
return Status::OK(); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
/// \brief This is the main loop the cache server thread(s) are running. |
|
|
/// \brief This is the main loop the cache server thread(s) are running. |
|
|
@@ -929,7 +938,7 @@ Status CacheServer::ServerRequest(worker_id_t worker_id) { |
|
|
while (!global_shutdown_) { |
|
|
while (!global_shutdown_) { |
|
|
CacheServerRequest *cache_req = nullptr; |
|
|
CacheServerRequest *cache_req = nullptr; |
|
|
RETURN_IF_NOT_OK(my_que->PopFront(&cache_req)); |
|
|
RETURN_IF_NOT_OK(my_que->PopFront(&cache_req)); |
|
|
ProcessRequest(cache_req); |
|
|
|
|
|
|
|
|
RETURN_IF_NOT_OK(ProcessRequest(cache_req)); |
|
|
} |
|
|
} |
|
|
return Status::OK(); |
|
|
return Status::OK(); |
|
|
} |
|
|
} |
|
|
|