|
|
@@ -16,6 +16,7 @@ |
|
|
import os |
|
|
import os |
|
|
import time |
|
|
import time |
|
|
import signal |
|
|
import signal |
|
|
|
|
|
import queue |
|
|
from collections import deque |
|
|
from collections import deque |
|
|
|
|
|
|
|
|
import psutil |
|
|
import psutil |
|
|
@@ -108,14 +109,17 @@ class WriterPool(ctx.Process): |
|
|
for plugin, data in deq.popleft().get(): |
|
|
for plugin, data in deq.popleft().get(): |
|
|
self._write(plugin, data) |
|
|
self._write(plugin, data) |
|
|
|
|
|
|
|
|
if not self._queue.empty(): |
|
|
|
|
|
action, data = self._queue.get() |
|
|
|
|
|
|
|
|
try: |
|
|
|
|
|
action, data = self._queue.get(block=False) |
|
|
if action == 'WRITE': |
|
|
if action == 'WRITE': |
|
|
deq.append(pool.apply_async(_pack_data, (data, time.time()))) |
|
|
deq.append(pool.apply_async(_pack_data, (data, time.time()))) |
|
|
elif action == 'FLUSH': |
|
|
elif action == 'FLUSH': |
|
|
self._flush() |
|
|
self._flush() |
|
|
elif action == 'END': |
|
|
elif action == 'END': |
|
|
break |
|
|
break |
|
|
|
|
|
except queue.Empty: |
|
|
|
|
|
pass |
|
|
|
|
|
|
|
|
for result in deq: |
|
|
for result in deq: |
|
|
for plugin, data in result.get(): |
|
|
for plugin, data in result.get(): |
|
|
self._write(plugin, data) |
|
|
self._write(plugin, data) |
|
|
|