You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

server.cc 5.7 kB

5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173
  1. /**
  2. * Copyright 2020 Huawei Technologies Co., Ltd
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
#include "core/server.h"
#include <evhttp.h>
#include <event.h>
#include <event2/thread.h>
#include <grpcpp/grpcpp.h>
#include <grpcpp/health_check_service_interface.h>
#include <grpcpp/ext/proto_server_reflection_plugin.h>
#include <atomic>
#include <csignal>
#include <future>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>
#include "include/infer_log.h"
#include "serving/ms_service.grpc.pb.h"
#include "core/util/option_parser.h"
#include "core/version_control/version_controller.h"
#include "core/session.h"
#include "core/serving_tensor.h"
#include "core/http_process.h"
  35. using ms_serving::MSService;
  36. using ms_serving::PredictReply;
  37. using ms_serving::PredictRequest;
  38. namespace mindspore {
  39. namespace serving {
  40. namespace {
  41. static const uint32_t uint32max = 0x7FFFFFFF;
  42. std::promise<void> exit_requested;
  43. void ClearEnv() { Session::Instance().Clear(); }
  44. void HandleSignal(int sig) { exit_requested.set_value(); }
  45. grpc::Status CreatGRPCStatus(const Status &status) {
  46. switch (status.StatusCode()) {
  47. case SUCCESS:
  48. return grpc::Status::OK;
  49. case FAILED:
  50. return grpc::Status::CANCELLED;
  51. case INVALID_INPUTS: {
  52. auto status_msg = status.StatusMessage();
  53. if (status_msg.empty()) {
  54. status_msg = "The Predict Inputs do not match the Model Request!";
  55. }
  56. return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, status_msg);
  57. }
  58. default:
  59. return grpc::Status::CANCELLED;
  60. }
  61. }
  62. } // namespace
  63. // Service Implement
  64. class MSServiceImpl final : public MSService::Service {
  65. grpc::Status Predict(grpc::ServerContext *context, const PredictRequest *request, PredictReply *reply) override {
  66. std::lock_guard<std::mutex> lock(mutex_);
  67. MSI_TIME_STAMP_START(Predict)
  68. auto res = Session::Instance().Predict(*request, *reply);
  69. MSI_TIME_STAMP_END(Predict)
  70. if (res != inference::SUCCESS) {
  71. return CreatGRPCStatus(res);
  72. }
  73. MSI_LOG(INFO) << "Finish call service Eval";
  74. return grpc::Status::OK;
  75. }
  76. grpc::Status Test(grpc::ServerContext *context, const PredictRequest *request, PredictReply *reply) override {
  77. MSI_LOG(INFO) << "TestService call";
  78. return grpc::Status::OK;
  79. }
  80. std::mutex mutex_;
  81. };
  82. Status Server::BuildAndStart() {
  83. // handle exit signal
  84. signal(SIGINT, HandleSignal);
  85. signal(SIGTERM, HandleSignal);
  86. Status res;
  87. auto option_args = Options::Instance().GetArgs();
  88. std::string server_address = "0.0.0.0:" + std::to_string(option_args->grpc_port);
  89. std::string model_path = option_args->model_path;
  90. std::string model_name = option_args->model_name;
  91. std::string device_type = option_args->device_type;
  92. auto device_id = option_args->device_id;
  93. res = Session::Instance().CreatDeviceSession(device_type, device_id);
  94. if (res != SUCCESS) {
  95. MSI_LOG(ERROR) << "creat session failed";
  96. ClearEnv();
  97. return res;
  98. }
  99. VersionController version_controller(option_args->poll_model_wait_seconds, model_path, model_name);
  100. res = version_controller.Run();
  101. if (res != SUCCESS) {
  102. MSI_LOG(ERROR) << "load model failed";
  103. ClearEnv();
  104. return res;
  105. }
  106. // init http server
  107. struct evhttp *http_server = NULL;
  108. struct event_base *eb = NULL;
  109. int32_t http_port = option_args->rest_api_port;
  110. std::string http_addr = "0.0.0.0";
  111. event_init();
  112. evthread_use_pthreads();
  113. eb = event_base_new();
  114. http_server = evhttp_new(eb);
  115. evhttp_bind_socket_with_handle(http_server, http_addr.c_str(), http_port);
  116. // http_server = evhttp_start(http_addr.c_str(), http_port);
  117. if (http_server == NULL) {
  118. MSI_LOG(ERROR) << "http server start failed.";
  119. return res;
  120. }
  121. evhttp_set_timeout(http_server, 5);
  122. evhttp_set_gencb(http_server, http_handler_msg, NULL);
  123. // grpc server
  124. MSServiceImpl ms_service;
  125. grpc::EnableDefaultHealthCheckService(true);
  126. grpc::reflection::InitProtoReflectionServerBuilderPlugin();
  127. // Set the port is not reuseable
  128. auto option = grpc::MakeChannelArgumentOption(GRPC_ARG_ALLOW_REUSEPORT, 0);
  129. grpc::ServerBuilder serverBuilder;
  130. serverBuilder.SetOption(std::move(option));
  131. serverBuilder.SetMaxMessageSize(uint32max);
  132. serverBuilder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
  133. serverBuilder.RegisterService(&ms_service);
  134. std::unique_ptr<grpc::Server> server(serverBuilder.BuildAndStart());
  135. if (server == nullptr) {
  136. MSI_LOG(ERROR) << "The serving server create failed";
  137. ClearEnv();
  138. return FAILED;
  139. }
  140. auto grpc_server_run = [&server, &server_address]() {
  141. MSI_LOG(INFO) << "MS Serving grpc listening on " << server_address;
  142. server->Wait();
  143. };
  144. auto http_server_run = [&eb, &http_addr, &http_port]() {
  145. MSI_LOG(INFO) << "MS Serving restful listening on " << http_addr << ":" << http_port;
  146. event_base_dispatch(eb);
  147. };
  148. std::thread grpc_thread(grpc_server_run);
  149. std::thread restful_thread(http_server_run);
  150. auto exit_future = exit_requested.get_future();
  151. exit_future.wait();
  152. ClearEnv();
  153. server->Shutdown();
  154. event_base_loopexit(eb, NULL);
  155. grpc_thread.join();
  156. restful_thread.join();
  157. return SUCCESS;
  158. }
  159. } // namespace serving
  160. } // namespace mindspore