From 2a8bf9f00b32f8070fdd380717a718a899233e0c Mon Sep 17 00:00:00 2001
From: lichen_101010
Date: Tue, 11 Aug 2020 20:16:31 -0400
Subject: [PATCH] addressed sendgraph issue
change buffer type and chunk size
addressed sendtensor issue for large tensors
fix sendtensor part 2
fix bugs for sendtensor issue
fix CI errors
removes a print line
address comments
---
.../ccsrc/debug/debugger/debug_grpc.proto | 6 +-
mindspore/ccsrc/debug/debugger/debugger.cc | 57 ++++++++++++-------
mindspore/ccsrc/debug/debugger/debugger.h | 1 +
mindspore/ccsrc/debug/debugger/grpc_client.cc | 39 ++++++++++++-
4 files changed, 81 insertions(+), 22 deletions(-)
diff --git a/mindspore/ccsrc/debug/debugger/debug_grpc.proto b/mindspore/ccsrc/debug/debugger/debug_grpc.proto
index 27c93787b8..88ca35ff1d 100644
--- a/mindspore/ccsrc/debug/debugger/debug_grpc.proto
+++ b/mindspore/ccsrc/debug/debugger/debug_grpc.proto
@@ -23,7 +23,7 @@ import "debug_graph.proto";
service EventListener {
rpc WaitCMD (Metadata) returns (EventReply) {};
rpc SendMetadata (Metadata) returns (EventReply) {};
- rpc SendGraph (GraphProto) returns (EventReply) {};
+ rpc SendGraph (stream Chunk) returns (EventReply) {};
rpc SendTensors (stream TensorProto) returns (EventReply) {};
rpc SendWatchpointHits (stream WatchpointHit) returns (EventReply) {};
}
@@ -37,6 +37,10 @@ message Metadata {
string cur_node = 4;
}
+message Chunk {
+ bytes buffer = 1;
+}
+
message EventReply {
enum Status {
OK = 0;
diff --git a/mindspore/ccsrc/debug/debugger/debugger.cc b/mindspore/ccsrc/debug/debugger/debugger.cc
index 18e7d54374..522b0d7f64 100644
--- a/mindspore/ccsrc/debug/debugger/debugger.cc
+++ b/mindspore/ccsrc/debug/debugger/debugger.cc
@@ -33,6 +33,8 @@ using debugger::WatchCondition_Condition_nan;
using debugger::WatchNode;
using debugger::WatchpointHit;
+#define CHUNK_SIZE 1024 * 1024 * 3
+
namespace mindspore {
DebuggerPtr Debugger::debugger_ = nullptr;
@@ -412,6 +414,16 @@ void Debugger::CommandLoop() {
}
}
+void AddTensorProtoInfo(TensorProto *tensor_item, TensorProto tensor) {
+ tensor_item->set_node_name(tensor.node_name());
+ tensor_item->set_slot(tensor.slot());
+ tensor_item->set_iter(tensor.iter());
+ tensor_item->set_truncate(tensor.truncate());
+ tensor_item->clear_tensor_content();
+ tensor_item->clear_data_type();
+ tensor_item->clear_dims();
+}
+
void Debugger::SetWatchpoint(const ProtoVector<WatchNode> &nodes, const WatchCondition &condition, const int32_t id) {
std::vector<std::tuple<std::string, bool>> check_node_list;
std::transform(nodes.begin(), nodes.end(), std::back_inserter(check_node_list),
@@ -436,35 +448,40 @@ std::list<TensorProto> Debugger::LoadTensors(const ProtoVector<TensorProto> &ten
// ret_name will contain tensor names that are found in TensorLoader
// items in ret_name will be in the same order with tensors if found
debug_services_->ReadNodesTensors(name, &ret_name, &data_ptr, &data_size, &dtype, &shape);
-
std::list<TensorProto> tensor_list;
unsigned int result_index = 0;
+
for (auto tensor : tensors) {
- TensorProto tensor_item;
- tensor_item.set_node_name(tensor.node_name());
- tensor_item.set_slot(tensor.slot());
- tensor_item.set_iter(tensor.iter());
- tensor_item.set_truncate(tensor.truncate());
- tensor_item.clear_tensor_content();
- tensor_item.clear_data_type();
- tensor_item.clear_dims();
- // always set finished to true before big tensor splitting is supported
- tensor_item.set_finished(true);
-
- // return empty tensor if didn't find the requested tensor
+ int size_iter = 0;
if (result_index >= ret_name.size() || ret_name[result_index] != GetTensorFullName(tensor)) {
+ TensorProto tensor_item;
+ tensor_item.set_finished(true);
+ AddTensorProtoInfo(&tensor_item, tensor);
tensor_list.push_back(tensor_item);
continue;
}
+ int tensor_size = data_size[result_index];
+ while (size_iter < tensor_size) {
+ int chunk_size = CHUNK_SIZE;
+ TensorProto tensor_item;
+ tensor_item.set_finished(false);
+ if (tensor_size - size_iter <= CHUNK_SIZE) {
+ chunk_size = tensor_size - size_iter;
+ tensor_item.set_finished(true);
+ }
+ AddTensorProtoInfo(&tensor_item, tensor);
+ // return empty tensor if didn't find the requested tensor
- tensor_item.set_tensor_content(data_ptr[result_index], data_size[result_index]);
- tensor_item.set_data_type(GetDebuggerNumberDataType(dtype[result_index]));
- for (auto &elem : shape[result_index]) {
- tensor_item.add_dims(elem);
- }
+ tensor_item.set_tensor_content(data_ptr[result_index] + size_iter, chunk_size);
- // add tensor to result list and increment result_index to check next item in ret_name
- tensor_list.push_back(tensor_item);
+ tensor_item.set_data_type(GetDebuggerNumberDataType(dtype[result_index]));
+ for (auto &elem : shape[result_index]) {
+ tensor_item.add_dims(elem);
+ }
+ // add tensor to result list and increment result_index to check next item in ret_name
+ tensor_list.push_back(tensor_item);
+ size_iter += CHUNK_SIZE;
+ }
result_index++;
}
return tensor_list;
diff --git a/mindspore/ccsrc/debug/debugger/debugger.h b/mindspore/ccsrc/debug/debugger/debugger.h
index ea035708ea..0df67620ae 100644
--- a/mindspore/ccsrc/debug/debugger/debugger.h
+++ b/mindspore/ccsrc/debug/debugger/debugger.h
@@ -23,6 +23,7 @@
#include "debug/debugger/grpc_client.h"
#include "debug/debug_services.h"
+using debugger::Chunk;
using debugger::DataType;
using debugger::EventReply;
using debugger::GraphProto;
diff --git a/mindspore/ccsrc/debug/debugger/grpc_client.cc b/mindspore/ccsrc/debug/debugger/grpc_client.cc
index 7709f4c0d1..8677e9051f 100644
--- a/mindspore/ccsrc/debug/debugger/grpc_client.cc
+++ b/mindspore/ccsrc/debug/debugger/grpc_client.cc
@@ -15,9 +15,11 @@
*/
#include <memory>
+#include <thread>
#include "debug/debugger/grpc_client.h"
#include "utils/log_adapter.h"
+using debugger::Chunk;
using debugger::EventListener;
using debugger::EventReply;
using debugger::EventReply_Status_FAILED;
@@ -26,6 +28,8 @@ using debugger::Metadata;
using debugger::TensorProto;
using debugger::WatchpointHit;
+#define CHUNK_SIZE 1024 * 1024 * 3
+
namespace mindspore {
GrpcClient::GrpcClient(const std::string &host, const std::string &port) : stub_(nullptr) { Init(host, port); }
@@ -65,10 +69,43 @@ EventReply GrpcClient::SendMetadata(const Metadata &metadata) {
return reply;
}
+std::vector<std::string> ChunkString(std::string str, int graph_size) {
+ std::vector<std::string> buf;
+ int size_iter = 0;
+ while (size_iter < graph_size) {
+ int chunk_size = CHUNK_SIZE;
+ if (graph_size - size_iter < CHUNK_SIZE) {
+ chunk_size = graph_size - size_iter;
+ }
+ std::string buffer;
+ buffer.resize(chunk_size);
+ memcpy(reinterpret_cast<char *>(buffer.data()), str.data() + size_iter, chunk_size);
+ buf.push_back(buffer);
+ size_iter += CHUNK_SIZE;
+ }
+ return buf;
+}
+
EventReply GrpcClient::SendGraph(const GraphProto &graph) {
EventReply reply;
grpc::ClientContext context;
- grpc::Status status = stub_->SendGraph(&context, graph, &reply);
+ Chunk chunk;
+
+ std::unique_ptr<grpc::ClientWriter<Chunk>> writer(stub_->SendGraph(&context, &reply));
+ std::string str = graph.SerializeAsString();
+ int graph_size = graph.ByteSize();
+ auto buf = ChunkString(str, graph_size);
+
+ for (unsigned int i = 0; i < buf.size(); i++) {
+ MS_LOG(INFO) << "RPC:sending the " << i << "chunk in graph";
+ chunk.set_buffer(buf[i]);
+ if (!writer->Write(chunk)) {
+ break;
+ }
+ std::this_thread::sleep_for(std::chrono::milliseconds(1));
+ }
+ writer->WritesDone();
+ grpc::Status status = writer->Finish();
if (!status.ok()) {
MS_LOG(ERROR) << "RPC failed: SendGraph";