diff --git a/mindspore_serving/ccsrc/common/grpc_server.cc b/mindspore_serving/ccsrc/common/grpc_server.cc index d05b5df..53be5db 100644 --- a/mindspore_serving/ccsrc/common/grpc_server.cc +++ b/mindspore_serving/ccsrc/common/grpc_server.cc @@ -35,6 +35,7 @@ Status GrpcServer::Start(std::shared_ptr service, const std::stri grpc::ServerBuilder serverBuilder; serverBuilder.SetOption(std::move(option)); if (max_msg_mb_size > 0) { + serverBuilder.SetMaxSendMessageSize(static_cast(max_msg_mb_size * (1u << 20))); serverBuilder.SetMaxReceiveMessageSize(static_cast(max_msg_mb_size * (1u << 20))); } serverBuilder.AddListeningPort(server_address, grpc::InsecureServerCredentials()); @@ -66,4 +67,12 @@ void GrpcServer::Stop() { in_running_ = false; } +std::shared_ptr GrpcServer::CreateChannel(const std::string &target_str) { + grpc::ChannelArguments channel_args; + channel_args.SetInt(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH, gRpcMaxMBMsgSize * 1024 * 1024); + std::shared_ptr channel = + grpc::CreateCustomChannel(target_str, grpc::InsecureChannelCredentials(), channel_args); + return channel; +} + } // namespace mindspore::serving diff --git a/mindspore_serving/ccsrc/common/grpc_server.h b/mindspore_serving/ccsrc/common/grpc_server.h index 662cc59..82caae0 100644 --- a/mindspore_serving/ccsrc/common/grpc_server.h +++ b/mindspore_serving/ccsrc/common/grpc_server.h @@ -39,6 +39,7 @@ class MS_API GrpcServer { Status Start(std::shared_ptr service, const std::string &ip, uint32_t grpc_port, int max_msg_size, const std::string &server_tag); void Stop(); + static std::shared_ptr CreateChannel(const std::string &target_str); private: std::unique_ptr server_; diff --git a/mindspore_serving/ccsrc/master/dispacther.cc b/mindspore_serving/ccsrc/master/dispacther.cc index bb536d8..5c3975c 100644 --- a/mindspore_serving/ccsrc/master/dispacther.cc +++ b/mindspore_serving/ccsrc/master/dispacther.cc @@ -101,7 +101,7 @@ Status Dispatcher::RegisterServable(const proto::RegisterRequest &request, proto auto target_str = request.address(); auto it = servable_map_.find(worker_spec.servable_name); - std::shared_ptr channel = grpc::CreateChannel(target_str, grpc::InsecureChannelCredentials()); + std::shared_ptr channel = GrpcServer::CreateChannel(target_str); bool find_registered = false; if (it != servable_map_.end()) { @@ -178,7 +178,8 @@ Status Dispatcher::AddServable(const proto::AddWorkerRequest &request, proto::Ad } DispatcherWorkerContext context; context.worker_spec = worker_spec; - std::shared_ptr channel = grpc::CreateChannel(target_str, grpc::InsecureChannelCredentials()); + + std::shared_ptr channel = GrpcServer::CreateChannel(target_str); context.stub_ = proto::MSWorker::NewStub(channel); servable_map_[worker_spec.servable_name].push_back(context); return SUCCESS; diff --git a/mindspore_serving/ccsrc/master/server.cc b/mindspore_serving/ccsrc/master/server.cc index b51782c..118edc2 100644 --- a/mindspore_serving/ccsrc/master/server.cc +++ b/mindspore_serving/ccsrc/master/server.cc @@ -45,7 +45,8 @@ Status Server::StartGrpcServer(const std::string &ip, uint32_t grpc_port, int ma } Status Server::StartGrpcMasterServer(const std::string &ip, uint32_t grpc_port) { - return grpc_manager_server_.Start(std::make_shared(dispatcher_), ip, grpc_port, -1, "Master"); + return grpc_manager_server_.Start(std::make_shared(dispatcher_), ip, grpc_port, gRpcMaxMBMsgSize, + "Master"); } Status Server::StartRestfulServer(const std::string &ip, uint32_t restful_port, int max_msg_mb_size, diff --git a/mindspore_serving/ccsrc/worker/notfiy_master/grpc_notfiy.cc b/mindspore_serving/ccsrc/worker/notfiy_master/grpc_notfiy.cc index 4e6c60e..b889d04 100644 --- a/mindspore_serving/ccsrc/worker/notfiy_master/grpc_notfiy.cc +++ b/mindspore_serving/ccsrc/worker/notfiy_master/grpc_notfiy.cc @@ -19,6 +19,7 @@ #include #include #include "common/exit_handle.h" +#include "common/grpc_server.h" namespace mindspore { namespace serving { @@ -28,7 +29,7 @@ GrpcNotfiyMaster::GrpcNotfiyMaster(const std::string &master_ip, uint32_t master : master_ip_(master_ip), master_port_(master_port), host_ip_(host_ip), host_port_(host_port) { master_address_ = master_ip_ + ":" + std::to_string(master_port); worker_address_ = host_ip_ + ":" + std::to_string(host_port_); - auto channel = grpc::CreateChannel(master_address_, grpc::InsecureChannelCredentials()); + auto channel = GrpcServer::CreateChannel(master_address_); stub_ = proto::MSMaster::NewStub(channel); } diff --git a/mindspore_serving/ccsrc/worker/worker.cc b/mindspore_serving/ccsrc/worker/worker.cc index cb6143b..6f1e043 100644 --- a/mindspore_serving/ccsrc/worker/worker.cc +++ b/mindspore_serving/ccsrc/worker/worker.cc @@ -172,12 +172,14 @@ std::pair> Worker::RunAsync(const RequestSp if (worker.worker_service == nullptr) { return {INFER_STATUS_LOG_ERROR(FAILED) << "Cannot find servable match " << request_spec.Repr(), nullptr}; } - WorkCallBack on_process_done = [result](const Instance &output, const Status &error_msg) { + std::weak_ptr result_weak = result; // avoid memory leak + WorkCallBack on_process_done = [result_weak](const Instance &output, const Status &error_msg) { auto output_index = output.context.instance_index; - if (output_index < result->result_.size()) { - result->result_[output_index].error_msg = error_msg; + auto result_ptr = result_weak.lock(); + if (result_ptr && output_index < result_ptr->result_.size()) { + result_ptr->result_[output_index].error_msg = error_msg; if (error_msg == SUCCESS) { - result->result_[output_index] = output; + result_ptr->result_[output_index] = output; } } };