From 3d63065cfde0e695564efe8857e2bd9ff7c0d359 Mon Sep 17 00:00:00 2001 From: shihy Date: Wed, 29 Nov 2023 19:46:41 +0800 Subject: [PATCH] [ENH] Adding the basic framework --- .../benchmarks/__init__.py | 0 .../benchmarks/build_market.py | 114 ++++++++++++++++++ .../benchmarks/evaluate_market.py | 75 ++++++++++++ .../dataset/__init__.py | 0 .../dataset_cifar_workflow/dataset/data.py | 10 ++ .../dataset_cifar_workflow/dataset/utils.py | 45 +++++++ examples/dataset_cifar_workflow/main.py | 0 .../dataset_cifar_workflow/models/__init__.py | 0 .../dataset_cifar_workflow/models/config.yaml | 8 ++ .../models/conv/__init__.py | 26 ++++ .../models/conv/model.py | 71 +++++++++++ .../dataset_cifar_workflow/models/train.py | 0 12 files changed, 349 insertions(+) create mode 100644 examples/dataset_cifar_workflow/benchmarks/__init__.py create mode 100644 examples/dataset_cifar_workflow/benchmarks/build_market.py create mode 100644 examples/dataset_cifar_workflow/benchmarks/evaluate_market.py create mode 100644 examples/dataset_cifar_workflow/dataset/__init__.py create mode 100644 examples/dataset_cifar_workflow/dataset/data.py create mode 100644 examples/dataset_cifar_workflow/dataset/utils.py create mode 100644 examples/dataset_cifar_workflow/main.py create mode 100644 examples/dataset_cifar_workflow/models/__init__.py create mode 100644 examples/dataset_cifar_workflow/models/config.yaml create mode 100644 examples/dataset_cifar_workflow/models/conv/__init__.py create mode 100644 examples/dataset_cifar_workflow/models/conv/model.py create mode 100644 examples/dataset_cifar_workflow/models/train.py diff --git a/examples/dataset_cifar_workflow/benchmarks/__init__.py b/examples/dataset_cifar_workflow/benchmarks/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/examples/dataset_cifar_workflow/benchmarks/build_market.py b/examples/dataset_cifar_workflow/benchmarks/build_market.py new file mode 100644 index 0000000..e9bf46a --- /dev/null +++ b/examples/dataset_cifar_workflow/benchmarks/build_market.py @@ -0,0 +1,114 @@ +import copy +import os +import zipfile +from shutil import copyfile, rmtree + +import learnware +import numpy as np +import tqdm +import yaml +from learnware import specification +from learnware.market import EasyMarket + +from preprocess.dataloader import ImageDataLoader + +user_semantic = { + "Data": {"Values": ["Image"], "Type": "Class"}, + "Task": { + "Values": ["Classification"], + "Type": "Class", + }, + "Library": {"Values": ["Scikit-learn"], "Type": "Class"}, + "Scenario": {"Values": [], "Type": "Tag"}, + "Description": {"Values": "", "Type": "String"}, + "Name": {"Values": "", "Type": "String"}, + "Output": {"Values": "", "Dimension": 0} +} + + +def build_from_preprocessed(args, regenerate=True): + zip_path_list = [] + data_root = os.path.join(args.data_root, 'learnware_market_data', "{}_{:d}".format(args.data, args.data_id)) + dataloader = ImageDataLoader(data_root, args.n_uploaders, train=True) + + market_root = args.market_root + for i, (train_X, train_y, val_X, val_y) in tqdm.tqdm(enumerate(dataloader), total=args.n_uploaders): + dir_path = os.path.join(market_root, args.data, "{}_{:d}".format(args.spec, args.id), "learnware_{:d}".format(i)) + os.makedirs(dir_path, exist_ok=True) + + if not regenerate: + zip_path_list.append(dir_path + ".zip") + continue + + # print("Preparing Learnware {:d} with {:s} specification".format(i, args.spec)) + # Copy Model File + model_file = os.path.join(dir_path, "model.pth") + copyfile(os.path.join(data_root, "models", "uploader_{:d}.pth".format(i)), + model_file) + + # Make Specification + if args.spec == "rbf": + spec = specification.utils.generate_rkme_spec(X=train_X, reduced_set_size=args.K, gamma=0.1, cuda_idx=args.cuda_idx) + elif args.spec == "ntk": + spec = learnware.specification.RKMEImageStatSpecification(rkme_id=i, **args.__dict__) + spec.generate_stat_spec_from_data(val_X, K=args.K, steps=args.ntk_steps, reduce=True, whitening=False) + else: + raise NotImplementedError("Not Support", args.spec) + spec.save(os.path.join(dir_path, "spec.json")) + + # Copy __init__.py and learnware_yaml + init_file = os.path.join(dir_path, "__init__.py") + yaml_file = os.path.join(dir_path, "learnware.yaml") + copyfile( + os.path.join(market_root, "learnware_example", + "conv.py"), init_file + ) # cp conv.py init_file + + with open(os.path.join(market_root, "learnware_example", + "{}.yaml".format(args.spec)), "r") as yaml_templet,\ + open(yaml_file, "w") as yaml_target: + + yaml_content = yaml.load(yaml_templet, Loader=yaml.FullLoader) + + yaml_content["model"]["kwargs"]["device"] = str(choose_device(args.cuda_idx)) + yaml_content["model"]["kwargs"]["input_channel"] = train_X.shape[1] + if args.spec == "ntk": + yaml_content["stat_specifications"][0]["kwargs"] = copy.deepcopy(args.__dict__) + + yaml.dump(yaml_content, yaml_target) + + + zip_file = dir_path + ".zip" + # zip -q -r -j zip_file dir_path + with zipfile.ZipFile(zip_file, "w") as zip_obj: + for foldername, subfolders, filenames in os.walk(dir_path): + for filename in filenames: + file_path = os.path.join(foldername, filename) + zip_info = zipfile.ZipInfo(filename) + zip_info.compress_type = zipfile.ZIP_STORED + with open(file_path, "rb") as file: + zip_obj.writestr(zip_info, file.read()) + + rmtree(dir_path) # rm -r dir_path + zip_path_list.append(zip_file) + + return zip_path_list + +def upload_to_easy_market(args, zip_path_list, market_id=None): + learnware.init() + np.random.seed(2023) + market_id = market_id if market_id else "NTK-RF-{:d}".format(args.id) + market = DummyMarket(market_id=market_id, rebuild=True) + + for idx, zip_path in enumerate(zip_path_list): + semantic_spec = copy.deepcopy(user_semantic) + semantic_spec["Name"]["Values"] = "learnware_{:d}".format(idx) + semantic_spec["Description"]["Values"] = "test_learnware_number_{:d}".format(idx) + semantic_spec["Scenario"]["Values"] = [args.data] + semantic_spec["Output"]['Dimension'] = 10 + market.add_learnware(zip_path, semantic_spec) + + logger = get_custom_logger() + logger.debug("Total Item: {:d}".format(len(market))) + + return market \ No newline at end of file diff --git a/examples/dataset_cifar_workflow/benchmarks/evaluate_market.py b/examples/dataset_cifar_workflow/benchmarks/evaluate_market.py new file mode 100644 index 0000000..8fb5b87 --- /dev/null +++ b/examples/dataset_cifar_workflow/benchmarks/evaluate_market.py @@ -0,0 +1,75 @@ +import os +import random +from time import sleep +from typing import Dict + +import learnware +import numpy as np +import torch.random +from learnware import specification +from learnware.market import BaseUserInfo +from tqdm import tqdm + +from build_market import user_semantic +from preprocess.dataloader import ImageDataLoader +from utils.clerk import Clerk, get_custom_logger +from utils.reuse import AveragingReuser + + +def evaluate_market_performance(args, market, clerk: Clerk=None, regenerate=True) -> Dict: + logger = get_custom_logger() + + data_root = os.path.join(args.data_root, 'learnware_market_data', "{}_{:d}".format(args.data, args.data_id)) + dataloader = ImageDataLoader(data_root, args.n_users, train=False) + acc = [] + + market_root = args.market_root + # shuffled = list(enumerate(dataloader)) + # random.shuffle(shuffled) + for i, (test_X, test_y) in enumerate(dataloader): + dir_path = os.path.join(market_root, args.data, "{}_{:d}".format(args.spec, args.id), "user_{:d}".format(i)) + os.makedirs(dir_path, exist_ok=True) + + if regenerate: + if args.spec == "rbf": + stat_spec = specification.utils.generate_rkme_spec(X=test_X, reduced_set_size=args.K, gamma=0.1, cuda_idx=args.cuda_idx) + elif args.spec == "ntk": + stat_spec = learnware.specification.RKMEImageStatSpecification(rkme_id=i+args.n_uploaders, **args.__dict__) + stat_spec.generate_stat_spec_from_data(test_X, reduce=True, steps=args.ntk_steps, K=args.K, whitening=False) + else: + raise NotImplementedError() + # Save User's spec to disk + stat_spec.save(os.path.join(dir_path, "spec.json")) + else: + if args.spec == "rbf": + stat_spec = specification.RKMEStatSpecification(gamma=0.1, cuda_idx=args.cuda_idx) + elif args.spec == "ntk": + stat_spec = learnware.specification.RKMEImageStatSpecification(rkme_id=i+args.n_uploaders, cache=False, **args.__dict__) + else: + raise NotImplementedError() + # Load User's spec from disk + stat_spec.load(os.path.join(dir_path, "spec.json")) + + user_info = BaseUserInfo(semantic_spec=user_semantic, stat_info={"RKMEStatSpecification": stat_spec}) + + sorted_score_list, single_learnware_list, _, _= market.search_learnware(user_info, max_search_num=args.max_search_num) + + reuse_ensemble = AveragingReuser(learnware_list=single_learnware_list, mode="vote") + ensemble_predict_y = np.argmax(reuse_ensemble.predict(user_data=test_X), axis=-1) + + curr_acc = np.mean(ensemble_predict_y == test_y) + acc.append(curr_acc) + if clerk: + clerk.rkme_performance(curr_acc) + + logger.debug("Accuracy for user {:d}: {:.3f}; {:.3f} on average up to now.".format(i, curr_acc, np.mean(acc))) + + logger.info("Accuracy {:.3f}({:.3f})".format(np.mean(acc), np.std(acc))) + + return { + "Accuracy": { + "Mean": np.mean(acc), + "Std": np.std(acc), + "All": acc + } + } \ No newline at end of file diff --git a/examples/dataset_cifar_workflow/dataset/__init__.py b/examples/dataset_cifar_workflow/dataset/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/examples/dataset_cifar_workflow/dataset/data.py b/examples/dataset_cifar_workflow/dataset/data.py new file mode 100644 index 0000000..3c47d88 --- /dev/null +++ b/examples/dataset_cifar_workflow/dataset/data.py @@ -0,0 +1,10 @@ +from torchvision import datasets + + +def cifar10(split="uploader"): + assert(split in {"uploader", "user"}) + + if split == "uploader": + dataset = datasets.CIFAR10(root="cache", download=True, train=True) + else: + dataset = datasets.CIFAR10(root="cache", download=True, train=False) \ No newline at end of file diff --git a/examples/dataset_cifar_workflow/dataset/utils.py b/examples/dataset_cifar_workflow/dataset/utils.py new file mode 100644 index 0000000..04b0608 --- /dev/null +++ b/examples/dataset_cifar_workflow/dataset/utils.py @@ -0,0 +1,45 @@ +import random +from functools import reduce + +import numpy as np +import torch +from torch.utils.data import TensorDataset + + +def sample_by_labels(labels: torch.Tensor, weights, total_num): + weights = np.asarray(weights) + + norm_factor = np.sum(weights) + last_non_zero = np.argwhere(weights > 0)[-1].item() + category_nums = [int(w * total_num / norm_factor) for w in weights[:last_non_zero]] + category_nums += [total_num - sum(category_nums)] + category_nums += [0] * (weights.shape[0] - last_non_zero - 1) + + selected_cls_indexes = [ + random.sample(list(torch.where(labels == c)[0]), k=n) + for c, n in enumerate(category_nums) + ] + + return selected_cls_indexes + + +USER_WEIGHTS = [3, 3, 1, 1, 1, 1, 0, 0, 0, 0] +UPLOADER_WEIGHTS = [4, 4, 1, 1, 0, 0, 0, 0, 0, 0] + +def split_dataset(data_x, data_y, size, split="uploader"): + if split == "uploader": + weights = np.asarray(UPLOADER_WEIGHTS) + elif split == "user": + weights = np.asarray(USER_WEIGHTS) + else: + raise Exception(split) + + order = list(range(len(weights))) + random.shuffle(order) + + selected_data_indexes = reduce(lambda x, y: x+y, sample_by_labels(data_y, weights[order], size)) + selected_data_indexes = torch.stack(selected_data_indexes) + selected_X = data_x[selected_data_indexes].numpy() + selected_y = data_y[selected_data_indexes].numpy() + + return TensorDataset(selected_X, selected_y), weights[order] \ No newline at end of file diff --git a/examples/dataset_cifar_workflow/main.py b/examples/dataset_cifar_workflow/main.py new file mode 100644 index 0000000..e69de29 diff --git a/examples/dataset_cifar_workflow/models/__init__.py b/examples/dataset_cifar_workflow/models/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/examples/dataset_cifar_workflow/models/config.yaml b/examples/dataset_cifar_workflow/models/config.yaml new file mode 100644 index 0000000..a73666d --- /dev/null +++ b/examples/dataset_cifar_workflow/models/config.yaml @@ -0,0 +1,8 @@ +model: + class_name: Model + kwargs: {} +stat_specifications: + - module_path: learnware.specification + class_name: RKMEImageStatSpecification + file_name: spec.json + kwargs: {} \ No newline at end of file diff --git a/examples/dataset_cifar_workflow/models/conv/__init__.py b/examples/dataset_cifar_workflow/models/conv/__init__.py new file mode 100644 index 0000000..9080b3c --- /dev/null +++ b/examples/dataset_cifar_workflow/models/conv/__init__.py @@ -0,0 +1,26 @@ +import os + +import torch +import numpy as np +from learnware.model import BaseModel + +from .model import ConvModel + + +class Model(BaseModel): + def __init__(self, device="cuda", input_channel=3): + super(Model, self).__init__(input_shape=(input_channel, 32, 32), output_shape=(10,)) + dir_path = os.path.dirname(os.path.abspath(__file__)) + self.device =device + self.model = ConvModel(channel=input_channel, n_random_features=10) + self.model.load_state_dict(torch.load(os.path.join(dir_path, "model.pth"))) + self.model.to(device).eval() + + def fit(self, X: np.ndarray, y: np.ndarray): + raise NotImplementedError() + + def predict(self, X: np.ndarray) -> np.ndarray: + return self.model(torch.asarray(X, dtype=torch.float32, device=self.device)) + + def finetune(self, X: np.ndarray, y: np.ndarray): + raise NotImplementedError() diff --git a/examples/dataset_cifar_workflow/models/conv/model.py b/examples/dataset_cifar_workflow/models/conv/model.py new file mode 100644 index 0000000..b9e04a1 --- /dev/null +++ b/examples/dataset_cifar_workflow/models/conv/model.py @@ -0,0 +1,71 @@ +from torch import nn + + +class ConvModel(nn.Module): + def __init__(self, channel, n_random_features, net_width = 64, net_depth = 3, net_act = 'relu', + net_norm = 'batchnorm', net_pooling = 'avgpooling', im_size = (32,32)): + super().__init__() + self.features, shape_feat = self._make_layers(channel, net_width, net_depth, net_norm, net_act, net_pooling, im_size) + num_feat = shape_feat[0]*shape_feat[1]*shape_feat[2] + self.classifier = nn.Linear(num_feat, n_random_features) + + def forward(self, x): + out = self.features(x) + out = out.reshape(out.size(0), -1) + out = self.classifier(out) + return out + + def _get_activation(self, net_act): + if net_act == 'sigmoid': + return nn.Sigmoid() + elif net_act == 'relu': + return nn.ReLU(inplace=True) + elif net_act == 'leakyrelu': + return nn.LeakyReLU(negative_slope=0.01) + elif net_act == 'gelu': + return nn.SiLU() + else: + raise Exception('unknown activation function: %s'%net_act) + + def _get_pooling(self, net_pooling): + if net_pooling == 'maxpooling': + return nn.MaxPool2d(kernel_size=2, stride=2) + elif net_pooling == 'avgpooling': + return nn.AvgPool2d(kernel_size=2, stride=2) + elif net_pooling == 'none': + return None + else: + raise Exception('unknown net_pooling: %s'%net_pooling) + + def _get_normlayer(self, net_norm, shape_feat): + if net_norm == 'batchnorm': + return nn.BatchNorm2d(shape_feat[0], affine=True) + elif net_norm == 'layernorm': + return nn.LayerNorm(shape_feat, elementwise_affine=True) + elif net_norm == 'instancenorm': + return nn.GroupNorm(shape_feat[0], shape_feat[0], affine=True) + elif net_norm == 'groupnorm': + return nn.GroupNorm(4, shape_feat[0], affine=True) + elif net_norm == 'none': + return None + else: + raise Exception('unknown net_norm: %s'%net_norm) + + def _make_layers(self, channel, net_width, net_depth, net_norm, net_act, net_pooling, im_size): + layers = [] + in_channels = channel + shape_feat = [in_channels, im_size[0], im_size[1]] + for d in range(net_depth): + layers += [nn.Conv2d(in_channels, net_width, kernel_size=3, padding='same')] + + shape_feat[0] = net_width + if net_norm != 'none': + layers += [self._get_normlayer(net_norm, shape_feat)] + layers += [self._get_activation(net_act)] + in_channels = net_width + if net_pooling != 'none': + layers += [self._get_pooling(net_pooling)] + shape_feat[1] //= 2 + shape_feat[2] //= 2 + + return nn.Sequential(*layers), shape_feat \ No newline at end of file diff --git a/examples/dataset_cifar_workflow/models/train.py b/examples/dataset_cifar_workflow/models/train.py new file mode 100644 index 0000000..e69de29