From 5f0929c14a060aaa3673afcbe7b7a3fdf2b40ae2 Mon Sep 17 00:00:00 2001 From: shihy Date: Thu, 30 Nov 2023 10:11:18 +0800 Subject: [PATCH] [ENH] Fixed some bugs and finished the main code --- .../dataset => }/__init__.py | 0 .../benchmarks/build_market.py | 114 ------------- .../benchmarks/dataset/__init__.py | 1 + .../benchmarks/dataset/data.py | 26 +++ .../{ => benchmarks}/dataset/utils.py | 38 ++++- .../benchmarks/evaluate_market.py | 75 --------- .../{ => benchmarks}/models/__init__.py | 0 .../{ => benchmarks}/models/config.yaml | 0 .../{ => benchmarks}/models/conv/__init__.py | 0 .../{ => benchmarks}/models/conv/model.py | 0 .../benchmarks/models/conv/requirements.txt | 3 + .../benchmarks/utils.py | 158 ++++++++++++++++++ .../dataset_cifar_workflow/dataset/data.py | 10 -- examples/dataset_cifar_workflow/main.py | 33 ++++ .../dataset_cifar_workflow/models/train.py | 0 15 files changed, 251 insertions(+), 207 deletions(-) rename examples/{dataset_cifar_workflow/dataset => }/__init__.py (100%) delete mode 100644 examples/dataset_cifar_workflow/benchmarks/build_market.py create mode 100644 examples/dataset_cifar_workflow/benchmarks/dataset/__init__.py create mode 100644 examples/dataset_cifar_workflow/benchmarks/dataset/data.py rename examples/dataset_cifar_workflow/{ => benchmarks}/dataset/utils.py (54%) delete mode 100644 examples/dataset_cifar_workflow/benchmarks/evaluate_market.py rename examples/dataset_cifar_workflow/{ => benchmarks}/models/__init__.py (100%) rename examples/dataset_cifar_workflow/{ => benchmarks}/models/config.yaml (100%) rename examples/dataset_cifar_workflow/{ => benchmarks}/models/conv/__init__.py (100%) rename examples/dataset_cifar_workflow/{ => benchmarks}/models/conv/model.py (100%) create mode 100644 examples/dataset_cifar_workflow/benchmarks/models/conv/requirements.txt create mode 100644 examples/dataset_cifar_workflow/benchmarks/utils.py delete mode 100644 examples/dataset_cifar_workflow/dataset/data.py delete mode 100644 examples/dataset_cifar_workflow/models/train.py diff --git a/examples/dataset_cifar_workflow/dataset/__init__.py b/examples/__init__.py similarity index 100% rename from examples/dataset_cifar_workflow/dataset/__init__.py rename to examples/__init__.py diff --git a/examples/dataset_cifar_workflow/benchmarks/build_market.py b/examples/dataset_cifar_workflow/benchmarks/build_market.py deleted file mode 100644 index e9bf46a..0000000 --- a/examples/dataset_cifar_workflow/benchmarks/build_market.py +++ /dev/null @@ -1,114 +0,0 @@ -import copy -import os -import zipfile -from shutil import copyfile, rmtree - -import learnware -import numpy as np -import tqdm -import yaml -from learnware import specification -from learnware.market import EasyMarket - -from preprocess.dataloader import ImageDataLoader - -user_semantic = { - "Data": {"Values": ["Image"], "Type": "Class"}, - "Task": { - "Values": ["Classification"], - "Type": "Class", - }, - "Library": {"Values": ["Scikit-learn"], "Type": "Class"}, - "Scenario": {"Values": [], "Type": "Tag"}, - "Description": {"Values": "", "Type": "String"}, - "Name": {"Values": "", "Type": "String"}, - "Output": {"Values": "", "Dimension": 0} -} - - -def build_from_preprocessed(args, regenerate=True): - zip_path_list = [] - data_root = os.path.join(args.data_root, 'learnware_market_data', "{}_{:d}".format(args.data, args.data_id)) - dataloader = ImageDataLoader(data_root, args.n_uploaders, train=True) - - market_root = args.market_root - for i, (train_X, train_y, val_X, val_y) in tqdm.tqdm(enumerate(dataloader), total=args.n_uploaders): - dir_path = os.path.join(market_root, args.data, "{}_{:d}".format(args.spec, args.id), "learnware_{:d}".format(i)) - os.makedirs(dir_path, exist_ok=True) - - if not regenerate: - zip_path_list.append(dir_path + ".zip") - continue - - # print("Preparing Learnware {:d} with {:s} specification".format(i, args.spec)) - # Copy Model File - model_file = os.path.join(dir_path, "model.pth") - copyfile(os.path.join(data_root, "models", "uploader_{:d}.pth".format(i)), - model_file) - - # Make Specification - if args.spec == "rbf": - spec = specification.utils.generate_rkme_spec(X=train_X, reduced_set_size=args.K, gamma=0.1, cuda_idx=args.cuda_idx) - elif args.spec == "ntk": - spec = learnware.specification.RKMEImageStatSpecification(rkme_id=i, **args.__dict__) - spec.generate_stat_spec_from_data(val_X, K=args.K, steps=args.ntk_steps, reduce=True, whitening=False) - else: - raise NotImplementedError("Not Support", args.spec) - spec.save(os.path.join(dir_path, "spec.json")) - - # Copy __init__.py and learnware_yaml - init_file = os.path.join(dir_path, "__init__.py") - yaml_file = os.path.join(dir_path, "learnware.yaml") - copyfile( - os.path.join(market_root, "learnware_example", - "conv.py"), init_file - ) # cp conv.py init_file - - with open(os.path.join(market_root, "learnware_example", - "{}.yaml".format(args.spec)), "r") as yaml_templet,\ - open(yaml_file, "w") as yaml_target: - - yaml_content = yaml.load(yaml_templet, Loader=yaml.FullLoader) - - yaml_content["model"]["kwargs"]["device"] = str(choose_device(args.cuda_idx)) - yaml_content["model"]["kwargs"]["input_channel"] = train_X.shape[1] - if args.spec == "ntk": - yaml_content["stat_specifications"][0]["kwargs"] = copy.deepcopy(args.__dict__) - - yaml.dump(yaml_content, yaml_target) - - - zip_file = dir_path + ".zip" - # zip -q -r -j zip_file dir_path - with zipfile.ZipFile(zip_file, "w") as zip_obj: - for foldername, subfolders, filenames in os.walk(dir_path): - for filename in filenames: - file_path = os.path.join(foldername, filename) - zip_info = zipfile.ZipInfo(filename) - zip_info.compress_type = zipfile.ZIP_STORED - with open(file_path, "rb") as file: - zip_obj.writestr(zip_info, file.read()) - - rmtree(dir_path) # rm -r dir_path - zip_path_list.append(zip_file) - - return zip_path_list - -def upload_to_easy_market(args, zip_path_list, market_id=None): - learnware.init() - np.random.seed(2023) - market_id = market_id if market_id else "NTK-RF-{:d}".format(args.id) - market = DummyMarket(market_id=market_id, rebuild=True) - - for idx, zip_path in enumerate(zip_path_list): - semantic_spec = copy.deepcopy(user_semantic) - semantic_spec["Name"]["Values"] = "learnware_{:d}".format(idx) - semantic_spec["Description"]["Values"] = "test_learnware_number_{:d}".format(idx) - semantic_spec["Scenario"]["Values"] = [args.data] - semantic_spec["Output"]['Dimension'] = 10 - market.add_learnware(zip_path, semantic_spec) - - logger = get_custom_logger() - logger.debug("Total Item: {:d}".format(len(market))) - - return market \ No newline at end of file diff --git a/examples/dataset_cifar_workflow/benchmarks/dataset/__init__.py b/examples/dataset_cifar_workflow/benchmarks/dataset/__init__.py new file mode 100644 index 0000000..8d16d00 --- /dev/null +++ b/examples/dataset_cifar_workflow/benchmarks/dataset/__init__.py @@ -0,0 +1 @@ +from .data import * \ No newline at end of file diff --git a/examples/dataset_cifar_workflow/benchmarks/dataset/data.py b/examples/dataset_cifar_workflow/benchmarks/dataset/data.py new file mode 100644 index 0000000..5a8e335 --- /dev/null +++ b/examples/dataset_cifar_workflow/benchmarks/dataset/data.py @@ -0,0 +1,26 @@ +import os + +import torch +from torch.utils.data import random_split, Subset +from torchvision import datasets + +from examples.dataset_cifar_workflow.benchmarks.dataset.utils import build_transform, sample_by_labels, split_dataset + + +cache_root = os.path.abspath(os.path.join(os.path.dirname( __file__ ), '..', '..', 'cache')) + +augment_transform, regular_transform = build_transform((32, 32)) +cifar_train_set_augment = datasets.CIFAR10(root="cache", download=True, + train=True, transform=augment_transform) +cifar_train_set = datasets.CIFAR10(root="cache", download=True, + train=True, transform=augment_transform) +cifar_test_set = datasets.CIFAR10(root="cache", download=True, + train=False, transform=regular_transform) + +def uploader_data(): + train_indices, order = split_dataset(torch.asarray(cifar_train_set_augment.targets), 12500, split="uploader") + valid_indices, _ = split_dataset(torch.asarray(cifar_test_set.targets), 2000, split="uploader", order=order) + + return (Subset(cifar_train_set_augment, train_indices), + Subset(cifar_test_set, valid_indices), + Subset(cifar_train_set, train_indices)) diff --git a/examples/dataset_cifar_workflow/dataset/utils.py b/examples/dataset_cifar_workflow/benchmarks/dataset/utils.py similarity index 54% rename from examples/dataset_cifar_workflow/dataset/utils.py rename to examples/dataset_cifar_workflow/benchmarks/dataset/utils.py index 04b0608..36fd33a 100644 --- a/examples/dataset_cifar_workflow/dataset/utils.py +++ b/examples/dataset_cifar_workflow/benchmarks/dataset/utils.py @@ -3,8 +3,14 @@ from functools import reduce import numpy as np import torch +import torchvision from torch.utils.data import TensorDataset +torchvision.disable_beta_transforms_warning() +from torchvision.transforms import transforms, v2 + + + def sample_by_labels(labels: torch.Tensor, weights, total_num): weights = np.asarray(weights) @@ -25,8 +31,7 @@ def sample_by_labels(labels: torch.Tensor, weights, total_num): USER_WEIGHTS = [3, 3, 1, 1, 1, 1, 0, 0, 0, 0] UPLOADER_WEIGHTS = [4, 4, 1, 1, 0, 0, 0, 0, 0, 0] - -def split_dataset(data_x, data_y, size, split="uploader"): +def split_dataset(labels, size, split="uploader", order=None): if split == "uploader": weights = np.asarray(UPLOADER_WEIGHTS) elif split == "user": @@ -34,12 +39,29 @@ def split_dataset(data_x, data_y, size, split="uploader"): else: raise Exception(split) - order = list(range(len(weights))) - random.shuffle(order) + if order is None: + order = list(range(len(weights))) + random.shuffle(order) - selected_data_indexes = reduce(lambda x, y: x+y, sample_by_labels(data_y, weights[order], size)) + selected_data_indexes = reduce(lambda x, y: x+y, sample_by_labels(labels, weights[order], size)) selected_data_indexes = torch.stack(selected_data_indexes) - selected_X = data_x[selected_data_indexes].numpy() - selected_y = data_y[selected_data_indexes].numpy() - return TensorDataset(selected_X, selected_y), weights[order] \ No newline at end of file + return selected_data_indexes, order + +def build_transform(size): + augment_transform = transforms.Compose([ + transforms.Resize(size), + v2.AutoAugment(), + transforms.ToTensor(), + transforms.Normalize(mean=[0.485, 0.456, 0.406], + std=[0.229, 0.224, 0.225]), + ]) + + regular_transform = transforms.Compose([ + transforms.Resize(size), + transforms.ToTensor(), + transforms.Normalize(mean=[0.485, 0.456, 0.406], + std=[0.229, 0.224, 0.225]), + ]) + + return augment_transform, regular_transform \ No newline at end of file diff --git a/examples/dataset_cifar_workflow/benchmarks/evaluate_market.py b/examples/dataset_cifar_workflow/benchmarks/evaluate_market.py deleted file mode 100644 index 8fb5b87..0000000 --- a/examples/dataset_cifar_workflow/benchmarks/evaluate_market.py +++ /dev/null @@ -1,75 +0,0 @@ -import os -import random -from time import sleep -from typing import Dict - -import learnware -import numpy as np -import torch.random -from learnware import specification -from learnware.market import BaseUserInfo -from tqdm import tqdm - -from build_market import user_semantic -from preprocess.dataloader import ImageDataLoader -from utils.clerk import Clerk, get_custom_logger -from utils.reuse import AveragingReuser - - -def evaluate_market_performance(args, market, clerk: Clerk=None, regenerate=True) -> Dict: - logger = get_custom_logger() - - data_root = os.path.join(args.data_root, 'learnware_market_data', "{}_{:d}".format(args.data, args.data_id)) - dataloader = ImageDataLoader(data_root, args.n_users, train=False) - acc = [] - - market_root = args.market_root - # shuffled = list(enumerate(dataloader)) - # random.shuffle(shuffled) - for i, (test_X, test_y) in enumerate(dataloader): - dir_path = os.path.join(market_root, args.data, "{}_{:d}".format(args.spec, args.id), "user_{:d}".format(i)) - os.makedirs(dir_path, exist_ok=True) - - if regenerate: - if args.spec == "rbf": - stat_spec = specification.utils.generate_rkme_spec(X=test_X, reduced_set_size=args.K, gamma=0.1, cuda_idx=args.cuda_idx) - elif args.spec == "ntk": - stat_spec = learnware.specification.RKMEImageStatSpecification(rkme_id=i+args.n_uploaders, **args.__dict__) - stat_spec.generate_stat_spec_from_data(test_X, reduce=True, steps=args.ntk_steps, K=args.K, whitening=False) - else: - raise NotImplementedError() - # Save User's spec to disk - stat_spec.save(os.path.join(dir_path, "spec.json")) - else: - if args.spec == "rbf": - stat_spec = specification.RKMEStatSpecification(gamma=0.1, cuda_idx=args.cuda_idx) - elif args.spec == "ntk": - stat_spec = learnware.specification.RKMEImageStatSpecification(rkme_id=i+args.n_uploaders, cache=False, **args.__dict__) - else: - raise NotImplementedError() - # Load User's spec from disk - stat_spec.load(os.path.join(dir_path, "spec.json")) - - user_info = BaseUserInfo(semantic_spec=user_semantic, stat_info={"RKMEStatSpecification": stat_spec}) - - sorted_score_list, single_learnware_list, _, _= market.search_learnware(user_info, max_search_num=args.max_search_num) - - reuse_ensemble = AveragingReuser(learnware_list=single_learnware_list, mode="vote") - ensemble_predict_y = np.argmax(reuse_ensemble.predict(user_data=test_X), axis=-1) - - curr_acc = np.mean(ensemble_predict_y == test_y) - acc.append(curr_acc) - if clerk: - clerk.rkme_performance(curr_acc) - - logger.debug("Accuracy for user {:d}: {:.3f}; {:.3f} on average up to now.".format(i, curr_acc, np.mean(acc))) - - logger.info("Accuracy {:.3f}({:.3f})".format(np.mean(acc), np.std(acc))) - - return { - "Accuracy": { - "Mean": np.mean(acc), - "Std": np.std(acc), - "All": acc - } - } \ No newline at end of file diff --git a/examples/dataset_cifar_workflow/models/__init__.py b/examples/dataset_cifar_workflow/benchmarks/models/__init__.py similarity index 100% rename from examples/dataset_cifar_workflow/models/__init__.py rename to examples/dataset_cifar_workflow/benchmarks/models/__init__.py diff --git a/examples/dataset_cifar_workflow/models/config.yaml b/examples/dataset_cifar_workflow/benchmarks/models/config.yaml similarity index 100% rename from examples/dataset_cifar_workflow/models/config.yaml rename to examples/dataset_cifar_workflow/benchmarks/models/config.yaml diff --git a/examples/dataset_cifar_workflow/models/conv/__init__.py b/examples/dataset_cifar_workflow/benchmarks/models/conv/__init__.py similarity index 100% rename from examples/dataset_cifar_workflow/models/conv/__init__.py rename to examples/dataset_cifar_workflow/benchmarks/models/conv/__init__.py diff --git a/examples/dataset_cifar_workflow/models/conv/model.py b/examples/dataset_cifar_workflow/benchmarks/models/conv/model.py similarity index 100% rename from examples/dataset_cifar_workflow/models/conv/model.py rename to examples/dataset_cifar_workflow/benchmarks/models/conv/model.py diff --git a/examples/dataset_cifar_workflow/benchmarks/models/conv/requirements.txt b/examples/dataset_cifar_workflow/benchmarks/models/conv/requirements.txt new file mode 100644 index 0000000..c1bb5f6 --- /dev/null +++ b/examples/dataset_cifar_workflow/benchmarks/models/conv/requirements.txt @@ -0,0 +1,3 @@ +numpy +torch>2.0.0 +torchvision diff --git a/examples/dataset_cifar_workflow/benchmarks/utils.py b/examples/dataset_cifar_workflow/benchmarks/utils.py new file mode 100644 index 0000000..51547d8 --- /dev/null +++ b/examples/dataset_cifar_workflow/benchmarks/utils.py @@ -0,0 +1,158 @@ +import os +import zipfile +from shutil import rmtree + +import numpy as np +import torch +import tqdm +from torch import optim, nn +from torch.utils.data import DataLoader, Dataset + +from learnware.client import LearnwareClient +from learnware.learnware import Learnware +from learnware.specification import generate_rkme_image_spec, RKMEImageSpecification +from .dataset import uploader_data +from .models.conv import ConvModel +from learnware.market import LearnwareMarket +from learnware.utils import choose_device + +@torch.no_grad() +def evaluate(model, evaluate_set: Dataset, device=None): + device = choose_device(0) if device is None else device + + if isinstance(model, Learnware): + # duck-type + model.__call__ = model.predict + if isinstance(model, nn.Module): + model.eval() + + criterion = nn.CrossEntropyLoss(reduction="sum") + total, correct, loss = 0, 0, 0.0 + dataloader = DataLoader(evaluate_set, batch_size=512, shuffle=True) + for i, (X, y) in enumerate(dataloader): + X, y = X.to(device), y.to(device) + out = model(X) + loss += criterion(out, y) + + _, predicted = torch.max(out.data, 1) + total += y.size(0) + correct += (predicted == y).sum().item() + + acc = correct / total * 100 + loss = loss / total + + if isinstance(model, nn.Module): + model.train() + + return loss, acc + + +def build_learnware(name: str, market: LearnwareMarket, model_name="conv", + out_classes=10, epochs=35, batch_size=1024, device=None): + device = choose_device(0) if device is None else device + + if name == "cifar10": + train_set, valid_set, spec_set = uploader_data() + else: + raise Exception("Not support", name) + + channel = train_set[0][0].shape[0] + image_size = train_set[0][0].shape[1], train_set[0][0].shape[2] + + model = ConvModel(channel=channel, im_size=image_size, + n_random_features=out_classes).to(device) + model.train() + + # SGD optimizer with learning rate 1e-2 + optimizer = optim.SGD(model.parameters(), lr=1e-2, momentum=0.9) + + # mean-squared error loss + criterion = nn.CrossEntropyLoss() + # Prepare DataLoader + dataloader = DataLoader(train_set, batch_size=batch_size, shuffle=True) + # Optimizing... + for epoch in tqdm.tqdm(range(epochs), total=epochs): + running_loss = [] + for i, (X, y) in enumerate(dataloader): + X, y = X.to(device=device), y.to(device=device) + optimizer.zero_grad() + out = model(X) + loss = criterion(out, y) + loss.backward() + optimizer.step() + running_loss.append(loss.item()) + + if (epoch + 1) % 5 == 0: + valid_loss, valid_acc = evaluate(model, train_set, device=device) + print('Epoch: {}, Train Average Loss: {:.3f}, Valid Average Loss: {:.3f}'.format( + epoch+1, np.mean(running_loss), valid_loss)) + + train_loss, train_acc = evaluate(model, train_set, device=device) + print("Train Loss: {:.3e}\tTrain Accuracy: {:.3e}".format(train_loss, train_acc)) + + # build specification + loader = DataLoader(spec_set, batch_size=3000, shuffle=True) + sampled_X, _ = next(iter(loader)) + spec = generate_rkme_image_spec(sampled_X) + + # add to market + cache_dir = os.path.abspath(os.path.join(os.path.dirname( __file__ ), '..', 'cache', 'learnware')) + if os.path.exists(cache_dir): + rmtree(cache_dir) + os.makedirs(cache_dir, exist_ok=True) + model_dir = os.path.abspath(os.path.join(__file__, "models")) + spec.save(os.path.join(cache_dir, "spec.json")) + + zip_file = os.path.join(cache_dir, "learnware.zip") + # zip -q -r -j zip_file dir_path + with zipfile.ZipFile(zip_file, "w") as zip_obj: + for foldername, subfolders, filenames in os.walk(os.path.join(model_dir, model_name)): + for filename in filenames: + file_path = os.path.join(foldername, filename) + zip_info = zipfile.ZipInfo(filename) + zip_info.compress_type = zipfile.ZIP_STORED + with open(file_path, "rb") as file: + zip_obj.writestr(zip_info, file.read()) + + for filename, filepath in zip(["spec.json", "config.yaml"], + [os.path.join(cache_dir, "spec.json"), + os.path.join(model_dir, "config.yaml")]): + zip_info = zipfile.ZipInfo(filename) + zip_info.compress_type = zipfile.ZIP_STORED + with open(file_path, "rb") as file: + zip_obj.writestr(zip_info, file.read()) + + market.add_learnware(zip_file, semantic_spec=LearnwareClient.create_semantic_specification( + self=None, + name="learnware", + description="", + data_type="Image", + task_type="Classification", + library_type="PyTorch", + scenarios=["Computer"], + output_description={str(i): "i" for i in range(out_classes)}) + ) + + return model + + +def build_specification(name: str, cache_id, sampled_size=3000): + cache_path = os.path.abspath(os.path.join( + os.path.dirname( __file__ ), '..', '..', 'cache', "{}.json".format(cache_id))) + + if os.path.exists(cache_path): + spec = RKMEImageSpecification() + spec.load(cache_path) + return spec + + if name == "cifar10": + dataset = cifar10(split="user") + else: + raise Exception("Not support", name) + + loader = DataLoader(dataset, batch_size=sampled_size, shuffle=True) + sampled_X, _ = next(iter(loader)) + spec = generate_rkme_image_spec(sampled_X) + + spec.save(cache_path) + return spec \ No newline at end of file diff --git a/examples/dataset_cifar_workflow/dataset/data.py b/examples/dataset_cifar_workflow/dataset/data.py deleted file mode 100644 index 3c47d88..0000000 --- a/examples/dataset_cifar_workflow/dataset/data.py +++ /dev/null @@ -1,10 +0,0 @@ -from torchvision import datasets - - -def cifar10(split="uploader"): - assert(split in {"uploader", "user"}) - - if split == "uploader": - dataset = datasets.CIFAR10(root="cache", download=True, train=True) - else: - dataset = datasets.CIFAR10(root="cache", download=True, train=False) \ No newline at end of file diff --git a/examples/dataset_cifar_workflow/main.py b/examples/dataset_cifar_workflow/main.py index e69de29..c4cf0a9 100644 --- a/examples/dataset_cifar_workflow/main.py +++ b/examples/dataset_cifar_workflow/main.py @@ -0,0 +1,33 @@ +import os + +import fire + +from examples.dataset_cifar_workflow.benchmarks.utils import build_learnware, build_specification +from learnware.market import instantiate_learnware_market + +PROXY_IP = "172.24.57.111" +os.environ["HTTP_PROXY"] = "http://"+PROXY_IP+":7890" +os.environ["HTTPS_PROXY"] = "http://"+PROXY_IP+":7890" + +class CifarDatasetWorkflow: + + def prepare_learnware(self, market_size=30, rebuild=False): + """initialize learnware market""" + # learnware.init() + + market = instantiate_learnware_market(name="easy", market_id="dataset_cifar_workflow", rebuild=rebuild) + + for i in range(market_size - len(market)): + build_learnware("cifar10", market) + + print("Total Item:", len(market)) + + def evaluate(self, user_size=20): + market = instantiate_learnware_market(name="easy", market_id="dataset_cifar_workflow", rebuild=rebuild) + + # for i in range(user_size): + # build_specification() + + +if __name__ == "__main__": + fire.Fire(CifarDatasetWorkflow) diff --git a/examples/dataset_cifar_workflow/models/train.py b/examples/dataset_cifar_workflow/models/train.py deleted file mode 100644 index e69de29..0000000