diff --git a/mindspore/_extends/parallel_compile/tbe_compiler/compiler.py b/mindspore/_extends/parallel_compile/tbe_compiler/compiler.py index f184be9cde..dc0f62a65b 100755 --- a/mindspore/_extends/parallel_compile/tbe_compiler/compiler.py +++ b/mindspore/_extends/parallel_compile/tbe_compiler/compiler.py @@ -26,6 +26,7 @@ build_in_impl_path = get_built_in_impl_path() # op function list op_build = "compile" + def _initialize(impl_path): """Initialize""" if impl_path == "": @@ -37,6 +38,7 @@ def _initialize(impl_path): sys.path.insert(0, op_module_name) + def _replace_range(args): for arg in args: if not arg.__contains__('range'): @@ -47,6 +49,7 @@ def _replace_range(args): if value < 0: range_item[index] = None + def build_op(build_type, json_str): """ call op functions with function name and input args json_str @@ -89,9 +92,9 @@ def build_op(build_type, json_str): op_module = __import__(op_name) else: if is_dynamic_shape: - op_module = __import__("impl.dynamic."+op_name, globals(), locals(), [op_name], 0) + op_module = __import__("impl.dynamic." + op_name, globals(), locals(), [op_name], 0) else: - op_module = __import__("impl."+op_name, globals(), locals(), [op_name], 0) + op_module = __import__("impl." + op_name, globals(), locals(), [op_name], 0) # get function if build_type == op_build: if custom_flag: @@ -149,6 +152,7 @@ def compile_with_json(json_str): ret = build_op(op_build, json_str) return ret + if __name__ == "__main__": in_args = sys.stdin.readline() result = compile_with_json(in_args) diff --git a/mindspore/_extends/parallel_compile/tbe_compiler/tbe_common.py b/mindspore/_extends/parallel_compile/tbe_compiler/tbe_common.py index c589129126..acbd78f812 100644 --- a/mindspore/_extends/parallel_compile/tbe_compiler/tbe_common.py +++ b/mindspore/_extends/parallel_compile/tbe_compiler/tbe_common.py @@ -15,6 +15,7 @@ """tbe common""" import os + class TBEException(Exception): """tbe exception class""" @@ -64,6 +65,7 @@ def _check_arg_info(item): if 'param_type' not in item or not item['param_type']: raise ValueError("Json string Errors, key:param_type not found.") + def get_input_output(io_info, args): """ Parse args. @@ -100,6 +102,7 @@ def get_input_output(io_info, args): if len(item) > 1: args.append(arg) + def get_attr(attr_info, args): """ Parse args. @@ -118,6 +121,7 @@ def get_attr(attr_info, args): if item["name"] != "isRef": args.append(item['value']) + def get_args(op_info, arg_type): """ Parse args. diff --git a/mindspore/_extends/parallel_compile/tbe_compiler/tbe_process.py b/mindspore/_extends/parallel_compile/tbe_compiler/tbe_process.py index 264e43edb5..8bd15294ad 100644 --- a/mindspore/_extends/parallel_compile/tbe_compiler/tbe_process.py +++ b/mindspore/_extends/parallel_compile/tbe_compiler/tbe_process.py @@ -19,10 +19,21 @@ import multiprocessing import subprocess import sys import os +import time import json +from mindspore import log from .tbe_common import check_kernel_info, TBEException from .helper import _op_select_format, _check_supported +# tune type +NO_TUNE = "NO_TUNE" +GA_TUNE = "GA" +RL_TUNE = "RL" +# job type +RL_COMPILE = "RL_COMPILE" +RL_OFFLINE = "RL_OFFLINE" +RL_ONLINE = "RL_ONLINE" + def create_tbe_parallel_process(): """ @@ -105,20 +116,32 @@ class TbeProcess: """tbe process""" def __init__(self): - self.__processe_num = multiprocessing.cpu_count() - self.default_num = 24 + self.__process_num = multiprocessing.cpu_count() + self.compile_process_num = 24 self.__pool = None self.__next_task_id = 1 self.__running_tasks = [] + self.__all_tune_tasks = [] + self.__running_tune_tasks = [] + self.__finish_tune_task = [] + self.__failed_tune_task = [] + self.__task_info = {} + self.__tuner = None + self.tune_process_num = 0 + self.tune_mode = None + self.offline_tune = False + self.auto_tune_op_list = None + self.tune_ops_name = os.getenv("TUNE_OPS_NAME") + self.selected_tune_ops = self.tune_ops_name.split(",") if self.tune_ops_name is not None else None def __del__(self): if self.__pool is not None: self.__pool.terminate() self.__pool.join() del self.__pool - - def init_auto_tune_env(self, mode): - return "Success" + if self.__tuner is not None: + self.__tuner.deinit() + del self.__tuner def init_process_num(self): """ @@ -129,28 +152,140 @@ class TbeProcess: process_num = os.getenv("MS_BUILD_PROCESS_NUM") res = "Success" if process_num is None: - res = "Success, using default build process num: " + str(self.default_num) + res = "Success, using default build process num: " + str(self.compile_process_num) elif process_num.isdigit(): if int(process_num) in range(1, 25): - self.default_num = int(process_num) - res = "Success, using custom build process num: " + str(self.default_num) + self.compile_process_num = int(process_num) + res = "Success, using custom build process num: " + str(self.compile_process_num) else: - res = "TBEException",\ + res = "TBEException", \ "ERROR: [MS_BUILD_PROCESS_NUM] should be in range(1, 25), but got : " + str(process_num) elif not process_num.isdigit(): res = "TBEException", "ERROR: [MS_BUILD_PROCESS_NUM] type should be a int num, but got :" + process_num return res + def init_auto_tune_env(self, tune_mode): + """ + Init tbe auto tune env + :param tune_mode: RL, GA or NO_TUNE + :return: Success or failed info + """ + self.tune_mode = tune_mode + if os.getenv("ENABLE_TUNE_DUMP", "").lower() == "true": + self.offline_tune = True + log.info("Tune offline mode is on...") + if self.tune_mode == "NO_TUNE" and not self.offline_tune: + log.info("[NO_TUNE] There is no need to initialize auto_tune related variables.") + return "Success" + + try: + # just for checking the following module if exist, will be used in tuner.py + import auto_tune_main + import schedule_search # pylint: disable=unused-import + self.auto_tune_op_list = auto_tune_main.enable_auto_tune_support() + except ImportError: + res = "TBEException", \ + "No module named `auto_tune` or `schedule_search`. If you want tune your op's performance," \ + "please configure `auto_tune` or `schedule_search` related environment variables." \ + "Try to set the following environment variables:" \ + "export fwk_path=/usr/local/Ascend/fwkacllib" \ + "export PYTHONPATH=${fwk_path}/python/site-packages:$PYTHONPATH" \ + "export PYTHONPATH=${fwk_path}/python/site-packages/auto_tune.egg/auto_tune:$PYTHONPATH" \ + "export PYTHONPATH=${fwk_path}/python/site-packages/schedule_search.egg:$PYTHONPATH" + return res + + from .tuner import TbeTuner + if self.compile_process_num > 2: + self.tune_process_num = self.compile_process_num / 2 + + if self.__tuner is None: + self.__tuner = TbeTuner(self.offline_tune, self.tune_mode) + + return "Success" + def close_pool(self): + """ + close tbe compilation pool + """ self.__pool.terminate() self.__pool.join() del self.__pool + def close_tuner(self): + """ + close tbe tuner + """ + self.__tuner.deinit() + del self.__tuner + def exit(self): + """ + exit tbe process + """ + log.info("start to exit tbe process...") if self.__pool is not None: stop_thread = threading.Thread(target=self.close_pool) stop_thread.daemon = True stop_thread.start() + log.info("tbe process poll exited.") + if self.__tuner is not None: + stop_tuner = threading.Thread(target=self.close_tuner) + stop_tuner.daemon = True + stop_tuner.start() + log.info("tbe process tuner exited.") + + def _if_tune_ops(self, op_json): + """ + Check if user assign ops that need tune + :param op_json: ori json + :return: bool True or False + """ + if self.tune_ops_name is None: + return True + if "fusion_op" in op_json: + full_name = op_json["fusion_op"]["full_name"] + else: + full_name = op_json["op_info"]["full_name"] + return full_name in self.selected_tune_ops + + def select_tune_mode(self, op_json): + """ + Select the corresponding tune mode from op json and env info for the op + :param op_json: ori json + :return: NO_TUNE RL_TUNE or GA_TUNE + """ + json_info = json.loads(op_json) + tune_mode = json_info["SocInfo"]["autoTilingMode"] + kernel_names = self.get_kernel_names(json_info) + if self.offline_tune: + if not self._if_tune_ops(json_info): + return NO_TUNE + return RL_TUNE + if not self._if_tune_ops(json_info): + tune_mode = NO_TUNE + if GA_TUNE in tune_mode: + for kernel_name in kernel_names: + if kernel_name in self.auto_tune_op_list: + return GA_TUNE + if RL_TUNE in tune_mode: + return RL_TUNE + + return NO_TUNE + + def get_kernel_names(self, json_info): + """ + Get kernel names from op json + :param json_info: ori json + :return: kernel names + """ + kernel_names = [] + if "fusion_op" in json_info: + for op in json_info["fusion_op"]["op_list"]: + if "func_name" in op: + kernel_names.append(op["func_name"]) + else: + kernel_names.append(json_info['op_info']['name']) + return kernel_names def start_compile_op(self, op_json): """ @@ -162,14 +297,49 @@ class TbeProcess: Returns: int, task id(>0). -1 if error """ - if self.__processe_num > self.default_num: - self.__processe_num = self.default_num task_id = self.__next_task_id self.__next_task_id = self.__next_task_id + 1 - if self.__pool is None: - self.__pool = multiprocessing.Pool(processes=self.__processe_num) - task_future = self.__pool.apply_async(func=run_compiler, args=(op_json,)) - self.__running_tasks.append((task_id, task_future)) + tune_mode = self.select_tune_mode(op_json) + self.__task_info[task_id] = op_json + if tune_mode == NO_TUNE: + if self.__process_num > self.compile_process_num: + self.__process_num = self.compile_process_num + if self.__pool is None: + self.__pool = multiprocessing.Pool(processes=self.__process_num) + task_future = self.__pool.apply_async(func=run_compiler, args=(op_json,)) + self.__running_tasks.append((task_id, task_future)) + else: + log.info("start_compile_op: op json:\n {}".format(op_json)) + if self.__tuner is None: + log.error("Please confirm that the mode isn't NO_TUNE and auto_tune already initialized.") + return task_id + if not self.__tuner.tune_init: + status = self.__tuner.init_tune_interface(op_json, self.tune_process_num) + if not status: + log.error("Auto tune init failed!") + return task_id + self.__tuner.tune_init = True + self.__all_tune_tasks.append(task_id) + self.__running_tune_tasks.append(task_id) + + if tune_mode == RL_TUNE: + ret, job_type = self.__tuner.rl_tune(task_id, op_json) + if job_type is RL_OFFLINE or job_type is RL_ONLINE: + if not ret: + # offline and online hit will return false + res = task_id, "Success", "Success" + self.__finish_tune_task.append(res) + self.__running_tune_tasks.remove(task_id) + elif job_type is RL_COMPILE: + if not ret: + res = task_id, "Fail", "Fail" + self.__finish_tune_task.append(res) + self.__running_tune_tasks.remove(task_id) + elif tune_mode == GA_TUNE: + self.__tuner.ga_tune(task_id, op_json) + else: + log.error("Unsupported Tune Mode!") + return task_id def wait_one(self): @@ -180,7 +350,7 @@ class TbeProcess: int, id of the finished task. -1 if error,0 if no unfinished task str, result of compile task """ - ret = 0, "Success" + ret = 0, "Failed", "Failed" if self.__running_tasks: task_id, task_future = self.__running_tasks.pop(0) ret_type, result = task_future.get(330) @@ -190,7 +360,46 @@ class TbeProcess: ret = task_id, ret_type + ":" + result, "_" else: ret = task_id, "Exception: Not support return type:" + str(ret_type), "_" - return ret + return ret + if self.__finish_tune_task: + ret = self.__finish_tune_task.pop() + return ret + if self.__running_tune_tasks: + query_count = 0 + total_query_count = len(self.__running_tune_tasks) * 2 * 10 + while query_count < total_query_count: + ret = self.__tuner.get_finish_tasks() + if not ret: + query_count = query_count + 1 + time.sleep(30) + log.info("{} of {} Task is Tuning({} Tasks tune fail),wait more 30 seconds...".format( + len(self.__running_tune_tasks), + len(self.__all_tune_tasks), len(self.__failed_tune_task))) + else: + for item in ret: + task_id = item['task_id'] + status_code = item['status_code'] + res = None + if status_code == 0: + res = task_id, "Success", "Success" + else: + self.__failed_tune_task.append(task_id) + log.error("task_id:{}, json:{}".format(task_id, self.__task_info[task_id])) + res = task_id, "Failed", "Failed" + self.__finish_tune_task.append(res) + self.__running_tune_tasks.remove(task_id) + ret = self.__finish_tune_task.pop() + return ret + log.error("Tune Task Timeout!!!") + log.error("AllTaskNum:{}, RunningTaskNum:{}, FailedTaskNum:{}".format(len(self.__all_tune_tasks), + len(self.__running_tune_tasks), + len(self.__failed_tune_task))) + return 0, "Failed", "Failed" + log.error("All Task Is Done!!!") + log.error("AllTaskNum:{}, RunningTaskNum:{}, FailedTaskNum:{}".format(len(self.__all_tune_tasks), + len(self.__running_tune_tasks), + len(self.__failed_tune_task))) + return -1, "Failed", "Failed" def reset_task_info(self): """ @@ -198,6 +407,14 @@ class TbeProcess: """ if self.__running_tasks: self.__running_tasks.clear() + if self.__all_tune_tasks: + self.__all_tune_tasks.clear() + if self.__running_tune_tasks: + self.__running_tune_tasks.clear() + if self.__finish_tune_task: + self.__finish_tune_task.clear() + if self.__failed_tune_task: + self.__failed_tune_task.clear() tbe_process = TbeProcess() diff --git a/mindspore/_extends/parallel_compile/tbe_compiler/tuner.py b/mindspore/_extends/parallel_compile/tbe_compiler/tuner.py new file mode 100644 index 0000000000..bfe09915a4 --- /dev/null +++ b/mindspore/_extends/parallel_compile/tbe_compiler/tuner.py @@ -0,0 +1,372 @@ +# Copyright 2021 Huawei Technologies Co., Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================ +"""tuner process""" +import os +import datetime +import json +import sys +import traceback +from te.platform.cce_conf import te_set_version +from te.platform.fusion_manager import set_current_op_name +from te.platform.fusion_util import fusion_op, dump_fusion_json +from te.platform.parallel_compilation import init_multi_process_env, get_finished_compilation_task, \ + deinit_multi_process_env, dispatch_autotune_task, start_ga_multi_process +import auto_tune +from schedule_search.rl_online_tune import rl_tune_init, dispatch_fusion_tune_task, dispatch_single_tune_task, \ + rl_tune_deinit +from mindspore import log +from .tbe_common import get_args +from .re_construct_json import single_to_fusion, fusion_to_fusion + +TE_LOG_LEVEL = ["DEBUG", "INFO", "WARNING", "ERROR"] +RL_COMPILE = "RL_COMPILE" +RL_OFFLINE = "RL_OFFLINE" +RL_ONLINE = "RL_ONLINE" + +PLATFORM_FLAG = ["ascend310", "ascend910", "Hi3796CV300ES", "ascend710", "ascend610", "Hi3796CV300CS", "SD3403"] + + +class TbeTuner: + """tbe tuner for ga tune or rl tune""" + def __init__(self, offline_tune, tune_mode): + self.offline_tune = offline_tune + self.tune_init = False + self.rl_init = False + self.offline_dump_path = "./tune_dump" + if os.environ.get("TUNE_DUMP_PATH") is not None: + self.offline_dump_path = os.getenv("TUNE_DUMP_PATH", "") + self._creating_custom_path(tune_mode) + + def init_tune_interface(self, json_str, process_num): + """ + Initialize tuner interface + :param json_str: ori json + :param process_num : process num for tuner + :return: bool True or False + """ + json_info = json.loads(json_str) + soc_info = self.get_soc_info(json_info) + cur_cce_product_params = te_set_version(*soc_info) + if cur_cce_product_params is None: + log.warning("Set Soc Info failed.") + tune_mode = self.get_tune_mode(json_info) + ret = self.parallel_compilation_init(soc_info, tune_mode, process_num) + if not ret: + log.error("Init parallel compilation env failed") + return False + + return True + + def deinit(self): + """ + DeInitialize tuner interface + """ + deinit_multi_process_env() + if self.rl_init: + rl_tune_deinit() + + def get_tune_mode(self, json_info): + """ + Get the corresponding tune mode from op json and env info + :param json_info: ori json + :return: NO_TUNE RL_TUNE GA_TUNE or RL,GA + """ + tune_mode = json_info["SocInfo"]["autoTilingMode"] + if self.offline_tune: + tune_mode = "RL" + return tune_mode + + def __directory_creation(self, path, concat_path): + """ + Create directory + """ + path = os.path.join(path, concat_path) + if not os.path.isdir(path): + os.makedirs(path, 0o750) + return path + + def __creating_default_custom_path(self, tune_mode, base_custom_path): + """ + Create default custom path + """ + base_custom_path = self.__directory_creation(base_custom_path, "data") + tune_flag = [] + if "RL" in tune_mode: + tune_flag.append("rl") + if "GA" in tune_mode: + tune_flag.append("tiling") + + for tune_path in tune_flag: + real_path = self.__directory_creation(base_custom_path, tune_path) + for soc_version in PLATFORM_FLAG: + final_path = self.__directory_creation(real_path, soc_version) + final_path = self.__directory_creation(final_path, "custom") + + def _creating_custom_path(self, tune_mode): + """ + Create custom path + """ + if "NO_TUNE" in tune_mode: + return + + base_custom_path = os.getenv("TUNE_BANK_PATH", None) + tune_bank_flag = True + if not base_custom_path: + base_custom_path = os.path.dirname(os.path.realpath(auto_tune.__file__)) + base_custom_path = os.path.realpath(os.path.join(base_custom_path, "../../../")) + tune_bank_flag = False + + if not os.path.isdir(base_custom_path): + log.error("Check whether the tuning path [{}] exists.".format(base_custom_path)) + return + if not os.access(base_custom_path, os.R_OK | os.W_OK | os.X_OK): + log.error("Check whether the permission on the tuning path [{}] is correct.".format(base_custom_path)) + return + + if not tune_bank_flag: + self.__creating_default_custom_path(tune_mode, base_custom_path) + + def get_soc_info(self, json_info): + """ + Get soc info + :param json_info: ori json + :return: soc info + """ + soc_param = {} + soc_param["op_impl_mode"] = json_info["SocInfo"]["op_impl_mode"] + soc_param["op_debug_level"] = json_info["SocInfo"]["op_debug_level"] + soc_param["op_impl_mode_list"] = json_info["SocInfo"]["op_impl_mode_list"] + soc_param["op_debug_dir"] = '' + soc_param["vector_fp_ceiling"] = '' + soc_param['mdl_bank_path'] = '' + soc_param['op_bank_path'] = '' + + soc_info = [] + soc_info.append(json_info["SocInfo"]["socVersion"]) + soc_info.append(json_info["SocInfo"]["coreType"]) + soc_info.append(json_info["SocInfo"]["coreNum"]) + soc_info.append(json_info["SocInfo"]["l1Fusion"]) + soc_info.append(json_info["SocInfo"]["l2Mode"]) + soc_info.append(json_info["SocInfo"]["l2Fusion"]) + soc_info.append(soc_param) + + return soc_info + + def parallel_compilation_init(self, soc_info, tune_mode, process_num): + """ + Initialize parallel compilation framework for tuner + :param soc_info: soc info + :param tune_mode: tuner mode + :param process_num : process num for tuner + :return: bool True or False + """ + env_count = process_num + if "TE_PARALLEL_COMPILER" in os.environ: + env_count = os.getenv("TE_PARALLEL_COMPILER") + log.info("TE_PARALLEL_COMPILER is set to {}".format(env_count)) + if int(env_count) > process_num: + env_count = process_num + log.info("change process count to {}".format(process_num)) + os.environ["TE_PARALLEL_COMPILER"] = str(int(env_count)) + pid_str = os.getpid() + time_str = datetime.datetime.now().strftime('%Y%m%d_%H%M%S%f')[:-3] + pid_ts = "{}_pid{}".format(time_str, pid_str) + + embedding = False + enable_event = False + te_log_level = os.environ.get("TE_LOGLEVEL") + glog_level = os.environ.get("GLOG_v") + if glog_level is not None and te_log_level is None: + os.environ["TE_LOGLEVEL"] = TE_LOG_LEVEL[int(glog_level)] + global_loglevel = int(glog_level) + elif glog_level is None and te_log_level is None: + os.environ["TE_LOGLEVEL"] = TE_LOG_LEVEL[2] + global_loglevel = 3 + else: + if te_log_level > len(TE_LOG_LEVEL): + log.error("Invalid environment TE_LOGLEVEL:{}".format(te_log_level)) + te_log_level = 2 + os.environ["TE_LOGLEVEL"] = TE_LOG_LEVEL[int(te_log_level)] + global_loglevel = int(te_log_level) + + ret = init_multi_process_env(embedding, soc_info, tune_mode, global_loglevel, enable_event, pid_ts) + if ret is None: + log.error("Init multiprocess env failed") + return False + process_count = ret[0] + log.info("Init multiprocess env success with {} process".format(process_count)) + if "RL" in tune_mode: + res_queue = ret[1] + live_checker = ret[2] + termin_event = ret[3] + ret = rl_tune_init(soc_info, res_queue, live_checker, termin_event, global_loglevel, pid_ts) + if not ret: + log.error("RL env init failed!") + return False + self.rl_init = True + log.info("RL Tune init success.") + if "GA" in tune_mode: + start_ga_multi_process(tune_mode) + log.info("GA Tune init success.") + return True + + def rl_tune(self, task_id, op_json): + """ + RL tune for single op and fusion op + :param task_id: task id for this op to tune + :param op_json: op's info + :return: tune result + """ + json_info = json.loads(op_json) + if "fusion_op" in json_info: + ret = self.fusion_rl_tune(task_id, json_info) + else: + ret = self.single_rl_tune(task_id, json_info) + return ret + + def ga_tune(self, task_id, op_json): + """ + GA tune for single op and fusion op + :param task_id: task id for this op to tune + :param op_json: op's info + """ + json_info = json.loads(op_json) + if "fusion_op" in json_info: + self.fusion_ga_tune(task_id, json_info) + else: + self.single_ga_tune(task_id, json_info) + + def single_rl_tune(self, task_id, json_info): + """ + RL tune for single op + :param task_id: task id for this op to tune + :param json_info: op's info + :return: tune result + """ + if self.offline_tune: + converted_json = single_to_fusion(json.dumps(json_info), tune_mode="RL") + op_type = json_info['op_info']['name'] + kernel_name = json_info['op_info']['kernel_name'] + op_module = __import__("impl." + op_type, globals(), locals(), [op_type], 0) + op_module_name = "impl." + op_type + py_fn_name = json_info['op_info']['name'] + op_func = getattr(op_module, py_fn_name, None) + + set_current_op_name(kernel_name) + inputs_args = get_args(json_info['op_info'], 'inputs') + outputs_args = get_args(json_info['op_info'], 'outputs') + attrs_args = get_args(json_info['op_info'], 'attrs') + op_args = inputs_args, outputs_args, attrs_args + # todo build with build_single_op_from_c + base_kernel = './kernel_meta/' + kernel_name + '.o' + job_type = RL_COMPILE + try: + op_func(*inputs_args, *outputs_args, *attrs_args, kernel_name=kernel_name) + # pylint: disable=broad-except + except Exception: + exc_type, exc_value, _ = sys.exc_info() + log.error( + "exc_type:{}, exc_value:{}, exc_traceback:{}".format(exc_type, exc_value, traceback.format_exc())) + return False, job_type + if self.offline_tune: + job_type = RL_OFFLINE + dump_fusion_json(converted_json, self.offline_dump_path) + else: + job_type = RL_ONLINE + graph_id = 0 + l1size = 0 # todo need to verify + ret = dispatch_single_tune_task(graph_id, task_id, l1size, base_kernel, kernel_name, op_module_name, + op_module_name + "@" + op_module_name, op_type, op_type, op_args) + return ret, job_type + + def get_op_module_names(self, json_info): + """ + Get op module names from op info json + :param json_info: op's info + :return: op module names + """ + op_module_name = "" + for op in json_info["fusion_op"]["op_list"]: + if "module_name" in op: + op_module_name = op_module_name + op["module_name"] + "," + return op_module_name[:-1] + + def fusion_rl_tune(self, task_id, json_info): + """ + RL tune for fusion op + :param task_id: task id for this op to tune + :param json_info: op's info + :return: tune result + """ + if 'fusion_op' not in json_info or not json_info['fusion_op']: + raise ValueError("Json string Errors, key:fusion_op not found.") + kernel_name = json_info["fusion_op"]["fusion_op_name"] + set_current_op_name(kernel_name) + converted_json = fusion_to_fusion(json.dumps(json_info), tune_mode="RL") + job_type = RL_COMPILE + base_kernel = './kernel_meta/' + kernel_name + '.o' + try: + fusion_op(converted_json) + # pylint: disable=broad-except + except Exception: + exc_type, exc_value, _ = sys.exc_info() + log.error( + "exc_type:{}, exc_value:{}, exc_traceback:{}".format(exc_type, exc_value, traceback.format_exc())) + return False, job_type + if self.offline_tune: + job_type = RL_OFFLINE + dump_fusion_json(converted_json, self.offline_dump_path) + else: + job_type = RL_ONLINE + graph_id = 0 + l1size = 0 + op_model_name = self.get_op_module_names(json_info) + ret = dispatch_fusion_tune_task(graph_id, task_id, l1size, base_kernel, kernel_name, op_model_name, + converted_json) + return ret, job_type + + def fusion_ga_tune(self, task_id, json_info): + """ + GA tune for fusion op + :param task_id: task id for this op to tune + :param json_info: op's info + """ + if 'fusion_op' not in json_info or not json_info['fusion_op']: + raise ValueError("Json string Errors, key:fusion_op not found.") + kernel_name = json_info["fusion_op"]["fusion_op_name"] + converted_json = fusion_to_fusion(json.dumps(json_info), tune_mode="GA") + graph_id = 0 + l1size = 0 + dispatch_autotune_task(graph_id, task_id, l1size, converted_json, [], kernel_name) + + def single_ga_tune(self, task_id, json_info): + """ + GA tune for single op + :param task_id: task id for this op to tune + :param json_info: op's info + """ + converted_json = single_to_fusion(json.dumps(json_info), tune_mode="GA") + graph_id = 0 + l1size = 0 + kernel_name = json_info["fusion_op_name"] + dispatch_autotune_task(graph_id, task_id, l1size, converted_json, [], kernel_name) + + def get_finish_tasks(self): + """ + Get finish task from parallel compilation framework + :return task info list + """ + ret = get_finished_compilation_task(0) + return ret diff --git a/mindspore/ccsrc/backend/kernel_compiler/kernel_fusion.cc b/mindspore/ccsrc/backend/kernel_compiler/kernel_fusion.cc index 084fd8b8d9..bf653041d6 100644 --- a/mindspore/ccsrc/backend/kernel_compiler/kernel_fusion.cc +++ b/mindspore/ccsrc/backend/kernel_compiler/kernel_fusion.cc @@ -80,8 +80,8 @@ std::map KernelFusion(const std::vector size_t hash_id = GenFusionJsonHash(fusion_op); auto json_name = fusion_kernel_name.append("_").append(std::to_string(hash_id)).append("_").append(std::to_string(device_id)); - fusion_op["graph_id"] = fusion_scope_iter.graph_id; fusion_op["fusion_op_name"] = json_name; + fusion_op["full_name"] = fusion_scope_iter.full_name; // get io size std::vector input_size_list; std::vector output_size_list; diff --git a/mindspore/ccsrc/backend/kernel_compiler/kernel_fusion.h b/mindspore/ccsrc/backend/kernel_compiler/kernel_fusion.h index badd8f50b8..f0f86d71c0 100644 --- a/mindspore/ccsrc/backend/kernel_compiler/kernel_fusion.h +++ b/mindspore/ccsrc/backend/kernel_compiler/kernel_fusion.h @@ -19,6 +19,7 @@ #include #include #include +#include #include "backend/kernel_compiler/kernel.h" namespace mindspore { namespace kernel { @@ -26,15 +27,15 @@ namespace kernel { * @brief fuse op and return a callable mod */ struct FusionScopeInfo { - FusionScopeInfo(int64_t id, uint32_t g_id, std::vector in, std::vector comp, + FusionScopeInfo(int64_t id, std::string f_name, std::vector in, std::vector comp, std::vector out) : scope_id(id), - graph_id(g_id), + full_name(f_name), input_nodes(std::move(in)), compute_nodes(std::move(comp)), output_nodes(std::move(out)) {} int64_t scope_id{}; - uint32_t graph_id{}; + std::string full_name{}; std::vector input_nodes; std::vector compute_nodes; std::vector output_nodes; diff --git a/mindspore/ccsrc/backend/kernel_compiler/tbe/tbe_kernel_build.cc b/mindspore/ccsrc/backend/kernel_compiler/tbe/tbe_kernel_build.cc index f1227336b0..ed51dffcc4 100644 --- a/mindspore/ccsrc/backend/kernel_compiler/tbe/tbe_kernel_build.cc +++ b/mindspore/ccsrc/backend/kernel_compiler/tbe/tbe_kernel_build.cc @@ -123,7 +123,6 @@ bool TbeKernelJsonCreator::GenTbeSingleKernelJson(const std::shared_ptrcast()); auto func_name = op_info_ptr->kernel_name(); - op_info_json["graph_id"] = AnfAlgo::GetGraphId(anf_node.get()); op_info_json[kJName] = func_name; op_info_json[kJModuleName] = std::string("impl.") + func_name; op_info_json[kJPyModulePath] = kPyPath; @@ -163,7 +162,6 @@ bool TbeKernelJsonCreator::GenTbeSingleKernelJson(const std::shared_ptrfullname_with_scope(); // create attr_desc nlohmann::json attr_desc; diff --git a/mindspore/ccsrc/backend/optimizer/ascend/buffer_fusion/fusion_base_pass.h b/mindspore/ccsrc/backend/optimizer/ascend/buffer_fusion/fusion_base_pass.h index 9013d006db..76cd8c980b 100644 --- a/mindspore/ccsrc/backend/optimizer/ascend/buffer_fusion/fusion_base_pass.h +++ b/mindspore/ccsrc/backend/optimizer/ascend/buffer_fusion/fusion_base_pass.h @@ -43,7 +43,7 @@ const int8_t MULTI_ELTWISE_SIZE = 4; using FusedNodeRecord = std::vector>; struct BufferFusionInfo_t { - uint32_t graph_id; + std::string full_name; std::vector anf_nodes; std::vector inputs_list; std::vector outputs_list; diff --git a/mindspore/ccsrc/backend/optimizer/ascend/buffer_fusion/ub_pattern_fusion.cc b/mindspore/ccsrc/backend/optimizer/ascend/buffer_fusion/ub_pattern_fusion.cc index ec6353221e..a2420eb92f 100644 --- a/mindspore/ccsrc/backend/optimizer/ascend/buffer_fusion/ub_pattern_fusion.cc +++ b/mindspore/ccsrc/backend/optimizer/ascend/buffer_fusion/ub_pattern_fusion.cc @@ -387,7 +387,6 @@ void RemoveCircle(const session::KernelGraph &kernel_graph, void UbPatternFusion::GetBufferFusionInfo(session::KernelGraph *kernel_graph, std::unordered_map *buffer_fusion_infos) const { MS_EXCEPTION_IF_NULL(buffer_fusion_infos); - auto graph_id = kernel_graph->graph_id(); GetFusionScopeComputeNodeList(kernel_graph, buffer_fusion_infos); GetFusionScopeInputNodeList(*kernel_graph, buffer_fusion_infos); GetFusionScopeOutputNodeList(kernel_graph, buffer_fusion_infos); @@ -397,7 +396,11 @@ void UbPatternFusion::GetBufferFusionInfo(session::KernelGraph *kernel_graph, for (auto &buffer_fusion_info : *buffer_fusion_infos) { buffer_fusion_info.second.kernel_build_info = CreateFusionOpKernelInfo(buffer_fusion_info.second.inputs_list, buffer_fusion_info.second.outputs_list); - buffer_fusion_info.second.graph_id = graph_id; + // just for full_name_with_scope for every buffer_fusion_info. + auto fusion_node = CreateFusionOp(buffer_fusion_info.second.inputs_list, buffer_fusion_info.second.outputs_list, + buffer_fusion_info.second.anf_nodes, kernel_graph); + MS_EXCEPTION_IF_NULL(fusion_node); + buffer_fusion_info.second.full_name = fusion_node->fullname_with_scope(); } } @@ -412,7 +415,7 @@ bool UbPatternFusion::FuseBufferFusionPattern(session::KernelGraph *kernel_graph buffer_fusion_infos.begin(), buffer_fusion_infos.end(), std::back_inserter(fusion_scope_infos), [](const std::pair &buffer_fusion_info) -> mindspore::kernel::FusionScopeInfo { return mindspore::kernel::FusionScopeInfo( - buffer_fusion_info.first, buffer_fusion_info.second.graph_id, buffer_fusion_info.second.inputs_list, + buffer_fusion_info.first, buffer_fusion_info.second.full_name, buffer_fusion_info.second.inputs_list, buffer_fusion_info.second.anf_nodes, buffer_fusion_info.second.outputs_list); }); auto kernel_mods = mindspore::kernel::KernelFusion(fusion_scope_infos); @@ -447,6 +450,7 @@ bool UbPatternFusion::ReplaceFusionOp(std::unordered_map(buffer_fusion_info.anf_nodes[0]->debug_info())); auto buffer_fusion = CreateFusionOp(buffer_fusion_info.inputs_list, buffer_fusion_info.outputs_list, buffer_fusion_info.anf_nodes, kernel_graph); + buffer_fusion->set_fullname_with_scope(buffer_fusion_info.full_name); AnfAlgo::SetSelectKernelBuildInfo(buffer_fusion_info.kernel_build_info, buffer_fusion.get()); // Set abstract of fusion_op node std::vector types;