Browse Source

!8698 added libevent pthread

From: @anancds
Reviewed-by: 
Signed-off-by:
tags/v1.1.0
mindspore-ci-bot Gitee 5 years ago
parent
commit
f657fcb155
3 changed files with 34 additions and 9 deletions
  1. +21
    -7
      mindspore/ccsrc/ps/core/http_server.cc
  2. +9
    -1
      mindspore/ccsrc/ps/core/http_server.h
  3. +4
    -1
      mindspore/ccsrc/ps/core/tcp_server.cc

+ 21
- 7
mindspore/ccsrc/ps/core/http_server.cc View File

@@ -49,6 +49,13 @@ bool HttpServer::InitServer() {
MS_LOG(EXCEPTION) << "The http server ip:" << server_address_ << " is illegal!"; MS_LOG(EXCEPTION) << "The http server ip:" << server_address_ << " is illegal!";
} }


int result = evthread_use_pthreads();
if (result != 0) {
MS_LOG(EXCEPTION) << "Use event pthread failed!";
}

is_stop_ = false;

event_base_ = event_base_new(); event_base_ = event_base_new();
MS_EXCEPTION_IF_NULL(event_base_); MS_EXCEPTION_IF_NULL(event_base_);
event_http_ = evhttp_new(event_base_); event_http_ = evhttp_new(event_base_);
@@ -146,13 +153,20 @@ bool HttpServer::Start() {


void HttpServer::Stop() { void HttpServer::Stop() {
MS_LOG(INFO) << "Stop http server!"; MS_LOG(INFO) << "Stop http server!";
if (event_http_) {
evhttp_free(event_http_);
event_http_ = nullptr;
}
if (event_base_) {
event_base_free(event_base_);
event_base_ = nullptr;
if (!is_stop_.load()) {
int ret = event_base_loopbreak(event_base_);
if (ret != 0) {
MS_LOG(EXCEPTION) << "event base loop break failed!";
}
if (event_http_) {
evhttp_free(event_http_);
event_http_ = nullptr;
}
if (event_base_) {
event_base_free(event_base_);
event_base_ = nullptr;
}
is_stop_ = true;
} }
} }




+ 9
- 1
mindspore/ccsrc/ps/core/http_server.h View File

@@ -25,6 +25,7 @@
#include <event2/keyvalq_struct.h> #include <event2/keyvalq_struct.h>
#include <event2/listener.h> #include <event2/listener.h>
#include <event2/util.h> #include <event2/util.h>
#include <event2/thread.h>


#include <cstdio> #include <cstdio>
#include <cstdlib> #include <cstdlib>
@@ -32,6 +33,7 @@
#include <functional> #include <functional>
#include <memory> #include <memory>
#include <string> #include <string>
#include <atomic>


namespace mindspore { namespace mindspore {
namespace ps { namespace ps {
@@ -55,7 +57,12 @@ class HttpServer {
public: public:
// Server address only support IPV4 now, and should be in format of "x.x.x.x" // Server address only support IPV4 now, and should be in format of "x.x.x.x"
explicit HttpServer(const std::string &address, std::uint16_t port) explicit HttpServer(const std::string &address, std::uint16_t port)
: server_address_(address), server_port_(port), event_base_(nullptr), event_http_(nullptr), is_init_(false) {}
: server_address_(address),
server_port_(port),
event_base_(nullptr),
event_http_(nullptr),
is_init_(false),
is_stop_(true) {}


~HttpServer(); ~HttpServer();


@@ -84,6 +91,7 @@ class HttpServer {
struct event_base *event_base_; struct event_base *event_base_;
struct evhttp *event_http_; struct evhttp *event_http_;
bool is_init_; bool is_init_;
std::atomic<bool> is_stop_;
}; };


} // namespace core } // namespace core


+ 4
- 1
mindspore/ccsrc/ps/core/tcp_server.cc View File

@@ -198,7 +198,10 @@ void TcpServer::ListenerCallback(struct evconnlistener *, evutil_socket_t fd, st
struct bufferevent *bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE); struct bufferevent *bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE);
if (!bev) { if (!bev) {
MS_LOG(ERROR) << "Error constructing buffer event!"; MS_LOG(ERROR) << "Error constructing buffer event!";
event_base_loopbreak(base);
int ret = event_base_loopbreak(base);
if (ret != 0) {
MS_LOG(EXCEPTION) << "event base loop break failed!";
}
return; return;
} }




Loading…
Cancel
Save