|
- #!/usr/bin/env python3
- # -*- coding: utf-8; mode: python; tab-width: 4; indent-tabs-mode: nil -*-
- # vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4 fileencoding=utf-8
-
- import asyncio
- import time
- import threading
- import concurrent
- from typing import List
- import signal
- import functools
-
-
# Public names exported by a star-import of this module.
__all__ = ["TimerTaskManager", "TimerTask"]
# Author contact entries, kept as a list of strings.
__author__ = ["\"donkey\" <anjingyu_ws@foxmail.com>"]
-
-
"""The TimerTask class is an abstract helper used to create tasks that are executed periodically (like a timer) in an asynchronous way.
Although this usually performs better than multithreading, performance degrades severely if too many timers run in one thread.

Typical usage example:

    class MyTask(TimerTask):
        def __init__(self, interval):
            # Also, you can provide your own loop
            super().__init__(interval=interval)

        # Maybe in non-main thread, according to the looper
        async def callback(self):
            print("Do my working")
            # may sleep for a while
            await asyncio.sleep(0.1)
"""
-
- import time
- import asyncio
- import uuid
- from threading import Thread, current_thread
-
-
class TimerTaskLoop:
    """Self-scheduling periodic task bound to one asyncio event loop.

    Subclasses override :meth:`callback`. Constructing an instance
    immediately schedules the periodic job on ``loop`` through
    ``asyncio.run_coroutine_threadsafe``, so the loop may be running (or be
    started later) in another thread.

    Args:
        interval: Target period, in seconds, between the starts of two
            consecutive ``callback`` invocations.
        loop: Event loop that runs the job. When omitted, the loop returned
            by ``asyncio.get_event_loop()`` at construction time is used.
    """

    def __init__(self, interval, loop=None):
        # Resolve the default lazily: a ``loop=asyncio.get_event_loop()``
        # default is evaluated only once, when the class body is executed,
        # which pins every instance to whatever loop existed at import time.
        if loop is None:
            loop = asyncio.get_event_loop()
        self.__loop = loop
        self.__interval = interval
        self.__name = "{}+{}".format(self.__class__.__name__, str(uuid.uuid4()))
        self.__stop = False
        # A concurrent.futures.Future mirroring the job running on the loop.
        self.__task = asyncio.run_coroutine_threadsafe(self.__job(), loop=self.__loop)

    async def callback(self):
        """Work performed on every tick; the default does nothing."""
        pass

    async def __job(self):
        """Call ``callback`` repeatedly until ``cancel`` is requested."""
        try:
            while not self.__stop:
                started = time.monotonic()
                await self.callback()
                elapsed = time.monotonic() - started
                # Sleep only for what is left of the period. The ``loop``
                # argument of asyncio.sleep() was removed in Python 3.10;
                # the running loop is used implicitly.
                await asyncio.sleep(max(0.0, self.__interval - elapsed))
        except Exception as ex:
            print(ex)

    @property
    def name(self) -> str:
        """Identifier of this timer; defaults to ``<ClassName>+<uuid4>``."""
        return self.__name

    @name.setter
    def name(self, value):
        self.__name = value

    def cancel(self):
        """Request the job to stop and cancel it.

        Returns:
            The ``concurrent.futures.Future`` of the job, so the caller can
            wait for it to finish.
        """
        self.__stop = True
        self.__task.cancel()
        return self.__task
-
-
class TimerTask:
    """Base description of a periodic job: an interval, a name and hooks.

    The object does not schedule itself. ``TimerTaskManager`` reads
    ``interval`` and ``name`` and invokes ``before_start``, ``callback`` and
    ``after_end``; subclasses override the hooks they need.
    """

    def __init__(self, interval):
        self.__interval = interval
        # Default name: the concrete class name plus a random UUID.
        self.__name = f"{type(self).__name__}+{uuid.uuid4()}"

    async def callback(self):
        """Work performed on every tick; the default does nothing."""
        return None

    def before_start(self):
        """Hook called once before the timer starts ticking."""
        return None

    def after_end(self):
        """Hook called once after the timer has been destroyed."""
        return None

    @property
    def interval(self) -> float:
        """Period between ticks, exactly as passed to the constructor."""
        return self.__interval

    @property
    def name(self) -> str:
        """Identifier of this task; may be reassigned by the caller."""
        return self.__name

    @name.setter
    def name(self, value):
        self.__name = value
-
-
class TimerTaskManager:
    """Run several ``TimerTask`` objects periodically on one asyncio loop.

    With ``in_thread=True`` the manager creates its own event loop and runs
    it in a worker thread once :meth:`start` is called. Otherwise the timers
    are scheduled on the loop returned by ``asyncio.get_event_loop()`` and
    the caller is responsible for running that loop.

    Args:
        in_thread: Run the timers on a private loop in a worker thread.

    Raises:
        RuntimeError: If the manager is created outside the main thread.
    """

    def __init__(self, in_thread=False):
        if threading.current_thread() is not threading.main_thread():
            raise RuntimeError("TimerTaskManager can only be created in Main Thread!")

        if in_thread:
            self.__loop = asyncio.new_event_loop()
            self.__thread = threading.Thread(target=self.__run, args=(self.__loop, ))
        else:
            self.__loop = asyncio.get_event_loop()
            self.__thread = None

        # Maps task name -> concurrent.futures.Future of the running job.
        self.__timers = {}
        self.__stop = False

    def start(self):
        """Start the worker thread (thread mode); a repeated call is a no-op."""
        self.__stop = False
        # ``ident`` stays None until the thread has been started, and a
        # thread can only be started once.
        if self.__thread is not None and self.__thread.ident is None:
            self.__thread.start()

    def stop(self):
        """Cancel every timer and, in thread mode, shut the worker down.

        In thread mode this blocks until the cancelled jobs have finished
        and the worker thread has exited. Calling it again, before
        :meth:`start`, or after the loop was already stopped by
        :meth:`remove_timer` is safe.
        """
        self.__stop = True
        futures = list(self.__timers.values())
        self.__timers = {}
        for future in futures:
            future.cancel()

        worker, self.__thread = self.__thread, None
        if worker is None:
            return
        if worker.ident is None:
            # Never started: nothing will ever run (or close) the private
            # loop, so release it here instead of waiting/joining.
            self.__loop.close()
            return
        if worker.is_alive() and futures:
            # Avoid WARNING: "Task was destroyed but it is pending!"
            concurrent.futures.wait(futures)
        self.__request_loop_stop()
        worker.join()

    def __request_loop_stop(self):
        """Ask the loop to stop; tolerate a loop that is already closed."""
        if self.__loop.is_closed():
            return
        try:
            self.__loop.call_soon_threadsafe(self.__loop.stop)
        except RuntimeError:
            # The worker closed the loop between the check and the call.
            pass

    @staticmethod
    def __drain(loop):
        """Cancel the tasks still pending on ``loop`` and run them to the end."""
        while True:
            pending = [task for task in asyncio.all_tasks(loop) if not task.done()]
            if not pending:
                return
            for task in pending:
                task.cancel()
            try:
                loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))
            except RuntimeError:
                # A late ``loop.stop()`` request (e.g. stop() issued right
                # after remove_timer()) interrupted the drain; go round
                # again for whatever is left.
                continue

    def __run(self, loop):
        """Worker-thread entry point: run ``loop`` until stopped, then close it."""
        print("[DEBUG] Run in thread:", threading.current_thread())
        try:
            loop.run_forever()
        except KeyboardInterrupt:
            print("CTRL+C")
        finally:
            # Let cancelled jobs unwind (so ``after_end`` runs) before the
            # loop disappears.
            try:
                self.__drain(loop)
            finally:
                loop.close()

    async def __job(self, task):
        """Drive one task: ``before_start``, periodic ``callback``, ``after_end``."""
        try:
            task.before_start()
            while not self.__stop:
                started = time.monotonic()
                await task.callback()
                remaining = task.interval - (time.monotonic() - started)
                # The ``loop`` argument of asyncio.sleep() was removed in
                # Python 3.10; the running loop is used implicitly.
                await asyncio.sleep(max(0.0, remaining))
        except Exception as ex:
            print(ex)
        finally:
            task.after_end()

    def add_timer(self, *tasks: "TimerTask") -> List[str]:
        """Schedule ``tasks`` on the manager's loop.

        Args:
            *tasks: Objects exposing ``name``, ``interval``, ``callback``,
                ``before_start`` and ``after_end``.

        Returns:
            The names of the scheduled tasks, in argument order; pass them
            to :meth:`remove_timer`.
        """
        ret = []
        for task in tasks:
            assert task.name not in self.__timers
            self.__timers[task.name] = asyncio.run_coroutine_threadsafe(self.__job(task), loop=self.__loop)
            ret.append(task.name)

        return ret

    def remove_timer(self, *timer_ids: str):
        """Cancel and forget the timers named in ``timer_ids``.

        Unknown names are ignored. When no timer is left afterwards, the
        event loop is asked to stop.
        """
        for timer_id in timer_ids:
            future = self.__timers.pop(timer_id, None)
            if future is not None:
                future.cancel()

        if not self.__timers:
            self.__request_loop_stop()
-
-
def test_pure_task():
    """Demo: drive three ``TimerTaskLoop`` timers on two event loops.

    One timer runs on a loop executed by the calling thread, two on a
    second loop executed by a worker thread. After about three seconds all
    timers are cancelled, both loops are stopped and closed, the worker is
    joined and the tick counters are printed.
    """

    class MyTimer(TimerTaskLoop):
        def __init__(self, interval, loop: asyncio.AbstractEventLoop):
            # Set the counter before calling the base constructor: the base
            # constructor schedules the job right away, and when ``loop`` is
            # already running in another thread ``callback`` may fire before
            # this constructor returns.
            self.__counter = 0
            super().__init__(interval=interval, loop=loop)

        # Maybe in non-main thread, according to the looper
        async def callback(self):
            print("Do my working: " + self.name + f"( + {time.time()} + ), count: {self.__counter}")
            # may sleep for a while
            await asyncio.sleep(0.1)
            self.__counter += 1

        @property
        def counter(self):
            return self.__counter

    def run_service(loop, my_timer):
        """Worker-thread body: tag the timer with the thread id, run the loop."""
        try:
            my_timer.name = "{}[{}]".format(my_timer.name, threading.current_thread().ident)
            loop.run_forever()
        except KeyboardInterrupt:
            print("CTRL+C")

    # Dedicated loops: one for the calling thread, one for the worker.
    main_loop = asyncio.new_event_loop()
    loop = asyncio.new_event_loop()

    # NOTE: on platforms other than Windows, SIGINT/SIGTERM handlers could
    # be installed on both loops with ``loop.add_signal_handler``.

    my_timer1 = MyTimer(interval=0.22, loop=main_loop)
    my_timer2 = MyTimer(interval=0.22, loop=loop)

    my_timer1.name = "{}[{}]".format(my_timer1.name, threading.current_thread().ident)

    t = threading.Thread(target=run_service, args=(loop, my_timer2, ))
    t.start()

    time.sleep(2.0)

    # Created while the worker loop is already running.
    my_timer3 = MyTimer(interval=0.3, loop=loop)

    async def stop_tm():
        await asyncio.sleep(3.0)
        my_timer1.cancel()
        t2 = my_timer2.cancel()
        t3 = my_timer3.cancel()
        # Wait for the worker-loop jobs to finish before stopping that loop.
        concurrent.futures.wait([t2, t3])
        loop.call_soon_threadsafe(loop.stop)
        # Give the cancelled job on this loop a chance to unwind.
        await asyncio.sleep(0.1)
        main_loop.stop()

    main_loop.create_task(stop_tm())

    try:
        main_loop.run_forever()
    except KeyboardInterrupt:
        print("CTRL+C")
    finally:
        # Always stop the worker loop (harmless if already stopped), wait
        # for the worker thread, then release both loops.
        loop.call_soon_threadsafe(loop.stop)
        t.join()
        loop.close()
        main_loop.close()

    print("counter:", my_timer1.counter)
    print("counter:", my_timer2.counter)
    print("counter:", my_timer3.counter)
-
def test_mgr():
    """Demo: run two ``TimerTask`` subclasses through a ``TimerTaskManager``.

    Two timers are registered, the manager is started, and after five
    seconds a coroutine on the current event loop removes the timers, stops
    the manager and stops the loop.
    """

    class MyNewTimer(TimerTask):
        def __init__(self, interval):
            # Also, you can provide your own loop
            super().__init__(interval=interval)
            self.__counter = 0

        # Maybe in non-main thread, according to the looper
        async def callback(self):
            prefix = "Do my working: " + self.name
            suffix = f"( + {time.time()} + ), count: {self.__counter}"
            print(prefix + suffix)
            # may sleep for a while
            await asyncio.sleep(0.1)
            self.__counter += 1

        @property
        def counter(self):
            return self.__counter

    # Run the manager's loop in a worker thread.
    run_in_thread = True

    manager = TimerTaskManager(run_in_thread)
    first = MyNewTimer(interval=0.22)
    second = MyNewTimer(interval=0.3)
    first_id, second_id = manager.add_timer(first, second)
    manager.start()

    async def stop_tm():
        # Let the timers tick for a while, then tear everything down.
        await asyncio.sleep(5.0)
        manager.remove_timer(first_id, second_id)
        manager.stop()
        asyncio.get_event_loop().stop()

    asyncio.ensure_future(stop_tm())

    try:
        asyncio.get_event_loop().run_forever()
    except KeyboardInterrupt:
        print("CTRL+C")
        manager.stop()
-
if __name__ == "__main__":
    # Script entry point: run the TimerTaskLoop demo. Swap the comment
    # markers below to run the TimerTaskManager demo instead.
    test_pure_task()
    # test_mgr()
|