You cannot select more than 25 topics. Topics must start with a Chinese character, a letter or a number; they can include dashes ('-') and can be up to 35 characters long.

server.cc 7.8 kB

5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228
  1. /**
  2. * Copyright 2020 Huawei Technologies Co., Ltd
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
#include "core/server.h"
#include <grpcpp/grpcpp.h>
#include <grpcpp/health_check_service_interface.h>
#include <grpcpp/ext/proto_server_reflection_plugin.h>
#include <atomic>
#include <chrono>
#include <csignal>
#include <future>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>
#include "include/infer_log.h"
#include "serving/ms_service.grpc.pb.h"
#include "core/util/option_parser.h"
#include "core/version_control/version_controller.h"
#include "core/util/file_system_operation.h"
#include "core/serving_tensor.h"
  33. using ms_serving::MSService;
  34. using ms_serving::PredictReply;
  35. using ms_serving::PredictRequest;
  36. namespace mindspore {
  37. namespace serving {
// Paired timing macros: MSI_TIME_STAMP_START(name) captures a start time and
// MSI_TIME_STAMP_END(name) logs the elapsed wall-clock time in milliseconds,
// tagged with `name`. steady_clock is used, so the measurement is immune to
// system clock adjustments. Both macros must appear in the same scope with the
// same `name` token (the start time is stored in a local `time_start_<name>`).
#define MSI_TIME_STAMP_START(name) auto time_start_##name = std::chrono::steady_clock::now();
#define MSI_TIME_STAMP_END(name) \
{ \
auto time_end_##name = std::chrono::steady_clock::now(); \
auto time_cost = std::chrono::duration<double, std::milli>(time_end_##name - time_start_##name).count(); \
MSI_LOG_INFO << #name " Time Cost # " << time_cost << " ms ---------------------"; \
}
  45. Status Session::CreatDeviceSession(const std::string &device, uint32_t device_id) {
  46. session_ = inference::InferSession::CreateSession(device, device_id);
  47. if (session_ == nullptr) {
  48. MSI_LOG(ERROR) << "Creat Session Failed";
  49. return FAILED;
  50. }
  51. device_type_ = device;
  52. return SUCCESS;
  53. }
  54. Session &Session::Instance() {
  55. static Session instance;
  56. return instance;
  57. }
  58. Status Session::Predict(const PredictRequest &request, PredictReply &reply) {
  59. if (!model_loaded_) {
  60. MSI_LOG(ERROR) << "the model has not loaded";
  61. return FAILED;
  62. }
  63. if (session_ == nullptr) {
  64. MSI_LOG(ERROR) << "the inference session has not be initialized";
  65. return FAILED;
  66. }
  67. std::lock_guard<std::mutex> lock(mutex_);
  68. MSI_LOG(INFO) << "run Predict";
  69. if (request.images_size() > 0) {
  70. ServingImagesRequest serving_images(request);
  71. ServingRequest serving_request(request);
  72. ServingReply serving_reply(reply);
  73. Status ret = session_->ExecuteModel(graph_id_, serving_images, serving_request, serving_reply);
  74. if (ret != SUCCESS) {
  75. MSI_LOG(ERROR) << "execute model with images return failed";
  76. return ret;
  77. }
  78. } else if (request.data_size() > 0) {
  79. ServingRequest serving_request(request);
  80. ServingReply serving_reply(reply);
  81. Status ret = session_->ExecuteModel(graph_id_, serving_request, serving_reply);
  82. if (ret != SUCCESS) {
  83. MSI_LOG(ERROR) << "execute model with datas return failed";
  84. return ret;
  85. }
  86. }
  87. MSI_LOG(INFO) << "run Predict finished";
  88. return SUCCESS;
  89. }
  90. Status Session::Warmup(const MindSporeModelPtr model) {
  91. if (session_ == nullptr) {
  92. MSI_LOG(ERROR) << "The CreatDeviceSession should be called, before warmup";
  93. return FAILED;
  94. }
  95. std::lock_guard<std::mutex> lock(mutex_);
  96. std::string file_name = model->GetModelPath() + '/' + model->GetModelName();
  97. model_loaded_ = false;
  98. MSI_TIME_STAMP_START(LoadModelFromFile)
  99. auto ret = session_->LoadModelFromFile(file_name, graph_id_);
  100. MSI_TIME_STAMP_END(LoadModelFromFile)
  101. if (ret != SUCCESS) {
  102. MSI_LOG(ERROR) << "Load graph model failed, file name is " << file_name.c_str();
  103. return ret;
  104. }
  105. model_loaded_ = true;
  106. MSI_LOG(INFO) << "Session Warmup finished";
  107. return SUCCESS;
  108. }
  109. Status Session::Clear() {
  110. if (session_ != nullptr) {
  111. session_->UnloadModel(graph_id_);
  112. session_->FinalizeEnv();
  113. session_ = nullptr;
  114. }
  115. return SUCCESS;
  116. }
  117. namespace {
  118. static const uint32_t uint32max = 0x7FFFFFFF;
  119. std::promise<void> exit_requested;
  120. void ClearEnv() { Session::Instance().Clear(); }
  121. void HandleSignal(int sig) { exit_requested.set_value(); }
  122. grpc::Status CreatGRPCStatus(const Status &status) {
  123. switch (status.StatusCode()) {
  124. case SUCCESS:
  125. return grpc::Status::OK;
  126. case FAILED:
  127. return grpc::Status::CANCELLED;
  128. case INVALID_INPUTS: {
  129. auto status_msg = status.StatusMessage();
  130. if (status_msg.empty()) {
  131. status_msg = "The Predict Inputs do not match the Model Request!";
  132. }
  133. return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, status_msg);
  134. }
  135. default:
  136. return grpc::Status::CANCELLED;
  137. }
  138. }
  139. } // namespace
  140. // Service Implement
  141. class MSServiceImpl final : public MSService::Service {
  142. grpc::Status Predict(grpc::ServerContext *context, const PredictRequest *request, PredictReply *reply) override {
  143. std::lock_guard<std::mutex> lock(mutex_);
  144. MSI_TIME_STAMP_START(Predict)
  145. auto res = Session::Instance().Predict(*request, *reply);
  146. MSI_TIME_STAMP_END(Predict)
  147. if (res != inference::SUCCESS) {
  148. return CreatGRPCStatus(res);
  149. }
  150. MSI_LOG(INFO) << "Finish call service Eval";
  151. return grpc::Status::OK;
  152. }
  153. grpc::Status Test(grpc::ServerContext *context, const PredictRequest *request, PredictReply *reply) override {
  154. MSI_LOG(INFO) << "TestService call";
  155. return grpc::Status::OK;
  156. }
  157. std::mutex mutex_;
  158. };
  159. Status Server::BuildAndStart() {
  160. // handle exit signal
  161. signal(SIGINT, HandleSignal);
  162. signal(SIGTERM, HandleSignal);
  163. Status res;
  164. auto option_args = Options::Instance().GetArgs();
  165. std::string server_address = "0.0.0.0:" + std::to_string(option_args->grpc_port);
  166. std::string model_path = option_args->model_path;
  167. std::string model_name = option_args->model_name;
  168. std::string device_type = option_args->device_type;
  169. auto device_id = option_args->device_id;
  170. res = Session::Instance().CreatDeviceSession(device_type, device_id);
  171. if (res != SUCCESS) {
  172. MSI_LOG(ERROR) << "creat session failed";
  173. ClearEnv();
  174. return res;
  175. }
  176. VersionController version_controller(option_args->poll_model_wait_seconds, model_path, model_name);
  177. res = version_controller.Run();
  178. if (res != SUCCESS) {
  179. MSI_LOG(ERROR) << "load model failed";
  180. ClearEnv();
  181. return res;
  182. }
  183. MSServiceImpl ms_service;
  184. grpc::EnableDefaultHealthCheckService(true);
  185. grpc::reflection::InitProtoReflectionServerBuilderPlugin();
  186. // Set the port is not reuseable
  187. auto option = grpc::MakeChannelArgumentOption(GRPC_ARG_ALLOW_REUSEPORT, 0);
  188. grpc::ServerBuilder serverBuilder;
  189. serverBuilder.SetOption(std::move(option));
  190. serverBuilder.SetMaxMessageSize(uint32max);
  191. serverBuilder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
  192. serverBuilder.RegisterService(&ms_service);
  193. std::unique_ptr<grpc::Server> server(serverBuilder.BuildAndStart());
  194. if (server == nullptr) {
  195. MSI_LOG(ERROR) << "The serving server create failed";
  196. ClearEnv();
  197. return FAILED;
  198. }
  199. auto grpc_server_run = [&server]() { server->Wait(); };
  200. std::thread serving_thread(grpc_server_run);
  201. MSI_LOG(INFO) << "MS Serving listening on " << server_address;
  202. auto exit_future = exit_requested.get_future();
  203. exit_future.wait();
  204. ClearEnv();
  205. server->Shutdown();
  206. serving_thread.join();
  207. return SUCCESS;
  208. }
  209. } // namespace serving
  210. } // namespace mindspore