# Copyright 2020 Huawei Technologies Co., Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # ============================================================================ """Learning rate scheduler.""" import math from collections import Counter import numpy as np def linear_warmup_lr(current_step, warmup_steps, base_lr, init_lr): """Linear learning rate.""" lr_inc = (float(base_lr) - float(init_lr)) / float(warmup_steps) lr = float(init_lr) + lr_inc * current_step return lr def warmup_step_lr(lr, lr_epochs, steps_per_epoch, warmup_epochs, max_epoch, gamma=0.1): """Warmup step learning rate.""" base_lr = lr warmup_init_lr = 0 total_steps = int(max_epoch * steps_per_epoch) warmup_steps = int(warmup_epochs * steps_per_epoch) milestones = lr_epochs milestones_steps = [] for milestone in milestones: milestones_step = milestone * steps_per_epoch milestones_steps.append(milestones_step) lr_each_step = [] lr = base_lr milestones_steps_counter = Counter(milestones_steps) for i in range(total_steps): if i < warmup_steps: lr = linear_warmup_lr(i + 1, warmup_steps, base_lr, warmup_init_lr) else: lr = lr * gamma**milestones_steps_counter[i] lr_each_step.append(lr) return np.array(lr_each_step).astype(np.float32) def multi_step_lr(lr, milestones, steps_per_epoch, max_epoch, gamma=0.1): return warmup_step_lr(lr, milestones, steps_per_epoch, 0, max_epoch, gamma=gamma) def step_lr(lr, epoch_size, steps_per_epoch, max_epoch, gamma=0.1): lr_epochs = [] for i in range(1, max_epoch): if i % epoch_size == 0: lr_epochs.append(i) return multi_step_lr(lr, lr_epochs, steps_per_epoch, max_epoch, gamma=gamma) def warmup_cosine_annealing_lr(lr, steps_per_epoch, warmup_epochs, max_epoch, T_max, eta_min=0): """Cosine annealing learning rate.""" base_lr = lr warmup_init_lr = 0 total_steps = int(max_epoch * steps_per_epoch) warmup_steps = int(warmup_epochs * steps_per_epoch) lr_each_step = [] for i in range(total_steps): last_epoch = i // steps_per_epoch if i < warmup_steps: lr = linear_warmup_lr(i + 1, warmup_steps, base_lr, warmup_init_lr) else: lr = eta_min + (base_lr - eta_min) * (1. + math.cos(math.pi*last_epoch / T_max)) / 2 lr_each_step.append(lr) return np.array(lr_each_step).astype(np.float32) def warmup_cosine_annealing_lr_V2(lr, steps_per_epoch, warmup_epochs, max_epoch, T_max, eta_min=0): """Cosine annealing learning rate V2.""" base_lr = lr warmup_init_lr = 0 total_steps = int(max_epoch * steps_per_epoch) warmup_steps = int(warmup_epochs * steps_per_epoch) last_lr = 0 last_epoch_V1 = 0 T_max_V2 = int(max_epoch*1/3) lr_each_step = [] for i in range(total_steps): last_epoch = i // steps_per_epoch if i < warmup_steps: lr = linear_warmup_lr(i + 1, warmup_steps, base_lr, warmup_init_lr) else: if i < total_steps*2/3: lr = eta_min + (base_lr - eta_min) * (1. + math.cos(math.pi*last_epoch / T_max)) / 2 last_lr = lr last_epoch_V1 = last_epoch else: base_lr = last_lr last_epoch = last_epoch-last_epoch_V1 lr = eta_min + (base_lr - eta_min) * (1. + math.cos(math.pi * last_epoch / T_max_V2)) / 2 lr_each_step.append(lr) return np.array(lr_each_step).astype(np.float32) def warmup_cosine_annealing_lr_sample(lr, steps_per_epoch, warmup_epochs, max_epoch, T_max, eta_min=0): """Warmup cosine annealing learning rate.""" start_sample_epoch = 60 step_sample = 2 tobe_sampled_epoch = 60 end_sampled_epoch = start_sample_epoch + step_sample*tobe_sampled_epoch max_sampled_epoch = max_epoch+tobe_sampled_epoch T_max = max_sampled_epoch base_lr = lr warmup_init_lr = 0 total_steps = int(max_epoch * steps_per_epoch) total_sampled_steps = int(max_sampled_epoch * steps_per_epoch) warmup_steps = int(warmup_epochs * steps_per_epoch) lr_each_step = [] for i in range(total_sampled_steps): last_epoch = i // steps_per_epoch if last_epoch in range(start_sample_epoch, end_sampled_epoch, step_sample): continue if i < warmup_steps: lr = linear_warmup_lr(i + 1, warmup_steps, base_lr, warmup_init_lr) else: lr = eta_min + (base_lr - eta_min) * (1. + math.cos(math.pi*last_epoch / T_max)) / 2 lr_each_step.append(lr) assert total_steps == len(lr_each_step) return np.array(lr_each_step).astype(np.float32)