|
- # Copyright 2019 Huawei Technologies Co., Ltd
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
-
- """Tuner for finding best config for operators"""
- import logging
- import time
- import json
- import os
- import numpy as np
- from multiprocessing import Process
- from tvm.autotvm.tuner.xgboost_cost_model import XgbCostModel
- from tvm.autotvm.tuner.sa_model_optimizer import SimulatedAnnealingOptimizer
- from akg.auto_tune.space import ConfigSpace
- from akg.auto_tune.runner import KernelRunner
-
# Module-level logger used by the tuners in this file. The logger name is a
# fixed string rather than being derived from ``__name__``.
logger = logging.getLogger('fuzz.tune.autotuning.tuner')
-
-
class Tuner:
    """Basic tuner that measures configs of a space in order (grid search).

    Parameters
    ----------
    runner: KernelRunner
        This is for run kernels in physical device. ``runner.run(configs,
        best_time)`` must return one run time per config, in the same order.
    index_table: list
        Table used to parse the best config into attrs. It is only stored
        by this class.
    config_space: ConfigSpace
        The space of configs
    n_parallel: int
        How many kernels are processed in a turn
    """

    def __init__(self, runner: "KernelRunner", index_table: list, config_space: "ConfigSpace",
                 n_parallel: int = 1):
        self._runner = runner
        # Keep the caller's table. It used to be overwritten with an empty
        # list a few lines further down, which silently discarded the argument.
        self._index_table = index_table
        self._space = config_space
        self._n_parallel = n_parallel

        # trial plan: indexes proposed for measurement and the read position
        self._trials = []
        self._trial_pt = 0
        self._visited = set()

        # observed samples: config indexes and their measured run times
        self._xs = []
        self._ys = []

        # keep the current best
        self._best_config = None  # type: ConfigEntity
        self._best_time = np.inf
        self._best_iter = 0
        self._tuning_time = 0.0
        self._original_time = np.inf

    @property
    def best_config(self):
        """Config with the lowest measured run time so far (None if none)."""
        return self._best_config

    @property
    def best_time(self):
        """Lowest measured run time so far (``np.inf`` before any measure)."""
        return self._best_time

    @property
    def best_iter(self):
        """Trial number at which the current best config was measured."""
        return self._best_iter

    @property
    def tuning_time(self):
        """Accumulated tuning time in seconds."""
        return self._tuning_time

    @property
    def original_time(self):
        """Baseline run time (``np.inf`` until a subclass measures it)."""
        return self._original_time

    @property
    def xs(self):
        """Config indexes measured so far, in measurement order."""
        return self._xs

    @property
    def ys(self):
        """Run times measured so far, aligned with ``xs``."""
        return self._ys

    def info(self):
        """Print a short summary of the tuning state to stdout."""
        print('space size:', self._space.length)
        print('best config:', self._best_config)
        print('best time:', self._best_time)
        print('best_iter:', self._best_iter)
        print('tuning time:', self._tuning_time, 'secs')

    def next_batch(self, batch_size: int, is_add_visited=True):
        """Pick the next batch of configs, preferring the planned trials.

        Parameters
        ----------
        batch_size: int
            Maximum number of configs to return.
        is_add_visited: bool
            When False, simply return the first ``batch_size`` configs of the
            space without consuming the plan or marking anything as visited.

        Returns
        -------
        list
            Configs fetched from the space with ``self._space.get``.
        """
        if not is_add_visited:
            return [self._space.get(index) for index in range(min(batch_size, self._space.length))]

        ret = []
        while len(ret) < batch_size and self._space.has_next():
            # skip planned trials that have already been handed out
            while self._trial_pt < len(self._trials) and self._trials[self._trial_pt] in self._visited:
                self._trial_pt += 1

            if self._trial_pt < len(self._trials):
                index = self._trials[self._trial_pt]
            else:
                # the plan is exhausted (or empty): let the space choose
                index = self._space.fetch_index()

            ret.append(self._space.get(index))
            self._visited.add(index)
        return ret

    def next_config(self, batch_size: int):
        """Return up to ``batch_size`` configs in the order the space yields them."""
        ret = []
        while len(ret) < batch_size and self._space.has_next():
            index = self._space.fetch_next_index()
            ret.append(self._space.get(index))
            self._visited.add(index)
        return ret

    def export_configs(self, configs: list, output_file: str, append: bool = True, desc=""):
        """Write measured configs as ``desc | json(config) | time`` lines.

        Parameters
        ----------
        configs: list
            Pairs ``(config, time)``; ``config`` must provide ``_asdict()``.
        output_file: str
            File to write to.
        append: bool
            Append to the file when True, otherwise overwrite it.
        desc: str
            Free-form description written as the first field of every line.
        """
        mode = "a" if append else "w"
        with open(output_file, mode) as f:
            for x, y in configs:
                f.write("{} | {} | {}\n".format(desc, json.dumps(x._asdict()), y))

    @staticmethod
    def _load_dim_data(output_file: str):
        """Return the JSON content of ``output_file``, or ``{}`` if unavailable."""
        data = {}
        try:
            if os.path.isfile(output_file):
                with open(output_file, 'r') as f:
                    data = json.load(f)
        except IOError as e:
            logger.debug("get dim info from [%s] failed: %s", output_file, str(e))
        return data

    def export_dim_configs(self, configs, output_file: str, append: bool = True, key=""):
        """Store ``configs`` under ``key`` in the JSON document ``output_file``.

        The existing document is loaded, ``data[key]`` is set, and the whole
        document is written back with one top-level entry per line.

        Parameters
        ----------
        configs:
            JSON-serialisable value to store.
        output_file: str
            Path of the JSON document.
        append: bool
            Kept for backward compatibility. Existing entries are preserved
            in both cases. The file is always rewritten as a whole, because
            appending the merged document after the old one (the previous
            behaviour for ``append=True``) produced invalid JSON.
        key: str
            Top-level key to set.
        """
        import re
        data = self._load_dim_data(output_file)
        data[key] = configs
        # serialise before opening so a failure does not truncate the file
        s = json.dumps(data, sort_keys=True)
        s = re.sub(r',\s*"', ',\n"', s)
        s = '{\n' + s[1:-1] + '\n}'
        with open(output_file, "w") as f:
            f.write(s)

    def export_dim_configs_for_keys(self, configs, output_file: str, append: bool = True, keys=None):
        """Store ``configs`` under a nested key path in ``output_file``.

        ``keys`` is followed through the existing document as far as it
        goes; ``configs`` is then wrapped in the keys that were not found and
        merged in at that point.

        Parameters
        ----------
        configs:
            JSON-serialisable value to store.
        output_file: str
            Path of the JSON document.
        append: bool
            Kept for backward compatibility. Existing entries are preserved
            in both cases and the file is always rewritten as a whole (see
            ``export_dim_configs``).
        keys: list
            Key path, outermost key first. Defaults to an empty path.
        """
        keys = [] if keys is None else keys
        data = self._load_dim_data(output_file)

        # descend along the keys that already exist in the document
        node = data
        found = []
        for key in keys:
            if key in node:
                node = node[key]
                found.append(key)

        # wrap configs in the missing keys, innermost key first
        info = {}
        nested = configs
        for key in reversed(keys):
            if key not in found:
                info = {key: nested}
                nested = info

        # NOTE(review): when every key already exists, ``info`` stays empty
        # and the stored value is left untouched, as before.
        node.update(info)

        s = json.dumps(data, sort_keys=True, indent=4)
        with open(output_file, "w") as f:
            f.write(s)

    def load_configs(self, input_file: str):
        """Load configs written by ``export_configs``.

        Each line has the form ``desc | json(config) | time``. The line is
        split from the right so that a ``|`` inside ``desc`` does not break
        parsing. Blank lines are ignored.

        Returns
        -------
        list
            Pairs ``(self._space.input_type(**config), np.float64(time))``.
            Empty when ``input_file`` is not an existing file.
        """
        configs = []
        file_path = os.path.realpath(input_file)
        if os.path.isfile(file_path):
            with open(file_path, "r") as f:
                for line in f:
                    if not line.strip():
                        continue
                    _, x, y = line.rsplit('|', 2)
                    configs.append((self._space.input_type(**json.loads(x)), np.float64(y.strip())))
        return configs

    def tune(self, least_try_times: int, output_file: str = None):
        """grid search all configs

        Parameters
        ----------
        least_try_times: int
            Upper bound on the number of configs measured by this call.
        output_file: str
            When given, every measured batch is appended to this file with
            ``export_configs``.

        Returns
        -------
        list
            Run times of the last measured batch; an empty list when nothing
            was measured.
        """
        run_times = []
        i = 0
        while i < least_try_times and self._space.has_next():
            configs = self.next_config(min(self._n_parallel, least_try_times - i))
            if not configs:
                # nothing was handed out (e.g. n_parallel <= 0): avoid spinning
                break
            run_times = self._runner.run(configs, self._best_time)
            results = []
            for idx, conf in enumerate(configs):
                results.append((conf.input_id, run_times[idx]))
                # keep best config
                if self.best_time > run_times[idx]:
                    self._best_time = run_times[idx]
                    self._best_iter = i + idx
                    self._best_config = conf

            i += len(results)

            # update
            for res in results:
                self._xs.append(res[0])
                self._ys.append(res[1])
            if output_file:
                exported = [(self._space.get(res[0]).input, res[1]) for res in results]
                self.export_configs(exported, output_file)
        return run_times
-
-
class ModelBasedTuner(Tuner):
    """Model based tuner
    This tuner will fit a cost model and use an optimizer to find the maximums of the cost model as next trials

    Parameters
    ----------
    runner, index_table, config_space, n_parallel
        Forwarded unchanged to ``Tuner.__init__``.
    plan_size: int
        Tuner will re-fit model per `plan_size` new measure samples
    pre_model: CostModel
        The cost model that predicts the speed of a config (IR). When given
        it is re-targeted to ``config_space`` with ``reset_space``; otherwise
        a new ``XgbCostModel`` is created.
    """

    def __init__(self, runner, index_table, config_space, n_parallel=1, plan_size=32, pre_model=None):
        super(ModelBasedTuner, self).__init__(runner, index_table, config_space, n_parallel)
        self.__plan_size = plan_size

        if pre_model is not None:
            self.__cost_model = pre_model
            self.__cost_model.reset_space(self._space)
        else:
            self.__cost_model = XgbCostModel(self._space)

        self.__model_optimizer = SimulatedAnnealingOptimizer(self._space)
        self.__train_ct = 0

        # True until the first batch has been run to measure the baseline
        self.__is_auto_set_dim = True

        # time to leave
        self.__ttl = None
        self.__least_try_times = None
        self.__early_stopping = None

        self.__model_run_time = 0.0

    def info(self):
        """Print the base summary plus the time spent on model bookkeeping."""
        super(ModelBasedTuner, self).info()
        print('model run time:', self.__model_run_time, 'secs')

    def model_res(self):
        """Fit the cost model on all samples and plan the next trials.

        The optimizer's result replaces ``self._trials``.
        """
        self.__cost_model.fit(self._xs, self._ys, self.__plan_size)
        best_configs = self.__model_optimizer.find_best(
            self.__cost_model, self.__plan_size, self._visited)
        self._trials = best_configs

    def _model_res_worker(self, conn):
        """Child-process entry point: plan trials and send them through ``conn``."""
        try:
            self.model_res()
            conn.send(self._trials)
        finally:
            conn.close()

    def _plan_trials_in_subprocess(self):
        """Run ``model_res`` in a child process and adopt the trials it planned.

        The child's assignment to ``self._trials`` happens in the child's own
        memory, so the planned trials are sent back over a pipe. If the child
        exits without sending anything, the current plan is kept.
        """
        from multiprocessing import Pipe
        recv_end, send_end = Pipe(duplex=False)
        p = Process(target=self._model_res_worker, args=(send_end,))
        p.start()
        # The parent must drop its copy of the write end, otherwise recv()
        # would never see EOF if the child dies before sending.
        send_end.close()
        try:
            # receive before join so a large payload cannot block the child
            self._trials = recv_end.recv()
        except EOFError:
            logger.warning('cost model process ended without planning new trials')
        finally:
            recv_end.close()
            p.join()

    def _record_original_time(self, run_times):
        """Set the baseline and the initial best time from the first batch.

        Timings equal to the failure marker are ignored; the baseline is the
        mean of the remaining ones, or the marker itself if none remain.
        """
        # profiling start fail occasionally
        run_fail = 9999999999.0
        valid_times = [x for x in run_times if x != run_fail]
        if valid_times:
            self._original_time = sum(valid_times) / len(valid_times)
        else:
            self._original_time = run_fail
        self._best_time = self._original_time
        self._best_iter = -1
        self._best_config = None

    def tune(self, least_try_times: int, output_file: str = None):
        """Tune with the cost model guiding which configs are measured.

        The first batch is run with the auto-set-dim flag passed to the
        runner and only establishes the baseline (``original_time``). After
        that, batches from ``next_batch`` are measured, and the model re-plans
        the trials every ``plan_size`` new samples.

        Parameters
        ----------
        least_try_times: int
            Number of trials to run before the loop may stop; also used as
            the early-stopping window after the best trial.
        output_file: str
            When given, every measured batch is appended to this file with
            ``export_configs``.

        Returns
        -------
        None
            Results are available through the properties of the tuner.
        """
        tuning_start = time.time()
        try:
            self._tune_loop(least_try_times, output_file, tuning_start)
        finally:
            # also reached on the early-stop returns, which used to skip this
            self._tuning_time += time.time() - tuning_start

    def _tune_loop(self, least_try_times, output_file, tuning_start):
        """Measurement loop of ``tune``; returns when a stop condition is met."""
        early_stopping = least_try_times
        self.__least_try_times = least_try_times
        self.__early_stopping = early_stopping

        old_level = logger.level
        i = 0
        # NOTE(review): error_ct is never incremented in this method, so the
        # debug-mode branch at the end of the loop body is currently unreachable.
        error_ct = 0

        # Run at least `least_try_times` trials; keep going up to three times
        # as many while the best time is not below the baseline minus 0.9.
        while (i < self._space.length and (i < least_try_times
                                           or (self._best_time > self._original_time - 0.9
                                               and i < least_try_times * 3))):
            if not self._space.has_next():
                break
            iter_start = time.time()
            batch_size = min(self._n_parallel, self._space.length - i)
            if self.__is_auto_set_dim:
                configs = self.next_batch(batch_size, False)
            else:
                configs = self.next_batch(batch_size)

            logger.debug('--indexes: %s', str([x.input_id for x in configs]))

            run_times = self._runner.run(configs, self._best_time, self.__is_auto_set_dim)
            if self.__is_auto_set_dim:
                # the first batch only establishes the baseline
                self._record_original_time(run_times)
                self.__is_auto_set_dim = False
                continue

            results = []
            for idx, conf in enumerate(configs):
                results.append((conf.input_id, run_times[idx]))
                # keep best config: a new best must beat the current one by
                # more than 600 (same unit as the runner's timings)
                if self._best_time - 600 > run_times[idx]:
                    self._best_time = run_times[idx]
                    self._best_iter = i + idx
                    self._best_config = conf

            i += len(results)
            self.__ttl = min(early_stopping + self.best_iter, self._space.length) - i

            start = time.time()
            # update
            for res in results:
                self._xs.append(res[0])
                self._ys.append(res[1])
            if output_file:
                exported = [(self._space.get(res[0]).input, res[1]) for res in results]
                desc = str(self._runner.op_desc)
                self.export_configs(exported, output_file, desc=desc)

            # if we have enough new training samples
            if len(self._xs) >= self.__plan_size * (self.__train_ct + 1):
                self._plan_trials_in_subprocess()
                self._trial_pt = 0
                self.__train_ct += 1

            end = time.time()
            logger.debug('model running time: %f seconds', end - start)
            self.__model_run_time += end - start

            iter_end = time.time()
            logger.debug('iter time: %f seconds', iter_end - iter_start)

            if self._best_iter > 0 and i >= self.best_iter + early_stopping:
                logger.warning('Early stopped. Best iter: %d', self._best_iter)
                return

            if self._best_time < 1000:
                logger.warning('Early stopped for this is a small shape. Best iter: %d', self._best_iter)
                return

            logger.debug("tuning time already, %f", time.time() - tuning_start)
            if time.time() - tuning_start > 7200:
                logger.warning('Early stopped because of too long time. Best iter: %d', self._best_iter)
                return

            if error_ct > 150:
                logger.warning('Too many errors happen in the tuning. Now is in debug mode')
                logger.setLevel(logging.DEBUG)
            else:
                logger.setLevel(old_level)
|