@@ -26,7 +26,7 @@ def _clear_plasma_store():
    # `_PlasmaStoreManager.__del__` will not be called automatically in subprocess,
    # so this function should be called explicitly
    global MGE_PLASMA_STORE_MANAGER
    if MGE_PLASMA_STORE_MANAGER is not None:
    if MGE_PLASMA_STORE_MANAGER is not None and MGE_PLASMA_STORE_MANAGER.refcount == 0:
        del MGE_PLASMA_STORE_MANAGER
        MGE_PLASMA_STORE_MANAGER = None
@@ -50,6 +50,7 @@ class _PlasmaStoreManager:
            stderr=None if debug_flag else subprocess.DEVNULL,
        )
        self.__initialized = True
        self.refcount = 1
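        # refcount tracks how many PlasmaShmQueue instances share this store;
        # it starts at 1 for the queue that created the manager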
    def __del__(self):
        if self.__initialized and self.plasma_store.returncode is None:

@@ -83,6 +84,8 @@ class PlasmaShmQueue:
                    "Exception happened in starting plasma_store: {}\n"
                    "Tips: {}".format(str(e), err_info)
                )
        else:
            MGE_PLASMA_STORE_MANAGER.refcount += 1

        self.socket_name = MGE_PLASMA_STORE_MANAGER.socket_name
@@ -133,6 +136,8 @@ class PlasmaShmQueue:
    def close(self):
        self.queue.close()
        self.disconnect_client()
        global MGE_PLASMA_STORE_MANAGER
        MGE_PLASMA_STORE_MANAGER.refcount -= 1
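        # the store itself is torn down inside _clear_plasma_store() only once
        # the refcount has dropped back to zero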
        _clear_plasma_store()

    def cancel_join_thread(self):

@@ -118,7 +118,7 @@ class COCO(VisionDataset):
        self.ids = ids
        self.json_category_id_to_contiguous_id = {
            v: i + 1 for i, v in enumerate(self.cats.keys())
            v: i + 1 for i, v in enumerate(sorted(self.cats.keys()))
        }
        self.contiguous_category_id_to_json_id = {

@@ -81,7 +81,7 @@ class Objects365(VisionDataset):
        self.ids = ids
        self.json_category_id_to_contiguous_id = {
            v: i + 1 for i, v in enumerate(self.cats.keys())
            v: i + 1 for i, v in enumerate(sorted(self.cats.keys()))
        }
        self.contiguous_category_id_to_json_id = {

@@ -75,6 +75,8 @@ class PascalVOC(VisionDataset):
        else:
            raise NotImplementedError

        self.img_infos = dict()
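        # lazily populated cache of per-image metadata, filled by get_img_info()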
    def __getitem__(self, index):
        target = []
        for k in self.order:

@@ -107,9 +109,8 @@ class PascalVOC(VisionDataset):
                mask = mask[:, :, np.newaxis]
                target.append(mask)
            elif k == "info":
                if image is None:
                    image = cv2.imread(self.images[index], cv2.IMREAD_COLOR)
                info = [image.shape[0], image.shape[1], self.file_names[index]]
                info = self.get_img_info(index, image)
                info = [info["height"], info["width"], info["file_name"]]
                target.append(info)
            else:
                raise NotImplementedError

@@ -119,6 +120,17 @@ class PascalVOC(VisionDataset):
    def __len__(self):
        return len(self.images)

    def get_img_info(self, index, image=None):
        if index not in self.img_infos:
            if image is None:
                image = cv2.imread(self.images[index], cv2.IMREAD_COLOR)
            self.img_infos[index] = dict(
                height=image.shape[0],
                width=image.shape[1],
                file_name=self.file_names[index],
            )
        return self.img_infos[index]

    def _trans_mask(self, mask):
        label = np.ones(mask.shape[:2]) * 255
        for i in range(len(self.class_colors)):

@@ -171,25 +183,3 @@ class PascalVOC(VisionDataset):
        "train",
        "tvmonitor",
    )
    class_colors = [
        [0, 0, 128],
        [0, 128, 0],
        [0, 128, 128],
        [128, 0, 0],
        [128, 0, 128],
        [128, 128, 0],
        [128, 128, 128],
        [0, 0, 64],
        [0, 0, 192],
        [0, 128, 64],
        [0, 128, 192],
        [128, 0, 64],
        [128, 0, 192],
        [128, 128, 64],
        [128, 128, 192],
        [0, 64, 0],
        [0, 64, 128],
        [0, 192, 0],
        [0, 192, 128],
        [128, 64, 0],
    ]
@@ -52,7 +52,7 @@ class QATModule(Module):
        self.weight_fake_quant = safe_call(qconfig.weight_fake_quant)

    def _enable_exec(self, with_module, func, enable):
        if not with_module:
        if not with_module or not func:
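            # nothing to toggle when func is None (e.g. the qconfig does not define
            # this observer/fake_quant)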
            return
        if enable:
            func.enable()

@@ -26,40 +26,40 @@ class Sequential(Module):
        import megengine as mge
        import megengine.module as M
        import megengine.functional as F
        from collections import OrderedDict

        batch_size = 64
        data = mge.tensor(np.zeros((batch_size, 1, 28, 28)), dtype=np.float32)
        label = mge.tensor(np.zeros(batch_size,), dtype=np.int32)
        data = data.reshape(batch_size, -1)

        net = M.Sequential(
        net0 = M.Sequential(
            M.Linear(28 * 28, 320),
            M.Linear(320, 500),
            M.Linear(500, 320),
            M.Linear(320, 10)
        )
        pred = net(data)
        pred0 = net0(data)
        loss = F.cross_entropy_with_softmax(pred0, label)

        modules = OrderedDict()
        modules["fc0"] = M.Linear(28 * 28, 320)
        modules["fc1"] = M.Linear(320, 10)
        net1 = M.Sequential(modules)
        pred1 = net1(data)
    """

    def __init__(self, *args):
        super().__init__()
        self.layer_keys = []
        self.layer_values = []
        if len(args) == 1 and isinstance(args[0], OrderedDict):
            for key, module in args[0].items():
                # self.add_module(key, module)
                setattr(self, key, module)
                self.layer_keys.append(key)
                self.layer_values.append(module)
        else:
            for idx, module in enumerate(args):
                # self.add_module(str(idx), module)
                setattr(self, str(idx), module)
                self.layer_keys.append(str(idx))
                self.layer_values.append(module)

    def __getitem__(self, idx):
        if isinstance(idx, slice):
@@ -67,11 +67,10 @@ class Sequential(Module):
                OrderedDict(zip(self.layer_keys[idx], self.layer_values[idx]))
            )
        else:
            return self.layer_values[idx]
            return getattr(self, self.layer_keys[idx])

    def __setitem__(self, idx, module):
        key = self.layer_keys[idx]
        self.layer_values[idx] = module
        return setattr(self, key, module)

    def __delitem__(self, idx):

@@ -79,11 +78,9 @@ class Sequential(Module):
            for key in self.layer_keys[idx]:
                delattr(self, key)
            del self.layer_keys[idx]
            del self.layer_values[idx]
        else:
            delattr(self, self.layer_keys[idx])
            del self.layer_keys[idx]
            del self.layer_values[idx]

    def __len__(self):
        return len(self.layer_keys)

@@ -91,6 +88,10 @@ class Sequential(Module):
    def __iter__(self):
        return iter(self.layer_values)

    @property
    def layer_values(self):
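        # recompute from the registered module attributes so that changes made via
        # setattr()/delattr() in __setitem__/__delitem__ stay in sync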
        return [getattr(self, key) for key in self.layer_keys]

    def forward(self, inp):
        for layer in self.layer_values:
            inp = layer(inp)

@@ -0,0 +1,183 @@
# -*- coding: utf-8 -*-
# MegEngine is Licensed under the Apache License, Version 2.0 (the "License")
#
# Copyright (c) 2014-2020 Megvii Inc. All rights reserved.
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
import os
import time

import numpy as np
import pytest

from megengine.data.collator import Collator
from megengine.data.dataloader import DataLoader
from megengine.data.dataset import ArrayDataset
from megengine.data.sampler import RandomSampler, SequentialSampler
from megengine.data.transform import PseudoTransform, Transform


def init_dataset():
    sample_num = 100
    rand_data = np.random.randint(0, 255, size=(sample_num, 1, 32, 32), dtype=np.uint8)
    label = np.random.randint(0, 10, size=(sample_num,), dtype=int)
    dataset = ArrayDataset(rand_data, label)
    return dataset


def test_dataloader_init():
    dataset = init_dataset()
    with pytest.raises(ValueError):
        dataloader = DataLoader(dataset, num_workers=2, divide=True)
    with pytest.raises(ValueError):
        dataloader = DataLoader(dataset, num_workers=-1)
    with pytest.raises(ValueError):
        dataloader = DataLoader(dataset, timeout=-1)
    with pytest.raises(ValueError):
        dataloader = DataLoader(dataset, num_workers=0, divide=True)

    dataloader = DataLoader(dataset)
    assert isinstance(dataloader.sampler, SequentialSampler)
    assert isinstance(dataloader.transform, PseudoTransform)
    assert isinstance(dataloader.collator, Collator)

    dataloader = DataLoader(
        dataset, sampler=RandomSampler(dataset, batch_size=6, drop_last=False)
    )
    assert len(dataloader) == 17
    dataloader = DataLoader(
        dataset, sampler=RandomSampler(dataset, batch_size=6, drop_last=True)
    )
    assert len(dataloader) == 16


def test_dataloader_serial():
    dataset = init_dataset()
    dataloader = DataLoader(
        dataset, sampler=RandomSampler(dataset, batch_size=4, drop_last=False)
    )
    for (data, label) in dataloader:
        assert data.shape == (4, 1, 32, 32)
        assert label.shape == (4,)


def test_dataloader_parallel():
    # set max shared memory to 100M
    os.environ["MGE_PLASMA_MEMORY"] = "100000000"
    dataset = init_dataset()
    dataloader = DataLoader(
        dataset,
        sampler=RandomSampler(dataset, batch_size=4, drop_last=False),
        num_workers=2,
        divide=False,
    )
    for (data, label) in dataloader:
        assert data.shape == (4, 1, 32, 32)
        assert label.shape == (4,)

    dataloader = DataLoader(
        dataset,
        sampler=RandomSampler(dataset, batch_size=4, drop_last=False),
        num_workers=2,
        divide=True,
    )
    for (data, label) in dataloader:
        assert data.shape == (4, 1, 32, 32)
        assert label.shape == (4,)


def test_dataloader_parallel_timeout():
    dataset = init_dataset()

    class TimeoutTransform(Transform):
        def __init__(self):
            pass

        def apply(self, input):
            time.sleep(10)
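            # sleep longer than the 2s timeout configured on the DataLoader below,
            # so iteration is expected to fail with a timeout RuntimeError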
            return input

    dataloader = DataLoader(
        dataset,
        sampler=RandomSampler(dataset, batch_size=4, drop_last=False),
        transform=TimeoutTransform(),
        num_workers=2,
        timeout=2,
    )
    with pytest.raises(RuntimeError, match=r".*timeout.*"):
        data_iter = iter(dataloader)
        batch_data = next(data_iter)


def test_dataloader_parallel_worker_exception():
    dataset = init_dataset()

    class FakeErrorTransform(Transform):
        def __init__(self):
            pass

        def apply(self, input):
            y = x + 1
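            # 'x' is intentionally undefined: the NameError raised inside the worker
            # should surface in the parent as a "worker ... died" RuntimeError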
            return input

    dataloader = DataLoader(
        dataset,
        sampler=RandomSampler(dataset, batch_size=4, drop_last=False),
        transform=FakeErrorTransform(),
        num_workers=2,
    )
    with pytest.raises(RuntimeError, match=r"worker.*died"):
        data_iter = iter(dataloader)
        batch_data = next(data_iter)


def _multi_instances_parallel_dataloader_worker():
    dataset = init_dataset()

    for divide_flag in [True, False]:
        train_dataloader = DataLoader(
            dataset,
            sampler=RandomSampler(dataset, batch_size=4, drop_last=False),
            num_workers=2,
            divide=divide_flag,
        )
        val_dataloader = DataLoader(
            dataset,
            sampler=RandomSampler(dataset, batch_size=10, drop_last=False),
            num_workers=2,
            divide=divide_flag,
        )
        for idx, (data, label) in enumerate(train_dataloader):
            assert data.shape == (4, 1, 32, 32)
            assert label.shape == (4,)
            if idx % 5 == 0:
                for val_data, val_label in val_dataloader:
                    assert val_data.shape == (10, 1, 32, 32)
                    assert val_label.shape == (10,)


def test_dataloader_parallel_multi_instances():
    # set max shared memory to 100M
    os.environ["MGE_PLASMA_MEMORY"] = "100000000"
    _multi_instances_parallel_dataloader_worker()


def test_dataloader_parallel_multi_instances_multiprocessing():
    # set max shared memory to 100M
    os.environ["MGE_PLASMA_MEMORY"] = "100000000"
    import multiprocessing as mp

    # mp.set_start_method("spawn")
    processes = []
    for i in range(4):
        p = mp.Process(target=_multi_instances_parallel_dataloader_worker)
        p.start()
        processes.append(p)
    for p in processes:
        p.join()
@@ -460,9 +460,9 @@ def test_sequential_named_children():
    modules["name2"] = Linear(5, 1)
    m = Sequential(modules)
    l = list(m.named_children())
    assert l[0][0] == "layer_values.0"
    assert l[1][0] == "layer_values.1"
    assert l[2][0] == "layer_values.2"
    assert l[0][0] == "name0"
    assert l[1][0] == "name1"
    assert l[2][0] == "name2"


def test_state_dict():