diff --git a/example/resnet/client.py b/example/resnet/client.py index dd29e4f..4827be3 100644 --- a/example/resnet/client.py +++ b/example/resnet/client.py @@ -31,6 +31,7 @@ def read_images(): def run_classify_top1(): """Client for servable resnet50 and method classify_top1""" + print("run_classify_top1-----------") client = Client("localhost", 5500, "resnet50", "classify_top1") instances = [] for image in read_images(): @@ -41,6 +42,7 @@ def run_classify_top1(): def run_classify_top1_v1(): """Client for servable resnet50 and method classify_top1_v1""" + print("run_classify_top1_v1-----------") client = Client("localhost", 5500, "resnet50", "classify_top1_v1") instances = [] for image in read_images(): @@ -51,21 +53,44 @@ def run_classify_top1_v1(): def run_classify_top5(): """Client for servable resnet50 and method classify_top5""" + print("run_classify_top5-----------") client = Client("localhost", 5500, "resnet50", "classify_top5") instances = [] for image in read_images(): # read multi image instances.append({"image": image}) # input `image` + result = client.infer(instances) + + print(result) + for result_item in result: # result for every image + label = result_item["label"] # result `label` + score = result_item["score"] # result `score` + print("label result:", label) + print("score result:", score) + + +def run_classify_top5_async(): + """Client for servable resnet50 and method classify_top5""" + print("run_classify_top5_async-----------") + client = Client("localhost", 5500, "resnet50", "classify_top5") + instances = [] + for image in read_images(): # read multi image + instances.append({"image": image}) # input `image` + + result_future = client.infer_async(instances) + result = result_future.result() + print(result) for result_item in result: # result for every image label = result_item["label"] # result `label` score = result_item["score"] # result `score` - print("label result", label) - print("score result", score) + print("label result:", label) + 
print("score result:", score) def run_restful_classify_top1(): """RESTful Client for servable resnet50 and method classify_top1""" + print("run_restful_classify_top1-----------") import base64 import requests import json @@ -88,3 +113,4 @@ if __name__ == '__main__': run_classify_top1_v1() run_classify_top5() run_restful_classify_top1() + run_classify_top5_async() diff --git a/mindspore_serving/ccsrc/common/proto_tensor.cc b/mindspore_serving/ccsrc/common/proto_tensor.cc index a0f3429..593f3b0 100644 --- a/mindspore_serving/ccsrc/common/proto_tensor.cc +++ b/mindspore_serving/ccsrc/common/proto_tensor.cc @@ -244,13 +244,14 @@ Status GrpcTensorHelper::CreateInstanceFromRequest(const proto::PredictRequest & << "Method " << method_name << " is not registed for servable " << servable_name; } - // instance - if (request.instances_size() > 0) { - status = CreateInstanceFromRequestInstances(request, method_signature.inputs, results); - if (status != SUCCESS) { - MSI_LOG_ERROR << "Create instances from request instances failed"; - return status; - } + if (request.instances_size() == 0) { + return INFER_STATUS_LOG_ERROR(INVALID_INPUTS) + << "Instances count of request cannot be 0, servable: " << servable_name << ", method: " << method_name; + } + status = CreateInstanceFromRequestInstances(request, method_signature.inputs, results); + if (status != SUCCESS) { + MSI_LOG_ERROR << "Create instances from request instances failed"; + return status; } return SUCCESS; } @@ -273,12 +274,12 @@ Status GrpcTensorHelper::CreateReplyFromInstances(const proto::PredictRequest &r *reply->mutable_servable_spec() = request.servable_spec(); size_t err_cnt = 0; - for (auto &output_intance : outputs) { - if (output_intance.error_msg != SUCCESS) { + for (auto &output_instance : outputs) { + if (output_instance.error_msg != SUCCESS) { err_cnt++; - } else if (output_intance.data.size() != method_signature.outputs.size()) { + } else if (output_instance.data.size() != method_signature.outputs.size()) 
{ return INFER_STATUS_LOG_ERROR(SYSTEM_ERROR) - << "Result data tensor size " << output_intance.data.size() << " not equal outputs size " + << "Result data tensor size " << output_instance.data.size() << " not equal outputs size " << method_signature.outputs.size() << "defined in method signature"; } } @@ -294,19 +295,17 @@ Status GrpcTensorHelper::CreateReplyFromInstances(const proto::PredictRequest &r } } // create instance reply, same with request - if (request.instances_size() > 0) { - for (auto &output_intance : outputs) { - auto proto_instance = reply->add_instances(); - if (output_intance.data.empty()) { - continue; - } - auto proto_items = proto_instance->mutable_items(); - for (size_t i = 0; i < method_signature.outputs.size(); i++) { - auto &output_tensor = output_intance.data[i]; - auto &proto_tensor = (*proto_items)[method_signature.outputs[i]]; - ProtoTensor result_tensor(&proto_tensor); - result_tensor.assgin(*output_tensor); - } + for (auto &output_intance : outputs) { + auto proto_instance = reply->add_instances(); + if (output_intance.data.empty()) { + continue; + } + auto proto_items = proto_instance->mutable_items(); + for (size_t i = 0; i < method_signature.outputs.size(); i++) { + auto &output_tensor = output_intance.data[i]; + auto &proto_tensor = (*proto_items)[method_signature.outputs[i]]; + ProtoTensor result_tensor(&proto_tensor); + result_tensor.assgin(*output_tensor); } } return SUCCESS; @@ -328,7 +327,7 @@ Status GrpcTensorHelper::CreateInstanceFromRequestInstances(const proto::Predict << "Cannot find input " << input_name << " in instance input , servable " << servable_name << ", method " << method_name; } - status = CheckRequestTensor(it->second, true, 1); + status = CheckRequestTensor(it->second); if (status != SUCCESS) { auto status2 = INFER_STATUS(INVALID_INPUTS) << "Instances input " << input_name << " check failed"; MSI_LOG_ERROR << status2.StatusMessage(); @@ -342,33 +341,18 @@ Status 
GrpcTensorHelper::CreateInstanceFromRequestInstances(const proto::Predict return SUCCESS; } -Status GrpcTensorHelper::CheckRequestTensor(const proto::Tensor &tensor, bool is_instance_tensor, uint32_t batch_size) { +Status GrpcTensorHelper::CheckRequestTensor(const proto::Tensor &tensor) { Status status; ProtoTensor tensor_input(const_cast(&tensor)); auto shape = tensor_input.shape(); if (tensor.dtype() == proto::MS_BYTES || tensor.dtype() == proto::MS_STRING) { - if (is_instance_tensor) { - if (tensor.bytes_val_size() != 1) { - return INFER_STATUS_LOG_ERROR(INVALID_INPUTS) - << "Instance tensor check failed: bytes or string type shape batch size " << batch_size - << " not equal to bytes val size " << tensor.bytes_val_size(); - } - if (!(shape.size() == 1 && shape[0] == 1) && !shape.empty()) { - return INFER_STATUS_LOG_ERROR(INVALID_INPUTS) - << "Instance tensor check failed: bytes or string type input " - << " shape can only be (1,) or empty, but given shape is " << shape; - } - } else { - if (static_cast(tensor.bytes_val_size()) != batch_size) { - return INFER_STATUS_LOG_ERROR(INVALID_INPUTS) - << "Inputs tensor check failed: bytes or string type shape batch size " << batch_size - << " not equal to bytes val size " << tensor.bytes_val_size(); - } - if (shape.size() != 1) { - return INFER_STATUS_LOG_ERROR(INVALID_INPUTS) - << "Inputs Tensor check failed: bytes or string type input " - << " shape can only be (batch_size,), but given shape is " << shape; - } + if (tensor.bytes_val_size() != 1) { + return INFER_STATUS_LOG_ERROR(INVALID_INPUTS) + << "Instance tensor check failed: bytes or string type shape batch size can only be 1"; + } + if (!(shape.size() == 1 && shape[0] == 1) && !shape.empty()) { + return INFER_STATUS_LOG_ERROR(INVALID_INPUTS) << "Instance tensor check failed: bytes or string type input " + << " shape can only be (1,) or empty, but given shape is " << shape; } } else { size_t element_num = tensor_input.element_cnt(); diff --git 
a/mindspore_serving/ccsrc/common/proto_tensor.h b/mindspore_serving/ccsrc/common/proto_tensor.h index 1d89a9a..554da02 100644 --- a/mindspore_serving/ccsrc/common/proto_tensor.h +++ b/mindspore_serving/ccsrc/common/proto_tensor.h @@ -73,7 +73,7 @@ class MS_API GrpcTensorHelper { static Status CreateInstanceFromRequestInstances(const proto::PredictRequest &request, const std::vector &input_names, std::vector *results); - static Status CheckRequestTensor(const proto::Tensor &tensor, bool is_instance_tensor, uint32_t batch_size); + static Status CheckRequestTensor(const proto::Tensor &tensor); }; extern MS_API LogStream &operator<<(serving::LogStream &stream, proto::DataType data_type); diff --git a/mindspore_serving/ccsrc/master/restful/http_process.cc b/mindspore_serving/ccsrc/master/restful/http_process.cc index da79c30..053157b 100644 --- a/mindspore_serving/ccsrc/master/restful/http_process.cc +++ b/mindspore_serving/ccsrc/master/restful/http_process.cc @@ -242,9 +242,8 @@ DataType RestfulService::GetArrayDataType(const json &json_array, HTTP_DATA_TYPE Status RestfulService::CheckReqJsonValid(const json &js_msg) { int count = 0; - for (size_t i = 0; i < request_type_list_.size(); i++) { - std::string item = request_type_list_.at(i); - auto it = js_msg.find(item.c_str()); + for (auto &item : request_type_list_) { + auto it = js_msg.find(item); if (it != js_msg.end()) { count++; auto request_type = GetReqType(item); @@ -257,7 +256,8 @@ Status RestfulService::CheckReqJsonValid(const json &js_msg) { } if (count != 1) { - return INFER_STATUS_LOG_ERROR(INVALID_INPUTS) << "key 'instances' should exit and only exit one time"; + return INFER_STATUS_LOG_ERROR(INVALID_INPUTS) + << "key 'instances' expects to exist once, but actually " << count << " times"; } return SUCCESS; } diff --git a/mindspore_serving/ccsrc/worker/work_executor.cc b/mindspore_serving/ccsrc/worker/work_executor.cc index 4d8eab1..5cc095a 100644 --- a/mindspore_serving/ccsrc/worker/work_executor.cc +++ 
b/mindspore_serving/ccsrc/worker/work_executor.cc @@ -60,11 +60,11 @@ Status WorkExecutor::CheckSevableSignature() { << "The outputs count " << servable_declare_.servable_meta.outputs_count << " registered in method not equal to the count " << output_infos.size() << " defined in servable"; } - MSI_LOG_INFO << "Model input infos:"; + MSI_LOG_INFO << "Model input infos: count " << input_infos.size(); for (auto &item : input_infos) { MSI_LOG_INFO << item.shape << ", " << item.data_type << ", " << item.size; } - MSI_LOG_INFO << "Model output infos:"; + MSI_LOG_INFO << "Model output infos: count " << output_infos.size(); for (auto &item : output_infos) { MSI_LOG_INFO << item.shape << ", " << item.data_type << ", " << item.size; } diff --git a/mindspore_serving/client/python/client.py b/mindspore_serving/client/python/client.py index 91f8d36..a1dd36c 100644 --- a/mindspore_serving/client/python/client.py +++ b/mindspore_serving/client/python/client.py @@ -191,24 +191,28 @@ class Client: def infer(self, instances): """ - Used to create requests, access serving, and parse results. + Used to create requests, access serving service, and parse and return results. Args: instances (map, tuple of map): Instance or tuple of instances, every instance item is the inputs map. - The map key is the input name, and the value is the input value. + The map key is the input name, and the value is the input value, the type of value can be python int, + float, bool, str, bytes, numpy number, or numpy array object. Raises: RuntimeError: The type or value of the parameters is invalid, or other errors happened. 
- """ - if not isinstance(instances, (tuple, list)): - instances = (instances,) - request = self._create_request() - for item in instances: - if isinstance(item, dict): - request.instances.append(self._create_instance(**item)) - else: - raise RuntimeError("instance should be a map") + Examples: + >>> from mindspore_serving.client import Client + >>> import numpy as np + >>> client = Client("localhost", 5500, "add", "add_cast") + >>> instances = [] + >>> x1 = np.ones((2, 2), np.int32) + >>> x2 = np.ones((2, 2), np.int32) + >>> instances.append({"x1": x1, "x2": x2}) + >>> result = client.infer(instances) + >>> print(result) + """ + request = self._create_request(instances) try: result = self.stub.Predict(request) return self._paser_result(result) @@ -220,15 +224,60 @@ class Client: print(status_code.value) return {"error": "Grpc Error, " + str(status_code.value)} - def _create_request(self): + def infer_async(self, instances): + """ + Used to create requests, async access serving. + + Args: + instances (map, tuple of map): Instance or tuple of instances, every instance item is the inputs map. + The map key is the input name, and the value is the input value. + + Raises: + RuntimeError: The type or value of the parameters is invalid, or other errors happened. 
+ + Examples: + >>> from mindspore_serving.client import Client + >>> import numpy as np + >>> client = Client("localhost", 5500, "add", "add_cast") + >>> instances = [] + >>> x1 = np.ones((2, 2), np.int32) + >>> x2 = np.ones((2, 2), np.int32) + >>> instances.append({"x1": x1, "x2": x2}) + >>> result_future = client.infer_async(instances) + >>> result = result_future.result() + >>> print(result) + """ + request = self._create_request(instances) + try: + result_future = self.stub.Predict.future(request) + return ClientGrpcAsyncResult(result_future) + + except grpc.RpcError as e: + print(e.details()) + status_code = e.code() + print(status_code.name) + print(status_code.value) + return ClientGrpcAsyncError({"error": "Grpc Error, " + str(status_code.value)}) + + def _create_request(self, instances): """Used to create request spec.""" + if not isinstance(instances, (tuple, list)): + instances = (instances,) + request = ms_service_pb2.PredictRequest() request.servable_spec.name = self.servable_name request.servable_spec.method_name = self.method_name request.servable_spec.version_number = self.version_number + + for item in instances: + if isinstance(item, dict): + request.instances.append(self._create_instance(**item)) + else: + raise RuntimeError("instance should be a map") return request - def _create_instance(self, **kwargs): + @staticmethod + def _create_instance(**kwargs): """Used to create gRPC instance.""" instance = ms_service_pb2.Instance() for k, w in kwargs.items(): @@ -245,7 +294,8 @@ class Client: raise RuntimeError("Not support value type " + str(type(w))) return instance - def _paser_result(self, result): + @staticmethod + def _paser_result(result): """Used to parse result.""" error_msg_len = len(result.error_msg) if error_msg_len == 1: @@ -265,3 +315,47 @@ class Client: else: ret_val.append({"error": bytes.decode(result.error_msg[i].error_msg)}) return ret_val + + +class ClientGrpcAsyncResult: + """ + When Client.infer_async is invoked successfully, a 
ClientGrpcAsyncResult object is returned. + + Examples: + >>> from mindspore_serving.client import Client + >>> import numpy as np + >>> client = Client("localhost", 5500, "add", "add_cast") + >>> instances = [] + >>> x1 = np.ones((2, 2), np.int32) + >>> x2 = np.ones((2, 2), np.int32) + >>> instances.append({"x1": x1, "x2": x2}) + >>> result_future = client.infer_async(instances) + >>> result = result_future.result() + >>> print(result) + """ + + def __init__(self, result_future): + self.result_future = result_future + + def result(self): + """Wait for and get the inference result; the gRPC message will be parsed to a tuple of instance results. + Every instance result is a dict, and each value could be a numpy array/number, str or bytes according to the gRPC Tensor + data type. + """ + result = self.result_future.result() + # pylint: disable=protected-access + result = Client._paser_result(result) + return result + + +class ClientGrpcAsyncError: + """When a gRPC failure happens while calling Client.infer_async, a ClientGrpcAsyncError object is returned. + """ + + def __init__(self, result_error): + self.result_error = result_error + + def result(self): + """Get gRPC error message. + """ + return self.result_error diff --git a/mindspore_serving/worker/register/method.py b/mindspore_serving/worker/register/method.py index 2f94520..d9f9b93 100644 --- a/mindspore_serving/worker/register/method.py +++ b/mindspore_serving/worker/register/method.py @@ -118,6 +118,14 @@ def _wrap_fun_to_pipeline(fun, input_count): def call_preprocess_pipeline(preprocess_fun, *args): r"""For method registration, define the preprocessing pipeline function and its' parameters. + A single request can include multiple instances, and multiple queued requests will also have multiple instances. 
+ If you need to process multiple instances through multi thread or other parallel processing capability + in `preprocess` or `postprocess`, such as using MindData concurrency ability to process multiple input + images in `preprocess`, MindSpore Serving provides 'call_preprocess_pipeline' and 'call_postprocess_pipeline' + to register such preprocessing and postprocessing. For more detail, + please refer to [Resnet50 model configuration example] + `_ . + Args: preprocess_fun (function): Python pipeline function for preprocess. args: Preprocess inputs. The length of 'args' should equal to the input parameters number @@ -129,14 +137,17 @@ def call_preprocess_pipeline(preprocess_fun, *args): Examples: >>> from mindspore_serving.worker import register >>> import numpy as np - >>> def add_trans_datatype(x1, x2): - ... return x1.astype(np.float32), x2.astype(np.float32) + >>> def add_trans_datatype(instances): + ... for instance in instances: + ... x1 = instance[0] + ... x2 = instance[1] + ... yield x1.astype(np.float32), x2.astype(np.float32) >>> >>> register.declare_servable(servable_file="tensor_add.mindir", model_format="MindIR", with_batch_dim=False) >>> >>> @register.register_method(output_names=["y"]) # register add_cast method in add >>> def add_cast(x1, x2): - ... x1, x2 = register.call_preprocess(add_trans_datatype, x1, x2) # cast input to float32 + ... x1, x2 = register.call_preprocess_pipeline(add_trans_datatype, x1, x2) # cast input to float32 ... y = register.call_servable(x1, x2) ... return y """ @@ -275,6 +286,14 @@ def call_servable(*args): def call_postprocess_pipeline(postprocess_fun, *args): r"""For method registration, define the postprocessing pipeline function and its' parameters. + A single request can include multiple instances, and multiple queued requests will also have multiple instances. 
+ If you need to process multiple instances through multi thread or other parallel processing capability + in `preprocess` or `postprocess`, such as using MindData concurrency ability to process multiple input + images in `preprocess`, MindSpore Serving provides 'call_preprocess_pipeline' and 'call_postprocess_pipeline' + to register such preprocessing and postprocessing. For more detail, + please refer to [Resnet50 model configuration example] + `_ . + Args: postprocess_fun (function): Python pipeline function for postprocess. args: Preprocess inputs. The length of 'args' should equal to the input parameters number @@ -460,7 +479,7 @@ def register_method(output_names): output_tensors = (output_tensors,) if len(output_tensors) != len(output_names): raise RuntimeError( - f"Method return output size {len(output_tensors)} not match registed {len(output_names)}") + f"Method return output size {len(output_tensors)} not match registered {len(output_names)}") method_def_context_.returns = [item.as_pair() for item in output_tensors] logger.info(f"Register method: method_name {method_def_context_.method_name} " diff --git a/third_party/mindspore b/third_party/mindspore index 7a3f342..0662766 160000 --- a/third_party/mindspore +++ b/third_party/mindspore @@ -1 +1 @@ -Subproject commit 7a3f34247db5a6c7a00b00b16d2bfa2ff94e0b08 +Subproject commit 06627661f43fb07761f3956e1a8b2c007c4e7987