| @@ -16,6 +16,7 @@ | |||||
| import os | import os | ||||
| import time | import time | ||||
| import signal | import signal | ||||
| import queue | |||||
| from collections import deque | from collections import deque | ||||
| import psutil | import psutil | ||||
| @@ -108,14 +109,17 @@ class WriterPool(ctx.Process): | |||||
| for plugin, data in deq.popleft().get(): | for plugin, data in deq.popleft().get(): | ||||
| self._write(plugin, data) | self._write(plugin, data) | ||||
| if not self._queue.empty(): | |||||
| action, data = self._queue.get() | |||||
| try: | |||||
| action, data = self._queue.get(block=False) | |||||
| if action == 'WRITE': | if action == 'WRITE': | ||||
| deq.append(pool.apply_async(_pack_data, (data, time.time()))) | deq.append(pool.apply_async(_pack_data, (data, time.time()))) | ||||
| elif action == 'FLUSH': | elif action == 'FLUSH': | ||||
| self._flush() | self._flush() | ||||
| elif action == 'END': | elif action == 'END': | ||||
| break | break | ||||
| except queue.Empty: | |||||
| pass | |||||
| for result in deq: | for result in deq: | ||||
| for plugin, data in result.get(): | for plugin, data in result.get(): | ||||
| self._write(plugin, data) | self._write(plugin, data) | ||||