diff --git a/mindspore_serving/ccsrc/worker/distributed_worker/distributed_servable.cc b/mindspore_serving/ccsrc/worker/distributed_worker/distributed_servable.cc index 47244d0..7ad7352 100644 --- a/mindspore_serving/ccsrc/worker/distributed_worker/distributed_servable.cc +++ b/mindspore_serving/ccsrc/worker/distributed_worker/distributed_servable.cc @@ -59,7 +59,6 @@ Status DistributedServable::PredictInner(const std::vector &input proto::DistributedPredictRequest request; proto::DistributedPredictRequest empty_request; - for (const auto &tensor_ptr : input) { auto tensor = request.add_inputs(); ProtoTensor proto_tensor(tensor); @@ -89,7 +88,8 @@ Status DistributedServable::PredictInner(const std::vector &input } for (size_t rank_id = 0; rank_id < msg_list->size(); ++rank_id) { - auto &future = msg_list->at(rank_id).future; + auto &predict_msg = msg_list->at(rank_id); + auto &future = predict_msg.future; const uint64_t kWaitMaxHundredMs = 10 * 10; // waiting for 10s uint64_t k; for (k = 0; k < kWaitMaxHundredMs; k++) { @@ -104,18 +104,19 @@ Status DistributedServable::PredictInner(const std::vector &input if (k >= kWaitMaxHundredMs) { return INFER_STATUS_LOG_ERROR(FAILED) << "Failed to wait for result of rank " << rank_id; } - auto status = msg_list->at(rank_id).status; + auto status = predict_msg.status; if (status != SUCCESS) { return INFER_STATUS_LOG_ERROR(FAILED) << "Error happened on get result of rank " << rank_id << ": " << status.StatusMessage(); } + auto &reply = predict_msg.reply; + if (reply.has_error_msg() && reply.error_msg().error_code() != 0) { + return INFER_STATUS_LOG_ERROR(FAILED) + << "Error happened on get result of rank " << rank_id << ": " << reply.error_msg().error_msg(); + } } auto &reply = msg_list->at(result_agent_id).reply; - if (reply.has_error_msg() && reply.error_msg().error_code() != 0) { - return INFER_STATUS_LOG_ERROR(FAILED) - << "Error happened on get result of rank " << result_agent_id << ": " << reply.error_msg().error_msg(); - 
} for (int i = 0; i < reply.outputs_size(); ++i) { auto p = std::make_shared(reply.mutable_outputs(i)); auto tensor_ptr = std::make_shared(p->data_type(), p->shape(), p->data(), p->data_size()); @@ -200,6 +201,7 @@ void DistributedServable::Clear() { Status DistributedServable::OnAgentExit() { std::unique_lock lock{mutex_}; + MSI_LOG_INFO << "Worker agent notify exit"; SetWaitAgentsPromise(false); model_loaded_ = false; return SUCCESS; @@ -629,7 +631,10 @@ Status DistributedServable::CheckRankConfig() { return SUCCESS; } -void DistributedServable::OnAgentFailed() { SetWaitAgentsPromise(false); } +void DistributedServable::OnAgentFailed() { + MSI_LOG_INFO << "Worker agent notify failed"; + SetWaitAgentsPromise(false); +} } // namespace serving } // namespace mindspore diff --git a/mindspore_serving/worker/distributed/agent_startup.py b/mindspore_serving/worker/distributed/agent_startup.py index 4f77305..e763b5d 100644 --- a/mindspore_serving/worker/distributed/agent_startup.py +++ b/mindspore_serving/worker/distributed/agent_startup.py @@ -35,6 +35,7 @@ def _get_local_ip(rank_list, port): for item in rank_list: ip_list.add(item.ip) with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) for ip in ip_list: try: s.bind((ip, port)) @@ -58,13 +59,15 @@ def _update_model_files_path(model_files, group_config_files): raise RuntimeError(f"Cannot access model file '{file_name}'") model_files_temp.append(file_name) - group_files_temp = [] - for item in group_config_files: - file_name = os.path.join(script_dir, item) - if not os.access(file_name, os.R_OK): - raise RuntimeError(f"Cannot access group config file '{file_name}'") - group_files_temp.append(file_name) - + if group_config_files is not None: + group_files_temp = [] + for item in group_config_files: + file_name = os.path.join(script_dir, item) + if not os.access(file_name, os.R_OK): + raise RuntimeError(f"Cannot access group config file '{file_name}'") + 
group_files_temp.append(file_name) + else: + group_files_temp = None logger.info(f"absolute model files: {model_files_temp}") logger.info(f"absolute group config files: {group_files_temp}") return model_files_temp, group_files_temp @@ -138,27 +141,49 @@ def _agent_process(send_pipe, recv_pipe, index, start_config): logger.error(f"Child {index}: end send message to parent") -def _start_listening_child_processes(p_recv_pipe, send_pipe_list, subprocess_list): - """Listening child process""" +def _send_pipe_msg(send_pipe, msg): + """Send pipe message""" + try: + send_pipe.send(msg) + # pylint: disable=broad-except + except Exception as e: + logger.warning(f"Send pipe message exception happen: {e}") + - def send_pipe_msg(send_pipe, msg): - try: - send_pipe.send(msg) - # pylint: disable=broad-except - except Exception as e: - logger.warning(f"Send pipe message exception happen: {e}") +def _send_exit_msg_to_children(send_pipe_list, subprocess_list): + """Send exit msg to all child processes, and terminate all child processes when they are still alive + in some seconds later""" + index = 0 + for send_pipe, process in zip(send_pipe_list, subprocess_list): + if process.is_alive(): + logger.warning(f"Send exit message to Child {index}") + _send_pipe_msg(send_pipe, signal_exit) + logger.warning(f"End send exit message to Child {index}") + else: + logger.warning(f"Child {index} is not alive") + index += 1 - def send_exit_msg(): - index = 0 - for send_pipe, process in zip(send_pipe_list, subprocess_list): + wait_seconds = 10 + for i in range(wait_seconds): + all_exit = True + for process in subprocess_list: if process.is_alive(): - logger.warning(f"Send exit message to Child {index}") - send_pipe_msg(send_pipe, signal_exit) - logger.warning(f"End send exit message to Child {index}") - else: - logger.warning(f"Child {index} is not alive") - index += 1 + logger.warning(f"There are still child processes that have not exited and will be forcibly killed in " + f"{wait_seconds - i} 
seconds.") + time.sleep(1) + all_exit = False + break + if all_exit: + logger.info(f"All Child process exited") + return + for index, process in enumerate(subprocess_list): + if process.is_alive(): + logger.warning(f"Kill Child process {index}") + process.kill() + +def _start_listening_child_processes(p_recv_pipe, send_pipe_list, subprocess_list): + """Listening child process""" count = len(send_pipe_list) for _ in range(count): while True: @@ -168,21 +193,20 @@ def _start_listening_child_processes(p_recv_pipe, send_pipe_list, subprocess_lis if process.is_alive(): continue logger.warning("Fail to start agents because of death of one agent") - send_exit_msg() + _send_exit_msg_to_children(send_pipe_list, subprocess_list) return False for send_pipe in send_pipe_list: - send_pipe_msg(send_pipe, signal_heartbeat) + _send_pipe_msg(send_pipe, signal_heartbeat) index, msg = p_recv_pipe.recv() logger.info(f"Receive msg from Child {index}: {msg}") if isinstance(msg, Exception): logger.warning("Fail to start agents because of exception raise by one agent") - send_exit_msg() + _send_exit_msg_to_children(send_pipe_list, subprocess_list) return False for send_pipe in send_pipe_list: - send_pipe_msg(send_pipe, signal_success) - logger.info("Success to start agents") + _send_pipe_msg(send_pipe, signal_success) return True @@ -191,12 +215,16 @@ def _startup_all_agents(common_meta, worker_ip, worker_port, model_files, group_config_files, rank_table_file): """Start up all agents in one machine""" servable_name = common_meta.servable_name - index = 0 send_pipe_list = [] subprocess_list = [] c_send_pipe, p_recv_pipe = Pipe() - for device_id, rank_id, model_file, group_file in zip(device_id_list, rank_id_list, model_files, - group_config_files): + group_file = "" + agents_count = len(device_id_list) + for index in range(agents_count): + device_id, rank_id, model_file = device_id_list[index], rank_id_list[index], model_files[index] + if group_config_files is not None: + group_file = 
group_config_files[index] + p_send_pipe, c_recv_pipe = Pipe() send_pipe_list.append(p_send_pipe) @@ -219,19 +247,53 @@ def _startup_all_agents(common_meta, worker_ip, worker_port, name=f"{servable_name}_worker_agent_rank{rank_id}_device{device_id}") process.start() subprocess_list.append(process) - index += 1 ret = _start_listening_child_processes(p_recv_pipe, send_pipe_list, subprocess_list) + + msg = f"worker_ip: {worker_ip}, worker_port: {worker_port}, agent_ip: {agent_ip}, " \ + f"agent_start_port: {agent_start_port}, device ids: {device_id_list}, rank ids: {rank_id_list}, " \ + f"rank table file: {rank_table_file}, model files: {model_files}, group config files: {group_config_files}" if not ret: WorkerAgent_.notify_failed(worker_ip, worker_port) - - -def startup_worker_agents(worker_ip, worker_port, model_files, group_config_files, agent_start_port=7000): - """Start up all needed worker agents on one machine""" + logger.info(f"Failed to start agents, {msg}") + print(f"Failed to start agents, {msg}") + else: + logger.info(f"Success to start agents, {msg}") + print(f"Success to start agents, {msg}") + + +def startup_worker_agents(worker_ip, worker_port, model_files, group_config_files=None, agent_start_port=7000): + r""" + Start up all needed worker agents on the current machine. + + Serving has two running modes. One is running in a single process, providing the Serving service of a single model. + The other includes a master and multiple workers. This interface is for the second scenario. + + The master is responsible for providing the Serving access interface for clients, + while the worker is responsible for providing the inference service of the specific model. The communications + between the master and workers through gRPC are defined as (master_ip, master_port) and (worker_ip, worker_port). + + Args: + worker_ip (str): The worker ip the agents linked to. + worker_port (int): The worker port the agents linked to. 
+ model_files (list or tuple of str): All model files needed on the current machine, absolute path or path relative to + this startup python script. + group_config_files (None, list or tuple of str): All group config files needed on the current machine, absolute path + or path relative to this startup python script, default None, which means there are no configuration files. + + Examples: + >>> import os + >>> from mindspore_serving.worker import distributed + >>> model_files = [] + >>> for i in range(8): + >>> model_files.append(f"models/device{i}/matmul.mindir") + >>> distributed.startup_worker_agents(worker_ip="127.0.0.1", worker_port=6200, model_files=model_files) + """ check_type.check_str("worker_ip", worker_ip) check_type.check_ip_port("worker_port", worker_port) check_type.check_int("agent_start_port", agent_start_port, 1, 65535 - 7) model_files = check_type.check_and_as_str_tuple_list("model_files", model_files) - group_config_files = check_type.check_and_as_str_tuple_list("group_config_files", group_config_files) + if group_config_files is not None: + group_config_files = check_type.check_and_as_str_tuple_list("group_config_files", group_config_files) ExitSignalHandle_.start() distributed_config = WorkerAgent_.get_agents_config_from_worker(worker_ip, worker_port) @@ -252,9 +314,10 @@ def startup_worker_agents(worker_ip, worker_port, model_files, group_config_file raise RuntimeError(f"Card count {local_device_id_list} described rank table does not equal to model files size " f"{len(model_files)}, model files: {model_files}") - if len(local_device_id_list) != len(group_config_files): - raise RuntimeError(f"Card count {local_device_id_list} described rank table does not equal to group config " - f"files size {len(group_config_files)}, group config files: {group_config_files}") + if group_config_files is not None and len(model_files) != len(group_config_files): + raise RuntimeError(f"Model files count {len(model_files)} does not equal to group config files " + f"count 
{len(group_config_files)} when group_config_files is not None, " + f"model files: {model_files}, group config files: {group_config_files}") model_files, group_config_files = _update_model_files_path(model_files, group_config_files) diff --git a/mindspore_serving/worker/distributed/distributed_worker.py b/mindspore_serving/worker/distributed/distributed_worker.py index a23d150..1c559b0 100644 --- a/mindspore_serving/worker/distributed/distributed_worker.py +++ b/mindspore_serving/worker/distributed/distributed_worker.py @@ -53,12 +53,12 @@ def start_distributed_servable(servable_directory, servable_name, rank_table_jso Examples: >>> import os - >>> from mindspore_serving import worker + >>> from mindspore_serving.worker import distributed >>> >>> servable_dir = os.path.abspath(".") - >>> worker.start_servable(servable_dir, "lenet", device_id=0, \ - ... master_ip="127.0.0.1", master_port=6500, \ - ... host_ip="127.0.0.1", host_port=6600) + >>> distributed.start_distributed_servable(servable_dir, "matmul", rank_table_json_file="hccl_8p.json", \ + ... worker_ip="127.0.0.1", worker_port=6200, \ + ... 
master_ip="127.0.0.1", master_port=6500) """ check_type.check_str('servable_directory', servable_directory) check_type.check_str('servable_name', servable_name) @@ -75,7 +75,7 @@ def start_distributed_servable(servable_directory, servable_name, rank_table_jso _load_servable_config(servable_directory, servable_name) Worker_.start_distributed_servable(servable_directory, servable_name, rank_table_json_file, version_number, - master_ip, master_port, worker_ip, worker_port, wait_agents_time_in_seconds) + worker_ip, worker_port, master_ip, master_port, wait_agents_time_in_seconds) _start_py_task(Worker_.get_batch_size()) _start_wait_and_clear() @@ -106,11 +106,13 @@ def start_distributed_servable_in_master(servable_directory, servable_name, rank Examples: >>> import os - >>> from mindspore_serving import worker + >>> from mindspore_serving.worker import distributed >>> from mindspore_serving import master >>> >>> servable_dir = os.path.abspath(".") - >>> worker.start_servable_in_master(servable_dir, "lenet", device_id=0) + >>> distributed.start_distributed_servable_in_master(servable_dir, "matmul", \ + ... rank_table_json_file="hccl_8p.json", \ + ... worker_ip="127.0.0.1", worker_port=6200) >>> >>> master.start_grpc_server("0.0.0.0", 5500) >>> master.start_restful_server("0.0.0.0", 1500) diff --git a/mindspore_serving/worker/distributed/register.py b/mindspore_serving/worker/distributed/register.py index 40e3bf2..9e8f571 100644 --- a/mindspore_serving/worker/distributed/register.py +++ b/mindspore_serving/worker/distributed/register.py @@ -21,7 +21,19 @@ from mindspore_serving import log as logger def declare_distributed_servable(rank_size, stage_size, with_batch_dim=True, without_batch_dim_inputs=None): - """declare distributed servable in servable_config.py""" + """declare distributed servable in servable_config.py. + + Args: + rank_size (int): Te rank size of the distributed model. + stage_size (int): The stage size of the distributed model. 
+ with_batch_dim (bool): Whether the first shape dim of the inputs and outputs of model is batch, default True. + without_batch_dim_inputs (None, int, tuple or list of int): Index of inputs that without batch dim + when with_batch_dim is True. + + Examples: + >>> from mindspore_serving.worker import distributed + >>> distributed.declare_distributed_servable(rank_size=8, stage_size=1) + """ check_type.check_bool('with_batch_dim', with_batch_dim) meta = ServableMeta_() diff --git a/mindspore_serving/worker/distributed/worker_agent.py b/mindspore_serving/worker/distributed/worker_agent.py index f29c95c..3043e9f 100644 --- a/mindspore_serving/worker/distributed/worker_agent.py +++ b/mindspore_serving/worker/distributed/worker_agent.py @@ -31,12 +31,13 @@ def start_worker_agent(start_config): os.environ["RANK_ID"] = str(start_config.rank_id) os.environ["DEVICE_ID"] = str(start_config.device_id) os.environ["MS_ENABLE_HCCL"] = "1" - os.environ["PARA_GROUP_FILE"] = start_config.group_file_name + if start_config.group_file_name: + os.environ["PARA_GROUP_FILE"] = start_config.group_file_name os.environ["RANK_TABLE_FILE"] = start_config.rank_table_json_file_name for item in ("RANK_ID", "DEVICE_ID", "MS_ENABLE_HCCL", "PARA_GROUP_FILE", "RANK_TABLE_FILE", "LD_LIBRARY_PATH", "PYTHONPATH"): - logger.info(f"Env {item}: {os.getenv(item, '')}") + logger.info(f"Env {item}: {os.getenv(item, None)}") WorkerAgent_.start_agent(start_config) start_wait_and_clear()