
Serving, bug fix

tags/v1.2.0
xuyongfei 5 years ago
parent commit bd2f2b8246
5 changed files with 142 additions and 59 deletions
  1. +13 -8    mindspore_serving/ccsrc/worker/distributed_worker/distributed_servable.cc
  2. +104 -41  mindspore_serving/worker/distributed/agent_startup.py
  3. +9 -7     mindspore_serving/worker/distributed/distributed_worker.py
  4. +13 -1    mindspore_serving/worker/distributed/register.py
  5. +3 -2     mindspore_serving/worker/distributed/worker_agent.py

+13 -8  mindspore_serving/ccsrc/worker/distributed_worker/distributed_servable.cc

@@ -59,7 +59,6 @@ Status DistributedServable::PredictInner(const std::vector<TensorBasePtr> &input
 
   proto::DistributedPredictRequest request;
-  proto::DistributedPredictRequest empty_request;
 
   for (const auto &tensor_ptr : input) {
     auto tensor = request.add_inputs();
     ProtoTensor proto_tensor(tensor);
@@ -89,7 +88,8 @@ Status DistributedServable::PredictInner(const std::vector<TensorBasePtr> &input
   }
 
   for (size_t rank_id = 0; rank_id < msg_list->size(); ++rank_id) {
-    auto &future = msg_list->at(rank_id).future;
+    auto &predict_msg = msg_list->at(rank_id);
+    auto &future = predict_msg.future;
     const uint64_t kWaitMaxHundredMs = 10 * 10;  // waiting for 10s
     uint64_t k;
     for (k = 0; k < kWaitMaxHundredMs; k++) {
@@ -104,18 +104,19 @@ Status DistributedServable::PredictInner(const std::vector<TensorBasePtr> &input
     if (k >= kWaitMaxHundredMs) {
       return INFER_STATUS_LOG_ERROR(FAILED) << "Failed to wait for result of rank " << rank_id;
     }
-    auto status = msg_list->at(rank_id).status;
+    auto status = predict_msg.status;
     if (status != SUCCESS) {
       return INFER_STATUS_LOG_ERROR(FAILED)
              << "Error happened on get result of rank " << rank_id << ": " << status.StatusMessage();
     }
+    auto &reply = predict_msg.reply;
+    if (reply.has_error_msg() && reply.error_msg().error_code() != 0) {
+      return INFER_STATUS_LOG_ERROR(FAILED)
+             << "Error happened on get result of rank " << rank_id << ": " << reply.error_msg().error_msg();
+    }
   }
 
   auto &reply = msg_list->at(result_agent_id).reply;
-  if (reply.has_error_msg() && reply.error_msg().error_code() != 0) {
-    return INFER_STATUS_LOG_ERROR(FAILED)
-           << "Error happened on get result of rank " << result_agent_id << ": " << reply.error_msg().error_msg();
-  }
   for (int i = 0; i < reply.outputs_size(); ++i) {
     auto p = std::make_shared<ProtoTensor>(reply.mutable_outputs(i));
     auto tensor_ptr = std::make_shared<Tensor>(p->data_type(), p->shape(), p->data(), p->data_size());
@@ -200,6 +201,7 @@ void DistributedServable::Clear() {
 
 Status DistributedServable::OnAgentExit() {
   std::unique_lock<std::mutex> lock{mutex_};
+  MSI_LOG_INFO << "Worker agent notify exit";
   SetWaitAgentsPromise(false);
   model_loaded_ = false;
   return SUCCESS;
@@ -629,7 +631,10 @@ Status DistributedServable::CheckRankConfig() {
   return SUCCESS;
 }
 
-void DistributedServable::OnAgentFailed() { SetWaitAgentsPromise(false); }
+void DistributedServable::OnAgentFailed() {
+  MSI_LOG_INFO << "Worker agent notify failed";
+  SetWaitAgentsPromise(false);
+}
 
 }  // namespace serving
 }  // namespace mindspore
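
Taken together, the PredictInner changes bound the wait on each rank's future and check every rank's status and reply error message, where previously only the result agent's reply was inspected. A minimal Python sketch of the same collection pattern; RankMsg and its fields are hypothetical stand-ins for the C++ per-rank message, not actual Serving types:

# Illustrative sketch only; RankMsg and its fields are hypothetical.
from concurrent.futures import Future
from dataclasses import dataclass

@dataclass
class RankMsg:
    future: Future          # completed when the agent's reply arrives
    status: int = 0         # transport status, 0 on success
    reply_error: str = ""   # error carried inside the reply, if any

def collect_results(msg_list, result_agent_id, wait_seconds=10):
    for rank_id, msg in enumerate(msg_list):
        # bounded wait, mirroring the kWaitMaxHundredMs polling loop
        msg.future.result(timeout=wait_seconds)
        if msg.status != 0:
            raise RuntimeError(f"Error happened on get result of rank {rank_id}")
        if msg.reply_error:  # new in this commit: checked for every rank
            raise RuntimeError(
                f"Error happened on get result of rank {rank_id}: {msg.reply_error}")
    return msg_list[result_agent_id]  # only this rank's reply carries the outputs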

+104 -41  mindspore_serving/worker/distributed/agent_startup.py

@@ -35,6 +35,7 @@ def _get_local_ip(rank_list, port):
     for item in rank_list:
         ip_list.add(item.ip)
     with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
+        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        for ip in ip_list:
            try:
                s.bind((ip, port))
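
For context, bind() succeeds only for an address owned by the local machine, which is how _get_local_ip picks the local IP out of the rank table; the SO_REUSEADDR option added here presumably avoids spurious bind failures when the probe port is lingering in TIME_WAIT. A standalone sketch of the probing idea:

# Standalone sketch of the address-probing idea behind _get_local_ip.
import socket

def probe_local_ip(candidate_ips, port):
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        for ip in candidate_ips:
            try:
                s.bind((ip, port))  # succeeds only for a local address
                return ip
            except OSError:
                continue
    raise RuntimeError(f"none of {candidate_ips} is an address of this machine")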
@@ -58,13 +59,15 @@ def _update_model_files_path(model_files, group_config_files):
             raise RuntimeError(f"Cannot access model file '{file_name}'")
         model_files_temp.append(file_name)
 
-    group_files_temp = []
-    for item in group_config_files:
-        file_name = os.path.join(script_dir, item)
-        if not os.access(file_name, os.R_OK):
-            raise RuntimeError(f"Cannot access group config file '{file_name}'")
-        group_files_temp.append(file_name)
-
+    if group_config_files is not None:
+        group_files_temp = []
+        for item in group_config_files:
+            file_name = os.path.join(script_dir, item)
+            if not os.access(file_name, os.R_OK):
+                raise RuntimeError(f"Cannot access group config file '{file_name}'")
+            group_files_temp.append(file_name)
+    else:
+        group_files_temp = None
     logger.info(f"absolute model files: {model_files_temp}")
     logger.info(f"absolute group config files: {group_files_temp}")
     return model_files_temp, group_files_temp
@@ -138,27 +141,49 @@ def _agent_process(send_pipe, recv_pipe, index, start_config):
         logger.error(f"Child {index}: end send message to parent")
 
 
-def _start_listening_child_processes(p_recv_pipe, send_pipe_list, subprocess_list):
-    """Listening child process"""
-
-    def send_pipe_msg(send_pipe, msg):
-        try:
-            send_pipe.send(msg)
-        # pylint: disable=broad-except
-        except Exception as e:
-            logger.warning(f"Send pipe message exception happen: {e}")
-
-    def send_exit_msg():
-        index = 0
-        for send_pipe, process in zip(send_pipe_list, subprocess_list):
-            if process.is_alive():
-                logger.warning(f"Send exit message to Child {index}")
-                send_pipe_msg(send_pipe, signal_exit)
-                logger.warning(f"End send exit message to Child {index}")
-            else:
-                logger.warning(f"Child {index} is not alive")
-            index += 1
-
+def _send_pipe_msg(send_pipe, msg):
+    """Send pipe message"""
+    try:
+        send_pipe.send(msg)
+    # pylint: disable=broad-except
+    except Exception as e:
+        logger.warning(f"Send pipe message exception happen: {e}")
+
+
+def _send_exit_msg_to_children(send_pipe_list, subprocess_list):
+    """Send exit msg to all child processes, and terminate all child processes when they are still alive
+    in some seconds later"""
+    index = 0
+    for send_pipe, process in zip(send_pipe_list, subprocess_list):
+        if process.is_alive():
+            logger.warning(f"Send exit message to Child {index}")
+            _send_pipe_msg(send_pipe, signal_exit)
+            logger.warning(f"End send exit message to Child {index}")
+        else:
+            logger.warning(f"Child {index} is not alive")
+        index += 1
+    wait_seconds = 10
+    for i in range(wait_seconds):
+        all_exit = True
+        for process in subprocess_list:
+            if process.is_alive():
+                logger.warning(f"There are still child processes that have not exited and will be forcibly killed in "
+                               f"{wait_seconds - i} seconds.")
+                time.sleep(1)
+                all_exit = False
+                break
+        if all_exit:
+            logger.info(f"All Child process exited")
+            return
+    for index, process in enumerate(subprocess_list):
+        if process.is_alive():
+            logger.warning(f"Kill Child process {index}")
+            process.kill()
+
+
+def _start_listening_child_processes(p_recv_pipe, send_pipe_list, subprocess_list):
+    """Listening child process"""
     count = len(send_pipe_list)
     for _ in range(count):
         while True:
@@ -168,21 +193,20 @@ def _start_listening_child_processes(p_recv_pipe, send_pipe_list, subprocess_lis
                 if process.is_alive():
                     continue
                 logger.warning("Fail to start agents because of death of one agent")
-                send_exit_msg()
+                _send_exit_msg_to_children(send_pipe_list, subprocess_list)
                 return False
            for send_pipe in send_pipe_list:
-               send_pipe_msg(send_pipe, signal_heartbeat)
+               _send_pipe_msg(send_pipe, signal_heartbeat)
 
        index, msg = p_recv_pipe.recv()
        logger.info(f"Receive msg from Child {index}: {msg}")
        if isinstance(msg, Exception):
            logger.warning("Fail to start agents because of exception raise by one agent")
-           send_exit_msg()
+           _send_exit_msg_to_children(send_pipe_list, subprocess_list)
            return False
 
    for send_pipe in send_pipe_list:
-       send_pipe_msg(send_pipe, signal_success)
-   logger.info("Success to start agents")
+       _send_pipe_msg(send_pipe, signal_success)
    return True
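
The functions above implement a small pipe protocol: each child reports its startup result, the parent broadcasts heartbeats while waiting, and on failure it asks the children to exit before killing any stragglers. A self-contained sketch with a made-up signal value (the real module defines signal_heartbeat, signal_exit and signal_success elsewhere):

# Self-contained sketch of the parent/child pipe protocol; SIGNAL_EXIT is made up.
from multiprocessing import Pipe, Process

SIGNAL_EXIT = "exit"

def child_main(recv_conn, send_conn, index):
    send_conn.send((index, "agent started"))  # startup report to the parent
    while True:
        if recv_conn.recv() == SIGNAL_EXIT:   # heartbeats are simply ignored here
            return

if __name__ == "__main__":
    c_send_pipe, p_recv_pipe = Pipe()         # child -> parent
    p_send_pipe, c_recv_pipe = Pipe()         # parent -> child
    process = Process(target=child_main, args=(c_recv_pipe, c_send_pipe, 0))
    process.start()
    index, msg = p_recv_pipe.recv()           # wait for the startup report
    p_send_pipe.send(SIGNAL_EXIT)             # graceful shutdown request
    process.join(timeout=10)
    if process.is_alive():                    # same fallback as _send_exit_msg_to_children
        process.kill()
    print(f"Child {index} reported: {msg}")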


@@ -191,12 +215,16 @@ def _startup_all_agents(common_meta, worker_ip, worker_port,
                         model_files, group_config_files, rank_table_file):
     """Start up all agents in one machine"""
     servable_name = common_meta.servable_name
-    index = 0
     send_pipe_list = []
     subprocess_list = []
     c_send_pipe, p_recv_pipe = Pipe()
-    for device_id, rank_id, model_file, group_file in zip(device_id_list, rank_id_list, model_files,
-                                                          group_config_files):
+    group_file = ""
+    agents_count = len(device_id_list)
+    for index in range(agents_count):
+        device_id, rank_id, model_file = device_id_list[index], rank_id_list[index], model_files[index]
+        if group_config_files is not None:
+            group_file = group_config_files[index]
 
         p_send_pipe, c_recv_pipe = Pipe()
         send_pipe_list.append(p_send_pipe)

@@ -219,19 +247,53 @@ def _startup_all_agents(common_meta, worker_ip, worker_port,
                           name=f"{servable_name}_worker_agent_rank{rank_id}_device{device_id}")
         process.start()
         subprocess_list.append(process)
-        index += 1
     ret = _start_listening_child_processes(p_recv_pipe, send_pipe_list, subprocess_list)
 
+    msg = f"worker_ip: {worker_ip}, worker_port: {worker_port}, agent_ip: {agent_ip}, " \
+          f"agent_start_port: {agent_start_port}, device ids: {device_id_list}, rank ids: {rank_id_list}, " \
+          f"rank table file: {rank_table_file}, model files: {model_files}, group config files: {group_config_files}"
     if not ret:
         WorkerAgent_.notify_failed(worker_ip, worker_port)
+        logger.info(f"Failed to start agents, {msg}")
+        print(f"Failed to start agents, {msg}")
+    else:
+        logger.info(f"Success to start agents, {msg}")
+        print(f"Success to start agents, {msg}")
 
 
-def startup_worker_agents(worker_ip, worker_port, model_files, group_config_files, agent_start_port=7000):
-    """Start up all needed worker agents on one machine"""
+def startup_worker_agents(worker_ip, worker_port, model_files, group_config_files=None, agent_start_port=7000):
+    r"""
+    Start up all needed worker agents on the current machine.
+
+    Serving has two running modes. One is running in a single process, providing the Serving service of a
+    single model. The other includes a master and multiple workers. This interface is for the second scenario.
+
+    The master is responsible for providing the Serving access interface for clients,
+    while the worker is responsible for providing the inference service of the specific model. The master
+    communicates with the workers through gRPC, with addresses defined as (master_ip, master_port) and
+    (worker_ip, worker_port).
+
+    Args:
+        worker_ip (str): The worker ip the agents linked to.
+        worker_port (int): The worker port the agents linked to.
+        model_files (list or tuple of str): All model files needed on the current machine, absolute paths or
+            paths relative to this startup python script.
+        group_config_files (None, list or tuple of str): All group config files needed on the current machine,
+            absolute paths or paths relative to this startup python script, default None, which means there are
+            no configuration files.
+        agent_start_port (int): The first port number used by the agents, default 7000.
+
+    Examples:
+        >>> import os
+        >>> from mindspore_serving.worker import distributed
+        >>> model_files = []
+        >>> for i in range(8):
+        ...     model_files.append(f"models/device{i}/matmul.mindir")
+        >>> distributed.startup_worker_agents(worker_ip="127.0.0.1", worker_port=6200, model_files=model_files)
+    """
     check_type.check_str("worker_ip", worker_ip)
     check_type.check_ip_port("worker_port", worker_port)
     check_type.check_int("agent_start_port", agent_start_port, 1, 65535 - 7)
     model_files = check_type.check_and_as_str_tuple_list("model_files", model_files)
-    group_config_files = check_type.check_and_as_str_tuple_list("group_config_files", group_config_files)
+    if group_config_files is not None:
+        group_config_files = check_type.check_and_as_str_tuple_list("group_config_files", group_config_files)
 
     ExitSignalHandle_.start()
     distributed_config = WorkerAgent_.get_agents_config_from_worker(worker_ip, worker_port)
@@ -252,9 +314,10 @@ def startup_worker_agents(worker_ip, worker_port, model_files, group_config_file
         raise RuntimeError(f"Card count {local_device_id_list} described rank table does not equal to model files size "
                            f"{len(model_files)}, model files: {model_files}")
 
-    if len(local_device_id_list) != len(group_config_files):
-        raise RuntimeError(f"Card count {local_device_id_list} described rank table does not equal to group config "
-                           f"files size {len(group_config_files)}, group config files: {group_config_files}")
+    if group_config_files is not None and len(model_files) != len(group_config_files):
+        raise RuntimeError(f"Model files count {len(model_files)} does not equal to group config files "
+                           f"count {len(group_config_files)} when group_config_files is not None, "
+                           f"model files: {model_files}, group config files: {group_config_files}")
 
     model_files, group_config_files = _update_model_files_path(model_files, group_config_files)
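
With the relaxed check above, group_config_files may now be omitted entirely; when supplied, it must match model_files one-to-one. A usage sketch with hypothetical file names:

# Hypothetical file names; one model file and one group config file per device.
from mindspore_serving.worker import distributed

model_files = [f"models/device{i}/matmul.mindir" for i in range(8)]
group_files = [f"models/device{i}/group_config.pb" for i in range(8)]
distributed.startup_worker_agents(worker_ip="127.0.0.1", worker_port=6200,
                                  model_files=model_files,
                                  group_config_files=group_files)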



+9 -7  mindspore_serving/worker/distributed/distributed_worker.py

@@ -53,12 +53,12 @@ def start_distributed_servable(servable_directory, servable_name, rank_table_jso
 
     Examples:
         >>> import os
-        >>> from mindspore_serving import worker
+        >>> from mindspore_serving.worker import distributed
         >>>
         >>> servable_dir = os.path.abspath(".")
-        >>> worker.start_servable(servable_dir, "lenet", device_id=0, \
-        ...                       master_ip="127.0.0.1", master_port=6500, \
-        ...                       host_ip="127.0.0.1", host_port=6600)
+        >>> distributed.start_distributed_servable(servable_dir, "matmul", rank_table_json_file="hccl_8p.json", \
+        ...                                        worker_ip="127.0.0.1", worker_port=6200, \
+        ...                                        master_ip="127.0.0.1", master_port=6500)
     """
     check_type.check_str('servable_directory', servable_directory)
     check_type.check_str('servable_name', servable_name)
check_type.check_str('servable_directory', servable_directory)
check_type.check_str('servable_name', servable_name)
@@ -75,7 +75,7 @@ def start_distributed_servable(servable_directory, servable_name, rank_table_jso
 
     _load_servable_config(servable_directory, servable_name)
     Worker_.start_distributed_servable(servable_directory, servable_name, rank_table_json_file, version_number,
-                                       master_ip, master_port, worker_ip, worker_port, wait_agents_time_in_seconds)
+                                       worker_ip, worker_port, master_ip, master_port, wait_agents_time_in_seconds)
     _start_py_task(Worker_.get_batch_size())
     _start_wait_and_clear()
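
The one-line change above fixes swapped positional endpoint arguments in the call into Worker_. As an aside, keyword-only parameters make this class of bug impossible at the call site; a hypothetical sketch, not the actual Serving API:

# Hypothetical wrapper; the bare '*' forces endpoints to be passed by name,
# so the worker/master order can no longer be silently swapped.
def start_distributed_servable_kw(servable_directory, servable_name, *,
                                  rank_table_json_file, worker_ip, worker_port,
                                  master_ip, master_port,
                                  wait_agents_time_in_seconds=0):
    print(f"worker at {worker_ip}:{worker_port}, master at {master_ip}:{master_port}")

start_distributed_servable_kw(".", "matmul", rank_table_json_file="hccl_8p.json",
                              worker_ip="127.0.0.1", worker_port=6200,
                              master_ip="127.0.0.1", master_port=6500)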

@@ -106,11 +106,13 @@ def start_distributed_servable_in_master(servable_directory, servable_name, rank
 
     Examples:
         >>> import os
-        >>> from mindspore_serving import worker
+        >>> from mindspore_serving.worker import distributed
         >>> from mindspore_serving import master
         >>>
         >>> servable_dir = os.path.abspath(".")
-        >>> worker.start_servable_in_master(servable_dir, "lenet", device_id=0)
+        >>> distributed.start_distributed_servable_in_master(servable_dir, "matmul", \
+        ...                                                  rank_table_json_file="hccl_8p.json", \
+        ...                                                  worker_ip="127.0.0.1", worker_port=6200)
         >>>
         >>> master.start_grpc_server("0.0.0.0", 5500)
         >>> master.start_restful_server("0.0.0.0", 1500)


+13 -1  mindspore_serving/worker/distributed/register.py

@@ -21,7 +21,19 @@ from mindspore_serving import log as logger
 
 
 def declare_distributed_servable(rank_size, stage_size, with_batch_dim=True, without_batch_dim_inputs=None):
-    """declare distributed servable in servable_config.py"""
+    """Declare a distributed servable in servable_config.py.
+
+    Args:
+        rank_size (int): The rank size of the distributed model.
+        stage_size (int): The stage size of the distributed model.
+        with_batch_dim (bool): Whether the first shape dim of the inputs and outputs of the model is batch, default True.
+        without_batch_dim_inputs (None, int, tuple or list of int): Index of inputs that are without batch dim
+            when with_batch_dim is True.
+
+    Examples:
+        >>> from mindspore_serving.worker import distributed
+        >>> distributed.declare_distributed_servable(rank_size=8, stage_size=1)
+    """
     check_type.check_bool('with_batch_dim', with_batch_dim)
 
     meta = ServableMeta_()


+3 -2  mindspore_serving/worker/distributed/worker_agent.py

@@ -31,12 +31,13 @@ def start_worker_agent(start_config):
     os.environ["RANK_ID"] = str(start_config.rank_id)
     os.environ["DEVICE_ID"] = str(start_config.device_id)
     os.environ["MS_ENABLE_HCCL"] = "1"
-    os.environ["PARA_GROUP_FILE"] = start_config.group_file_name
+    if start_config.group_file_name:
+        os.environ["PARA_GROUP_FILE"] = start_config.group_file_name
     os.environ["RANK_TABLE_FILE"] = start_config.rank_table_json_file_name
 
     for item in ("RANK_ID", "DEVICE_ID", "MS_ENABLE_HCCL", "PARA_GROUP_FILE", "RANK_TABLE_FILE",
                  "LD_LIBRARY_PATH", "PYTHONPATH"):
-        logger.info(f"Env {item}: {os.getenv(item, '')}")
+        logger.info(f"Env {item}: {os.getenv(item, None)}")
     WorkerAgent_.start_agent(start_config)
 
     start_wait_and_clear()
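
The logging default changed from '' to None so that an unset variable prints as None in the f-string and can be told apart from a variable that is set but empty. A short demonstration:

# With None as the default, "set but empty" and "not set" print differently.
import os

os.environ["SET_BUT_EMPTY"] = ""
print(repr(os.getenv("SET_BUT_EMPTY", None)))  # '' (set but empty)
print(repr(os.getenv("NEVER_SET", None)))      # None (not set at all)
print(repr(os.getenv("NEVER_SET", "")))        # '' (ambiguous with the first case)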

