|
|
|
@@ -14,15 +14,16 @@ |
|
|
|
# ============================================================================ |
|
|
|
"""Write events to disk in a base directory.""" |
|
|
|
import os |
|
|
|
import time |
|
|
|
from collections import deque |
|
|
|
from multiprocessing import Pool, Process, Queue, cpu_count |
|
|
|
|
|
|
|
from ._lineage_adapter import serialize_to_lineage_event |
|
|
|
from ._summary_adapter import package_graph_event, package_summary_event |
|
|
|
from ._summary_writer import SummaryWriter, LineageWriter |
|
|
|
from ._summary_writer import LineageWriter, SummaryWriter |
|
|
|
|
|
|
|
|
|
|
|
def _pack_data(datadict): |
|
|
|
def _pack_data(datadict, wall_time): |
|
|
|
"""Pack data according to which plugin.""" |
|
|
|
result = [] |
|
|
|
summaries, step, mode = [], None, None |
|
|
|
@@ -37,7 +38,7 @@ def _pack_data(datadict): |
|
|
|
step = data.get('step') |
|
|
|
mode = data.get('mode') |
|
|
|
if summaries: |
|
|
|
result.append(['summary', mode, package_summary_event(summaries, step).SerializeToString()]) |
|
|
|
result.append(['summary', mode, package_summary_event(summaries, step, wall_time).SerializeToString()]) |
|
|
|
return result |
|
|
|
|
|
|
|
|
|
|
|
@@ -70,7 +71,7 @@ class WriterPool(Process): |
|
|
|
if not self._queue.empty(): |
|
|
|
action, data = self._queue.get() |
|
|
|
if action == 'WRITE': |
|
|
|
deq.append(pool.apply_async(_pack_data, (data,))) |
|
|
|
deq.append(pool.apply_async(_pack_data, (data, time.time()))) |
|
|
|
elif action == 'FLUSH': |
|
|
|
for writer in writers: |
|
|
|
writer.flush() |
|
|
|
|