diff --git a/cmake/package.cmake b/cmake/package.cmake index 459dda0..85eca9c 100644 --- a/cmake/package.cmake +++ b/cmake/package.cmake @@ -63,6 +63,7 @@ install( DIRECTORY ${CMAKE_SOURCE_DIR}/mindspore_serving/master ${CMAKE_SOURCE_DIR}/mindspore_serving/worker + ${CMAKE_SOURCE_DIR}/mindspore_serving/common ${CMAKE_SOURCE_DIR}/mindspore_serving/client DESTINATION ${INSTALL_PY_DIR} COMPONENT mindspore_serving diff --git a/mindspore_serving/ccsrc/common/log.h b/mindspore_serving/ccsrc/common/log.h index 8d0e4de..ccc80b1 100644 --- a/mindspore_serving/ccsrc/common/log.h +++ b/mindspore_serving/ccsrc/common/log.h @@ -130,8 +130,8 @@ class MS_API LogWriter { std::string GetOutputMsg(const std::ostringstream &msg) const { std::string msg_str = msg.str(); - constexpr int max_log_size = 256; - constexpr int msg_log_start_size = 128; + constexpr int max_log_size = 384; + constexpr int msg_log_start_size = 192; if (msg_str.length() > max_log_size) { msg_str = msg_str.substr(0, msg_log_start_size) + "..." 
+ msg_str.substr(msg_str.length() - msg_log_start_size); } diff --git a/mindspore_serving/ccsrc/master/dispacther.cc b/mindspore_serving/ccsrc/master/dispacther.cc index c70ce26..363a687 100644 --- a/mindspore_serving/ccsrc/master/dispacther.cc +++ b/mindspore_serving/ccsrc/master/dispacther.cc @@ -18,6 +18,7 @@ #include #include "common/proto_tensor.h" +#include "master/master_context.h" #include "master/notify_worker/grpc_notify.h" #include "master/notify_worker/local_notify.h" @@ -54,14 +55,11 @@ DispatcherWorkerContext Dispatcher::GetWorkSession(const RequestSpec &request_sp } return context; } -void Dispatcher::SetMaxInferNum(uint32_t max_infer_num) { - if (max_infer_num != 0) { - max_infer_num_ = max_infer_num; - } -} + Status Dispatcher::JudgeInferNum() { - if (infer_num_ >= max_infer_num_) { - return INFER_STATUS_LOG_ERROR(FAILED) << "Serving Error: infer number exceeds the limit " << max_infer_num_; + auto max_infer_num = MasterContext::Instance()->GetMaxRequestBufferCount(); + if (infer_num_ >= max_infer_num) { + return INFER_STATUS_LOG_ERROR(FAILED) << "Serving Error: request buffer number exceeds the limit " << max_infer_num; } return SUCCESS; } diff --git a/mindspore_serving/ccsrc/master/dispacther.h b/mindspore_serving/ccsrc/master/dispacther.h index 96e409b..5748987 100644 --- a/mindspore_serving/ccsrc/master/dispacther.h +++ b/mindspore_serving/ccsrc/master/dispacther.h @@ -30,7 +30,6 @@ #include "common/grpc_client.h" namespace mindspore::serving { -constexpr uint32_t g_max_infer_num_ = 10000; struct DispatcherWorkerContext { WorkerSpec worker_spec; std::shared_ptr notify_worker_ = nullptr; @@ -63,7 +62,6 @@ class MS_API Dispatcher { // avoid invoke Clear and then UnregisterServable is invoked by Clear in other thread std::atomic_bool clearing_flag = false; std::atomic_uint32_t infer_num_ = 0; - uint32_t max_infer_num_ = g_max_infer_num_; Status JudgeInferNum(); DispatcherWorkerContext GetWorkSession(const RequestSpec &request_spec) const; diff 
--git a/mindspore_serving/ccsrc/master/master_context.cc b/mindspore_serving/ccsrc/master/master_context.cc new file mode 100644 index 0000000..6148cb5 --- /dev/null +++ b/mindspore_serving/ccsrc/master/master_context.cc @@ -0,0 +1,35 @@ +/** + * Copyright 2021 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "master/master_context.h" + +namespace mindspore::serving { + +std::shared_ptr MasterContext::Instance() { + static std::shared_ptr instance; + if (instance == nullptr) { + instance = std::make_shared(); + } + return instance; +} + +void MasterContext::SetMaxRequestBufferCount(uint32_t max_request_buffer_count) { + max_request_buffer_count_ = max_request_buffer_count; +} + +uint32_t MasterContext::GetMaxRequestBufferCount() const { return max_request_buffer_count_; } + +} // namespace mindspore::serving diff --git a/mindspore_serving/ccsrc/master/master_context.h b/mindspore_serving/ccsrc/master/master_context.h new file mode 100644 index 0000000..9640b4c --- /dev/null +++ b/mindspore_serving/ccsrc/master/master_context.h @@ -0,0 +1,40 @@ +/** + * Copyright 2021 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef MINDSPORE_SERVING_MASTER_CONTEXT_H +#define MINDSPORE_SERVING_MASTER_CONTEXT_H + +#include +#include +#include +#include "common/serving_common.h" + +namespace mindspore::serving { + +class MS_API MasterContext { + public: + static std::shared_ptr Instance(); + + void SetMaxRequestBufferCount(uint32_t max_request_buffer_count); + uint32_t GetMaxRequestBufferCount() const; + + private: + uint32_t max_request_buffer_count_ = 10000; // default 10000 +}; + +} // namespace mindspore::serving + +#endif // MINDSPORE_SERVING_MASTER_CONTEXT_H diff --git a/mindspore_serving/ccsrc/master/server.cc b/mindspore_serving/ccsrc/master/server.cc index 95f65f5..dedbed7 100644 --- a/mindspore_serving/ccsrc/master/server.cc +++ b/mindspore_serving/ccsrc/master/server.cc @@ -35,7 +35,7 @@ namespace mindspore { namespace serving { -Status Server::StartGrpcServer(const std::string &ip, uint32_t grpc_port, int max_msg_mb_size, uint32_t max_infer_num) { +Status Server::StartGrpcServer(const std::string &ip, uint32_t grpc_port, int max_msg_mb_size) { if (grpc_async_server_ != nullptr) { return INFER_STATUS_LOG_ERROR(SYSTEM_ERROR) << "Serving Error: Serving gRPC server is already running"; } @@ -45,7 +45,6 @@ Status Server::StartGrpcServer(const std::string &ip, uint32_t grpc_port, int ma max_msg_mb_size = gRpcMaxMBMsgSize; } grpc_async_server_ = std::make_unique(std::make_shared(dispatcher_), ip, grpc_port); - dispatcher_->SetMaxInferNum(max_infer_num); return grpc_async_server_->Init(max_msg_mb_size); } @@ -56,8 +55,7 @@ Status 
Server::StartGrpcMasterServer(const std::string &ip, uint32_t grpc_port) } Status Server::StartRestfulServer(const std::string &ip, uint32_t restful_port, int max_msg_mb_size, - uint32_t max_infer_num, int time_out_second) { - dispatcher_->SetMaxInferNum(max_infer_num); + int time_out_second) { return restful_server_.Start(ip, restful_port, max_msg_mb_size, time_out_second); } diff --git a/mindspore_serving/ccsrc/master/server.h b/mindspore_serving/ccsrc/master/server.h index 825b954..9faa8cb 100644 --- a/mindspore_serving/ccsrc/master/server.h +++ b/mindspore_serving/ccsrc/master/server.h @@ -30,11 +30,10 @@ class MS_API Server { public: Server(); ~Server(); - Status StartGrpcServer(const std::string &ip, uint32_t grpc_port, int max_msg_mb_size = gRpcDefaultMsgMBSize, - uint32_t max_infer_num = g_max_infer_num_); + Status StartGrpcServer(const std::string &ip, uint32_t grpc_port, int max_msg_mb_size = gRpcDefaultMsgMBSize); Status StartGrpcMasterServer(const std::string &ip, uint32_t grpc_port); Status StartRestfulServer(const std::string &ip, uint32_t restful_port, int max_msg_mb_size = gRpcDefaultMsgMBSize, - uint32_t max_infer_num = g_max_infer_num_, int time_out_second = 100); + int time_out_second = 100); void Clear(); std::shared_ptr GetDispatcher() { return dispatcher_; } diff --git a/mindspore_serving/ccsrc/python/master/master_py.cc b/mindspore_serving/ccsrc/python/master/master_py.cc index 6877bf2..622fe1d 100644 --- a/mindspore_serving/ccsrc/python/master/master_py.cc +++ b/mindspore_serving/ccsrc/python/master/master_py.cc @@ -20,8 +20,8 @@ namespace mindspore::serving { -void PyMaster::StartGrpcServer(const std::string &ip, uint32_t grpc_port, int max_msg_mb_size, uint32_t max_infer_num) { - auto status = Server::Instance().StartGrpcServer(ip, grpc_port, max_msg_mb_size, max_infer_num); +void PyMaster::StartGrpcServer(const std::string &ip, uint32_t grpc_port, int max_msg_mb_size) { + auto status = Server::Instance().StartGrpcServer(ip, grpc_port, 
max_msg_mb_size); if (status != SUCCESS) { MSI_LOG_EXCEPTION << "Raise failed: " << status.StatusMessage(); } @@ -34,9 +34,8 @@ void PyMaster::StartGrpcMasterServer(const std::string &ip, uint32_t grpc_port) } } -void PyMaster::StartRestfulServer(const std::string &ip, uint32_t grpc_port, int max_msg_mb_size, - uint32_t max_infer_num) { - auto status = Server::Instance().StartRestfulServer(ip, grpc_port, max_msg_mb_size, max_infer_num); +void PyMaster::StartRestfulServer(const std::string &ip, uint32_t grpc_port, int max_msg_mb_size) { + auto status = Server::Instance().StartRestfulServer(ip, grpc_port, max_msg_mb_size); if (status != SUCCESS) { MSI_LOG_EXCEPTION << "Raise failed: " << status.StatusMessage(); } diff --git a/mindspore_serving/ccsrc/python/master/master_py.h b/mindspore_serving/ccsrc/python/master/master_py.h index 3466d56..1fc05ce 100644 --- a/mindspore_serving/ccsrc/python/master/master_py.h +++ b/mindspore_serving/ccsrc/python/master/master_py.h @@ -36,11 +36,9 @@ namespace serving { class MS_API PyMaster { public: - static void StartGrpcServer(const std::string &ip, uint32_t grpc_port, int max_msg_mb_size = 100, - uint32_t max_infer_num = 10000); + static void StartGrpcServer(const std::string &ip, uint32_t grpc_port, int max_msg_mb_size = 100); static void StartGrpcMasterServer(const std::string &ip, uint32_t grpc_port); - static void StartRestfulServer(const std::string &ip, uint32_t grpc_port, int max_msg_mb_size = 100, - uint32_t max_infer_num = 10000); + static void StartRestfulServer(const std::string &ip, uint32_t grpc_port, int max_msg_mb_size = 100); static void WaitAndClear(); static void StopAndClear(); }; diff --git a/mindspore_serving/ccsrc/python/serving_py.cc b/mindspore_serving/ccsrc/python/serving_py.cc index eb20a2b..08193eb 100644 --- a/mindspore_serving/ccsrc/python/serving_py.cc +++ b/mindspore_serving/ccsrc/python/serving_py.cc @@ -21,6 +21,7 @@ #include "python/worker/servable_py.h" #include "python/tensor_py.h" #include 
"common/servable.h" +#include "master/master_context.h" #include "worker/context.h" #include "python/master/master_py.h" #include "python/agent/agent_py.h" @@ -166,7 +167,7 @@ void PyRegWorker(pybind11::module *m_ptr) { .def_static("push_postprocess_failed", &PyWorker::PushPostprocessPyFailed) .def_static("get_device_type", &PyWorker::GetDeviceType); - py::class_>(m, "Context_") + py::class_>(m, "ServableContext_") .def(py::init<>()) .def_static("get_instance", &ServableContext::Instance) .def("set_device_type_str", @@ -177,6 +178,11 @@ void PyRegWorker(pybind11::module *m_ptr) { } }) .def("set_device_id", &ServableContext::SetDeviceId); + + py::class_>(m, "MasterContext_") + .def(py::init<>()) + .def_static("get_instance", &MasterContext::Instance) + .def("set_max_request_buffer_count", &MasterContext::SetMaxRequestBufferCount); } void PyRegWorkerAgent(pybind11::module *m_ptr) { diff --git a/mindspore_serving/common/__init__.py b/mindspore_serving/common/__init__.py new file mode 100644 index 0000000..acc2ebe --- /dev/null +++ b/mindspore_serving/common/__init__.py @@ -0,0 +1,15 @@ +# Copyright 2021 Huawei Technologies Co., Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# ============================================================================ +"""MindSpore Serving.""" diff --git a/mindspore_serving/worker/check_type.py b/mindspore_serving/common/check_type.py similarity index 91% rename from mindspore_serving/worker/check_type.py rename to mindspore_serving/common/check_type.py index 28155da..2c946a1 100644 --- a/mindspore_serving/worker/check_type.py +++ b/mindspore_serving/common/check_type.py @@ -53,7 +53,7 @@ def check_bool(arg_name, bool_val): raise RuntimeError(f"Parameter '{arg_name}' should be bool, but actually {type(bool_val)}") -def check_int(arg_name, int_val, mininum=None, maximum=None, is_tuple_item=False): +def check_int(arg_name, int_val, minimum=None, maximum=None, is_tuple_item=False): """Check whether the input parameters are reasonable int input""" if not is_tuple_item: prefix = f"Parameter '{arg_name}'" @@ -64,13 +64,13 @@ def check_int(arg_name, int_val, mininum=None, maximum=None, is_tuple_item=False raise RuntimeError(f"{prefix} should be int, but actually {type(int_val)}") if not isinstance(int_val, int): raise RuntimeError(f"{prefix} should be int, but actually {type(int_val)}") - if mininum is not None and int_val < mininum: + if minimum is not None and int_val < minimum: if maximum is not None: - raise RuntimeError(f"{prefix} should be in range [{mininum},{maximum}]") - raise RuntimeError(f"{prefix} should be >= {mininum}") + raise RuntimeError(f"{prefix} should be in range [{minimum},{maximum}]") + raise RuntimeError(f"{prefix} should be >= {minimum}") if maximum is not None and int_val > maximum: - if mininum is not None: - raise RuntimeError(f"{prefix} should be in range [{mininum},{maximum}]") + if minimum is not None: + raise RuntimeError(f"{prefix} should be in range [{minimum},{maximum}]") raise RuntimeError(f"{prefix} should be <= {maximum}") @@ -79,7 +79,7 @@ def check_ip_port(arg_name, port): check_int(arg_name, port, 1, 65535) -def check_and_as_int_tuple_list(arg_name, ints, 
mininum=None, maximum=None): +def check_and_as_int_tuple_list(arg_name, ints, minimum=None, maximum=None): """Check whether the input parameters are reasonable multiple str inputs, which can be single str, tuple or list of str. finally, return tuple of str""" @@ -94,7 +94,7 @@ def check_and_as_int_tuple_list(arg_name, ints, mininum=None, maximum=None): for item in ints: if item in int_list: raise RuntimeError(f"The item value '{item}' in parameter '{arg_name}' should not be repeated") - check_int(arg_name, item, mininum, maximum, True) + check_int(arg_name, item, minimum, maximum, True) int_list.append(item) return tuple(ints) diff --git a/mindspore_serving/master/__init__.py b/mindspore_serving/master/__init__.py index 618f4c3..8d4e8f2 100644 --- a/mindspore_serving/master/__init__.py +++ b/mindspore_serving/master/__init__.py @@ -15,11 +15,13 @@ """MindSpore Serving Master""" from ._master import start_grpc_server, start_restful_server, start_master_server, stop +from . import context __all__ = [] __all__.extend([ "start_grpc_server", 'start_restful_server', 'start_master_server', - 'stop' + 'stop', + 'context' ]) diff --git a/mindspore_serving/master/_master.py b/mindspore_serving/master/_master.py index dc29045..16bb909 100644 --- a/mindspore_serving/master/_master.py +++ b/mindspore_serving/master/_master.py @@ -16,7 +16,7 @@ import threading from functools import wraps -from mindspore_serving.worker import check_type +from mindspore_serving.common import check_type from mindspore_serving import log as logger from mindspore_serving._mindspore_serving import ExitSignalHandle_ from mindspore_serving._mindspore_serving import Master_ @@ -70,7 +70,7 @@ def stop_on_except(func): @stop_on_except -def start_grpc_server(ip="0.0.0.0", grpc_port=5500, max_msg_mb_size=100, max_infer_num=10000): +def start_grpc_server(ip="0.0.0.0", grpc_port=5500, max_msg_mb_size=100): r""" Start gRPC server for the communication between client and serving. 
@@ -79,8 +79,6 @@ def start_grpc_server(ip="0.0.0.0", grpc_port=5500, max_msg_mb_size=100, max_inf grpc_port (int): gRPC port ip, default 5500, ip port range [1, 65535]. max_msg_mb_size (int): The maximum acceptable gRPC message size in megabytes(MB), default 100, value range [1, 512]. - max_infer_num (int): The maximum acceptable infer message size in number, default 10000, - Max infer number should be a positive integer. Raises: RuntimeError: Fail to start the gRPC server. @@ -93,9 +91,8 @@ def start_grpc_server(ip="0.0.0.0", grpc_port=5500, max_msg_mb_size=100, max_inf check_type.check_str('ip', ip) check_type.check_ip_port('grpc_port', grpc_port) check_type.check_int('max_msg_mb_size', max_msg_mb_size, 1, 512) - check_type.check_int('max_infer_num', max_infer_num, 0) - Master_.start_grpc_server(ip, grpc_port, max_msg_mb_size, max_infer_num) + Master_.start_grpc_server(ip, grpc_port, max_msg_mb_size) _start_wait_and_clear() @@ -129,7 +126,7 @@ def start_master_server(ip="127.0.0.1", master_port=6100): @stop_on_except -def start_restful_server(ip="0.0.0.0", restful_port=5900, max_msg_mb_size=100, max_infer_num=10000): +def start_restful_server(ip="0.0.0.0", restful_port=5900, max_msg_mb_size=100): r""" Start RESTful server for the communication between client and serving. @@ -138,8 +135,6 @@ def start_restful_server(ip="0.0.0.0", restful_port=5900, max_msg_mb_size=100, m restful_port (int): gRPC port ip, default 5900, ip port range [1, 65535]. max_msg_mb_size (int): The maximum acceptable RESTful message size in megabytes(MB), default 100, value range [1, 512]. - max_infer_num (int): The maximum acceptable infer message size in number, default 10000, - Max infer number should be a positive integer. Raises: RuntimeError: Fail to start the RESTful server. 
@@ -151,7 +146,6 @@ def start_restful_server(ip="0.0.0.0", restful_port=5900, max_msg_mb_size=100, m check_type.check_str('ip', ip) check_type.check_ip_port('restful_port', restful_port) check_type.check_int('max_msg_mb_size', max_msg_mb_size, 1, 512) - check_type.check_int('max_infer_num', max_infer_num, 0) - Master_.start_restful_server(ip, restful_port, max_msg_mb_size, max_infer_num) + Master_.start_restful_server(ip, restful_port, max_msg_mb_size) _start_wait_and_clear() diff --git a/mindspore_serving/master/context.py b/mindspore_serving/master/context.py new file mode 100644 index 0000000..2d32d87 --- /dev/null +++ b/mindspore_serving/master/context.py @@ -0,0 +1,34 @@ +# Copyright 2021 Huawei Technologies Co., Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================ +"""Set context of serving""" +from mindspore_serving._mindspore_serving import MasterContext_ +from mindspore_serving.common import check_type + +_context = MasterContext_.get_instance() + + +def set_max_request_buffer_count(max_request_buffer_count): + r""" + Set the maximum number of requests waiting to be processed. + + Args: + max_request_buffer_count (int): The maximum number of requests allowed to be buffered while waiting to be processed, default 10000. + It should be a positive integer. + + Raises: + RuntimeError: The type or value of the parameters is invalid, or other error happened.
+ """ + check_type.check_int("max_request_buffer_count", max_request_buffer_count, 1) + _context.set_max_request_buffer_count(max_request_buffer_count) diff --git a/mindspore_serving/worker/_worker.py b/mindspore_serving/worker/_worker.py index b037f3d..2309f12 100644 --- a/mindspore_serving/worker/_worker.py +++ b/mindspore_serving/worker/_worker.py @@ -17,23 +17,38 @@ import threading from functools import wraps from mindspore_serving import log as logger +from mindspore_serving.common import check_type from mindspore_serving.worker import init_mindspore from mindspore_serving._mindspore_serving import ExitSignalHandle_ from mindspore_serving._mindspore_serving import Worker_ +from mindspore_serving._mindspore_serving import ServableContext_ from .register.preprocess import preprocess_storage from .register.postprocess import postprocess_storage -from . import context from .task import _start_py_task, _join_py_task -from . import check_type _wait_and_clear_thread = None def _clear_python(): + """Clear python storage data""" preprocess_storage.clear() postprocess_storage.clear() +def _set_device_id(device_id): + """Set device id, default 0""" + ServableContext_.get_instance().set_device_id(device_id) + + +def _set_device_type(device_type): + """Set device type, now can be 'None'(default), 'GPU' and 'Ascend', 'Davinci'(same as 'Ascend'), case ignored. 
""" + if device_type is not None: + check_type.check_str('device_type', device_type) + ServableContext_.get_instance().set_device_type_str(device_type) + else: + ServableContext_.get_instance().set_device_type_str('None') # depend on MindSpore build target + + def _start_wait_and_clear(): """Waiting for Ctrl+C, and clear up environment""" @@ -162,13 +177,8 @@ def start_servable(servable_directory, servable_name, version_number=0, init_mindspore.init_mindspore_cxx_env() _load_servable_config(servable_directory, servable_name) - if device_type is not None: - check_type.check_str('device_type', device_type) - context.set_context(device_type=device_type) - else: - context.set_context(device_type='None') # depend on register implement - - context.set_context(device_id=device_id) + _set_device_type(device_type) + _set_device_id(device_id) Worker_.start_servable(servable_directory, servable_name, version_number, master_ip, master_port, worker_ip, worker_port) _start_py_task(Worker_.get_batch_size()) @@ -219,13 +229,8 @@ def start_servable_in_master(servable_directory, servable_name, version_number=0 init_mindspore.init_mindspore_cxx_env() _load_servable_config(servable_directory, servable_name) - if device_type is not None: - check_type.check_str('device_type', device_type) - context.set_context(device_type=device_type) - else: - context.set_context(device_type='None') # depend on register implement - - context.set_context(device_id=device_id) + _set_device_type(device_type) + _set_device_id(device_id) Worker_.start_servable_in_master(servable_directory, servable_name, version_number) _start_py_task(Worker_.get_batch_size()) _start_wait_and_clear() diff --git a/mindspore_serving/worker/context.py b/mindspore_serving/worker/context.py deleted file mode 100644 index c67b39c..0000000 --- a/mindspore_serving/worker/context.py +++ /dev/null @@ -1,64 +0,0 @@ -# Copyright 2020 Huawei Technologies Co., Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# 
you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# ============================================================================ -"""Context setting interface""" -from mindspore_serving._mindspore_serving import Context_ - - -class Context: - """Set context of device, including device id and device type, can only set once currently.""" - - def __init__(self): - self.context_ = Context_.get_instance() - - def set_device_type(self, device_type): - """Set device type, now can be 'None'(default) and 'Ascend', 'Davinci'(same as 'Ascend'), case ignored. """ - self.context_.set_device_type_str(device_type) - - def set_device_id(self, device_id): - """Set device id, default 0""" - self.context_.set_device_id(device_id) - - -_k_context = None - - -def _context(): - """ - Get the global _context, if context is not created, create a new one. - - Returns: - _Context, the global context in PyNative mode. - """ - global _k_context - if _k_context is None: - _k_context = Context() - return _k_context - - -def set_context(**kwargs): - """The context setting interface. The acceptable parameters including: - device_type: 'Ascend','Davinci', 'None'. Case ignored. - - Davinci' and 'Ascend' are the same, means Ascend910 or Ascend310. - - 'None' means depend on MindSpore. 
- device_id: reasonable device id - """ - context = _context() - for (k, w) in kwargs.items(): - if k == "device_type": - context.set_device_type(w) - elif k == "device_id": - context.set_device_id(w) - else: - raise RuntimeError(f"Not support context key '{k}'") diff --git a/mindspore_serving/worker/distributed/agent_startup.py b/mindspore_serving/worker/distributed/agent_startup.py index 5ddca78..68b8fe2 100644 --- a/mindspore_serving/worker/distributed/agent_startup.py +++ b/mindspore_serving/worker/distributed/agent_startup.py @@ -27,7 +27,7 @@ from mindspore_serving._mindspore_serving import ExitSignalHandle_ from mindspore_serving._mindspore_serving import WorkerAgent_, AgentStartUpConfig_ from mindspore_serving import log as logger -from mindspore_serving.worker import check_type +from mindspore_serving.common import check_type from mindspore_serving.worker.distributed import worker_agent @@ -160,7 +160,8 @@ def _agent_process(send_pipe, recv_pipe, index, start_config): except Exception as e: traceback.print_exc() logger.error(f"Child {index}: Catch exception and notify exit of others") - send_pipe.send((index, e)) + exception = RuntimeError(f"Child {index} exception happen: {e}") + send_pipe.send((index, exception)) _recv_parent(parent_process, index, recv_pipe, False) logger.error(f"Child {index}: end send message to parent") @@ -251,24 +252,23 @@ def _listening_agents_when_startup(p_recv_pipe, send_pipe_list, subprocess_list) if ExitSignalHandle_.has_stopped(): logger.warning("Fail to start agents because of Ctrl+C") _send_exit_msg_to_children(send_pipe_list, subprocess_list) - return False + raise RuntimeError("Fail to start agents because of Ctrl+C") for send_pipe, process in zip(send_pipe_list, subprocess_list): if process.is_alive(): continue logger.warning("Fail to start agents because of death of one agent") _send_exit_msg_to_children(send_pipe_list, subprocess_list) - return False + raise RuntimeError("Fail to start agents because of death of one 
agent") index, msg = p_recv_pipe.recv() logger.info(f"Receive msg from Child {index}: {msg}") if isinstance(msg, Exception): logger.warning("Fail to start agents because of exception raise by one agent") _send_exit_msg_to_children(send_pipe_list, subprocess_list) - return False + raise msg for send_pipe in send_pipe_list: _send_pipe_msg(send_pipe, signal_success) - return True def _listening_agents_after_startup(subprocess_list, worker_ip, worker_port, agent_ip): @@ -328,16 +328,19 @@ def _startup_agents(common_meta, worker_ip, worker_port, name=f"{servable_name}_worker_agent_rank{rank_id}_device{device_id}") process.start() subprocess_list.append(process) - ret = _listening_agents_when_startup(p_recv_pipe, send_pipe_list, subprocess_list) msg = f"worker_ip: {worker_ip}, worker_port: {worker_port}, agent_ip: {agent_ip}, " \ f"agent_start_port: {agent_start_port}, device ids: {device_id_list}, rank ids: {rank_id_list}, " \ f"rank table file: {rank_table_file}, model files: {model_files}, group config files: {group_config_files}" - if not ret: + + try: + _listening_agents_when_startup(p_recv_pipe, send_pipe_list, subprocess_list) + # pylint: disable=broad-except + except Exception as e: WorkerAgent_.notify_failed(worker_ip, worker_port) logger.error(f"Failed to start agents, {msg}") print(f"Failed to start agents, {msg}") - raise RuntimeError("Failed to start agents") + raise e logger.info(f"Success to start agents, {msg}") print(f"Success to start agents, {msg}") diff --git a/mindspore_serving/worker/distributed/distributed_worker.py b/mindspore_serving/worker/distributed/distributed_worker.py index c27f962..ef77d27 100644 --- a/mindspore_serving/worker/distributed/distributed_worker.py +++ b/mindspore_serving/worker/distributed/distributed_worker.py @@ -18,7 +18,7 @@ import os from mindspore_serving._mindspore_serving import Worker_ from mindspore_serving import log as logger -from mindspore_serving.worker import check_type +from mindspore_serving.common import 
check_type from mindspore_serving.worker._worker import _start_py_task, _start_wait_and_clear from mindspore_serving.worker._worker import stop_on_except, _load_servable_config diff --git a/mindspore_serving/worker/distributed/register.py b/mindspore_serving/worker/distributed/register.py index 9e8f571..1e5cd63 100644 --- a/mindspore_serving/worker/distributed/register.py +++ b/mindspore_serving/worker/distributed/register.py @@ -15,7 +15,7 @@ """Serving, distributed worker register""" from mindspore_serving._mindspore_serving import ServableMeta_, ServableStorage_ -from mindspore_serving.worker import check_type +from mindspore_serving.common import check_type from mindspore_serving.worker.common import get_servable_dir from mindspore_serving import log as logger diff --git a/mindspore_serving/worker/register/method.py b/mindspore_serving/worker/register/method.py index ccde72c..063227c 100644 --- a/mindspore_serving/worker/register/method.py +++ b/mindspore_serving/worker/register/method.py @@ -21,7 +21,7 @@ from easydict import EasyDict from mindspore_serving._mindspore_serving import ServableStorage_, MethodSignature_, PredictPhaseTag_ from mindspore_serving.worker.common import get_func_name, get_servable_dir -from mindspore_serving.worker import check_type +from mindspore_serving.common import check_type from mindspore_serving import log as logger from .preprocess import register_preprocess, check_preprocess from .postprocess import register_postprocess, check_postprocess diff --git a/mindspore_serving/worker/register/servable.py b/mindspore_serving/worker/register/servable.py index 162944a..1b7368a 100644 --- a/mindspore_serving/worker/register/servable.py +++ b/mindspore_serving/worker/register/servable.py @@ -15,7 +15,7 @@ """Servable declaration interface""" from mindspore_serving._mindspore_serving import ServableMeta_, ServableStorage_ -from mindspore_serving.worker import check_type +from mindspore_serving.common import check_type from 
mindspore_serving.worker.common import get_servable_dir from mindspore_serving import log as logger diff --git a/tests/ut/python/runtest.sh b/tests/ut/python/runtest.sh index 17f07d1..003c96e 100755 --- a/tests/ut/python/runtest.sh +++ b/tests/ut/python/runtest.sh @@ -32,6 +32,7 @@ cp ../mindspore_serving/proto/ms_service*.py mindspore_serving/proto/ cp _mindspore_serving*.so mindspore_serving/ cp -r ${PROJECT_PATH}/mindspore_serving/master mindspore_serving/ cp -r ${PROJECT_PATH}/mindspore_serving/worker mindspore_serving/ +cp -r ${PROJECT_PATH}/mindspore_serving/common mindspore_serving/ cp -r ${PROJECT_PATH}/mindspore_serving/client mindspore_serving/ cp ${PROJECT_PATH}/mindspore_serving/*.py mindspore_serving/ diff --git a/tests/ut/python/tests/common.py b/tests/ut/python/tests/common.py index eaca83d..2ea1637 100644 --- a/tests/ut/python/tests/common.py +++ b/tests/ut/python/tests/common.py @@ -130,6 +130,7 @@ def serving_test(func): try: func(*args, **kwargs) finally: + master.context.set_max_request_buffer_count(10000) master.stop() worker.stop() servable_dir = os.path.join(os.getcwd(), "serving_python_ut_servables") diff --git a/tests/ut/python/tests/test_distributed_worker.py b/tests/ut/python/tests/test_distributed_worker.py index c1d2f7e..70afb23 100644 --- a/tests/ut/python/tests/test_distributed_worker.py +++ b/tests/ut/python/tests/test_distributed_worker.py @@ -92,23 +92,37 @@ def start_distributed_worker(base): return worker -def start_agents(model_file_list, group_config_list): +def start_agents(model_file_list, group_config_list, start_port): send_pipe, recv_pipe = Pipe() def agent_process(send_pipe): - distributed.startup_worker_agents(worker_ip="127.0.0.1", worker_port=6200, model_files=model_file_list, - group_config_files=group_config_list) - send_pipe.send("Success") + try: + distributed.startup_worker_agents(worker_ip="127.0.0.1", worker_port=6200, model_files=model_file_list, + group_config_files=group_config_list, 
agent_start_port=start_port) + send_pipe.send("Success") + # pylint: disable=broad-except + except Exception as e: + send_pipe.send(e) send_pipe.close() agent = Process(target=agent_process, args=(send_pipe,)) agent.start() index = 0 - while index < 50 and agent.is_alive(): # wait max 5 s + while index < 100 and agent.is_alive(): # wait max 10 s index += 1 if recv_pipe.poll(0.1): + msg = recv_pipe.recv() + print(f"Receive agent process msg: {msg} {agent.is_alive()}") + if isinstance(msg, Exception): + raise msg break - assert index < 50 + + if recv_pipe.poll(0.1): + msg = recv_pipe.recv() + print(f"Receive agent process msg: {msg} {agent.is_alive()}") + if isinstance(msg, Exception): + raise msg + assert index < 100 assert agent.is_alive() return agent @@ -138,7 +152,7 @@ def test_distributed_worker_worker_exit_success(): base = start_distributed_grpc_server() worker_process = start_distributed_worker(base) base.add_on_exit(lambda: send_exit(worker_process)) - agent_process = start_agents(base.model_file_list, base.group_config_list) + agent_process = start_agents(base.model_file_list, base.group_config_list, 7000) base.add_on_exit(lambda: send_exit(agent_process)) client = create_client("localhost", 5500, base.servable_name, "predict") @@ -179,7 +193,7 @@ def test_distributed_worker_agent_exit_success(): base = start_distributed_grpc_server() worker_process = start_distributed_worker(base) base.add_on_exit(lambda: send_exit(worker_process)) - agent_process = start_agents(base.model_file_list, base.group_config_list) + agent_process = start_agents(base.model_file_list, base.group_config_list, 7008) base.add_on_exit(lambda: send_exit(agent_process)) client = create_client("localhost", 5500, base.servable_name, "predict") @@ -217,7 +231,7 @@ def test_distributed_worker_agent_startup_killed_exit_success(): base = start_distributed_grpc_server() worker_process = start_distributed_worker(base) base.add_on_exit(lambda: send_exit(worker_process)) - agent_process = 
start_agents(base.model_file_list, base.group_config_list) + agent_process = start_agents(base.model_file_list, base.group_config_list, 7016) base.add_on_exit(lambda: send_exit(agent_process)) client = create_client("localhost", 5500, base.servable_name, "predict") @@ -256,7 +270,7 @@ def test_distributed_worker_agent_killed_exit_success(): base = start_distributed_grpc_server() worker_process = start_distributed_worker(base) base.add_on_exit(lambda: send_exit(worker_process)) - agent_process = start_agents(base.model_file_list, base.group_config_list) + agent_process = start_agents(base.model_file_list, base.group_config_list, 7024) base.add_on_exit(lambda: send_exit(agent_process)) client = create_client("localhost", 5500, base.servable_name, "predict") @@ -289,3 +303,17 @@ def test_distributed_worker_agent_killed_exit_success(): assert not worker_process.is_alive() assert not agent_process.is_alive() assert not agents_alive() + + +@serving_test +def test_distributed_worker_agent_invalid_model_files_failed(): + base = start_distributed_grpc_server() + worker_process = start_distributed_worker(base) + base.add_on_exit(lambda: send_exit(worker_process)) + base.model_file_list[0] = base.model_file_list[0] + "_error" + try: + start_agents(base.model_file_list, base.group_config_list, 7036) + assert False + # pylint: disable=broad-except + except Exception as e: + assert "Cannot access model file" in str(e) diff --git a/tests/ut/python/tests/test_mater_worker_client.py b/tests/ut/python/tests/test_mater_worker_client.py index 2d33868..150bbf0 100644 --- a/tests/ut/python/tests/test_mater_worker_client.py +++ b/tests/ut/python/tests/test_mater_worker_client.py @@ -1089,12 +1089,106 @@ def test_servable_worker_alone_servable_not_available(): instances = [] y_data_list = [] for i in range(instance_count): - x1 = np.asarray([[1.1], [3.3]]).astype(np.float32) * (i + 1) - x2 = np.asarray([[5.5], [7.7]]).astype(np.float32) * (i + 1) + x1 = np.asarray([[1.1, 2.2], [3.3, 
4.4]]).astype(np.float32) * (i + 1) + x2 = np.asarray([[5.5, 6.6], [7.7, 8.8]]).astype(np.float32) * (i + 1) y_data_list.append(x1 + x2) - instances.append({"x3": x1, "x2": x2}) + instances.append({"x1": x1, "x2": x2}) client = create_client("localhost", 5500, base.servable_name + "error", "add_common") result = client.infer(instances) print(result) assert "servable is not available" in result["error"] + + +@serving_test +def test_servable_worker_with_master_max_request_count(): + # fail returned from Worker::RunAsync + base = ServingTestBase() + servable_content = servable_config_import + servable_content += servable_config_declare_servable + servable_content += r""" +import time +def preprocess(x1, x2): + time.sleep(1) + return x1, x2 + +@register.register_method(output_names=["y"]) +def add_common(x1, x2): + x1, x2 = register.call_preprocess(preprocess, x1, x2) + y = register.call_servable(x1, x2) + return y +""" + base.init_servable_with_servable_config(1, servable_content) + master.context.set_max_request_buffer_count(1) + worker.start_servable_in_master(base.servable_dir, base.servable_name) + master.start_grpc_server("0.0.0.0", 5500) + # Client + x1 = np.asarray([[1.1, 2.2], [3.3, 4.4]]).astype(np.float32) + x2 = np.asarray([[5.5, 6.6], [7.7, 8.8]]).astype(np.float32) + instance = {"x1": x1, "x2": x2} + + client = create_client("localhost", 5500, base.servable_name, "add_common") + result_list = [] + for _ in range(2): + result = client.infer_async(instance) + result_list.append(result) + + result0 = result_list[0].result() + result1 = result_list[1].result() + print(result0) + print(result1) + assert "error" in result0 or "error" in result1 + if "error" in result0: + assert "error" not in result1 + assert "Serving Error: request buffer number exceeds the limit 1" in result0["error"] + else: + assert "error" not in result0 + assert "Serving Error: request buffer number exceeds the limit 1" in result1["error"] + + +@serving_test +def 
test_servable_worker_alone_max_request_count(): + # fail returned from Worker::RunAsync + base = ServingTestBase() + servable_content = servable_config_import + servable_content += servable_config_declare_servable + servable_content += r""" +import time +def preprocess(x1, x2): + time.sleep(1) + return x1, x2 + +@register.register_method(output_names=["y"]) +def add_common(x1, x2): + x1, x2 = register.call_preprocess(preprocess, x1, x2) + y = register.call_servable(x1, x2) + return y +""" + base.init_servable_with_servable_config(1, servable_content) + master.context.set_max_request_buffer_count(1) + + master.start_master_server("0.0.0.0", 6100) + worker.start_servable(base.servable_dir, base.servable_name, worker_port=6200, master_port=6100) + master.start_grpc_server("0.0.0.0", 5500) + # Client + x1 = np.asarray([[1.1, 2.2], [3.3, 4.4]]).astype(np.float32) + x2 = np.asarray([[5.5, 6.6], [7.7, 8.8]]).astype(np.float32) + instance = {"x1": x1, "x2": x2} + + client = create_client("localhost", 5500, base.servable_name, "add_common") + result_list = [] + for _ in range(2): + result = client.infer_async(instance) + result_list.append(result) + + result0 = result_list[0].result() + result1 = result_list[1].result() + print(result0) + print(result1) + assert "error" in result0 or "error" in result1 + if "error" in result0: + assert "error" not in result1 + assert "Serving Error: request buffer number exceeds the limit 1" in result0["error"] + else: + assert "error" not in result0 + assert "Serving Error: request buffer number exceeds the limit 1" in result1["error"]