#!/usr/bin/env python3
# -*- coding: utf-8; mode: python; tab-width: 4; indent-tabs-mode: nil -*-
# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4 fileencoding=utf-8
"""Asynchronous periodic timers built on top of asyncio.

``TimerTask`` is a small base class for work that must be executed
periodically (like a timer) in an asynchronous way; instances are scheduled
by a ``TimerTaskManager``.  ``TimerTaskLoop`` is a self-scheduling variant
that submits itself to an event loop as soon as it is constructed.

Although the performance is higher than multithreading in most cases, it
degrades severely when too many timers share one thread.

Typical usage example:

    class MyTask(TimerTask):
        def __init__(self, interval):
            super().__init__(interval=interval)

        # May run in a non-main thread, depending on the manager's loop.
        async def callback(self):
            print("Do my working")
            # may sleep for a while
            await asyncio.sleep(0.1)
"""

import asyncio
import concurrent.futures
import functools
import signal
import threading
import time
import uuid
from threading import Thread, current_thread
from typing import List

__all__ = ['TimerTaskManager', 'TimerTask']
__author__ = ['"donkey" ']


def _default_loop() -> asyncio.AbstractEventLoop:
    """Return the calling thread's event loop, creating and installing one if none exists.

    ``asyncio.get_event_loop()`` raises ``RuntimeError`` when the thread has no
    loop (always on recent Python versions, and in non-main threads on older
    ones); in that case a fresh loop is created and set as the current loop.
    """
    try:
        return asyncio.get_event_loop()
    except RuntimeError:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        return loop


class TimerTaskLoop:
    """Periodic task that schedules itself on an event loop at construction time.

    Subclasses override :meth:`callback`.  The job is submitted with
    ``asyncio.run_coroutine_threadsafe``, so the target loop may be running in
    another thread.
    """

    def __init__(self, interval, loop=None):
        """Create the timer and submit its periodic job.

        Args:
            interval: Target period, in seconds, between two callback starts.
            loop: Event loop that runs the job.  ``None`` (the default) means
                the calling thread's loop, resolved at call time rather than
                at import time.
        """
        self.__loop = _default_loop() if loop is None else loop
        self.__interval = interval
        self.__name = "{}+{}".format(self.__class__.__name__, str(uuid.uuid4()))
        self.__stop = False
        self.__task = asyncio.run_coroutine_threadsafe(self.__job(), loop=self.__loop)

    async def callback(self):
        """Work executed on every tick; the default implementation does nothing."""
        pass

    async def __job(self):
        """Call :meth:`callback` repeatedly until the timer is stopped or cancelled."""
        try:
            while not self.__stop:
                started = time.monotonic()
                await self.callback()
                # Subtract the callback's own duration so ticks stay roughly
                # `interval` apart; never sleep a negative amount.
                remaining = self.__interval - (time.monotonic() - started)
                await asyncio.sleep(max(0.0, remaining))
        except Exception as ex:
            # Best effort: a failing callback ends this timer but must not
            # propagate into the event loop.
            print(ex)

    @property
    def name(self) -> str:
        """Name of the timer (class name plus a random UUID by default)."""
        return self.__name

    @name.setter
    def name(self, value):
        self.__name = value

    def cancel(self):
        """Stop the timer and return the ``concurrent.futures.Future`` of its job.

        The returned future can be passed to ``concurrent.futures.wait`` from a
        thread other than the one running the loop.
        """
        self.__stop = True
        self.__task.cancel()
        return self.__task


class TimerTask:
    """Description of a periodic task to be scheduled by a ``TimerTaskManager``."""

    def __init__(self, interval):
        """Store the period (in seconds) and generate a unique default name."""
        self.__interval = interval
        self.__name = "{}+{}".format(self.__class__.__name__, str(uuid.uuid4()))

    async def callback(self):
        """Work executed on every tick; the default implementation does nothing."""
        pass

    def before_start(self):
        """Callback invoked once before the timer starts."""
        pass

    def after_end(self):
        """Callback invoked once after the timer is destroyed."""
        pass

    @property
    def interval(self) -> float:
        """Target period, in seconds, between two callback starts."""
        return self.__interval

    @property
    def name(self) -> str:
        """Name of the task; also used as its timer id in the manager."""
        return self.__name

    @name.setter
    def name(self, value):
        self.__name = value


class TimerTaskManager:
    """Schedule ``TimerTask`` objects on an event loop.

    With ``in_thread=True`` the manager owns a new loop that runs in a
    dedicated thread; otherwise the timers are scheduled on the main thread's
    loop, which the caller has to run.
    """

    def __init__(self, in_thread=False):
        """Create the manager.

        Args:
            in_thread: Run the timers on a private loop in a worker thread.

        Raises:
            RuntimeError: If called from a thread other than the main thread.
        """
        if threading.current_thread() is not threading.main_thread():
            raise RuntimeError("TimerTaskManager can only be created in Main Thread!")
        if in_thread:
            self.__loop = asyncio.new_event_loop()
            self.__thread = threading.Thread(target=self.__run, args=(self.__loop, ))
        else:
            self.__loop = _default_loop()
            self.__thread = None
        self.__timers = {}
        self.__stop = False

    def start(self):
        """Clear the stop flag and, in thread mode, start the worker thread."""
        self.__stop = False
        if self.__thread:
            self.__thread.start()

    def stop(self):
        """Ask all timers to stop; in thread mode also shut the worker down.

        In thread mode every timer is cancelled, the call waits for the jobs
        to finish, then stops the loop and joins the thread.  Without a worker
        thread only the stop flag is set, because blocking here would block
        the very loop that runs the timers.
        """
        self.__stop = True
        worker = self.__thread
        if not worker:
            return

        pending = list(self.__timers.values())
        self.__timers = {}
        for future in pending:
            future.cancel()

        if worker.is_alive():
            if pending:
                # Avoid WARNING: "Task was destroyed but it is pending!"
                concurrent.futures.wait(pending)
            try:
                self.__loop.call_soon_threadsafe(self.__loop.stop)
            except RuntimeError:
                # The worker already stopped and closed its loop (e.g. after
                # remove_timer() emptied the table); nothing left to stop.
                pass
            worker.join()
        elif not self.__loop.is_closed():
            # The worker never ran, so nobody else will close the loop.
            # Waiting or joining here would block forever / raise.
            self.__loop.close()
        self.__thread = None

    def __run(self, loop):
        """Thread target: run ``loop`` until it is stopped, then close it."""
        print("[DEBUG] Run in thread:", threading.current_thread())
        try:
            loop.run_forever()
        except KeyboardInterrupt:
            print("CTRL+C")
        finally:
            loop.close()

    async def __job(self, task):
        """Drive one task: ``before_start``, periodic ``callback``, ``after_end``."""
        try:
            task.before_start()
            while not self.__stop:
                started = time.monotonic()
                await task.callback()
                # Subtract the callback's own duration so ticks stay roughly
                # `interval` apart; never sleep a negative amount.
                remaining = task.interval - (time.monotonic() - started)
                await asyncio.sleep(max(0.0, remaining))
        except Exception as ex:
            # Best effort: a failing task ends its own timer only.
            print(ex)
        finally:
            # Also runs when the job is cancelled.
            task.after_end()

    def add_timer(self, *tasks: TimerTask) -> List[str]:
        """Schedule the given tasks and return their timer ids (the task names).

        Task names must be unique within the manager.
        """
        ret = []
        for task in tasks:
            assert task.name not in self.__timers, "duplicate timer name: {}".format(task.name)
            self.__timers[task.name] = asyncio.run_coroutine_threadsafe(self.__job(task), loop=self.__loop)
            ret.append(task.name)
        return ret

    def remove_timer(self, *timer_ids: str):
        """Cancel the timers with the given ids; unknown ids are ignored.

        When no timer is left afterwards, the loop is asked to stop.
        """
        for timer_id in timer_ids:
            future = self.__timers.pop(timer_id, None)
            if future is not None:
                future.cancel()
        if not self.__timers and not self.__loop.is_closed():
            self.__loop.call_soon_threadsafe(self.__loop.stop)


def test_pure_task():
    """Demo: run ``TimerTaskLoop`` timers on the main loop and on a worker-thread loop."""

    class MyTimer(TimerTaskLoop):
        def __init__(self, interval, loop: asyncio.AbstractEventLoop = None):
            # The counter must exist before super().__init__() schedules the
            # job: the target loop may already be running in another thread.
            self.__counter = 0
            # Also, you can provide your own loop
            super().__init__(interval=interval, loop=loop)

        # Maybe in non-main thread, according to the looper
        async def callback(self):
            print("Do my working: " + self.name + f"( + {time.time()} + ), count: {self.__counter}")
            # may sleep for a while
            await asyncio.sleep(0.1)
            self.__counter += 1

        @property
        def counter(self):
            return self.__counter

    def run_service(loop, my_timer):
        try:
            my_timer.name = "{}[{}]".format(my_timer.name, threading.current_thread().ident)
            loop.run_forever()
        except KeyboardInterrupt:
            print("CTRL+C")

    main_loop = _default_loop()
    loop = asyncio.new_event_loop()

    def stopper(signame, loop):
        print("Got %s, stopping..." % signame)
        loop.stop()

    # Not available on Windows
    # for signame in ('SIGINT', 'SIGTERM'):
    #     loop.add_signal_handler(getattr(signal, signame), functools.partial(stopper, signame, loop))
    #     main_loop.add_signal_handler(getattr(signal, signame), functools.partial(stopper, signame, main_loop))

    my_timer1 = MyTimer(interval=0.22, loop=main_loop)
    my_timer2 = MyTimer(interval=0.22, loop=loop)
    my_timer1.name = "{}[{}]".format(my_timer1.name, threading.current_thread().ident)

    t = threading.Thread(target=run_service, args=(loop, my_timer2, ))
    t.start()

    time.sleep(2.0)
    my_timer3 = MyTimer(interval=0.3, loop=loop)

    async def stop_tm():
        await asyncio.sleep(3.0)
        my_timer1.cancel()
        t2 = my_timer2.cancel()
        t3 = my_timer3.cancel()
        # t2/t3 run on the worker loop, so blocking here cannot deadlock.
        concurrent.futures.wait([t2, t3])
        loop.call_soon_threadsafe(loop.stop)
        # Give the main loop a moment to process my_timer1's cancellation.
        await asyncio.sleep(0.1)
        main_loop.stop()

    asyncio.ensure_future(stop_tm(), loop=main_loop)
    try:
        main_loop.run_forever()
    except KeyboardInterrupt:
        print("CTRL+C")
    finally:
        # Always release the worker: stop its loop, join the thread, close loops.
        loop.call_soon_threadsafe(loop.stop)
        t.join()
        loop.close()
        main_loop.close()

    print("counter:", my_timer1.counter)
    print("counter:", my_timer2.counter)
    print("counter:", my_timer3.counter)


def test_mgr():
    """Demo: run two ``TimerTask`` objects through a ``TimerTaskManager``."""

    class MyNewTimer(TimerTask):
        def __init__(self, interval):
            super().__init__(interval=interval)
            self.__counter = 0

        # Maybe in non-main thread, according to the looper
        async def callback(self):
            print("Do my working: " + self.name + f"( + {time.time()} + ), count: {self.__counter}")
            # may sleep for a while
            await asyncio.sleep(0.1)
            self.__counter += 1

        @property
        def counter(self):
            return self.__counter

    in_thread = True
    # in_thread = True
    main_loop = _default_loop()
    ttm = TimerTaskManager(in_thread)
    t1 = MyNewTimer(interval=0.22)
    t2 = MyNewTimer(interval=0.3)
    tid1, tid2 = ttm.add_timer(t1, t2)
    ttm.start()

    async def stop_tm():
        await asyncio.sleep(5.0)
        ttm.remove_timer(tid1, tid2)
        ttm.stop()
        main_loop.stop()

    asyncio.ensure_future(stop_tm(), loop=main_loop)
    try:
        main_loop.run_forever()
    except KeyboardInterrupt:
        print("CTRL+C")
        ttm.stop()


if __name__ == '__main__':
    test_pure_task()
    # test_mgr()