diff --git a/mindspore_serving/ccsrc/common/grpc_server.cc b/mindspore_serving/ccsrc/common/grpc_server.cc index 18d705d..9edb394 100644 --- a/mindspore_serving/ccsrc/common/grpc_server.cc +++ b/mindspore_serving/ccsrc/common/grpc_server.cc @@ -23,7 +23,7 @@ Status GrpcServer::Start(std::shared_ptr service, const std::stri service_ = service; Status status; if (in_running_) { - return INFER_STATUS_LOG_ERROR(SYSTEM_ERROR) << "Grpc server is running"; + return INFER_STATUS_LOG_ERROR(SYSTEM_ERROR) << "Serving Error: " << server_tag << " server is already running"; } std::string server_address = ip + ":" + std::to_string(grpc_port); @@ -42,13 +42,13 @@ Status GrpcServer::Start(std::shared_ptr service, const std::stri server_ = serverBuilder.BuildAndStart(); if (server_ == nullptr) { - return INFER_STATUS_LOG_ERROR(FAILED) - << "Serving Error: create grpc server failed, gRPC address " << server_address; + return INFER_STATUS_LOG_ERROR(FAILED) << "Serving Error: " << server_tag + << " server start failed, create server failed, address " << server_address; } auto grpc_server_run = [this, server_address, server_tag]() { - MSI_LOG(INFO) << server_tag << " start success, listening on " << server_address; - std::cout << "Serving: " << server_tag << " start success, listening on " << server_address << std::endl; + MSI_LOG(INFO) << server_tag << " server start success, listening on " << server_address; + std::cout << "Serving: " << server_tag << " server start success, listening on " << server_address << std::endl; server_->Wait(); }; diff --git a/mindspore_serving/ccsrc/master/restful/restful_server.cc b/mindspore_serving/ccsrc/master/restful/restful_server.cc index 60de127..3bc6fda 100644 --- a/mindspore_serving/ccsrc/master/restful/restful_server.cc +++ b/mindspore_serving/ccsrc/master/restful/restful_server.cc @@ -171,12 +171,14 @@ Status RestfulServer::StartRestfulServer() { event_base_dispatch(event_base_); }; event_thread_ = std::thread(event_http_run); - in_running_ = true; return SUCCESS; } Status RestfulServer::Start(const std::string &ip, uint32_t restful_port, int max_msg_size, int time_out_second) { Status status(SUCCESS); + if (in_running_) { + return INFER_STATUS_LOG_ERROR(SYSTEM_ERROR) << "Serving Error: RESTful server is already running"; + } restful_ip_ = ip; restful_port_ = restful_port; @@ -190,6 +192,7 @@ Status RestfulServer::Start(const std::string &ip, uint32_t restful_port, int ma if (status != SUCCESS) { return status; } + in_running_ = true; return status; } diff --git a/mindspore_serving/ccsrc/master/server.cc b/mindspore_serving/ccsrc/master/server.cc index 2e21017..b51782c 100644 --- a/mindspore_serving/ccsrc/master/server.cc +++ b/mindspore_serving/ccsrc/master/server.cc @@ -45,13 +45,12 @@ Status Server::StartGrpcServer(const std::string &ip, uint32_t grpc_port, int ma } Status Server::StartGrpcMasterServer(const std::string &ip, uint32_t grpc_port) { - return grpc_manager_server_.Start(std::make_shared(dispatcher_), ip, grpc_port, -1, "Master gRPC"); + return grpc_manager_server_.Start(std::make_shared(dispatcher_), ip, grpc_port, -1, "Master"); } Status Server::StartRestfulServer(const std::string &ip, uint32_t restful_port, int max_msg_mb_size, int time_out_second) { return restful_server_.Start(ip, restful_port, max_msg_mb_size, time_out_second); - // restful_server.Start(http_handler_msg, ip, restful_port, max_msg_mb_size, time_out_second); } void Server::Clear() { dispatcher_->Clear(); } diff --git a/mindspore_serving/master/_master.py b/mindspore_serving/master/_master.py index 21802ac..12d63e7 100644 --- a/mindspore_serving/master/_master.py +++ b/mindspore_serving/master/_master.py @@ -16,6 +16,7 @@ import threading from functools import wraps +from mindspore_serving.worker import check_type from mindspore_serving._mindspore_serving import Master_ _wait_and_clear_thread = None @@ -23,6 +24,7 @@ _wait_and_clear_thread = None # waiting for Ctrl+C, and clear def _start_wait_and_clear(): + """Start thread waiting for catch ctrl+c, and clear env""" def thread_func(): print("Serving master: wait for Ctrl+C to exit ------------------------------------") Master_.wait_and_clear() @@ -34,10 +36,12 @@ def _start_wait_and_clear(): def stop(): + """Stop master""" Master_.stop() def stop_on_except(func): + """mmon wrap clear and exit on Serving exception""" @wraps(func) def handle_except(*args, **kwargs): try: @@ -53,20 +57,32 @@ def stop_on_except(func): def start_grpc_server(ip="0.0.0.0", grpc_port=5500, max_msg_mb_size=100): """start grpc server for the communication between client and serving. the ip should be accessible to the client.""" + check_type.check_str('ip', ip) + check_type.check_ip_port('grpc_port', grpc_port) + check_type.check_int('max_msg_mb_size', max_msg_mb_size, 1, 512) + Master_.start_grpc_server(ip, grpc_port, max_msg_mb_size) _start_wait_and_clear() @stop_on_except -def start_master_server(ip="0.0.0.0", grpc_port=6100): +def start_master_server(ip="0.0.0.0", master_port=6100): """start grpc server for the communication between workers and the master. the ip is expected to be accessed only by workers.""" - Master_.start_grpc_master_server(ip, grpc_port) + check_type.check_str('ip', ip) + check_type.check_ip_port('master_port', master_port) + + Master_.start_grpc_master_server(ip, master_port) + _start_wait_and_clear() @stop_on_except def start_restful_server(ip="0.0.0.0", restful_port=5900, max_msg_mb_size=100): """start restful server for the communication between client and serving. the ip should be accessible to the client.""" + check_type.check_str('ip', ip) + check_type.check_ip_port('restful_port', restful_port) + check_type.check_int('max_msg_mb_size', max_msg_mb_size, 1, 512) + Master_.start_restful_server(ip, restful_port, max_msg_mb_size) _start_wait_and_clear() diff --git a/mindspore_serving/worker/_worker.py b/mindspore_serving/worker/_worker.py index 49a87d5..6352e24 100644 --- a/mindspore_serving/worker/_worker.py +++ b/mindspore_serving/worker/_worker.py @@ -63,7 +63,7 @@ def _load_servable_config(servable_directory, servable_name): @stop_on_except def start_servable(servable_directory, servable_name, version_number=0, device_type=None, device_id=0, - master_ip="0.0.0.0", master_port=6100, host_ip="0.0.0.0", host_port=6200): + master_ip="0.0.0.0", master_port=6100, worker_ip="0.0.0.0", worker_port=6200): r""" Start up the servable named 'servable_name' defined in 'servable_directory', and the servable linked to the master through gRPC (master_ip, master_port). @@ -71,7 +71,7 @@ def start_servable(servable_directory, servable_name, version_number=0, Serving has two running modes. One is running in a single process, providing the Serving service of a single model. The other includes a master and multiple workers. The master is responsible for providing the Serving access interface for client, the worker is responsible for providing the service of the specific model, and the master - and worker communicate through gPRC defined as (master_ip, master_port) and (host_ip, host_port). + and worker communicate through gPRC defined as (master_ip, master_port) and (worker_ip, worker_port). Args: servable_directory (str): The directory where the servable located in, there expected to has a directory named @@ -95,22 +95,22 @@ def start_servable(servable_directory, servable_name, version_number=0, device_id (int): The id of the device the model loads into and runs in. master_ip (str): The master ip the worker linked to. master_port (int): The master port the worker linked to. - host_ip (str): The worker ip the master linked to. - host_port (int): The worker port the master linked to. + worker_ip (str): The worker ip the master linked to. + worker_port (int): The worker port the master linked to. """ - check_type.check_str(servable_directory) - check_type.check_str(servable_name) - check_type.check_int(version_number) + check_type.check_str('servable_directory', servable_directory) + check_type.check_str('servable_name', servable_name) + check_type.check_int('version_number', version_number, 0) if device_type: - check_type.check_str(device_type) - check_type.check_int(device_id) + check_type.check_str('device_type', device_type) + check_type.check_int('device_id', device_id, 0) - check_type.check_str(master_ip) - check_type.check_int(master_port) + check_type.check_str('master_ip', master_ip) + check_type.check_ip_port('master_port', master_port) - check_type.check_str(host_ip) - check_type.check_int(host_port) + check_type.check_str('worker_ip', worker_ip) + check_type.check_ip_port('worker_port', worker_port) _load_servable_config(servable_directory, servable_name) @@ -122,7 +122,7 @@ def start_servable(servable_directory, servable_name, version_number=0, context.set_context(device_id=device_id) Worker_.start_servable(servable_directory, servable_name, version_number, master_ip, master_port, - host_ip, host_port) + worker_ip, worker_port) start_py_task(Worker_.get_batch_size()) _start_wait_and_clear() @@ -161,13 +161,13 @@ def start_servable_in_master(servable_directory, servable_name, version_number=0 Default: None. device_id (int): The id of the device the model loads into and runs in. """ - check_type.check_str(servable_directory) - check_type.check_str(servable_name) - check_type.check_int(version_number) + check_type.check_str('servable_directory', servable_directory) + check_type.check_str('servable_name', servable_name) + check_type.check_int('version_number', version_number, 0) if device_type: - check_type.check_int(device_type) - check_type.check_int(device_id) + check_type.check_str('device_type', device_type) + check_type.check_int('device_id', device_id, 0) _load_servable_config(servable_directory, servable_name) diff --git a/mindspore_serving/worker/check_type.py b/mindspore_serving/worker/check_type.py index 8f05e03..bbb430f 100644 --- a/mindspore_serving/worker/check_type.py +++ b/mindspore_serving/worker/check_type.py @@ -15,7 +15,7 @@ """T check for worker""" -def check_and_as_str_tuple_list(strs): +def check_and_as_str_tuple_list(arg_name, strs): """Check whether the input parameters are reasonable multiple str inputs, which can be single str, tuple or list of str. finally, return tuple of str""" @@ -23,29 +23,46 @@ def check_and_as_str_tuple_list(strs): strs = (strs,) if not isinstance(strs, (tuple, list)): - raise RuntimeError("Check failed, expecting str or tuple/list of str, actually", type(strs)) + raise RuntimeError(f"Parameter '{arg_name}' should be str or tuple/list of str, but actually", type(strs)) if isinstance(strs, (tuple, list)): for item in strs: if not isinstance(item, str): - raise RuntimeError("Check failed, expecting tuple/st to be str, actually", type(item)) + raise RuntimeError(f"The item of parameter '{arg_name}' should be str, but actually", type(item)) + if not item: + raise RuntimeError(f"The item of parameter '{arg_name}' should not be empty str") return tuple(strs) -def check_str(str_val): +def check_str(arg_name, str_val): """Check whether the input parameters are reasonable str input""" if not isinstance(str_val, str): - raise RuntimeError("Check str failed, expecting str, actually", type(str_val)) + raise RuntimeError(f"Parameter '{arg_name}' should be str, but actually", type(str_val)) + if not str_val: + raise RuntimeError(f"Parameter '{arg_name}' should not be empty str") -def check_bool(bool_val): +def check_bool(arg_name, bool_val): """Check whether the input parameters are reasonable bool input""" if not isinstance(bool_val, bool): - raise RuntimeError("Check bool failed, expecting bool, actually", type(bool_val)) + raise RuntimeError(f"Parameter '{arg_name}' should be bool, but actually", type(bool_val)) -def check_int(int_val): +def check_int(arg_name, int_val, mininum=None, maximum=None): """Check whether the input parameters are reasonable int input""" if not isinstance(int_val, int): - raise RuntimeError("Check failed, expecting int, actually", {type(int_val)}) + raise RuntimeError(f"Parameter '{arg_name}' should be int, but actually", type(int_val)) + if mininum is not None and int_val < mininum: + if maximum is not None: + raise RuntimeError(f"Parameter '{arg_name}' should be in range [{mininum},{maximum}]") + raise RuntimeError(f"Parameter '{arg_name}' should be >= {mininum}") + if maximum is not None and int_val > maximum: + if mininum is not None: + raise RuntimeError(f"Parameter '{arg_name}' should be in range [{mininum},{maximum}]") + raise RuntimeError(f"Parameter '{arg_name}' should be <= {maximum}") + + +def check_ip_port(arg_name, port): + """Check whether the input parameters are reasonable ip port""" + check_int(arg_name, port, 0, 65535) diff --git a/mindspore_serving/worker/register/method.py b/mindspore_serving/worker/register/method.py index 87b156c..65b74cf 100644 --- a/mindspore_serving/worker/register/method.py +++ b/mindspore_serving/worker/register/method.py @@ -34,22 +34,26 @@ method_tag_postprocess = PredictPhaseTag_.kPredictPhaseTag_Postprocess class _ServableStorage: + """Declare servable info""" def __init__(self): self.methods = {} self.servable_metas = {} self.storage = ServableStorage_.get_instance() def declare_servable(self, servable_meta): + """Declare servable info excluding method, input and output count""" self.storage.declare_servable(servable_meta) self.servable_metas[servable_meta.servable_name] = servable_meta def declare_servable_input_output(self, servable_name, inputs_count, outputs_count): + """Declare input and output count of servable""" self.storage.register_servable_input_output_info(servable_name, inputs_count, outputs_count) servable_meta = self.servable_metas[servable_name] servable_meta.inputs_count = inputs_count servable_meta.outputs_count = outputs_count def register_method(self, method_signature): + """Declare method of servable""" self.storage.register_method(method_signature) self.methods[method_signature.method_name] = method_signature @@ -70,6 +74,7 @@ _servable_storage = _ServableStorage() class _TensorDef: + """Data flow item, for definitions of data flow in a method""" def __init__(self, tag, tensor_index): self.tag = tag self.tensor_index = tensor_index @@ -79,6 +84,7 @@ class _TensorDef: def _create_tensor_def_outputs(tag, outputs_cnt): + """Create data flow item for output""" result = [_TensorDef(tag, i) for i in range(outputs_cnt)] if len(result) == 1: return result[0] @@ -98,7 +104,7 @@ def call_preprocess(preprocess_fun, *args): preprocess_name = preprocess_fun if inspect.isfunction(preprocess_fun): - register_preprocess(inputs_count=inputs_count, outputs_count=outputs_count)(preprocess_fun) + register_preprocess(preprocess_fun, inputs_count=inputs_count, outputs_count=outputs_count) preprocess_name = get_servable_dir() + "." + get_func_name(preprocess_fun) else: if not isinstance(preprocess_name, str): @@ -144,7 +150,7 @@ def call_postprocess(postprocess_fun, *args): postprocess_name = postprocess_fun if inspect.isfunction(postprocess_fun): - register_postprocess(inputs_count=inputs_count, outputs_count=outputs_count)(postprocess_fun) + register_postprocess(postprocess_fun, inputs_count=inputs_count, outputs_count=outputs_count) postprocess_name = get_servable_dir() + "." + get_func_name(postprocess_fun) else: if not isinstance(postprocess_name, str): @@ -165,6 +171,7 @@ _call_postprocess_name = call_postprocess.__name__ def _get_method_def_func_meta(method_def_func): + """Parse register_method func, and get the input and output count of preproces, servable and postprocess""" source = inspect.getsource(method_def_func) call_list = ast.parse(source).body[0].body func_meta = EasyDict() @@ -212,17 +219,17 @@ def _get_method_def_func_meta(method_def_func): def register_method(output_names): """register method for servable. - Define the data flow of preprocess, model inference and postprocess in the method. - Preprocess and postprocess are optional. - Example: - @register_method(output_names="y") - def method_name(x1, x2): + Define the data flow of preprocess, model inference and postprocess in the method. + Preprocess and postprocess are optional. + Example: + @register_method(output_names="y") + def method_name(x1, x2): x1, x2 = call_preprocess(preprocess_fun, x1, x2) y = call_servable(y) y = call_postprocess(postprocess_fun, y) return y """ - output_names = check_type.check_and_as_str_tuple_list(output_names) + output_names = check_type.check_and_as_str_tuple_list('output_names', output_names) def register(func): name = get_func_name(func) diff --git a/mindspore_serving/worker/register/postprocess.py b/mindspore_serving/worker/register/postprocess.py index 3d624a0..11dca91 100644 --- a/mindspore_serving/worker/register/postprocess.py +++ b/mindspore_serving/worker/register/postprocess.py @@ -15,7 +15,6 @@ """Postprocessing registration interface""" from mindspore_serving._mindspore_serving import PostprocessStorage_ -from mindspore_serving.worker import check_type from mindspore_serving.worker.common import get_servable_dir, get_func_name @@ -33,6 +32,7 @@ def check_postprocess(postprocess_name, inputs_count, outputs_count): class PostprocessStorage: + """Register and get postprocess info, postprocess info include: func, name, input and output count""" def __init__(self): self.postprocess = {} self.storage = PostprocessStorage_.get_instance() @@ -52,21 +52,11 @@ class PostprocessStorage: postprocess_storage = PostprocessStorage() -def register_postprocess(inputs_count, outputs_count): - """register postprocess, input_names and output_names can be str, tuple or list of str. - For input_names and output_names, serving only consider the number of names contained in them, - which should be consistent with the number of input and output used in register_method, - and the specific names content are ignored.""" - check_type.check_int(inputs_count) - check_type.check_int(outputs_count) +def register_postprocess(func, inputs_count, outputs_count): + """register postprocess""" + servable_name = get_servable_dir() + func_name = get_func_name(func) + name = servable_name + "." + func_name - def register(func): - servable_name = get_servable_dir() - func_name = get_func_name(func) - name = servable_name + "." + func_name - - print("------------Register postprocess", name, inputs_count, outputs_count) - postprocess_storage.register(func, name, inputs_count, outputs_count) - return func - - return register + print("------------Register postprocess", name, inputs_count, outputs_count) + postprocess_storage.register(func, name, inputs_count, outputs_count) diff --git a/mindspore_serving/worker/register/preprocess.py b/mindspore_serving/worker/register/preprocess.py index 570f6b2..263e0b8 100644 --- a/mindspore_serving/worker/register/preprocess.py +++ b/mindspore_serving/worker/register/preprocess.py @@ -15,7 +15,6 @@ """Preprocessing registration interface""" from mindspore_serving._mindspore_serving import PreprocessStorage_ -from mindspore_serving.worker import check_type from mindspore_serving.worker.common import get_servable_dir, get_func_name @@ -33,6 +32,7 @@ def check_preprocess(preprocess_name, inputs_count, outputs_count): class PreprocessStorage: + """Register and get preprocess info, preprocess info include: func, name, input and output count""" def __init__(self): self.preprocess = {} self.storage = PreprocessStorage_.get_instance() @@ -52,21 +52,11 @@ class PreprocessStorage: preprocess_storage = PreprocessStorage() -def register_preprocess(inputs_count, outputs_count): - """register preprocess, input_names and output_names can be str, tuple or list of str. - For input_names and output_names, serving only consider the number of names contained in them, - which should be consistent with the number of input and output used in register_method, - and the specific names content are ignored.""" - check_type.check_int(inputs_count) - check_type.check_int(outputs_count) +def register_preprocess(func, inputs_count, outputs_count): + """register preprocess""" + servable_name = get_servable_dir() + func_name = get_func_name(func) + name = servable_name + "." + func_name - def register(func): - servable_name = get_servable_dir() - func_name = get_func_name(func) - name = servable_name + "." + func_name - - print("------------Register preprocess", name, inputs_count, outputs_count) - preprocess_storage.register(func, name, inputs_count, outputs_count) - return func - - return register + print("------------Register preprocess", name, inputs_count, outputs_count) + preprocess_storage.register(func, name, inputs_count, outputs_count) diff --git a/mindspore_serving/worker/register/servable.py b/mindspore_serving/worker/register/servable.py index 910c16c..68e7054 100644 --- a/mindspore_serving/worker/register/servable.py +++ b/mindspore_serving/worker/register/servable.py @@ -29,9 +29,9 @@ def declare_servable(servable_file, model_format, with_batch_dim=True): which should be consistent with the number of input and output used in register_method and the number of input and output of the model. The specific names content are ignored.""" - check_type.check_str(servable_file) - check_type.check_str(model_format) - check_type.check_bool(with_batch_dim) + check_type.check_str('servable_file', servable_file) + check_type.check_str('model_format', model_format) + check_type.check_bool('with_batch_dim', with_batch_dim) model_format = model_format.lower() if model_format not in ("om", "mindir"):