|
|
|
@@ -6,16 +6,19 @@ import json |
|
|
|
import os |
|
|
|
|
|
|
|
from typing import Any |
|
|
|
from contextlib import contextmanager |
|
|
|
|
|
|
|
import numpy as np |
|
|
|
import torch |
|
|
|
from torch import nn |
|
|
|
from torch.utils.data import TensorDataset, DataLoader |
|
|
|
from tqdm import tqdm |
|
|
|
from numpy.random import RandomState |
|
|
|
|
|
|
|
from . import cnn_gp |
|
|
|
from ..base import RegularStatSpecification |
|
|
|
from ..table.rkme import rkme_solve_qp |
|
|
|
from .... import setup_seed |
|
|
|
from ....logger import get_module_logger |
|
|
|
from ....utils import choose_device, allocate_cuda_idx |
|
|
|
|
|
|
|
@@ -36,6 +39,8 @@ class RKMEImageSpecification(RegularStatSpecification): |
|
|
|
""" |
|
|
|
self.RKME_IMAGE_VERSION = 1 # Please maintain backward compatibility. |
|
|
|
|
|
|
|
self.msg=None |
|
|
|
|
|
|
|
self.z = None |
|
|
|
self.beta = None |
|
|
|
self._cuda_idx = allocate_cuda_idx() if cuda_idx is None else cuda_idx |
|
|
|
@@ -47,6 +52,7 @@ class RKMEImageSpecification(RegularStatSpecification): |
|
|
|
if "model_config" not in kwargs |
|
|
|
else kwargs["model_config"] |
|
|
|
) |
|
|
|
self._random_generator = None |
|
|
|
|
|
|
|
super(RKMEImageSpecification, self).__init__(type=self.__class__.__name__) |
|
|
|
|
|
|
|
@@ -54,13 +60,11 @@ class RKMEImageSpecification(RegularStatSpecification): |
|
|
|
def device(self): |
|
|
|
return self._device |
|
|
|
|
|
|
|
def _generate_models(self, n_models: int, channel: int = 3, fixed_seed=None): |
|
|
|
def _generate_models(self, n_models: int, channel: int = 3): |
|
|
|
model_class = functools.partial(_ConvNet_wide, channel=channel, **self.model_config) |
|
|
|
|
|
|
|
def __builder(i): |
|
|
|
if fixed_seed is not None: |
|
|
|
torch.manual_seed(fixed_seed[i]) |
|
|
|
return model_class().to(self._device) |
|
|
|
return model_class(random_generator=self._random_generator).to(self._device) |
|
|
|
|
|
|
|
return (__builder(m) for m in range(n_models)) |
|
|
|
|
|
|
|
@@ -71,6 +75,7 @@ class RKMEImageSpecification(RegularStatSpecification): |
|
|
|
step_size: float = 0.01, |
|
|
|
steps: int = 100, |
|
|
|
resize: bool = True, |
|
|
|
sample_size: int = 5000, |
|
|
|
nonnegative_beta: bool = True, |
|
|
|
reduce: bool = True, |
|
|
|
verbose: bool = True, |
|
|
|
@@ -90,6 +95,8 @@ class RKMEImageSpecification(RegularStatSpecification): |
|
|
|
Total rounds in the iterative optimization. |
|
|
|
resize : bool |
|
|
|
Whether to scale the image to the requested size, by default True. |
|
|
|
sample_size: int |
|
|
|
Size of sampled set used to generate specification |
|
|
|
nonnegative_beta : bool, optional |
|
|
|
True if weights for the reduced set are intended to be kept non-negative, by default False. |
|
|
|
reduce : bool, optional |
|
|
|
@@ -153,30 +160,42 @@ class RKMEImageSpecification(RegularStatSpecification): |
|
|
|
self.beta = torch.from_numpy(self.beta).to(self._device) |
|
|
|
return |
|
|
|
|
|
|
|
random_models = list(self._generate_models(n_models=self.n_models, channel=X.shape[1])) |
|
|
|
self.z = torch.zeros(Z_shape).to(self._device).float().normal_(0, 1) |
|
|
|
with torch.no_grad(): |
|
|
|
x_features = self._generate_random_feature(X_train, random_models=random_models) |
|
|
|
self._update_beta(x_features, nonnegative_beta, random_models=random_models) |
|
|
|
# auto sample |
|
|
|
if len(X_train) > sample_size: |
|
|
|
indices = np.random.choice(len(X_train), size=sample_size, replace=False) |
|
|
|
X_train = X_train[indices] |
|
|
|
|
|
|
|
try: |
|
|
|
import torch_optimizer |
|
|
|
except ModuleNotFoundError: |
|
|
|
raise ModuleNotFoundError( |
|
|
|
f"RKMEImageSpecification is not available because 'torch-optimizer' is not installed! Please install it manually." |
|
|
|
) |
|
|
|
f"RKMEImageSpecification is not available because 'torch-optimizer' is not installed! Please install it manually.") |
|
|
|
|
|
|
|
optimizer = torch_optimizer.AdaBelief([{"params": [self.z]}], lr=step_size, eps=1e-16) |
|
|
|
# Cross-platform by default, unless the spec is specified to be generated specifically for local experiments. |
|
|
|
cross_platform = "experimental" not in kwargs or not kwargs["experimental"] |
|
|
|
# crucial |
|
|
|
with deterministic(cross_platform, self._device) as random_generator: |
|
|
|
self._random_generator = random_generator |
|
|
|
|
|
|
|
for _ in tqdm(range(steps)) if verbose else range(steps): |
|
|
|
# Regenerate Random Models |
|
|
|
random_models = list(self._generate_models(n_models=self.n_models, channel=X.shape[1])) |
|
|
|
self.z = torch.zeros(Z_shape).to(self._device).float() |
|
|
|
self._random_generator.normal_(self.z, 0, 1) |
|
|
|
|
|
|
|
with torch.no_grad(): |
|
|
|
x_features = self._generate_random_feature(X_train, random_models=random_models) |
|
|
|
self._update_z(x_features, optimizer, random_models=random_models) |
|
|
|
self._update_beta(x_features, nonnegative_beta, random_models=random_models) |
|
|
|
|
|
|
|
optimizer = torch_optimizer.AdaBelief([{"params": [self.z]}], lr=step_size, eps=1e-16) |
|
|
|
|
|
|
|
for _ in tqdm(range(steps)) if verbose else range(steps): |
|
|
|
# Regenerate Random Models |
|
|
|
random_models = list(self._generate_models(n_models=self.n_models, channel=X.shape[1])) |
|
|
|
|
|
|
|
with torch.no_grad(): |
|
|
|
x_features = self._generate_random_feature(X_train, random_models=random_models) |
|
|
|
self._update_z(x_features, optimizer, random_models=random_models) |
|
|
|
self._update_beta(x_features, nonnegative_beta, random_models=random_models) |
|
|
|
|
|
|
|
@torch.no_grad() |
|
|
|
def _update_beta(self, x_features: Any, nonnegative_beta: bool = True, random_models=None): |
|
|
|
Z = self.z |
|
|
|
@@ -331,7 +350,22 @@ class RKMEImageSpecification(RegularStatSpecification): |
|
|
|
return K_12 |
|
|
|
|
|
|
|
def herding(self, T: int) -> np.ndarray: |
|
|
|
raise NotImplementedError("The function herding hasn't been supported in Image RKME Specification.") |
|
|
|
"""Iteratively sample examples from an unknown distribution with the help of its RKME specification |
|
|
|
|
|
|
|
Parameters |
|
|
|
---------- |
|
|
|
T : int |
|
|
|
Total iteration number for sampling. |
|
|
|
|
|
|
|
Returns |
|
|
|
------- |
|
|
|
np.ndarray |
|
|
|
A collection of examples which approximate the unknown distribution. |
|
|
|
""" |
|
|
|
indices = torch.multinomial(self.beta, T, replacement=True) |
|
|
|
mock = self.z[indices] + torch.randn_like(self.z[indices]) * 0.01 |
|
|
|
|
|
|
|
return mock.numpy() |
|
|
|
|
|
|
|
def _sampling_candidates(self, N: int) -> np.ndarray: |
|
|
|
raise NotImplementedError() |
|
|
|
@@ -383,7 +417,7 @@ class RKMEImageSpecification(RegularStatSpecification): |
|
|
|
rkme_load = json.loads(obj_text) |
|
|
|
rkme_load["z"] = torch.from_numpy(np.array(rkme_load["z"], dtype="float32")) |
|
|
|
rkme_load["beta"] = torch.from_numpy(np.array(rkme_load["beta"], dtype="float64")) |
|
|
|
|
|
|
|
|
|
|
|
for d in self.get_states(): |
|
|
|
if d in rkme_load.keys(): |
|
|
|
if d == "type" and rkme_load[d] != self.type: |
|
|
|
@@ -405,11 +439,46 @@ def _get_zca_matrix(X, reg_coef=0.1): |
|
|
|
return whitening_transform |
|
|
|
|
|
|
|
|
|
|
|
class RandomGenerator: |
|
|
|
|
|
|
|
def __init__(self, seed=0, cross_platform=True): |
|
|
|
self.cross_platform=cross_platform |
|
|
|
self.state = RandomState(seed) |
|
|
|
|
|
|
|
def normal_(self, tensor: torch.Tensor, mean=0.0, std=1.0): |
|
|
|
if self.cross_platform: |
|
|
|
data = self.state.normal(mean, std, size=tensor.shape) |
|
|
|
with torch.no_grad(): |
|
|
|
tensor.copy_(torch.asarray(data, dtype=tensor.dtype)) |
|
|
|
else: |
|
|
|
torch.nn.init.normal_(tensor, mean, std) |
|
|
|
|
|
|
|
|
|
|
|
@contextmanager |
|
|
|
def deterministic(cross_platform, device): |
|
|
|
torch.manual_seed(0) |
|
|
|
torch.cuda.manual_seed_all(0) |
|
|
|
deterministic_state = torch.backends.cudnn.deterministic |
|
|
|
torch.backends.cudnn.deterministic = True |
|
|
|
if cross_platform and torch.cuda.is_available(): |
|
|
|
torch.cuda.set_rng_state( |
|
|
|
new_state=torch.cuda.get_rng_state(device.index), |
|
|
|
device="cpu") |
|
|
|
|
|
|
|
yield RandomGenerator(seed=0, cross_platform=cross_platform) |
|
|
|
|
|
|
|
torch.backends.cudnn.deterministic = deterministic_state |
|
|
|
if cross_platform and torch.cuda.is_available(): |
|
|
|
torch.cuda.set_rng_state( |
|
|
|
new_state=torch.cuda.get_rng_state(device.index), |
|
|
|
device="cuda") |
|
|
|
|
|
|
|
|
|
|
|
class _ConvNet_wide(nn.Module): |
|
|
|
def __init__(self, channel, mu=None, sigma=None, k=2, net_width=128, net_depth=3, im_size=(32, 32)): |
|
|
|
def __init__(self, channel, random_generator, mu=None, sigma=None, k=2, net_width=128, net_depth=3, im_size=(32, 32)): |
|
|
|
self.k = k |
|
|
|
super().__init__() |
|
|
|
self.features, shape_feat = self._make_layers(channel, net_width, net_depth, im_size, mu, sigma) |
|
|
|
self.features, shape_feat = self._make_layers(channel, net_width, net_depth, im_size, mu, sigma, random_generator) |
|
|
|
# self.aggregation = nn.AvgPool2d(kernel_size=shape_feat[1]) |
|
|
|
|
|
|
|
def forward(self, x): |
|
|
|
@@ -418,14 +487,14 @@ class _ConvNet_wide(nn.Module): |
|
|
|
# out = self.aggregation(out).reshape(out.size(0), -1) |
|
|
|
return out |
|
|
|
|
|
|
|
def _make_layers(self, channel, net_width, net_depth, im_size, mu, sigma): |
|
|
|
def _make_layers(self, channel, net_width, net_depth, im_size, mu, sigma, random_generator): |
|
|
|
k = self.k |
|
|
|
|
|
|
|
layers = [] |
|
|
|
in_channels = channel |
|
|
|
shape_feat = [in_channels, im_size[0], im_size[1]] |
|
|
|
for d in range(net_depth): |
|
|
|
layers += [_build_conv2d_gaussian(in_channels, int(k * net_width), 3, 1, mean=mu, std=sigma)] |
|
|
|
layers += [_build_conv2d_gaussian(in_channels, int(k * net_width), random_generator, 3, 1, mean=mu, std=sigma)] |
|
|
|
shape_feat[0] = int(k * net_width) |
|
|
|
|
|
|
|
layers += [nn.ReLU(inplace=True)] |
|
|
|
@@ -438,15 +507,15 @@ class _ConvNet_wide(nn.Module): |
|
|
|
return nn.Sequential(*layers), shape_feat |
|
|
|
|
|
|
|
|
|
|
|
def _build_conv2d_gaussian(in_channels, out_channels, kernel=3, padding=1, mean=None, std=None): |
|
|
|
def _build_conv2d_gaussian(in_channels, out_channels, random_generator: RandomGenerator, kernel=3, padding=1, mean=None, std=None): |
|
|
|
layer = nn.Conv2d(in_channels, out_channels, kernel, padding=padding) |
|
|
|
if mean is None: |
|
|
|
mean = 0 |
|
|
|
if std is None: |
|
|
|
std = np.sqrt(2) / np.sqrt(layer.weight.shape[1] * layer.weight.shape[2] * layer.weight.shape[3]) |
|
|
|
# print('Initializing Conv. Mean=%.2f, std=%.2f'%(mean, std)) |
|
|
|
torch.nn.init.normal_(layer.weight, mean, std) |
|
|
|
torch.nn.init.normal_(layer.bias, 0, 0.1) |
|
|
|
random_generator.normal_(layer.weight, mean, std) |
|
|
|
random_generator.normal_(layer.bias, 0, 0.1) |
|
|
|
return layer |
|
|
|
|
|
|
|
|
|
|
|
|