|
|
@@ -16,7 +16,6 @@ |
|
|
import os |
|
|
import os |
|
|
import time |
|
|
import time |
|
|
from collections import deque |
|
|
from collections import deque |
|
|
from multiprocessing import Pool, Process, Queue, cpu_count |
|
|
|
|
|
|
|
|
|
|
|
import mindspore.log as logger |
|
|
import mindspore.log as logger |
|
|
|
|
|
|
|
|
@@ -24,6 +23,12 @@ from ._lineage_adapter import serialize_to_lineage_event |
|
|
from ._summary_adapter import package_graph_event, package_summary_event |
|
|
from ._summary_adapter import package_graph_event, package_summary_event |
|
|
from ._summary_writer import LineageWriter, SummaryWriter |
|
|
from ._summary_writer import LineageWriter, SummaryWriter |
|
|
|
|
|
|
|
|
|
|
|
try: |
|
|
|
|
|
from multiprocessing import get_context |
|
|
|
|
|
ctx = get_context('forkserver') |
|
|
|
|
|
except ValueError: |
|
|
|
|
|
import multiprocessing as ctx |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _pack_data(datadict, wall_time): |
|
|
def _pack_data(datadict, wall_time): |
|
|
"""Pack data according to which plugin.""" |
|
|
"""Pack data according to which plugin.""" |
|
|
@@ -42,7 +47,7 @@ def _pack_data(datadict, wall_time): |
|
|
return result |
|
|
return result |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class WriterPool(Process): |
|
|
|
|
|
|
|
|
class WriterPool(ctx.Process): |
|
|
""" |
|
|
""" |
|
|
Use a set of pooled resident processes for writing a list of file. |
|
|
Use a set of pooled resident processes for writing a list of file. |
|
|
|
|
|
|
|
|
@@ -54,12 +59,12 @@ class WriterPool(Process): |
|
|
def __init__(self, base_dir, max_file_size, **filedict) -> None: |
|
|
def __init__(self, base_dir, max_file_size, **filedict) -> None: |
|
|
super().__init__() |
|
|
super().__init__() |
|
|
self._base_dir, self._filedict = base_dir, filedict |
|
|
self._base_dir, self._filedict = base_dir, filedict |
|
|
self._queue, self._writers_ = Queue(cpu_count() * 2), None |
|
|
|
|
|
|
|
|
self._queue, self._writers_ = ctx.Queue(ctx.cpu_count() * 2), None |
|
|
self._max_file_size = max_file_size |
|
|
self._max_file_size = max_file_size |
|
|
self.start() |
|
|
self.start() |
|
|
|
|
|
|
|
|
def run(self): |
|
|
def run(self): |
|
|
with Pool(min(cpu_count(), 32)) as pool: |
|
|
|
|
|
|
|
|
with ctx.Pool(min(ctx.cpu_count(), 32)) as pool: |
|
|
deq = deque() |
|
|
deq = deque() |
|
|
while True: |
|
|
while True: |
|
|
while deq and deq[0].ready(): |
|
|
while deq and deq[0].ready(): |
|
|
|