Browse Source

Serving, update agents listening

tags/v1.2.0
xuyongfei 5 years ago
parent
commit
9ed8a3c0f3
11 changed files with 103 additions and 45 deletions
  1. +4
    -0
      mindspore_serving/ccsrc/python/agent/agent_py.cc
  2. +1
    -0
      mindspore_serving/ccsrc/python/agent/agent_py.h
  3. +1
    -0
      mindspore_serving/ccsrc/python/serving_py.cc
  4. +5
    -0
      mindspore_serving/ccsrc/worker/distributed_worker/agent_startup.cc
  5. +1
    -0
      mindspore_serving/ccsrc/worker/distributed_worker/agent_startup.h
  6. +5
    -4
      mindspore_serving/ccsrc/worker/distributed_worker/distributed_process/distributed_process.cc
  7. +18
    -0
      mindspore_serving/ccsrc/worker/distributed_worker/notify_distributed/notify_worker.cc
  8. +1
    -0
      mindspore_serving/ccsrc/worker/distributed_worker/notify_distributed/notify_worker.h
  9. +4
    -2
      mindspore_serving/proto/ms_distributed.proto
  10. +62
    -38
      mindspore_serving/worker/distributed/agent_startup.py
  11. +1
    -1
      third_party/mindspore

+ 4
- 0
mindspore_serving/ccsrc/python/agent/agent_py.cc View File

@@ -60,4 +60,8 @@ void PyAgent::StopAndClear() {
WorkerAgent::Instance().Clear();
}

// Python-facing entry point used by the agent start-up process (not by an agent itself).
// Forwards worker_ip, worker_port and agent_ip unchanged to
// WorkerAgentStartUp::Instance().StartupNotifyExit(); nothing is returned to the caller.
void PyAgent::StartupNotifyExit(const std::string &worker_ip, uint32_t worker_port, const std::string &agent_ip) {
WorkerAgentStartUp::Instance().StartupNotifyExit(worker_ip, worker_port, agent_ip);
}

} // namespace mindspore::serving

+ 1
- 0
mindspore_serving/ccsrc/python/agent/agent_py.h View File

@@ -39,6 +39,7 @@ class MS_API PyAgent {
static void StopAndClear();
// from start up, not agent
static void NotifyFailed(const std::string &worker_ip, uint32_t worker_port);
static void StartupNotifyExit(const std::string &worker_ip, uint32_t worker_port, const std::string &agent_ip);
};

} // namespace serving


+ 1
- 0
mindspore_serving/ccsrc/python/serving_py.cc View File

@@ -186,6 +186,7 @@ void PyRegWorkerAgent(pybind11::module *m_ptr) {
.def_static("wait_and_clear", &PyAgent::WaitAndClear)
.def_static("stop_and_clear", &PyAgent::StopAndClear)
.def_static("notify_failed", &PyAgent::NotifyFailed)
.def_static("startup_notify_exit", &PyAgent::StartupNotifyExit)
.def_static("start_agent", &PyAgent::StartAgent);

py::class_<AgentStartUpConfig>(m, "AgentStartUpConfig_")


+ 5
- 0
mindspore_serving/ccsrc/worker/distributed_worker/agent_startup.cc View File

@@ -43,5 +43,10 @@ Status WorkerAgentStartUp::NotifyFailed(const std::string &worker_ip, uint32_t w
return GrpcNotifyDistributeWorker::NotifyFailed(worker_ip, worker_port);
}

// Tells the distributed worker at worker_ip:worker_port that the start-up process on agent_ip is exiting.
// Pure delegation to GrpcNotifyDistributeWorker::StartupNotifyExit(); the function returns void, so the
// caller cannot observe whether the notification succeeded.
void WorkerAgentStartUp::StartupNotifyExit(const std::string &worker_ip, uint32_t worker_port,
const std::string &agent_ip) {
GrpcNotifyDistributeWorker::StartupNotifyExit(worker_ip, worker_port, agent_ip);
}

} // namespace serving
} // namespace mindspore

+ 1
- 0
mindspore_serving/ccsrc/worker/distributed_worker/agent_startup.h View File

@@ -36,6 +36,7 @@ class MS_API WorkerAgentStartUp {
Status GetDistributedServableConfig(DistributedServableConfig *config);

Status NotifyFailed(const std::string &worker_ip, uint32_t worker_port);
void StartupNotifyExit(const std::string &worker_ip, uint32_t worker_port, const std::string &agent_ip);

private:
DistributedServableConfig config_;


+ 5
- 4
mindspore_serving/ccsrc/worker/distributed_worker/distributed_process/distributed_process.cc View File

@@ -43,11 +43,12 @@ grpc::Status MSDistributedImpl::AgentExit(grpc::ServerContext *context, const pr
proto::AgentExitReply *reply) {
MSI_EXCEPTION_IF_NULL(request);
MSI_EXCEPTION_IF_NULL(reply);
watcher_->StopWatch(request->address());
servable_->OnAgentExit();
if (Worker::GetInstance().IsRunning()) {
Worker::GetInstance().StopServable();
if (request->address_choice_case() == proto::AgentExitRequest::kAddress) {
watcher_->StopWatch(request->address());
}
MSI_LOG_INFO << "Agent exit, address: '" << request->address() << "', agent ip: '" << request->agent_ip() << "'";
servable_->OnAgentExit();
Worker::GetInstance().StopServable();
return grpc::Status::OK;
}



+ 18
- 0
mindspore_serving/ccsrc/worker/distributed_worker/notify_distributed/notify_worker.cc View File

@@ -97,6 +97,24 @@ Status GrpcNotifyDistributeWorker::NotifyFailed(const std::string &worker_ip, ui
return INFER_STATUS_LOG_ERROR(SYSTEM_ERROR) << "Failed to notify failure of agent";
}

// Notify the distributed worker that the agent start-up process is exiting.
//
// Builds the target address as "<worker_ip>:<worker_port>", opens a channel to it and issues a single
// AgentExit RPC whose request carries only the `agent_ip` field.
//
// @param worker_ip   IP of the distributed worker to notify.
// @param worker_port Port of the distributed worker to notify.
// @param agent_ip    IP of the machine running the start-up process; sent as AgentExitRequest.agent_ip.
//
// No status is returned: the outcome of the RPC is only written to the log. On failure the log now
// carries the target address, agent ip and the gRPC error code/message so the cause can be diagnosed.
void GrpcNotifyDistributeWorker::StartupNotifyExit(const std::string &worker_ip, uint32_t worker_port,
                                                   const std::string &agent_ip) {
  auto address = worker_ip + ":" + std::to_string(worker_port);
  auto channel = GrpcServer::CreateChannel(address);
  auto stub = proto::MSWorker::NewStub(channel);

  grpc::ClientContext context;
  proto::AgentExitRequest request;
  request.set_agent_ip(agent_ip);
  proto::AgentExitReply reply;
  grpc::Status status = stub->AgentExit(&context, request, &reply);
  if (status.ok()) {
    MSI_LOG(INFO) << "Success to notify exit of agent start up process";
  } else {
    // Keep the original message prefix, but append the details needed to tell why the RPC failed.
    MSI_LOG(INFO) << "Failed to notify exit of agent start up process, worker address: '" << address
                  << "', agent ip: '" << agent_ip
                  << "', grpc error code: " << static_cast<int>(status.error_code())
                  << ", error message: " << status.error_message();
  }
}

Status GrpcNotifyDistributeWorker::GetAgentsConfigsFromWorker(const std::string &worker_ip, uint32_t worker_port,
DistributedServableConfig *config) {
const int32_t REGISTER_TIME_OUT = 60;


+ 1
- 0
mindspore_serving/ccsrc/worker/distributed_worker/notify_distributed/notify_worker.h View File

@@ -39,6 +39,7 @@ class MS_API GrpcNotifyDistributeWorker {
static Status NotifyFailed(const std::string &worker_ip, uint32_t worker_port);
static Status GetAgentsConfigsFromWorker(const std::string &worker_ip, uint32_t worker_port,
DistributedServableConfig *config);
static void StartupNotifyExit(const std::string &worker_ip, uint32_t worker_port, const std::string &agent_ip);

private:
static Status ParseAgentConfigAcquireReply(const proto::AgentConfigAcquireReply &reply,


+ 4
- 2
mindspore_serving/proto/ms_distributed.proto View File

@@ -56,8 +56,10 @@ message AgentRegisterReply {
}

message AgentExitRequest {
repeated AgentSpec agent_spec = 1;
string address = 2;
// Identifies the sender of the exit notification; as a oneof, at most one member is set.
// NOTE(review): tags 1 and 2 are also used by the `agent_spec`/`address` declarations listed above
// with different meanings - confirm no peer still encodes the other layout.
oneof address_choice {
string address = 1; // set when the exit is reported by an agent process
string agent_ip = 2; // set when the exit is reported by the agent start-up process
}
}

message AgentExitReply {


+ 62
- 38
mindspore_serving/worker/distributed/agent_startup.py View File

@@ -80,7 +80,12 @@ def _make_json_table_file(distributed_config):
rank_size = len(distributed_config.rank_list)
runtime_dir = os.path.abspath(".")
time_stamp = str(time.strftime('%Y%m%d_%H%M%S', time.localtime(time.time())))
rank_table_file_name = os.path.join(runtime_dir, f"hccl_rank_table_{time_stamp}_{rank_size}p.json")
rank_table_dir = os.path.join(runtime_dir, "temp_rank_table")
try:
os.mkdir(rank_table_dir)
except FileExistsError:
pass
rank_table_file_name = os.path.join(rank_table_dir, f"hccl_rank_table_{time_stamp}_{rank_size}p.json")
with open(rank_table_file_name, "w") as fp:
fp.write(distributed_config.rank_table_content)
return rank_table_file_name
@@ -88,34 +93,28 @@ def _make_json_table_file(distributed_config):

signal_success = "Success"
signal_exit = "Exit"
signal_heartbeat = "HeartBeat"


def _recv_parent(index, recv_pipe, handle_stop_signal=True):
def _recv_parent(parent_process, index, recv_pipe, handle_stop_signal=True):
"""Receive message from Start up process.
Return False on Ctrl+C(and worker Stop message) Exit Signal, heartbeat failed, and signal_exit.
Return True on receiving signal_success."""
try:
while True:
heartbeat_count = 0
while not recv_pipe.poll(0.1):
if handle_stop_signal and ExitSignalHandle_.has_stopped():
logger.warning(f"Child {index}: Exit on Ctrl+C or stop message from worker")
return False
heartbeat_count += 1
if heartbeat_count >= 30: # 3s
logger.warning(f"Child {index}: Exit on failure of receiving parent message")
if not parent_process.is_running(): # 3s
logger.warning(f"Child {index}: Exit on failure of exit of parent process")
return False
parent_signal = recv_pipe.recv()
if parent_signal != signal_heartbeat:
break
break
if parent_signal == signal_success:
logger.info(f"Child {index}: Receive success")
return True
if parent_signal == signal_exit:
logger.warning(f"Child {index}: Exit on receiving exit message")
else:
logger.warning(f"Child {index}: Exit on receiving unknown message {parent_signal}")
# pylint: disable=broad-except
except Exception as e:
logger.warning(f"Child {index}: Exit on exception: {e}")
@@ -124,22 +123,29 @@ def _recv_parent(index, recv_pipe, handle_stop_signal=True):

def _agent_process(send_pipe, recv_pipe, index, start_config):
"""Agent process"""
parent_process = psutil.Process(os.getppid())
try:
# listening success or failed message from parent process
ExitSignalHandle_.start() # Set flag to running and receive Ctrl+C message
worker_agent.start_worker_agent(start_config=start_config)
send_pipe.send((index, signal_success))
success_msg = _recv_parent(index, recv_pipe)
success_msg = _recv_parent(parent_process, index, recv_pipe)
if not success_msg:
worker_agent.stop()
send_pipe.close()
recv_pipe.close()
while not ExitSignalHandle_.has_stopped():
if not parent_process.is_running():
logger.warning(f"Child {index}, detect parent pid={parent_process.pid} has exited, child begin to exit")
worker_agent.stop()
return
time.sleep(0.1)
# pylint: disable=broad-except
except Exception as e:
traceback.print_exc()
logger.error(f"Child {index}: Catch exception and notify exit of others")
send_pipe.send((index, e))
_recv_parent(index, recv_pipe, False)
_recv_parent(parent_process, index, recv_pipe, False)
logger.error(f"Child {index}: end send message to parent")


@@ -152,18 +158,9 @@ def _send_pipe_msg(send_pipe, msg):
logger.warning(f"Send pipe message exception happen: {e}")


def _send_exit_msg_to_children(send_pipe_list, subprocess_list):
"""Send exit msg to all child processes, and terminate all child processes when they are still alive
def _send_exit_signal_to_children(subprocess_list):
"""Send exit signal to all child processes, and terminate all child processes when they are still alive
in some seconds later"""
index = 0
for send_pipe, process in zip(send_pipe_list, subprocess_list):
if process.is_alive():
logger.warning(f"Send exit message to Child {index}")
_send_pipe_msg(send_pipe, signal_exit)
logger.warning(f"End send exit message to Child {index}")
else:
logger.warning(f"Child {index} is not alive")
index += 1

def wait_exit(wait_seconds, msg):
for i in range(wait_seconds):
@@ -209,11 +206,26 @@ def _send_exit_msg_to_children(send_pipe_list, subprocess_list):
os.kill(item.pid, signal.SIGKILL)
# pylint: disable=broad-except
except Exception as e:
logger.warning(f"Get exception when send signal SIGINT to children of child {index}, exception: {e}")
logger.warning(f"Get exception when send signal SIGKILL to children of child {index}, exception: {e}")
os.kill(process.pid, signal.SIGKILL)


def _start_listening_child_processes(p_recv_pipe, send_pipe_list, subprocess_list):
def _send_exit_msg_to_children(send_pipe_list, subprocess_list):
    """Ask each live child to exit over its pipe, then escalate to exit signals.

    Pipes and processes are paired positionally. For every child that is still
    alive, `signal_exit` is sent through its pipe; children that are already
    dead are only logged. Afterwards the whole process list is handed to
    `_send_exit_signal_to_children`.
    """
    for index, (send_pipe, process) in enumerate(zip(send_pipe_list, subprocess_list)):
        if not process.is_alive():
            logger.warning(f"Child {index} is not alive")
            continue
        logger.warning(f"Send exit message to Child {index}")
        _send_pipe_msg(send_pipe, signal_exit)
        logger.warning(f"End send exit message to Child {index}")
    _send_exit_signal_to_children(subprocess_list)


def _listening_agents_when_startup(p_recv_pipe, send_pipe_list, subprocess_list):
"""Listening child process"""
count = len(send_pipe_list)
for _ in range(count):
@@ -226,8 +238,6 @@ def _start_listening_child_processes(p_recv_pipe, send_pipe_list, subprocess_lis
logger.warning("Fail to start agents because of death of one agent")
_send_exit_msg_to_children(send_pipe_list, subprocess_list)
return False
for send_pipe in send_pipe_list:
_send_pipe_msg(send_pipe, signal_heartbeat)

index, msg = p_recv_pipe.recv()
logger.info(f"Receive msg from Child {index}: {msg}")
@@ -241,9 +251,19 @@ def _start_listening_child_processes(p_recv_pipe, send_pipe_list, subprocess_lis
return True


def _startup_all_agents(common_meta, worker_ip, worker_port,
agent_ip, agent_start_port, device_id_list, rank_id_list,
model_files, group_config_files, rank_table_file):
def _listening_agents_after_startup(subprocess_list):
    """Watch the agent child processes once start-up has succeeded.

    Polls every 0.1 seconds and returns as soon as either
    `ExitSignalHandle_.has_stopped()` reports a stop, or any process in
    `subprocess_list` is no longer alive (that child is logged first).
    """
    while True:
        if ExitSignalHandle_.has_stopped():
            return
        for index, process in enumerate(subprocess_list):
            if process.is_alive():
                continue
            logger.warning(f"Child {index}, pid={process.pid} has exited")
            return
        time.sleep(0.1)


def _startup_agents(common_meta, worker_ip, worker_port,
agent_ip, agent_start_port, device_id_list, rank_id_list,
model_files, group_config_files, rank_table_file):
"""Start up all agents in one machine"""
servable_name = common_meta.servable_name
send_pipe_list = []
@@ -278,7 +298,7 @@ def _startup_all_agents(common_meta, worker_ip, worker_port,
name=f"{servable_name}_worker_agent_rank{rank_id}_device{device_id}")
process.start()
subprocess_list.append(process)
ret = _start_listening_child_processes(p_recv_pipe, send_pipe_list, subprocess_list)
ret = _listening_agents_when_startup(p_recv_pipe, send_pipe_list, subprocess_list)

msg = f"worker_ip: {worker_ip}, worker_port: {worker_port}, agent_ip: {agent_ip}, " \
f"agent_start_port: {agent_start_port}, device ids: {device_id_list}, rank ids: {rank_id_list}, " \
@@ -287,9 +307,13 @@ def _startup_all_agents(common_meta, worker_ip, worker_port,
WorkerAgent_.notify_failed(worker_ip, worker_port)
logger.info(f"Failed to start agents, {msg}")
print(f"Failed to start agents, {msg}")
else:
logger.info(f"Success to start agents, {msg}")
print(f"Success to start agents, {msg}")
return

logger.info(f"Success to start agents, {msg}")
print(f"Success to start agents, {msg}")
_listening_agents_after_startup(subprocess_list)
WorkerAgent_.startup_notify_exit(worker_ip, worker_port, agent_ip)
_send_exit_signal_to_children(subprocess_list)


def startup_worker_agents(worker_ip, worker_port, model_files, group_config_files=None, agent_start_port=7000):
@@ -354,6 +378,6 @@ def startup_worker_agents(worker_ip, worker_port, model_files, group_config_file

# make json table file and export env
rank_table_file = _make_json_table_file(distributed_config)
_startup_all_agents(distributed_config.common_meta, worker_ip, worker_port, local_ip, agent_start_port,
local_device_id_list, local_rank_id_list,
model_files, group_config_files, rank_table_file)
_startup_agents(distributed_config.common_meta, worker_ip, worker_port, local_ip, agent_start_port,
local_device_id_list, local_rank_id_list,
model_files, group_config_files, rank_table_file)

+ 1
- 1
third_party/mindspore

@@ -1 +1 @@
Subproject commit dd22b5ea7106baf494704be04e2dbaad6887f0ab
Subproject commit e9a5d0248d12c323b1d0320321773d76c764a7bc

Loading…
Cancel
Save