
http_process.cc 18 kB
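
This is the RESTful request-handling unit of MindSpore Serving. It accepts JSON POST bodies over libevent's evhttp interface, converts them into PredictRequest protobuf messages (in either a flat "data" layout or a nested "tensor" layout), runs inference through Session::Instance().Predict, and serializes the resulting PredictReply back to JSON.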

/**
 * Copyright 2020 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include <map>
#include <numeric>  // std::accumulate
#include <vector>
#include <string>
#include <functional>
#include <utility>
#include <event2/buffer.h>  // evbuffer_* APIs
#include <event2/http.h>    // evhttp_* APIs
#include <nlohmann/json.hpp>
#include "serving/ms_service.pb.h"
#include "util/status.h"
#include "core/session.h"
#include "core/http_process.h"
#include "core/serving_tensor.h"

using ms_serving::MSService;
using ms_serving::PredictReply;
using ms_serving::PredictRequest;
using nlohmann::json;

namespace mindspore {
namespace serving {
const int BUF_MAX = 0x7FFFFFFF;
static constexpr char HTTP_DATA[] = "data";
static constexpr char HTTP_TENSOR[] = "tensor";
enum HTTP_TYPE { TYPE_DATA = 0, TYPE_TENSOR };
enum HTTP_DATA_TYPE { HTTP_DATA_NONE, HTTP_DATA_INT, HTTP_DATA_FLOAT };

static const std::map<inference::DataType, HTTP_DATA_TYPE> infer_type2_http_type{
  {inference::DataType::kMSI_Int32, HTTP_DATA_INT}, {inference::DataType::kMSI_Float32, HTTP_DATA_FLOAT}};
Status GetPostMessage(struct evhttp_request *const req, std::string *const buf) {
  Status status(SUCCESS);
  size_t post_size = evbuffer_get_length(req->input_buffer);
  if (post_size == 0) {
    ERROR_INFER_STATUS(status, INVALID_INPUTS, "http message invalid");
    return status;
  } else if (post_size > BUF_MAX) {
    ERROR_INFER_STATUS(status, INVALID_INPUTS, "http message is bigger than 0x7FFFFFFF.");
    return status;
  } else {
    buf->resize(post_size);
    (void)memcpy_s(buf->data(), post_size, evbuffer_pullup(req->input_buffer, -1), post_size);
    return status;
  }
}
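
// Note: evbuffer_pullup(buf, -1) linearizes the whole evbuffer and returns a
// pointer to its first byte, which is what makes the single memcpy_s above safe.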
Status CheckRequestValid(struct evhttp_request *const http_request) {
  Status status(SUCCESS);
  switch (evhttp_request_get_command(http_request)) {
    case EVHTTP_REQ_POST:
      return status;
    default:
      ERROR_INFER_STATUS(status, INVALID_INPUTS, "http message only supports POST right now");
      return status;
  }
}
void ErrorMessage(struct evhttp_request *const req, Status status) {
  json error_json = {{"error_message", status.StatusMessage()}};
  std::string out_error_str = error_json.dump();
  struct evbuffer *retbuff = evbuffer_new();
  if (retbuff == nullptr) {
    MSI_LOG_ERROR << "Create event buffer failed";
    return;
  }
  evbuffer_add(retbuff, out_error_str.data(), out_error_str.size());
  evhttp_send_reply(req, HTTP_OK, "Client", retbuff);
  evbuffer_free(retbuff);
}
Status CheckMessageValid(const json &message_info, HTTP_TYPE *const type) {
  Status status(SUCCESS);
  int count = 0;
  if (message_info.find(HTTP_DATA) != message_info.end()) {
    *type = TYPE_DATA;
    count++;
  }
  if (message_info.find(HTTP_TENSOR) != message_info.end()) {
    *type = TYPE_TENSOR;
    count++;
  }
  if (count != 1) {
    ERROR_INFER_STATUS(status, INVALID_INPUTS, "http message must contain exactly one of (data, tensor)");
    return status;
  }
  return status;
}
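
// Illustrative request bodies (assuming a model with a single 2x2 float32 input):
//   {"data":   [[1.0, 2.0, 3.0, 4.0]]}        -> TYPE_DATA, one flat list per input
//   {"tensor": [[[1.0, 2.0], [3.0, 4.0]]]}    -> TYPE_TENSOR, fully nested per input
// A body containing both keys, or neither, is rejected above.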
Status GetDataFromJson(const json &json_data_array, ServingTensor *const request_tensor, size_t data_index,
                       HTTP_DATA_TYPE type) {
  Status status(SUCCESS);
  auto type_name = [](const json &json_data) -> std::string {
    if (json_data.is_number_integer()) {
      return "integer";
    } else if (json_data.is_number_float()) {
      return "float";
    }
    return json_data.type_name();
  };
  size_t array_size = json_data_array.size();
  if (type == HTTP_DATA_INT) {
    auto data = reinterpret_cast<int32_t *>(request_tensor->mutable_data()) + data_index;
    for (size_t k = 0; k < array_size; k++) {
      auto &json_data = json_data_array[k];
      if (!json_data.is_number_integer()) {
        status = INFER_STATUS(INVALID_INPUTS) << "get data failed, expected integer, given " << type_name(json_data);
        MSI_LOG_ERROR << status.StatusMessage();
        return status;
      }
      data[k] = json_data.get<int32_t>();
    }
  } else if (type == HTTP_DATA_FLOAT) {
    auto data = reinterpret_cast<float *>(request_tensor->mutable_data()) + data_index;
    for (size_t k = 0; k < array_size; k++) {
      auto &json_data = json_data_array[k];
      if (!json_data.is_number_float()) {
        status = INFER_STATUS(INVALID_INPUTS) << "get data failed, expected float, given " << type_name(json_data);
        MSI_LOG_ERROR << status.StatusMessage();
        return status;
      }
      data[k] = json_data.get<float>();
    }
  }
  return SUCCESS;
}
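
// Note: nlohmann::json keeps integer and floating-point numbers distinct, so the
// checks above are strict: a float32 input rejects a bare 1 where 1.0 is accepted,
// and an int32 input likewise rejects 1.0.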
Status RecusiveGetTensor(const json &json_data, size_t depth, ServingTensor *const request_tensor, size_t data_index,
                         HTTP_DATA_TYPE type) {
  Status status(SUCCESS);
  std::vector<int64_t> required_shape = request_tensor->shape();
  if (depth >= required_shape.size()) {
    status = INFER_STATUS(INVALID_INPUTS)
             << "input tensor shape dims exceed required dims " << required_shape.size();
    MSI_LOG_ERROR << status.StatusMessage();
    return status;
  }
  if (!json_data.is_array()) {
    ERROR_INFER_STATUS(status, INVALID_INPUTS, "the tensor is constructed illegally");
    return status;
  }
  if (json_data.size() != static_cast<size_t>(required_shape[depth])) {
    status = INFER_STATUS(INVALID_INPUTS)
             << "tensor format request is constructed illegally, input tensor shape dim " << depth
             << " does not match, required " << required_shape[depth] << ", given " << json_data.size();
    MSI_LOG_ERROR << status.StatusMessage();
    return status;
  }
  if (depth + 1 < required_shape.size()) {
    size_t sub_element_cnt =
      std::accumulate(required_shape.begin() + depth + 1, required_shape.end(), 1LL, std::multiplies<size_t>());
    for (size_t k = 0; k < json_data.size(); k++) {
      status = RecusiveGetTensor(json_data[k], depth + 1, request_tensor, data_index + sub_element_cnt * k, type);
      if (status != SUCCESS) {
        return status;
      }
    }
  } else {
    status = GetDataFromJson(json_data, request_tensor, data_index, type);
    if (status != SUCCESS) {
      return status;
    }
  }
  return status;
}
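
// Index bookkeeping, worked through: for a required shape of {2, 3}, sub_element_cnt
// at depth 0 is 3, so sub-array k starts at flat offset 3 * k; the leaves of
// [[a, b, c], [d, e, f]] therefore land at offsets 0..5 of the tensor buffer.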
std::vector<int64_t> GetJsonArrayShape(const json &json_array) {
  std::vector<int64_t> json_shape;
  const json *tmp_json = &json_array;
  while (tmp_json->is_array()) {
    if (tmp_json->empty()) {
      break;
    }
    json_shape.push_back(tmp_json->size());
    tmp_json = &tmp_json->at(0);
  }
  return json_shape;
}
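
// e.g. [[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]] -> {2, 3}. Only the first element is
// inspected at each level, so ragged nesting is not detected here; it is caught
// later by the per-dimension size checks in RecusiveGetTensor.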
Status TransDataToPredictRequest(const json &message_info, PredictRequest *const request) {
  Status status = SUCCESS;
  auto tensors = message_info.find(HTTP_DATA);
  if (tensors == message_info.end()) {
    ERROR_INFER_STATUS(status, INVALID_INPUTS, "http message does not have data type");
    return status;
  }
  if (!tensors->is_array()) {
    ERROR_INFER_STATUS(status, INVALID_INPUTS, "the input tensor list is not an array");
    return status;
  }
  auto const &json_shape = GetJsonArrayShape(*tensors);
  if (json_shape.size() != 2) {  // 2 is the required nesting depth of the data format
    status = INFER_STATUS(INVALID_INPUTS)
             << "the data format request is constructed illegally, expected list nesting depth 2, given "
             << json_shape.size();
    MSI_LOG_ERROR << status.StatusMessage();
    return status;
  }
  if (tensors->size() != static_cast<size_t>(request->data_size())) {
    status = INFER_STATUS(INVALID_INPUTS)
             << "model input count does not match, model required " << request->data_size() << ", given "
             << tensors->size();
    MSI_LOG_ERROR << status.StatusMessage();
    return status;
  }
  for (size_t i = 0; i < tensors->size(); i++) {
    const auto &tensor = tensors->at(i);
    ServingTensor request_tensor(*(request->mutable_data(i)));
    auto iter = infer_type2_http_type.find(request_tensor.data_type());
    if (iter == infer_type2_http_type.end()) {
      ERROR_INFER_STATUS(status, FAILED, "the model input type is not supported right now");
      return status;
    }
    HTTP_DATA_TYPE type = iter->second;
    if (!tensor.is_array()) {
      ERROR_INFER_STATUS(status, INVALID_INPUTS, "the tensor is constructed illegally");
      return status;
    }
    if (tensor.empty()) {
      ERROR_INFER_STATUS(status, INVALID_INPUTS, "the input tensor is null");
      return status;
    }
    if (tensor.size() != static_cast<size_t>(request_tensor.ElementNum())) {
      status = INFER_STATUS(INVALID_INPUTS) << "input " << i << " element count does not match, model required "
                                            << request_tensor.ElementNum() << ", given " << tensor.size();
      MSI_LOG_ERROR << status.StatusMessage();
      return status;
    }
    status = GetDataFromJson(tensor, &request_tensor, 0, type);
    if (status != SUCCESS) {
      return status;
    }
  }
  return SUCCESS;
}
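
// Illustrative "data" request for a model with two inputs: the outer list has one
// flat inner list per model input (hence the fixed nesting depth of 2), and each
// inner list length must equal that input's element count:
//   {"data": [[1.0, 2.0, 3.0, 4.0], [5.0, 6.0]]}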
Status TransTensorToPredictRequest(const json &message_info, PredictRequest *const request) {
  Status status(SUCCESS);
  auto tensors = message_info.find(HTTP_TENSOR);
  if (tensors == message_info.end()) {
    ERROR_INFER_STATUS(status, INVALID_INPUTS, "http message does not have tensor type");
    return status;
  }
  if (!tensors->is_array()) {
    ERROR_INFER_STATUS(status, INVALID_INPUTS, "the input tensor list is not an array");
    return status;
  }
  if (tensors->size() != static_cast<size_t>(request->data_size())) {
    status =
      INFER_STATUS(INVALID_INPUTS)
      << "model input count does not match or json tensor request is constructed illegally, model input count "
         "required "
      << request->data_size() << ", given " << tensors->size();
    MSI_LOG_ERROR << status.StatusMessage();
    return status;
  }
  for (size_t i = 0; i < tensors->size(); i++) {
    const auto &tensor = tensors->at(i);
    ServingTensor request_tensor(*(request->mutable_data(i)));
    // check that the nested json shape matches the model's required input shape
    auto const &json_shape = GetJsonArrayShape(tensor);
    if (json_shape != request_tensor.shape()) {
      status = INFER_STATUS(INVALID_INPUTS)
               << "input " << i << " shape is invalid, expected " << request_tensor.shape() << ", given "
               << json_shape;
      MSI_LOG_ERROR << status.StatusMessage();
      return status;
    }
    auto iter = infer_type2_http_type.find(request_tensor.data_type());
    if (iter == infer_type2_http_type.end()) {
      ERROR_INFER_STATUS(status, FAILED, "the model input type is not supported right now");
      return status;
    }
    HTTP_DATA_TYPE type = iter->second;
    size_t depth = 0;
    size_t data_index = 0;
    status = RecusiveGetTensor(tensor, depth, &request_tensor, data_index, type);
    if (status != SUCCESS) {
      MSI_LOG_ERROR << "Transfer tensor to predict request failed";
      return status;
    }
  }
  return status;
}
Status TransHTTPMsgToPredictRequest(struct evhttp_request *const http_request, PredictRequest *const request,
                                    HTTP_TYPE *const type) {
  Status status = CheckRequestValid(http_request);
  if (status != SUCCESS) {
    return status;
  }
  std::string post_message;
  status = GetPostMessage(http_request, &post_message);
  if (status != SUCCESS) {
    return status;
  }
  // get model required shape
  std::vector<inference::InferTensor> tensor_list;
  status = Session::Instance().GetModelInputsInfo(tensor_list);
  if (status != SUCCESS) {
    ERROR_INFER_STATUS(status, FAILED, "get model inputs info failed");
    return status;
  }
  for (auto &item : tensor_list) {
    auto input = request->add_data();
    ServingTensor tensor(*input);
    tensor.set_shape(item.shape());
    tensor.set_data_type(item.data_type());
    int64_t element_num = tensor.ElementNum();
    int64_t data_type_size = tensor.GetTypeSize(tensor.data_type());
    // guard against overflow before allocating element_num * data_type_size bytes
    if (element_num <= 0 || INT64_MAX / element_num < data_type_size) {
      ERROR_INFER_STATUS(status, FAILED, "model shape invalid");
      return status;
    }
    tensor.resize_data(element_num * data_type_size);
  }
  MSI_TIME_STAMP_START(ParseJson)
  json message_info;
  try {
    message_info = nlohmann::json::parse(post_message);
  } catch (nlohmann::json::exception &e) {
    std::string json_exception = e.what();
    std::string error_message = "Illegal JSON format: " + json_exception;
    ERROR_INFER_STATUS(status, INVALID_INPUTS, error_message);
    return status;
  }
  MSI_TIME_STAMP_END(ParseJson)
  status = CheckMessageValid(message_info, type);
  if (status != SUCCESS) {
    return status;
  }
  switch (*type) {
    case TYPE_DATA:
      status = TransDataToPredictRequest(message_info, request);
      break;
    case TYPE_TENSOR:
      status = TransTensorToPredictRequest(message_info, request);
      break;
    default:
      ERROR_INFER_STATUS(status, INVALID_INPUTS, "http message must contain exactly one of (data, tensor)");
      return status;
  }
  return status;
}
Status GetJsonFromTensor(const ms_serving::Tensor &tensor, int len, int *const pos, json *const out_json) {
  Status status(SUCCESS);
  switch (tensor.tensor_type()) {
    case ms_serving::MS_INT32: {
      auto data = reinterpret_cast<const int *>(tensor.data().data()) + *pos;
      std::vector<int32_t> result_tensor(len);
      (void)memcpy_s(result_tensor.data(), result_tensor.size() * sizeof(int32_t), data, len * sizeof(int32_t));
      *out_json = std::move(result_tensor);
      *pos += len;
      break;
    }
    case ms_serving::MS_FLOAT32: {
      auto data = reinterpret_cast<const float *>(tensor.data().data()) + *pos;
      std::vector<float> result_tensor(len);
      (void)memcpy_s(result_tensor.data(), result_tensor.size() * sizeof(float), data, len * sizeof(float));
      *out_json = std::move(result_tensor);
      *pos += len;
      break;
    }
    default:
      MSI_LOG(ERROR) << "the result type is not supported in restful api, type is " << tensor.tensor_type();
      ERROR_INFER_STATUS(status, FAILED, "reply has unsupported type");
  }
  return status;
}
Status TransPredictReplyToData(const PredictReply &reply, json *const out_json) {
  Status status(SUCCESS);
  for (int i = 0; i < reply.result_size(); i++) {
    (*out_json)["data"].push_back(json());
    json &tensor_json = (*out_json)["data"].back();
    int num = 1;
    for (auto j = 0; j < reply.result(i).tensor_shape().dims_size(); j++) {
      num *= reply.result(i).tensor_shape().dims(j);
    }
    int pos = 0;
    status = GetJsonFromTensor(reply.result(i), num, &pos, &tensor_json);
    if (status != SUCCESS) {
      return status;
    }
  }
  return status;
}
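
// Note: the "data" reply flattens each result tensor into a single 1-D list (num
// is the product of all dims), mirroring the flat layout of the "data" request.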
Status RecusiveGetJson(const ms_serving::Tensor &tensor, int depth, int *const pos, json *const out_json) {
  Status status(SUCCESS);
  if (depth >= 10) {
    ERROR_INFER_STATUS(status, FAILED, "result tensor shape dims is larger than 10");
    return status;
  }
  if (depth == tensor.tensor_shape().dims_size() - 1) {
    status = GetJsonFromTensor(tensor, tensor.tensor_shape().dims(depth), pos, out_json);
    if (status != SUCCESS) {
      return status;
    }
  } else {
    for (int i = 0; i < tensor.tensor_shape().dims(depth); i++) {
      out_json->push_back(json());
      json &tensor_json = out_json->back();
      status = RecusiveGetJson(tensor, depth + 1, pos, &tensor_json);
      if (status != SUCCESS) {
        return status;
      }
    }
  }
  return status;
}
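
// Worked example: a float32 result of shape {2, 2} holding 1 2 3 4 comes out as
// [[1.0, 2.0], [3.0, 4.0]]; *pos advances by dims(last_dim) at each leaf call, so
// the flat tensor buffer is consumed exactly once, left to right.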
Status TransPredictReplyToTensor(const PredictReply &reply, json *const out_json) {
  Status status(SUCCESS);
  for (int i = 0; i < reply.result_size(); i++) {
    (*out_json)["tensor"].push_back(json());
    json &tensor_json = (*out_json)["tensor"].back();
    int pos = 0;
    status = RecusiveGetJson(reply.result(i), 0, &pos, &tensor_json);
    if (status != SUCCESS) {
      return status;
    }
  }
  return status;
}
Status TransPredictReplyToHTTPMsg(const PredictReply &reply, const HTTP_TYPE &type, struct evbuffer *const buf) {
  Status status(SUCCESS);
  json out_json;
  switch (type) {
    case TYPE_DATA:
      status = TransPredictReplyToData(reply, &out_json);
      break;
    case TYPE_TENSOR:
      status = TransPredictReplyToTensor(reply, &out_json);
      break;
    default:
      ERROR_INFER_STATUS(status, FAILED, "http message must contain exactly one of (data, tensor)");
      return status;
  }
  const std::string &out_str = out_json.dump();
  evbuffer_add(buf, out_str.data(), out_str.size());
  return status;
}
Status HttpHandleMsgDetail(struct evhttp_request *const req, void *const arg, struct evbuffer *const retbuff) {
  PredictRequest request;
  PredictReply reply;
  HTTP_TYPE type;
  MSI_TIME_STAMP_START(ParseRequest)
  auto status = TransHTTPMsgToPredictRequest(req, &request, &type);
  MSI_TIME_STAMP_END(ParseRequest)
  if (status != SUCCESS) {
    MSI_LOG(ERROR) << "restful trans to request failed";
    return status;
  }
  MSI_TIME_STAMP_START(Predict)
  status = Session::Instance().Predict(request, reply);
  MSI_TIME_STAMP_END(Predict)
  if (status != SUCCESS) {
    MSI_LOG(ERROR) << "restful predict failed";
    return status;
  }
  MSI_TIME_STAMP_START(CreateReplyJson)
  status = TransPredictReplyToHTTPMsg(reply, type, retbuff);
  MSI_TIME_STAMP_END(CreateReplyJson)
  if (status != SUCCESS) {
    MSI_LOG(ERROR) << "restful trans to reply failed";
    return status;
  }
  return SUCCESS;
}
void http_handler_msg(struct evhttp_request *const req, void *const arg) {
  MSI_TIME_STAMP_START(TotalRestfulPredict)
  struct evbuffer *retbuff = evbuffer_new();
  if (retbuff == nullptr) {
    MSI_LOG_ERROR << "Create event buffer failed";
    return;
  }
  auto status = HttpHandleMsgDetail(req, arg, retbuff);
  if (status != SUCCESS) {
    ErrorMessage(req, status);
    evbuffer_free(retbuff);
    return;
  }
  MSI_TIME_STAMP_START(ReplyJson)
  evhttp_send_reply(req, HTTP_OK, "Client", retbuff);
  MSI_TIME_STAMP_END(ReplyJson)
  evbuffer_free(retbuff);
  MSI_TIME_STAMP_END(TotalRestfulPredict)
}
}  // namespace serving
}  // namespace mindspore
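
For context, http_handler_msg matches the callback signature libevent expects, so server setup code elsewhere in the project can register it directly. Below is a minimal sketch of such wiring, assuming http_handler_msg is declared in core/http_process.h; the bind address, port, and URL path are illustrative assumptions, not values taken from this file.

// Minimal sketch: wiring http_handler_msg into a libevent HTTP server.
// The bind address, port, and path below are hypothetical placeholders.
#include <event2/event.h>
#include <event2/http.h>
#include "core/http_process.h"

int ServeRestful() {
  struct event_base *base = event_base_new();
  struct evhttp *http_server = evhttp_new(base);
  if (evhttp_bind_socket(http_server, "0.0.0.0", 1500) != 0) {  // hypothetical port
    evhttp_free(http_server);
    event_base_free(base);
    return -1;
  }
  // Every POST to this (hypothetical) path is parsed, predicted, and answered.
  evhttp_set_cb(http_server, "/model/predict", mindspore::serving::http_handler_msg, nullptr);
  event_base_dispatch(base);  // blocks, serving requests
  evhttp_free(http_server);
  event_base_free(base);
  return 0;
}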