From e51d552d636fea28f80654f36b9cdca3b1db0cd1 Mon Sep 17 00:00:00 2001 From: xuyongfei Date: Thu, 18 Mar 2021 12:45:29 +0800 Subject: [PATCH] fix grpc deadlock fix grpc deadlock --- .../ccsrc/worker/inference/inference.cc | 14 ++++ .../ccsrc/worker/inference/inference.h | 1 + .../worker/distributed/agent_startup.py | 77 ++++++++++++++++++- .../distribute_agent_fault/kill_15_agent.sh | 30 ++++++-- .../distribute_master_fault/kill_15_master.sh | 30 ++++++-- .../kill_15_agent.sh | 18 ++++- .../kill_15_master.sh | 18 ++++- .../distribute_worker_fault/kill_15_worker.sh | 30 ++++++-- .../matmul_distributed/matmul_distribute.sh | 23 +++++- third_party/mindspore | 2 +- 10 files changed, 212 insertions(+), 31 deletions(-) diff --git a/mindspore_serving/ccsrc/worker/inference/inference.cc b/mindspore_serving/ccsrc/worker/inference/inference.cc index 3fd8823..e3bc4a8 100644 --- a/mindspore_serving/ccsrc/worker/inference/inference.cc +++ b/mindspore_serving/ccsrc/worker/inference/inference.cc @@ -33,6 +33,9 @@ InferenceLoader::~InferenceLoader() { if (ms_cxx_lib_handle_ != nullptr) { dlclose(ms_cxx_lib_handle_); } + if (gomp_handler_ != nullptr) { + dlclose(gomp_handler_); + } } InferenceLoader &InferenceLoader::Instance() { @@ -74,6 +77,17 @@ std::vector SplitString(const std::string &s, const std::string &de Status InferenceLoader::LoadMindSporeModelWrap() { MSI_LOG_INFO << "Start Initialize MindSpore Model Wrap so"; + std::vector gomp_list = {"libgomp.so.1"}; + for (auto &item : gomp_list) { + gomp_handler_ = dlopen(item.c_str(), RTLD_NOW | RTLD_GLOBAL); + if (gomp_handler_ != nullptr) { + MSI_LOG_INFO << "dlopen libgomp so: " << item << " success"; + } + } + if (gomp_handler_ == nullptr) { + MSI_LOG_WARNING << "dlopen libgomp library failed, try dlopen list: " << gomp_list; + } + auto get_dlerror = []() -> std::string { auto error = dlerror(); if (error == nullptr) { diff --git a/mindspore_serving/ccsrc/worker/inference/inference.h b/mindspore_serving/ccsrc/worker/inference/inference.h index 33c2552..9e57ce3 100644 --- a/mindspore_serving/ccsrc/worker/inference/inference.h +++ b/mindspore_serving/ccsrc/worker/inference/inference.h @@ -141,6 +141,7 @@ class MS_API InferenceLoader { typedef InferenceBase *(*CreateInferHandle)(); void *ms_lib_handle_ = nullptr; void *ms_cxx_lib_handle_ = nullptr; + void *gomp_handler_ = nullptr; CreateInferHandle ms_create_handle_ = nullptr; Status LoadMindSporeModelWrap(); }; diff --git a/mindspore_serving/worker/distributed/agent_startup.py b/mindspore_serving/worker/distributed/agent_startup.py index 68b8fe2..ad7e67e 100644 --- a/mindspore_serving/worker/distributed/agent_startup.py +++ b/mindspore_serving/worker/distributed/agent_startup.py @@ -25,6 +25,7 @@ import psutil from mindspore_serving._mindspore_serving import ExitSignalHandle_ from mindspore_serving._mindspore_serving import WorkerAgent_, AgentStartUpConfig_ +from mindspore_serving._mindspore_serving import DistributedServableConfig_, OneRankConfig_ from mindspore_serving import log as logger from mindspore_serving.common import check_type @@ -347,6 +348,79 @@ def _startup_agents(common_meta, worker_ip, worker_port, _listening_agents_after_startup(subprocess_list, worker_ip, worker_port, agent_ip) +class DistributedServableConfig: + """Python DistributedServableConfig""" + def __init__(self): + self.rank_table_content = "" + self.rank_list = None + self.common_meta = None + self.distributed_meta = None + + def set(self, config): + """Set from C++ DistributedServableConfig_ obj""" + self.rank_table_content = config.rank_table_content + self.rank_list = [] + for item in config.rank_list: + new_item = {"device_id": item.device_id, "ip": item.ip} + self.rank_list.append(new_item) + self.common_meta = {"servable_name": config.common_meta.servable_name, + "with_batch_dim": config.common_meta.with_batch_dim, + "without_batch_dim_inputs": config.common_meta.without_batch_dim_inputs, + "inputs_count": config.common_meta.inputs_count, + "outputs_count": config.common_meta.outputs_count} + + self.distributed_meta = {"rank_size": config.distributed_meta.rank_size, + "stage_size": config.distributed_meta.stage_size} + + def get(self): + """Get as C++ DistributedServableConfig_ obj""" + config = DistributedServableConfig_() + config.rank_table_content = self.rank_table_content + rank_list = [] + for item in self.rank_list: + new_item = OneRankConfig_() + new_item.device_id = item["device_id"] + new_item.ip = item["ip"] + rank_list.append(new_item) + config.rank_list = rank_list + config.common_meta.servable_name = self.common_meta["servable_name"] + config.common_meta.with_batch_dim = self.common_meta["with_batch_dim"] + config.common_meta.without_batch_dim_inputs = self.common_meta["without_batch_dim_inputs"] + config.common_meta.inputs_count = self.common_meta["inputs_count"] + config.common_meta.outputs_count = self.common_meta["outputs_count"] + + config.distributed_meta.rank_size = self.distributed_meta["rank_size"] + config.distributed_meta.stage_size = self.distributed_meta["stage_size"] + return config + + +def _get_worker_distributed_config(worker_ip, worker_port): + """Get worker distributed config from worker through sub process""" + c_send_pipe, p_recv_pipe = Pipe() + + def process_fun(c_send_pipe): + try: + distributed_config = WorkerAgent_.get_agents_config_from_worker(worker_ip, worker_port) + config = DistributedServableConfig() + config.set(distributed_config) + c_send_pipe.send(config) + # pylint: disable=broad-except + except Exception as e: + c_send_pipe.send(e) + process = Process(target=process_fun, args=(c_send_pipe,), + name=f"worker_agent_get_agents_config_from_worker") + process.start() + process.join() + assert not process.is_alive() + if p_recv_pipe.poll(0.1): + config = p_recv_pipe.recv() + if isinstance(config, Exception): + raise config + distributed_config = config.get() + return distributed_config + raise RuntimeError(f"Failed to get agents config from worker") + + def startup_worker_agents(worker_ip, worker_port, model_files, group_config_files=None, agent_start_port=7000, agent_ip=None, rank_start=None): r""" @@ -390,7 +464,8 @@ def startup_worker_agents(worker_ip, worker_port, model_files, group_config_file group_config_files = check_type.check_and_as_str_tuple_list("group_config_files", group_config_files) ExitSignalHandle_.start() - distributed_config = WorkerAgent_.get_agents_config_from_worker(worker_ip, worker_port) + distributed_config = _get_worker_distributed_config(worker_ip, worker_port) + # distributed_config = WorkerAgent_.get_agents_config_from_worker(worker_ip, worker_port) # get machine ip rank_list = distributed_config.rank_list diff --git a/tests/st/distribute_agent_fault/kill_15_agent.sh b/tests/st/distribute_agent_fault/kill_15_agent.sh index f80f4c5..3b94d90 100644 --- a/tests/st/distribute_agent_fault/kill_15_agent.sh +++ b/tests/st/distribute_agent_fault/kill_15_agent.sh @@ -184,7 +184,28 @@ kill_agent() then echo "kill agent failed" fi - sleep 5 + + num=`ps -ef | grep agent.py | grep -v grep | wc -l` + count=0 + while [[ ${num} -ne 0 && ${count} -lt 10 ]] + do + sleep 1 + count=$(($count+1)) + num=`ps -ef | grep agent.py | grep -v grep | wc -l` + done + + if [ ${count} -eq 10 ] + then + echo "agent exit failed" + echo $num + ps -ef | grep agent.py | grep -v grep + echo "------------------------------ agent failed log begin: " + cat agent.log + echo "------------------------------ agent failed log end" + clean_pid && exit 1 + fi + sleep 1 + num=`ps -ef | grep master.py | grep -v grep | wc -l` if [ $num -ne 1 ] then @@ -199,13 +220,6 @@ kill_agent() echo $num clean_pid && exit 1 fi - num=`ps -ef | grep agent.py | grep -v grep | wc -l` - if [ $num -ne 0 ] - then - echo "agent exit failed" - echo $num - clean_pid && exit 1 - fi } test_agent_fault_model() diff --git a/tests/st/distribute_master_fault/kill_15_master.sh b/tests/st/distribute_master_fault/kill_15_master.sh index bf78133..835def5 100644 --- a/tests/st/distribute_master_fault/kill_15_master.sh +++ b/tests/st/distribute_master_fault/kill_15_master.sh @@ -186,25 +186,39 @@ kill_master() then echo "kill master failed" fi - sleep 5 - num=`ps -ef | grep master.py | grep -v grep | wc -l` - if [ $num -ne 0 ] + + num=`ps -ef | grep agent.py | grep -v grep | wc -l` + count=0 + while [[ ${num} -ne 0 && ${count} -lt 10 ]] + do + sleep 1 + count=$(($count+1)) + num=`ps -ef | grep agent.py | grep -v grep | wc -l` + done + + if [ ${count} -eq 10 ] then - echo "master exit failed" + echo "agent exit failed" echo $num + ps -ef | grep agent.py | grep -v grep + echo "------------------------------ agent failed log begin: " + cat agent.log + echo "------------------------------ agent failed log end" clean_pid && exit 1 fi - num=`ps -ef | grep worker.py | grep -v grep | wc -l` + sleep 1 + + num=`ps -ef | grep master.py | grep -v grep | wc -l` if [ $num -ne 0 ] then - echo "worker exit failed" + echo "master exit failed" echo $num clean_pid && exit 1 fi - num=`ps -ef | grep agent.py | grep -v grep | wc -l` + num=`ps -ef | grep worker.py | grep -v grep | wc -l` if [ $num -ne 0 ] then - echo "agent exit failed" + echo "worker exit failed" echo $num clean_pid && exit 1 fi diff --git a/tests/st/distribute_master_with_worker/kill_15_agent.sh b/tests/st/distribute_master_with_worker/kill_15_agent.sh index f10e718..8efbc63 100644 --- a/tests/st/distribute_master_with_worker/kill_15_agent.sh +++ b/tests/st/distribute_master_with_worker/kill_15_agent.sh @@ -145,14 +145,28 @@ kill_agent() then echo "kill agent failed" fi - sleep 5 + num=`ps -ef | grep agent.py | grep -v grep | wc -l` - if [ $num -ne 0 ] + count=0 + while [[ ${num} -ne 0 && ${count} -lt 10 ]] + do + sleep 1 + count=$(($count+1)) + num=`ps -ef | grep agent.py | grep -v grep | wc -l` + done + + if [ ${count} -eq 10 ] then echo "agent exit failed" echo $num + ps -ef | grep agent.py | grep -v grep + echo "------------------------------ agent failed log begin: " + cat agent.log + echo "------------------------------ agent failed log end" clean_pid && exit 1 fi + sleep 1 + num=`ps -ef | grep master_with_worker.py | grep -v grep | wc -l` if [ $num -ne 0 ] then diff --git a/tests/st/distribute_master_with_worker/kill_15_master.sh b/tests/st/distribute_master_with_worker/kill_15_master.sh index c6e0eb6..55088e4 100644 --- a/tests/st/distribute_master_with_worker/kill_15_master.sh +++ b/tests/st/distribute_master_with_worker/kill_15_master.sh @@ -144,14 +144,28 @@ kill_master_with_worker() then echo "kill master_with_worker failed" fi - sleep 5 + num=`ps -ef | grep agent.py | grep -v grep | wc -l` - if [ $num -ne 0 ] + count=0 + while [[ ${num} -ne 0 && ${count} -lt 10 ]] + do + sleep 1 + count=$(($count+1)) + num=`ps -ef | grep agent.py | grep -v grep | wc -l` + done + + if [ ${count} -eq 10 ] then echo "agent exit failed" echo $num + ps -ef | grep agent.py | grep -v grep + echo "------------------------------ agent failed log begin: " + cat agent.log + echo "------------------------------ agent failed log end" clean_pid && exit 1 fi + sleep 1 + num=`ps -ef | grep master_with_worker.py | grep -v grep | wc -l` if [ $num -ne 0 ] then diff --git a/tests/st/distribute_worker_fault/kill_15_worker.sh b/tests/st/distribute_worker_fault/kill_15_worker.sh index e8d517d..cf33070 100644 --- a/tests/st/distribute_worker_fault/kill_15_worker.sh +++ b/tests/st/distribute_worker_fault/kill_15_worker.sh @@ -186,7 +186,28 @@ kill_agent() then echo "kill worker failed" fi - sleep 5 + + num=`ps -ef | grep agent.py | grep -v grep | wc -l` + count=0 + while [[ ${num} -ne 0 && ${count} -lt 10 ]] + do + sleep 1 + count=$(($count+1)) + num=`ps -ef | grep agent.py | grep -v grep | wc -l` + done + + if [ ${count} -eq 10 ] + then + echo "agent exit failed" + echo $num + ps -ef | grep agent.py | grep -v grep + echo "------------------------------ agent failed log begin: " + cat agent.log + echo "------------------------------ agent failed log end" + clean_pid && exit 1 + fi + sleep 1 + num=`ps -ef | grep master.py | grep -v grep | wc -l` if [ $num -ne 1 ] then @@ -201,13 +222,6 @@ kill_agent() echo $num clean_pid && exit 1 fi - num=`ps -ef | grep agent.py | grep -v grep | wc -l` - if [ $num -ne 0 ] - then - echo "agent exit failed" - echo $num - clean_pid && exit 1 - fi } test_agent_fault_model() diff --git a/tests/st/matmul_distributed/matmul_distribute.sh b/tests/st/matmul_distributed/matmul_distribute.sh index b8f7bb2..1e1b0ee 100644 --- a/tests/st/matmul_distributed/matmul_distribute.sh +++ b/tests/st/matmul_distributed/matmul_distribute.sh @@ -34,7 +34,28 @@ clean_master_pid() then echo "clean master pip failed" fi - sleep 6 + + num=`ps -ef | grep agent.py | grep -v grep | wc -l` + count=0 + while [[ ${num} -ne 0 && ${count} -lt 10 ]] + do + sleep 1 + count=$(($count+1)) + num=`ps -ef | grep agent.py | grep -v grep | wc -l` + done + + if [ ${count} -eq 10 ] + then + echo "agent exit failed" + echo $num + ps -ef | grep agent.py | grep -v grep + echo "------------------------------ agent failed log begin: " + cat agent.log + echo "------------------------------ agent failed log end" + clean_pid && exit 1 + fi + sleep 1 + ps aux | grep 'master.py' | grep ${CURRUSER} | grep -v grep if [ $? -eq 0 ] then diff --git a/third_party/mindspore b/third_party/mindspore index 423026c..6b8bef2 160000 --- a/third_party/mindspore +++ b/third_party/mindspore @@ -1 +1 @@ -Subproject commit 423026c1ac8488417a4d8022217a573598ad132b +Subproject commit 6b8bef2c8afe3f9890cee8e866771dd4b1d23d16