Browse Source

!215 Serving, update set_max_enqueued_requests api name

From: @xu-yfei
Reviewed-by: @linqingke,@zhoufeng54
Signed-off-by: @zhoufeng54
pull/215/MERGE
mindspore-ci-bot Gitee 5 years ago
parent
commit
770acffd29
9 changed files with 28 additions and 27 deletions
  1. +7
    -6
      mindspore_serving/ccsrc/master/dispacther.cc
  2. +1
    -1
      mindspore_serving/ccsrc/master/dispacther.h
  3. +3
    -3
      mindspore_serving/ccsrc/master/master_context.cc
  4. +3
    -3
      mindspore_serving/ccsrc/master/master_context.h
  5. +1
    -1
      mindspore_serving/ccsrc/python/serving_py.cc
  6. +5
    -5
      mindspore_serving/master/context.py
  7. +1
    -1
      tests/ut/python/tests/common.py
  8. +6
    -6
      tests/ut/python/tests/test_mater_worker_client.py
  9. +1
    -1
      third_party/mindspore

+ 7
- 6
mindspore_serving/ccsrc/master/dispacther.cc View File

@@ -57,9 +57,10 @@ DispatcherWorkerContext Dispatcher::GetWorkSession(const RequestSpec &request_sp
}

Status Dispatcher::JudgeInferNum() {
auto max_infer_num = MasterContext::Instance()->GetMaxRequestBufferCount();
if (infer_num_ >= max_infer_num) {
return INFER_STATUS_LOG_ERROR(FAILED) << "Serving Error: request buffer number exceeds the limit " << max_infer_num;
auto max_enqueued_requests = MasterContext::Instance()->GetMaxEnqueuedRequests();
if (enqueued_requests_ >= max_enqueued_requests) {
return INFER_STATUS_LOG_ERROR(FAILED)
<< "Serving Error: enqueued requests count exceeds the limit " << max_enqueued_requests;
}
return SUCCESS;
}
@@ -77,9 +78,9 @@ void Dispatcher::DispatchAsync(const proto::PredictRequest &request, proto::Pred
try {
auto callback = [this, on_finish]() {
on_finish();
this->infer_num_--;
this->enqueued_requests_--;
};
infer_num_++;
enqueued_requests_++;
status = DispatchAsyncInner(request, reply, callback);
} catch (const std::bad_alloc &ex) {
MSI_LOG(ERROR) << "Serving Error: malloc memory failed";
@@ -99,7 +100,7 @@ void Dispatcher::DispatchAsync(const proto::PredictRequest &request, proto::Pred
if (status != SUCCESS) {
GrpcTensorHelper::CreateReplyFromErrorMsg(status, reply);
on_finish();
infer_num_--;
enqueued_requests_--;
}
}



+ 1
- 1
mindspore_serving/ccsrc/master/dispacther.h View File

@@ -61,7 +61,7 @@ class MS_API Dispatcher {
std::shared_mutex servable_shared_lock_;
// avoid invoke Clear and then UnregisterServable is invoked by Clear in other thread
std::atomic_bool clearing_flag = false;
std::atomic_uint32_t infer_num_ = 0;
std::atomic_uint32_t enqueued_requests_ = 0;

Status JudgeInferNum();
DispatcherWorkerContext GetWorkSession(const RequestSpec &request_spec) const;


+ 3
- 3
mindspore_serving/ccsrc/master/master_context.cc View File

@@ -26,10 +26,10 @@ std::shared_ptr<MasterContext> MasterContext::Instance() {
return instance;
}

void MasterContext::SetMaxRequestBufferCount(uint32_t max_request_buffer_count) {
max_request_buffer_count_ = max_request_buffer_count;
void MasterContext::SetMaxEnqueuedRequests(uint32_t max_enqueued_requests) {
max_enqueued_requests_ = max_enqueued_requests;
}

uint32_t MasterContext::GetMaxRequestBufferCount() const { return max_request_buffer_count_; }
uint32_t MasterContext::GetMaxEnqueuedRequests() const { return max_enqueued_requests_; }

} // namespace mindspore::serving

+ 3
- 3
mindspore_serving/ccsrc/master/master_context.h View File

@@ -28,11 +28,11 @@ class MS_API MasterContext {
public:
static std::shared_ptr<MasterContext> Instance();

void SetMaxRequestBufferCount(uint32_t max_request_buffer_count);
uint32_t GetMaxRequestBufferCount() const;
void SetMaxEnqueuedRequests(uint32_t max_enqueued_requests);
uint32_t GetMaxEnqueuedRequests() const;

private:
uint32_t max_request_buffer_count_ = 10000; // default 10000
uint32_t max_enqueued_requests_ = 10000; // default 10000
};

} // namespace mindspore::serving


+ 1
- 1
mindspore_serving/ccsrc/python/serving_py.cc View File

@@ -182,7 +182,7 @@ void PyRegWorker(pybind11::module *m_ptr) {
py::class_<MasterContext, std::shared_ptr<MasterContext>>(m, "MasterContext_")
.def(py::init<>())
.def_static("get_instance", &MasterContext::Instance)
.def("set_max_request_buffer_count", &MasterContext::SetMaxRequestBufferCount);
.def("set_max_enqueued_requests", &MasterContext::SetMaxEnqueuedRequests);
}

void PyRegWorkerAgent(pybind11::module *m_ptr) {


+ 5
- 5
mindspore_serving/master/context.py View File

@@ -16,21 +16,21 @@
from mindspore_serving._mindspore_serving import MasterContext_
from mindspore_serving.common import check_type

__all__ = ["set_max_request_buffer_count"]
__all__ = ["set_max_enqueued_requests"]

_context = MasterContext_.get_instance()


def set_max_request_buffer_count(max_request_buffer_count):
def set_max_enqueued_requests(max_enqueued_requests):
r"""
Set the maximum number of requests waiting to be processed.

Args:
max_request_buffer_count (int): The maximum acceptable infer message size in number, default 10000,
max_enqueued_requests (int): The maximum acceptable infer message size in number, default 10000,
Max infer number should be a positive integer.

Raises:
RuntimeError: The type or value of the parameters is invalid, or other error happened.
"""
check_type.check_int("max_request_buffer_count", max_request_buffer_count, 1)
_context.set_max_request_buffer_count(max_request_buffer_count)
check_type.check_int("max_enqueued_requests", max_enqueued_requests, 1)
_context.set_max_enqueued_requests(max_enqueued_requests)

+ 1
- 1
tests/ut/python/tests/common.py View File

@@ -130,7 +130,7 @@ def serving_test(func):
try:
func(*args, **kwargs)
finally:
master.context.set_max_request_buffer_count(10000)
master.context.set_max_enqueued_requests(10000)
master.stop()
worker.stop()
servable_dir = os.path.join(os.getcwd(), "serving_python_ut_servables")


+ 6
- 6
tests/ut/python/tests/test_mater_worker_client.py View File

@@ -1119,7 +1119,7 @@ def add_common(x1, x2):
return y
"""
base.init_servable_with_servable_config(1, servable_content)
master.context.set_max_request_buffer_count(1)
master.context.set_max_enqueued_requests(1)
worker.start_servable_in_master(base.servable_dir, base.servable_name)
master.start_grpc_server("0.0.0.0", 5500)
# Client
@@ -1140,10 +1140,10 @@ def add_common(x1, x2):
assert "error" in result0 or "error" in result1
if "error" in result0:
assert "error" not in result1
assert "Serving Error: request buffer number exceeds the limit 1" in result0["error"]
assert "Serving Error: enqueued requests count exceeds the limit 1" in result0["error"]
else:
assert "error" not in result0
assert "Serving Error: request buffer number exceeds the limit 1" in result1["error"]
assert "Serving Error: enqueued requests count exceeds the limit 1" in result1["error"]


@serving_test
@@ -1165,7 +1165,7 @@ def add_common(x1, x2):
return y
"""
base.init_servable_with_servable_config(1, servable_content)
master.context.set_max_request_buffer_count(1)
master.context.set_max_enqueued_requests(1)

master.start_master_server("0.0.0.0", 6100)
worker.start_servable(base.servable_dir, base.servable_name, worker_port=6200, master_port=6100)
@@ -1188,7 +1188,7 @@ def add_common(x1, x2):
assert "error" in result0 or "error" in result1
if "error" in result0:
assert "error" not in result1
assert "Serving Error: request buffer number exceeds the limit 1" in result0["error"]
assert "Serving Error: enqueued requests count exceeds the limit 1" in result0["error"]
else:
assert "error" not in result0
assert "Serving Error: request buffer number exceeds the limit 1" in result1["error"]
assert "Serving Error: enqueued requests count exceeds the limit 1" in result1["error"]

+ 1
- 1
third_party/mindspore

@@ -1 +1 @@
Subproject commit 385edf507d1b1498f6513ad5ef2fa4f7dfb67a87
Subproject commit 370713456510641dd53496cdb17acf94b35e0a1a

Loading…
Cancel
Save