From bb7a7236a8ff9df3f892a4f2dcacc50c6418f49b Mon Sep 17 00:00:00 2001 From: xuyongfei Date: Mon, 1 Mar 2021 10:37:54 +0800 Subject: [PATCH] Serving, bugfix 0301 --- mindspore_serving/ccsrc/python/serving_py.cc | 3 +- .../ccsrc/python/worker/worker_py.cc | 15 +++++ .../ccsrc/python/worker/worker_py.h | 2 + .../ccsrc/worker/inference/inference.cc | 31 +++++++++ .../ccsrc/worker/inference/inference.h | 1 + .../worker/inference/mindspore_model_wrap.cc | 63 ++++++++++--------- .../worker/inference/mindspore_model_wrap.h | 1 + .../worker/local_servable/local_sevable.cc | 20 +----- mindspore_serving/worker/_check_version.py | 35 +++++++++-- mindspore_serving/worker/_worker.py | 1 + .../worker/distributed/agent_startup.py | 59 ++++++++++++----- mindspore_serving/worker/init_mindspore.py | 10 ++- requirements_test.txt | 3 +- setup.py | 3 +- tests/ut/stub/stub_inference.cc | 31 +++++++++ 15 files changed, 206 insertions(+), 72 deletions(-) diff --git a/mindspore_serving/ccsrc/python/serving_py.cc b/mindspore_serving/ccsrc/python/serving_py.cc index e4c20c0..5e9e950 100644 --- a/mindspore_serving/ccsrc/python/serving_py.cc +++ b/mindspore_serving/ccsrc/python/serving_py.cc @@ -163,7 +163,8 @@ void PyRegWorker(pybind11::module *m_ptr) { .def_static("push_preprocess_result", &PyWorker::PushPreprocessPyResult) .def_static("push_preprocess_failed", &PyWorker::PushPreprocessPyFailed) .def_static("push_postprocess_result", &PyWorker::PushPostprocessPyResult) - .def_static("push_postprocess_failed", &PyWorker::PushPostprocessPyFailed); + .def_static("push_postprocess_failed", &PyWorker::PushPostprocessPyFailed) + .def_static("get_device_type", &PyWorker::GetDeviceType); py::class_>(m, "Context_") .def(py::init<>()) diff --git a/mindspore_serving/ccsrc/python/worker/worker_py.cc b/mindspore_serving/ccsrc/python/worker/worker_py.cc index 02880cf..48b6464 100644 --- a/mindspore_serving/ccsrc/python/worker/worker_py.cc +++ b/mindspore_serving/ccsrc/python/worker/worker_py.cc @@ -25,6 
+25,7 @@ #include "worker/distributed_worker/distributed_servable.h" #include "worker/grpc/worker_server.h" #include "worker/distributed_worker/distributed_process/distributed_server.h" +#include "worker/inference/inference.h" namespace mindspore::serving { @@ -220,4 +221,18 @@ void PyWorker::StopAndClear() { int PyWorker::GetBatchSize() { return Worker::GetInstance().GetBatchSize(); } +std::string PyWorker::GetDeviceType() { + auto device_type = InferenceLoader::Instance().GetSupportDeviceType(kDeviceTypeNotSpecified, kUnknownType); + if (device_type == kDeviceTypeAscend || device_type == kDeviceTypeAscendMS || device_type == kDeviceTypeAscendCL) { + return "Ascend"; + } + if (device_type == kDeviceTypeGpu) { + return "Gpu"; + } + if (device_type == kDeviceTypeCpu) { + return "Cpu"; + } + return std::string(); +} + } // namespace mindspore::serving diff --git a/mindspore_serving/ccsrc/python/worker/worker_py.h b/mindspore_serving/ccsrc/python/worker/worker_py.h index 51a4f43..80289c1 100644 --- a/mindspore_serving/ccsrc/python/worker/worker_py.h +++ b/mindspore_serving/ccsrc/python/worker/worker_py.h @@ -56,6 +56,8 @@ class MS_API PyWorker { static void PushPostprocessPyResult(const py::tuple &output_batch); static void PushPostprocessPyFailed(int count); + static std::string GetDeviceType(); + private: static void OnEndStartServable(const std::string &servable_directory, const std::string &servable_name, uint32_t spec_version_number, uint32_t started_version_number); diff --git a/mindspore_serving/ccsrc/worker/inference/inference.cc b/mindspore_serving/ccsrc/worker/inference/inference.cc index 06bbecb..3fd8823 100644 --- a/mindspore_serving/ccsrc/worker/inference/inference.cc +++ b/mindspore_serving/ccsrc/worker/inference/inference.cc @@ -113,4 +113,35 @@ Status InferenceLoader::LoadMindSporeModelWrap() { } return SUCCESS; } + +DeviceType InferenceLoader::GetSupportDeviceType(DeviceType device_type, ModelType model_type) { + auto mindspore_infer = 
CreateMindSporeInfer(); + if (mindspore_infer == nullptr) { + MSI_LOG_ERROR << "Create MindSpore infer failed"; + return kDeviceTypeNotSpecified; + } + if (model_type == kUnknownType) { + model_type = kMindIR; + } + if (device_type == kDeviceTypeNotSpecified) { + auto ascend_list = {kDeviceTypeAscendCL, kDeviceTypeAscendMS, kDeviceTypeGpu}; + for (auto item : ascend_list) { + if (mindspore_infer->CheckModelSupport(item, model_type)) { + return item; + } + } + } else if (device_type == kDeviceTypeAscend) { + auto ascend_list = {kDeviceTypeAscendCL, kDeviceTypeAscendMS}; + for (auto item : ascend_list) { + if (mindspore_infer->CheckModelSupport(item, model_type)) { + return item; + } + } + } else { + if (mindspore_infer->CheckModelSupport(device_type, model_type)) { + return device_type; + } + } + return kDeviceTypeNotSpecified; +} } // namespace mindspore::serving diff --git a/mindspore_serving/ccsrc/worker/inference/inference.h b/mindspore_serving/ccsrc/worker/inference/inference.h index 07c780f..33c2552 100644 --- a/mindspore_serving/ccsrc/worker/inference/inference.h +++ b/mindspore_serving/ccsrc/worker/inference/inference.h @@ -135,6 +135,7 @@ class MS_API InferenceLoader { ~InferenceLoader(); static InferenceLoader &Instance(); std::shared_ptr CreateMindSporeInfer(); + DeviceType GetSupportDeviceType(DeviceType device_type, ModelType model_type); private: typedef InferenceBase *(*CreateInferHandle)(); diff --git a/mindspore_serving/ccsrc/worker/inference/mindspore_model_wrap.cc b/mindspore_serving/ccsrc/worker/inference/mindspore_model_wrap.cc index 4ca3504..bbece03 100644 --- a/mindspore_serving/ccsrc/worker/inference/mindspore_model_wrap.cc +++ b/mindspore_serving/ccsrc/worker/inference/mindspore_model_wrap.cc @@ -78,13 +78,9 @@ Status MindSporeModelWrap::LoadModelFromFile(serving::DeviceType device_type, ui const std::string &file_name, ModelType model_type, bool with_batch_dim, const std::vector &without_batch_dim_inputs, const std::map &other_options) { - 
std::string device_type_str; - if (device_type == kDeviceTypeAscendMS) { - device_type_str = mindspore::kDeviceTypeAscend910; - } else if (device_type == kDeviceTypeAscendCL) { - device_type_str = mindspore::kDeviceTypeAscend310; - } else { - MSI_LOG_EXCEPTION << "Only support Ascend310 or Ascend910 in MindSporeModelWrap"; + std::string ms_device_type = GetMsDeviceType(device_type); + if (ms_device_type.empty()) { + return INFER_STATUS_LOG_ERROR(FAILED) << "Invalid device type " << device_type; } auto ms_model_type = GetMsModelType(model_type); if (ms_model_type == mindspore::kUnknownType) { @@ -93,26 +89,26 @@ Status MindSporeModelWrap::LoadModelFromFile(serving::DeviceType device_type, ui std::shared_ptr model = nullptr; try { - mindspore::GlobalContext::SetGlobalDeviceTarget(device_type_str); + mindspore::GlobalContext::SetGlobalDeviceTarget(ms_device_type); mindspore::GlobalContext::SetGlobalDeviceID(device_id); auto graph = mindspore::Serialization::LoadModel(file_name, ms_model_type); auto context = TransformModelContext(other_options); model = std::make_shared(mindspore::GraphCell(graph), context); } catch (std::runtime_error &ex) { return INFER_STATUS_LOG_ERROR(FAILED) - << "Load model from file failed, model file: " << file_name << ", device_type: '" << device_type_str + << "Load model from file failed, model file: " << file_name << ", device_type: '" << ms_device_type << "', device_id: " << device_id << ", model type: " << model_type << ", options: " << other_options; } mindspore::Status status = model->Build(); if (!status.IsOk()) { return INFER_STATUS_LOG_ERROR(FAILED) - << "Load model from file failed, model file: " << file_name << ", device_type: '" << device_type_str + << "Load model from file failed, model file: " << file_name << ", device_type: '" << ms_device_type << "', device_id: " << device_id << ", model type: " << model_type << ", options: " << other_options << ", build error detail: " << status.ToString(); } ApiModelInfo api_model_info; 
api_model_info.model = model; - api_model_info.device_type = device_type_str; + api_model_info.device_type = ms_device_type; api_model_info.device_id = device_id; api_model_info.with_batch_dim = with_batch_dim; api_model_info.without_batch_dim_inputs = without_batch_dim_inputs; @@ -122,7 +118,7 @@ Status MindSporeModelWrap::LoadModelFromFile(serving::DeviceType device_type, ui } GetModelBatchSize(&api_model_info); model_ = api_model_info; - MSI_LOG_INFO << "Load model from file success, model file: " << file_name << ", device_type: '" << device_type_str + MSI_LOG_INFO << "Load model from file success, model file: " << file_name << ", device_type: '" << ms_device_type << "', device_id: " << device_id << ", model type: " << model_type << ", options: " << other_options; return SUCCESS; } @@ -346,24 +342,15 @@ std::vector MindSporeModelWrap::GetOutputInfos() const { re ssize_t MindSporeModelWrap::GetBatchSize() const { return model_.batch_size; } bool MindSporeModelWrap::CheckModelSupport(DeviceType device_type, ModelType model_type) const { - std::string device_type_str; - switch (device_type) { - case kDeviceTypeAscendMS: - if (model_type != kMindIR) { - return false; - } - device_type_str = mindspore::kDeviceTypeAscend910; - break; - case kDeviceTypeAscendCL: - if (model_type != kMindIR && model_type != kOM) { - return false; - } - device_type_str = mindspore::kDeviceTypeAscend310; - break; - default: - return false; + std::string ms_device_type = GetMsDeviceType(device_type); + if (ms_device_type.empty()) { + return false; + } + auto ms_model_type = GetMsModelType(model_type); + if (ms_model_type == mindspore::kUnknownType) { + return false; } - return mindspore::Model::CheckModelSupport(device_type_str, GetMsModelType(model_type)); + return mindspore::Model::CheckModelSupport(ms_device_type, ms_model_type); } mindspore::ModelType MindSporeModelWrap::GetMsModelType(serving::ModelType model_type) { @@ -387,6 +374,24 @@ mindspore::ModelType 
MindSporeModelWrap::GetMsModelType(serving::ModelType model return ms_model_type; } +std::string MindSporeModelWrap::GetMsDeviceType(serving::DeviceType device_type) { + std::string device_type_str; + switch (device_type) { + case kDeviceTypeAscendMS: + device_type_str = mindspore::kDeviceTypeAscend910; + break; + case kDeviceTypeAscendCL: + device_type_str = mindspore::kDeviceTypeAscend310; + break; + case kDeviceTypeGpu: + device_type_str = mindspore::kDeviceTypeGPU; + break; + default: + break; + } + return device_type_str; +} + ApiBufferTensorWrap::ApiBufferTensorWrap() = default; ApiBufferTensorWrap::ApiBufferTensorWrap(const mindspore::MSTensor &tensor) : tensor_(tensor) {} diff --git a/mindspore_serving/ccsrc/worker/inference/mindspore_model_wrap.h b/mindspore_serving/ccsrc/worker/inference/mindspore_model_wrap.h index 1d53565..f2a4c0b 100644 --- a/mindspore_serving/ccsrc/worker/inference/mindspore_model_wrap.h +++ b/mindspore_serving/ccsrc/worker/inference/mindspore_model_wrap.h @@ -79,6 +79,7 @@ class MindSporeModelWrap : public InferenceBase { std::shared_ptr TransformModelContext(const std::map &other_options); void GetModelBatchSize(ApiModelInfo *model_info); static mindspore::ModelType GetMsModelType(serving::ModelType model_type); + static std::string GetMsDeviceType(serving::DeviceType device_type); }; class ApiBufferTensorWrap : public TensorBase { diff --git a/mindspore_serving/ccsrc/worker/local_servable/local_sevable.cc b/mindspore_serving/ccsrc/worker/local_servable/local_sevable.cc index 1d54499..0221a12 100644 --- a/mindspore_serving/ccsrc/worker/local_servable/local_sevable.cc +++ b/mindspore_serving/ccsrc/worker/local_servable/local_sevable.cc @@ -200,27 +200,11 @@ Status LocalModelServable::InitDevice(ModelType model_type, const std::mapGetDeviceType(); - auto get_support_device_type = [this, device_type, model_type]() { - std::vector support_device_list; - if (device_type == kDeviceTypeNotSpecified || device_type == kDeviceTypeAscend) { - 
auto ascend_list = {kDeviceTypeAscendCL, kDeviceTypeAscendMS}; - for (auto item : ascend_list) { - if (session_->CheckModelSupport(item, model_type)) { - return item; - } - } - } else if (device_type == kDeviceTypeAscendCL || device_type == kDeviceTypeAscendMS) { - if (session_->CheckModelSupport(device_type, model_type)) { - return device_type; - } - } - return kDeviceTypeNotSpecified; - }; - auto support_device_type = get_support_device_type(); + auto support_device_type = InferenceLoader::Instance().GetSupportDeviceType(device_type, model_type); if (support_device_type == kDeviceTypeNotSpecified) { return INFER_STATUS_LOG_ERROR(FAILED) << "Not support device type " << device_type << " and model type " << model_type - << ". Ascend 910 supports MindIR model and Ascend 310 supports OM, MindIR model"; + << ". Ascend 910, Ascend 310 and GPU support MindIR model, and Ascend 310 supports OM model"; } context->SetDeviceType(support_device_type); return SUCCESS; } diff --git a/mindspore_serving/worker/_check_version.py b/mindspore_serving/worker/_check_version.py index b31c68b..1296fec 100644 --- a/mindspore_serving/worker/_check_version.py +++ b/mindspore_serving/worker/_check_version.py @@ -127,6 +127,19 @@ class AscendEnvChecker: logger.warning(f"No such directory: {self.op_path}, Please check if Ascend 910 AI software package is " f"installed correctly.") + def try_set_env_lib(self): + """try set env but with no warning: LD_LIBRARY_PATH""" + try: + # pylint: disable=unused-import + import te + # pylint: disable=broad-except + except Exception: + if Path(self.tbe_path).is_dir(): + if os.getenv('LD_LIBRARY_PATH'): + os.environ['LD_LIBRARY_PATH'] = self.tbe_path + ":" + os.environ['LD_LIBRARY_PATH'] + else: + os.environ['LD_LIBRARY_PATH'] = self.tbe_path + def _check_env(self): """ascend dependence path check""" if self.path is None or self.path_check not in self.path: @@ -151,11 +164,21 @@ class AscendEnvChecker: "you can reference to the installation guidelines 
https://www.mindspore.cn/install") -def check_version_and_env_config(): +def check_version_and_env_config(device_type): """check version and env config""" - env_checker = AscendEnvChecker() + if device_type == "Ascend": + env_checker = AscendEnvChecker() + try: + env_checker.set_env() + except ImportError as e: + env_checker.check_env(e) + elif device_type == "Gpu": + pass + elif device_type == "Cpu": + pass + - try: - env_checker.set_env() - except ImportError as e: - env_checker.check_env(e) +def check_version_and_try_set_env_lib(): + """check version and try set env LD_LIBRARY_PATH""" + env_checker = AscendEnvChecker() + env_checker.try_set_env_lib() diff --git a/mindspore_serving/worker/_worker.py b/mindspore_serving/worker/_worker.py index c778af9..b037f3d 100644 --- a/mindspore_serving/worker/_worker.py +++ b/mindspore_serving/worker/_worker.py @@ -197,6 +197,7 @@ def start_servable_in_master(servable_directory, servable_name, version_number=0 "Ascend" means the device type can be Ascend910 or Ascend310, etc. "Davinci" has the same meaning as "Ascend". None means the device type is determined by the MindSpore environment. + device_id (int): The id of the device the model loads into and runs in. 
Examples: >>> import os diff --git a/mindspore_serving/worker/distributed/agent_startup.py b/mindspore_serving/worker/distributed/agent_startup.py index e763b5d..63315fd 100644 --- a/mindspore_serving/worker/distributed/agent_startup.py +++ b/mindspore_serving/worker/distributed/agent_startup.py @@ -18,7 +18,9 @@ import os import time import sys import traceback +import signal from multiprocessing import Process, Pipe +import psutil from mindspore_serving._mindspore_serving import ExitSignalHandle_ from mindspore_serving._mindspore_serving import WorkerAgent_, AgentStartUpConfig_ @@ -163,23 +165,52 @@ def _send_exit_msg_to_children(send_pipe_list, subprocess_list): logger.warning(f"Child {index} is not alive") index += 1 - wait_seconds = 10 - for i in range(wait_seconds): - all_exit = True - for process in subprocess_list: - if process.is_alive(): - logger.warning(f"There are still child processes that have not exited and will be forcibly killed in " - f"{wait_seconds - i} seconds.") - time.sleep(1) - all_exit = False - break - if all_exit: - logger.info(f"All Child process exited") - return + def wait_exit(wait_seconds, msg): + for i in range(wait_seconds): + all_exit = True + for process in subprocess_list: + if process.is_alive(): + logger.warning(f"There are still child processes that have not exited and {msg} in " + f"{wait_seconds - i} seconds.") + time.sleep(1) + all_exit = False + break + if all_exit: + logger.info(f"All Child process exited") + return True + return False + + if wait_exit(3, "SIGINT will be sent"): + return + # Send signal SIGINT + for index, process in enumerate(subprocess_list): + if process.is_alive(): + logger.warning(f"Send signal SIGINT to {index}") + try: + child_process = psutil.Process(process.pid) + children_of_child = child_process.children(recursive=True) + for item in children_of_child: + os.kill(item.pid, signal.SIGINT) + # pylint: disable=broad-except + except Exception as e: + logger.warning(f"Get exception when send signal 
SIGINT to children of child {index}, exception: {e}") + os.kill(process.pid, signal.SIGINT) + + if wait_exit(10, "will be forcibly killed"): + return + for index, process in enumerate(subprocess_list): if process.is_alive(): logger.warning(f"Kill Child process {index}") - process.kill() + try: + child_process = psutil.Process(process.pid) + children_of_child = child_process.children(recursive=True) + for item in children_of_child: + os.kill(item.pid, signal.SIGKILL) + # pylint: disable=broad-except + except Exception as e: + logger.warning(f"Get exception when send signal SIGKILL to children of child {index}, exception: {e}") + os.kill(process.pid, signal.SIGKILL) def _start_listening_child_processes(p_recv_pipe, send_pipe_list, subprocess_list): diff --git a/mindspore_serving/worker/init_mindspore.py b/mindspore_serving/worker/init_mindspore.py index 41680a4..3f8f93d 100644 --- a/mindspore_serving/worker/init_mindspore.py +++ b/mindspore_serving/worker/init_mindspore.py @@ -16,7 +16,8 @@ import os import importlib from mindspore_serving import log as logger -from ._check_version import check_version_and_env_config +from mindspore_serving._mindspore_serving import Worker_ +from ._check_version import check_version_and_env_config, check_version_and_try_set_env_lib _flag_set_mindspore_cxx_env = False @@ -50,5 +51,10 @@ def init_mindspore_cxx_env(): if _flag_set_mindspore_cxx_env: return _flag_set_mindspore_cxx_env = True - check_version_and_env_config() + check_version_and_try_set_env_lib() # try set env LD_LIBRARY_PATH _set_mindspore_cxx_env() + device_type = Worker_.get_device_type() + if not device_type: + logger.warning("Failed to get device type") + return + check_version_and_env_config(device_type) diff --git a/requirements_test.txt b/requirements_test.txt index 876298a..aff48eb 100644 --- a/requirements_test.txt +++ b/requirements_test.txt @@ -1,4 +1,5 @@ numpy>=1.17.0 protobuf>=3.8.0 grpcio>=1.27.3 -requests>=2.22.0 \ No newline at end of file 
+requests>=2.22.0 +psutil >= 5.8.0 \ No newline at end of file diff --git a/setup.py b/setup.py index 32e14d0..fe7eff9 100644 --- a/setup.py +++ b/setup.py @@ -98,7 +98,8 @@ build_dependencies() required_package = [ 'numpy >= 1.17.0', 'protobuf >= 3.8.0', - 'grpcio >= 1.27.3' + 'grpcio >= 1.27.3', + 'psutil >= 5.8.0' ] package_data = { diff --git a/tests/ut/stub/stub_inference.cc b/tests/ut/stub/stub_inference.cc index 33127ff..f812707 100644 --- a/tests/ut/stub/stub_inference.cc +++ b/tests/ut/stub/stub_inference.cc @@ -32,4 +32,35 @@ std::shared_ptr InferenceLoader::CreateMindSporeInfer() { } Status InferenceLoader::LoadMindSporeModelWrap() { return SUCCESS; } + +DeviceType InferenceLoader::GetSupportDeviceType(DeviceType device_type, ModelType model_type) { + auto mindspore_infer = CreateMindSporeInfer(); + if (mindspore_infer == nullptr) { + MSI_LOG_ERROR << "Create MindSpore infer failed"; + return kDeviceTypeNotSpecified; + } + if (model_type == kUnknownType) { + model_type = kMindIR; + } + if (device_type == kDeviceTypeNotSpecified) { + auto ascend_list = {kDeviceTypeAscendCL, kDeviceTypeAscendMS, kDeviceTypeGpu}; + for (auto item : ascend_list) { + if (mindspore_infer->CheckModelSupport(item, model_type)) { + return item; + } + } + } else if (device_type == kDeviceTypeAscend) { + auto ascend_list = {kDeviceTypeAscendCL, kDeviceTypeAscendMS}; + for (auto item : ascend_list) { + if (mindspore_infer->CheckModelSupport(item, model_type)) { + return item; + } + } + } else { + if (mindspore_infer->CheckModelSupport(device_type, model_type)) { + return device_type; + } + } + return kDeviceTypeNotSpecified; +} } // namespace mindspore::serving