diff --git a/mindspore_serving/ccsrc/python/agent/agent_py.cc b/mindspore_serving/ccsrc/python/agent/agent_py.cc index c2c1465..52543d3 100644 --- a/mindspore_serving/ccsrc/python/agent/agent_py.cc +++ b/mindspore_serving/ccsrc/python/agent/agent_py.cc @@ -60,4 +60,8 @@ void PyAgent::StopAndClear() { WorkerAgent::Instance().Clear(); } +void PyAgent::StartupNotifyExit(const std::string &worker_ip, uint32_t worker_port, const std::string &agent_ip) { + WorkerAgentStartUp::Instance().StartupNotifyExit(worker_ip, worker_port, agent_ip); +} + } // namespace mindspore::serving diff --git a/mindspore_serving/ccsrc/python/agent/agent_py.h b/mindspore_serving/ccsrc/python/agent/agent_py.h index 708b673..d5b2447 100644 --- a/mindspore_serving/ccsrc/python/agent/agent_py.h +++ b/mindspore_serving/ccsrc/python/agent/agent_py.h @@ -39,6 +39,7 @@ class MS_API PyAgent { static void StopAndClear(); // from start up, not agent static void NotifyFailed(const std::string &worker_ip, uint32_t worker_port); + static void StartupNotifyExit(const std::string &worker_ip, uint32_t worker_port, const std::string &agent_ip); }; } // namespace serving diff --git a/mindspore_serving/ccsrc/python/serving_py.cc b/mindspore_serving/ccsrc/python/serving_py.cc index 5e9e950..eb20a2b 100644 --- a/mindspore_serving/ccsrc/python/serving_py.cc +++ b/mindspore_serving/ccsrc/python/serving_py.cc @@ -186,6 +186,7 @@ void PyRegWorkerAgent(pybind11::module *m_ptr) { .def_static("wait_and_clear", &PyAgent::WaitAndClear) .def_static("stop_and_clear", &PyAgent::StopAndClear) .def_static("notify_failed", &PyAgent::NotifyFailed) + .def_static("startup_notify_exit", &PyAgent::StartupNotifyExit) .def_static("start_agent", &PyAgent::StartAgent); py::class_(m, "AgentStartUpConfig_") diff --git a/mindspore_serving/ccsrc/worker/distributed_worker/agent_startup.cc b/mindspore_serving/ccsrc/worker/distributed_worker/agent_startup.cc index b0c00b3..da0fa7d 100644 --- a/mindspore_serving/ccsrc/worker/distributed_worker/agent_startup.cc +++ b/mindspore_serving/ccsrc/worker/distributed_worker/agent_startup.cc @@ -43,5 +43,10 @@ Status WorkerAgentStartUp::NotifyFailed(const std::string &worker_ip, uint32_t w return GrpcNotifyDistributeWorker::NotifyFailed(worker_ip, worker_port); } +void WorkerAgentStartUp::StartupNotifyExit(const std::string &worker_ip, uint32_t worker_port, + const std::string &agent_ip) { + GrpcNotifyDistributeWorker::StartupNotifyExit(worker_ip, worker_port, agent_ip); +} + } // namespace serving } // namespace mindspore diff --git a/mindspore_serving/ccsrc/worker/distributed_worker/agent_startup.h b/mindspore_serving/ccsrc/worker/distributed_worker/agent_startup.h index ad28e5c..07c34a4 100644 --- a/mindspore_serving/ccsrc/worker/distributed_worker/agent_startup.h +++ b/mindspore_serving/ccsrc/worker/distributed_worker/agent_startup.h @@ -36,6 +36,7 @@ class MS_API WorkerAgentStartUp { Status GetDistributedServableConfig(DistributedServableConfig *config); Status NotifyFailed(const std::string &worker_ip, uint32_t worker_port); + void StartupNotifyExit(const std::string &worker_ip, uint32_t worker_port, const std::string &agent_ip); private: DistributedServableConfig config_; diff --git a/mindspore_serving/ccsrc/worker/distributed_worker/distributed_process/distributed_process.cc b/mindspore_serving/ccsrc/worker/distributed_worker/distributed_process/distributed_process.cc index d136867..d807502 100644 --- a/mindspore_serving/ccsrc/worker/distributed_worker/distributed_process/distributed_process.cc +++ b/mindspore_serving/ccsrc/worker/distributed_worker/distributed_process/distributed_process.cc @@ -43,11 +43,12 @@ grpc::Status MSDistributedImpl::AgentExit(grpc::ServerContext *context, const pr proto::AgentExitReply *reply) { MSI_EXCEPTION_IF_NULL(request); MSI_EXCEPTION_IF_NULL(reply); - watcher_->StopWatch(request->address()); - servable_->OnAgentExit(); - if (Worker::GetInstance().IsRunning()) { - Worker::GetInstance().StopServable(); + if (request->address_choice_case() == proto::AgentExitRequest::kAddress) { + watcher_->StopWatch(request->address()); } + MSI_LOG_INFO << "Agent exit, address: '" << request->address() << "', agent ip: '" << request->agent_ip() << "'"; + servable_->OnAgentExit(); + Worker::GetInstance().StopServable(); return grpc::Status::OK; } diff --git a/mindspore_serving/ccsrc/worker/distributed_worker/notify_distributed/notify_worker.cc b/mindspore_serving/ccsrc/worker/distributed_worker/notify_distributed/notify_worker.cc index afe3b38..f2ca441 100644 --- a/mindspore_serving/ccsrc/worker/distributed_worker/notify_distributed/notify_worker.cc +++ b/mindspore_serving/ccsrc/worker/distributed_worker/notify_distributed/notify_worker.cc @@ -97,6 +97,24 @@ Status GrpcNotifyDistributeWorker::NotifyFailed(const std::string &worker_ip, ui return INFER_STATUS_LOG_ERROR(SYSTEM_ERROR) << "Failed to notify failure of agent"; } +void GrpcNotifyDistributeWorker::StartupNotifyExit(const std::string &worker_ip, uint32_t worker_port, + const std::string &agent_ip) { + auto address = worker_ip + ":" + std::to_string(worker_port); + auto channel = GrpcServer::CreateChannel(address); + auto stub = proto::MSWorker::NewStub(channel); + + grpc::ClientContext context; + proto::AgentExitRequest request; + request.set_agent_ip(agent_ip); + proto::AgentExitReply reply; + grpc::Status status = stub->AgentExit(&context, request, &reply); + if (status.ok()) { + MSI_LOG(INFO) << "Success to notify exit of agent start up process"; + } else { + MSI_LOG(INFO) << "Failed to notify exit of agent start up process"; + } +} + Status GrpcNotifyDistributeWorker::GetAgentsConfigsFromWorker(const std::string &worker_ip, uint32_t worker_port, DistributedServableConfig *config) { const int32_t REGISTER_TIME_OUT = 60; diff --git a/mindspore_serving/ccsrc/worker/distributed_worker/notify_distributed/notify_worker.h b/mindspore_serving/ccsrc/worker/distributed_worker/notify_distributed/notify_worker.h index 830fba9..ef6e86b 100644 --- a/mindspore_serving/ccsrc/worker/distributed_worker/notify_distributed/notify_worker.h +++ b/mindspore_serving/ccsrc/worker/distributed_worker/notify_distributed/notify_worker.h @@ -39,6 +39,7 @@ class MS_API GrpcNotifyDistributeWorker { static Status NotifyFailed(const std::string &worker_ip, uint32_t worker_port); static Status GetAgentsConfigsFromWorker(const std::string &worker_ip, uint32_t worker_port, DistributedServableConfig *config); + static void StartupNotifyExit(const std::string &worker_ip, uint32_t worker_port, const std::string &agent_ip); private: static Status ParseAgentConfigAcquireReply(const proto::AgentConfigAcquireReply &reply, diff --git a/mindspore_serving/proto/ms_distributed.proto b/mindspore_serving/proto/ms_distributed.proto index 9faa0dd..82d0e6f 100644 --- a/mindspore_serving/proto/ms_distributed.proto +++ b/mindspore_serving/proto/ms_distributed.proto @@ -56,8 +56,10 @@ message AgentRegisterReply { } message AgentExitRequest { - repeated AgentSpec agent_spec = 1; - string address = 2; + oneof address_choice { + string address = 1; // by agent process + string agent_ip = 2; // by agent start up process + } } message AgentExitReply { diff --git a/mindspore_serving/worker/distributed/agent_startup.py b/mindspore_serving/worker/distributed/agent_startup.py index 63315fd..c6a895b 100644 --- a/mindspore_serving/worker/distributed/agent_startup.py +++ b/mindspore_serving/worker/distributed/agent_startup.py @@ -80,7 +80,12 @@ def _make_json_table_file(distributed_config): rank_size = len(distributed_config.rank_list) runtime_dir = os.path.abspath(".") time_stamp = str(time.strftime('%Y%m%d_%H%M%S', time.localtime(time.time()))) - rank_table_file_name = os.path.join(runtime_dir, f"hccl_rank_table_{time_stamp}_{rank_size}p.json") + rank_table_dir = os.path.join(runtime_dir, "temp_rank_table") + try: + os.mkdir(rank_table_dir) + except FileExistsError: + pass + rank_table_file_name = os.path.join(rank_table_dir, f"hccl_rank_table_{time_stamp}_{rank_size}p.json") with open(rank_table_file_name, "w") as fp: fp.write(distributed_config.rank_table_content) return rank_table_file_name @@ -88,34 +93,28 @@ def _make_json_table_file(distributed_config): signal_success = "Success" signal_exit = "Exit" -signal_heartbeat = "HeartBeat" -def _recv_parent(index, recv_pipe, handle_stop_signal=True): +def _recv_parent(parent_process, index, recv_pipe, handle_stop_signal=True): """Receive message from Start up process. Return False on Ctrl+C(and worker Stop message) Exit Signal, heartbeat failed, and signal_exit. Return True on receiving signal_success.""" try: while True: - heartbeat_count = 0 while not recv_pipe.poll(0.1): if handle_stop_signal and ExitSignalHandle_.has_stopped(): logger.warning(f"Child {index}: Exit on Ctrl+C or stop message from worker") return False - heartbeat_count += 1 - if heartbeat_count >= 30: # 3s - logger.warning(f"Child {index}: Exit on failure of receiving parent message") + if not parent_process.is_running(): # 3s + logger.warning(f"Child {index}: Exit on failure of exit of parent process") return False parent_signal = recv_pipe.recv() - if parent_signal != signal_heartbeat: - break + break if parent_signal == signal_success: logger.info(f"Child {index}: Receive success") return True if parent_signal == signal_exit: logger.warning(f"Child {index}: Exit on receiving exit message") - else: - logger.warning(f"Child {index}: Exit on receiving unknown message {parent_signal}") # pylint: disable=broad-except except Exception as e: logger.warning(f"Child {index}: Exit on exception: {e}") @@ -124,22 +123,29 @@ def _recv_parent(index, recv_pipe, handle_stop_signal=True): def _agent_process(send_pipe, recv_pipe, index, start_config): """Agent process""" + parent_process = psutil.Process(os.getppid()) try: # listening success or failed message from parent process ExitSignalHandle_.start() # Set flag to running and receive Ctrl+C message worker_agent.start_worker_agent(start_config=start_config) send_pipe.send((index, signal_success)) - success_msg = _recv_parent(index, recv_pipe) + success_msg = _recv_parent(parent_process, index, recv_pipe) if not success_msg: worker_agent.stop() send_pipe.close() recv_pipe.close() + while not ExitSignalHandle_.has_stopped(): + if not parent_process.is_running(): + logger.warning(f"Child {index}, detect parent pid={parent_process.pid} has exited, child begin to exit") + worker_agent.stop() + return + time.sleep(0.1) # pylint: disable=broad-except except Exception as e: traceback.print_exc() logger.error(f"Child {index}: Catch exception and notify exit of others") send_pipe.send((index, e)) - _recv_parent(index, recv_pipe, False) + _recv_parent(parent_process, index, recv_pipe, False) logger.error(f"Child {index}: end send message to parent") @@ -152,18 +158,9 @@ def _send_pipe_msg(send_pipe, msg): logger.warning(f"Send pipe message exception happen: {e}") -def _send_exit_msg_to_children(send_pipe_list, subprocess_list): - """Send exit msg to all child processes, and terminate all child processes when they are still alive +def _send_exit_signal_to_children(subprocess_list): + """Send exit signal to all child processes, and terminate all child processes when they are still alive in some seconds later""" - index = 0 - for send_pipe, process in zip(send_pipe_list, subprocess_list): - if process.is_alive(): - logger.warning(f"Send exit message to Child {index}") - _send_pipe_msg(send_pipe, signal_exit) - logger.warning(f"End send exit message to Child {index}") - else: - logger.warning(f"Child {index} is not alive") - index += 1 def wait_exit(wait_seconds, msg): for i in range(wait_seconds): @@ -209,11 +206,26 @@ def _send_exit_msg_to_children(send_pipe_list, subprocess_list): os.kill(item.pid, signal.SIGKILL) # pylint: disable=broad-except except Exception as e: - logger.warning(f"Get exception when send signal SIGINT to children of child {index}, exception: {e}") + logger.warning(f"Get exception when send signal SIGKILL to children of child {index}, exception: {e}") os.kill(process.pid, signal.SIGKILL) -def _start_listening_child_processes(p_recv_pipe, send_pipe_list, subprocess_list): +def _send_exit_msg_to_children(send_pipe_list, subprocess_list): + """Send exit msg to all child processes, and terminate all child processes when they are still alive + in some seconds later""" + index = 0 + for send_pipe, process in zip(send_pipe_list, subprocess_list): + if process.is_alive(): + logger.warning(f"Send exit message to Child {index}") + _send_pipe_msg(send_pipe, signal_exit) + logger.warning(f"End send exit message to Child {index}") + else: + logger.warning(f"Child {index} is not alive") + index += 1 + _send_exit_signal_to_children(subprocess_list) + + +def _listening_agents_when_startup(p_recv_pipe, send_pipe_list, subprocess_list): """Listening child process""" count = len(send_pipe_list) for _ in range(count): @@ -226,8 +238,6 @@ def _start_listening_child_processes(p_recv_pipe, send_pipe_list, subprocess_lis logger.warning("Fail to start agents because of death of one agent") _send_exit_msg_to_children(send_pipe_list, subprocess_list) return False - for send_pipe in send_pipe_list: - _send_pipe_msg(send_pipe, signal_heartbeat) index, msg = p_recv_pipe.recv() logger.info(f"Receive msg from Child {index}: {msg}") @@ -241,9 +251,19 @@ def _start_listening_child_processes(p_recv_pipe, send_pipe_list, subprocess_lis return True -def _startup_all_agents(common_meta, worker_ip, worker_port, - agent_ip, agent_start_port, device_id_list, rank_id_list, - model_files, group_config_files, rank_table_file): +def _listening_agents_after_startup(subprocess_list): + """Listening agent status after success start up of agents""" + while not ExitSignalHandle_.has_stopped(): + for index, process in enumerate(subprocess_list): + if not process.is_alive(): + logger.warning(f"Child {index}, pid={process.pid} has exited") + return + time.sleep(0.1) + + +def _startup_agents(common_meta, worker_ip, worker_port, + agent_ip, agent_start_port, device_id_list, rank_id_list, + model_files, group_config_files, rank_table_file): """Start up all agents in one machine""" servable_name = common_meta.servable_name send_pipe_list = [] @@ -278,7 +298,7 @@ def _startup_all_agents(common_meta, worker_ip, worker_port, name=f"{servable_name}_worker_agent_rank{rank_id}_device{device_id}") process.start() subprocess_list.append(process) - ret = _start_listening_child_processes(p_recv_pipe, send_pipe_list, subprocess_list) + ret = _listening_agents_when_startup(p_recv_pipe, send_pipe_list, subprocess_list) msg = f"worker_ip: {worker_ip}, worker_port: {worker_port}, agent_ip: {agent_ip}, " \ f"agent_start_port: {agent_start_port}, device ids: {device_id_list}, rank ids: {rank_id_list}, " \ @@ -287,9 +307,13 @@ def _startup_all_agents(common_meta, worker_ip, worker_port, WorkerAgent_.notify_failed(worker_ip, worker_port) logger.info(f"Failed to start agents, {msg}") print(f"Failed to start agents, {msg}") - else: - logger.info(f"Success to start agents, {msg}") - print(f"Success to start agents, {msg}") + return + + logger.info(f"Success to start agents, {msg}") + print(f"Success to start agents, {msg}") + _listening_agents_after_startup(subprocess_list) + WorkerAgent_.startup_notify_exit(worker_ip, worker_port, agent_ip) + _send_exit_signal_to_children(subprocess_list) def startup_worker_agents(worker_ip, worker_port, model_files, group_config_files=None, agent_start_port=7000): @@ -354,6 +378,6 @@ def startup_worker_agents(worker_ip, worker_port, model_files, group_config_file # make json table file and export env rank_table_file = _make_json_table_file(distributed_config) - _startup_all_agents(distributed_config.common_meta, worker_ip, worker_port, local_ip, agent_start_port, - local_device_id_list, local_rank_id_list, - model_files, group_config_files, rank_table_file) + _startup_agents(distributed_config.common_meta, worker_ip, worker_port, local_ip, agent_start_port, + local_device_id_list, local_rank_id_list, + model_files, group_config_files, rank_table_file) diff --git a/third_party/mindspore b/third_party/mindspore index dd22b5e..e9a5d02 160000 --- a/third_party/mindspore +++ b/third_party/mindspore @@ -1 +1 @@ -Subproject commit dd22b5ea7106baf494704be04e2dbaad6887f0ab +Subproject commit e9a5d0248d12c323b1d0320321773d76c764a7bc