Browse Source

!195 Add master context: max request buffer count

From: @xu-yfei
Reviewed-by: 
Signed-off-by:
tags/v1.2.0
mindspore-ci-bot Gitee 5 years ago
parent
commit
5388caac44
27 changed files with 339 additions and 154 deletions
  1. +1
    -0
      cmake/package.cmake
  2. +2
    -2
      mindspore_serving/ccsrc/common/log.h
  3. +5
    -7
      mindspore_serving/ccsrc/master/dispacther.cc
  4. +0
    -2
      mindspore_serving/ccsrc/master/dispacther.h
  5. +35
    -0
      mindspore_serving/ccsrc/master/master_context.cc
  6. +40
    -0
      mindspore_serving/ccsrc/master/master_context.h
  7. +2
    -4
      mindspore_serving/ccsrc/master/server.cc
  8. +2
    -3
      mindspore_serving/ccsrc/master/server.h
  9. +4
    -5
      mindspore_serving/ccsrc/python/master/master_py.cc
  10. +2
    -4
      mindspore_serving/ccsrc/python/master/master_py.h
  11. +7
    -1
      mindspore_serving/ccsrc/python/serving_py.cc
  12. +15
    -0
      mindspore_serving/common/__init__.py
  13. +8
    -8
      mindspore_serving/common/check_type.py
  14. +3
    -1
      mindspore_serving/master/__init__.py
  15. +5
    -11
      mindspore_serving/master/_master.py
  16. +34
    -0
      mindspore_serving/master/context.py
  17. +21
    -16
      mindspore_serving/worker/_worker.py
  18. +0
    -64
      mindspore_serving/worker/context.py
  19. +12
    -9
      mindspore_serving/worker/distributed/agent_startup.py
  20. +1
    -1
      mindspore_serving/worker/distributed/distributed_worker.py
  21. +1
    -1
      mindspore_serving/worker/distributed/register.py
  22. +1
    -1
      mindspore_serving/worker/register/method.py
  23. +1
    -1
      mindspore_serving/worker/register/servable.py
  24. +1
    -0
      tests/ut/python/runtest.sh
  25. +1
    -0
      tests/ut/python/tests/common.py
  26. +38
    -10
      tests/ut/python/tests/test_distributed_worker.py
  27. +97
    -3
      tests/ut/python/tests/test_mater_worker_client.py

+ 1
- 0
cmake/package.cmake View File

@@ -63,6 +63,7 @@ install(
DIRECTORY
${CMAKE_SOURCE_DIR}/mindspore_serving/master
${CMAKE_SOURCE_DIR}/mindspore_serving/worker
${CMAKE_SOURCE_DIR}/mindspore_serving/common
${CMAKE_SOURCE_DIR}/mindspore_serving/client
DESTINATION ${INSTALL_PY_DIR}
COMPONENT mindspore_serving


+ 2
- 2
mindspore_serving/ccsrc/common/log.h View File

@@ -130,8 +130,8 @@ class MS_API LogWriter {

std::string GetOutputMsg(const std::ostringstream &msg) const {
std::string msg_str = msg.str();
constexpr int max_log_size = 256;
constexpr int msg_log_start_size = 128;
constexpr int max_log_size = 384;
constexpr int msg_log_start_size = 192;
if (msg_str.length() > max_log_size) {
msg_str = msg_str.substr(0, msg_log_start_size) + "..." + msg_str.substr(msg_str.length() - msg_log_start_size);
}


+ 5
- 7
mindspore_serving/ccsrc/master/dispacther.cc View File

@@ -18,6 +18,7 @@

#include <utility>
#include "common/proto_tensor.h"
#include "master/master_context.h"
#include "master/notify_worker/grpc_notify.h"
#include "master/notify_worker/local_notify.h"

@@ -54,14 +55,11 @@ DispatcherWorkerContext Dispatcher::GetWorkSession(const RequestSpec &request_sp
}
return context;
}
void Dispatcher::SetMaxInferNum(uint32_t max_infer_num) {
if (max_infer_num != 0) {
max_infer_num_ = max_infer_num;
}
}

Status Dispatcher::JudgeInferNum() {
if (infer_num_ >= max_infer_num_) {
return INFER_STATUS_LOG_ERROR(FAILED) << "Serving Error: infer number exceeds the limit " << max_infer_num_;
auto max_infer_num = MasterContext::Instance()->GetMaxRequestBufferCount();
if (infer_num_ >= max_infer_num) {
return INFER_STATUS_LOG_ERROR(FAILED) << "Serving Error: request buffer number exceeds the limit " << max_infer_num;
}
return SUCCESS;
}


+ 0
- 2
mindspore_serving/ccsrc/master/dispacther.h View File

@@ -30,7 +30,6 @@
#include "common/grpc_client.h"

namespace mindspore::serving {
constexpr uint32_t g_max_infer_num_ = 10000;
struct DispatcherWorkerContext {
WorkerSpec worker_spec;
std::shared_ptr<BaseNotifyWorker> notify_worker_ = nullptr;
@@ -63,7 +62,6 @@ class MS_API Dispatcher {
// avoid invoke Clear and then UnregisterServable is invoked by Clear in other thread
std::atomic_bool clearing_flag = false;
std::atomic_uint32_t infer_num_ = 0;
uint32_t max_infer_num_ = g_max_infer_num_;

Status JudgeInferNum();
DispatcherWorkerContext GetWorkSession(const RequestSpec &request_spec) const;


+ 35
- 0
mindspore_serving/ccsrc/master/master_context.cc View File

@@ -0,0 +1,35 @@
/**
* Copyright 2021 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "master/master_context.h"

namespace mindspore::serving {

// Returns the process-wide MasterContext singleton.
//
// Uses a function-local static initializer (a "magic static"): since C++11 the
// initialization of a local static is guaranteed to run exactly once even under
// concurrent first calls. The previous check-then-create pattern
// (`if (instance == nullptr) instance = make_shared<...>()`) could race and
// construct two instances if Instance() was first called from two threads.
std::shared_ptr<MasterContext> MasterContext::Instance() {
  static std::shared_ptr<MasterContext> instance = std::make_shared<MasterContext>();
  return instance;
}

// Overwrite the upper bound on requests the master may buffer at once.
// NOTE(review): a value of 0 is stored as-is and would reject every request;
// the Python wrapper validates >= 1 before calling — confirm no other caller
// can pass 0.
void MasterContext::SetMaxRequestBufferCount(uint32_t count) { max_request_buffer_count_ = count; }

// Current upper bound on buffered requests; defaults to 10000.
uint32_t MasterContext::GetMaxRequestBufferCount() const { return max_request_buffer_count_; }

} // namespace mindspore::serving

+ 40
- 0
mindspore_serving/ccsrc/master/master_context.h View File

@@ -0,0 +1,40 @@
/**
* Copyright 2021 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#ifndef MINDSPORE_SERVING_MASTER_CONTEXT_H
#define MINDSPORE_SERVING_MASTER_CONTEXT_H

#include <string>
#include <memory>
#include <vector>
#include "common/serving_common.h"

namespace mindspore::serving {

// Process-wide configuration holder for the serving master, accessed through
// the shared singleton returned by Instance(). Currently it only carries the
// maximum number of requests that may be buffered while waiting to be
// processed; the dispatcher consults this limit (via GetMaxRequestBufferCount)
// to reject new requests once it is reached.
class MS_API MasterContext {
public:
// Global accessor for the singleton instance.
static std::shared_ptr<MasterContext> Instance();

// Set/read the upper bound on buffered (in-flight) requests.
void SetMaxRequestBufferCount(uint32_t max_request_buffer_count);
uint32_t GetMaxRequestBufferCount() const;

private:
uint32_t max_request_buffer_count_ = 10000;  // default 10000
};

} // namespace mindspore::serving

#endif // MINDSPORE_SERVING_MASTER_CONTEXT_H

+ 2
- 4
mindspore_serving/ccsrc/master/server.cc View File

@@ -35,7 +35,7 @@
namespace mindspore {
namespace serving {

Status Server::StartGrpcServer(const std::string &ip, uint32_t grpc_port, int max_msg_mb_size, uint32_t max_infer_num) {
Status Server::StartGrpcServer(const std::string &ip, uint32_t grpc_port, int max_msg_mb_size) {
if (grpc_async_server_ != nullptr) {
return INFER_STATUS_LOG_ERROR(SYSTEM_ERROR) << "Serving Error: Serving gRPC server is already running";
}
@@ -45,7 +45,6 @@ Status Server::StartGrpcServer(const std::string &ip, uint32_t grpc_port, int ma
max_msg_mb_size = gRpcMaxMBMsgSize;
}
grpc_async_server_ = std::make_unique<MSServiceServer>(std::make_shared<MSServiceImpl>(dispatcher_), ip, grpc_port);
dispatcher_->SetMaxInferNum(max_infer_num);
return grpc_async_server_->Init(max_msg_mb_size);
}

@@ -56,8 +55,7 @@ Status Server::StartGrpcMasterServer(const std::string &ip, uint32_t grpc_port)
}

Status Server::StartRestfulServer(const std::string &ip, uint32_t restful_port, int max_msg_mb_size,
uint32_t max_infer_num, int time_out_second) {
dispatcher_->SetMaxInferNum(max_infer_num);
int time_out_second) {
return restful_server_.Start(ip, restful_port, max_msg_mb_size, time_out_second);
}



+ 2
- 3
mindspore_serving/ccsrc/master/server.h View File

@@ -30,11 +30,10 @@ class MS_API Server {
public:
Server();
~Server();
Status StartGrpcServer(const std::string &ip, uint32_t grpc_port, int max_msg_mb_size = gRpcDefaultMsgMBSize,
uint32_t max_infer_num = g_max_infer_num_);
Status StartGrpcServer(const std::string &ip, uint32_t grpc_port, int max_msg_mb_size = gRpcDefaultMsgMBSize);
Status StartGrpcMasterServer(const std::string &ip, uint32_t grpc_port);
Status StartRestfulServer(const std::string &ip, uint32_t restful_port, int max_msg_mb_size = gRpcDefaultMsgMBSize,
uint32_t max_infer_num = g_max_infer_num_, int time_out_second = 100);
int time_out_second = 100);
void Clear();

std::shared_ptr<Dispatcher> GetDispatcher() { return dispatcher_; }


+ 4
- 5
mindspore_serving/ccsrc/python/master/master_py.cc View File

@@ -20,8 +20,8 @@

namespace mindspore::serving {

void PyMaster::StartGrpcServer(const std::string &ip, uint32_t grpc_port, int max_msg_mb_size, uint32_t max_infer_num) {
auto status = Server::Instance().StartGrpcServer(ip, grpc_port, max_msg_mb_size, max_infer_num);
void PyMaster::StartGrpcServer(const std::string &ip, uint32_t grpc_port, int max_msg_mb_size) {
auto status = Server::Instance().StartGrpcServer(ip, grpc_port, max_msg_mb_size);
if (status != SUCCESS) {
MSI_LOG_EXCEPTION << "Raise failed: " << status.StatusMessage();
}
@@ -34,9 +34,8 @@ void PyMaster::StartGrpcMasterServer(const std::string &ip, uint32_t grpc_port)
}
}

void PyMaster::StartRestfulServer(const std::string &ip, uint32_t grpc_port, int max_msg_mb_size,
uint32_t max_infer_num) {
auto status = Server::Instance().StartRestfulServer(ip, grpc_port, max_msg_mb_size, max_infer_num);
void PyMaster::StartRestfulServer(const std::string &ip, uint32_t grpc_port, int max_msg_mb_size) {
auto status = Server::Instance().StartRestfulServer(ip, grpc_port, max_msg_mb_size);
if (status != SUCCESS) {
MSI_LOG_EXCEPTION << "Raise failed: " << status.StatusMessage();
}


+ 2
- 4
mindspore_serving/ccsrc/python/master/master_py.h View File

@@ -36,11 +36,9 @@ namespace serving {

class MS_API PyMaster {
public:
static void StartGrpcServer(const std::string &ip, uint32_t grpc_port, int max_msg_mb_size = 100,
uint32_t max_infer_num = 10000);
static void StartGrpcServer(const std::string &ip, uint32_t grpc_port, int max_msg_mb_size = 100);
static void StartGrpcMasterServer(const std::string &ip, uint32_t grpc_port);
static void StartRestfulServer(const std::string &ip, uint32_t grpc_port, int max_msg_mb_size = 100,
uint32_t max_infer_num = 10000);
static void StartRestfulServer(const std::string &ip, uint32_t grpc_port, int max_msg_mb_size = 100);
static void WaitAndClear();
static void StopAndClear();
};


+ 7
- 1
mindspore_serving/ccsrc/python/serving_py.cc View File

@@ -21,6 +21,7 @@
#include "python/worker/servable_py.h"
#include "python/tensor_py.h"
#include "common/servable.h"
#include "master/master_context.h"
#include "worker/context.h"
#include "python/master/master_py.h"
#include "python/agent/agent_py.h"
@@ -166,7 +167,7 @@ void PyRegWorker(pybind11::module *m_ptr) {
.def_static("push_postprocess_failed", &PyWorker::PushPostprocessPyFailed)
.def_static("get_device_type", &PyWorker::GetDeviceType);

py::class_<ServableContext, std::shared_ptr<ServableContext>>(m, "Context_")
py::class_<ServableContext, std::shared_ptr<ServableContext>>(m, "ServableContext_")
.def(py::init<>())
.def_static("get_instance", &ServableContext::Instance)
.def("set_device_type_str",
@@ -177,6 +178,11 @@ void PyRegWorker(pybind11::module *m_ptr) {
}
})
.def("set_device_id", &ServableContext::SetDeviceId);

py::class_<MasterContext, std::shared_ptr<MasterContext>>(m, "MasterContext_")
.def(py::init<>())
.def_static("get_instance", &MasterContext::Instance)
.def("set_max_request_buffer_count", &MasterContext::SetMaxRequestBufferCount);
}

void PyRegWorkerAgent(pybind11::module *m_ptr) {


+ 15
- 0
mindspore_serving/common/__init__.py View File

@@ -0,0 +1,15 @@
# Copyright 2021 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""MindSpore Serving."""

mindspore_serving/worker/check_type.py → mindspore_serving/common/check_type.py View File

@@ -53,7 +53,7 @@ def check_bool(arg_name, bool_val):
raise RuntimeError(f"Parameter '{arg_name}' should be bool, but actually {type(bool_val)}")


def check_int(arg_name, int_val, mininum=None, maximum=None, is_tuple_item=False):
def check_int(arg_name, int_val, minimum=None, maximum=None, is_tuple_item=False):
"""Check whether the input parameters are reasonable int input"""
if not is_tuple_item:
prefix = f"Parameter '{arg_name}'"
@@ -64,13 +64,13 @@ def check_int(arg_name, int_val, mininum=None, maximum=None, is_tuple_item=False
raise RuntimeError(f"{prefix} should be int, but actually {type(int_val)}")
if not isinstance(int_val, int):
raise RuntimeError(f"{prefix} should be int, but actually {type(int_val)}")
if mininum is not None and int_val < mininum:
if minimum is not None and int_val < minimum:
if maximum is not None:
raise RuntimeError(f"{prefix} should be in range [{mininum},{maximum}]")
raise RuntimeError(f"{prefix} should be >= {mininum}")
raise RuntimeError(f"{prefix} should be in range [{minimum},{maximum}]")
raise RuntimeError(f"{prefix} should be >= {minimum}")
if maximum is not None and int_val > maximum:
if mininum is not None:
raise RuntimeError(f"{prefix} should be in range [{mininum},{maximum}]")
if minimum is not None:
raise RuntimeError(f"{prefix} should be in range [{minimum},{maximum}]")
raise RuntimeError(f"{prefix} should be <= {maximum}")


@@ -79,7 +79,7 @@ def check_ip_port(arg_name, port):
check_int(arg_name, port, 1, 65535)


def check_and_as_int_tuple_list(arg_name, ints, mininum=None, maximum=None):
def check_and_as_int_tuple_list(arg_name, ints, minimum=None, maximum=None):
"""Check whether the input parameters are reasonable multiple str inputs,
which can be single str, tuple or list of str.
finally, return tuple of str"""
@@ -94,7 +94,7 @@ def check_and_as_int_tuple_list(arg_name, ints, mininum=None, maximum=None):
for item in ints:
if item in int_list:
raise RuntimeError(f"The item value '{item}' in parameter '{arg_name}' should not be repeated")
check_int(arg_name, item, mininum, maximum, True)
check_int(arg_name, item, minimum, maximum, True)
int_list.append(item)

return tuple(ints)

+ 3
- 1
mindspore_serving/master/__init__.py View File

@@ -15,11 +15,13 @@
"""MindSpore Serving Master"""

from ._master import start_grpc_server, start_restful_server, start_master_server, stop
from . import context

__all__ = []
__all__.extend([
"start_grpc_server",
'start_restful_server',
'start_master_server',
'stop'
'stop',
'context'
])

+ 5
- 11
mindspore_serving/master/_master.py View File

@@ -16,7 +16,7 @@

import threading
from functools import wraps
from mindspore_serving.worker import check_type
from mindspore_serving.common import check_type
from mindspore_serving import log as logger
from mindspore_serving._mindspore_serving import ExitSignalHandle_
from mindspore_serving._mindspore_serving import Master_
@@ -70,7 +70,7 @@ def stop_on_except(func):


@stop_on_except
def start_grpc_server(ip="0.0.0.0", grpc_port=5500, max_msg_mb_size=100, max_infer_num=10000):
def start_grpc_server(ip="0.0.0.0", grpc_port=5500, max_msg_mb_size=100):
r"""
Start gRPC server for the communication between client and serving.

@@ -79,8 +79,6 @@ def start_grpc_server(ip="0.0.0.0", grpc_port=5500, max_msg_mb_size=100, max_inf
grpc_port (int): gRPC port ip, default 5500, ip port range [1, 65535].
max_msg_mb_size (int): The maximum acceptable gRPC message size in megabytes(MB), default 100,
value range [1, 512].
max_infer_num (int): The maximum acceptable infer message size in number, default 10000,
Max infer number should be a positive integer.
Raises:
RuntimeError: Fail to start the gRPC server.

@@ -93,9 +91,8 @@ def start_grpc_server(ip="0.0.0.0", grpc_port=5500, max_msg_mb_size=100, max_inf
check_type.check_str('ip', ip)
check_type.check_ip_port('grpc_port', grpc_port)
check_type.check_int('max_msg_mb_size', max_msg_mb_size, 1, 512)
check_type.check_int('max_infer_num', max_infer_num, 0)

Master_.start_grpc_server(ip, grpc_port, max_msg_mb_size, max_infer_num)
Master_.start_grpc_server(ip, grpc_port, max_msg_mb_size)
_start_wait_and_clear()


@@ -129,7 +126,7 @@ def start_master_server(ip="127.0.0.1", master_port=6100):


@stop_on_except
def start_restful_server(ip="0.0.0.0", restful_port=5900, max_msg_mb_size=100, max_infer_num=10000):
def start_restful_server(ip="0.0.0.0", restful_port=5900, max_msg_mb_size=100):
r"""
Start RESTful server for the communication between client and serving.

@@ -138,8 +135,6 @@ def start_restful_server(ip="0.0.0.0", restful_port=5900, max_msg_mb_size=100, m
restful_port (int): gRPC port ip, default 5900, ip port range [1, 65535].
max_msg_mb_size (int): The maximum acceptable RESTful message size in megabytes(MB), default 100,
value range [1, 512].
max_infer_num (int): The maximum acceptable infer message size in number, default 10000,
Max infer number should be a positive integer.
Raises:
RuntimeError: Fail to start the RESTful server.

@@ -151,7 +146,6 @@ def start_restful_server(ip="0.0.0.0", restful_port=5900, max_msg_mb_size=100, m
check_type.check_str('ip', ip)
check_type.check_ip_port('restful_port', restful_port)
check_type.check_int('max_msg_mb_size', max_msg_mb_size, 1, 512)
check_type.check_int('max_infer_num', max_infer_num, 0)

Master_.start_restful_server(ip, restful_port, max_msg_mb_size, max_infer_num)
Master_.start_restful_server(ip, restful_port, max_msg_mb_size)
_start_wait_and_clear()

+ 34
- 0
mindspore_serving/master/context.py View File

@@ -0,0 +1,34 @@
# Copyright 2021 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""Set context of serving"""
from mindspore_serving._mindspore_serving import MasterContext_
from mindspore_serving.common import check_type

_context = MasterContext_.get_instance()


def set_max_request_buffer_count(max_request_buffer_count):
    r"""
    Set the maximum number of requests the master will buffer while they are
    waiting to be processed. Requests arriving beyond this limit are rejected.

    Args:
        max_request_buffer_count (int): Maximum number of buffered (in-flight)
            requests, default 10000. Must be a positive integer.

    Raises:
        RuntimeError: The type or value of the parameters is invalid, or other error happened.
    """
    check_type.check_int("max_request_buffer_count", max_request_buffer_count, 1)
    _context.set_max_request_buffer_count(max_request_buffer_count)

+ 21
- 16
mindspore_serving/worker/_worker.py View File

@@ -17,23 +17,38 @@
import threading
from functools import wraps
from mindspore_serving import log as logger
from mindspore_serving.common import check_type
from mindspore_serving.worker import init_mindspore
from mindspore_serving._mindspore_serving import ExitSignalHandle_
from mindspore_serving._mindspore_serving import Worker_
from mindspore_serving._mindspore_serving import ServableContext_
from .register.preprocess import preprocess_storage
from .register.postprocess import postprocess_storage
from . import context
from .task import _start_py_task, _join_py_task
from . import check_type

_wait_and_clear_thread = None


def _clear_python():
"""Clear python storage data"""
preprocess_storage.clear()
postprocess_storage.clear()


def _set_device_id(device_id):
"""Set device id, default 0"""
ServableContext_.get_instance().set_device_id(device_id)


def _set_device_type(device_type):
"""Set device type, now can be 'None'(default), 'GPU' and 'Ascend', 'Davinci'(same as 'Ascend'), case ignored. """
if device_type is not None:
check_type.check_str('device_type', device_type)
ServableContext_.get_instance().set_device_type_str(device_type)
else:
ServableContext_.get_instance().set_device_type_str('None') # depend on MindSpore build target


def _start_wait_and_clear():
"""Waiting for Ctrl+C, and clear up environment"""

@@ -162,13 +177,8 @@ def start_servable(servable_directory, servable_name, version_number=0,
init_mindspore.init_mindspore_cxx_env()
_load_servable_config(servable_directory, servable_name)

if device_type is not None:
check_type.check_str('device_type', device_type)
context.set_context(device_type=device_type)
else:
context.set_context(device_type='None') # depend on register implement

context.set_context(device_id=device_id)
_set_device_type(device_type)
_set_device_id(device_id)
Worker_.start_servable(servable_directory, servable_name, version_number, master_ip, master_port,
worker_ip, worker_port)
_start_py_task(Worker_.get_batch_size())
@@ -219,13 +229,8 @@ def start_servable_in_master(servable_directory, servable_name, version_number=0
init_mindspore.init_mindspore_cxx_env()
_load_servable_config(servable_directory, servable_name)

if device_type is not None:
check_type.check_str('device_type', device_type)
context.set_context(device_type=device_type)
else:
context.set_context(device_type='None') # depend on register implement

context.set_context(device_id=device_id)
_set_device_type(device_type)
_set_device_id(device_id)
Worker_.start_servable_in_master(servable_directory, servable_name, version_number)
_start_py_task(Worker_.get_batch_size())
_start_wait_and_clear()

+ 0
- 64
mindspore_serving/worker/context.py View File

@@ -1,64 +0,0 @@
# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""Context setting interface"""
from mindspore_serving._mindspore_serving import Context_


class Context:
"""Set context of device, including device id and device type, can only set once currently."""

def __init__(self):
self.context_ = Context_.get_instance()

def set_device_type(self, device_type):
"""Set device type, now can be 'None'(default) and 'Ascend', 'Davinci'(same as 'Ascend'), case ignored. """
self.context_.set_device_type_str(device_type)

def set_device_id(self, device_id):
"""Set device id, default 0"""
self.context_.set_device_id(device_id)


_k_context = None


def _context():
"""
Get the global _context, if context is not created, create a new one.

Returns:
_Context, the global context in PyNative mode.
"""
global _k_context
if _k_context is None:
_k_context = Context()
return _k_context


def set_context(**kwargs):
"""The context setting interface. The acceptable parameters including:
device_type: 'Ascend','Davinci', 'None'. Case ignored.
- Davinci' and 'Ascend' are the same, means Ascend910 or Ascend310.
- 'None' means depend on MindSpore.
device_id: reasonable device id
"""
context = _context()
for (k, w) in kwargs.items():
if k == "device_type":
context.set_device_type(w)
elif k == "device_id":
context.set_device_id(w)
else:
raise RuntimeError(f"Not support context key '{k}'")

+ 12
- 9
mindspore_serving/worker/distributed/agent_startup.py View File

@@ -27,7 +27,7 @@ from mindspore_serving._mindspore_serving import ExitSignalHandle_
from mindspore_serving._mindspore_serving import WorkerAgent_, AgentStartUpConfig_

from mindspore_serving import log as logger
from mindspore_serving.worker import check_type
from mindspore_serving.common import check_type
from mindspore_serving.worker.distributed import worker_agent


@@ -160,7 +160,8 @@ def _agent_process(send_pipe, recv_pipe, index, start_config):
except Exception as e:
traceback.print_exc()
logger.error(f"Child {index}: Catch exception and notify exit of others")
send_pipe.send((index, e))
exception = RuntimeError(f"Child {index} exception happen: {e}")
send_pipe.send((index, exception))
_recv_parent(parent_process, index, recv_pipe, False)
logger.error(f"Child {index}: end send message to parent")

@@ -251,24 +252,23 @@ def _listening_agents_when_startup(p_recv_pipe, send_pipe_list, subprocess_list)
if ExitSignalHandle_.has_stopped():
logger.warning("Fail to start agents because of Ctrl+C")
_send_exit_msg_to_children(send_pipe_list, subprocess_list)
return False
raise RuntimeError("Fail to start agents because of Ctrl+C")
for send_pipe, process in zip(send_pipe_list, subprocess_list):
if process.is_alive():
continue
logger.warning("Fail to start agents because of death of one agent")
_send_exit_msg_to_children(send_pipe_list, subprocess_list)
return False
raise RuntimeError("Fail to start agents because of death of one agent")

index, msg = p_recv_pipe.recv()
logger.info(f"Receive msg from Child {index}: {msg}")
if isinstance(msg, Exception):
logger.warning("Fail to start agents because of exception raise by one agent")
_send_exit_msg_to_children(send_pipe_list, subprocess_list)
return False
raise msg

for send_pipe in send_pipe_list:
_send_pipe_msg(send_pipe, signal_success)
return True


def _listening_agents_after_startup(subprocess_list, worker_ip, worker_port, agent_ip):
@@ -328,16 +328,19 @@ def _startup_agents(common_meta, worker_ip, worker_port,
name=f"{servable_name}_worker_agent_rank{rank_id}_device{device_id}")
process.start()
subprocess_list.append(process)
ret = _listening_agents_when_startup(p_recv_pipe, send_pipe_list, subprocess_list)

msg = f"worker_ip: {worker_ip}, worker_port: {worker_port}, agent_ip: {agent_ip}, " \
f"agent_start_port: {agent_start_port}, device ids: {device_id_list}, rank ids: {rank_id_list}, " \
f"rank table file: {rank_table_file}, model files: {model_files}, group config files: {group_config_files}"
if not ret:

try:
_listening_agents_when_startup(p_recv_pipe, send_pipe_list, subprocess_list)
# pylint: disable=broad-except
except Exception as e:
WorkerAgent_.notify_failed(worker_ip, worker_port)
logger.error(f"Failed to start agents, {msg}")
print(f"Failed to start agents, {msg}")
raise RuntimeError("Failed to start agents")
raise e

logger.info(f"Success to start agents, {msg}")
print(f"Success to start agents, {msg}")


+ 1
- 1
mindspore_serving/worker/distributed/distributed_worker.py View File

@@ -18,7 +18,7 @@ import os
from mindspore_serving._mindspore_serving import Worker_

from mindspore_serving import log as logger
from mindspore_serving.worker import check_type
from mindspore_serving.common import check_type
from mindspore_serving.worker._worker import _start_py_task, _start_wait_and_clear
from mindspore_serving.worker._worker import stop_on_except, _load_servable_config



+ 1
- 1
mindspore_serving/worker/distributed/register.py View File

@@ -15,7 +15,7 @@
"""Serving, distributed worker register"""

from mindspore_serving._mindspore_serving import ServableMeta_, ServableStorage_
from mindspore_serving.worker import check_type
from mindspore_serving.common import check_type
from mindspore_serving.worker.common import get_servable_dir
from mindspore_serving import log as logger



+ 1
- 1
mindspore_serving/worker/register/method.py View File

@@ -21,7 +21,7 @@ from easydict import EasyDict

from mindspore_serving._mindspore_serving import ServableStorage_, MethodSignature_, PredictPhaseTag_
from mindspore_serving.worker.common import get_func_name, get_servable_dir
from mindspore_serving.worker import check_type
from mindspore_serving.common import check_type
from mindspore_serving import log as logger
from .preprocess import register_preprocess, check_preprocess
from .postprocess import register_postprocess, check_postprocess


+ 1
- 1
mindspore_serving/worker/register/servable.py View File

@@ -15,7 +15,7 @@
"""Servable declaration interface"""

from mindspore_serving._mindspore_serving import ServableMeta_, ServableStorage_
from mindspore_serving.worker import check_type
from mindspore_serving.common import check_type
from mindspore_serving.worker.common import get_servable_dir
from mindspore_serving import log as logger



+ 1
- 0
tests/ut/python/runtest.sh View File

@@ -32,6 +32,7 @@ cp ../mindspore_serving/proto/ms_service*.py mindspore_serving/proto/
cp _mindspore_serving*.so mindspore_serving/
cp -r ${PROJECT_PATH}/mindspore_serving/master mindspore_serving/
cp -r ${PROJECT_PATH}/mindspore_serving/worker mindspore_serving/
cp -r ${PROJECT_PATH}/mindspore_serving/common mindspore_serving/
cp -r ${PROJECT_PATH}/mindspore_serving/client mindspore_serving/
cp ${PROJECT_PATH}/mindspore_serving/*.py mindspore_serving/



+ 1
- 0
tests/ut/python/tests/common.py View File

@@ -130,6 +130,7 @@ def serving_test(func):
try:
func(*args, **kwargs)
finally:
master.context.set_max_request_buffer_count(10000)
master.stop()
worker.stop()
servable_dir = os.path.join(os.getcwd(), "serving_python_ut_servables")


+ 38
- 10
tests/ut/python/tests/test_distributed_worker.py View File

@@ -92,23 +92,37 @@ def start_distributed_worker(base):
return worker


def start_agents(model_file_list, group_config_list):
def start_agents(model_file_list, group_config_list, start_port):
send_pipe, recv_pipe = Pipe()

def agent_process(send_pipe):
distributed.startup_worker_agents(worker_ip="127.0.0.1", worker_port=6200, model_files=model_file_list,
group_config_files=group_config_list)
send_pipe.send("Success")
try:
distributed.startup_worker_agents(worker_ip="127.0.0.1", worker_port=6200, model_files=model_file_list,
group_config_files=group_config_list, agent_start_port=start_port)
send_pipe.send("Success")
# pylint: disable=broad-except
except Exception as e:
send_pipe.send(e)
send_pipe.close()

agent = Process(target=agent_process, args=(send_pipe,))
agent.start()
index = 0
while index < 50 and agent.is_alive(): # wait max 5 s
while index < 100 and agent.is_alive(): # wait max 10 s
index += 1
if recv_pipe.poll(0.1):
msg = recv_pipe.recv()
print(f"Receive agent process msg: {msg} {agent.is_alive()}")
if isinstance(msg, Exception):
raise msg
break
assert index < 50

if recv_pipe.poll(0.1):
msg = recv_pipe.recv()
print(f"Receive agent process msg: {msg} {agent.is_alive()}")
if isinstance(msg, Exception):
raise msg
assert index < 100
assert agent.is_alive()
return agent

@@ -138,7 +152,7 @@ def test_distributed_worker_worker_exit_success():
base = start_distributed_grpc_server()
worker_process = start_distributed_worker(base)
base.add_on_exit(lambda: send_exit(worker_process))
agent_process = start_agents(base.model_file_list, base.group_config_list)
agent_process = start_agents(base.model_file_list, base.group_config_list, 7000)
base.add_on_exit(lambda: send_exit(agent_process))

client = create_client("localhost", 5500, base.servable_name, "predict")
@@ -179,7 +193,7 @@ def test_distributed_worker_agent_exit_success():
base = start_distributed_grpc_server()
worker_process = start_distributed_worker(base)
base.add_on_exit(lambda: send_exit(worker_process))
agent_process = start_agents(base.model_file_list, base.group_config_list)
agent_process = start_agents(base.model_file_list, base.group_config_list, 7008)
base.add_on_exit(lambda: send_exit(agent_process))

client = create_client("localhost", 5500, base.servable_name, "predict")
@@ -217,7 +231,7 @@ def test_distributed_worker_agent_startup_killed_exit_success():
base = start_distributed_grpc_server()
worker_process = start_distributed_worker(base)
base.add_on_exit(lambda: send_exit(worker_process))
agent_process = start_agents(base.model_file_list, base.group_config_list)
agent_process = start_agents(base.model_file_list, base.group_config_list, 7016)
base.add_on_exit(lambda: send_exit(agent_process))

client = create_client("localhost", 5500, base.servable_name, "predict")
@@ -256,7 +270,7 @@ def test_distributed_worker_agent_killed_exit_success():
base = start_distributed_grpc_server()
worker_process = start_distributed_worker(base)
base.add_on_exit(lambda: send_exit(worker_process))
agent_process = start_agents(base.model_file_list, base.group_config_list)
agent_process = start_agents(base.model_file_list, base.group_config_list, 7024)
base.add_on_exit(lambda: send_exit(agent_process))

client = create_client("localhost", 5500, base.servable_name, "predict")
@@ -289,3 +303,17 @@ def test_distributed_worker_agent_killed_exit_success():
assert not worker_process.is_alive()
assert not agent_process.is_alive()
assert not agents_alive()


@serving_test
def test_distributed_worker_agent_invalid_model_files_failed():
    """Agent startup must fail when a configured model file path does not exist."""
    base = start_distributed_grpc_server()
    worker_process = start_distributed_worker(base)
    base.add_on_exit(lambda: send_exit(worker_process))
    # Point the first model file at a nonexistent path to trigger the failure.
    base.model_file_list[0] += "_error"
    raised = None
    try:
        start_agents(base.model_file_list, base.group_config_list, 7036)
    # pylint: disable=broad-except
    except Exception as exc:
        raised = exc
    assert raised is not None
    assert "Cannot access model file" in str(raised)

+ 97
- 3
tests/ut/python/tests/test_mater_worker_client.py View File

@@ -1089,12 +1089,106 @@ def test_servable_worker_alone_servable_not_available():
instances = []
y_data_list = []
for i in range(instance_count):
x1 = np.asarray([[1.1], [3.3]]).astype(np.float32) * (i + 1)
x2 = np.asarray([[5.5], [7.7]]).astype(np.float32) * (i + 1)
x1 = np.asarray([[1.1, 2.2], [3.3, 4.4]]).astype(np.float32) * (i + 1)
x2 = np.asarray([[5.5, 6.6], [7.7, 8.8]]).astype(np.float32) * (i + 1)
y_data_list.append(x1 + x2)
instances.append({"x3": x1, "x2": x2})
instances.append({"x1": x1, "x2": x2})

client = create_client("localhost", 5500, base.servable_name + "error", "add_common")
result = client.infer(instances)
print(result)
assert "servable is not available" in result["error"]


@serving_test
def test_servable_worker_with_master_max_request_count():
    """With the master request buffer capped at 1, one of two concurrent requests is rejected."""
    # fail returned from Worker::RunAsync
    base = ServingTestBase()
    servable_content = servable_config_import + servable_config_declare_servable
    servable_content += r"""
import time
def preprocess(x1, x2):
    time.sleep(1)
    return x1, x2
@register.register_method(output_names=["y"])
def add_common(x1, x2):
    x1, x2 = register.call_preprocess(preprocess, x1, x2)
    y = register.call_servable(x1, x2)
    return y
"""
    base.init_servable_with_servable_config(1, servable_content)
    master.context.set_max_request_buffer_count(1)
    worker.start_servable_in_master(base.servable_dir, base.servable_name)
    master.start_grpc_server("0.0.0.0", 5500)
    # Client side: fire two async requests while preprocess sleeps, so they overlap.
    x1 = np.asarray([[1.1, 2.2], [3.3, 4.4]]).astype(np.float32)
    x2 = np.asarray([[5.5, 6.6], [7.7, 8.8]]).astype(np.float32)
    instance = {"x1": x1, "x2": x2}

    client = create_client("localhost", 5500, base.servable_name, "add_common")
    futures = [client.infer_async(instance) for _ in range(2)]
    result0 = futures[0].result()
    result1 = futures[1].result()
    print(result0)
    print(result1)
    # Exactly one of the two requests must carry the buffer-limit error.
    assert ("error" in result0) != ("error" in result1)
    rejected = result0 if "error" in result0 else result1
    assert "Serving Error: request buffer number exceeds the limit 1" in rejected["error"]


@serving_test
def test_servable_worker_alone_max_request_count():
    """Standalone worker + master: buffer cap of 1 rejects one of two concurrent requests."""
    # fail returned from Worker::RunAsync
    base = ServingTestBase()
    servable_content = servable_config_import + servable_config_declare_servable
    servable_content += r"""
import time
def preprocess(x1, x2):
    time.sleep(1)
    return x1, x2
@register.register_method(output_names=["y"])
def add_common(x1, x2):
    x1, x2 = register.call_preprocess(preprocess, x1, x2)
    y = register.call_servable(x1, x2)
    return y
"""
    base.init_servable_with_servable_config(1, servable_content)
    master.context.set_max_request_buffer_count(1)

    # Master and worker run as separate servers, linked over ports 6100/6200.
    master.start_master_server("0.0.0.0", 6100)
    worker.start_servable(base.servable_dir, base.servable_name, worker_port=6200, master_port=6100)
    master.start_grpc_server("0.0.0.0", 5500)
    # Client side: fire two async requests while preprocess sleeps, so they overlap.
    x1 = np.asarray([[1.1, 2.2], [3.3, 4.4]]).astype(np.float32)
    x2 = np.asarray([[5.5, 6.6], [7.7, 8.8]]).astype(np.float32)
    instance = {"x1": x1, "x2": x2}

    client = create_client("localhost", 5500, base.servable_name, "add_common")
    futures = [client.infer_async(instance) for _ in range(2)]
    result0 = futures[0].result()
    result1 = futures[1].result()
    print(result0)
    print(result1)
    # Exactly one of the two requests must carry the buffer-limit error.
    assert ("error" in result0) != ("error" in result1)
    rejected = result0 if "error" in result0 else result1
    assert "Serving Error: request buffer number exceeds the limit 1" in rejected["error"]

Loading…
Cancel
Save