Merge pull request !5161 from ZhangQinghua/mastertags/v1.0.0
| @@ -16,7 +16,7 @@ | |||||
| #include "common/duplex_pipe.h" | #include "common/duplex_pipe.h" | ||||
| #include <signal.h> | |||||
| #include <sys/wait.h> | |||||
| #include <iostream> | #include <iostream> | ||||
| #include <vector> | #include <vector> | ||||
| #include <algorithm> | #include <algorithm> | ||||
| @@ -70,6 +70,8 @@ int DuplexPipe::Open(std::initializer_list<std::string> arg_list, bool append_fd | |||||
| local_stderr_ = dup(STDERR_FILENO); | local_stderr_ = dup(STDERR_FILENO); | ||||
| close(fd1_[0]); | close(fd1_[0]); | ||||
| close(fd2_[1]); | close(fd2_[1]); | ||||
| signal_handler_ = std::make_shared<SignalHandler>(shared_from_this(), pid_); | |||||
| } | } | ||||
| return 0; | return 0; | ||||
| } | } | ||||
| @@ -147,14 +149,58 @@ void DuplexPipe::Close() { | |||||
| close(fd2_[1]); | close(fd2_[1]); | ||||
| } | } | ||||
| void DuplexPipe::Alarm::Set(std::shared_ptr<DuplexPipe> dp, unsigned int interval_secs) { | |||||
| DuplexPipe::SignalHandler::SignalHandler(std::shared_ptr<DuplexPipe> dp, pid_t pid) { | |||||
| dp_ = dp; | dp_ = dp; | ||||
| signal(SIGALRM, SigHandler); | |||||
| child_pid_ = pid; | |||||
| signal(SIGCHLD, SigChildHandler); | |||||
| signal(SIGPIPE, SigPipeHandler); | |||||
| } | |||||
| DuplexPipe::SignalHandler::~SignalHandler() { dp_.reset(); } | |||||
| void DuplexPipe::SignalHandler::SetAlarm(unsigned int interval_secs) { | |||||
| signal(SIGALRM, SigAlarmHandler); | |||||
| alarm(interval_secs); | alarm(interval_secs); | ||||
| } | } | ||||
| void DuplexPipe::Alarm::Cancel() { | |||||
| alarm(0); | |||||
| dp_.reset(); | |||||
| void DuplexPipe::SignalHandler::CancelAlarm() { alarm(0); } | |||||
| void DuplexPipe::SignalHandler::SigAlarmHandler(int sig) { | |||||
| DP_INFO << "Signal: " << sig << ", child_pid_: " << child_pid_; | |||||
| if (!dp_.expired()) { | |||||
| dp_.lock()->TimeOut(); | |||||
| } | |||||
| } | |||||
| void DuplexPipe::SignalHandler::SigPipeHandler(int sig) { | |||||
| DP_INFO << "Signal: " << sig; | |||||
| if (!dp_.expired()) { | |||||
| dp_.lock()->Close(); | |||||
| } | |||||
| } | |||||
| void DuplexPipe::SignalHandler::SigChildHandler(int sig) { | |||||
| DP_INFO << "Signal: " << sig << ", child_pid_: " << child_pid_; | |||||
| int status; | |||||
| auto pid = waitpid(child_pid_, &status, WNOHANG | WUNTRACED); | |||||
| if (WIFEXITED(status)) { | |||||
| DP_ERROR << "Child exited, status: " << WEXITSTATUS(status) << ", pid: " << pid; | |||||
| if (!dp_.expired()) { | |||||
| dp_.lock()->Close(); | |||||
| } | |||||
| // When run multiple processes by 'mpirun', | |||||
| // parent process never quit even Exception happens, | |||||
| // which caused by MPI_Finalize() never returned. | |||||
| exit(-1); | |||||
| } else if (WIFSTOPPED(status)) { | |||||
| DP_ERROR << "Child stopped, sig: " << WSTOPSIG(status) << ", pid: " << pid; | |||||
| } else if (WIFSIGNALED(status)) { | |||||
| DP_INFO << "Child not exited, signaled, sig: " << WTERMSIG(status) << ", pid: " << pid; | |||||
| } else if (WIFCONTINUED(status)) { | |||||
| DP_INFO << "Child continued, pid: " << pid; | |||||
| } else { | |||||
| DP_ERROR << "Wrong child status: " << status << ", pid: " << pid; | |||||
| } | |||||
| } | } | ||||
| } // namespace mindspore | } // namespace mindspore | ||||
| @@ -18,6 +18,7 @@ | |||||
| #define MINDSPORE_CCSRC_COMMON_DUPLEX_PIPE_H_ | #define MINDSPORE_CCSRC_COMMON_DUPLEX_PIPE_H_ | ||||
| #include <unistd.h> | #include <unistd.h> | ||||
| #include <signal.h> | |||||
| #include <string> | #include <string> | ||||
| #include <memory> | #include <memory> | ||||
| #include <initializer_list> | #include <initializer_list> | ||||
| @@ -61,8 +62,8 @@ class DuplexPipe : public std::enable_shared_from_this<mindspore::DuplexPipe> { | |||||
| DuplexPipe &operator>>(std::string &buf); | DuplexPipe &operator>>(std::string &buf); | ||||
| private: | private: | ||||
| void SetTimeOut() { alarm_.Set(shared_from_this(), time_out_secs_); } | |||||
| void CancelTimeOut() { alarm_.Cancel(); } | |||||
| void SetTimeOut() { signal_handler_->SetAlarm(time_out_secs_); } | |||||
| void CancelTimeOut() { signal_handler_->CancelAlarm(); } | |||||
| void TimeOut() { | void TimeOut() { | ||||
| if (has_time_out_callback_) { | if (has_time_out_callback_) { | ||||
| time_out_callback_(); | time_out_callback_(); | ||||
| @@ -96,27 +97,27 @@ class DuplexPipe : public std::enable_shared_from_this<mindspore::DuplexPipe> { | |||||
| int remote_stdout_; | int remote_stdout_; | ||||
| int remote_stderr_; | int remote_stderr_; | ||||
| class Alarm { | |||||
| class SignalHandler { | |||||
| public: | public: | ||||
| Alarm() = default; | |||||
| ~Alarm() = default; | |||||
| SignalHandler(std::shared_ptr<DuplexPipe> dp, pid_t pid); | |||||
| ~SignalHandler(); | |||||
| void Set(std::shared_ptr<DuplexPipe> dp, unsigned int interval_secs); | |||||
| void Cancel(); | |||||
| void SetAlarm(unsigned int interval_secs); | |||||
| void CancelAlarm(); | |||||
| private: | private: | ||||
| static void SigHandler(int sig) { | |||||
| DP_INFO << "Signal: " << sig; | |||||
| dp_->TimeOut(); | |||||
| } | |||||
| static void SigAlarmHandler(int sig); | |||||
| static void SigPipeHandler(int sig); | |||||
| static void SigChildHandler(int sig); | |||||
| inline static std::shared_ptr<DuplexPipe> dp_; | |||||
| inline static std::weak_ptr<DuplexPipe> dp_; | |||||
| inline static pid_t child_pid_; | |||||
| }; | }; | ||||
| unsigned int time_out_secs_ = kTimeOutSeconds; | unsigned int time_out_secs_ = kTimeOutSeconds; | ||||
| bool has_time_out_callback_ = false; | bool has_time_out_callback_ = false; | ||||
| std::function<void()> time_out_callback_; | std::function<void()> time_out_callback_; | ||||
| Alarm alarm_; | |||||
| std::shared_ptr<SignalHandler> signal_handler_; | |||||
| }; | }; | ||||
| } // namespace mindspore | } // namespace mindspore | ||||
| @@ -40,9 +40,9 @@ DuplexPipe &DuplexPipe::operator>>(std::string &buf) { DP_EXCEPTION << "Not supp | |||||
| void DuplexPipe::Close() { DP_EXCEPTION << "Not support for Windows by now."; } | void DuplexPipe::Close() { DP_EXCEPTION << "Not support for Windows by now."; } | ||||
| void DuplexPipe::Alarm::Set(std::shared_ptr<DuplexPipe> dp, unsigned int interval_secs) { | |||||
| void DuplexPipe::SignalHandler::SetAlarm(unsigned int interval_secs) { | |||||
| DP_EXCEPTION << "Not support for Windows by now."; | DP_EXCEPTION << "Not support for Windows by now."; | ||||
| } | } | ||||
| void DuplexPipe::Alarm::Cancel() { DP_EXCEPTION << "Not support for Windows by now."; } | |||||
| void DuplexPipe::SignalHandler::CancelAlarm() { DP_EXCEPTION << "Not support for Windows by now."; } | |||||
| } // namespace mindspore | } // namespace mindspore | ||||