Browse Source

!69 fix memory leak, fix worker and master grpc max msg size

From: @xu-yfei
Reviewed-by: @zhangyinxia,@zhoufeng54
Signed-off-by:
tags/v1.1.0
mindspore-ci-bot Gitee 5 years ago
parent
commit
6704bdd8bc
6 changed files with 23 additions and 8 deletions
  1. +9
    -0
      mindspore_serving/ccsrc/common/grpc_server.cc
  2. +1
    -0
      mindspore_serving/ccsrc/common/grpc_server.h
  3. +3
    -2
      mindspore_serving/ccsrc/master/dispacther.cc
  4. +2
    -1
      mindspore_serving/ccsrc/master/server.cc
  5. +2
    -1
      mindspore_serving/ccsrc/worker/notfiy_master/grpc_notfiy.cc
  6. +6
    -4
      mindspore_serving/ccsrc/worker/worker.cc

+ 9
- 0
mindspore_serving/ccsrc/common/grpc_server.cc View File

@@ -35,6 +35,7 @@ Status GrpcServer::Start(std::shared_ptr<grpc::Service> service, const std::stri
grpc::ServerBuilder serverBuilder;
serverBuilder.SetOption(std::move(option));
if (max_msg_mb_size > 0) {
serverBuilder.SetMaxSendMessageSize(static_cast<int>(max_msg_mb_size * (1u << 20)));
serverBuilder.SetMaxReceiveMessageSize(static_cast<int>(max_msg_mb_size * (1u << 20)));
}
serverBuilder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
@@ -66,4 +67,12 @@ void GrpcServer::Stop() {
in_running_ = false;
}

std::shared_ptr<grpc::Channel> GrpcServer::CreateChannel(const std::string &target_str) {
grpc::ChannelArguments channel_args;
channel_args.SetInt(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH, gRpcMaxMBMsgSize * 1024 * 1024);
std::shared_ptr<grpc::Channel> channel =
grpc::CreateCustomChannel(target_str, grpc::InsecureChannelCredentials(), channel_args);
return channel;
}

} // namespace mindspore::serving

+ 1
- 0
mindspore_serving/ccsrc/common/grpc_server.h View File

@@ -39,6 +39,7 @@ class MS_API GrpcServer {
Status Start(std::shared_ptr<grpc::Service> service, const std::string &ip, uint32_t grpc_port, int max_msg_size,
const std::string &server_tag);
void Stop();
static std::shared_ptr<grpc::Channel> CreateChannel(const std::string &target_str);

private:
std::unique_ptr<grpc::Server> server_;


+ 3
- 2
mindspore_serving/ccsrc/master/dispacther.cc View File

@@ -101,7 +101,7 @@ Status Dispatcher::RegisterServable(const proto::RegisterRequest &request, proto
auto target_str = request.address();
auto it = servable_map_.find(worker_spec.servable_name);

std::shared_ptr<grpc::Channel> channel = grpc::CreateChannel(target_str, grpc::InsecureChannelCredentials());
std::shared_ptr<grpc::Channel> channel = GrpcServer::CreateChannel(target_str);

bool find_registered = false;
if (it != servable_map_.end()) {
@@ -178,7 +178,8 @@ Status Dispatcher::AddServable(const proto::AddWorkerRequest &request, proto::Ad
}
DispatcherWorkerContext context;
context.worker_spec = worker_spec;
std::shared_ptr<grpc::Channel> channel = grpc::CreateChannel(target_str, grpc::InsecureChannelCredentials());

std::shared_ptr<grpc::Channel> channel = GrpcServer::CreateChannel(target_str);
context.stub_ = proto::MSWorker::NewStub(channel);
servable_map_[worker_spec.servable_name].push_back(context);
return SUCCESS;


+ 2
- 1
mindspore_serving/ccsrc/master/server.cc View File

@@ -45,7 +45,8 @@ Status Server::StartGrpcServer(const std::string &ip, uint32_t grpc_port, int ma
}

Status Server::StartGrpcMasterServer(const std::string &ip, uint32_t grpc_port) {
return grpc_manager_server_.Start(std::make_shared<MSMasterImpl>(dispatcher_), ip, grpc_port, -1, "Master");
return grpc_manager_server_.Start(std::make_shared<MSMasterImpl>(dispatcher_), ip, grpc_port, gRpcMaxMBMsgSize,
"Master");
}

Status Server::StartRestfulServer(const std::string &ip, uint32_t restful_port, int max_msg_mb_size,


+ 2
- 1
mindspore_serving/ccsrc/worker/notfiy_master/grpc_notfiy.cc View File

@@ -19,6 +19,7 @@
#include <grpcpp/ext/proto_server_reflection_plugin.h>
#include <thread>
#include "common/exit_handle.h"
#include "common/grpc_server.h"

namespace mindspore {
namespace serving {
@@ -28,7 +29,7 @@ GrpcNotfiyMaster::GrpcNotfiyMaster(const std::string &master_ip, uint32_t master
: master_ip_(master_ip), master_port_(master_port), host_ip_(host_ip), host_port_(host_port) {
master_address_ = master_ip_ + ":" + std::to_string(master_port);
worker_address_ = host_ip_ + ":" + std::to_string(host_port_);
auto channel = grpc::CreateChannel(master_address_, grpc::InsecureChannelCredentials());
auto channel = GrpcServer::CreateChannel(master_address_);
stub_ = proto::MSMaster::NewStub(channel);
}



+ 6
- 4
mindspore_serving/ccsrc/worker/worker.cc View File

@@ -172,12 +172,14 @@ std::pair<Status, std::shared_ptr<AsyncResult>> Worker::RunAsync(const RequestSp
if (worker.worker_service == nullptr) {
return {INFER_STATUS_LOG_ERROR(FAILED) << "Cannot find servable match " << request_spec.Repr(), nullptr};
}
WorkCallBack on_process_done = [result](const Instance &output, const Status &error_msg) {
std::weak_ptr<AsyncResult> result_weak = result; // avoid memory leak
WorkCallBack on_process_done = [result_weak](const Instance &output, const Status &error_msg) {
auto output_index = output.context.instance_index;
if (output_index < result->result_.size()) {
result->result_[output_index].error_msg = error_msg;
auto result_ptr = result_weak.lock();
if (result_ptr && output_index < result_ptr->result_.size()) {
result_ptr->result_[output_index].error_msg = error_msg;
if (error_msg == SUCCESS) {
result->result_[output_index] = output;
result_ptr->result_[output_index] = output;
}
}
};


Loading…
Cancel
Save