Merge pull request !30865 from cathwong/ckw_batch_workers_pymultipr1.7
| @@ -97,7 +97,8 @@ PYBIND_REGISTER( | |||
| (void)py::class_<BatchNode, DatasetNode, std::shared_ptr<BatchNode>>(*m, "BatchNode", "to create a BatchNode") | |||
| .def(py::init([](const std::shared_ptr<DatasetNode> &self, int32_t batch_size, bool drop_remainder, bool pad, | |||
| const py::list &in_col_names, const py::list &out_col_names, const py::list &col_order, | |||
| const py::object &size_obj, const py::object &map_obj, const py::dict &pad_info) { | |||
| const py::object &size_obj, const py::object &map_obj, const py::dict &pad_info, | |||
| std::shared_ptr<PythonMultiprocessingRuntime> python_mp) { | |||
| std::map<std::string, std::pair<TensorShape, std::shared_ptr<Tensor>>> c_pad_info; | |||
| if (pad) { | |||
| THROW_IF_ERROR(toPadInfo(pad_info, &c_pad_info)); | |||
| @@ -107,7 +108,7 @@ PYBIND_REGISTER( | |||
| py::function map_func = py::isinstance<py::function>(map_obj) ? map_obj.cast<py::function>() : py::function(); | |||
| auto batch = std::make_shared<BatchNode>(self, batch_size, drop_remainder, pad, toStringVector(in_col_names), | |||
| toStringVector(out_col_names), toStringVector(col_order), size_func, | |||
| map_func, c_pad_info); | |||
| map_func, c_pad_info, python_mp); | |||
| THROW_IF_ERROR(batch->ValidateParams()); | |||
| return batch; | |||
| })); | |||
| @@ -1,5 +1,5 @@ | |||
| /** | |||
| * Copyright 2019-2021 Huawei Technologies Co., Ltd | |||
| * Copyright 2019-2022 Huawei Technologies Co., Ltd | |||
| * | |||
| * Licensed under the Apache License, Version 2.0 (the "License"); | |||
| * you may not use this file except in compliance with the License. | |||
| @@ -67,7 +67,8 @@ BatchOp::BatchOp(int32_t batch_size, bool drop, bool pad, int32_t op_queue_size, | |||
| in_col_names_(cols_to_map), | |||
| pad_info_(pad_map), | |||
| batch_num_(0), | |||
| batch_cnt_(0) { | |||
| batch_cnt_(0), | |||
| python_mp_(nullptr) { | |||
| // Adjust connector queue size. After batch each row is batch_size times larger | |||
| worker_connector_size_ = std::max(1, worker_connector_size_ / start_batch_size_); | |||
| if (num_workers == 1) { | |||
| @@ -614,6 +615,7 @@ Status BatchOp::GetNextRowPullMode(TensorRow *const row) { | |||
| } | |||
| return Status::OK(); | |||
| } | |||
| Status BatchOp::SendWaitFlagToWorker(int32_t worker_id) { | |||
| RETURN_IF_NOT_OK(worker_in_queues_[worker_id]->EmplaceBack(std::make_pair(nullptr, CBatchInfo(batchCtrl::kWait)))); | |||
| return Status::OK(); | |||
| @@ -623,5 +625,43 @@ Status BatchOp::SendQuitFlagToWorker(int32_t worker_id) { | |||
| RETURN_IF_NOT_OK(worker_in_queues_[worker_id]->EmplaceBack(std::make_pair(nullptr, CBatchInfo(batchCtrl::kQuit)))); | |||
| return Status::OK(); | |||
| } | |||
| Status BatchOp::AddNewWorkers(int32_t num_new_workers) { | |||
| RETURN_IF_NOT_OK(ParallelOp::AddNewWorkers(num_new_workers)); | |||
| if (python_mp_ != nullptr) { | |||
| CHECK_FAIL_RETURN_UNEXPECTED(num_new_workers > 0, "Number of workers added should be greater than 0."); | |||
| python_mp_->AddNewWorkers(num_new_workers); | |||
| } | |||
| return Status::OK(); | |||
| } | |||
| Status BatchOp::RemoveWorkers(int32_t num_workers) { | |||
| RETURN_IF_NOT_OK(ParallelOp::RemoveWorkers(num_workers)); | |||
| if (python_mp_ != nullptr) { | |||
| CHECK_FAIL_RETURN_UNEXPECTED(num_workers > 0, "Number of workers removed should be greater than 0."); | |||
| python_mp_->RemoveWorkers(num_workers); | |||
| } | |||
| return Status::OK(); | |||
| } | |||
| void BatchOp::SetPythonMp(std::shared_ptr<PythonMultiprocessingRuntime> python_mp) { | |||
| python_mp_ = std::move(python_mp); | |||
| } | |||
| Status BatchOp::Launch() { | |||
| // Launch Python multiprocessing. This will create the MP pool and shared memory if needed. | |||
| if (python_mp_) { | |||
| MS_LOG(DEBUG) << "Launch Python Multiprocessing for BatchOp:" << id(); | |||
| python_mp_->Launch(id()); | |||
| } | |||
| return DatasetOp::Launch(); | |||
| } | |||
| std::vector<int32_t> BatchOp::GetMPWorkerPIDs() const { | |||
| if (python_mp_ != nullptr) { | |||
| return python_mp_->GetPIDs(); | |||
| } | |||
| return DatasetOp::GetMPWorkerPIDs(); | |||
| } | |||
| } // namespace dataset | |||
| } // namespace mindspore | |||
| @@ -1,5 +1,5 @@ | |||
| /** | |||
| * Copyright 2019-2021 Huawei Technologies Co., Ltd | |||
| * Copyright 2019-2022 Huawei Technologies Co., Ltd | |||
| * | |||
| * Licensed under the Apache License, Version 2.0 (the "License"); | |||
| * you may not use this file except in compliance with the License. | |||
| @@ -26,6 +26,7 @@ | |||
| #include <utility> | |||
| #include <vector> | |||
| #include "minddata/dataset/api/python/python_mp.h" | |||
| #include "minddata/dataset/core/config_manager.h" | |||
| #include "minddata/dataset/core/tensor.h" | |||
| #include "minddata/dataset/engine/dataset_iterator.h" | |||
| @@ -218,6 +219,14 @@ class BatchOp : public ParallelOp<std::pair<std::unique_ptr<TensorQTable>, CBatc | |||
| return false; | |||
| } | |||
| /// Set the instance of Python multiprocessing which will passed from Python | |||
| /// \param python_mp PythonMultiprocessingRuntime | |||
| void SetPythonMp(std::shared_ptr<PythonMultiprocessingRuntime> python_mp); | |||
| /// Return the list of PIDs of worker processes | |||
| /// \return vector of int | |||
| std::vector<int32_t> GetMPWorkerPIDs() const override; | |||
| private: | |||
| // Worker thread for doing the memcpy of batch | |||
| // @param int32_t param workerId | |||
| @@ -284,6 +293,13 @@ class BatchOp : public ParallelOp<std::pair<std::unique_ptr<TensorQTable>, CBatc | |||
| py::function batch_size_func_; // Function pointer of batch size function | |||
| py::function batch_map_func_; // Function pointer of per batch map function | |||
| #endif | |||
| std::shared_ptr<PythonMultiprocessingRuntime> python_mp_; // python multiprocessing instance | |||
| protected: | |||
| Status Launch() override; | |||
| Status AddNewWorkers(int32_t num_new_workers) override; | |||
| Status RemoveWorkers(int32_t num_workers) override; | |||
| }; | |||
| } // namespace dataset | |||
| } // namespace mindspore | |||
| @@ -1,5 +1,5 @@ | |||
| /** | |||
| * Copyright 2020-2021 Huawei Technologies Co., Ltd | |||
| * Copyright 2020-2022 Huawei Technologies Co., Ltd | |||
| * | |||
| * Licensed under the Apache License, Version 2.0 (the "License"); | |||
| * you may not use this file except in compliance with the License. | |||
| @@ -34,7 +34,8 @@ BatchNode::BatchNode(std::shared_ptr<DatasetNode> child, int32_t batch_size, boo | |||
| const std::vector<std::string> &in_col_names, const std::vector<std::string> &out_col_names, | |||
| const std::vector<std::string> &col_order, py::function batch_size_func, | |||
| py::function batch_map_func, | |||
| std::map<std::string, std::pair<TensorShape, std::shared_ptr<Tensor>>> pad_map) | |||
| std::map<std::string, std::pair<TensorShape, std::shared_ptr<Tensor>>> pad_map, | |||
| std::shared_ptr<PythonMultiprocessingRuntime> python_mp) | |||
| : batch_size_(batch_size), | |||
| drop_remainder_(drop_remainder), | |||
| pad_(pad), | |||
| @@ -43,7 +44,8 @@ BatchNode::BatchNode(std::shared_ptr<DatasetNode> child, int32_t batch_size, boo | |||
| col_order_(col_order), | |||
| batch_size_func_(batch_size_func), | |||
| batch_map_func_(batch_map_func), | |||
| pad_map_(pad_map) { | |||
| pad_map_(pad_map), | |||
| python_mp_(python_mp) { | |||
| this->AddChild(child); | |||
| } | |||
| #endif | |||
| @@ -57,7 +59,7 @@ BatchNode::BatchNode(std::shared_ptr<DatasetNode> child, int32_t batch_size, boo | |||
| std::shared_ptr<DatasetNode> BatchNode::Copy() { | |||
| #ifdef ENABLE_PYTHON | |||
| auto node = std::make_shared<BatchNode>(nullptr, batch_size_, drop_remainder_, pad_, in_col_names_, out_col_names_, | |||
| col_order_, batch_size_func_, batch_map_func_, pad_map_); | |||
| col_order_, batch_size_func_, batch_map_func_, pad_map_, python_mp_); | |||
| #else | |||
| auto node = std::make_shared<BatchNode>(nullptr, batch_size_, drop_remainder_); | |||
| #endif | |||
| @@ -105,6 +107,9 @@ Status BatchNode::Build(std::vector<std::shared_ptr<DatasetOp>> *const node_ops) | |||
| in_col_names_, out_col_names_, batch_size_func_, batch_map_func_, pad_map_); | |||
| op->SetTotalRepeats(GetTotalRepeats()); | |||
| op->SetNumRepeatsPerEpoch(GetNumRepeatsPerEpoch()); | |||
| if (python_mp_ != nullptr) { | |||
| op->SetPythonMp(python_mp_); | |||
| } | |||
| node_ops->push_back(op); | |||
| #else | |||
| node_ops->push_back(std::make_shared<BatchOp>(batch_size_, drop_remainder_, pad_, connector_que_size_, num_workers_, | |||
| @@ -1,5 +1,5 @@ | |||
| /** | |||
| * Copyright 2020 Huawei Technologies Co., Ltd | |||
| * Copyright 2020-2022 Huawei Technologies Co., Ltd | |||
| * | |||
| * Licensed under the Apache License, Version 2.0 (the "License"); | |||
| * you may not use this file except in compliance with the License. | |||
| @@ -35,7 +35,8 @@ class BatchNode : public DatasetNode { | |||
| BatchNode(std::shared_ptr<DatasetNode> child, int32_t batch_size, bool drop_remainder, bool pad, | |||
| const std::vector<std::string> &in_col_names, const std::vector<std::string> &out_col_names, | |||
| const std::vector<std::string> &col_order, py::function batch_size_func, py::function batch_map_func, | |||
| std::map<std::string, std::pair<TensorShape, std::shared_ptr<Tensor>>> pad_map); | |||
| std::map<std::string, std::pair<TensorShape, std::shared_ptr<Tensor>>> pad_map, | |||
| std::shared_ptr<PythonMultiprocessingRuntime> python_mp = nullptr); | |||
| #endif | |||
| /// \brief Constructor #2 for C++ API to create a BatchNode | |||
| @@ -124,6 +125,7 @@ class BatchNode : public DatasetNode { | |||
| py::function batch_map_func_; | |||
| #endif | |||
| std::map<std::string, std::pair<TensorShape, std::shared_ptr<Tensor>>> pad_map_; | |||
| std::shared_ptr<PythonMultiprocessingRuntime> python_mp_; | |||
| }; | |||
| } // namespace dataset | |||
| } // namespace mindspore | |||
| @@ -2400,7 +2400,7 @@ class BatchDataset(UnionBaseDataset): | |||
| def parse(self, children=None): | |||
| return cde.BatchNode(children[0], self.batch_size, self.drop_remainder, self.pad, self.input_columns, | |||
| self.output_columns, self.column_order, self.batch_size_func, self.per_batch_map, | |||
| self.pad_info) | |||
| self.pad_info, self.process_pool) | |||
| @staticmethod | |||
| def _is_ancestor_of_repeat(dataset): | |||
| @@ -2446,7 +2446,7 @@ class BatchDataset(UnionBaseDataset): | |||
| """ | |||
| if self.python_multiprocessing: | |||
| if self.per_batch_map is None: | |||
| logger.warning("per_batch_map is None so python_multiprocessing does not work.") | |||
| logger.warning("per_batch_map is None so python_multiprocessing is ignored for batch.") | |||
| return | |||
| # If user didn't specify num_parallel_workers, set it to default | |||
| @@ -1,4 +1,4 @@ | |||
| # Copyright 2021 Huawei Technologies Co., Ltd | |||
| # Copyright 2021-2022 Huawei Technologies Co., Ltd | |||
| # | |||
| # Licensed under the Apache License, Version 2.0 (the "License"); | |||
| # you may not use this file except in compliance with the License. | |||
| @@ -124,16 +124,18 @@ def test_autotune_train_simple_model(tmp_path): | |||
| ds.config.set_seed(original_seed) | |||
| def create_dataset_pyfunc_multiproc(data_path, batch_size=32, num_map_parallel_workers=1, max_rowsize=16): | |||
| def create_dataset_pyfunc_multiproc(data_path, batch_size=32, num_op_parallel_workers=1, max_rowsize=16): | |||
| """ | |||
| Create dataset with Python ops list and python_multiprocessing=True for Map op | |||
| Create Mnist dataset pipline with Python Multiprocessing enabled for | |||
| - Batch op using per_batch_map | |||
| - Map ops using Pyfuncs | |||
| """ | |||
| # Define dataset | |||
| # Define dataset with num_parallel_workers=8 for reasonable performance | |||
| data1 = ds.MnistDataset(data_path, num_parallel_workers=8) | |||
| data1 = data1.map(operations=[py_vision.ToType(np.int32)], input_columns="label", | |||
| num_parallel_workers=num_map_parallel_workers, | |||
| num_parallel_workers=num_op_parallel_workers, | |||
| python_multiprocessing=True, max_rowsize=max_rowsize) | |||
| # Setup transforms list which include Python ops | |||
| @@ -146,11 +148,18 @@ def create_dataset_pyfunc_multiproc(data_path, batch_size=32, num_map_parallel_w | |||
| lambda y: y | |||
| ] | |||
| compose_op = py_transforms.Compose(transforms_list) | |||
| data1 = data1.map(operations=compose_op, input_columns="image", num_parallel_workers=num_map_parallel_workers, | |||
| data1 = data1.map(operations=compose_op, input_columns="image", num_parallel_workers=num_op_parallel_workers, | |||
| python_multiprocessing=True, max_rowsize=max_rowsize) | |||
| # Callable function to swap order of 2 columns | |||
| def swap_columns(col1, col2, batch_info): | |||
| return (col2, col1,) | |||
| # Apply Dataset Ops | |||
| data1 = data1.batch(batch_size, drop_remainder=True) | |||
| data1 = data1.batch(batch_size, drop_remainder=True, per_batch_map=swap_columns, | |||
| input_columns=['image', 'label'], | |||
| output_columns=['mylabel', 'myimage'], | |||
| num_parallel_workers=num_op_parallel_workers, python_multiprocessing=True) | |||
| return data1 | |||
| @@ -24,16 +24,18 @@ import mindspore.dataset.vision.c_transforms as c_vision | |||
| import mindspore.dataset.vision.py_transforms as py_vision | |||
| from mindspore.dataset.vision import Inter | |||
| DATA_DIR = "../data/dataset/testCifar10Data" | |||
| CIFAR10_DATA_DIR = "../data/dataset/testCifar10Data" | |||
| CIFAR100_DATA_DIR = "../data/dataset/testCifar100Data" | |||
| def create_pyfunc_dataset(batch_size=32, repeat_size=1, num_parallel_workers=1, num_samples=None): | |||
| """ | |||
| Create Cifar10 dataset pipline with Map ops containing only Python functions and Python Multiprocessing enabled | |||
| for Map ops and Batch op | |||
| """ | |||
| # Define dataset | |||
| cifar10_ds = ds.Cifar10Dataset(DATA_DIR, num_samples=num_samples) | |||
| cifar10_ds = ds.Cifar10Dataset(CIFAR10_DATA_DIR, num_samples=num_samples) | |||
| cifar10_ds = cifar10_ds.map(operations=[py_vision.ToType(np.int32)], input_columns="label", | |||
| num_parallel_workers=num_parallel_workers, python_multiprocessing=True) | |||
| @@ -59,11 +61,12 @@ def create_pyfunc_dataset(batch_size=32, repeat_size=1, num_parallel_workers=1, | |||
| def create_pyop_cop_dataset(batch_size=32, repeat_size=1, num_parallel_workers=1, num_samples=None): | |||
| """ | |||
| Create Cifar10 dataset pipeline with Map ops containing just C Ops or just Pyfuncs | |||
| Create Cifar10 dataset pipeline with Map ops containing just C Ops or just Pyfuncs, and | |||
| Python Multiprocessing enabled for Map ops and Batch op | |||
| """ | |||
| # Define dataset | |||
| cifar10_ds = ds.Cifar10Dataset(DATA_DIR, num_samples=num_samples) | |||
| cifar10_ds = ds.Cifar10Dataset(CIFAR10_DATA_DIR, num_samples=num_samples) | |||
| # Map#1 - with Pyfunc | |||
| cifar10_ds = cifar10_ds.map(operations=[py_vision.ToType(np.int32)], input_columns="label", | |||
| @@ -103,11 +106,12 @@ def create_pyop_cop_dataset(batch_size=32, repeat_size=1, num_parallel_workers=1 | |||
| def create_mixed_map_dataset(batch_size=32, repeat_size=1, num_parallel_workers=1, num_samples=None): | |||
| """ | |||
| Create Cifar10 dataset pipeline with a Map op containing of both C Ops and Pyfuncs | |||
| Create Cifar10 dataset pipeline with a Map op containing of both C Ops and Pyfuncs, and | |||
| Python Multiprocessing enabled for Map ops and Batch op | |||
| """ | |||
| # Define dataset | |||
| cifar10_ds = ds.Cifar10Dataset(DATA_DIR, num_samples=num_samples) | |||
| cifar10_ds = ds.Cifar10Dataset(CIFAR10_DATA_DIR, num_samples=num_samples) | |||
| cifar10_ds = cifar10_ds.map(operations=[py_vision.ToType(np.int32)], input_columns="label", | |||
| num_parallel_workers=num_parallel_workers, python_multiprocessing=True) | |||
| @@ -119,13 +123,79 @@ def create_mixed_map_dataset(batch_size=32, repeat_size=1, num_parallel_workers= | |||
| hwc2chw_op = c_vision.HWC2CHW() | |||
| cifar10_ds = cifar10_ds.map( | |||
| operations=[lambda x: x, resize_op, rescale_op, rescale_nml_op, hwc2chw_op, lambda y: y], | |||
| input_columns="image", num_parallel_workers=num_parallel_workers, | |||
| python_multiprocessing=True) | |||
| input_columns="image", num_parallel_workers=num_parallel_workers, python_multiprocessing=True) | |||
| # Apply Dataset Ops | |||
| cifar10_ds = cifar10_ds.batch(batch_size, drop_remainder=True) | |||
| cifar10_ds = cifar10_ds.repeat(repeat_size) | |||
| return cifar10_ds | |||
| def create_per_batch_map_dataset(batch_size=32, repeat_size=1, num_parallel_workers=1, num_samples=None): | |||
| """ | |||
| Create Cifar100 dataset pipline with Batch op using per_batch_map and Python Multiprocessing enabled | |||
| """ | |||
| # Define dataset | |||
| cifar100_ds = ds.Cifar100Dataset(CIFAR100_DATA_DIR, num_samples=num_samples) | |||
| cifar100_ds = cifar100_ds.map(operations=[py_vision.ToType(np.int32)], input_columns="fine_label") | |||
| cifar100_ds = cifar100_ds.map(operations=[lambda z: z], input_columns="image") | |||
| # Callable function to delete 3rd column | |||
| def del_column(col1, col2, col3, batch_info): | |||
| return (col1, col2,) | |||
| # Apply Dataset Ops | |||
| buffer_size = 10000 | |||
| cifar100_ds = cifar100_ds.shuffle(buffer_size=buffer_size) | |||
| # Note: Test repeat before batch | |||
| cifar100_ds = cifar100_ds.repeat(repeat_size) | |||
| cifar100_ds = cifar100_ds.batch(batch_size, per_batch_map=del_column, | |||
| input_columns=['image', 'fine_label', 'coarse_label'], | |||
| output_columns=['image', 'label'], drop_remainder=True, | |||
| num_parallel_workers=num_parallel_workers, python_multiprocessing=True) | |||
| return cifar100_ds | |||
| def create_mp_dataset(batch_size=32, repeat_size=1, num_parallel_workers=1, num_samples=None): | |||
| """ | |||
| Create Cifar10 dataset pipline with Python Multiprocessing enabled for | |||
| - Batch op using batch_per_map | |||
| - Map ops using Pyfuncs | |||
| """ | |||
| # Define dataset | |||
| cifar10_ds = ds.Cifar10Dataset(CIFAR10_DATA_DIR, num_samples=num_samples) | |||
| cifar10_ds = cifar10_ds.map(operations=[py_vision.ToType(np.int32)], input_columns="label", | |||
| num_parallel_workers=num_parallel_workers, python_multiprocessing=True) | |||
| # Setup transforms list which include Python ops / Pyfuncs | |||
| transforms_list = [ | |||
| py_vision.ToPIL(), | |||
| py_vision.RandomGrayscale(prob=0.8), | |||
| np.array] # need to convert PIL image to a NumPy array to pass it to C++ operation | |||
| compose_op = py_transforms.Compose(transforms_list) | |||
| cifar10_ds = cifar10_ds.map(operations=compose_op, input_columns="image", | |||
| num_parallel_workers=num_parallel_workers, python_multiprocessing=True) | |||
| # Callable function to swap columns | |||
| def swap_columns(col1, col2, batch_info): | |||
| return (col2, col1,) | |||
| # Apply Dataset Ops | |||
| buffer_size = 10000 | |||
| cifar10_ds = cifar10_ds.shuffle(buffer_size=buffer_size) | |||
| cifar10_ds = cifar10_ds.batch(batch_size, drop_remainder=True) | |||
| cifar10_ds = cifar10_ds.batch(batch_size, drop_remainder=True, | |||
| per_batch_map=swap_columns, | |||
| input_columns=['image', 'label'], | |||
| output_columns=['mylabel', 'myimage'], | |||
| num_parallel_workers=num_parallel_workers, | |||
| python_multiprocessing=True) | |||
| cifar10_ds = cifar10_ds.repeat(repeat_size) | |||
| return cifar10_ds | |||
| @@ -155,7 +225,7 @@ class TestPythonMultiprocAutotune: | |||
| ds.config.set_enable_autotune(self.original_autotune) | |||
| @staticmethod | |||
| def test_cifar10_pyfunc_pipeline(): | |||
| def test_pymultiproc_at_map_pyfunc(): | |||
| """ | |||
| Feature: Python Multiprocessing with AutoTune | |||
| Description: Test pipeline with Map ops containing only Python function | |||
| @@ -169,7 +239,7 @@ class TestPythonMultiprocAutotune: | |||
| assert mycount1 == 12 | |||
| @staticmethod | |||
| def test_cifar10_pyfunc_pipeline_all_samples(): | |||
| def test_pymultiproc_at_map_pyfunc_pipeline_all_samples(): | |||
| """ | |||
| Feature: Python Multiprocessing with AutoTune | |||
| Description: Test pipeline with Map ops containing only Python function, with all samples in dataset | |||
| @@ -183,7 +253,7 @@ class TestPythonMultiprocAutotune: | |||
| assert mycount1 == 312 | |||
| @staticmethod | |||
| def test_cifar10_pyop_cop_pipeline(): | |||
| def test_pymultiproc_at_map_pyop_cop(): | |||
| """ | |||
| Feature: Python Multiprocessing with AutoTune | |||
| Description: Test pipeline with Map ops containing just C Ops or just Pyfuncs | |||
| @@ -196,14 +266,50 @@ class TestPythonMultiprocAutotune: | |||
| assert mycount1 == 37 | |||
| @staticmethod | |||
| def test_cifar10_mixed_map_pipeline(): | |||
| def test_pymultiproc_at_map_mixed(): | |||
| """ | |||
| Feature: Python Multiprocessing with AutoTune | |||
| Description: Test pipeline with a Map op containing of both C Ops and Pyfuncs | |||
| Expectation: Data pipeline executes successfully with correct number of rows | |||
| """ | |||
| mydata1 = create_mixed_map_dataset(32, 2, num_parallel_workers=12, num_samples=500) | |||
| mydata1 = create_mixed_map_dataset(32, 2, num_parallel_workers=2, num_samples=500) | |||
| mycount1 = 0 | |||
| for _ in mydata1.create_dict_iterator(num_epochs=1): | |||
| mycount1 += 1 | |||
| assert mycount1 == 30 | |||
| @staticmethod | |||
| def test_pymultiproc_at_per_batch_map(): | |||
| """ | |||
| Feature: Python Multiprocessing with AutoTune | |||
| Description: Test pipeline with Batch op using per_batch_map | |||
| Expectation: Data pipeline executes successfully with correct number of rows | |||
| """ | |||
| # Note: Set num_parallel_workers to minimum of 1 | |||
| mydata1 = create_per_batch_map_dataset(32, repeat_size=3, num_parallel_workers=1, num_samples=300) | |||
| mycount1 = 0 | |||
| for _ in mydata1.create_dict_iterator(num_epochs=1): | |||
| mycount1 += 1 | |||
| assert mycount1 == 28 | |||
| @staticmethod | |||
| def test_pymultiproc_at_pipeline(): | |||
| """ | |||
| Feature: Python Multiprocessing with AutoTune | |||
| Description: Test pipeline with Python multiprocessing enabled for dataset, map and batch ops | |||
| Expectation: Data pipeline executes successfully with correct number of rows | |||
| """ | |||
| mydata1 = create_mp_dataset(32, repeat_size=2, num_parallel_workers=2, num_samples=700) | |||
| mycount1 = 0 | |||
| for _ in mydata1.create_dict_iterator(num_epochs=1): | |||
| mycount1 += 1 | |||
| assert mycount1 == 42 | |||
| if __name__ == '__main__': | |||
| TestPythonMultiprocAutotune.test_pymultiproc_at_map_pyfunc() | |||
| TestPythonMultiprocAutotune.test_pymultiproc_at_map_pyfunc_pipeline_all_samples() | |||
| TestPythonMultiprocAutotune.test_pymultiproc_at_map_pyop_cop() | |||
| TestPythonMultiprocAutotune.test_pymultiproc_at_map_mixed() | |||
| TestPythonMultiprocAutotune.test_pymultiproc_at_per_batch_map() | |||
| TestPythonMultiprocAutotune.test_pymultiproc_at_pipeline() | |||