To run MindSpore Serving, configure the following environment variables:

- MindSpore Serving depends on MindSpore. You need to configure [environment variables](https://gitee.com/mindspore/docs/blob/master/install/mindspore_ascend_install_source_en.md#configuring-environment-variables) to run MindSpore.
- MindSpore Serving depends on the MindSpore library file. You need to add the `lib` path in the build or installation path of the MindSpore software package to `LD_LIBRARY_PATH`. For the meaning of `$MINDSPORE_LIB_PATH`, see [MindSpore-based Inference Service Deployment](https://gitee.com/mindspore/serving/blob/master/README.md#installation).

  ```shell
  export LD_LIBRARY_PATH=$MINDSPORE_LIB_PATH:${LD_LIBRARY_PATH}
  ```

## Quick Start

[MindSpore-based Inference Service Deployment](https://www.mindspore.cn/tutorial/inference/en/master/serving_example.html) is used to demonstrate how to use MindSpore Serving.
MindSpore Serving运行需要配置以下环境变量:

- MindSpore Serving依赖MindSpore正确运行,运行MindSpore需要完成[环境变量配置](https://gitee.com/mindspore/docs/blob/master/install/mindspore_ascend_install_pip.md#%E9%85%8D%E7%BD%AE%E7%8E%AF%E5%A2%83%E5%8F%98%E9%87%8F)。
- MindSpore Serving依赖MindSpore库文件,需将MindSpore软件包的编译或安装路径下的`lib`路径添加到`LD_LIBRARY_PATH`。其中,`$MINDSPORE_LIB_PATH`含义参考[安装Serving](https://gitee.com/mindspore/serving#%E5%AE%89%E8%A3%85serving)。

  ```shell
  export LD_LIBRARY_PATH=$MINDSPORE_LIB_PATH:${LD_LIBRARY_PATH}
  ```

## 快速入门

以一个简单的[Add网络示例](https://www.mindspore.cn/tutorial/inference/zh-CN/master/serving_example.html),演示MindSpore Serving如何使用。
| @@ -56,7 +56,6 @@ Status DistributedServable::PredictInner(const std::vector<TensorBasePtr> &input | |||
| if (!model_loaded_) { | |||
| MSI_LOG_EXCEPTION << "Model has not been loaded"; | |||
| } | |||
| MSI_EXCEPTION_IF_NULL(output); | |||
| proto::DistributedPredictRequest request; | |||
| proto::DistributedPredictRequest empty_request; | |||
| @@ -69,6 +68,9 @@ Status DistributedServable::PredictInner(const std::vector<TensorBasePtr> &input | |||
| auto rank_size = config_.distributed_meta.rank_size; | |||
| auto stage_size = config_.distributed_meta.stage_size; | |||
| if (rank_size != agent_spec_map_.size()) { | |||
| MSI_LOG_EXCEPTION << "agent_spec_map_ size " << agent_spec_map_.size() << " not match rank size " << rank_size; | |||
| } | |||
| auto agent_num_per_stage = rank_size / stage_size; | |||
| auto result_agent_id = agent_num_per_stage * (stage_size - 1); | |||
| @@ -86,25 +88,34 @@ Status DistributedServable::PredictInner(const std::vector<TensorBasePtr> &input | |||
| } | |||
| } | |||
| for (size_t i = 0; i < msg_list->size(); ++i) { | |||
| while (true) { | |||
| auto state = msg_list->at(i).future.wait_for(std::chrono::milliseconds(1000)); | |||
| if (state == std::future_status::timeout) { | |||
| if (ExitSignalHandle::Instance().HasStopped()) { | |||
| return INFER_STATUS_LOG_ERROR(FAILED) << "WorkerAgent(rank_id is " << i << " ) has stopped"; | |||
| } | |||
| continue; | |||
| } else if (state == std::future_status::ready) { | |||
| auto status = msg_list->at(i).status; | |||
| if (status != SUCCESS) { | |||
| return INFER_STATUS_LOG_ERROR(FAILED) << "WorkerAgent(rank_id is " << i << " ) infers failed"; | |||
| } | |||
| for (size_t rank_id = 0; rank_id < msg_list->size(); ++rank_id) { | |||
| auto &future = msg_list->at(rank_id).future; | |||
| const uint64_t kWaitMaxHundredMs = 10 * 10; // waiting for 10s | |||
| uint64_t k; | |||
| for (k = 0; k < kWaitMaxHundredMs; k++) { | |||
| if (ExitSignalHandle::Instance().HasStopped()) { | |||
| return INFER_STATUS_LOG_ERROR(FAILED) << "Worker has stopped"; | |||
| } | |||
| // waiting for 100ms | |||
| if (future.wait_for(std::chrono::milliseconds(100)) == std::future_status::ready) { | |||
| break; | |||
| } | |||
| } | |||
| if (k >= kWaitMaxHundredMs) { | |||
| return INFER_STATUS_LOG_ERROR(FAILED) << "Failed to wait for result of rank " << rank_id; | |||
| } | |||
| auto status = msg_list->at(rank_id).status; | |||
| if (status != SUCCESS) { | |||
| return INFER_STATUS_LOG_ERROR(FAILED) | |||
| << "Error happened on get result of rank " << rank_id << ": " << status.StatusMessage(); | |||
| } | |||
| } | |||
| auto &reply = msg_list->at(result_agent_id).reply; | |||
| if (reply.has_error_msg() && reply.error_msg().error_code() != 0) { | |||
| return INFER_STATUS_LOG_ERROR(FAILED) | |||
| << "Error happened on get result of rank " << result_agent_id << ": " << reply.error_msg().error_msg(); | |||
| } | |||
| for (int i = 0; i < reply.outputs_size(); ++i) { | |||
| auto p = std::make_shared<ProtoTensor>(reply.mutable_outputs(i)); | |||
| auto tensor_ptr = std::make_shared<Tensor>(p->data_type(), p->shape(), p->data(), p->data_size()); | |||
| @@ -112,6 +123,7 @@ Status DistributedServable::PredictInner(const std::vector<TensorBasePtr> &input | |||
| } | |||
| return SUCCESS; | |||
| } | |||
| std::vector<TensorInfo> DistributedServable::GetInputInfos() const { | |||
| if (!model_loaded_) { | |||
| MSI_LOG_EXCEPTION << "Model has not been loaded"; | |||
| @@ -45,35 +45,6 @@ Status WorkerAgent::Clear() { | |||
| return SUCCESS; | |||
| } | |||
| Status WorkerAgent::Run(const proto::DistributedPredictRequest &request, proto::DistributedPredictReply *reply) { | |||
| if (session_ == nullptr) { | |||
| return INFER_STATUS_LOG_ERROR(FAILED) << "Model is not loaded"; | |||
| } | |||
| // todo : DistributedPredictRequest->RequestBase | |||
| // todo : DistributedPredictReply->ReplyBase | |||
| Status status; | |||
| try { | |||
| MSI_TIME_STAMP_START(ExecuteModel) | |||
| // status = session_.ExecuteModel(request_wrap, &reply_wrap); | |||
| MSI_TIME_STAMP_END(ExecuteModel) | |||
| } catch (const std::bad_alloc &ex) { | |||
| status = INFER_STATUS_LOG_ERROR(FAILED) << "Serving Error: malloc memory failed"; | |||
| } catch (const std::runtime_error &ex) { | |||
| status = INFER_STATUS_LOG_ERROR(FAILED) << "Serving Error: runtime error occurred: " << ex.what(); | |||
| } catch (const std::exception &ex) { | |||
| status = INFER_STATUS_LOG_ERROR(FAILED) << "Serving Error: exception occurred: " << ex.what(); | |||
| } catch (...) { | |||
| status = INFER_STATUS_LOG_ERROR(FAILED) << "Serving Error: exception occurred"; | |||
| } | |||
| if (status != SUCCESS) { | |||
| reply->Clear(); | |||
| auto error_msg = reply->mutable_error_msg(); | |||
| error_msg->set_error_code(status.StatusCode()); | |||
| error_msg->set_error_msg(status.StatusMessage()); | |||
| } | |||
| return status; | |||
| } | |||
| Status WorkerAgent::StartAgent(const AgentStartUpConfig &config) { | |||
| session_ = InferenceLoader::Instance().CreateMindSporeInfer(); | |||
| if (session_ == nullptr) { | |||
| @@ -188,5 +159,34 @@ class ProtoDistributedPredictReply : public ReplyBase { | |||
| std::vector<ProtoTensor> tensor_list_; | |||
| }; | |||
| Status WorkerAgent::Run(const proto::DistributedPredictRequest &request, proto::DistributedPredictReply *reply) { | |||
| if (session_ == nullptr) { | |||
| return INFER_STATUS_LOG_ERROR(FAILED) << "Model is not loaded"; | |||
| } | |||
| Status status; | |||
| try { | |||
| MSI_TIME_STAMP_START(ExecuteModel) | |||
| ProtoDistributedPredictRequest request_wrap(request); | |||
| ProtoDistributedPredictReply reply_wrap(reply); | |||
| status = session_->ExecuteModel(request_wrap, &reply_wrap); | |||
| MSI_TIME_STAMP_END(ExecuteModel) | |||
| } catch (const std::bad_alloc &ex) { | |||
| status = INFER_STATUS_LOG_ERROR(FAILED) << "Serving Error: malloc memory failed"; | |||
| } catch (const std::runtime_error &ex) { | |||
| status = INFER_STATUS_LOG_ERROR(FAILED) << "Serving Error: runtime error occurred: " << ex.what(); | |||
| } catch (const std::exception &ex) { | |||
| status = INFER_STATUS_LOG_ERROR(FAILED) << "Serving Error: exception occurred: " << ex.what(); | |||
| } catch (...) { | |||
| status = INFER_STATUS_LOG_ERROR(FAILED) << "Serving Error: exception occurred"; | |||
| } | |||
| if (status != SUCCESS) { | |||
| reply->Clear(); | |||
| auto error_msg = reply->mutable_error_msg(); | |||
| error_msg->set_error_code(status.StatusCode()); | |||
| error_msg->set_error_msg(status.StatusMessage()); | |||
| } | |||
| return status; | |||
| } | |||
| } // namespace serving | |||
| } // namespace mindspore | |||
| @@ -145,6 +145,7 @@ class PyTask: | |||
| logger.warning(f"{self.task_name} get result catch exception: {e}") | |||
| logging.exception(e) | |||
| self.push_failed(1) # push success results and a failed result | |||
| yield self.index # end current coroutine, switch to next coroutine | |||
| result = self._handle_task(instance_list[self.index:]) | |||
| def _handle_task(self, instance_list): | |||