Browse Source

[AUTO_PARALLEL]Fix insert nodes error

feature/build-system-rewrite
huangxinjing 4 years ago
parent
commit
5e325ac336
4 changed files with 100 additions and 17 deletions
  1. +1
    -1
      mindspore/ccsrc/frontend/parallel/pipeline_transformer/pipeline_transformer.cc
  2. +1
    -3
      mindspore/ccsrc/frontend/parallel/step_parallel.cc
  3. +3
    -4
      mindspore/ccsrc/frontend/parallel/step_parallel_utils.cc
  4. +95
    -9
      tests/ut/python/parallel/test_pipeline_split.py

+ 1
- 1
mindspore/ccsrc/frontend/parallel/pipeline_transformer/pipeline_transformer.cc View File

@@ -92,7 +92,7 @@ bool PipelineTransformer::NeedGrad(const CNodePtr &cnode, const CNodePtr &graph_
for (auto &input : cnode->inputs()) {
auto temp = input;
while (IsPrimitiveCNode(temp, prim::kPrimLoad) || IsPrimitiveCNode(temp, prim::kPrimCast)) {
auto input_cnode = input->cast<CNodePtr>();
auto input_cnode = temp->cast<CNodePtr>();
temp = input_cnode->input(1);
}
if (temp->isa<Parameter>()) {


+ 1
- 3
mindspore/ccsrc/frontend/parallel/step_parallel.cc View File

@@ -970,9 +970,7 @@ std::pair<bool, CNodePtr> FindCNode(const AnfNodePtr &anode, const std::string &
if (use_apply == nullptr || !IsValueNode<Primitive>(use_apply->input(0))) {
continue;
}
if (ParallelContext::GetInstance()->enable_parallel_optimizer()) {
use_apply = SkipTrivialNodesMoveDown(manager, use_apply);
}
use_apply = SkipTrivialNodesMoveDown(manager, use_apply);
ValueNodePtr prim_anf_node = use_apply->input(0)->cast<ValueNodePtr>();
MS_EXCEPTION_IF_NULL(prim_anf_node);
PrimitivePtr node_prim = prim_anf_node->value()->cast<PrimitivePtr>();


+ 3
- 4
mindspore/ccsrc/frontend/parallel/step_parallel_utils.cc View File

@@ -50,12 +50,11 @@
namespace mindspore {
namespace parallel {
bool IsSomePrimitive(const CNodePtr &cnode, const std::string &name) {
if (!cnode) {
return false;
}
if (!cnode) return false;
ValueNodePtr anf_node = cnode->input(0)->cast<ValueNodePtr>();
MS_EXCEPTION_IF_NULL(anf_node);
if (!anf_node) return false;
PrimitivePtr prim = anf_node->value()->cast<PrimitivePtr>();
if (!prim) return false;
return (prim->name() == name);
}



+ 95
- 9
tests/ut/python/parallel/test_pipeline_split.py View File

@@ -12,6 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
import os
import shutil
import glob

import numpy as np
import mindspore as ms
import mindspore.nn as nn
@@ -57,7 +61,7 @@ class DatasetLenet():


class MatMulCell(nn.Cell):
def __init__(self, strategy1, strategy2, param=None):
def __init__(self, strategy1, strategy2, param=None, dtype=ms.float32):
super().__init__()
self.param = Parameter(initializer("zeros", [64, 64]), name="param")
if param is not None:
@@ -65,19 +69,21 @@ class MatMulCell(nn.Cell):
self.param1 = Parameter(initializer("zeros", [64, 64]), name="param1")
self.matmul = P.MatMul().shard(strategy1)
self.matmul1 = P.MatMul().shard(strategy2)
self.cast = P.Cast()
self.dtype = dtype

def construct(self, x):
out = self.matmul(x, self.param)
out = self.matmul1(out, self.param1)
out = self.matmul(self.cast(x, self.dtype), self.cast(self.param, self.dtype))
out = self.matmul1(out, self.cast(self.param1, self.dtype))
return out


class Net(nn.Cell):
def __init__(self, strategy1, strategy2, param=None):
def __init__(self, strategy1, strategy2, param=None, dtype=ms.float32):
super().__init__()
self.block = nn.CellList()
for i in range(2):
cell = MatMulCell(strategy1, strategy2, param)
cell = MatMulCell(strategy1, strategy2, param, dtype)
cell.pipeline_stage = i
self.block.append(cell)

@@ -88,9 +94,9 @@ class Net(nn.Cell):


class PipelineSplit(nn.Cell):
def __init__(self, strategy1, strategy2):
def __init__(self, strategy1, strategy2, dtype=ms.float32):
super().__init__()
self.cell = Net(strategy1, strategy2)
self.cell = Net(strategy1, strategy2, dtype=dtype)
self.cell.block[0].matmul.add_prim_attr("parameter_start", 0)

def construct(self, x, label):
@@ -99,10 +105,10 @@ class PipelineSplit(nn.Cell):


class PipelineSplit2(nn.Cell):
def __init__(self, strategy1, strategy2):
def __init__(self, strategy1, strategy2, dtype=ms.float32):
super().__init__()
self.param = Parameter(initializer("zeros", [64, 64]), name="param")
self.cell = Net(strategy1, strategy2, self.param)
self.cell = Net(strategy1, strategy2, self.param, dtype)
self.cell.block[0].matmul.add_prim_attr("parameter_start", 0)

def construct(self, x, label):
@@ -353,3 +359,83 @@ def test_pipeline_split_shared_parameter_with_micro_batch_interleaved_stage1_opt
optimizer = nn.Lamb(params, learning_rate=0.01)
model = Model(net, optimizer=optimizer)
model.train(2, dataset, dataset_sink_mode=False)


def run_pipeline_split_function(pipeline_net, micro_batch_interleaved=1):
    """
    Shared driver for the pipeline split tests: wrap `pipeline_net` and train it.

    The net is wrapped in MicroBatchInterleaved, then in PipelineCell (second argument 4),
    and trained through Model.train with a Lamb optimizer on a DatasetLenet built from
    all-ones float32 tensors.

    :param pipeline_net: The cell to be wrapped and trained
    :param micro_batch_interleaved: The second argument handed to MicroBatchInterleaved
    """
    ones_input = Tensor(np.ones([32, 64]), dtype=ms.float32)
    ones_label = Tensor(np.ones([64, 64]), dtype=ms.float32)

    interleaved_net = MicroBatchInterleaved(pipeline_net, micro_batch_interleaved)
    pipeline_cell = PipelineCell(interleaved_net, 4)
    stage_params = pipeline_cell.infer_param_pipeline_stage()
    train_set = DatasetLenet(ones_input, ones_label, 3)
    lamb = nn.Lamb(stage_params, learning_rate=0.01)
    trainer = Model(pipeline_cell, optimizer=lamb)
    trainer.train(2, train_set, dataset_sink_mode=False)


class TestPipelineSplitWithNoOptimizer:
def setup_method(self):
self.output_path = './graphs' + self.__str__()
context.set_context(save_graphs=True,
save_graphs_path=self.output_path)

def teardown_method(self):
shutil.rmtree(self.output_path)

def cat_fp16_from_ir(self, pattern, target_count):
"""
This function will check the float16 count with the golden one.
:param pattern: The match pattern for the specific count
:param target_count: The gold float16 count in the Ir files
"""
ir_files = glob.glob(os.path.join(self.output_path, 'rank_0', '*_validate*.ir'))
assert len(ir_files) == 1
appear_count = 0
with open(ir_files[0], 'r') as fp:
for line in fp:
if pattern in line:
appear_count += 1
assert appear_count == target_count

def test_pipeline_with_no_parallel_optimizer_and_micro(self):
"""
Feature: Test Pipeline with Mirror Operator.
Description: When using fp16 computation, there should be only one mirror operator for one parameter.
Expectation: the number of the float16 tensor is not equal to 16, 16 is obtained by manually checked graph.
the number of the Mirror is not equal to 2, 2 is obtained by manually checked graph.
"""
context.set_auto_parallel_context(device_num=8, global_rank=0, pipeline_stages=2,
enable_parallel_optimizer=False)
context.set_auto_parallel_context(parallel_mode="semi_auto_parallel")
strategy1 = ((4, 1), (1, 1))
strategy2 = ((2, 1), (1, 1))
pipeline_net = PipelineSplit(strategy1, strategy2, dtype=ms.float16)
run_pipeline_split_function(pipeline_net, micro_batch_interleaved=1)
self.cat_fp16_from_ir(pattern='grad_mirror_MirrorMicroStepOperator',
target_count=2)
self.cat_fp16_from_ir(pattern='Cast(',
target_count=16)

def test_pipeline_with_micro_batch_no_parallel_optimizer(self):
"""
Feature: Test Pipeline with Mirror Operator, when enabled the micro batch interleave.
Description: When using fp16 computation, there should be only one mirror operator for one parameter.
Expectation: the number of the float16 tensor is not equal to 16, 16 is obtained by manually checked graph.
the number of the Mirror is not equal to 2, 2 is obtained by manually checked graph.
"""
context.set_auto_parallel_context(device_num=8, global_rank=0, pipeline_stages=2,
enable_parallel_optimizer=False)
context.set_auto_parallel_context(parallel_mode="semi_auto_parallel")
strategy1 = ((4, 1), (1, 1))
strategy2 = ((2, 1), (1, 1))
pipeline_net = PipelineSplit(strategy1, strategy2, dtype=ms.float16)
run_pipeline_split_function(pipeline_net, micro_batch_interleaved=2)
self.cat_fp16_from_ir(pattern='grad_mirror_MirrorMicroStepOperator',
target_count=2)
self.cat_fp16_from_ir(pattern='Cast(',
target_count=28)

Loading…
Cancel
Save