You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

server.cc 11 kB

5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273
  1. /**
  2. * Copyright 2020 Huawei Technologies Co., Ltd
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #include "core/server.h"
  17. #include <evhttp.h>
  18. #include <event.h>
  19. #include <event2/thread.h>
  20. #include <event2/listener.h>
  21. #include <grpcpp/grpcpp.h>
  22. #include <grpcpp/health_check_service_interface.h>
  23. #include <grpcpp/ext/proto_server_reflection_plugin.h>
  24. #include <future>
  25. #include <memory>
  26. #include <string>
  27. #include <vector>
  28. #include <utility>
  29. #include <atomic>
  30. #include "include/infer_log.h"
  31. #include "serving/ms_service.grpc.pb.h"
  32. #include "core/util/option_parser.h"
  33. #include "core/version_control/version_controller.h"
  34. #include "core/session.h"
  35. #include "core/serving_tensor.h"
  36. #include "core/http_process.h"
  37. using ms_serving::MSService;
  38. using ms_serving::PredictReply;
  39. using ms_serving::PredictRequest;
  40. namespace mindspore {
  41. namespace serving {
  42. namespace {
  // Maximum gRPC message size passed to the server builder. Despite the name,
  // the value is the largest signed 32-bit integer, not the uint32_t maximum.
  constexpr uint32_t uint32max = 0x7FFFFFFF;
  // Address used for both the gRPC and the RESTful listener descriptions.
  constexpr char kServerHttpIp[] = "0.0.0.0";
  // Fulfilled by HandleSignal; Server::BuildAndStart blocks on its future.
  std::promise<void> exit_requested;
  // Set by the first HandleSignal call so the promise is fulfilled only once.
  std::atomic_flag has_exited = ATOMIC_FLAG_INIT;
  47. void ClearEnv() { Session::Instance().Clear(); }
  48. void HandleSignal(int sig) {
  49. if (!has_exited.test_and_set()) {
  50. exit_requested.set_value();
  51. }
  52. }
  53. grpc::Status CreatGRPCStatus(const Status &status) {
  54. switch (status.StatusCode()) {
  55. case SUCCESS:
  56. return grpc::Status::OK;
  57. case FAILED:
  58. return grpc::Status::CANCELLED;
  59. case INVALID_INPUTS: {
  60. auto status_msg = status.StatusMessage();
  61. if (status_msg.empty()) {
  62. status_msg = "The Predict Inputs do not match the Model Request!";
  63. }
  64. return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, status_msg);
  65. }
  66. default:
  67. return grpc::Status::CANCELLED;
  68. }
  69. }
  70. } // namespace
  71. // Service Implement
  72. class MSServiceImpl final : public MSService::Service {
  73. grpc::Status Predict(grpc::ServerContext *context, const PredictRequest *request, PredictReply *reply) override {
  74. std::lock_guard<std::mutex> lock(mutex_);
  75. MSI_TIME_STAMP_START(Predict)
  76. auto res = Session::Instance().Predict(*request, *reply);
  77. MSI_TIME_STAMP_END(Predict)
  78. if (res != inference::SUCCESS) {
  79. return CreatGRPCStatus(res);
  80. }
  81. MSI_LOG(INFO) << "Finish call service Eval";
  82. return grpc::Status::OK;
  83. }
  84. grpc::Status Test(grpc::ServerContext *context, const PredictRequest *request, PredictReply *reply) override {
  85. MSI_LOG(INFO) << "TestService call";
  86. return grpc::Status::OK;
  87. }
  88. std::mutex mutex_;
  89. };
  90. static std::pair<struct evhttp *, struct event_base *> NewHttpServer() {
  91. auto option_args = Options::Instance().GetArgs();
  92. int32_t http_port = option_args->rest_api_port;
  93. // init http server
  94. event_init();
  95. evthread_use_pthreads();
  96. struct event_base *eb = event_base_new();
  97. if (eb == nullptr) {
  98. MSI_LOG(ERROR) << "Serving Error: RESTful server start failed, new http event failed";
  99. std::cout << "Serving Error: RESTful server start failed, new http event failed" << std::endl;
  100. return std::make_pair(nullptr, nullptr);
  101. }
  102. struct evhttp *http_server = evhttp_new(eb);
  103. if (http_server == nullptr) {
  104. MSI_LOG(ERROR) << "Serving Error: RESTful server start failed, create http server faild";
  105. std::cout << "Serving Error: RESTful server start failed, create http server faild" << std::endl;
  106. event_base_free(eb);
  107. return std::make_pair(nullptr, nullptr);
  108. }
  109. struct sockaddr_in sin = {};
  110. sin.sin_family = AF_INET;
  111. sin.sin_port = htons(http_port);
  112. auto listener =
  113. evconnlistener_new_bind(eb, nullptr, nullptr, LEV_OPT_REUSEABLE | LEV_OPT_CLOSE_ON_EXEC | LEV_OPT_CLOSE_ON_FREE, -1,
  114. reinterpret_cast<struct sockaddr *>(&sin), sizeof(sin));
  115. if (listener == nullptr) {
  116. MSI_LOG_ERROR << "Serving Error: RESTful server start failed, create http listener faild, port " << http_port;
  117. std::cout << "Serving Error: RESTful server start failed, create http listener faild, port " << http_port
  118. << std::endl;
  119. event_base_free(eb);
  120. evhttp_free(http_server);
  121. return std::make_pair(nullptr, nullptr);
  122. }
  123. auto bound = evhttp_bind_listener(http_server, listener);
  124. if (bound == nullptr) {
  125. MSI_LOG_ERROR << "Serving Error: RESTful server start failed, bind http listener to server faild, port "
  126. << http_port;
  127. std::cout << "Serving Error: RESTful server start failed, bind http listener to server faild, port " << http_port
  128. << std::endl;
  129. evconnlistener_free(listener);
  130. event_base_free(eb);
  131. evhttp_free(http_server);
  132. return std::make_pair(nullptr, nullptr);
  133. }
  134. return std::make_pair(http_server, eb);
  135. }
  136. Status BuildAndStartModelInner() {
  137. Status res;
  138. auto option_args = Options::Instance().GetArgs();
  139. std::string model_path = option_args->model_path;
  140. std::string model_name = option_args->model_name;
  141. std::string device_type = option_args->device_type;
  142. auto device_id = option_args->device_id;
  143. res = Session::Instance().CreatDeviceSession(device_type, device_id);
  144. if (res != SUCCESS) {
  145. MSI_LOG(ERROR) << "Serving Error: create inference session failed, device type " << device_type << " device id "
  146. << device_id;
  147. std::cout << "Serving Error: create inference session failed, device type " << device_type << " device id "
  148. << device_id << std::endl;
  149. return res;
  150. }
  151. VersionController version_controller(option_args->poll_model_wait_seconds, model_path, model_name);
  152. res = version_controller.Run();
  153. if (res != SUCCESS) {
  154. MSI_LOG(ERROR) << "Serving Error: load model failed, model directory " << option_args->model_path << " model name "
  155. << option_args->model_name;
  156. std::cout << "Serving Error: load model failed, model directory " << option_args->model_path << " model name "
  157. << option_args->model_name << std::endl;
  158. return res;
  159. }
  160. return SUCCESS;
  161. }
  162. Status BuildAndStartModel() {
  163. try {
  164. auto status = BuildAndStartModelInner();
  165. return status;
  166. } catch (const std::bad_alloc &ex) {
  167. MSI_LOG(ERROR) << "Serving Error: malloc memory failed";
  168. std::cout << "Serving Error: malloc memory failed" << std::endl;
  169. } catch (const std::runtime_error &ex) {
  170. MSI_LOG(ERROR) << "Serving Error: runtime error occurred: " << ex.what();
  171. std::cout << "Serving Error: runtime error occurred: " << ex.what() << std::endl;
  172. } catch (const std::exception &ex) {
  173. MSI_LOG(ERROR) << "Serving Error: exception occurred: " << ex.what();
  174. std::cout << "Serving Error: exception occurred: " << ex.what() << std::endl;
  175. } catch (...) {
  176. MSI_LOG(ERROR) << "Serving Error: exception occurred";
  177. std::cout << "Serving Error: exception occurred";
  178. }
  179. return FAILED;
  180. }
  181. Status Server::BuildAndStart() {
  182. // handle exit signal
  183. signal(SIGINT, HandleSignal);
  184. signal(SIGTERM, HandleSignal);
  185. Status res = BuildAndStartModel();
  186. if (res != SUCCESS) {
  187. ClearEnv();
  188. return res;
  189. }
  190. auto option_args = Options::Instance().GetArgs();
  191. std::string server_address = std::string(kServerHttpIp) + ":" + std::to_string(option_args->grpc_port);
  192. auto http_server_new_ret = NewHttpServer();
  193. struct evhttp *http_server = http_server_new_ret.first;
  194. struct event_base *eb = http_server_new_ret.second;
  195. if (http_server == nullptr || eb == nullptr) {
  196. MSI_LOG(ERROR) << "Serving Error: RESTful server start failed";
  197. std::cout << "Serving Error: RESTful server start failed" << std::endl;
  198. ClearEnv();
  199. return FAILED;
  200. }
  201. auto exit_http = [eb, http_server]() {
  202. evhttp_free(http_server);
  203. event_base_free(eb);
  204. };
  205. int32_t http_port = option_args->rest_api_port;
  206. std::string http_addr = kServerHttpIp;
  207. evhttp_set_timeout(http_server, 60);
  208. evhttp_set_gencb(http_server, http_handler_msg, nullptr);
  209. // grpc server
  210. MSServiceImpl ms_service;
  211. grpc::EnableDefaultHealthCheckService(true);
  212. grpc::reflection::InitProtoReflectionServerBuilderPlugin();
  213. // Set the port is not reuseable
  214. auto option = grpc::MakeChannelArgumentOption(GRPC_ARG_ALLOW_REUSEPORT, 0);
  215. grpc::ServerBuilder serverBuilder;
  216. serverBuilder.SetOption(std::move(option));
  217. serverBuilder.SetMaxMessageSize(uint32max);
  218. serverBuilder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
  219. serverBuilder.RegisterService(&ms_service);
  220. std::unique_ptr<grpc::Server> server(serverBuilder.BuildAndStart());
  221. if (server == nullptr) {
  222. MSI_LOG(ERROR) << "Serving Error: create server failed, gRPC address " << server_address << ", RESTful address "
  223. << http_addr << ":" << http_port << ", model directory " << option_args->model_path << " model name "
  224. << option_args->model_name << ", device type " << option_args->device_type << ", device id "
  225. << option_args->device_id;
  226. std::cout << "Serving Error: create server failed, gRPC address " << server_address << ", RESTful address "
  227. << http_addr << ":" << http_port << ", model directory " << option_args->model_path << " model name "
  228. << option_args->model_name << ", device type " << option_args->device_type << ", device id "
  229. << option_args->device_id << std::endl;
  230. ClearEnv();
  231. exit_http();
  232. return FAILED;
  233. }
  234. auto grpc_server_run = [&server, &server_address]() {
  235. MSI_LOG(INFO) << "MS Serving grpc listening on " << server_address;
  236. std::cout << "Serving: MS Serving gRPC start success, listening on " << server_address << std::endl;
  237. server->Wait();
  238. };
  239. auto http_server_run = [&eb, &http_addr, &http_port]() {
  240. MSI_LOG(INFO) << "MS Serving restful listening on " << http_addr << ":" << http_port;
  241. std::cout << "Serving: MS Serving RESTful start success, listening on " << http_addr << ":" << http_port
  242. << std::endl;
  243. event_base_dispatch(eb);
  244. };
  245. std::thread grpc_thread(grpc_server_run);
  246. std::thread restful_thread(http_server_run);
  247. auto exit_future = exit_requested.get_future();
  248. exit_future.wait();
  249. ClearEnv();
  250. server->Shutdown();
  251. event_base_loopexit(eb, nullptr);
  252. grpc_thread.join();
  253. restful_thread.join();
  254. exit_http();
  255. return SUCCESS;
  256. }
  257. } // namespace serving
  258. } // namespace mindspore