
Merge pull request #138 from Learnware-LAMDA/enh_fix_image_workflow

[ENH, FIX, MNT] Add Image WorkFlow
tags/v0.3.2
Gene GitHub 2 years ago
parent commit: 5a500abefb
15 changed files with 531 additions and 887 deletions
  1. examples/__init__.py (+0 / -0)
  2. examples/dataset_image_workflow/README.md (+31 / -0)
  3. examples/dataset_image_workflow/config.py (+62 / -0)
  4. examples/dataset_image_workflow/example_files/example_init.py (+0 / -26)
  5. examples/dataset_image_workflow/example_files/example_yaml.yaml (+0 / -8)
  6. examples/dataset_image_workflow/example_files/model.py (+0 / -183)
  7. examples/dataset_image_workflow/get_data.py (+0 / -283)
  8. examples/dataset_image_workflow/main.py (+0 / -221)
  9. examples/dataset_image_workflow/model.py (+82 / -0)
  10. examples/dataset_image_workflow/utils.py (+76 / -156)
  11. examples/dataset_image_workflow/workflow.py (+258 / -0)
  12. learnware/reuse/ensemble_pruning.py (+3 / -1)
  13. learnware/reuse/job_selector.py (+14 / -6)
  14. learnware/specification/regular/image/rkme.py (+1 / -1)
  15. learnware/specification/regular/table/rkme.py (+4 / -2)

examples/__init__.py (+0 / -0)


examples/dataset_image_workflow/README.md (+31 / -0)

@@ -0,0 +1,31 @@
# Image Dataset Workflow Example

## Introduction

For the CIFAR-10 dataset, we sampled the training set unevenly by category and constructed unbalanced training sets for 50 learnwares, each covering only a subset of the categories. As a result, no single learnware in the market is likely to handle all categories of data accurately; only the learnware whose training data is closest to the distribution of the target task is likely to perform well on it. Specifically, the sampling probability of each category follows a random multinomial distribution that is non-zero on only 4 categories, with a sampling ratio of 0.4 : 0.4 : 0.1 : 0.1. Each learnware's training set ultimately contains 12,000 samples covering 4 of the CIFAR-10 categories.

We constructed 50 target tasks using data from the CIFAR-10 test set. As with the learnwares' training sets, we sampled the test set unevenly to introduce some variation between tasks. Specifically, the sampling probability of each category follows a random multinomial distribution that is non-zero on 6 categories, with a sampling ratio of 0.3 : 0.3 : 0.1 : 0.1 : 0.1 : 0.1. Each target task ultimately contains 3,000 samples covering 6 of the CIFAR-10 categories.
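The uneven sampling scheme described above can be sketched as follows. This is a minimal illustration with NumPy; the function name `sample_unbalanced` and the use of `np.random.choice`-style drawing are my own assumptions, not the repository's actual data-generation code:

```python
import numpy as np

rng = np.random.default_rng(0)


def sample_unbalanced(y, n_classes=10, active=4, ratios=(0.4, 0.4, 0.1, 0.1), n_samples=12000):
    """Pick `active` random classes and draw indices according to `ratios`.

    Returns an index array of length `n_samples` covering exactly `active` classes.
    """
    classes = rng.choice(n_classes, size=active, replace=False)
    indices = []
    for cls, ratio in zip(classes, ratios):
        cls_idx = np.flatnonzero(y == cls)
        n_cls = int(n_samples * ratio)  # e.g. 0.4 * 12000 = 4800 samples for a major class
        indices.append(rng.choice(cls_idx, size=n_cls, replace=False))
    return np.concatenate(indices)


# With `y` being the CIFAR-10 training labels, each learnware's training set
# then covers 4 classes with ratio 0.4 : 0.4 : 0.1 : 0.1, 12,000 samples total.
```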

With this experimental setup, we evaluated the performance of RKME Image by calculating the mean accuracy across all users.

| Metric | Value |
|--------------------------------------|---------------------|
| Mean in Market (Single) | 0.346 |
| Best in Market (Single) | 0.688 |
| Top-1 Reuse (Single) | 0.534 |
| Job Selector Reuse (Multiple) | 0.534 |
| Average Ensemble Reuse (Multiple) | 0.676 |

In some settings, the user has a small number of labeled samples. In that case, learning the weights of the selected learnwares on the limited labeled samples can outperform training a model directly on those samples.

<div align=center>
<img src="../../docs/_static/img/image_labeled.png" alt="Image Limited Labeled Data" style="width:50%;" />
</div>
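One simple way to realize the weighting idea above is to stack the selected learnwares' probability outputs and fit mixing weights on the small labeled set. The sketch below uses non-negative least squares against one-hot labels; the function names and this particular weighting scheme are illustrative assumptions, not the code used to produce the figure:

```python
import numpy as np


def fit_ensemble_weights(pred_probs, y, n_classes=10):
    """Learn non-negative mixing weights for k learnwares' probability outputs
    by least squares against one-hot labels on a small labeled set."""
    P = np.stack(pred_probs)                   # (k, n, c)
    onehot = np.eye(n_classes)[y]              # (n, c)
    A = P.reshape(P.shape[0], -1).T            # (n*c, k): one column per learnware
    w, *_ = np.linalg.lstsq(A, onehot.ravel(), rcond=None)
    w = np.clip(w, 0.0, None)                  # keep weights non-negative
    if w.sum() == 0:
        return np.full(P.shape[0], 1.0 / P.shape[0])
    return w / w.sum()


def weighted_predict(pred_probs, w):
    """Combine probability outputs with the learned weights and take the argmax."""
    return np.einsum("k,knc->nc", w, np.stack(pred_probs)).argmax(axis=1)
```

A learnware whose outputs match the labeled samples well receives a large weight, so the combination tends toward the best-matching members rather than a plain average.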

## Run the code

Run the following command to start the ``image_example``.

```bash
python workflow.py image_example
```

examples/dataset_image_workflow/config.py (+62 / -0)

@@ -0,0 +1,62 @@
from learnware.tests.benchmarks import BenchmarkConfig


image_benchmark_config = BenchmarkConfig(
name="CIFAR-10",
user_num=100,
learnware_ids=[
"00002207",
"00002208",
"00002209",
"00002210",
"00002211",
"00002212",
"00002213",
"00002214",
"00002215",
"00002216",
"00002217",
"00002218",
"00002219",
"00002220",
"00002221",
"00002222",
"00002223",
"00002224",
"00002225",
"00002226",
"00002227",
"00002228",
"00002229",
"00002230",
"00002231",
"00002232",
"00002233",
"00002234",
"00002235",
"00002236",
"00002237",
"00002238",
"00002239",
"00002240",
"00002241",
"00002242",
"00002243",
"00002244",
"00002245",
"00002246",
"00002247",
"00002248",
"00002249",
"00002250",
"00002251",
"00002252",
"00002253",
"00002254",
"00002255",
"00002256",
],
test_data_path="CIFAR-10/test_data.zip",
train_data_path="CIFAR-10/train_data.zip",
extra_info_path="CIFAR-10/extra_info.zip",
)

examples/dataset_image_workflow/example_files/example_init.py (+0 / -26)

@@ -1,26 +0,0 @@
import os
import joblib
import numpy as np
from learnware.model import BaseModel
from .model import ConvModel
import torch


class Model(BaseModel):
def __init__(self):
super().__init__(input_shape=(3, 32, 32), output_shape=(10,))
dir_path = os.path.dirname(os.path.abspath(__file__))
self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
self.model = ConvModel(channel=3, n_random_features=10).to(self.device)
self.model.load_state_dict(torch.load(os.path.join(dir_path, "conv_model.pth")))
self.model.eval()

def fit(self, X: np.ndarray, y: np.ndarray):
pass

def predict(self, X: np.ndarray) -> np.ndarray:
X = torch.Tensor(X).to(self.device)
return self.model(X)

def finetune(self, X: np.ndarray, y: np.ndarray):
pass

examples/dataset_image_workflow/example_files/example_yaml.yaml (+0 / -8)

@@ -1,8 +0,0 @@
model:
class_name: Model
kwargs: {}
stat_specifications:
- module_path: learnware.specification
class_name: RKMEImageSpecification
file_name: rkme.json
kwargs: {}

examples/dataset_image_workflow/example_files/model.py (+0 / -183)

@@ -1,183 +0,0 @@
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np


class Linear(nn.Module):
def __init__(self, input_feature=256, num_classes=10):
super().__init__()
self.linear_1 = nn.Linear(input_feature, 128)
self.dropout_1 = nn.Dropout(p=0.5)
self.linear_2 = nn.Linear(128, 128)
self.dropout_2 = nn.Dropout(p=0.5)
self.linear_3 = nn.Linear(128, num_classes)

def forward(self, x):
out1 = F.relu(self.dropout_1(self.linear_1(x)))
out2 = F.relu(self.dropout_2(self.linear_2(out1)))
out = self.linear_3(out2)
return out


class OriginModel(nn.Module):
def __init__(self, last_layer_feature=256):
super().__init__()
self.linear_1 = nn.Linear(last_layer_feature, 128)
self.linear_2 = nn.Linear(128, 128)
self.linear_3 = nn.Linear(128, 10)

def forward(self, x):
out = F.relu(self.linear_1(x))
out = F.relu(self.linear_2(out))
out = self.linear_3(out)
return out


class ConvModel(nn.Module):
def __init__(
self,
channel,
n_random_features,
net_width=64,
net_depth=3,
net_act="relu",
net_norm="batchnorm",
net_pooling="avgpooling",
im_size=(32, 32),
):
super().__init__()
# print('Building Conv Model')
self.features, shape_feat = self._make_layers(
channel, net_width, net_depth, net_norm, net_act, net_pooling, im_size
)
num_feat = shape_feat[0] * shape_feat[1] * shape_feat[2]
self.classifier = GaussianLinear(num_feat, n_random_features)

def forward(self, x):
out = self.features(x)
out = out.reshape(out.size(0), -1)
out = self.classifier(out)
return out

def _get_activation(self, net_act):
if net_act == "sigmoid":
return nn.Sigmoid()
elif net_act == "relu":
return nn.ReLU(inplace=True)
elif net_act == "leakyrelu":
return nn.LeakyReLU(negative_slope=0.01)
elif net_act == "gelu":
return nn.SiLU()
else:
exit("unknown activation function: %s" % net_act)

def _get_pooling(self, net_pooling):
if net_pooling == "maxpooling":
return nn.MaxPool2d(kernel_size=2, stride=2)
elif net_pooling == "avgpooling":
return nn.AvgPool2d(kernel_size=2, stride=2)
elif net_pooling == "none":
return None
else:
exit("unknown net_pooling: %s" % net_pooling)

def _get_normlayer(self, net_norm, shape_feat):
# shape_feat = (c*h*w)
if net_norm == "batchnorm":
return nn.BatchNorm2d(shape_feat[0], affine=True)
elif net_norm == "layernorm":
return nn.LayerNorm(shape_feat, elementwise_affine=True)
elif net_norm == "instancenorm":
return nn.GroupNorm(shape_feat[0], shape_feat[0], affine=True)
elif net_norm == "groupnorm":
return nn.GroupNorm(4, shape_feat[0], affine=True)
elif net_norm == "none":
return None
else:
exit("unknown net_norm: %s" % net_norm)

def _make_layers(self, channel, net_width, net_depth, net_norm, net_act, net_pooling, im_size):
layers = []
in_channels = channel
# if im_size[0] == 28:
# im_size = (32, 32)
shape_feat = [in_channels, im_size[0], im_size[1]]
for d in range(net_depth):
# print(shape_feat)
layers += [Conv2d_gaussian(in_channels, net_width, kernel_size=3, padding=1)]
# layers += [nn.Conv2d(in_channels, net_width, kernel_size=3, padding='same')]
shape_feat[0] = net_width
if net_norm != "none":
layers += [self._get_normlayer(net_norm, shape_feat)]
layers += [self._get_activation(net_act)]
in_channels = net_width
if net_pooling != "none":
layers += [self._get_pooling(net_pooling)]
shape_feat[1] //= 2
shape_feat[2] //= 2

return nn.Sequential(*layers), shape_feat


class Conv2d_gaussian(torch.nn.Conv2d):
def reset_parameters(self) -> None:
# Setting a=sqrt(5) in kaiming_uniform is the same as initializing with
# uniform(-1/sqrt(k), 1/sqrt(k)), where k = weight.size(1) * prod(*kernel_size)
# For more details see: https://github.com/pytorch/pytorch/issues/15314#issuecomment-477448573
# torch.nn.init.kaiming_normal_(self.weight, a= math.sqrt(5))
# W has shape out, in, h, w
torch.nn.init.normal_(
self.weight, 0, np.sqrt(2) / np.sqrt(self.weight.shape[1] * self.weight.shape[2] * self.weight.shape[3])
)
if self.bias is not None:
fan_in, _ = torch.nn.init._calculate_fan_in_and_fan_out(self.weight)
# print(fan_in)
if fan_in != 0:
# bound = 0 * 1 / math.sqrt(fan_in)
# torch.nn.init.uniform_(self.bias, -bound, bound)
# torch.nn.init.uniform_(self.bias, -bound, bound)
torch.nn.init.normal_(self.bias, 0, 0.1)


class GaussianLinear(torch.nn.Module):
__constants__ = ["in_features", "out_features"]
in_features: int
out_features: int
weight: torch.Tensor

def __init__(
self, in_features: int, out_features: int, bias: bool = True, device=None, dtype=None, funny=False
) -> None:
factory_kwargs = {"device": device, "dtype": dtype}
super(GaussianLinear, self).__init__()
self.funny = funny
self.in_features = in_features
self.out_features = out_features
self.weight = torch.nn.Parameter(torch.empty((out_features, in_features), **factory_kwargs))
if bias:
self.bias = torch.nn.Parameter(torch.empty(out_features, **factory_kwargs))
else:
self.register_parameter("bias", None)
self.reset_parameters()

def reset_parameters(self) -> None:
# Setting a=sqrt(5) in kaiming_uniform is the same as initializing with
# uniform(-1/sqrt(in_features), 1/sqrt(in_features)). For details, see
# https://github.com/pytorch/pytorch/issues/57109
# torch.nn.init.kaiming_normal_(self.weight, a=1 * np.sqrt(5))
torch.nn.init.normal_(self.weight, 0, np.sqrt(2) / np.sqrt(self.in_features))
# torch.nn.init.normal_(self.weight, 0, 3/np.sqrt(self.in_features))
if self.bias is not None:
fan_in, _ = torch.nn.init._calculate_fan_in_and_fan_out(self.weight)
bound = 1 / np.sqrt(fan_in) if fan_in > 0 else 0
# torch.nn.init.uniform_(self.bias, -bound, bound)
torch.nn.init.normal_(self.bias, 0, 0.1)

def forward(self, input: torch.Tensor) -> torch.Tensor:
return torch.nn.functional.linear(input, self.weight, self.bias)

def extra_repr(self) -> str:
return "in_features={}, out_features={}, bias={}".format(
self.in_features, self.out_features, self.bias is not None
)

examples/dataset_image_workflow/get_data.py (+0 / -283)

@@ -1,283 +0,0 @@
import torch
from torchvision import datasets, transforms
import torch.nn.functional as F
from scipy.ndimage.interpolation import rotate as scipyrotate

import numpy as np


def get_fashion_mnist(data_root="./data", output_channels=1, image_size=28):
ds_train = datasets.FashionMNIST(
data_root,
train=True,
download=True,
transform=transforms.Compose([transforms.ToTensor(), transforms.Resize([image_size, image_size])]),
)
X_train = ds_train.data
y_train = ds_train.targets
ds_test = datasets.FashionMNIST(
data_root,
train=False,
download=True,
transform=transforms.Compose([transforms.ToTensor(), transforms.Resize([image_size, image_size])]),
)

X_test = ds_test.data
y_test = ds_test.targets

X_train = X_train[:, None, :, :].float()
X_test = X_test[:, None, :, :].float()

if output_channels > 1:
X_train = torch.cat([X_train for i in range(output_channels)], 1)
X_test = torch.cat([X_test for i in range(output_channels)], 1)

X_test = (X_test - torch.mean(X_train, [0, 2, 3], keepdim=True)) / (torch.std(X_train, [0, 2, 3], keepdim=True))
X_train = (X_train - torch.mean(X_train, [0, 2, 3], keepdim=True)) / (torch.std(X_train, [0, 2, 3], keepdim=True))

return X_train, y_train, X_test, y_test


def get_mnist(data_root="./data/", output_channels=1, image_size=28):
ds_train = datasets.MNIST(
data_root,
train=True,
download=True,
transform=transforms.Compose([transforms.ToTensor(), transforms.Resize([image_size, image_size])]),
)
X_train = []

for x, _ in ds_train:
X_train.append(x)
X_train = torch.stack(X_train)

y_train = ds_train.targets
ds_test = datasets.MNIST(
data_root,
train=False,
download=True,
transform=transforms.Compose([transforms.ToTensor(), transforms.Resize([image_size, image_size])]),
)

X_test = []

for x, _ in ds_test:
X_test.append(x)
X_test = torch.stack(X_test)

y_test = ds_test.targets

if output_channels > 1:
X_train = torch.cat([X_train for i in range(output_channels)], 1)
X_test = torch.cat([X_test for i in range(output_channels)], 1)

X_test = (X_test - torch.mean(X_train, [0, 2, 3], keepdim=True)) / (torch.std(X_train, [0, 2, 3], keepdim=True))
X_train = (X_train - torch.mean(X_train, [0, 2, 3], keepdim=True)) / (torch.std(X_train, [0, 2, 3], keepdim=True))

return X_train, y_train, X_test, y_test


def get_cifar10(data_root="./data/", output_channels=3, image_size=32):
ds_train = datasets.CIFAR10(
data_root,
train=True,
download=True,
transform=transforms.Compose([transforms.ToTensor(), transforms.Resize([image_size, image_size])]),
)
X_train = ds_train.data
y_train = ds_train.targets
ds_test = datasets.CIFAR10(
data_root,
train=False,
download=True,
transform=transforms.Compose([transforms.ToTensor(), transforms.Resize([image_size, image_size])]),
)

X_test = ds_test.data
y_test = ds_test.targets

X_train = torch.Tensor(np.moveaxis(X_train, 3, 1))
y_train = torch.Tensor(y_train).long()
X_test = torch.Tensor(np.moveaxis(X_test, 3, 1))
y_test = torch.Tensor(y_test).long()

if output_channels == 1:
X_train = torch.mean(X_train, 1, keepdim=True)
X_test = torch.mean(X_test, 1, keepdim=True)

X_test = (X_test - torch.mean(X_train, [0, 2, 3], keepdim=True)) / (torch.std(X_train, [0, 2, 3], keepdim=True))
X_train = (X_train - torch.mean(X_train, [0, 2, 3], keepdim=True)) / (torch.std(X_train, [0, 2, 3], keepdim=True))

return X_train, y_train, X_test, y_test


def get_svhn(output_channels=1, image_size=32):
ds_train = datasets.SVHN(
"./data/",
split="train",
download=True,
transform=transforms.Compose([transforms.ToTensor(), transforms.Resize([image_size, image_size])]),
)
X_train = ds_train.data
y_train = ds_train.labels
ds_test = datasets.SVHN(
"./data/",
split="test",
download=True,
transform=transforms.Compose([transforms.ToTensor(), transforms.Resize([image_size, image_size])]),
)

X_test = ds_test.data
y_test = ds_test.labels

X_train = torch.Tensor(X_train)
y_train = torch.Tensor(y_train).long()
X_test = torch.Tensor(X_test)
y_test = torch.Tensor(y_test).long()

if output_channels == 1:
X_train = torch.mean(X_train, 1, keepdim=True)
X_test = torch.mean(X_test, 1, keepdim=True)

X_test = (X_test - torch.mean(X_train, [0, 2, 3], keepdim=True)) / (torch.std(X_train, [0, 2, 3], keepdim=True))
X_train = (X_train - torch.mean(X_train, [0, 2, 3], keepdim=True)) / (torch.std(X_train, [0, 2, 3], keepdim=True))

return X_train, y_train, X_test, y_test


def get_cifar100(data_root="./data/", output_channels=3, image_size=32):
ds_train = datasets.CIFAR100(
data_root,
train=True,
download=True,
transform=transforms.Compose([transforms.ToTensor(), transforms.Resize([image_size, image_size])]),
)
X_train = ds_train.data
y_train = ds_train.targets
ds_test = datasets.CIFAR100(
data_root,
train=False,
download=True,
transform=transforms.Compose([transforms.ToTensor(), transforms.Resize([image_size, image_size])]),
)

X_test = ds_test.data
y_test = ds_test.targets

X_train = torch.Tensor(np.moveaxis(X_train, 3, 1))
y_train = torch.Tensor(y_train).long()
X_test = torch.Tensor(np.moveaxis(X_test, 3, 1))
y_test = torch.Tensor(y_test).long()

if output_channels == 1:
X_train = torch.mean(X_train, 1, keepdim=True)
X_test = torch.mean(X_test, 1, keepdim=True)

X_test = (X_test - torch.mean(X_train, [0, 2, 3], keepdim=True)) / (torch.std(X_train, [0, 2, 3], keepdim=True))
X_train = (X_train - torch.mean(X_train, [0, 2, 3], keepdim=True)) / (torch.std(X_train, [0, 2, 3], keepdim=True))

return X_train, y_train, X_test, y_test


def get_zca_matrix(X, reg_coef=0.1):
X_flat = X.reshape(X.shape[0], -1)
cov = (X_flat.T @ X_flat) / X_flat.shape[0]
reg_amount = reg_coef * torch.trace(cov) / cov.shape[0]
u, s, _ = torch.svd(cov.cuda() + reg_amount * torch.eye(cov.shape[0]).cuda())
inv_sqrt_zca_eigs = s ** (-0.5)
whitening_transform = torch.einsum("ij,j,kj->ik", u, inv_sqrt_zca_eigs, u)

return whitening_transform.cpu()


def layernorm_data(X):
X_processed = X - torch.mean(X, [1, 2, 3], keepdim=True)
X_processed = X_processed / torch.sqrt(torch.sum(X_processed**2, [1, 2, 3], keepdim=True))

return X_processed


def transform_data(X, whitening_transform):
if len(whitening_transform.shape) == 2:
X_flat = X.reshape(X.shape[0], -1)
X_flat = X_flat @ whitening_transform
return X_flat.view(*X.shape)
else:
X_flat = X.reshape(X.shape[0], -1)
X_flat = torch.einsum("nd, ndi->ni", X_flat, whitening_transform)
return X_flat.view(*X.shape)


def scale_to_zero_one(X):
mins = torch.min(X.view(X.shape[0], -1), 1)[0].view(-1, 1, 1, 1)
maxes = torch.max(X.view(X.shape[0], -1), 1)[0].view(-1, 1, 1, 1)
return (X - mins) / (maxes - mins)


def augment(images, dc_aug_param, device):
# This can be sped up in the future.

if dc_aug_param != None and dc_aug_param["strategy"] != "none":
scale = dc_aug_param["scale"]
crop = dc_aug_param["crop"]
rotate = dc_aug_param["rotate"]
noise = dc_aug_param["noise"]
strategy = dc_aug_param["strategy"]

shape = images.shape
mean = []
for c in range(shape[1]):
mean.append(float(torch.mean(images[:, c])))

def cropfun(i):
im_ = torch.zeros(shape[1], shape[2] + crop * 2, shape[3] + crop * 2, dtype=torch.float, device=device)
for c in range(shape[1]):
im_[c] = mean[c]
im_[:, crop : crop + shape[2], crop : crop + shape[3]] = images[i]
r, c = np.random.permutation(crop * 2)[0], np.random.permutation(crop * 2)[0]
images[i] = im_[:, r : r + shape[2], c : c + shape[3]]

def scalefun(i):
h = int((np.random.uniform(1 - scale, 1 + scale)) * shape[2])
w = int((np.random.uniform(1 - scale, 1 + scale)) * shape[2])
tmp = F.interpolate(
images[i : i + 1],
[h, w],
)[0]
mhw = max(h, w, shape[2], shape[3])
im_ = torch.zeros(shape[1], mhw, mhw, dtype=torch.float, device=device)
r = int((mhw - h) / 2)
c = int((mhw - w) / 2)
im_[:, r : r + h, c : c + w] = tmp
r = int((mhw - shape[2]) / 2)
c = int((mhw - shape[3]) / 2)
images[i] = im_[:, r : r + shape[2], c : c + shape[3]]

def rotatefun(i):
im_ = scipyrotate(
images[i].cpu().data.numpy(),
angle=np.random.randint(-rotate, rotate),
axes=(-2, -1),
cval=np.mean(mean),
)
r = int((im_.shape[-2] - shape[-2]) / 2)
c = int((im_.shape[-1] - shape[-1]) / 2)
images[i] = torch.tensor(im_[:, r : r + shape[-2], c : c + shape[-1]], dtype=torch.float, device=device)

def noisefun(i):
images[i] = images[i] + noise * torch.randn(shape[1:], dtype=torch.float, device=device)

augs = strategy.split("_")

for i in range(shape[0]):
choice = np.random.permutation(augs)[0] # randomly implement one augmentation
if choice == "crop":
cropfun(i)
elif choice == "scale":
scalefun(i)
elif choice == "rotate":
rotatefun(i)
elif choice == "noise":
noisefun(i)

return images

examples/dataset_image_workflow/main.py (+0 / -221)

@@ -1,221 +0,0 @@
import numpy as np
import torch
from tqdm import tqdm

from get_data import *
import os
import random

from learnware.specification import RKMEImageSpecification
from learnware.reuse.averaging import AveragingReuser
from utils import generate_uploader, generate_user, ImageDataLoader, train, eval_prediction
from learnware.learnware import Learnware
import time

from learnware.market import instantiate_learnware_market, BaseUserInfo
from learnware.market.easy import database_ops
from learnware.learnware import Learnware
import learnware.specification as specification
from learnware.logger import get_module_logger

from shutil import copyfile, rmtree
import zipfile

logger = get_module_logger("image_test", level="INFO")
origin_data_root = "./data/origin_data"
processed_data_root = "./data/processed_data"
tmp_dir = "./data/tmp"
learnware_pool_dir = "./data/learnware_pool"
dataset = "cifar10"
n_uploaders = 30
n_users = 20
n_classes = 10
data_root = os.path.join(origin_data_root, dataset)
data_save_root = os.path.join(processed_data_root, dataset)
user_save_root = os.path.join(data_save_root, "user")
uploader_save_root = os.path.join(data_save_root, "uploader")
model_save_root = os.path.join(data_save_root, "uploader_model")
os.makedirs(data_root, exist_ok=True)
os.makedirs(user_save_root, exist_ok=True)
os.makedirs(uploader_save_root, exist_ok=True)
os.makedirs(model_save_root, exist_ok=True)


semantic_specs = [
{
"Data": {"Values": ["Tabular"], "Type": "Class"},
"Task": {"Values": ["Classification"], "Type": "Class"},
"Library": {"Values": ["Pytorch"], "Type": "Class"},
"Scenario": {"Values": ["Business"], "Type": "Tag"},
"Description": {"Values": "", "Type": "String"},
"Name": {"Values": "learnware_1", "Type": "String"},
"Output": {"Dimension": 10},
"License": {"Values": ["MIT"], "Type": "Class"},
}
]

user_semantic = {
"Data": {"Values": ["Tabular"], "Type": "Class"},
"Task": {"Values": ["Classification"], "Type": "Class"},
"Library": {"Values": ["Pytorch"], "Type": "Class"},
"Scenario": {"Values": ["Business"], "Type": "Tag"},
"Description": {"Values": "", "Type": "String"},
"Name": {"Values": "", "Type": "String"},
"License": {"Values": ["MIT"], "Type": "Class"},
}


def prepare_data():
if dataset == "cifar10":
X_train, y_train, X_test, y_test = get_cifar10(data_root)
elif dataset == "mnist":
X_train, y_train, X_test, y_test = get_mnist(data_root)
else:
return
generate_uploader(X_train, y_train, n_uploaders=n_uploaders, data_save_root=uploader_save_root)
generate_user(X_test, y_test, n_users=n_users, data_save_root=user_save_root)


def prepare_model():
dataloader = ImageDataLoader(data_save_root, train=True)
for i in range(n_uploaders):
logger.info("Train on uploader: %d" % (i))
X, y = dataloader.get_idx_data(i)
model = train(X, y, out_classes=n_classes)
model_save_path = os.path.join(model_save_root, "uploader_%d.pth" % (i))
torch.save(model.state_dict(), model_save_path)
logger.info("Model saved to '%s'" % (model_save_path))


def prepare_learnware(data_path, model_path, init_file_path, yaml_path, save_root, zip_name):
os.makedirs(save_root, exist_ok=True)
tmp_spec_path = os.path.join(save_root, "rkme.json")
tmp_model_path = os.path.join(save_root, "conv_model.pth")
tmp_yaml_path = os.path.join(save_root, "learnware.yaml")
tmp_init_path = os.path.join(save_root, "__init__.py")
tmp_model_file_path = os.path.join(save_root, "model.py")
mmodel_file_path = "./example_files/model.py"

# Computing the specification from the whole dataset is too costly.
X = np.load(data_path)
indices = np.random.choice(len(X), size=2000, replace=False)
X_sampled = X[indices]

st = time.time()
user_spec = RKMEImageSpecification(cuda_idx=0)
user_spec.generate_stat_spec_from_data(X=X_sampled)
ed = time.time()
logger.info("Stat spec generated in %.3f s" % (ed - st))
user_spec.save(tmp_spec_path)
copyfile(model_path, tmp_model_path)
copyfile(yaml_path, tmp_yaml_path)
copyfile(init_file_path, tmp_init_path)
copyfile(mmodel_file_path, tmp_model_file_path)
zip_file_name = os.path.join(learnware_pool_dir, "%s.zip" % (zip_name))
with zipfile.ZipFile(zip_file_name, "w", compression=zipfile.ZIP_DEFLATED) as zip_obj:
zip_obj.write(tmp_spec_path, "rkme.json")
zip_obj.write(tmp_model_path, "conv_model.pth")
zip_obj.write(tmp_yaml_path, "learnware.yaml")
zip_obj.write(tmp_init_path, "__init__.py")
zip_obj.write(tmp_model_file_path, "model.py")
rmtree(save_root)
logger.info("New Learnware Saved to %s" % (zip_file_name))
return zip_file_name


def prepare_market():
image_market = instantiate_learnware_market(market_id="cifar10", name="easy", rebuild=True)
try:
rmtree(learnware_pool_dir)
except:
pass
os.makedirs(learnware_pool_dir, exist_ok=True)
for i in tqdm(range(n_uploaders), total=n_uploaders, desc="Preparing..."):
data_path = os.path.join(uploader_save_root, "uploader_%d_X.npy" % (i))
model_path = os.path.join(model_save_root, "uploader_%d.pth" % (i))
init_file_path = "./example_files/example_init.py"
yaml_file_path = "./example_files/example_yaml.yaml"
new_learnware_path = prepare_learnware(
data_path, model_path, init_file_path, yaml_file_path, tmp_dir, "%s_%d" % (dataset, i)
)
semantic_spec = semantic_specs[0]
semantic_spec["Name"]["Values"] = "learnware_%d" % (i)
semantic_spec["Description"]["Values"] = "test_learnware_number_%d" % (i)
image_market.add_learnware(new_learnware_path, semantic_spec)

logger.info("Total Item: %d" % (len(image_market)))
curr_inds = image_market._get_ids()
logger.info("Available ids: " + str(curr_inds))


def test_search(gamma=0.1, load_market=True):
if load_market:
image_market = instantiate_learnware_market(market_id="cifar10", name="easy")
else:
prepare_market()
image_market = instantiate_learnware_market(market_id="cifar10", name="easy")
logger.info("Number of items in the market: %d" % len(image_market))

select_list = []
avg_list = []
improve_list = []
job_selector_score_list = []
ensemble_score_list = []
for i in tqdm(range(n_users), total=n_users, desc="Searching..."):
user_data_path = os.path.join(user_save_root, "user_%d_X.npy" % (i))
user_label_path = os.path.join(user_save_root, "user_%d_y.npy" % (i))
user_data = np.load(user_data_path)
user_label = np.load(user_label_path)
user_stat_spec = RKMEImageSpecification(cuda_idx=0)
user_stat_spec.generate_stat_spec_from_data(X=user_data, resize=False)
user_info = BaseUserInfo(semantic_spec=user_semantic, stat_info={"RKMETableSpecification": user_stat_spec})
logger.info("Searching Market for user: %d" % i)
search_result = image_market.search_learnware(user_info)
single_result = search_result.get_single_results()
acc_list = []
for idx, single_item in enumerate(single_result[:5]):
pred_y = single_item.learnware.predict(user_data)
acc = eval_prediction(pred_y, user_label)
acc_list.append(acc)
logger.info(
"Search rank: %d, score: %.3f, learnware_id: %s, acc: %.3f"
% (idx, single_item.score, single_item.learnware.id, acc)
)

# test reuse (job selector)
# reuse_baseline = JobSelectorReuser(learnware_list=mixture_learnware_list, herding_num=100)
# reuse_predict = reuse_baseline.predict(user_data=user_data)
# reuse_score = eval_prediction(reuse_predict, user_label)
# job_selector_score_list.append(reuse_score)
# print(f"mixture reuse loss: {reuse_score}")

# test reuse (ensemble)
single_learnware_list = [single_item.learnware for single_item in single_result]
reuse_ensemble = AveragingReuser(learnware_list=single_learnware_list[:3], mode="vote_by_prob")
ensemble_predict_y = reuse_ensemble.predict(user_data=user_data)
ensemble_score = eval_prediction(ensemble_predict_y, user_label)
ensemble_score_list.append(ensemble_score)
print(f"reuse accuracy (vote_by_prob): {ensemble_score}\n")

select_list.append(acc_list[0])
avg_list.append(np.mean(acc_list))
improve_list.append((acc_list[0] - np.mean(acc_list)) / np.mean(acc_list))

logger.info(
"Accuracy of selected learnware: %.3f +/- %.3f, Average performance: %.3f +/- %.3f"
% (np.mean(select_list), np.std(select_list), np.mean(avg_list), np.std(avg_list))
)
logger.info(
"Ensemble Reuse Performance: %.3f +/- %.3f" % (np.mean(ensemble_score_list), np.std(ensemble_score_list))
)


if __name__ == "__main__":
logger.info("=" * 40)
logger.info(f"n_uploaders:\t{n_uploaders}")
logger.info(f"n_users:\t{n_users}")
logger.info("=" * 40)

prepare_data()
prepare_model()
test_search(load_market=False)

examples/dataset_image_workflow/model.py (+82 / -0)

@@ -0,0 +1,82 @@
from torch import nn


class ConvModel(nn.Module):
def __init__(
self,
channel,
n_random_features,
net_width=64,
net_depth=3,
net_act="relu",
net_norm="batchnorm",
net_pooling="avgpooling",
im_size=(32, 32),
):
super().__init__()
self.features, shape_feat = self._make_layers(
channel, net_width, net_depth, net_norm, net_act, net_pooling, im_size
)
num_feat = shape_feat[0] * shape_feat[1] * shape_feat[2]
self.classifier = nn.Linear(num_feat, n_random_features)

def forward(self, x):
out = self.features(x)
out = out.reshape(out.size(0), -1)
out = self.classifier(out)
return out

def _get_activation(self, net_act):
if net_act == "sigmoid":
return nn.Sigmoid()
elif net_act == "relu":
return nn.ReLU(inplace=True)
elif net_act == "leakyrelu":
return nn.LeakyReLU(negative_slope=0.01)
elif net_act == "gelu":
return nn.GELU()
else:
raise Exception("unknown activation function: %s" % net_act)

def _get_pooling(self, net_pooling):
if net_pooling == "maxpooling":
return nn.MaxPool2d(kernel_size=2, stride=2)
elif net_pooling == "avgpooling":
return nn.AvgPool2d(kernel_size=2, stride=2)
elif net_pooling == "none":
return None
else:
raise Exception("unknown net_pooling: %s" % net_pooling)

def _get_normlayer(self, net_norm, shape_feat):
if net_norm == "batchnorm":
return nn.BatchNorm2d(shape_feat[0], affine=True)
elif net_norm == "layernorm":
return nn.LayerNorm(shape_feat, elementwise_affine=True)
elif net_norm == "instancenorm":
return nn.GroupNorm(shape_feat[0], shape_feat[0], affine=True)
elif net_norm == "groupnorm":
return nn.GroupNorm(4, shape_feat[0], affine=True)
elif net_norm == "none":
return None
else:
raise Exception("unknown net_norm: %s" % net_norm)

def _make_layers(self, channel, net_width, net_depth, net_norm, net_act, net_pooling, im_size):
layers = []
in_channels = channel
shape_feat = [in_channels, im_size[0], im_size[1]]
for d in range(net_depth):
layers += [nn.Conv2d(in_channels, net_width, kernel_size=3, padding="same")]

shape_feat[0] = net_width
if net_norm != "none":
layers += [self._get_normlayer(net_norm, shape_feat)]
layers += [self._get_activation(net_act)]
in_channels = net_width
if net_pooling != "none":
layers += [self._get_pooling(net_pooling)]
shape_feat[1] //= 2
shape_feat[2] //= 2

return nn.Sequential(*layers), shape_feat
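With the default arguments, each of the three conv blocks keeps the spatial size (padding="same") and then halves it with 2x2 pooling, so a 3x32x32 input yields a 64x4x4 feature map and the final nn.Linear receives 1024 input features. A small pure-Python helper mirroring that shape bookkeeping (the name `feature_shape` is mine, not part of the file):

```python
def feature_shape(channel=3, net_width=64, net_depth=3, im_size=(32, 32)):
    """Mirror ConvModel._make_layers' shape bookkeeping for the default config."""
    c, h, w = channel, im_size[0], im_size[1]
    for _ in range(net_depth):
        c = net_width   # conv keeps spatial size (padding="same"), sets channels
        h //= 2         # each 2x2 avg-pooling halves height and width
        w //= 2
    return c * h * w


print(feature_shape())  # 1024: the in_features of the final classifier
```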

examples/dataset_image_workflow/utils.py (+76 / -156)

@@ -1,174 +1,94 @@
import os
import torch
import numpy as np
import random
import math
from torch import optim, nn
from torch.utils.data import DataLoader, Dataset

from learnware.utils import choose_device


@torch.no_grad()
def evaluate(model, evaluate_set: Dataset, device=None, distribution=True):
device = choose_device(0) if device is None else device

if isinstance(model, nn.Module):
model.eval()
mapping = lambda m, x: m(x)
else:
mapping = lambda m, x: m.predict(x)

criterion = nn.CrossEntropyLoss(reduction="sum")
total, correct, loss = 0, 0, torch.as_tensor(0.0, dtype=torch.float32, device=device)
dataloader = DataLoader(evaluate_set, batch_size=1024, shuffle=True)
for i, (X, y) in enumerate(dataloader):
X, y = X.to(device), y.to(device)
out = mapping(model, X)
if not torch.is_tensor(out):
out = torch.from_numpy(out).to(device)

if distribution:
loss += criterion(out, y)
_, predicted = torch.max(out.data, 1)
else:
predicted = out

        total += y.size(0)
        correct += (predicted == y).sum().item()

    acc = correct / total * 100
    loss = loss / total

    if isinstance(model, nn.Module):
        model.train()

    return loss.item(), acc


from example_files.model import ConvModel


class ImageDataLoader:
    def __init__(self, data_root, train: bool = True):
        self.data_root = data_root
        self.train = train

def get_idx_data(self, idx=0):
if self.train:
X_path = os.path.join(self.data_root, "uploader", "uploader_%d_X.npy" % (idx))
y_path = os.path.join(self.data_root, "uploader", "uploader_%d_y.npy" % (idx))
if not (os.path.exists(X_path) and os.path.exists(y_path)):
raise Exception("Index Error")
X = np.load(X_path)
y = np.load(y_path)
else:
X_path = os.path.join(self.data_root, "user", "user_%d_X.npy" % (idx))
y_path = os.path.join(self.data_root, "user", "user_%d_y.npy" % (idx))
if not (os.path.exists(X_path) and os.path.exists(y_path)):
raise Exception("Index Error")
X = np.load(X_path)
y = np.load(y_path)
return X, y


def generate_uploader(data_x, data_y, n_uploaders=50, data_save_root=None):
if data_save_root is None:
return
os.makedirs(data_save_root, exist_ok=True)
for i in range(n_uploaders):
random_class_num = random.randint(6, 10)
cls_indx = list(range(10))
random.shuffle(cls_indx)
selected_cls_indx = cls_indx[:random_class_num]
rest_cls_indx = cls_indx[random_class_num:]
selected_data_indx = []
for cls in selected_cls_indx:
data_indx = list(torch.where(data_y == cls)[0])
# print(type(data_indx))
random.shuffle(data_indx)
data_num = random.randint(800, 2000)
selected_indx = data_indx[:data_num]
selected_data_indx = selected_data_indx + selected_indx
for cls in rest_cls_indx:
flag = random.randint(0, 1)
if flag == 0:
continue
data_indx = list(torch.where(data_y == cls)[0])
random.shuffle(data_indx)
data_num = random.randint(20, 80)
selected_indx = data_indx[:data_num]
selected_data_indx = selected_data_indx + selected_indx
selected_X = data_x[selected_data_indx].numpy()
selected_y = data_y[selected_data_indx].numpy()
print(selected_X.dtype, selected_y.dtype)
print(selected_X.shape, selected_y.shape)
X_save_dir = os.path.join(data_save_root, "uploader_%d_X.npy" % (i))
y_save_dir = os.path.join(data_save_root, "uploader_%d_y.npy" % (i))
np.save(X_save_dir, selected_X)
np.save(y_save_dir, selected_y)
print("Saving to %s" % (X_save_dir))


def generate_user(data_x, data_y, n_users=50, data_save_root=None):
if data_save_root is None:
return
os.makedirs(data_save_root, exist_ok=True)
for i in range(n_users):
random_class_num = random.randint(3, 6)
cls_indx = list(range(10))
random.shuffle(cls_indx)
selected_cls_indx = cls_indx[:random_class_num]
selected_data_indx = []
for cls in selected_cls_indx:
data_indx = list(torch.where(data_y == cls)[0])
# print(type(data_indx))
random.shuffle(data_indx)
data_num = random.randint(150, 350)
selected_indx = data_indx[:data_num]
selected_data_indx = selected_data_indx + selected_indx
# print('Total Index:', len(selected_data_indx))
selected_X = data_x[selected_data_indx].numpy()
selected_y = data_y[selected_data_indx].numpy()
print(selected_X.shape, selected_y.shape)
X_save_dir = os.path.join(data_save_root, "user_%d_X.npy" % (i))
y_save_dir = os.path.join(data_save_root, "user_%d_y.npy" % (i))
np.save(X_save_dir, selected_X)
np.save(y_save_dir, selected_y)
print("Saving to %s" % (X_save_dir))


# Train Uploaders' models
def train(X, y, out_classes, epochs=35, batch_size=128):
    print(X.shape, y.shape)
    input_feature = X.shape[1]
    data_size = X.shape[0]
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = ConvModel(channel=input_feature, n_random_features=out_classes).to(device)
    model.train()

    # Adam optimizer with learning rate 1e-3
    # optimizer = optim.Adam(model.parameters(), lr=1e-3)
    # SGD optimizer with learning rate 1e-2
    optimizer = optim.SGD(model.parameters(), lr=1e-2, momentum=0.9)
    criterion = nn.CrossEntropyLoss()

    for epoch in range(epochs):
        running_loss = []
        indx = list(range(data_size))
        random.shuffle(indx)
        curr_X = X[indx]
        curr_y = y[indx]
        for i in range(math.floor(data_size / batch_size)):
            inputs, annos = curr_X[i * batch_size : (i + 1) * batch_size], curr_y[i * batch_size : (i + 1) * batch_size]
            inputs = torch.from_numpy(inputs).to(device)
            annos = torch.from_numpy(annos).to(device)
            out = model(inputs)
            optimizer.zero_grad()
            loss = criterion(out, annos)
            loss.backward()
            optimizer.step()
            running_loss.append(loss.item())
        # print('Epoch: %d, Average Loss: %.3f'%(epoch+1, np.mean(running_loss)))

    # Train Accuracy
    acc = test(X, y, model)
    model.train()
    return model


def test(test_X, test_y, model, batch_size=128):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.eval()
    total, correct = 0, 0
    data_size = test_X.shape[0]
    for i in range(math.ceil(data_size / batch_size)):
        inputs, annos = test_X[i * batch_size : (i + 1) * batch_size], test_y[i * batch_size : (i + 1) * batch_size]
        inputs = torch.Tensor(inputs).to(device)
        annos = torch.Tensor(annos).to(device)
        out = model(inputs)
        _, predicted = torch.max(out.data, 1)
        total += annos.size(0)
        correct += (predicted == annos).sum().item()
    acc = correct / total * 100
    print("Accuracy: %.2f" % (acc))
    return acc


def eval_prediction(pred_y, target_y):
    if not isinstance(pred_y, np.ndarray):
        pred_y = pred_y.detach().cpu().numpy()
    predicted = np.argmax(pred_y, 1)
    annos = target_y
    total = annos.shape[0]
    correct = (predicted == annos).sum().item()
    return correct / total


def train_model(
    model: nn.Module,
    train_set: Dataset,
    valid_set: Dataset,
    save_path: str,
    epochs=35,
    batch_size=128,
    device=None,
    verbose=True,
):
    device = choose_device(0) if device is None else device

    model.train()
    optimizer = optim.SGD(model.parameters(), lr=1e-2, momentum=0.9)
    criterion = nn.CrossEntropyLoss()
    dataloader = DataLoader(train_set, batch_size=batch_size, shuffle=True)
    best_loss = 100000

    for epoch in range(epochs):
        running_loss = []
        model.train()
        for i, (X, y) in enumerate(dataloader):
            X, y = X.to(device=device), y.to(device=device)
            optimizer.zero_grad()
            out = model(X)
            loss = criterion(out, y)
            loss.backward()
            optimizer.step()
            running_loss.append(loss.item())

        valid_loss, valid_acc = evaluate(model, valid_set, device=device)
        train_loss, train_acc = evaluate(model, train_set, device=device)
        if valid_loss < best_loss:
            best_loss = valid_loss
            torch.save(model.state_dict(), save_path)
            if verbose:
                print("Epoch: {}, Valid Best Accuracy: {:.3f}% ({:.3f})".format(epoch + 1, valid_acc, valid_loss))
        if valid_acc > 99.0:
            if verbose:
                print("Early Stopping at 99% !")
            break

        if verbose and (epoch + 1) % 5 == 0:
            print(
                "Epoch: {}, Train Average Loss: {:.3f}, Accuracy {:.3f}%, Valid Average Loss: {:.3f}".format(
                    epoch + 1, np.mean(running_loss), train_acc, valid_loss
                )
            )
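The `evaluate` helper above either argmaxes a class distribution or compares hard labels directly, depending on the `distribution` flag. A minimal numpy sketch of that accuracy logic (the `accuracy` helper name is hypothetical, not part of the repo):

```python
import numpy as np

def accuracy(out, y, distribution=True):
    # Mirrors evaluate(): argmax over logits when the model returns a class
    # distribution, otherwise treat the output as hard label predictions.
    pred = np.argmax(out, axis=1) if distribution else out
    return (pred == y).mean() * 100

logits = np.array([[0.1, 0.9], [0.8, 0.2], [0.3, 0.7]])
labels = np.array([1, 0, 0])
accuracy(logits, labels)  # two of three predictions correct
```

The `distribution=False` path is what the workflow uses for `EnsemblePruningReuser`, whose `predict` already returns class labels rather than logits.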

+ 258
- 0
examples/dataset_image_workflow/workflow.py View File

@@ -0,0 +1,258 @@
import os
import fire
import time
import torch
import pickle
import random
import tempfile
import numpy as np
import matplotlib.pyplot as plt
from torch.utils.data import TensorDataset

from learnware.utils import choose_device
from learnware.client import LearnwareClient
from learnware.logger import get_module_logger
from learnware.specification import generate_stat_spec
from learnware.tests.benchmarks import LearnwareBenchmark
from learnware.market import instantiate_learnware_market, BaseUserInfo
from learnware.reuse import JobSelectorReuser, AveragingReuser, EnsemblePruningReuser
from model import ConvModel
from utils import train_model, evaluate
from config import image_benchmark_config

logger = get_module_logger("image_workflow", level="INFO")


class ImageDatasetWorkflow:
def _plot_labeled_peformance_curves(self, all_user_curves_data):
plt.figure(figsize=(10, 6))
plt.xticks(range(len(self.n_labeled_list)), self.n_labeled_list)

styles = [
{"color": "navy", "linestyle": "-", "marker": "o"},
{"color": "magenta", "linestyle": "-.", "marker": "d"},
]
labels = ["User Model", "Multiple Learnware Reuse (EnsemblePrune)"]

user_array, pruning_array = all_user_curves_data
for array, style, label in zip([user_array, pruning_array], styles, labels):
mean_curve = np.array([item[0] for item in array])
std_curve = np.array([item[1] for item in array])
plt.plot(mean_curve, **style, label=label)
plt.fill_between(
range(len(mean_curve)),
mean_curve - std_curve,
mean_curve + std_curve,
color=style["color"],
alpha=0.2,
)

plt.xlabel("Amount of Labeled User Data", fontsize=14)
plt.ylabel("1 - Accuracy", fontsize=14)
plt.title("Results on Image Experimental Scenario", fontsize=16)
plt.legend(fontsize=14)
plt.tight_layout()
plt.savefig(os.path.join(self.fig_path, "image_labeled_curves.svg"), bbox_inches="tight", dpi=700)

def _prepare_market(self, rebuild=False):
client = LearnwareClient()
self.image_benchmark = LearnwareBenchmark().get_benchmark(image_benchmark_config)
self.image_market = instantiate_learnware_market(market_id=self.image_benchmark.name, rebuild=rebuild)
self.user_semantic = client.get_semantic_specification(self.image_benchmark.learnware_ids[0])
self.user_semantic["Name"]["Values"] = ""

if len(self.image_market) == 0 or rebuild == True:
for learnware_id in self.image_benchmark.learnware_ids:
with tempfile.TemporaryDirectory(prefix="image_benchmark_") as tempdir:
zip_path = os.path.join(tempdir, f"{learnware_id}.zip")
for i in range(20):
try:
semantic_spec = client.get_semantic_specification(learnware_id)
client.download_learnware(learnware_id, zip_path)
self.image_market.add_learnware(zip_path, semantic_spec)
break
except Exception:
time.sleep(1)
continue

logger.info("Total Item: %d" % (len(self.image_market)))

def image_example(self, rebuild=False):
np.random.seed(1)
random.seed(1)
self._prepare_market(rebuild)
self.n_labeled_list = [100, 200, 500, 1000, 2000, 4000]
self.repeated_list = [10, 10, 10, 3, 3, 3]
device = choose_device(0)

self.root_path = os.path.dirname(os.path.abspath(__file__))
self.fig_path = os.path.join(self.root_path, "figs")
self.curve_path = os.path.join(self.root_path, "curves")
self.model_path = os.path.join(self.root_path, "models")
os.makedirs(self.fig_path, exist_ok=True)
os.makedirs(self.curve_path, exist_ok=True)
os.makedirs(self.model_path, exist_ok=True)

select_list = []
avg_list = []
best_list = []
improve_list = []
job_selector_score_list = []
ensemble_score_list = []
all_learnwares = self.image_market.get_learnwares()

for i in range(self.image_benchmark.user_num):
test_x, test_y = self.image_benchmark.get_test_data(user_ids=i)
train_x, train_y = self.image_benchmark.get_train_data(user_ids=i)

test_x = torch.from_numpy(test_x)
test_y = torch.from_numpy(test_y)
test_dataset = TensorDataset(test_x, test_y)

user_stat_spec = generate_stat_spec(type="image", X=test_x, whitening=False)
user_info = BaseUserInfo(semantic_spec=self.user_semantic, stat_info={user_stat_spec.type: user_stat_spec})
logger.info("Searching Market for user: %d" % (i))

search_result = self.image_market.search_learnware(user_info)
single_result = search_result.get_single_results()
multiple_result = search_result.get_multiple_results()

print(f"search result of user{i}:")
print(
f"single model num: {len(single_result)}, max_score: {single_result[0].score}, min_score: {single_result[-1].score}"
)

acc_list = []
for idx in range(len(all_learnwares)):
learnware = all_learnwares[idx]
loss, acc = evaluate(learnware, test_dataset)
acc_list.append(acc)

learnware = single_result[0].learnware
best_loss, best_acc = evaluate(learnware, test_dataset)
best_list.append(np.max(acc_list))
select_list.append(best_acc)
avg_list.append(np.mean(acc_list))
improve_list.append((best_acc - np.mean(acc_list)) / np.mean(acc_list))
print(f"market mean accuracy: {np.mean(acc_list)}, market best accuracy: {np.max(acc_list)}")
print(
f"Top1-score: {single_result[0].score}, learnware_id: {single_result[0].learnware.id}, acc: {best_acc}"
)

if len(multiple_result) > 0:
mixture_id = " ".join([learnware.id for learnware in multiple_result[0].learnwares])
print(f"mixture_score: {multiple_result[0].score}, mixture_learnware: {mixture_id}")
mixture_learnware_list = multiple_result[0].learnwares
else:
mixture_learnware_list = [single_result[0].learnware]

# test reuse (job selector)
reuse_job_selector = JobSelectorReuser(learnware_list=mixture_learnware_list, use_herding=False)
job_loss, job_acc = evaluate(reuse_job_selector, test_dataset)
job_selector_score_list.append(job_acc)
print(f"mixture reuse accuracy (job selector): {job_acc}")

# test reuse (ensemble)
reuse_ensemble = AveragingReuser(learnware_list=mixture_learnware_list, mode="vote_by_prob")
ensemble_loss, ensemble_acc = evaluate(reuse_ensemble, test_dataset)
ensemble_score_list.append(ensemble_acc)
print(f"mixture reuse accuracy (ensemble): {ensemble_acc}\n")

user_model_score_mat = []
pruning_score_mat = []
single_score_mat = []

for n_label, repeated in zip(self.n_labeled_list, self.repeated_list):
user_model_score_list, reuse_pruning_score_list = [], []
if n_label > len(train_x):
n_label = len(train_x)
for _ in range(repeated):
x_train, y_train = zip(*random.sample(list(zip(train_x, train_y)), k=n_label))
x_train = np.array(list(x_train))
y_train = np.array(list(y_train))

x_train = torch.from_numpy(x_train)
y_train = torch.from_numpy(y_train)
sampled_dataset = TensorDataset(x_train, y_train)

model_save_path = os.path.abspath(os.path.join(self.model_path, "model.pth"))
model = ConvModel(
channel=x_train.shape[1], im_size=(x_train.shape[2], x_train.shape[3]), n_random_features=10
).to(device)
train_model(
model,
sampled_dataset,
sampled_dataset,
model_save_path,
epochs=35,
batch_size=128,
device=device,
verbose=False,
)
model.load_state_dict(torch.load(model_save_path))
_, user_model_acc = evaluate(model, test_dataset, distribution=True)
user_model_score_list.append(user_model_acc)

reuse_pruning = EnsemblePruningReuser(learnware_list=mixture_learnware_list, mode="classification")
reuse_pruning.fit(x_train, y_train)
_, pruning_acc = evaluate(reuse_pruning, test_dataset, distribution=False)
reuse_pruning_score_list.append(pruning_acc)

single_score_mat.append([best_acc] * repeated)
user_model_score_mat.append(user_model_score_list)
pruning_score_mat.append(reuse_pruning_score_list)
print(
f"user_label_num: {n_label}, user_acc: {np.mean(user_model_score_mat[-1])}, pruning_acc: {np.mean(pruning_score_mat[-1])}"
)

logger.info(f"Saving Curves for User_{i}")
user_curves_data = (single_score_mat, user_model_score_mat, pruning_score_mat)
with open(os.path.join(self.curve_path, f"curve{str(i)}.pkl"), "wb") as f:
pickle.dump(user_curves_data, f)

logger.info(
"Accuracy of selected learnware: %.3f +/- %.3f, Average performance: %.3f +/- %.3f, Best performance: %.3f +/- %.3f"
% (
np.mean(select_list),
np.std(select_list),
np.mean(avg_list),
np.std(avg_list),
np.mean(best_list),
np.std(best_list),
)
)
logger.info("Average performance improvement: %.3f" % (np.mean(improve_list)))
logger.info(
"Average Job Selector Reuse Performance: %.3f +/- %.3f"
% (np.mean(job_selector_score_list), np.std(job_selector_score_list))
)
logger.info(
"Averaging Ensemble Reuse Performance: %.3f +/- %.3f"
% (np.mean(ensemble_score_list), np.std(ensemble_score_list))
)

pruning_curves_data, user_model_curves_data = [], []
total_user_model_score_mat = [np.zeros(self.repeated_list[i]) for i in range(len(self.n_labeled_list))]
total_pruning_score_mat = [np.zeros(self.repeated_list[i]) for i in range(len(self.n_labeled_list))]
for user_idx in range(self.image_benchmark.user_num):
with open(os.path.join(self.curve_path, f"curve{str(user_idx)}.pkl"), "rb") as f:
user_curves_data = pickle.load(f)
(single_score_mat, user_model_score_mat, pruning_score_mat) = user_curves_data

for i in range(len(self.n_labeled_list)):
total_user_model_score_mat[i] += 1 - np.array(user_model_score_mat[i]) / 100
total_pruning_score_mat[i] += 1 - np.array(pruning_score_mat[i]) / 100

for i in range(len(self.n_labeled_list)):
total_user_model_score_mat[i] /= self.image_benchmark.user_num
total_pruning_score_mat[i] /= self.image_benchmark.user_num
user_model_curves_data.append(
(np.mean(total_user_model_score_mat[i]), np.std(total_user_model_score_mat[i]))
)
pruning_curves_data.append((np.mean(total_pruning_score_mat[i]), np.std(total_pruning_score_mat[i])))

self._plot_labeled_peformance_curves([user_model_curves_data, pruning_curves_data])


if __name__ == "__main__":
fire.Fire(ImageDatasetWorkflow)
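The curve aggregation at the end of `image_example` converts accuracies (%) into error rates via `1 - acc / 100`, sums them across users, divides by `user_num`, and then plots mean ± std across repeats. A toy numpy sketch of that aggregation for a single labeled-data budget (the numbers are made up, not benchmark results):

```python
import numpy as np

# toy accuracies (%) for one labeled-data budget: 2 users x 3 repeats
user_model_acc = np.array([[90.0, 88.0, 92.0],
                           [80.0, 82.0, 78.0]])

# as in the workflow: error = 1 - acc / 100, summed over users,
# then divided by user_num; mean/std taken over the repeats
total_err = (1 - user_model_acc / 100).sum(axis=0) / len(user_model_acc)
mean_curve_point, std_curve_point = total_err.mean(), total_err.std()
```

One `(mean, std)` pair per entry of `n_labeled_list` is what `_plot_labeled_peformance_curves` receives for each curve.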

+ 3
- 1
learnware/reuse/ensemble_pruning.py View File

@@ -148,7 +148,9 @@ class EnsemblePruningReuser(BaseReuser):
import geatpy as ea
except ModuleNotFoundError:
raise ModuleNotFoundError(f"EnsemblePruningReuser is not available because 'geatpy' is not installed! Please install it manually (only support python_version<3.11).")

if torch.is_tensor(v_true):
v_true = v_true.detach().cpu().numpy()

model_num = v_predict.shape[1]



+ 14
- 6
learnware/reuse/job_selector.py View File

@@ -59,8 +59,11 @@ class JobSelectorReuser(BaseReuser):
for idx in range(len(self.learnware_list)):
data_idx_list = np.where(select_result == idx)[0]
if len(data_idx_list) > 0:
# pred_y = self.learnware_list[idx].predict(raw_user_data[data_idx_list])
pred_y = self.learnware_list[idx].predict([raw_user_data[i] for i in data_idx_list])
if isinstance(raw_user_data, list):
pred_y = self.learnware_list[idx].predict([raw_user_data[i] for i in data_idx_list])
else:
pred_y = self.learnware_list[idx].predict(raw_user_data[data_idx_list])

if isinstance(pred_y, torch.Tensor):
pred_y = pred_y.detach().cpu().numpy()
# elif isinstance(pred_y, tf.Tensor):
@@ -89,6 +92,9 @@ class JobSelectorReuser(BaseReuser):
user_data : np.ndarray
User's raw data.
"""
if torch.is_tensor(user_data):
user_data = user_data.detach().cpu().numpy()

if len(self.learnware_list) == 1:
# user_data_num = user_data.shape[0]
user_data_num = len(user_data)
@@ -118,9 +124,9 @@ class JobSelectorReuser(BaseReuser):
task_spec = learnware_rkme_spec_list[i]
if self.use_herding:
task_herding_num = max(5, int(self.herding_num * task_mixture_weight[i]))
herding_X_i = task_spec.herding(task_herding_num).detach().cpu().numpy()
herding_X_i = task_spec.herding(task_herding_num)
else:
herding_X_i = task_spec.z.detach().cpu().numpy()
herding_X_i = task_spec.get_z()
task_herding_num = herding_X_i.shape[0]
task_val_num = task_herding_num // 5

@@ -223,8 +229,10 @@ class JobSelectorReuser(BaseReuser):
try:
from lightgbm import LGBMClassifier, early_stopping
except ModuleNotFoundError:
raise ModuleNotFoundError(f"JobSelectorReuser is not available because 'lightgbm' is not installed! Please install it manually.")
raise ModuleNotFoundError(
f"JobSelectorReuser is not available because 'lightgbm' is not installed! Please install it manually."
)

score_best = -1
learning_rate = [0.01]
max_depth = [66]

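The `JobSelectorReuser` change above routes each test sample to one learnware via `np.where(select_result == idx)`. A self-contained numpy sketch of that dispatch pattern (`route_predictions` and the lambda models are hypothetical stand-ins for learnware `predict` calls):

```python
import numpy as np

def route_predictions(models, select_result, X):
    # Dispatch each row of X to the model the selector picked for it,
    # as in JobSelectorReuser.predict's np.where(select_result == idx) loop.
    pred = np.empty(len(X))
    for idx, model in enumerate(models):
        rows = np.where(select_result == idx)[0]
        if len(rows) > 0:
            pred[rows] = model(X[rows])
    return pred

models = [lambda x: x.sum(axis=1), lambda x: x.max(axis=1)]
X = np.array([[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]])
route_predictions(models, np.array([0, 1, 0]), X)  # → [3., 4., 11.]
```

The patched branch in the diff exists because fancy indexing like `X[rows]` only works on arrays, so list inputs are handled with an explicit per-index comprehension instead.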

+ 1
- 1
learnware/specification/regular/image/rkme.py View File

@@ -366,7 +366,7 @@ class RKMEImageSpecification(RegularStatSpecification):
indices = torch.multinomial(self.beta, T, replacement=True)
mock = self.z[indices] + torch.randn_like(self.z[indices]) * 0.01

return mock.numpy()
return mock.detach().cpu().numpy()

def _sampling_candidates(self, N: int) -> np.ndarray:
raise NotImplementedError()
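The sampler patched above draws reduced-set points `z` with probabilities `beta` via `torch.multinomial` and adds small Gaussian jitter before converting to numpy. A numpy sketch of the same sampling scheme (toy `beta` and `z`, not values from a real specification):

```python
import numpy as np

rng = np.random.default_rng(0)
beta = np.array([0.7, 0.2, 0.1])                     # RKME mixture weights
z = np.array([[0.0, 0.0], [1.0, 1.0], [2.0, 2.0]])  # reduced-set points
T = 1000

# multinomial draw with replacement, then small Gaussian jitter (sigma=0.01),
# mirroring z[indices] + randn_like(z[indices]) * 0.01 in the diff
indices = rng.choice(len(beta), size=T, p=beta / beta.sum())
mock = z[indices] + rng.normal(scale=0.01, size=(T, z.shape[1]))
```

The one-line fix itself matters because `.numpy()` fails on CUDA or grad-tracking tensors, while `.detach().cpu().numpy()` is safe in both cases.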


+ 4
- 2
learnware/specification/regular/table/rkme.py View File

@@ -411,7 +411,7 @@ class RKMETableSpecification(RegularStatSpecification):
S_shape = tuple([S.shape[0]] + list(Z_shape)[1:])
S = S.reshape(S_shape)

return S
return S.detach().cpu().numpy()

def save(self, filepath: str):
"""Save the computed RKME specification to a specified path in JSON format.
@@ -457,7 +457,9 @@ class RKMETableSpecification(RegularStatSpecification):
for d in self.get_states():
if d in rkme_load.keys():
if d == "type" and rkme_load[d] != self.type:
raise TypeError(f"The type of loaded RKME ({rkme_load[d]}) is different from the expected type ({self.type})!")
raise TypeError(
f"The type of loaded RKME ({rkme_load[d]}) is different from the expected type ({self.type})!"
)
setattr(self, d, rkme_load[d])



