| @@ -1,114 +0,0 @@ | |||
| import copy | |||
| import os | |||
| import zipfile | |||
| from shutil import copyfile, rmtree | |||
| import learnware | |||
| import numpy as np | |||
| import tqdm | |||
| import yaml | |||
| from learnware import specification | |||
| from learnware.market import EasyMarket | |||
| from preprocess.dataloader import ImageDataLoader | |||
| user_semantic = { | |||
| "Data": {"Values": ["Image"], "Type": "Class"}, | |||
| "Task": { | |||
| "Values": ["Classification"], | |||
| "Type": "Class", | |||
| }, | |||
| "Library": {"Values": ["Scikit-learn"], "Type": "Class"}, | |||
| "Scenario": {"Values": [], "Type": "Tag"}, | |||
| "Description": {"Values": "", "Type": "String"}, | |||
| "Name": {"Values": "", "Type": "String"}, | |||
| "Output": {"Values": "", "Dimension": 0} | |||
| } | |||
| def build_from_preprocessed(args, regenerate=True): | |||
| zip_path_list = [] | |||
| data_root = os.path.join(args.data_root, 'learnware_market_data', "{}_{:d}".format(args.data, args.data_id)) | |||
| dataloader = ImageDataLoader(data_root, args.n_uploaders, train=True) | |||
| market_root = args.market_root | |||
| for i, (train_X, train_y, val_X, val_y) in tqdm.tqdm(enumerate(dataloader), total=args.n_uploaders): | |||
| dir_path = os.path.join(market_root, args.data, "{}_{:d}".format(args.spec, args.id), "learnware_{:d}".format(i)) | |||
| os.makedirs(dir_path, exist_ok=True) | |||
| if not regenerate: | |||
| zip_path_list.append(dir_path + ".zip") | |||
| continue | |||
| # print("Preparing Learnware {:d} with {:s} specification".format(i, args.spec)) | |||
| # Copy Model File | |||
| model_file = os.path.join(dir_path, "model.pth") | |||
| copyfile(os.path.join(data_root, "models", "uploader_{:d}.pth".format(i)), | |||
| model_file) | |||
| # Make Specification | |||
| if args.spec == "rbf": | |||
| spec = specification.utils.generate_rkme_spec(X=train_X, reduced_set_size=args.K, gamma=0.1, cuda_idx=args.cuda_idx) | |||
| elif args.spec == "ntk": | |||
| spec = learnware.specification.RKMEImageStatSpecification(rkme_id=i, **args.__dict__) | |||
| spec.generate_stat_spec_from_data(val_X, K=args.K, steps=args.ntk_steps, reduce=True, whitening=False) | |||
| else: | |||
| raise NotImplementedError("Not Support", args.spec) | |||
| spec.save(os.path.join(dir_path, "spec.json")) | |||
| # Copy __init__.py and learnware_yaml | |||
| init_file = os.path.join(dir_path, "__init__.py") | |||
| yaml_file = os.path.join(dir_path, "learnware.yaml") | |||
| copyfile( | |||
| os.path.join(market_root, "learnware_example", | |||
| "conv.py"), init_file | |||
| ) # cp conv.py init_file | |||
| with open(os.path.join(market_root, "learnware_example", | |||
| "{}.yaml".format(args.spec)), "r") as yaml_templet,\ | |||
| open(yaml_file, "w") as yaml_target: | |||
| yaml_content = yaml.load(yaml_templet, Loader=yaml.FullLoader) | |||
| yaml_content["model"]["kwargs"]["device"] = str(choose_device(args.cuda_idx)) | |||
| yaml_content["model"]["kwargs"]["input_channel"] = train_X.shape[1] | |||
| if args.spec == "ntk": | |||
| yaml_content["stat_specifications"][0]["kwargs"] = copy.deepcopy(args.__dict__) | |||
| yaml.dump(yaml_content, yaml_target) | |||
| zip_file = dir_path + ".zip" | |||
| # zip -q -r -j zip_file dir_path | |||
| with zipfile.ZipFile(zip_file, "w") as zip_obj: | |||
| for foldername, subfolders, filenames in os.walk(dir_path): | |||
| for filename in filenames: | |||
| file_path = os.path.join(foldername, filename) | |||
| zip_info = zipfile.ZipInfo(filename) | |||
| zip_info.compress_type = zipfile.ZIP_STORED | |||
| with open(file_path, "rb") as file: | |||
| zip_obj.writestr(zip_info, file.read()) | |||
| rmtree(dir_path) # rm -r dir_path | |||
| zip_path_list.append(zip_file) | |||
| return zip_path_list | |||
| def upload_to_easy_market(args, zip_path_list, market_id=None): | |||
| learnware.init() | |||
| np.random.seed(2023) | |||
| market_id = market_id if market_id else "NTK-RF-{:d}".format(args.id) | |||
| market = DummyMarket(market_id=market_id, rebuild=True) | |||
| for idx, zip_path in enumerate(zip_path_list): | |||
| semantic_spec = copy.deepcopy(user_semantic) | |||
| semantic_spec["Name"]["Values"] = "learnware_{:d}".format(idx) | |||
| semantic_spec["Description"]["Values"] = "test_learnware_number_{:d}".format(idx) | |||
| semantic_spec["Scenario"]["Values"] = [args.data] | |||
| semantic_spec["Output"]['Dimension'] = 10 | |||
| market.add_learnware(zip_path, semantic_spec) | |||
| logger = get_custom_logger() | |||
| logger.debug("Total Item: {:d}".format(len(market))) | |||
| return market | |||
| @@ -0,0 +1 @@ | |||
| from .data import * | |||
| @@ -0,0 +1,26 @@ | |||
| import os | |||
| import torch | |||
| from torch.utils.data import random_split, Subset | |||
| from torchvision import datasets | |||
| from examples.dataset_cifar_workflow.benchmarks.dataset.utils import build_transform, sample_by_labels, split_dataset | |||
| cache_root = os.path.abspath(os.path.join(os.path.dirname( __file__ ), '..', '..', 'cache')) | |||
| augment_transform, regular_transform = build_transform((32, 32)) | |||
| cifar_train_set_augment = datasets.CIFAR10(root="cache", download=True, | |||
| train=True, transform=augment_transform) | |||
| cifar_train_set = datasets.CIFAR10(root="cache", download=True, | |||
| train=True, transform=augment_transform) | |||
| cifar_test_set = datasets.CIFAR10(root="cache", download=True, | |||
| train=False, transform=regular_transform) | |||
| def uploader_data(): | |||
| train_indices, order = split_dataset(torch.asarray(cifar_train_set_augment.targets), 12500, split="uploader") | |||
| valid_indices, _ = split_dataset(torch.asarray(cifar_test_set.targets), 2000, split="uploader", order=order) | |||
| return (Subset(cifar_train_set_augment, train_indices), | |||
| Subset(cifar_test_set, valid_indices), | |||
| Subset(cifar_train_set, train_indices)) | |||
| @@ -3,8 +3,14 @@ from functools import reduce | |||
| import numpy as np | |||
| import torch | |||
| import torchvision | |||
| from torch.utils.data import TensorDataset | |||
| torchvision.disable_beta_transforms_warning() | |||
| from torchvision.transforms import transforms, v2 | |||
| def sample_by_labels(labels: torch.Tensor, weights, total_num): | |||
| weights = np.asarray(weights) | |||
| @@ -25,8 +31,7 @@ def sample_by_labels(labels: torch.Tensor, weights, total_num): | |||
| USER_WEIGHTS = [3, 3, 1, 1, 1, 1, 0, 0, 0, 0] | |||
| UPLOADER_WEIGHTS = [4, 4, 1, 1, 0, 0, 0, 0, 0, 0] | |||
| def split_dataset(data_x, data_y, size, split="uploader"): | |||
| def split_dataset(labels, size, split="uploader", order=None): | |||
| if split == "uploader": | |||
| weights = np.asarray(UPLOADER_WEIGHTS) | |||
| elif split == "user": | |||
| @@ -34,12 +39,29 @@ def split_dataset(data_x, data_y, size, split="uploader"): | |||
| else: | |||
| raise Exception(split) | |||
| order = list(range(len(weights))) | |||
| random.shuffle(order) | |||
| if order is None: | |||
| order = list(range(len(weights))) | |||
| random.shuffle(order) | |||
| selected_data_indexes = reduce(lambda x, y: x+y, sample_by_labels(data_y, weights[order], size)) | |||
| selected_data_indexes = reduce(lambda x, y: x+y, sample_by_labels(labels, weights[order], size)) | |||
| selected_data_indexes = torch.stack(selected_data_indexes) | |||
| selected_X = data_x[selected_data_indexes].numpy() | |||
| selected_y = data_y[selected_data_indexes].numpy() | |||
| return TensorDataset(selected_X, selected_y), weights[order] | |||
| return selected_data_indexes, order | |||
| def build_transform(size): | |||
| augment_transform = transforms.Compose([ | |||
| transforms.Resize(size), | |||
| v2.AutoAugment(), | |||
| transforms.ToTensor(), | |||
| transforms.Normalize(mean=[0.485, 0.456, 0.406], | |||
| std=[0.229, 0.224, 0.225]), | |||
| ]) | |||
| regular_transform = transforms.Compose([ | |||
| transforms.Resize(size), | |||
| transforms.ToTensor(), | |||
| transforms.Normalize(mean=[0.485, 0.456, 0.406], | |||
| std=[0.229, 0.224, 0.225]), | |||
| ]) | |||
| return augment_transform, regular_transform | |||
| @@ -1,75 +0,0 @@ | |||
| import os | |||
| import random | |||
| from time import sleep | |||
| from typing import Dict | |||
| import learnware | |||
| import numpy as np | |||
| import torch.random | |||
| from learnware import specification | |||
| from learnware.market import BaseUserInfo | |||
| from tqdm import tqdm | |||
| from build_market import user_semantic | |||
| from preprocess.dataloader import ImageDataLoader | |||
| from utils.clerk import Clerk, get_custom_logger | |||
| from utils.reuse import AveragingReuser | |||
| def evaluate_market_performance(args, market, clerk: Clerk=None, regenerate=True) -> Dict: | |||
| logger = get_custom_logger() | |||
| data_root = os.path.join(args.data_root, 'learnware_market_data', "{}_{:d}".format(args.data, args.data_id)) | |||
| dataloader = ImageDataLoader(data_root, args.n_users, train=False) | |||
| acc = [] | |||
| market_root = args.market_root | |||
| # shuffled = list(enumerate(dataloader)) | |||
| # random.shuffle(shuffled) | |||
| for i, (test_X, test_y) in enumerate(dataloader): | |||
| dir_path = os.path.join(market_root, args.data, "{}_{:d}".format(args.spec, args.id), "user_{:d}".format(i)) | |||
| os.makedirs(dir_path, exist_ok=True) | |||
| if regenerate: | |||
| if args.spec == "rbf": | |||
| stat_spec = specification.utils.generate_rkme_spec(X=test_X, reduced_set_size=args.K, gamma=0.1, cuda_idx=args.cuda_idx) | |||
| elif args.spec == "ntk": | |||
| stat_spec = learnware.specification.RKMEImageStatSpecification(rkme_id=i+args.n_uploaders, **args.__dict__) | |||
| stat_spec.generate_stat_spec_from_data(test_X, reduce=True, steps=args.ntk_steps, K=args.K, whitening=False) | |||
| else: | |||
| raise NotImplementedError() | |||
| # Save User's spec to disk | |||
| stat_spec.save(os.path.join(dir_path, "spec.json")) | |||
| else: | |||
| if args.spec == "rbf": | |||
| stat_spec = specification.RKMEStatSpecification(gamma=0.1, cuda_idx=args.cuda_idx) | |||
| elif args.spec == "ntk": | |||
| stat_spec = learnware.specification.RKMEImageStatSpecification(rkme_id=i+args.n_uploaders, cache=False, **args.__dict__) | |||
| else: | |||
| raise NotImplementedError() | |||
| # Load User's spec from disk | |||
| stat_spec.load(os.path.join(dir_path, "spec.json")) | |||
| user_info = BaseUserInfo(semantic_spec=user_semantic, stat_info={"RKMEStatSpecification": stat_spec}) | |||
| sorted_score_list, single_learnware_list, _, _= market.search_learnware(user_info, max_search_num=args.max_search_num) | |||
| reuse_ensemble = AveragingReuser(learnware_list=single_learnware_list, mode="vote") | |||
| ensemble_predict_y = np.argmax(reuse_ensemble.predict(user_data=test_X), axis=-1) | |||
| curr_acc = np.mean(ensemble_predict_y == test_y) | |||
| acc.append(curr_acc) | |||
| if clerk: | |||
| clerk.rkme_performance(curr_acc) | |||
| logger.debug("Accuracy for user {:d}: {:.3f}; {:.3f} on average up to now.".format(i, curr_acc, np.mean(acc))) | |||
| logger.info("Accuracy {:.3f}({:.3f})".format(np.mean(acc), np.std(acc))) | |||
| return { | |||
| "Accuracy": { | |||
| "Mean": np.mean(acc), | |||
| "Std": np.std(acc), | |||
| "All": acc | |||
| } | |||
| } | |||
| @@ -0,0 +1,3 @@ | |||
| numpy | |||
| torch>2.0.0 | |||
| torchvision | |||
| @@ -0,0 +1,158 @@ | |||
| import os | |||
| import zipfile | |||
| from shutil import rmtree | |||
| import numpy as np | |||
| import torch | |||
| import tqdm | |||
| from torch import optim, nn | |||
| from torch.utils.data import DataLoader, Dataset | |||
| from learnware.client import LearnwareClient | |||
| from learnware.learnware import Learnware | |||
| from learnware.specification import generate_rkme_image_spec, RKMEImageSpecification | |||
| from .dataset import uploader_data | |||
| from .models.conv import ConvModel | |||
| from learnware.market import LearnwareMarket | |||
| from learnware.utils import choose_device | |||
| @torch.no_grad() | |||
| def evaluate(model, evaluate_set: Dataset, device=None): | |||
| device = choose_device(0) if device is None else device | |||
| if isinstance(model, Learnware): | |||
| # duck-type | |||
| model.__call__ = model.predict | |||
| if isinstance(model, nn.Module): | |||
| model.eval() | |||
| criterion = nn.CrossEntropyLoss(reduction="sum") | |||
| total, correct, loss = 0, 0, 0.0 | |||
| dataloader = DataLoader(evaluate_set, batch_size=512, shuffle=True) | |||
| for i, (X, y) in enumerate(dataloader): | |||
| X, y = X.to(device), y.to(device) | |||
| out = model(X) | |||
| loss += criterion(out, y) | |||
| _, predicted = torch.max(out.data, 1) | |||
| total += y.size(0) | |||
| correct += (predicted == y).sum().item() | |||
| acc = correct / total * 100 | |||
| loss = loss / total | |||
| if isinstance(model, nn.Module): | |||
| model.train() | |||
| return loss, acc | |||
| def build_learnware(name: str, market: LearnwareMarket, model_name="conv", | |||
| out_classes=10, epochs=35, batch_size=1024, device=None): | |||
| device = choose_device(0) if device is None else device | |||
| if name == "cifar10": | |||
| train_set, valid_set, spec_set = uploader_data() | |||
| else: | |||
| raise Exception("Not support", name) | |||
| channel = train_set[0][0].shape[0] | |||
| image_size = train_set[0][0].shape[1], train_set[0][0].shape[2] | |||
| model = ConvModel(channel=channel, im_size=image_size, | |||
| n_random_features=out_classes).to(device) | |||
| model.train() | |||
| # SGD optimizer with learning rate 1e-2 | |||
| optimizer = optim.SGD(model.parameters(), lr=1e-2, momentum=0.9) | |||
| # mean-squared error loss | |||
| criterion = nn.CrossEntropyLoss() | |||
| # Prepare DataLoader | |||
| dataloader = DataLoader(train_set, batch_size=batch_size, shuffle=True) | |||
| # Optimizing... | |||
| for epoch in tqdm.tqdm(range(epochs), total=epochs): | |||
| running_loss = [] | |||
| for i, (X, y) in enumerate(dataloader): | |||
| X, y = X.to(device=device), y.to(device=device) | |||
| optimizer.zero_grad() | |||
| out = model(X) | |||
| loss = criterion(out, y) | |||
| loss.backward() | |||
| optimizer.step() | |||
| running_loss.append(loss.item()) | |||
| if (epoch + 1) % 5 == 0: | |||
| valid_loss, valid_acc = evaluate(model, train_set, device=device) | |||
| print('Epoch: {}, Train Average Loss: {:.3f}, Valid Average Loss: {:.3f}'.format( | |||
| epoch+1, np.mean(running_loss), valid_loss)) | |||
| train_loss, train_acc = evaluate(model, train_set, device=device) | |||
| print("Train Loss: {:.3e}\tTrain Accuracy: {:.3e}".format(train_loss, train_acc)) | |||
| # build specification | |||
| loader = DataLoader(spec_set, batch_size=3000, shuffle=True) | |||
| sampled_X, _ = next(iter(loader)) | |||
| spec = generate_rkme_image_spec(sampled_X) | |||
| # add to market | |||
| cache_dir = os.path.abspath(os.path.join(os.path.dirname( __file__ ), '..', 'cache', 'learnware')) | |||
| if os.path.exists(cache_dir): | |||
| rmtree(cache_dir) | |||
| os.makedirs(cache_dir, exist_ok=True) | |||
| model_dir = os.path.abspath(os.path.join(__file__, "models")) | |||
| spec.save(os.path.join(cache_dir, "spec.json")) | |||
| zip_file = os.path.join(cache_dir, "learnware.zip") | |||
| # zip -q -r -j zip_file dir_path | |||
| with zipfile.ZipFile(zip_file, "w") as zip_obj: | |||
| for foldername, subfolders, filenames in os.walk(os.path.join(model_dir, model_name)): | |||
| for filename in filenames: | |||
| file_path = os.path.join(foldername, filename) | |||
| zip_info = zipfile.ZipInfo(filename) | |||
| zip_info.compress_type = zipfile.ZIP_STORED | |||
| with open(file_path, "rb") as file: | |||
| zip_obj.writestr(zip_info, file.read()) | |||
| for filename, filepath in zip(["spec.json", "config.yaml"], | |||
| [os.path.join(cache_dir, "spec.json"), | |||
| os.path.join(model_dir, "config.yaml")]): | |||
| zip_info = zipfile.ZipInfo(filename) | |||
| zip_info.compress_type = zipfile.ZIP_STORED | |||
| with open(file_path, "rb") as file: | |||
| zip_obj.writestr(zip_info, file.read()) | |||
| market.add_learnware(zip_file, semantic_spec=LearnwareClient.create_semantic_specification( | |||
| self=None, | |||
| name="learnware", | |||
| description="", | |||
| data_type="Image", | |||
| task_type="Classification", | |||
| library_type="PyTorch", | |||
| scenarios=["Computer"], | |||
| output_description={str(i): "i" for i in range(out_classes)}) | |||
| ) | |||
| return model | |||
| def build_specification(name: str, cache_id, sampled_size=3000): | |||
| cache_path = os.path.abspath(os.path.join( | |||
| os.path.dirname( __file__ ), '..', '..', 'cache', "{}.json".format(cache_id))) | |||
| if os.path.exists(cache_path): | |||
| spec = RKMEImageSpecification() | |||
| spec.load(cache_path) | |||
| return spec | |||
| if name == "cifar10": | |||
| dataset = cifar10(split="user") | |||
| else: | |||
| raise Exception("Not support", name) | |||
| loader = DataLoader(dataset, batch_size=sampled_size, shuffle=True) | |||
| sampled_X, _ = next(iter(loader)) | |||
| spec = generate_rkme_image_spec(sampled_X) | |||
| spec.save(cache_path) | |||
| return spec | |||
| @@ -1,10 +0,0 @@ | |||
| from torchvision import datasets | |||
| def cifar10(split="uploader"): | |||
| assert(split in {"uploader", "user"}) | |||
| if split == "uploader": | |||
| dataset = datasets.CIFAR10(root="cache", download=True, train=True) | |||
| else: | |||
| dataset = datasets.CIFAR10(root="cache", download=True, train=False) | |||
| @@ -0,0 +1,33 @@ | |||
| import os | |||
| import fire | |||
| from examples.dataset_cifar_workflow.benchmarks.utils import build_learnware, build_specification | |||
| from learnware.market import instantiate_learnware_market | |||
| PROXY_IP = "172.24.57.111" | |||
| os.environ["HTTP_PROXY"] = "http://"+PROXY_IP+":7890" | |||
| os.environ["HTTPS_PROXY"] = "http://"+PROXY_IP+":7890" | |||
| class CifarDatasetWorkflow: | |||
| def prepare_learnware(self, market_size=30, rebuild=False): | |||
| """initialize learnware market""" | |||
| # learnware.init() | |||
| market = instantiate_learnware_market(name="easy", market_id="dataset_cifar_workflow", rebuild=rebuild) | |||
| for i in range(market_size - len(market)): | |||
| build_learnware("cifar10", market) | |||
| print("Total Item:", len(market)) | |||
| def evaluate(self, user_size=20): | |||
| market = instantiate_learnware_market(name="easy", market_id="dataset_cifar_workflow", rebuild=rebuild) | |||
| # for i in range(user_size): | |||
| # build_specification() | |||
| if __name__ == "__main__": | |||
| fire.Fire(CifarDatasetWorkflow) | |||