From 378fee2cd009aefc44f7a59c80a112ee7f7afe1a Mon Sep 17 00:00:00 2001 From: xuyongfei Date: Tue, 23 Feb 2021 11:23:00 +0800 Subject: [PATCH] Serving gpt3 fix --- README.md | 6 -- README_CN.md | 6 -- .../distributed_servable.cc | 40 ++++++++----- .../worker/distributed_worker/worker_agent.cc | 58 +++++++++---------- mindspore_serving/worker/task.py | 1 + 5 files changed, 56 insertions(+), 55 deletions(-) diff --git a/README.md b/README.md index 0e8feb8..c5bb6f8 100644 --- a/README.md +++ b/README.md @@ -100,12 +100,6 @@ To run MindSpore Serving, configure the following environment variables: - MindSpore Serving depends on MindSpore. You need to configure [environment variables](https://gitee.com/mindspore/docs/blob/master/install/mindspore_ascend_install_source_en.md#configuring-environment-variables) to run MindSpore. -- MindSpore Serving depends on the MindSpore library file. You need to specify the `lib` path in the build or installation path of the MindSpore software package to LD_LIBRARY_PATH. For the meaning of `$MINDSPORE_LIB_PATH`, see [MindSpore-based Inference Service Deployment](https://gitee.com/mindspore/serving/blob/master/README.md#installation). - - ```shell - export LD_LIBRARY_PATH=$MINDSPORE_LIB_PATH:${LD_LIBRARY_PATH} - ``` - ## Quick Start [MindSpore-based Inference Service Deployment](https://www.mindspore.cn/tutorial/inference/en/master/serving_example.html) is used to demonstrate how to use MindSpore Serving. diff --git a/README_CN.md b/README_CN.md index 00dde67..a54dbb4 100644 --- a/README_CN.md +++ b/README_CN.md @@ -100,12 +100,6 @@ MindSpore Serving运行需要配置以下环境变量: - MindSpore Serving依赖MindSpore正确运行,运行MindSpore需要完成[环境变量配置](https://gitee.com/mindspore/docs/blob/master/install/mindspore_ascend_install_pip.md#%E9%85%8D%E7%BD%AE%E7%8E%AF%E5%A2%83%E5%8F%98%E9%87%8F)。 -- MindSpore Serving依赖MindSpore库文件,需指定MindSpore软件包的编译或安装路径下的`lib`路径到LD_LIBRARY_PATH。其中,`$MINDSPORE_LIB_PATH`含义参考[安装Serving](https://gitee.com/mindspore/serving#%E5%AE%89%E8%A3%85serving)。 - - ```shell - export LD_LIBRARY_PATH=$MINDSPORE_LIB_PATH:${LD_LIBRARY_PATH} - ``` - ## 快速入门 以一个简单的[Add网络示例](https://www.mindspore.cn/tutorial/inference/zh-CN/master/serving_example.html),演示MindSpore Serving如何使用。 diff --git a/mindspore_serving/ccsrc/worker/distributed_worker/distributed_servable.cc b/mindspore_serving/ccsrc/worker/distributed_worker/distributed_servable.cc index 2fbc3f5..47244d0 100644 --- a/mindspore_serving/ccsrc/worker/distributed_worker/distributed_servable.cc +++ b/mindspore_serving/ccsrc/worker/distributed_worker/distributed_servable.cc @@ -56,7 +56,6 @@ Status DistributedServable::PredictInner(const std::vector &input if (!model_loaded_) { MSI_LOG_EXCEPTION << "Model has not been loaded"; } - MSI_EXCEPTION_IF_NULL(output); proto::DistributedPredictRequest request; proto::DistributedPredictRequest empty_request; @@ -69,6 +68,9 @@ Status DistributedServable::PredictInner(const std::vector &input auto rank_size = config_.distributed_meta.rank_size; auto stage_size = config_.distributed_meta.stage_size; + if (rank_size != agent_spec_map_.size()) { + MSI_LOG_EXCEPTION << "agent_spec_map_ size " << agent_spec_map_.size() << " not match rank size " << rank_size; + } auto agent_num_per_stage = rank_size / stage_size; auto result_agent_id = agent_num_per_stage * (stage_size - 1); @@ -86,25 +88,34 @@ Status DistributedServable::PredictInner(const std::vector &input } } - for (size_t i = 0; i < msg_list->size(); ++i) { - while (true) { - auto state = msg_list->at(i).future.wait_for(std::chrono::milliseconds(1000)); - if (state == std::future_status::timeout) { - if (ExitSignalHandle::Instance().HasStopped()) { - return INFER_STATUS_LOG_ERROR(FAILED) << "WorkerAgent(rank_id is " << i << " ) has stopped"; - } - continue; - } else if (state == std::future_status::ready) { - auto status = msg_list->at(i).status; - if (status != SUCCESS) { - return INFER_STATUS_LOG_ERROR(FAILED) << "WorkerAgent(rank_id is " << i << " ) infers failed"; - } + for (size_t rank_id = 0; rank_id < msg_list->size(); ++rank_id) { + auto &future = msg_list->at(rank_id).future; + const uint64_t kWaitMaxHundredMs = 10 * 10; // waiting for 10s + uint64_t k; + for (k = 0; k < kWaitMaxHundredMs; k++) { + if (ExitSignalHandle::Instance().HasStopped()) { + return INFER_STATUS_LOG_ERROR(FAILED) << "Worker has stopped"; + } + // waiting for 100ms + if (future.wait_for(std::chrono::milliseconds(100)) == std::future_status::ready) { break; } } + if (k >= kWaitMaxHundredMs) { + return INFER_STATUS_LOG_ERROR(FAILED) << "Failed to wait for result of rank " << rank_id; + } + auto status = msg_list->at(rank_id).status; + if (status != SUCCESS) { + return INFER_STATUS_LOG_ERROR(FAILED) + << "Error happened on get result of rank " << rank_id << ": " << status.StatusMessage(); + } } auto &reply = msg_list->at(result_agent_id).reply; + if (reply.has_error_msg() && reply.error_msg().error_code() != 0) { + return INFER_STATUS_LOG_ERROR(FAILED) + << "Error happened on get result of rank " << result_agent_id << ": " << reply.error_msg().error_msg(); + } for (int i = 0; i < reply.outputs_size(); ++i) { auto p = std::make_shared(reply.mutable_outputs(i)); auto tensor_ptr = std::make_shared(p->data_type(), p->shape(), p->data(), p->data_size()); @@ -112,6 +123,7 @@ Status DistributedServable::PredictInner(const std::vector &input } return SUCCESS; } + std::vector DistributedServable::GetInputInfos() const { if (!model_loaded_) { MSI_LOG_EXCEPTION << "Model has not been loaded"; diff --git a/mindspore_serving/ccsrc/worker/distributed_worker/worker_agent.cc b/mindspore_serving/ccsrc/worker/distributed_worker/worker_agent.cc index a246032..26465e8 100644 --- a/mindspore_serving/ccsrc/worker/distributed_worker/worker_agent.cc +++ b/mindspore_serving/ccsrc/worker/distributed_worker/worker_agent.cc @@ -45,35 +45,6 @@ Status WorkerAgent::Clear() { return SUCCESS; } -Status WorkerAgent::Run(const proto::DistributedPredictRequest &request, proto::DistributedPredictReply *reply) { - if (session_ == nullptr) { - return INFER_STATUS_LOG_ERROR(FAILED) << "Model is not loaded"; - } - // todo : DistributedPredictRequest->RequestBase - // todo : DistributedPredictReply->ReplyBase - Status status; - try { - MSI_TIME_STAMP_START(ExecuteModel) - // status = session_.ExecuteModel(request_wrap, &reply_wrap); - MSI_TIME_STAMP_END(ExecuteModel) - } catch (const std::bad_alloc &ex) { - status = INFER_STATUS_LOG_ERROR(FAILED) << "Serving Error: malloc memory failed"; - } catch (const std::runtime_error &ex) { - status = INFER_STATUS_LOG_ERROR(FAILED) << "Serving Error: runtime error occurred: " << ex.what(); - } catch (const std::exception &ex) { - status = INFER_STATUS_LOG_ERROR(FAILED) << "Serving Error: exception occurred: " << ex.what(); - } catch (...) { - status = INFER_STATUS_LOG_ERROR(FAILED) << "Serving Error: exception occurred"; - } - if (status != SUCCESS) { - reply->Clear(); - auto error_msg = reply->mutable_error_msg(); - error_msg->set_error_code(status.StatusCode()); - error_msg->set_error_msg(status.StatusMessage()); - } - return status; -} - Status WorkerAgent::StartAgent(const AgentStartUpConfig &config) { session_ = InferenceLoader::Instance().CreateMindSporeInfer(); if (session_ == nullptr) { @@ -188,5 +159,34 @@ class ProtoDistributedPredictReply : public ReplyBase { std::vector tensor_list_; }; +Status WorkerAgent::Run(const proto::DistributedPredictRequest &request, proto::DistributedPredictReply *reply) { + if (session_ == nullptr) { + return INFER_STATUS_LOG_ERROR(FAILED) << "Model is not loaded"; + } + Status status; + try { + MSI_TIME_STAMP_START(ExecuteModel) + ProtoDistributedPredictRequest request_wrap(request); + ProtoDistributedPredictReply reply_wrap(reply); + status = session_->ExecuteModel(request_wrap, &reply_wrap); + MSI_TIME_STAMP_END(ExecuteModel) + } catch (const std::bad_alloc &ex) { + status = INFER_STATUS_LOG_ERROR(FAILED) << "Serving Error: malloc memory failed"; + } catch (const std::runtime_error &ex) { + status = INFER_STATUS_LOG_ERROR(FAILED) << "Serving Error: runtime error occurred: " << ex.what(); + } catch (const std::exception &ex) { + status = INFER_STATUS_LOG_ERROR(FAILED) << "Serving Error: exception occurred: " << ex.what(); + } catch (...) { + status = INFER_STATUS_LOG_ERROR(FAILED) << "Serving Error: exception occurred"; + } + if (status != SUCCESS) { + reply->Clear(); + auto error_msg = reply->mutable_error_msg(); + error_msg->set_error_code(status.StatusCode()); + error_msg->set_error_msg(status.StatusMessage()); + } + return status; +} + } // namespace serving } // namespace mindspore diff --git a/mindspore_serving/worker/task.py b/mindspore_serving/worker/task.py index 8ab5b84..99b78bf 100644 --- a/mindspore_serving/worker/task.py +++ b/mindspore_serving/worker/task.py @@ -145,6 +145,7 @@ class PyTask: logger.warning(f"{self.task_name} get result catch exception: {e}") logging.exception(e) self.push_failed(1) # push success results and a failed result + yield self.index # end current coroutine, switch to next coroutine result = self._handle_task(instance_list[self.index:]) def _handle_task(self, instance_list):