You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

task.py 11 kB

5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276
  1. # Copyright 2020 Huawei Technologies Co., Ltd
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. # ============================================================================
  15. """Python run preprocess and postprocess in python"""
  16. import threading
  17. import time
  18. import logging
  19. from mindspore_serving._mindspore_serving import Worker_
  20. from mindspore_serving.worker.register.preprocess import preprocess_storage
  21. from mindspore_serving.worker.register.postprocess import postprocess_storage
  22. from mindspore_serving import log as logger
  class ServingSystemException(Exception):
      """Serving-system level error that must not be swallowed as a per-instance failure.

      Args:
          msg: description of the error; ``str()`` renders it as "Serving system error: <msg>".
      """

      def __init__(self, msg):
          # Forward msg to Exception so ``args`` is populated: without it, repr() loses the message and
          # unpickling fails because it re-calls the class with no arguments.
          super(ServingSystemException, self).__init__(msg)
          self.msg = msg

      def __str__(self):
          return f"Serving system error: {self.msg}"
  # Values compared against ``task.task_type`` of tasks fetched from Worker_:
  # "stop" ends the task thread, "empty" means a try_get_* poll returned no task,
  # "preprocess"/"postprocess" select which PyTask handles the task.
  (task_type_stop, task_type_empty, task_type_preprocess, task_type_postprocess) = (
      "stop",
      "empty",
      "preprocess",
      "postprocess",
  )
  class PyTask:
      """Base class for preprocess and postprocess.

      Each ``run`` call handles at most ``switch_batch`` instances of the current task, so the caller can
      interleave preprocess and postprocess work. Subclasses decide where task info comes from
      (``get_task_info``) and where success/failure results are pushed (``push_*_impl``).
      """

      def __init__(self, switch_batch, task_name):
          super(PyTask, self).__init__()
          self.task_name = task_name
          self.switch_batch = switch_batch
          self.temp_result = None  # iterator of per-instance outputs of the current task, or None
          self.task = None
          self.index = 0  # count of instances of the current task already reported (success or failure)
          self.instances_size = 0
          self.stop_flag = False
          self.result_batch = []  # successful outputs collected but not yet pushed
          self.task_info = None  # registration info from get_task_info, set in _handle_task
          self.context_list = None  # context_list of the current task, set in _handle_task

      def push_failed_impl(self, count):
          """Base method to push failed result"""
          raise NotImplementedError

      def push_result_batch_impl(self, result_batch):
          """Base method to push success result"""
          raise NotImplementedError

      def get_task_info(self, task_name):
          """Base method to get task info"""
          raise NotImplementedError

      def push_failed(self, count):
          """Push the pending successes, then report ``count`` failed instances."""
          self.push_result_batch()  # push success first so results stay in instance order
          self.push_failed_impl(count)
          self.index += count

      def push_result_batch(self):
          """Push the pending successful results, if any."""
          if not self.result_batch:
              return
          self.index += len(self.result_batch)
          self.push_result_batch_impl(tuple(self.result_batch))
          self.result_batch = []

      def in_processing(self):
          """Return True if the last task was not fully handled in the previous time gap; each time gap
          handles only some instances of preprocess or postprocess."""
          return self.temp_result is not None

      def run(self, task=None):
          """Handle the next slice (at most ``switch_batch`` instances) of the current task.

          If no task is in progress, ``task`` starts a new one; otherwise ``task`` is ignored and the
          in-progress task continues.

          Raises:
              RuntimeError: the user function yielded fewer outputs than instances, or the inputs of a new
                  task do not match the registered input count.
              ServingSystemException: an output does not match the registered output count.
          """
          if not self.temp_result:
              assert task is not None
              self.instances_size = len(task.instance_list)
              self.index = 0
              self.task = task
              self.temp_result = self._handle_task()
              if not self.temp_result:
                  return
          while self.index < self.instances_size:
              try:
                  start_time = time.time()
                  first_index = self.index
                  slice_count = min(self.switch_batch, self.instances_size - self.index)
                  for _ in range(slice_count):
                      output = next(self.temp_result)
                      self.result_batch.append(self._handle_result(output))
                  cost_ms = (time.time() - start_time) * 1000
                  logger.info(f"{self.task_name} get result "
                              f"{first_index} ~ {first_index + len(self.result_batch) - 1} cost time "
                              f"{cost_ms} ms")
                  self.push_result_batch()
                  break
              except StopIteration:
                  # the user function produced fewer outputs than instances: fail the rest and abort
                  self.push_result_batch()
                  self.push_failed(self.instances_size - self.index)
                  raise RuntimeError(
                      f"expecting '{self.task_name}' yield count equal to instance size {self.instances_size}")
              except ServingSystemException:
                  raise
              except Exception as e:  # catch exception and try next
                  logger.warning(f"{self.task_name} get result catch exception: {e}")
                  logging.exception(e)
                  # the failing instance is the one right after the successes collected so far
                  self.push_failed(1)  # push success results and a failed result
                  # restart the user function from the first instance not yet reported
                  self.temp_result = self._handle_task_continue()
          if self.index >= self.instances_size:
              self.temp_result = None

      def _handle_task(self):
          """Look up the task's registration, validate its inputs and start the user function.

          Returns:
              The result of ``_handle_task_continue``.

          Raises:
              RuntimeError: an instance is not a tuple, or its length differs from the registered
                  ``inputs_count``.
          """
          self.task_info = self.get_task_info(self.task.name)
          instance_list = self.task.instance_list
          self.context_list = self.task.context_list
          inputs_count = self.task_info["inputs_count"]
          for item in instance_list:
              # check the type first: len() on a non-tuple may itself raise while building the message
              if not isinstance(item, tuple):
                  raise RuntimeError(f"given inputs of {self.task_name} should be a tuple, "
                                     f"but got {type(item).__name__}")
              if len(item) != inputs_count:
                  raise RuntimeError(f"length of given inputs {len(item)}"
                                     f" not match {self.task_name} required " + str(inputs_count))
          return self._handle_task_continue()

      def _handle_task_continue(self):
          """(Re)invoke the user function on the instances not yet reported.

          Returns:
              The user function's result, or None when every instance is already reported or the call
              raised; in the latter case all remaining instances are pushed as failed.
          """
          if self.index >= self.instances_size:
              return None
          instance_list = self.task.instance_list
          try:
              return self.task_info["fun"](instance_list[self.index:])
          except Exception as e:
              logger.warning(f"{self.task_name} invoke catch exception: {e}")
              logging.exception(e)
              self.push_failed(len(instance_list) - self.index)
              return None

      def _handle_result(self, output):
          """Normalize one instance's output to a tuple, converting items that provide ``asnumpy()``.

          Raises:
              ServingSystemException: the number of outputs differs from the registered ``outputs_count``.
          """
          if not isinstance(output, (tuple, list)):
              output = (output,)
          if len(output) != self.task_info["outputs_count"]:
              raise ServingSystemException(f"length of return output {len(output)} "
                                           f"not match {self.task_name} signatures " +
                                           str(self.task_info["outputs_count"]))
          # Build the tuple eagerly: a lazy generator would defer asnumpy() (and any error it raises) until
          # the pushed batch is consumed, outside the per-instance exception handling in run().
          return tuple(item.asnumpy() if callable(getattr(item, "asnumpy", None)) else item
                       for item in output)
  class PyPreprocess(PyTask):
      """PyTask for preprocess: reads registrations from ``preprocess_storage`` and reports to the c++ env."""

      def __init__(self, switch_batch):
          super().__init__(switch_batch, "preprocess")

      def get_task_info(self, task_name):
          """Return the registered preprocess info (inputs/outputs count and the function) for ``task_name``."""
          return preprocess_storage.get(task_name)

      def push_result_batch_impl(self, result_batch):
          """Send a batch of successful preprocess outputs to the c++ env."""
          Worker_.push_preprocess_result(result_batch)

      def push_failed_impl(self, count):
          """Report ``count`` failed preprocess instances to the c++ env."""
          Worker_.push_preprocess_failed(count)
  class PyPostprocess(PyTask):
      """PyTask for postprocess: reads registrations from ``postprocess_storage`` and reports to the c++ env."""

      def __init__(self, switch_batch):
          super().__init__(switch_batch, "postprocess")

      def get_task_info(self, task_name):
          """Return the registered postprocess info (inputs/outputs count and the function) for ``task_name``."""
          return postprocess_storage.get(task_name)

      def push_result_batch_impl(self, result_batch):
          """Send a batch of successful postprocess outputs to the c++ env."""
          Worker_.push_postprocess_result(result_batch)

      def push_failed_impl(self, count):
          """Report ``count`` failed postprocess instances to the c++ env."""
          Worker_.push_postprocess_failed(count)
  class PyTaskThread(threading.Thread):
      """Thread that runs preprocess and postprocess tasks fetched from ``Worker_``.

      Work of the two kinds is interleaved: a turn handles at most ``switch_batch`` instances of one kind,
      then the turn passes to the other kind, so a long task of one kind cannot starve the other.
      """

      def __init__(self, switch_batch):
          super(PyTaskThread, self).__init__()
          self.switch_batch = switch_batch
          # a non-positive slice size would handle zero instances per run() and never make progress
          if self.switch_batch <= 0:
              self.switch_batch = 1
          self.preprocess = PyPreprocess(self.switch_batch)
          self.postprocess = PyPostprocess(self.switch_batch)

      def run(self):
          """Run preprocess and postprocess tasks, switching to the other kind after each slice of instances.

          Exits on a stop task or on any exception, then calls ``Worker_.stop()``.
          """
          logger.info(f"start py task for preprocess and postprocess, switch_batch {self.switch_batch}")
          preprocess_turn = True
          while True:
              try:
                  if not self.preprocess.in_processing() and not self.postprocess.in_processing():
                      # nothing in progress: wait for a task of either kind
                      task = Worker_.get_py_task()
                      if task.task_type == task_type_stop:
                          break
                      if task.task_type == task_type_preprocess:
                          self.preprocess.run(task)
                          preprocess_turn = False
                      elif task.task_type == task_type_postprocess:
                          self.postprocess.run(task)
                          preprocess_turn = True
                  if preprocess_turn:
                      keep_running = self._run_turn(self.preprocess, self.postprocess,
                                                    Worker_.try_get_preprocess_py_task)
                  else:
                      keep_running = self._run_turn(self.postprocess, self.preprocess,
                                                    Worker_.try_get_postprocess_py_task)
                  if not keep_running:
                      break
                  # Always hand the turn over, including when the poll returned an empty task; keeping the
                  # turn would spin on the empty poll and never resume the other kind's in-progress task.
                  preprocess_turn = not preprocess_turn
              except Exception as e:
                  logger.error(f"py task catch exception and exit: {e}")
                  logging.exception(e)
                  break
          logger.info("end py task for preprocess and postprocess")
          Worker_.stop()

      @staticmethod
      def _run_turn(current, other, try_get_task):
          """Give ``current`` one turn.

          If ``current`` has a task in progress, run its next slice. Otherwise, if ``other`` is still busy,
          poll ``try_get_task`` (which may return an empty task) and start any task it returns.

          Returns:
              False when a stop task was received, True otherwise.
          """
          if current.in_processing():
              current.run()
          elif other.in_processing():
              task = try_get_task()
              if task.task_type == task_type_stop:
                  return False
              if task.task_type != task_type_empty:
                  current.run(task)
          return True
  py_task_thread = None  # module-wide PyTaskThread, created by the first _start_py_task call


  def _start_py_task(switch_batch):
      """Create and start the preprocessing/postprocessing thread once.

      The thread is stored in the module-level ``py_task_thread``; once that is set (whether or not the
      thread is still alive), further calls do nothing.

      Args:
          switch_batch: slice size forwarded to ``PyTaskThread``.
      """
      global py_task_thread
      if py_task_thread is not None:
          return
      py_task_thread = PyTaskThread(switch_batch)
      py_task_thread.start()

A lightweight, high-performance serving module that helps MindSpore developers efficiently deploy online inference services in production environments.