You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number; they can include dashes ('-') and can be up to 35 characters long.

http_process.cc 18 kB

5 years ago
5 years ago
5 years ago
5 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506
  1. /**
  2. * Copyright 2020 Huawei Technologies Co., Ltd
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  #include <cstdint>
  #include <functional>
  #include <map>
  #include <numeric>
  #include <string>
  #include <utility>
  #include <vector>
  #include <nlohmann/json.hpp>
  #include "serving/ms_service.pb.h"
  #include "util/status.h"
  #include "core/session.h"
  #include "core/http_process.h"
  #include "core/serving_tensor.h"
  27. using ms_serving::MSService;
  28. using ms_serving::PredictReply;
  29. using ms_serving::PredictRequest;
  30. using nlohmann::json;
  31. namespace mindspore {
  32. namespace serving {
  33. const int BUF_MAX = 0x7FFFFFFF;
  34. static constexpr char HTTP_DATA[] = "data";
  35. static constexpr char HTTP_TENSOR[] = "tensor";
  36. enum HTTP_TYPE { TYPE_DATA = 0, TYPE_TENSOR };
  37. enum HTTP_DATA_TYPE { HTTP_DATA_NONE, HTTP_DATA_INT, HTTP_DATA_FLOAT };
  38. static const std::map<inference::DataType, HTTP_DATA_TYPE> infer_type2_http_type{
  39. {inference::DataType::kMSI_Int32, HTTP_DATA_INT}, {inference::DataType::kMSI_Float32, HTTP_DATA_FLOAT}};
  40. Status GetPostMessage(struct evhttp_request *req, std::string *buf) {
  41. Status status(SUCCESS);
  42. size_t post_size = evbuffer_get_length(req->input_buffer);
  43. if (post_size == 0) {
  44. ERROR_INFER_STATUS(status, INVALID_INPUTS, "http message invalid");
  45. return status;
  46. } else if (post_size > BUF_MAX) {
  47. ERROR_INFER_STATUS(status, INVALID_INPUTS, "http message is bigger than 0x7FFFFFFF.");
  48. return status;
  49. } else {
  50. buf->resize(post_size);
  51. memcpy_s(buf->data(), post_size, evbuffer_pullup(req->input_buffer, -1), post_size);
  52. return status;
  53. }
  54. }
  55. Status CheckRequestValid(struct evhttp_request *http_request) {
  56. Status status(SUCCESS);
  57. switch (evhttp_request_get_command(http_request)) {
  58. case EVHTTP_REQ_POST:
  59. return status;
  60. default:
  61. ERROR_INFER_STATUS(status, INVALID_INPUTS, "http message only support POST right now");
  62. return status;
  63. }
  64. }
  65. void ErrorMessage(struct evhttp_request *req, Status status) {
  66. json error_json = {{"error_message", status.StatusMessage()}};
  67. std::string out_error_str = error_json.dump();
  68. struct evbuffer *retbuff = evbuffer_new();
  69. evbuffer_add(retbuff, out_error_str.data(), out_error_str.size());
  70. evhttp_send_reply(req, HTTP_OK, "Client", retbuff);
  71. evbuffer_free(retbuff);
  72. }
  73. Status CheckMessageValid(const json &message_info, HTTP_TYPE *type) {
  74. Status status(SUCCESS);
  75. int count = 0;
  76. if (message_info.find(HTTP_DATA) != message_info.end()) {
  77. *type = TYPE_DATA;
  78. count++;
  79. }
  80. if (message_info.find(HTTP_TENSOR) != message_info.end()) {
  81. *type = TYPE_TENSOR;
  82. count++;
  83. }
  84. if (count != 1) {
  85. ERROR_INFER_STATUS(status, INVALID_INPUTS, "http message must have only one type of (data, tensor)");
  86. return status;
  87. }
  88. return status;
  89. }
  90. Status GetDataFromJson(const json &json_data_array, ServingTensor *request_tensor, size_t data_index,
  91. HTTP_DATA_TYPE type) {
  92. Status status(SUCCESS);
  93. auto type_name = [](const json &json_data) -> std::string {
  94. if (json_data.is_number_integer()) {
  95. return "integer";
  96. } else if (json_data.is_number_float()) {
  97. return "float";
  98. }
  99. return json_data.type_name();
  100. };
  101. size_t array_size = json_data_array.size();
  102. if (type == HTTP_DATA_INT) {
  103. auto data = reinterpret_cast<int32_t *>(request_tensor->mutable_data()) + data_index;
  104. for (size_t k = 0; k < array_size; k++) {
  105. auto &json_data = json_data_array[k];
  106. if (!json_data.is_number_integer()) {
  107. status = INFER_STATUS(INVALID_INPUTS) << "get data failed, expected integer, given " << type_name(json_data);
  108. MSI_LOG_ERROR << status.StatusMessage();
  109. return status;
  110. }
  111. data[k] = json_data.get<int32_t>();
  112. }
  113. } else if (type == HTTP_DATA_FLOAT) {
  114. auto data = reinterpret_cast<float *>(request_tensor->mutable_data()) + data_index;
  115. for (size_t k = 0; k < array_size; k++) {
  116. auto &json_data = json_data_array[k];
  117. if (!json_data.is_number_float()) {
  118. status = INFER_STATUS(INVALID_INPUTS) << "get data failed, expected float, given " << type_name(json_data);
  119. MSI_LOG_ERROR << status.StatusMessage();
  120. return status;
  121. }
  122. data[k] = json_data.get<float>();
  123. }
  124. }
  125. return SUCCESS;
  126. }
  127. Status RecusiveGetTensor(const json &json_data, size_t depth, ServingTensor *request_tensor, size_t data_index,
  128. HTTP_DATA_TYPE type) {
  129. Status status(SUCCESS);
  130. std::vector<int64_t> required_shape = request_tensor->shape();
  131. if (depth >= required_shape.size()) {
  132. status = INFER_STATUS(INVALID_INPUTS)
  133. << "input tensor shape dims is more than required dims " << required_shape.size();
  134. MSI_LOG_ERROR << status.StatusMessage();
  135. return status;
  136. }
  137. if (!json_data.is_array()) {
  138. ERROR_INFER_STATUS(status, INVALID_INPUTS, "the tensor is constructed illegally");
  139. return status;
  140. }
  141. if (json_data.size() != static_cast<size_t>(required_shape[depth])) {
  142. status = INFER_STATUS(INVALID_INPUTS)
  143. << "tensor format request is constructed illegally, input tensor shape dim " << depth
  144. << " not match, required " << required_shape[depth] << ", given " << json_data.size();
  145. MSI_LOG_ERROR << status.StatusMessage();
  146. return status;
  147. }
  148. if (depth + 1 < required_shape.size()) {
  149. size_t sub_element_cnt =
  150. std::accumulate(required_shape.begin() + depth + 1, required_shape.end(), 1LL, std::multiplies<size_t>());
  151. for (size_t k = 0; k < json_data.size(); k++) {
  152. status = RecusiveGetTensor(json_data[k], depth + 1, request_tensor, data_index + sub_element_cnt * k, type);
  153. if (status != SUCCESS) {
  154. return status;
  155. }
  156. }
  157. } else {
  158. status = GetDataFromJson(json_data, request_tensor, data_index, type);
  159. if (status != SUCCESS) {
  160. return status;
  161. }
  162. }
  163. return status;
  164. }
  165. std::vector<int64_t> GetJsonArrayShape(const json &json_array) {
  166. std::vector<int64_t> json_shape;
  167. const json *tmp_json = &json_array;
  168. while (tmp_json->is_array()) {
  169. if (tmp_json->empty()) {
  170. break;
  171. }
  172. json_shape.push_back(tmp_json->size());
  173. tmp_json = &tmp_json->at(0);
  174. }
  175. return json_shape;
  176. }
  177. Status TransDataToPredictRequest(const json &message_info, PredictRequest *request) {
  178. Status status = SUCCESS;
  179. auto tensors = message_info.find(HTTP_DATA);
  180. if (tensors == message_info.end()) {
  181. ERROR_INFER_STATUS(status, INVALID_INPUTS, "http message do not have data type");
  182. return status;
  183. }
  184. if (!tensors->is_array()) {
  185. ERROR_INFER_STATUS(status, INVALID_INPUTS, "the input tensor list is not array");
  186. return status;
  187. }
  188. auto const &json_shape = GetJsonArrayShape(*tensors);
  189. if (json_shape.size() != 2) { // 2 is data format list deep
  190. status = INFER_STATUS(INVALID_INPUTS)
  191. << "the data format request is constructed illegally, expected list nesting depth 2, given "
  192. << json_shape.size();
  193. MSI_LOG_ERROR << status.StatusMessage();
  194. return status;
  195. }
  196. if (tensors->size() != static_cast<size_t>(request->data_size())) {
  197. status = INFER_STATUS(INVALID_INPUTS)
  198. << "model input count not match, model required " << request->data_size() << ", given " << tensors->size();
  199. MSI_LOG_ERROR << status.StatusMessage();
  200. return status;
  201. }
  202. for (size_t i = 0; i < tensors->size(); i++) {
  203. const auto &tensor = tensors->at(i);
  204. ServingTensor request_tensor(*(request->mutable_data(i)));
  205. auto iter = infer_type2_http_type.find(request_tensor.data_type());
  206. if (iter == infer_type2_http_type.end()) {
  207. ERROR_INFER_STATUS(status, FAILED, "the model input type is not supported right now");
  208. return status;
  209. }
  210. HTTP_DATA_TYPE type = iter->second;
  211. if (!tensor.is_array()) {
  212. ERROR_INFER_STATUS(status, INVALID_INPUTS, "the tensor is constructed illegally");
  213. return status;
  214. }
  215. if (tensor.empty()) {
  216. ERROR_INFER_STATUS(status, INVALID_INPUTS, "the input tensor is null");
  217. return status;
  218. }
  219. if (tensor.size() != static_cast<size_t>(request_tensor.ElementNum())) {
  220. status = INFER_STATUS(INVALID_INPUTS) << "input " << i << " element count not match, model required "
  221. << request_tensor.ElementNum() << ", given " << tensor.size();
  222. MSI_LOG_ERROR << status.StatusMessage();
  223. return status;
  224. }
  225. status = GetDataFromJson(tensor, &request_tensor, 0, type);
  226. if (status != SUCCESS) {
  227. return status;
  228. }
  229. }
  230. return SUCCESS;
  231. }
  232. Status TransTensorToPredictRequest(const json &message_info, PredictRequest *request) {
  233. Status status(SUCCESS);
  234. auto tensors = message_info.find(HTTP_TENSOR);
  235. if (tensors == message_info.end()) {
  236. ERROR_INFER_STATUS(status, INVALID_INPUTS, "http message do not have tensor type");
  237. return status;
  238. }
  239. if (!tensors->is_array()) {
  240. ERROR_INFER_STATUS(status, INVALID_INPUTS, "the input tensor list is not array");
  241. return status;
  242. }
  243. if (tensors->size() != static_cast<size_t>(request->data_size())) {
  244. status =
  245. INFER_STATUS(INVALID_INPUTS)
  246. << "model input count not match or json tensor request is constructed illegally, model input count required "
  247. << request->data_size() << ", given " << tensors->size();
  248. MSI_LOG_ERROR << status.StatusMessage();
  249. return status;
  250. }
  251. for (size_t i = 0; i < tensors->size(); i++) {
  252. const auto &tensor = tensors->at(i);
  253. ServingTensor request_tensor(*(request->mutable_data(i)));
  254. // check data shape
  255. auto const &json_shape = GetJsonArrayShape(tensor);
  256. if (json_shape != request_tensor.shape()) { // data shape not match
  257. status = INFER_STATUS(INVALID_INPUTS)
  258. << "input " << i << " shape is invalid, expected " << request_tensor.shape() << ", given " << json_shape;
  259. MSI_LOG_ERROR << status.StatusMessage();
  260. return status;
  261. }
  262. auto iter = infer_type2_http_type.find(request_tensor.data_type());
  263. if (iter == infer_type2_http_type.end()) {
  264. ERROR_INFER_STATUS(status, FAILED, "the model input type is not supported right now");
  265. return status;
  266. }
  267. HTTP_DATA_TYPE type = iter->second;
  268. size_t depth = 0;
  269. size_t data_index = 0;
  270. status = RecusiveGetTensor(tensor, depth, &request_tensor, data_index, type);
  271. if (status != SUCCESS) {
  272. MSI_LOG_ERROR << "Transfer tensor to predict request failed";
  273. return status;
  274. }
  275. }
  276. return status;
  277. }
  278. Status TransHTTPMsgToPredictRequest(struct evhttp_request *http_request, PredictRequest *request, HTTP_TYPE *type) {
  279. Status status = CheckRequestValid(http_request);
  280. if (status != SUCCESS) {
  281. return status;
  282. }
  283. std::string post_message;
  284. status = GetPostMessage(http_request, &post_message);
  285. if (status != SUCCESS) {
  286. return status;
  287. }
  288. // get model required shape
  289. std::vector<inference::InferTensor> tensor_list;
  290. status = Session::Instance().GetModelInputsInfo(tensor_list);
  291. if (status != SUCCESS) {
  292. ERROR_INFER_STATUS(status, FAILED, "get model inputs info failed");
  293. return status;
  294. }
  295. for (auto &item : tensor_list) {
  296. auto input = request->add_data();
  297. ServingTensor tensor(*input);
  298. tensor.set_shape(item.shape());
  299. tensor.set_data_type(item.data_type());
  300. int64_t element_num = tensor.ElementNum();
  301. int64_t data_type_size = tensor.GetTypeSize(tensor.data_type());
  302. if (element_num <= 0 || INT64_MAX / element_num < data_type_size) {
  303. ERROR_INFER_STATUS(status, FAILED, "model shape invalid");
  304. return status;
  305. }
  306. tensor.resize_data(element_num * data_type_size);
  307. }
  308. MSI_TIME_STAMP_START(ParseJson)
  309. json message_info;
  310. try {
  311. message_info = nlohmann::json::parse(post_message);
  312. } catch (nlohmann::json::exception &e) {
  313. std::string json_exception = e.what();
  314. std::string error_message = "Illegal JSON format." + json_exception;
  315. ERROR_INFER_STATUS(status, INVALID_INPUTS, error_message);
  316. return status;
  317. }
  318. MSI_TIME_STAMP_END(ParseJson)
  319. status = CheckMessageValid(message_info, type);
  320. if (status != SUCCESS) {
  321. return status;
  322. }
  323. switch (*type) {
  324. case TYPE_DATA:
  325. status = TransDataToPredictRequest(message_info, request);
  326. break;
  327. case TYPE_TENSOR:
  328. status = TransTensorToPredictRequest(message_info, request);
  329. break;
  330. default:
  331. ERROR_INFER_STATUS(status, INVALID_INPUTS, "http message must have only one type of (data, tensor)");
  332. return status;
  333. }
  334. return status;
  335. }
  336. Status GetJsonFromTensor(const ms_serving::Tensor &tensor, int len, int *pos, json *out_json) {
  337. Status status(SUCCESS);
  338. switch (tensor.tensor_type()) {
  339. case ms_serving::MS_INT32: {
  340. auto data = reinterpret_cast<const int *>(tensor.data().data()) + *pos;
  341. std::vector<int32_t> result_tensor(len);
  342. memcpy_s(result_tensor.data(), result_tensor.size() * sizeof(int32_t), data, len * sizeof(int32_t));
  343. *out_json = std::move(result_tensor);
  344. *pos += len;
  345. break;
  346. }
  347. case ms_serving::MS_FLOAT32: {
  348. auto data = reinterpret_cast<const float *>(tensor.data().data()) + *pos;
  349. std::vector<float> result_tensor(len);
  350. memcpy_s(result_tensor.data(), result_tensor.size() * sizeof(float), data, len * sizeof(float));
  351. *out_json = std::move(result_tensor);
  352. *pos += len;
  353. break;
  354. }
  355. default:
  356. MSI_LOG(ERROR) << "the result type is not supported in restful api, type is " << tensor.tensor_type();
  357. ERROR_INFER_STATUS(status, FAILED, "reply have unsupported type");
  358. }
  359. return status;
  360. }
  361. Status TransPredictReplyToData(const PredictReply &reply, json *out_json) {
  362. Status status(SUCCESS);
  363. for (int i = 0; i < reply.result_size(); i++) {
  364. (*out_json)["data"].push_back(json());
  365. json &tensor_json = (*out_json)["data"].back();
  366. int num = 1;
  367. for (auto j = 0; j < reply.result(i).tensor_shape().dims_size(); j++) {
  368. num *= reply.result(i).tensor_shape().dims(j);
  369. }
  370. int pos = 0;
  371. status = GetJsonFromTensor(reply.result(i), num, &pos, &tensor_json);
  372. if (status != SUCCESS) {
  373. return status;
  374. }
  375. }
  376. return status;
  377. }
  378. Status RecusiveGetJson(const ms_serving::Tensor &tensor, int depth, int *pos, json *out_json) {
  379. Status status(SUCCESS);
  380. if (depth >= 10) {
  381. ERROR_INFER_STATUS(status, FAILED, "result tensor shape dims is larger than 10");
  382. return status;
  383. }
  384. if (depth == tensor.tensor_shape().dims_size() - 1) {
  385. status = GetJsonFromTensor(tensor, tensor.tensor_shape().dims(depth), pos, out_json);
  386. if (status != SUCCESS) {
  387. return status;
  388. }
  389. } else {
  390. for (int i = 0; i < tensor.tensor_shape().dims(depth); i++) {
  391. out_json->push_back(json());
  392. json &tensor_json = out_json->back();
  393. status = RecusiveGetJson(tensor, depth + 1, pos, &tensor_json);
  394. if (status != SUCCESS) {
  395. return status;
  396. }
  397. }
  398. }
  399. return status;
  400. }
  401. Status TransPredictReplyToTensor(const PredictReply &reply, json *out_json) {
  402. Status status(SUCCESS);
  403. for (int i = 0; i < reply.result_size(); i++) {
  404. (*out_json)["tensor"].push_back(json());
  405. json &tensor_json = (*out_json)["tensor"].back();
  406. int pos = 0;
  407. status = RecusiveGetJson(reply.result(i), 0, &pos, &tensor_json);
  408. if (status != SUCCESS) {
  409. return status;
  410. }
  411. }
  412. return status;
  413. }
  414. Status TransPredictReplyToHTTPMsg(const PredictReply &reply, const HTTP_TYPE &type, struct evbuffer *buf) {
  415. Status status(SUCCESS);
  416. json out_json;
  417. switch (type) {
  418. case TYPE_DATA:
  419. status = TransPredictReplyToData(reply, &out_json);
  420. break;
  421. case TYPE_TENSOR:
  422. status = TransPredictReplyToTensor(reply, &out_json);
  423. break;
  424. default:
  425. ERROR_INFER_STATUS(status, FAILED, "http message must have only one type of (data, tensor)");
  426. return status;
  427. }
  428. const std::string &out_str = out_json.dump();
  429. evbuffer_add(buf, out_str.data(), out_str.size());
  430. return status;
  431. }
  432. Status HttpHandleMsgDetail(struct evhttp_request *req, void *arg, struct evbuffer *retbuff) {
  433. PredictRequest request;
  434. PredictReply reply;
  435. HTTP_TYPE type;
  436. MSI_TIME_STAMP_START(ParseRequest)
  437. auto status = TransHTTPMsgToPredictRequest(req, &request, &type);
  438. MSI_TIME_STAMP_END(ParseRequest)
  439. if (status != SUCCESS) {
  440. MSI_LOG(ERROR) << "restful trans to request failed";
  441. return status;
  442. }
  443. MSI_TIME_STAMP_START(Predict)
  444. status = Session::Instance().Predict(request, reply);
  445. MSI_TIME_STAMP_END(Predict)
  446. if (status != SUCCESS) {
  447. MSI_LOG(ERROR) << "restful predict failed";
  448. return status;
  449. }
  450. MSI_TIME_STAMP_START(CreateReplyJson)
  451. status = TransPredictReplyToHTTPMsg(reply, type, retbuff);
  452. MSI_TIME_STAMP_END(CreateReplyJson)
  453. if (status != SUCCESS) {
  454. MSI_LOG(ERROR) << "restful trans to reply failed";
  455. return status;
  456. }
  457. return SUCCESS;
  458. }
  459. void http_handler_msg(struct evhttp_request *req, void *arg) {
  460. MSI_TIME_STAMP_START(TotalRestfulPredict)
  461. struct evbuffer *retbuff = evbuffer_new();
  462. if (retbuff == nullptr) {
  463. MSI_LOG_ERROR << "Create event buffer failed";
  464. return;
  465. }
  466. auto status = HttpHandleMsgDetail(req, arg, retbuff);
  467. if (status != SUCCESS) {
  468. ErrorMessage(req, status);
  469. evbuffer_free(retbuff);
  470. return;
  471. }
  472. MSI_TIME_STAMP_START(ReplyJson)
  473. evhttp_send_reply(req, HTTP_OK, "Client", retbuff);
  474. MSI_TIME_STAMP_END(ReplyJson)
  475. evbuffer_free(retbuff);
  476. MSI_TIME_STAMP_END(TotalRestfulPredict)
  477. }
  478. } // namespace serving
  479. } // namespace mindspore