|
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126 |
- #!/usr/bin/env python3
- # coding: utf-8
- # Copyright 2019-2021 Huawei Technologies Co., Ltd
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
-
- """util"""
- import sys
- import gc
- import inspect
- import datetime
- import os
- import uuid
- import logging
- import time
- import random
- import subprocess
- import re
- import logging
- import tvm
- from timeit import default_timer as timer
- from threading import Thread
- from functools import reduce
- import numpy as np
-
- import akg
- from akg.build_module import help_tiling_level
- from akg import backend as cce
- import akg.tvm
- from akg.tvm import autotvm
- from akg.tvm import rpc
- from akg.utils import result_analysis as ra_util
- from akg.utils import format_transform as ft_util
- from akg.utils import custom_tiling as ct_util
- from akg.utils import validation_check as vc_util
- from akg.utils.dsl_create import TensorUtils
- from akg.backend.parsing_profiling_data import HWTSLogParser
- from akg.backend.parsing_profiling_data import validate_and_normalize_path
-
-
# Mirror every log record on stdout and lower the root logger threshold to INFO.
sh = logging.StreamHandler(sys.stdout)
logging.root.addHandler(sh)
logging.root.setLevel(logging.INFO)

# RPC server registry filled by load_rpc_server_info: key -> server info, and key -> last recorded load.
rpc_machine = dict()
rpc_lb = dict()

# Runtime used by mod_launch_air: "CCE" by default, switched to "CUDA" by create_gpu_mod.
kc_air_mode = "CCE"

PERFORMANCE_TEST_FILE = "PERFORMANCE_TEST_FILE"
BINDS = "binds"
CUDA = "cuda"
CCE = "cce"
RANDOM_SEED_NUM = 20
# Sentinel cycle value returned by profiling_analyse when the profiling log cannot be parsed.
PROF_ERROR_CODE = 9999999999

# Presumably element bit widths (they are divided by 8 below to obtain byte counts) and block edge sizes.
WGT_WIDTH = INP_WIDTH = OUT_WIDTH = 16
BLOCK_IN = BLOCK_OUT = BLOCK_REDUCE = 16
# Bytes per block: rows * cols * bits / 8.
INP_ELEM_BYTES = BLOCK_IN * BLOCK_REDUCE * INP_WIDTH // 8
WGT_ELEM_BYTES = BLOCK_OUT * BLOCK_REDUCE * WGT_WIDTH // 8
OUT_ELEM_BYTES = BLOCK_IN * BLOCK_OUT * OUT_WIDTH // 8
GLB_ELEM_BYTES = 16 * OUT_WIDTH // 8
-
-
def debug_mode(debug_flag):
    """
    Pass to enable tpu debug mode.

    Args:
        debug_flag (int): The debug flag to be passed; only the value 1 enables a pass.

    Returns:
        list of (order, pass) tuples, the pass to set to build_config(add_lower_pass=tpu.debug_mode(mode)).
        The integer is the order in which the pass is called. Empty for any flag other than 1.

    NOTE(review): `ir_pass` is not imported in the visible part of this file, so debug_flag == 1
    raises NameError unless the name is provided elsewhere -- confirm.
    """
    if debug_flag != 1:
        return []
    return [(0, ir_pass.inject_dma_intrin)]
-
def func_time_required(func_name):
    """
    Decorator that logs how long each call of the wrapped function takes.

    Args:
        func_name (callable): the function to wrap.

    Returns:
        callable: a wrapper that calls `func_name`, logs the elapsed seconds at INFO level and returns
        its result. The wrapper keeps the wrapped function's __name__/__doc__ (via functools.wraps), so
        lookups keyed on __name__ still see the original name.
    """
    from functools import wraps

    @wraps(func_name)
    def wrapper(*args, **kwargs):
        # perf_counter is monotonic, so the measured duration is immune to wall-clock adjustments.
        t0 = time.perf_counter()
        result = func_name(*args, **kwargs)
        t1 = time.perf_counter()
        logging.info("func_time_required func:%s, running:%lf seconds", func_name.__name__, (t1 - t0))
        return result
    return wrapper
-
-
def create_code(kernel_name, code_path=None, code=None, code_type=CCE):
    """
    Create cce or cuda file.

    Args:
        kernel_name (str): file name (without extension) used when code_path is a directory.
        code_path (str): either a directory (default "./") or a full file path that already ends with
            the code type's extension (".cce" for CCE, ".cu" for CUDA).
        code (str): source text to write.
        code_type (str): CCE or CUDA.

    Raises:
        ValueError: if code_type is neither CCE nor CUDA.
    """
    if code_type == CCE:
        postfix = ".cce"
    elif code_type == CUDA:
        postfix = ".cu"
    else:
        # Previously this only logged and then failed later with UnboundLocalError on `postfix`.
        logging.error("the target code type %s is not supported.", code_type)
        raise ValueError("the target code type %s is not supported." % code_type)

    if not code_path:
        code_path = "./"

    # code_path may already be the target file (something before the extension is required).
    if len(code_path) > len(postfix) and code_path.lower().endswith(postfix):
        real_path = code_path
    else:
        separator = "" if code_path.endswith("/") else "/"
        real_path = code_path + separator + kernel_name + postfix

    # dirname is "" for a bare file name; makedirs("") would raise, so only create real directories.
    # exist_ok avoids a race between the existence check and the creation.
    dir_path = os.path.dirname(real_path)
    if dir_path:
        os.makedirs(dir_path, exist_ok=True)

    with open(real_path, 'wt', encoding='utf-8') as ss:
        ss.write(code)
-
-
-
def gen_name_kernel(kernel, dtype, shapes):
    """
    Generate a kernel name of the form "<kernel>_<d0>_<d1>_..._<dtype>".

    Args:
        kernel (str): base kernel name.
        dtype (str): dtype suffix.
        shapes (list or tuple): arbitrarily nested lists/tuples of dims; they are flattened in order.

    Returns:
        str: the kernel name. The previous implementation inserted literal quote characters
        ("2'_'3'_'") between dims; the separator is now a plain underscore.
    """
    def _flatten(items):
        for item in items:
            if isinstance(item, (list, tuple)):
                yield from _flatten(item)
            else:
                yield item

    dims = "".join("%s_" % dim for dim in _flatten(shapes))
    return "%s_%s%s" % (kernel, dims, dtype)
-
-
def load_rpc_server_info(mode):
    """
    load rpc server host and port info.

    Does nothing when both RPC_HOST and RPC_PORT are set in the environment. Otherwise reads the
    JSON file named by RPC_SERVER_INFO_FILE and registers each entry in rpc_machine, with an initial
    load of 0.0 in rpc_lb.

    Args:
        mode (str): string of runtime choose, can set ca aic and rpc.

    Raises:
        Exception: if mode is 'rpc_cloud' without RPC_HOST/RPC_PORT, or RPC_SERVER_INFO_FILE is unset.
    """
    environment = os.environ
    has_explicit_server = environment.get('RPC_HOST') and environment.get('RPC_PORT')
    if has_explicit_server:
        return None

    if mode == 'rpc_cloud':
        logging.error("runtime_mode=rpc_cloud must set 1980 host ip and port!")
        raise Exception("ERROR:runtime_mode=rpc_cloud must set 1980 host ip and port!")

    config_file = environment.get('RPC_SERVER_INFO_FILE')
    if not config_file:
        logging.error("runtime_mode=rpc must set RPC_SERVER_INFO_FILE for rpc server info config")
        raise Exception("ERROR:runtime_mode=rpc must set RPC_SERVER_INFO_FILE for rpc server info config")

    import json
    with open(config_file, 'r') as handle:
        server_info = json.load(handle)

    for name in server_info:
        rpc_machine[name] = server_info[name]
        rpc_lb[name] = 0.0
    return None
-
-
def dispatch(rank=0):
    """
    Function for lock waiting dispatch handle version 1.

    Ranks the servers registered in rpc_lb by ascending recorded load and returns the key at
    position `rank`. When `rank` is past the end, the last (most loaded) key is returned.

    Args:
        rank (int): position in the load-ordered list, e.g. the retry index.

    Returns:
        A key of rpc_lb / rpc_machine.
    """
    for key, load in rpc_lb.items():
        logging.info("######rpc_lb[%s]=%f", rpc_machine[key][0], load)

    # Shuffle before the (stable) sort so that equally loaded servers are picked at random.
    candidates = list(rpc_lb.items())
    random.shuffle(candidates)
    candidates.sort(key=lambda pair: pair[1])
    ranked_keys = [pair[0] for pair in candidates]

    return ranked_keys[min(rank, len(ranked_keys) - 1)]
-
-
def commit(remote, weight):
    """
    Record `weight` as the current load of RPC server `remote` in rpc_lb (read by dispatch).

    Args:
        remote: key of the server in rpc_lb / rpc_machine.
        weight (float): load value to store.
    """
    rpc_lb.update({remote: weight})
-
-
@func_time_required
def mod_launch_rpc_worker(mod, args, outputs, host, port, tuning=False):
    """
    internal RPC worker, should be called by mod_launch_rpc_thread.

    Saves `mod` to a uniquely named temporary object file, uploads and loads it on the RPC server at
    host:port, and runs it on the remote CCE context with `args`.

    Args:
        mod: module to run.
        args (list or tuple): kernel arguments, converted with akg.tvm.nd.array.
        outputs (list or tuple): indices (negative allowed) into args of the outputs to fetch.
        host (str): RPC host.
        port (int): RPC port.
        tuning (bool): also return {"run_time": seconds} when True.

    Returns:
        The output array (or tuple of arrays), plus a stat dict when tuning.
    """
    logging.info("%s:====start connect to rpc ip: %s, rpc port: %d ",
                 datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), host, port)
    remote = rpc.connect(host, port, session_timeout=300)
    logging.info("%s:====connect to rpc ip: %s, rpc port: %d finished ",
                 datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), host, port)
    temp_file_name = "stackvm_%s.o" % uuid.uuid4().hex
    try:
        mod.save(temp_file_name)
        remote.upload(temp_file_name)
        remote_mod = remote.load_module(temp_file_name)
        ctx = remote.cce()
        arg_list = [akg.tvm.nd.array(a, ctx) for a in args]
        start_time = timer()
        remote_mod(*arg_list)
        ctx.sync()
        # Stop the clock right after the device sync so that local file cleanup and
        # device-to-host copies are not counted (still includes RPC round-trip overhead).
        run_time = timer() - start_time
    finally:
        # Always remove the local temporary object file, even if upload/load/run failed.
        if os.path.exists(temp_file_name):
            os.remove(temp_file_name)

    out_list = [arg_list[len(arg_list) + i if i < 0 else i].asnumpy() for i in outputs]
    result = out_list[0] if len(out_list) == 1 else tuple(out_list)
    if not tuning:
        return result
    return result, {"run_time": run_time}
-
-
def mod_launch_rpc_thread(mode, mod, args, outputs, results, need_retry, retry, tuning=False):
    """
    internal RPC thread, should be called by mod_launch_rpc.

    Uses the server from RPC_HOST/RPC_PORT when both are set; otherwise picks one with dispatch(retry)
    from rpc_machine. On success stores the worker result in results[retry]; on any failure sets
    need_retry[retry] = True so the caller knows to retry (previously only RuntimeError was handled,
    and any other exception left need_retry unset, which the caller mistook for success).
    When a dispatched server is used, its load is updated with commit().

    Args:
        mode (str): 'rpc' or 'rpc_cloud'.
        mod, args, outputs, tuning: forwarded to mod_launch_rpc_worker.
        results (list): per-thread result slots.
        need_retry (list): per-thread failure flags.
        retry (int): this thread's index into results/need_retry.

    Raises:
        Exception: in 'rpc_cloud' mode when RPC_HOST/RPC_PORT are not set.
    """
    env_dic = os.environ
    use_env_server = bool(env_dic.get('RPC_HOST') and env_dic.get('RPC_PORT'))
    remoteevb = '0'
    if use_env_server:
        host = env_dic.get('RPC_HOST')
        port = int(env_dic.get('RPC_PORT'))
    else:
        if mode == 'rpc_cloud':
            logging.error("runtime_mode=rpc_cloud must set 1980 host ip and port!")
            raise Exception("ERROR:runtime_mode=rpc_cloud must set 1980 host ip and port!")
        remoteevb = dispatch(retry)
        host = rpc_machine[remoteevb][0]
        port = rpc_machine[remoteevb][1]

    start_time = timer()
    logging.debug("rpc ip: %s, rpc port: %d", host, port)
    try:
        out_list = mod_launch_rpc_worker(mod, args, outputs, host, port, tuning=tuning)
    except Exception:  # pylint: disable=broad-except
        # Any failure in this worker thread must be reported back through need_retry.
        need_retry[retry] = True
        end_time = timer()
        logging.error("===Failed! this round host is %s time is %f", host, (end_time - start_time))
        if not use_env_server:
            # Penalize the failing server so dispatch() prefers others on later retries.
            commit(remoteevb, end_time - start_time + 20 * (retry + 1))
        logging.error("rpc retry error: %d %s", retry, sys.exc_info())
        return

    end_time = timer()
    elapsed = end_time - start_time
    if not use_env_server:
        # Cap the recorded load at 20 seconds.
        commit(remoteevb, min(elapsed, 20))
    logging.info("===this round host is %s time is %f", host, elapsed)
    results[retry] = out_list
-
-
def mod_launch_rpc(mode, mod, args, outputs, tuning=False):
    """
    launch rpc or rpc_cloud module with retry.

    Note:
        To minimize waiting time of straggler RPC servers, we wait for a short timeout and spawn
        a new thread after the timeout.
        In normal case, RPC would complete before the short timeout, so, only one thread will be created.
        When the RPC server is slow, we create multiple threads that run concurrently.
        We wait for the first thread that successfully completes its work and return the result.
        If a thread fails (an exception is raised), we spawn a new thread to retry.
        Newly spawned threads will use different RPC servers.
        We bound the maximum number of threads, i.e. maximum number of retries.

    Args:
        mode (str): 'rpc' or 'rpc_cloud'.
        mod: module to run.
        args (list or tuple): kernel arguments; numpy arrays among them size the timeouts.
        outputs (list or tuple): indices into args of the outputs to return.
        tuning (bool): forwarded to the worker.

    Returns:
        The first successful thread's result, or None if all threads fail or time out.
    """
    max_num_threads = 5

    # Total element count of the array arguments. ndarray.size handles 0-d arrays and the sum is 0
    # for no arrays (the previous reduce() raised TypeError in both cases).
    tensor_size = sum(arg.size for arg in args if isinstance(arg, np.ndarray))
    expected_upload_speed = 5e6
    expected_upload_time = int(tensor_size / expected_upload_speed)

    timeout_before_spawning_new_thread = 200 + expected_upload_time
    poll_interval = 1
    thread_timeout = 400 + expected_upload_time * 3

    load_rpc_server_info(mode)

    threads = [None] * max_num_threads
    results = [None] * max_num_threads
    need_retry = [None] * max_num_threads
    retried = [False] * max_num_threads
    for thread_index in range(max_num_threads):
        if thread_index > 0:
            logging.error("Thread %d run for %d seconds, spawn a new thread to retry",
                          (thread_index - 1), timeout_before_spawning_new_thread)
        threads[thread_index] = Thread(target=mod_launch_rpc_thread,
                                       args=(mode, mod, args, outputs, results, need_retry, thread_index, tuning))
        # daemonize the thread to prevent long running threads from hanging the whole process
        threads[thread_index].daemon = True
        threads[thread_index].start()
        poll_count = timeout_before_spawning_new_thread // poll_interval
        while poll_count > 0:
            poll_count -= 1
            # wait for the newly created thread, because it is most likely to complete first
            threads[thread_index].join(poll_interval)
            for poll_index in range(thread_index + 1):
                if not threads[poll_index].is_alive() and not need_retry[poll_index]:
                    return results[poll_index]
                if need_retry[poll_index] and not retried[poll_index]:
                    logging.error("Thread %d exit with error, spawn a new thread immediately", poll_index)
                    poll_count = 0
                    retried[poll_index] = True

    # Implicit string concatenation keeps source indentation out of the log message.
    logging.error("All %d threads are created, poll the threads until the first one exits normally, "
                  "or all threads exit abnormally or timeout", max_num_threads)
    poll_count = thread_timeout // poll_interval
    for _ in range(poll_count):
        threads[max_num_threads - 1].join(poll_interval)
        exit_thread_count = 0
        for poll_index in range(max_num_threads):
            if not threads[poll_index].is_alive() and not need_retry[poll_index]:
                return results[poll_index]
            if not threads[poll_index].is_alive():
                exit_thread_count += 1
        if exit_thread_count == max_num_threads:
            logging.error("All %d threads exit abnormally", max_num_threads)
            return None

    logging.error("All %d threads timeout", max_num_threads)
    return None
-
-
def profiling_mode_run(mod, args, outputs, tuning, device_id):
    """
    Function for collecting cycle data from device.

    Starts Ascend profiling, runs `mod` once, stops profiling (always, even if the launch raises),
    then parses the cycle count with profiling_analyse and records it via TestUtils.record_cycle.

    Args:
        mod: CCE Module.
        args: list or tuple of numpy array.
        outputs: list or tuple of output argument index.
        tuning: when truthy, also return {'run_time': cycle}.
        device_id: device_id on device.

    Returns:
        Output array (or tuple of arrays), plus the stat dict when tuning.

    NOTE(review): TestUtils is not defined in the visible part of this file -- confirm it is provided elsewhere.
    """
    ctx = akg.tvm.ndarray.cce(device_id)

    tvm.get_global_func("ascend_start_profiling")(device_id)
    try:
        arg_list = [akg.tvm.nd.array(a, ctx) for a in args]
        # Wall-clock time: profiling_analyse compares it with the profiling file's ctime.
        time_before_launch = time.time()
        mod(*arg_list)
        ctx.sync()
    finally:
        # Never leave the device profiling if the launch failed.
        tvm.get_global_func("ascend_stop_profiling")()

    cycle = profiling_analyse(device_id, time_before_launch)
    out_list = [arg_list[len(arg_list) + i if i < 0 else i].asnumpy() for i in outputs]
    logging.info('=====parsing cycles==============================')
    # profiling_analyse returns None on several failure paths as well as PROF_ERROR_CODE.
    if cycle is not None and cycle != PROF_ERROR_CODE:
        logging.info(cycle)
    else:
        logging.error("OOPS, can't correctly parsing cycles!")
    TestUtils.record_cycle(cycle)
    logging.info('=====parsing cycles==============================')
    result = out_list[0] if len(out_list) == 1 else tuple(out_list)
    if tuning:
        return result, {'run_time': cycle}
    return result
-
-
def profiling_analyse(device_id, time_before_launch):
    """
    analyse profiling.

    Finds the newest JOB profiling log for `device_id` under $PROFILING_DIR (retrying for a few
    seconds while it may still be written) and parses it with HWTSLogParser.

    Args:
        device_id (int): device whose "*.log.<device_id>" files are searched.
        time_before_launch (float): wall-clock time before the kernel launch; older job files are rejected.

    Returns:
        The HWTSLogParser.execute() result, None if no job file could be located, or PROF_ERROR_CODE
        if a SyntaxError is raised while locating/parsing.

    Raises:
        TypeError: if device_id is not an int.
        RuntimeError: if PROFILING_DIR is unset or the job file predates the launch.
    """

    def exec_cmds_with_pipe(cmd_list):
        """Run cmd_list[0] | cmd_list[1] | ... without a shell; return communicate() of the last process."""
        if len(cmd_list) <= 1:
            raise RuntimeError("length of cmd_list should be greater than 1.")
        procs = []
        for cmd in cmd_list:
            upstream = procs[-1].stdout if procs else None
            procs.append(subprocess.Popen(cmd, stdin=upstream, stdout=subprocess.PIPE))
            if upstream is not None:
                # Drop the parent's copy so the upstream process gets SIGPIPE if its reader exits early.
                upstream.close()
        output = procs[-1].communicate()
        for proc in procs[:-1]:
            proc.wait()
        return output

    if not isinstance(device_id, int):
        raise TypeError("device_id must be an integer.")

    try:
        public_path = os.getenv('PROFILING_DIR')
        if public_path is None:
            raise RuntimeError("Environment PROFILING_DIR not set!")

        public_path = validate_and_normalize_path(public_path)
        cmd_list = [
            # No shell is involved, so the format needs no quoting.
            ["find", public_path, "-iname", "*.log.%d" % device_id, "-printf", "%T+\t%p\n"],
            ["grep", "JOB"],
            ["sort", "-r"],
            ["head", "-n10"],
            ["awk", "{print $2}"],
            ["head", "-n1"],
        ]
        newest_log = exec_cmds_with_pipe(cmd_list)[0].decode('utf8').strip()
        # The profiler may still be flushing; re-run the search (not just sleep) up to 5 times.
        for _ in range(5):
            if newest_log:
                break
            time.sleep(1)
            newest_log = exec_cmds_with_pipe(cmd_list)[0].decode('utf8').strip()
        try:
            job_file = newest_log.split('/')[-2]
        except IndexError:
            logging.warning("failed to decode profiling result")
            return None
        logging.debug("job file is: %s", job_file)

        file_abs_path = public_path + "/" + job_file
        file_create_time = os.path.getctime(file_abs_path)

        if file_create_time < time_before_launch:
            raise RuntimeError("The JOB file is too old")

        hwtslog_parser = HWTSLogParser(file_abs_path)
        return hwtslog_parser.execute()
    except SyntaxError as e:
        # NOTE(review): kept as-is; presumably the log parser signals malformed data with SyntaxError -- confirm.
        logging.error(e)
        return PROF_ERROR_CODE
-
def mod_launch_air(mod, args, outputs):
    """
    launch mod on kc_air.

    Runs on GPU 0 when kc_air_mode is "CUDA", otherwise on CCE device 0. Numpy arrays in `args`,
    including those nested one level inside lists/tuples, are converted to device arrays. A nested
    list/tuple is flattened into the argument list. The launch is attempted up to 3 times on RuntimeError.

    Args:
        mod: module to run.
        args (list or tuple): kernel arguments.
        outputs (list or tuple): indices (negative allowed) into the flattened argument list.

    Returns:
        Output array, tuple of arrays for multiple outputs, or None if all attempts fail.
    """
    ctx = akg.tvm.ndarray.gpu(0) if kc_air_mode == "CUDA" else akg.tvm.ndarray.cce(0)

    def _to_device(value):
        return akg.tvm.nd.array(value, ctx) if isinstance(value, np.ndarray) else value

    arg_list = []
    for a in args:
        if isinstance(a, (list, tuple)):
            arg_list.extend(_to_device(aa) for aa in a)
        else:
            arg_list.append(_to_device(a))

    for retry in range(3):
        try:
            mod(*arg_list)
            ctx.sync()
            out_list = [arg_list[len(arg_list) + i if i < 0 else i].asnumpy() for i in outputs]
            return out_list[0] if len(out_list) == 1 else tuple(out_list)
        except RuntimeError:
            logging.error("kc_air retry error: %d %s", retry, sys.exc_info())
    logging.error("kc_air runtime error, please check!")
    return None
-
@func_time_required
def mod_launch(mod, args, outputs=(-1,), tuning=False, device_id=0, expect=None, repeat_time=400):
    """
    unified run CCE kernel api.

    Args:
        mod: built module. If its first imported module has type_key "cuda" it runs on the CUDA
            device; otherwise the backend is chosen by get_profiling_mode() / get_runtime_mode().
        args (Union[list, tuple]): list or tuple of numpy array.
        outputs (Union[list, tuple]): indices (negative allowed) into args of the outputs to return.
        tuning (bool): for the modes that support it, also return a stat dict.
        device_id (int): device id on the device.
        expect: returned unchanged when mode is "compile_cloud" or "compile_mini".
        repeat_time (int): forwarded to get_gpu_cycles when tuning on CUDA.

    Returns:
        output numpy array, or tuple of numpy array if multi-output (paired with a stat dict when
        tuning in the CUDA, profiling, aic and rpc paths).

    Raises:
        ValueError: if the runtime mode is unknown.

    NOTE(review): aic_model, get_profiling_mode, get_runtime_mode and get_gpu_cycles are not defined in
    the visible part of this file -- confirm they are provided elsewhere.
    """

    gc.collect()
    if mod.imported_modules[0].type_key == CUDA:
        ctx = akg.tvm.context(CUDA, device_id)
        mod_args = [akg.tvm.nd.array(a, ctx) for a in args]
        mod(*mod_args)
        out_list = [mod_args[len(args) + i if i < 0 else i].asnumpy() for i in outputs]
        result = out_list[0] if len(out_list) == 1 else tuple(out_list)
        if not tuning:
            return result
        cycles = get_gpu_cycles(mod, *mod_args, device_id=device_id, save_log=True, repeat_time=repeat_time)
        return result, {'run_time': cycles}

    stat_info = {}
    profiling_mode = get_profiling_mode()
    if profiling_mode:
        return profiling_mode_run(mod, args, outputs, tuning, device_id)
    mode = get_runtime_mode()
    if mode == 'aic':
        output = aic_model.launch(mod, args, outputs)
        if not tuning:
            return output
        ra_util.get_ticks(stat_info)
        return output, stat_info
    if mode == 'aic_cloud':
        output = aic_model.launch(mod, args, outputs, spec=aic_model.Spec.CLOUD)
        if not tuning:
            return output
        ra_util.get_ticks(stat_info)
        return output, stat_info
    if mode in ('rpc', 'rpc_cloud'):
        return mod_launch_rpc(mode, mod, args, outputs, tuning)
    if mode in ('ca', 'air', 'air_cloud'):
        return mod_launch_air(mod, args, outputs)
    if mode in ("compile_cloud", "compile_mini"):
        return expect
    if mode in ("csim", "ccesim", "cdiff"):
        from akg.backend.csim import csim_launch
        return csim_launch(args, outputs)
    if mode == "cpu":
        ctx = akg.tvm.context("llvm", 0)
        tvm_array = [akg.tvm.nd.array(args_val, ctx) for args_val in args]
        mod(*tvm_array)
        # Honor `outputs` like every other mode (previously only the last argument was returned);
        # with the default outputs=(-1,) the result is unchanged.
        out_list = [tvm_array[len(tvm_array) + i if i < 0 else i].asnumpy() for i in outputs]
        return out_list[0] if len(out_list) == 1 else tuple(out_list)

    raise ValueError("mode must be aic, aic_cloud, rpc, rpc_cloud, ca, air, air_cloud, compile_cloud, "
                     "compile_mini, cpu, csim, ccesim or cdiff")
-
-
def gen_kernel_name(input_shapes, input_types, op_attrs=None, kernel_name=""):
    """
    Generate kernel name.

    Appends "_<dtype>_<dims>" for each input and "_<value>" for each op attribute to `kernel_name`,
    replaces every run of non-alphanumeric characters with "_", and truncates to 250 characters.

    Args:
        input_shapes (iterable): per input a shape, a list of shapes, or an akg.tvm.tensor.Tensor.
        input_types (iterable of str): dtype per input.
        op_attrs (list or tuple): extra attributes (scalars, strings, nested lists, numpy arrays).
        kernel_name (str): prefix.

    Returns:
        str: the sanitized kernel name.
    """
    dir_max_length = 250
    shape_info = ''
    for shape, dtype in zip(input_shapes, input_types):
        if isinstance(shape, (list, tuple)) and shape and isinstance(shape[0], (list, tuple)):
            for tmp_shape in shape:
                vc_util.check_shape(tmp_shape)
                str_tmp_shape = [str(tmp) for tmp in tmp_shape]
                shape_info = "%s_%s_%s" % (shape_info, dtype, '_'.join(str_tmp_shape))
        elif isinstance(shape, akg.tvm.tensor.Tensor):
            # Collect all dims first and emit one "_<dtype>_<dims>" group per tensor. The previous code
            # joined the characters of each single dim string (e.g. "16" -> "1_6") and repeated dtype.
            str_shape = [dim.name if isinstance(dim, akg.tvm.expr.Var) else str(dim) for dim in shape.shape]
            shape_info = "%s_%s_%s" % (shape_info, dtype, '_'.join(str_shape))
        else:
            vc_util.check_shape(shape)
            if isinstance(shape, akg.tvm.expr.Var):
                shape = [shape]
            str_shape = [str(i) for i in shape]
            shape_info = "%s_%s_%s" % (shape_info, dtype, '_'.join(str_shape))

    if op_attrs is not None:
        for attr in op_attrs:
            if isinstance(attr, (list, tuple)):
                for ele in attr:
                    if isinstance(ele, (list, tuple)):
                        shape_info += '_' + '_'.join(str(i) for i in ele)
                    else:
                        shape_info += '_' + str(ele)
            elif isinstance(attr, (int, float)):
                shape_info += '_' + str(attr)
            elif isinstance(attr, str):
                shape_info += '_' + attr
            elif isinstance(attr, np.ndarray):
                shape_info += '_' + '_'.join(str(i) for i in attr.shape)

    kernel_name = re.sub(r'[^0-9a-zA-Z]+', '_', kernel_name + shape_info)
    if len(kernel_name) > dir_max_length:
        logging.info("Dir name %s exceed maximal length, use first %d char as dir name.", kernel_name, dir_max_length)
        kernel_name = kernel_name[:dir_max_length]
    return kernel_name
-
-
@func_time_required
def op_build_test(op_func, input_shapes, input_types, op_attrs=None, kernel_name="",
                  attrs=None, log_cce=False, dump_ir=True, dump_code=True,
                  polyhedral=True, tuning=False):
    """
    Return module from op_build with given inputs, distinguish tuning mode.

    When `attrs` is a dict containing 'tuning', `kernel_name` is used verbatim; otherwise it is
    extended by gen_kernel_name with the input shapes, dtypes and op attributes.

    Args:
        op_func (function returning an op or (op, [op_vars])): The op build function
        input_shapes (iterable of iterable of int): the dim sizes for input for op
        input_types (iterable of iterable of str): the dtypes for each input
        op_attrs (list or tuple): extra attributes for the op.
        kernel_name (str): name of op.
        attrs (dict): tiling parameter.
        log_cce (bool): False by default.
        dump_ir (bool): True by default.
        dump_code (bool): True by default.
        polyhedral (bool): True by default.
        tuning (bool): False by default.

    Return:
        module.
    """
    keep_given_name = isinstance(attrs, dict) and 'tuning' in attrs
    if not keep_given_name:
        kernel_name = gen_kernel_name(input_shapes, input_types, op_attrs, kernel_name)
    logging.debug('kernel_name---------- %s', str(kernel_name))
    return op_build(op_func, input_shapes, input_types, op_attrs, kernel_name,
                    attrs, log_cce, dump_ir, dump_code,
                    polyhedral, tuning)
-
-
def recursive_copy(obj):
    """
    Copy a container object recursively.

    Lists, tuples and dicts (including subclasses) are rebuilt as plain list/tuple/dict with every
    element copied the same way; any other object is returned as-is (not copied).

    Args:
        obj (list, tuple, dict or object): input container object.

    Return:
        copied object.
    """
    if isinstance(obj, list):
        return list(map(recursive_copy, obj))
    if isinstance(obj, tuple):
        return tuple(recursive_copy(item) for item in obj)
    if not isinstance(obj, dict):
        return obj
    return {key: recursive_copy(obj[key]) for key in obj}
-
def gen_inputs_and_shape_params(input_shapes, input_types, inputs, shape_params):
    """
    Generate akg.tvm.placeholder as inputs for op with given input_shapes and input_types.

    Results are appended to `inputs` and `shape_params` in place; nothing is returned.

    Args:
        input_shapes (iterable): one entry per input. It may be a list of shapes (the input becomes a
            list of placeholders named input_<i>_<j>), a shape whose first dim is an akg.tvm.expr.Var,
            an existing akg.tvm.tensor.Tensor (used as-is, and all of its dims are recorded), or any other
            shape (placeholder input_<i>, nothing recorded).
        input_types (iterable of str): dtype per input.
        inputs (list): receives the placeholders / tensors.
        shape_params (list): receives shape variables.

    NOTE(review): Vars in a plain shape whose first dim is not a Var are not collected -- confirm intended.
    """
    for number, (shape, dtype) in enumerate(zip(input_shapes, input_types), start=1):
        non_empty_seq = isinstance(shape, (list, tuple)) and len(shape) > 0
        if non_empty_seq and isinstance(shape[0], (list, tuple)):
            group = []
            for sub_number, sub_shape in enumerate(shape, start=1):
                group.append(akg.tvm.placeholder(sub_shape, dtype, "input_%d_%d" % (number, sub_number)))
                shape_params.extend(dim for dim in sub_shape if isinstance(dim, akg.tvm.expr.Var))
            inputs.append(group)
        elif non_empty_seq and isinstance(shape[0], akg.tvm.expr.Var):
            inputs.append(akg.tvm.placeholder(shape, dtype, "input_%d" % number))
            shape_params.extend(dim for dim in shape if isinstance(dim, akg.tvm.expr.Var))
        elif isinstance(shape, akg.tvm.tensor.Tensor):
            inputs.append(shape)
            shape_params.extend(shape.shape)
        else:
            inputs.append(akg.tvm.placeholder(shape, dtype, "input_%d" % number))
-
def gen_attrs_params(op_attrs, attrs_params):
    """
    Parsing attrs given by op_attrs.

    Appends to `attrs_params` every attribute that is an akg.tvm.expr.Var. It also appends every Var
    inside a list/tuple attribute whose first element is a Var.

    Args:
        op_attrs (list or tuple): extra attributes for the op.
        attrs_params (list): receives the Vars, in order.
    """
    for attr in op_attrs:
        if isinstance(attr, akg.tvm.expr.Var):
            attrs_params.append(attr)
            continue
        starts_with_var = (isinstance(attr, (list, tuple)) and len(attr) > 0
                           and isinstance(attr[0], akg.tvm.expr.Var))
        if starts_with_var:
            attrs_params.extend(item for item in attr if isinstance(item, akg.tvm.expr.Var))
-
def get_dim_from_func_map(attrs, op_func, args, input_shapes=None, input_types=None, op_attrs=None):
    """
    Get tiling parameter from map defined in op_func.

    If attrs already holds a non-empty 'dim', it is returned unchanged. Otherwise 'dim' is set from
    ct_util.set_dim_func_map[op_func.__name__]: the registered function is called with *args, or the
    registered dict is looked up by (input_shapes, input_types[, op_attrs]). If nothing matches, 'dim'
    is set to "".

    Args:
        attrs (dict or None): tiling parameter; a new dict is created when None.
        op_func (function returning an op or (op, [op_vars])): The op build function.
        args (list or tuple): arguments for a function-registered dim setter.
        input_shapes, input_types, op_attrs: key components for a dict-registered dim map. These were
            previously referenced without being defined, which raised NameError on that path.

    Returns:
        dict: attrs with 'dim' filled in.

    Raises:
        RuntimeError: if the registered entry is neither a function nor a dict, or a dict entry is
            registered but input_shapes/input_types were not supplied.
    """
    if attrs is not None and 'dim' in attrs and attrs['dim']:
        return attrs
    if attrs is None:
        attrs = dict()

    dim_info = ""
    if op_func.__name__ in ct_util.set_dim_func_map:
        value = ct_util.set_dim_func_map[op_func.__name__]
        if inspect.isfunction(value):
            dim_info = value(*args)
        elif isinstance(value, dict):
            if input_shapes is None or input_types is None:
                raise RuntimeError("input_shapes and input_types are required to look up a dict set_dim_map.")
            key = [ft_util.convert_to_list(input_shapes), ft_util.convert_to_list(input_types)]
            if op_attrs is not None:
                key.append(op_attrs)
            key = str(tuple(key))
            if key in value:
                dim_info = ct_util.set_dims(value[key])
        else:
            raise RuntimeError("Registered set_dim_map is invalid. Must be a function or a dict!")
        # Dim setter functions may return (dim, key); only the dim string is stored here.
        if isinstance(dim_info, (list, tuple)):
            dim_info = dim_info[0]

    attrs['dim'] = dim_info
    return attrs
-
def parsing_output(output, attrs, compute_func, sch_tmpl, gpu_binds):
    """
    Parsing the outputs of op.

    - dict output: treated as a schedule template; returns its 'output' and 'binds' entries.
    - list/tuple output: functions become compute_func; dict items fill keys of `attrs` that are
      missing or falsy (attrs is mutated in place); nested lists/tuples are flattened one level;
      everything else is kept as an output tensor.
    - anything else: returned unchanged.

    Args:
        output: the outputs of op.
        attrs (dict): tiling parameter.
        compute_func (function): None by default, func for doing compute_inline or other.
        sch_tmpl (dict): None by default.
        gpu_binds (dict): None by default.

    Returns:
        tuple: (output, compute_func, sch_tmpl, gpu_binds).
    """
    if isinstance(output, dict):
        sch_tmpl = output
        return sch_tmpl['output'], compute_func, sch_tmpl, sch_tmpl['binds']
    if not isinstance(output, (list, tuple)):
        return output, compute_func, sch_tmpl, gpu_binds

    tensors = []
    for item in output:
        if inspect.isfunction(item):
            compute_func = item
        elif isinstance(item, dict):
            for name, val in item.items():
                if name not in attrs or not attrs[name]:
                    attrs[name] = val
        elif isinstance(item, (list, tuple)):
            tensors.extend(item)
        else:
            tensors.append(item)
    return tensors, compute_func, sch_tmpl, gpu_binds
-
def gen_op_var(inputs, output, op_var):
    """
    Combine inputs and outputs about the op.

    `op_var` is extended IN PLACE with the inputs (list inputs are flattened one level). The outputs
    that TensorUtils.is_output_value accepts are appended to a new list, which is returned. When no
    output is added for a non-sequence `output`, the same `op_var` object is returned.

    Args:
        inputs (list): the inputs of op.
        output: an output tensor or a list/tuple of them.
        op_var (list): inputs and outputs for the op.

    Returns:
        list: inputs followed by the accepted outputs.
    """
    for item in inputs:
        if isinstance(item, list):
            op_var.extend(item)
        else:
            op_var.append(item)

    if isinstance(output, (list, tuple)):
        return op_var + [tensor for tensor in output if TensorUtils.is_output_value(tensor)]
    if TensorUtils.is_output_value(output):
        return op_var + [output]
    return op_var
-
def gen_shape_var(attrs_params, shape_params, shape_var):
    """
    Combine shape of inputs and extra attributes about the op.

    Appends to `shape_var` in place, preserving order and skipping values already present: first
    the attribute params (if any), then the shape params.

    Args:
        attrs_params (list): Vars found in the op's extra attributes (may be empty or None).
        shape_params (list): Vars found in the input shapes.
        shape_var (list): shape of inputs and extra attributes for the op.
    """
    # Plain loops instead of list comprehensions that were evaluated only for their side effects.
    # Membership is re-checked per item so duplicates within one list are skipped too.
    if attrs_params:
        for param in attrs_params:
            if param not in shape_var:
                shape_var.append(param)
    for param in shape_params:
        if param not in shape_var:
            shape_var.append(param)
-
def gen_spaces_dim_key(op_func, args, s, op_var, kernel_name, attrs, polyhedral, tuning, target):
    """
    Generate tiling parameter.

    The key comes from the op's registered set_dim function (second element of its result) or,
    when no set_dim entry exists, from its registered gen_key function. It falls back to str(args)
    when neither produces one. The tuning space is obtained by lowering the schedule with pass-IR dumping.

    Args:
        op_func (function returning an op or (op, [op_vars])): The op build function.
        args (Union[list, tuple]): arguments passed to the registered key functions.
        s: schedule of op.
        op_var (list): the akg.tvm.tensor of inputs and outputs for op.
        kernel_name (str): name of op.
        attrs (dict): tiling parameter.
        polyhedral (bool): True by default.
        tuning (bool): False by default.
        target (str): target passed to akg.lower.

    Return:
        tuple: (spaces, set_dim_key).
    """
    op_name = op_func.__name__
    set_dim_key = ""
    if op_name in ct_util.set_dim_func_map:
        dim_func = ct_util.set_dim_func_map[op_name]
        if inspect.isfunction(dim_func):
            set_dim_key = dim_func(*args)[1]
    elif op_name in ct_util.gen_key_func_map:
        key_func = ct_util.gen_key_func_map[op_name]
        if inspect.isfunction(key_func):
            set_dim_key = key_func(*args)

    with akg.build_config(dump_pass_ir=True):
        spaces = akg.lower(s, op_var, name=kernel_name, attrs=attrs, polyhedral=polyhedral, tuning=tuning,
                           target=target)
    return spaces, (set_dim_key if set_dim_key != "" else str(args))
-
def create_gpu_mod(sch_tmpl, s, op_func, op_var, shape_var, kernel_name, attrs, polyhedral, binds, dump_ir, dump_code,
                   tuning):
    """
    Return module for op of gpu.

    With a schedule template (`sch_tmpl`, whose 'target' must be cuda), the template schedule is built
    for CUDA and kc_air_mode is switched to "CUDA". When `tuning` is set, the template is auto-tuned with
    autotvm (records in "<kernel_name>.log", reused if present) and the best config is applied.
    Without a template, `s` is built for CUDA with akg.build.

    Args:
        sch_tmpl (dict): schedule of op and the others.
        s: schedule of op.
        op_func (function returning an op or (op, [op_vars])): The op build function.
        op_var (list): the akg.tvm.tensor of inputs and outputs for op.
        shape_var (list): shape of inputs and extra attributes for the op.
        kernel_name (str): name of op.
        attrs (dict): tiling parameter.
        polyhedral (bool): True by default.
        binds (dict): BINDS
        dump_ir (bool): True by default.
        dump_code (bool): write the CUDA source next to the current directory when True.
        tuning (bool): False by default.

    Return:
        module.

    Raises:
        ValueError: if the schedule template does not target cuda.
    """
    # Declared at the top: the schedule-template path switches mod_launch_air to the CUDA runtime.
    global kc_air_mode

    if sch_tmpl is not None or (attrs and attrs.get("target", "cce") == "cuda"):
        if kernel_name == "":
            kernel_name = op_func.__name__ if sch_tmpl is None else sch_tmpl['op_name']

    target = CUDA

    if sch_tmpl is not None:
        if sch_tmpl['target'] != CUDA:
            raise ValueError("Only support cuda as target when using schedule template.")
        kc_air_mode = "CUDA"
        with akg.tvm.target.cuda():
            if not tuning:
                s = sch_tmpl['schedule'](sch_tmpl['output'])
                with akg.tvm.build_config(dump_pass_ir=dump_ir):
                    mod = akg.build(s, op_var, "cuda", shape_var, name=kernel_name, attrs=attrs,
                                    polyhedral=False, binds=binds)
            else:
                @autotvm.template
                def _autotune_template():
                    sch = sch_tmpl['schedule'](sch_tmpl['output'])
                    return (sch, op_var)

                log_file = kernel_name + '.log'

                # create autotune task
                task = autotvm.task.create(_autotune_template,
                                           args=list(),
                                           target='cuda')
                logging.info("task config: %s", task.config_space)

                # set measure_option
                measure_option = autotvm.measure_option(
                    builder=autotvm.LocalBuilder(),
                    runner=autotvm.LocalRunner(repeat=5, min_repeat_ms=150, timeout=4)
                )

                # Begin tuning (skipped when a log from a previous run exists), log records to `kernel_name.log`
                tuner = autotvm.tuner.RandomTuner(task)
                if not os.path.exists(log_file):
                    tuner.tune(n_trial=len(task.config_space),
                               measure_option=measure_option,
                               callbacks=[autotvm.callback.log_to_file(log_file)])

                # query best config
                dispatch_context = autotvm.apply_history_best(log_file)
                best_config = dispatch_context.query(task.target, task.workload)
                logging.info("Best config is: %s", best_config)

                # apply best config; `binds` replaces the previously undefined name `gpu_binds`
                with autotvm.apply_history_best(log_file):
                    s, op_var = _autotune_template()
                    mod = akg.build(s, op_var, "cuda", shape_var, name=kernel_name, attrs=attrs,
                                    polyhedral=False, binds=binds)
    else:
        with akg.build_config(dump_pass_ir=dump_ir):
            mod = akg.build(s, op_var, target, shape_var, name=kernel_name, attrs=attrs, polyhedral=polyhedral,
                            binds=binds)
    if dump_code:
        source_code = mod.imported_modules[0].get_source()
        create_code(kernel_name, "./", source_code, CUDA)
    return mod
-
def op_build(op_func, input_shapes, input_types, op_attrs=None, kernel_name="",
             attrs=None, log_cce=False, dump_ir=True, dump_code=True,
             polyhedral=True, tuning=False):
    """
    Return module built from op_func with given inputs.

    Args:
        op_func (function returning an op or (op, [op_vars])): The op build function.
        input_shapes(iterable of iterable of int): the dim sizes for input for op.
        input_types (iterable of iterable of str): the dtypes for each input.
        op_attrs (list or tuple): extra attributes for the op.
        kernel_name (str): name of op.
        attrs (dict): tiling parameter. The BINDS entry is removed from a copy, so the
            caller's dict keeps it for later calls.
        log_cce (bool): False by default; when True the generated cce code is logged at debug level.
        dump_ir (bool): True by default.
        dump_code (bool): True by default.
        polyhedral (bool): True by default.
        tuning (bool): False by default.

    Return:
        module; or the (spaces, set_dim_key) result of gen_spaces_dim_key when tuning is set
        or attrs["help_tiling"] exceeds help_tiling_level['None'].
    """
    inputs = []
    shape_params = []  # save all the shape params for dynamic_shape cases
    gen_inputs_and_shape_params(input_shapes, input_types, inputs, shape_params)

    attrs_params = []
    if op_attrs is not None:
        # op_attrs may be a tuple; list + tuple would raise TypeError, so convert first.
        args = inputs + list(op_attrs)
        gen_attrs_params(op_attrs, attrs_params)
    else:
        args = inputs

    # backup inputs because the tensor names may be updated inside op_func
    inputs_backup = recursive_copy(inputs)

    output = op_func(*args)

    # restore inputs to make sure that tensor names are not changed by op_func
    inputs = inputs_backup
    # set dim
    attrs = get_dim_from_func_map(attrs, op_func, args)

    compute_func = None  # func which is defined in dsl for doing compute_inline or other
    sch_tmpl = None
    gpu_binds = None
    output, compute_func, sch_tmpl, gpu_binds = parsing_output(output, attrs, compute_func, sch_tmpl, gpu_binds)

    op_var = []
    op_var = gen_op_var(inputs, output, op_var)

    shape_var = []
    gen_shape_var(attrs_params, shape_params, shape_var)

    if sch_tmpl is not None:
        return create_gpu_mod(sch_tmpl, None, op_func, op_var, shape_var, kernel_name, attrs, polyhedral, gpu_binds,
                              dump_ir, dump_code, tuning)

    if isinstance(output, (list, tuple)):
        # tuple entries carry the tensor as their first element
        out_ops = [x[0].op if isinstance(x, tuple) else x.op for x in output]
        s = akg.tvm.create_schedule(out_ops)
    else:
        s = akg.tvm.create_schedule(output.op)

    if compute_func is not None:
        compute_func(s)
        polyhedral = False

    target = CCE
    if attrs and attrs.get("target", "cce") == CUDA:
        target = CUDA

    level = attrs.get("help_tiling") if attrs else None
    if tuning or (level is not None and level > help_tiling_level['None']):
        return gen_spaces_dim_key(op_func, args, s, op_var, kernel_name, attrs, polyhedral, tuning, target)

    mode = get_runtime_mode()
    if mode == "cpu":
        mod = akg.tvm.build(s, op_var, "llvm")
        ir_dir = "./cpu/ir/"
        os.makedirs(ir_dir, exist_ok=True)
        # O_TRUNC drops stale content from earlier runs; an owner-writable mode lets a later
        # build of the same kernel reopen the file (a 0o400 file cannot be opened for writing).
        fd = os.open(ir_dir + kernel_name + ".cc", os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
        with os.fdopen(fd, 'w') as irf:
            # str() in case lower() hands back an IR node rather than text
            irf.write(str(akg.tvm.lower(s, op_var, shape_var, simple_mode=True)))
        return mod

    binds = None
    if attrs:
        # Pop from a copy so repeated calls with the same attrs dict still see BINDS.
        attrs = dict(attrs)
        binds = attrs.pop(BINDS, None)

    if target == CUDA:
        return create_gpu_mod(None, s, op_func, op_var, shape_var, kernel_name, attrs, polyhedral, binds, dump_ir,
                              dump_code, tuning)

    with akg.build_config(dump_pass_ir=dump_ir):
        mod = akg.build(s, op_var, target, shape_var, name=kernel_name, attrs=attrs, polyhedral=polyhedral, binds=binds)

    source_code = mod.imported_modules[0].get_source()
    if log_cce:
        logging.debug("#################cce code####################")
        logging.debug(source_code)
    if dump_code:
        create_code(kernel_name, "./", source_code, target)
    return mod
-
-
def get_runtime_mode():
    """Return the runtime mode named by the RUNTIME_MODE environment variable.

    Falls back to 'rpc_cloud' when the variable is unset or empty.
    """
    return os.environ.get('RUNTIME_MODE') or 'rpc_cloud'
-
-
def get_profiling_mode():
    """Return True if PROFILING_MODE equals "true" (case-insensitive), otherwise False."""
    flag = os.environ.get('PROFILING_MODE', '')
    return flag.lower() == "true"
-
-
def product_is_mini():
    """Return True when get_runtime_mode() reports one of the mini runtime modes."""
    mini_modes = ('rpc', 'air', 'aic', 'compile_mini')
    return get_runtime_mode() in mini_modes
-
-
def get_available_devices_num():
    """Return the number of available devices from the DEVICE_TOTAL_NUM environment variable.

    Returns 1 when the variable is unset or empty, or when its value is not a valid
    integer (the parse error is logged).
    """
    raw = os.environ.get('DEVICE_TOTAL_NUM')
    if not raw:
        return 1
    try:
        return int(raw)
    except ValueError as e:
        # int() reports a malformed value with ValueError; fall back to a single device.
        logging.error(e)
        return 1
-
-
def get_device_id():
    """Return the device id from the DEVICE_ID environment variable.

    Returns 0 when the variable is unset or empty, or when its value is not a valid
    integer (the parse error is logged).
    """
    raw = os.environ.get('DEVICE_ID')
    if not raw:
        return 0
    try:
        return int(raw)
    except ValueError as e:
        # int() reports a malformed value with ValueError; fall back to device 0.
        logging.error(e)
        return 0
-
def get_gpu_cycles(mod, *mod_args, device_id=0, save_log=False, repeat_time=400):
    """Run ``mod`` under the GPU profiler and return the value it reports, as an int.

    The profiler is started with the ``GPUProfilerInit`` global function, the module is
    executed through ``gpu_profiling``, and the result of ``GPUProfilerStop`` is returned.

    Args:
        mod: the built module to profile.
        *mod_args: arguments forwarded to ``gpu_profiling``.
        device_id (int): device to run on, 0 by default.
        save_log (bool): accepted for interface compatibility; not used here.
        repeat_time (int): forwarded to ``gpu_profiling`` as repeat_time, 400 by default.
    """
    start_profiler = tvm.get_global_func('GPUProfilerInit')
    start_profiler("")
    from akg.utils.result_analysis import gpu_profiling
    gpu_profiling(mod, *mod_args, repeat_time=repeat_time, device_id=device_id)
    stop_profiler = tvm.get_global_func('GPUProfilerStop')
    cycles = stop_profiler()
    return int(cycles)
-
class TestUtils:
    """Helpers that append performance data (cycles, core counts) to a results file.

    The file path comes from the environment variable named by PERFORMANCE_TEST_FILE;
    when that variable is unset or empty, nothing is written.
    """
    @staticmethod
    def record_cycle(cycle):
        """Append ``cycle`` and a newline to the performance results file."""
        result_file = os.environ.get(PERFORMANCE_TEST_FILE)
        if not result_file:
            return
        with open(result_file, "a+") as out:
            out.write("{0}\n".format(cycle))

    @staticmethod
    def record_core(stmt):
        """Append the core count of ``stmt`` followed by "; " to the performance results file.

        The count is ``stmt.value`` when ``stmt`` is a 'thread_extent' attribute statement,
        otherwise 1.
        """
        result_file = os.environ.get(PERFORMANCE_TEST_FILE)
        if not result_file:
            return
        with open(result_file, "a+") as out:
            is_thread_extent = hasattr(stmt, 'attr_key') and stmt.attr_key == 'thread_extent'
            core_num = stmt.value if is_thread_extent else 1
            out.write("{0}; ".format(core_num))
-
|