You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

timer_task.py 8.8 kB

2 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8; mode: python; tab-width: 4; indent-tabs-mode: nil -*-
  3. # vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4 fileencoding=utf-8
  4. import asyncio
  5. import time
  6. import threading
  7. import concurrent
  8. from typing import List
  9. import signal
  10. import functools
  11. __all__ = ['TimerTaskManager', 'TimerTask']
  12. __author__ = ['"donkey" <anjingyu_ws@foxmail.com>']
  13. """The TimerTask class is an abstract helper tool used to create tasks that are executed periodically(like a timer) in the asynchronous way.
  14. Although the performance is higher than multithreading in most cases, but if there are too many timers in one thread, the performance will be severely degraded.
  15. Typical usage example:
  16. class MyTask(TimerTask):
  17. def __init__(self, interval):
  18. # Also, you can provide your own loop
  19. super().__init__(interval=interval)
  20. # Maybe in non-main thread, according to the looper
  21. async def callback(self):
  22. print("Do my working")
  23. # may sleep for a while
  24. await asyncio.sleep(0.1)
  25. """
  26. import time
  27. import asyncio
  28. import uuid
  29. from threading import Thread, current_thread
  30. class TimerTaskLoop:
  31. def __init__(self, interval, loop=asyncio.get_event_loop()):
  32. self.__loop = loop
  33. self.__interval = interval
  34. self.__name = "{}+{}".format(self.__class__.__name__, str(uuid.uuid4()))
  35. self.__stop = False
  36. self.__task = asyncio.run_coroutine_threadsafe(self.__job(), loop=self.__loop)
  37. async def callback(self):
  38. pass
  39. async def __job(self):
  40. delay_time = self.__interval
  41. try:
  42. while not self.__stop:
  43. ts = time.time()
  44. await self.callback()
  45. delay_time = self.__interval - (time.time() - ts)
  46. await asyncio.sleep(delay_time, loop=self.__loop)
  47. except Exception as ex:
  48. print(ex)
  49. @property
  50. def name(self) -> str:
  51. return self.__name
  52. @name.setter
  53. def name(self, value):
  54. self.__name = value
  55. def cancel(self):
  56. self.__stop = True
  57. self.__task.cancel()
  58. return self.__task
  59. class TimerTask:
  60. def __init__(self, interval):
  61. self.__interval = interval
  62. self.__name = "{}+{}".format(self.__class__.__name__, str(uuid.uuid4()))
  63. async def callback(self):
  64. pass
  65. def before_start(self):
  66. """Callback before the timer start."""
  67. pass
  68. def after_end(self):
  69. """After the timer destory"""
  70. pass
  71. @property
  72. def interval(self) -> float:
  73. return self.__interval
  74. @property
  75. def name(self) -> str:
  76. return self.__name
  77. @name.setter
  78. def name(self, value):
  79. self.__name = value
  80. class TimerTaskManager:
  81. def __init__(self, in_thread=False):
  82. if threading.current_thread() is not threading.main_thread():
  83. raise RuntimeError("TimerTaskManager can only be created in Main Thread!")
  84. if in_thread:
  85. self.__loop = asyncio.new_event_loop()
  86. self.__thread = threading.Thread(target=self.__run, args=(self.__loop, ))
  87. else:
  88. self.__loop = asyncio.get_event_loop()
  89. self.__thread = None
  90. self.__timers = {}
  91. self.__stop = False
  92. def start(self):
  93. self.__stop = False
  94. if self.__thread:
  95. self.__thread.start()
  96. def stop(self):
  97. self.__stop = True
  98. if self.__thread:
  99. for timer in self.__timers.values():
  100. timer.cancel()
  101. if len(self.__timers):
  102. # Avoid WARNING: "Task was destroyed but it is pending!"
  103. concurrent.futures.wait(self.__timers.values())
  104. self.__timers = {}
  105. self.__loop.call_soon_threadsafe(self.__loop.stop)
  106. self.__thread.join()
  107. self.__thread = None
  108. def __run(self, loop):
  109. print("[DEBUG] Run in thread:", threading.current_thread())
  110. try:
  111. loop.run_forever()
  112. except KeyboardInterrupt:
  113. print("CTRL+C")
  114. loop.close()
  115. async def __job(self, task):
  116. delay_time = task.interval
  117. try:
  118. task.before_start()
  119. while not self.__stop:
  120. ts = time.time()
  121. await task.callback()
  122. delay_time = task.interval - (time.time() - ts)
  123. await asyncio.sleep(delay_time, loop=self.__loop)
  124. except Exception as ex:
  125. print(ex)
  126. finally:
  127. task.after_end()
  128. def add_timer(self, *tasks: List[TimerTask]) -> List[str]:
  129. ret = []
  130. for task in tasks:
  131. assert task.name not in self.__timers
  132. self.__timers[task.name] = asyncio.run_coroutine_threadsafe(self.__job(task), loop=self.__loop)
  133. ret.append(task.name)
  134. return ret
  135. def remove_timer(self, *timer_ids: List[str]):
  136. for timer_id in timer_ids:
  137. if timer_id in self.__timers:
  138. self.__timers[timer_id].cancel()
  139. del self.__timers[timer_id]
  140. if len(self.__timers) == 0:
  141. self.__loop.call_soon_threadsafe(self.__loop.stop)
  142. def test_pure_task():
  143. class MyTimer(TimerTaskLoop):
  144. def __init__(self, interval, loop: asyncio.AbstractEventLoop=asyncio.get_event_loop()):
  145. # Also, you can provide your own loop
  146. super().__init__(interval=interval, loop=loop)
  147. self.__counter = 0
  148. # Maybe in non-main thread, according to the looper
  149. async def callback(self):
  150. print("Do my working: " + self.name + f"( + {time.time()} + ), count: {self.__counter}")
  151. # may sleep for a while
  152. await asyncio.sleep(0.1)
  153. self.__counter += 1
  154. @property
  155. def counter(self):
  156. return self.__counter
  157. def run_service(loop, my_timer):
  158. try:
  159. my_timer.name = "{}[{}]".format(my_timer.name, threading.current_thread().ident)
  160. loop.run_forever()
  161. except KeyboardInterrupt:
  162. print("CTRL+C")
  163. main_loop = asyncio.get_event_loop()
  164. loop = asyncio.new_event_loop()
  165. def stopper(signame, loop):
  166. print("Got %s, stopping..." % signame)
  167. loop.stop()
  168. # Not available on Windows
  169. # for signame in ('SIGINT', 'SIGTERM'):
  170. # loop.add_signal_handler(getattr(signal, signame), functools.partial(stopper, signame, loop))
  171. # main_loop.add_signal_handler(getattr(signal, signame), functools.partial(stopper, signame, main_loop))
  172. my_timer1 = MyTimer(interval=0.22)
  173. my_timer2 = MyTimer(interval=0.22, loop=loop)
  174. my_timer1.name = "{}[{}]".format(my_timer1.name, threading.currentThread().ident)
  175. t = threading.Thread(target=run_service, args=(loop, my_timer2, ))
  176. t.start()
  177. time.sleep(2.0)
  178. my_timer3 = MyTimer(interval=0.3, loop=loop)
  179. async def stop_tm():
  180. await asyncio.sleep(3.0)
  181. t1 = my_timer1.cancel()
  182. t2 = my_timer2.cancel()
  183. t3 = my_timer3.cancel()
  184. concurrent.futures.wait([t2, t3])
  185. loop.call_soon_threadsafe(loop.stop)
  186. # We should join the thread
  187. await asyncio.sleep(0.1)
  188. asyncio.get_event_loop().stop()
  189. asyncio.ensure_future(stop_tm())
  190. try:
  191. main_loop.run_forever()
  192. except:
  193. loop.call_soon_threadsafe(loop.stop)
  194. finally:
  195. main_loop.close()
  196. print("counter:", my_timer1.counter)
  197. print("counter:", my_timer2.counter)
  198. print("counter:", my_timer3.counter)
  199. def test_mgr():
  200. class MyNewTimer(TimerTask):
  201. def __init__(self, interval):
  202. # Also, you can provide your own loop
  203. super().__init__(interval=interval)
  204. self.__counter = 0
  205. # Maybe in non-main thread, according to the looper
  206. async def callback(self):
  207. print("Do my working: " + self.name + f"( + {time.time()} + ), count: {self.__counter}")
  208. # may sleep for a while
  209. await asyncio.sleep(0.1)
  210. self.__counter += 1
  211. @property
  212. def counter(self):
  213. return self.__counter
  214. in_thread = True
  215. # in_thread = True
  216. ttm = TimerTaskManager(in_thread)
  217. t1 = MyNewTimer(interval=0.22)
  218. t2 = MyNewTimer(interval=0.3)
  219. tid1, tid2 = ttm.add_timer(t1, t2)
  220. ttm.start()
  221. async def stop_tm():
  222. await asyncio.sleep(5.0)
  223. ttm.remove_timer(tid1, tid2)
  224. ttm.stop()
  225. asyncio.get_event_loop().stop()
  226. asyncio.ensure_future(stop_tm())
  227. try:
  228. asyncio.get_event_loop().run_forever()
  229. except KeyboardInterrupt:
  230. print("CTRL+C")
  231. ttm.stop()
  232. if __name__ == '__main__':
  233. test_pure_task()
  234. # test_mgr()