Browse Source

fix grpc deadlock

fix grpc deadlock
tags/v1.2.0
xuyongfei 5 years ago
parent
commit
e51d552d63
10 changed files with 212 additions and 31 deletions
  1. +14
    -0
      mindspore_serving/ccsrc/worker/inference/inference.cc
  2. +1
    -0
      mindspore_serving/ccsrc/worker/inference/inference.h
  3. +76
    -1
      mindspore_serving/worker/distributed/agent_startup.py
  4. +22
    -8
      tests/st/distribute_agent_fault/kill_15_agent.sh
  5. +22
    -8
      tests/st/distribute_master_fault/kill_15_master.sh
  6. +16
    -2
      tests/st/distribute_master_with_worker/kill_15_agent.sh
  7. +16
    -2
      tests/st/distribute_master_with_worker/kill_15_master.sh
  8. +22
    -8
      tests/st/distribute_worker_fault/kill_15_worker.sh
  9. +22
    -1
      tests/st/matmul_distributed/matmul_distribute.sh
  10. +1
    -1
      third_party/mindspore

+ 14
- 0
mindspore_serving/ccsrc/worker/inference/inference.cc View File

@@ -33,6 +33,9 @@ InferenceLoader::~InferenceLoader() {
if (ms_cxx_lib_handle_ != nullptr) {
dlclose(ms_cxx_lib_handle_);
}
if (gomp_handler_ != nullptr) {
dlclose(gomp_handler_);
}
}

InferenceLoader &InferenceLoader::Instance() {
@@ -74,6 +77,17 @@ std::vector<std::string> SplitString(const std::string &s, const std::string &de

Status InferenceLoader::LoadMindSporeModelWrap() {
MSI_LOG_INFO << "Start Initialize MindSpore Model Wrap so";
std::vector<std::string> gomp_list = {"libgomp.so.1"};
for (auto &item : gomp_list) {
gomp_handler_ = dlopen(item.c_str(), RTLD_NOW | RTLD_GLOBAL);
if (gomp_handler_ != nullptr) {
MSI_LOG_INFO << "dlopen libgomp so: " << item << " success";
}
}
if (gomp_handler_ == nullptr) {
MSI_LOG_WARNING << "dlopen libgomp library failed, try dlopen list: " << gomp_list;
}

auto get_dlerror = []() -> std::string {
auto error = dlerror();
if (error == nullptr) {


+ 1
- 0
mindspore_serving/ccsrc/worker/inference/inference.h View File

@@ -141,6 +141,7 @@ class MS_API InferenceLoader {
typedef InferenceBase *(*CreateInferHandle)();
void *ms_lib_handle_ = nullptr;
void *ms_cxx_lib_handle_ = nullptr;
void *gomp_handler_ = nullptr;
CreateInferHandle ms_create_handle_ = nullptr;
Status LoadMindSporeModelWrap();
};


+ 76
- 1
mindspore_serving/worker/distributed/agent_startup.py View File

@@ -25,6 +25,7 @@ import psutil

from mindspore_serving._mindspore_serving import ExitSignalHandle_
from mindspore_serving._mindspore_serving import WorkerAgent_, AgentStartUpConfig_
from mindspore_serving._mindspore_serving import DistributedServableConfig_, OneRankConfig_

from mindspore_serving import log as logger
from mindspore_serving.common import check_type
@@ -347,6 +348,79 @@ def _startup_agents(common_meta, worker_ip, worker_port,
_listening_agents_after_startup(subprocess_list, worker_ip, worker_port, agent_ip)


class DistributedServableConfig:
"""Python DistributedServableConfig"""
def __init__(self):
self.rank_table_content = ""
self.rank_list = None
self.common_meta = None
self.distributed_meta = None

def set(self, config):
"""Set from C++ DistributedServableConfig_ obj"""
self.rank_table_content = config.rank_table_content
self.rank_list = []
for item in config.rank_list:
new_item = {"device_id": item.device_id, "ip": item.ip}
self.rank_list.append(new_item)
self.common_meta = {"servable_name": config.common_meta.servable_name,
"with_batch_dim": config.common_meta.with_batch_dim,
"without_batch_dim_inputs": config.common_meta.without_batch_dim_inputs,
"inputs_count": config.common_meta.inputs_count,
"outputs_count": config.common_meta.outputs_count}

self.distributed_meta = {"rank_size": config.distributed_meta.rank_size,
"stage_size": config.distributed_meta.stage_size}

def get(self):
"""Get as C++ DistributedServableConfig_ obj"""
config = DistributedServableConfig_()
config.rank_table_content = self.rank_table_content
rank_list = []
for item in self.rank_list:
new_item = OneRankConfig_()
new_item.device_id = item["device_id"]
new_item.ip = item["ip"]
rank_list.append(new_item)
config.rank_list = rank_list
config.common_meta.servable_name = self.common_meta["servable_name"]
config.common_meta.with_batch_dim = self.common_meta["with_batch_dim"]
config.common_meta.without_batch_dim_inputs = self.common_meta["without_batch_dim_inputs"]
config.common_meta.inputs_count = self.common_meta["inputs_count"]
config.common_meta.outputs_count = self.common_meta["outputs_count"]

config.distributed_meta.rank_size = self.distributed_meta["rank_size"]
config.distributed_meta.stage_size = self.distributed_meta["stage_size"]
return config


def _get_worker_distributed_config(worker_ip, worker_port):
"""Get worker distributed config from worker through sub process"""
c_send_pipe, p_recv_pipe = Pipe()

def process_fun(c_send_pipe):
try:
distributed_config = WorkerAgent_.get_agents_config_from_worker(worker_ip, worker_port)
config = DistributedServableConfig()
config.set(distributed_config)
c_send_pipe.send(config)
# pylint: disable=broad-except
except Exception as e:
c_send_pipe.send(e)
process = Process(target=process_fun, args=(c_send_pipe,),
name=f"worker_agent_get_agents_config_from_worker")
process.start()
process.join()
assert not process.is_alive()
if p_recv_pipe.poll(0.1):
config = p_recv_pipe.recv()
if isinstance(config, Exception):
raise config
distributed_config = config.get()
return distributed_config
raise RuntimeError(f"Failed to get agents config from worker")


def startup_worker_agents(worker_ip, worker_port, model_files, group_config_files=None,
agent_start_port=7000, agent_ip=None, rank_start=None):
r"""
@@ -390,7 +464,8 @@ def startup_worker_agents(worker_ip, worker_port, model_files, group_config_file
group_config_files = check_type.check_and_as_str_tuple_list("group_config_files", group_config_files)

ExitSignalHandle_.start()
distributed_config = WorkerAgent_.get_agents_config_from_worker(worker_ip, worker_port)
distributed_config = _get_worker_distributed_config(worker_ip, worker_port)
# distributed_config = WorkerAgent_.get_agents_config_from_worker(worker_ip, worker_port)

# get machine ip
rank_list = distributed_config.rank_list


+ 22
- 8
tests/st/distribute_agent_fault/kill_15_agent.sh View File

@@ -184,7 +184,28 @@ kill_agent()
then
echo "kill agent failed"
fi
sleep 5

num=`ps -ef | grep agent.py | grep -v grep | wc -l`
count=0
while [[ ${num} -ne 0 && ${count} -lt 10 ]]
do
sleep 1
count=$(($count+1))
num=`ps -ef | grep agent.py | grep -v grep | wc -l`
done

if [ ${count} -eq 10 ]
then
echo "agent exit failed"
echo $num
ps -ef | grep agent.py | grep -v grep
echo "------------------------------ agent failed log begin: "
cat agent.log
echo "------------------------------ agent failed log end"
clean_pid && exit 1
fi
sleep 1

num=`ps -ef | grep master.py | grep -v grep | wc -l`
if [ $num -ne 1 ]
then
@@ -199,13 +220,6 @@ kill_agent()
echo $num
clean_pid && exit 1
fi
num=`ps -ef | grep agent.py | grep -v grep | wc -l`
if [ $num -ne 0 ]
then
echo "agent exit failed"
echo $num
clean_pid && exit 1
fi
}

test_agent_fault_model()


+ 22
- 8
tests/st/distribute_master_fault/kill_15_master.sh View File

@@ -186,25 +186,39 @@ kill_master()
then
echo "kill master failed"
fi
sleep 5
num=`ps -ef | grep master.py | grep -v grep | wc -l`
if [ $num -ne 0 ]

num=`ps -ef | grep agent.py | grep -v grep | wc -l`
count=0
while [[ ${num} -ne 0 && ${count} -lt 10 ]]
do
sleep 1
count=$(($count+1))
num=`ps -ef | grep agent.py | grep -v grep | wc -l`
done

if [ ${count} -eq 10 ]
then
echo "master exit failed"
echo "agent exit failed"
echo $num
ps -ef | grep agent.py | grep -v grep
echo "------------------------------ agent failed log begin: "
cat agent.log
echo "------------------------------ agent failed log end"
clean_pid && exit 1
fi
num=`ps -ef | grep worker.py | grep -v grep | wc -l`
sleep 1

num=`ps -ef | grep master.py | grep -v grep | wc -l`
if [ $num -ne 0 ]
then
echo "worker exit failed"
echo "master exit failed"
echo $num
clean_pid && exit 1
fi
num=`ps -ef | grep agent.py | grep -v grep | wc -l`
num=`ps -ef | grep worker.py | grep -v grep | wc -l`
if [ $num -ne 0 ]
then
echo "agent exit failed"
echo "worker exit failed"
echo $num
clean_pid && exit 1
fi


+ 16
- 2
tests/st/distribute_master_with_worker/kill_15_agent.sh View File

@@ -145,14 +145,28 @@ kill_agent()
then
echo "kill agent failed"
fi
sleep 5
num=`ps -ef | grep agent.py | grep -v grep | wc -l`
if [ $num -ne 0 ]
count=0
while [[ ${num} -ne 0 && ${count} -lt 10 ]]
do
sleep 1
count=$(($count+1))
num=`ps -ef | grep agent.py | grep -v grep | wc -l`
done

if [ ${count} -eq 10 ]
then
echo "agent exit failed"
echo $num
ps -ef | grep agent.py | grep -v grep
echo "------------------------------ agent failed log begin: "
cat agent.log
echo "------------------------------ agent failed log end"
clean_pid && exit 1
fi
sleep 1

num=`ps -ef | grep master_with_worker.py | grep -v grep | wc -l`
if [ $num -ne 0 ]
then


+ 16
- 2
tests/st/distribute_master_with_worker/kill_15_master.sh View File

@@ -144,14 +144,28 @@ kill_master_with_worker()
then
echo "kill master_with_worker failed"
fi
sleep 5
num=`ps -ef | grep agent.py | grep -v grep | wc -l`
if [ $num -ne 0 ]
count=0
while [[ ${num} -ne 0 && ${count} -lt 10 ]]
do
sleep 1
count=$(($count+1))
num=`ps -ef | grep agent.py | grep -v grep | wc -l`
done

if [ ${count} -eq 10 ]
then
echo "agent exit failed"
echo $num
ps -ef | grep agent.py | grep -v grep
echo "------------------------------ agent failed log begin: "
cat agent.log
echo "------------------------------ agent failed log end"
clean_pid && exit 1
fi
sleep 1

num=`ps -ef | grep master_with_worker.py | grep -v grep | wc -l`
if [ $num -ne 0 ]
then


+ 22
- 8
tests/st/distribute_worker_fault/kill_15_worker.sh View File

@@ -186,7 +186,28 @@ kill_agent()
then
echo "kill worker failed"
fi
sleep 5

num=`ps -ef | grep agent.py | grep -v grep | wc -l`
count=0
while [[ ${num} -ne 0 && ${count} -lt 10 ]]
do
sleep 1
count=$(($count+1))
num=`ps -ef | grep agent.py | grep -v grep | wc -l`
done

if [ ${count} -eq 10 ]
then
echo "agent exit failed"
echo $num
ps -ef | grep agent.py | grep -v grep
echo "------------------------------ agent failed log begin: "
cat agent.log
echo "------------------------------ agent failed log end"
clean_pid && exit 1
fi
sleep 1

num=`ps -ef | grep master.py | grep -v grep | wc -l`
if [ $num -ne 1 ]
then
@@ -201,13 +222,6 @@ kill_agent()
echo $num
clean_pid && exit 1
fi
num=`ps -ef | grep agent.py | grep -v grep | wc -l`
if [ $num -ne 0 ]
then
echo "agent exit failed"
echo $num
clean_pid && exit 1
fi
}

test_agent_fault_model()


+ 22
- 1
tests/st/matmul_distributed/matmul_distribute.sh View File

@@ -34,7 +34,28 @@ clean_master_pid()
then
echo "clean master pip failed"
fi
sleep 6

num=`ps -ef | grep agent.py | grep -v grep | wc -l`
count=0
while [[ ${num} -ne 0 && ${count} -lt 10 ]]
do
sleep 1
count=$(($count+1))
num=`ps -ef | grep agent.py | grep -v grep | wc -l`
done

if [ ${count} -eq 10 ]
then
echo "agent exit failed"
echo $num
ps -ef | grep agent.py | grep -v grep
echo "------------------------------ agent failed log begin: "
cat agent.log
echo "------------------------------ agent failed log end"
clean_pid && exit 1
fi
sleep 1

ps aux | grep 'master.py' | grep ${CURRUSER} | grep -v grep
if [ $? -eq 0 ]
then


+ 1
- 1
third_party/mindspore

@@ -1 +1 @@
Subproject commit 423026c1ac8488417a4d8022217a573598ad132b
Subproject commit 6b8bef2c8afe3f9890cee8e866771dd4b1d23d16

Loading…
Cancel
Save