Browse Source

!190 Serving: bugfix for worker exit during task processing

From: @xu-yfei
Reviewed-by: @zhangyinxia,@linqingke
Signed-off-by: @linqingke
tags/v1.2.0
mindspore-ci-bot Gitee 5 years ago
parent
commit
18b935fd34
4 changed files with 35 additions and 3 deletions
  1. +5
    -0
      mindspore_serving/ccsrc/worker/task_queue.cc
  2. +2
    -1
      mindspore_serving/worker/_check_version.py
  3. +27
    -1
      mindspore_serving/worker/task.py
  4. +1
    -1
      third_party/mindspore

+ 5
- 0
mindspore_serving/ccsrc/worker/task_queue.cc View File

@@ -205,6 +205,11 @@ void TaskQueue::Stop() {
if (!is_running) {
return;
}
task_map_.clear();
task_priority_list_ = std::queue<std::string>();
task_item_processing_ = TaskItem();
callback_map_.clear();

is_running = false;
cond_var_->notify_all();
}


+ 2
- 1
mindspore_serving/worker/_check_version.py View File

@@ -166,6 +166,7 @@ class AscendEnvChecker:
"Can not find opp path (need by mindspore-ascend), please check if you have set env ASCEND_OPP_PATH, "
"you can reference to the installation guidelines https://www.mindspore.cn/install")


class GPUEnvChecker():
"""GPU environment check."""

@@ -190,7 +191,7 @@ class GPUEnvChecker():
"""Get cuda bin path by lib path."""
path_list = []
for path in self.cuda_lib_path:
path = os.path.abspath(path.strip()+"/bin/")
path = os.path.abspath(path.strip() + "/bin/")
if Path(path).is_dir():
path_list.append(path)
return np.unique(path_list)


+ 27
- 1
mindspore_serving/worker/task.py View File

@@ -18,12 +18,15 @@ import threading
import time
import logging
from mindspore_serving._mindspore_serving import Worker_
from mindspore_serving._mindspore_serving import ExitSignalHandle_
from mindspore_serving.worker.register.preprocess import preprocess_storage
from mindspore_serving.worker.register.postprocess import postprocess_storage
from mindspore_serving import log as logger


class ServingSystemException(Exception):
"""Exception notify system error of worker, and need to exit py task"""

def __init__(self, msg):
super(ServingSystemException, self).__init__()
self.msg = msg
@@ -32,12 +35,24 @@ class ServingSystemException(Exception):
return "Serving system error: " + self.msg


class ServingExitException(Exception):
    """Signal that the worker has exited and the py task must stop too."""

    def __str__(self):
        # Fixed text: this exception carries no per-instance message.
        description = "Serving has exited"
        return description


# Tags compared against `task.task_type` when dispatching py tasks.
(
    task_type_stop,
    task_type_empty,
    task_type_preprocess,
    task_type_postprocess,
) = ("stop", "empty", "preprocess", "postprocess")


def has_worker_stopped():
    """Return whether the worker has stopped.

    The answer is taken directly from ``ExitSignalHandle_.has_stopped()``.
    """
    stopped = ExitSignalHandle_.has_stopped()
    return stopped


class PyTask:
"""Base class for preprocess and postprocess"""

@@ -119,6 +134,10 @@ class PyTask:
last_index = self.index

for _ in range(self.index, min(self.index + self.switch_batch, instances_size)):
if has_worker_stopped():
logger.info("Worker has exited, exit py task")
raise ServingExitException()

output = next(result)
output = self._handle_result(output)
self.result_batch.append(output)
@@ -141,6 +160,8 @@ class PyTask:
result_count = self.index + len(self.result_batch)
self.push_failed(instances_size - result_count)
raise e
except ServingExitException as e:
raise e
except Exception as e: # catch exception and try next
logger.warning(f"{self.task_name} get result catch exception: {e}")
logging.exception(e)
@@ -226,6 +247,9 @@ class PyTaskThread(threading.Thread):
preprocess_turn = True
while True:
try:
if has_worker_stopped():
logger.info("Worker has exited, exit py task")
break
if not self.preprocess.has_next() and not self.postprocess.has_next():
task = Worker_.get_py_task()
if task.task_type == task_type_stop:
@@ -260,7 +284,9 @@ class PyTaskThread(threading.Thread):
if task.task_type != task_type_empty:
self.postprocess.run(task)
preprocess_turn = True

except ServingExitException:
logger.info("Catch ServingExitException and exit py task")
break
except Exception as e:
logger.error(f"py task catch exception and exit: {e}")
logging.exception(e)


+ 1
- 1
third_party/mindspore

@@ -1 +1 @@
Subproject commit bc6ad2127863f9f7fd7b2f9b25841e508b99fd51
Subproject commit 6fc8a818e13665788483c5d32bda0534bd336b6e

Loading…
Cancel
Save