Browse Source

[MNT] init hetero table workflow

tags/v0.3.2
liuht 2 years ago
parent
commit
82e1aa212e
2 changed files with 195 additions and 202 deletions
  1. +50
    -8
      examples/dataset_table_workflow/base.py
  2. +145
    -194
      examples/dataset_table_workflow/hetero.py

+ 50
- 8
examples/dataset_table_workflow/base.py View File

@@ -1,13 +1,16 @@
import os
import time
import pandas
import torch
import random
import tempfile
import numpy as np
from queue import Empty
from tqdm import tqdm
from learnware.client import LearnwareClient
from learnware.logger import get_module_logger
from learnware.market import instantiate_learnware_market
from learnware.tests.benchmarks import LearnwareBenchmark
from torch.multiprocessing import Process, Queue, set_start_method

from config import *
from methods import *
@@ -15,6 +18,12 @@ from utils import process_single_aug

logger = get_module_logger("base_table", level="INFO")

# Request the "spawn" start method for worker processes created via
# torch.multiprocessing. A RuntimeError from set_start_method is deliberately
# swallowed (presumably it signals that a start method was already chosen
# earlier in this interpreter — TODO confirm against the multiprocessing docs).
try:
set_start_method('spawn')
except RuntimeError:
pass
# Select the "file_system" sharing strategy for torch.multiprocessing.
# NOTE(review): likely chosen to avoid file-descriptor exhaustion when many
# tensors pass through queues — confirm the motivation with the author.
torch.multiprocessing.set_sharing_strategy("file_system")


class TableWorkflow:
def __init__(self, benchmark_config, name="easy", rebuild=False):
@@ -24,6 +33,8 @@ class TableWorkflow:
os.makedirs(self.result_path, exist_ok=True)
os.makedirs(self.curves_result_path, exist_ok=True)
self._prepare_market(benchmark_config, name, rebuild)
self.cuda_idx = list(range(torch.cuda.device_count()))
@staticmethod
def _limited_data(method, test_info, loss_func):
@@ -75,6 +86,7 @@ class TableWorkflow:
def test_method(self, test_info, recorders, loss_func=loss_func_rmse):
method_name_full = test_info["method_name"]
method_name = method_name_full if method_name_full == "user_model" else "_".join(method_name_full.split("_")[1:])
method = test_methods[method_name_full]
user, idx = test_info["user"], test_info["idx"]
recorder = recorders[method_name_full]
@@ -84,15 +96,45 @@ class TableWorkflow:
if method_name_full == "hetero_single_aug":
if test_info["force"] or recorder.should_test_method(user, idx, save_path):
for learnware in test_info["learnwares"]:
test_info["single_learnware"] = [learnware]
scores = self._limited_data(test_methods[method_name_full], test_info, loss_func)
recorder.record(user, scores)
# # single process testing
# all_scores = []
# for learnware in test_info["learnwares"]:
# test_info["single_learnware"] = [learnware]
# all_scores.append(self._limited_data(test_methods[method_name_full], test_info, loss_func))
# recorder.record(user, all_scores)
# multi process testing
queue = Queue()
processes = []
bar = tqdm(total=len(test_info["learnwares"]), desc=f"Test {method_name}", unit="learnware")
learnware_chunks = [test_info["learnwares"][i:len(test_info["learnwares"]):len(self.cuda_idx)] for i in self.cuda_idx]
for cuda_idx, learnware_chunk in zip(self.cuda_idx, learnware_chunks):
p = Process(target=self.process_learnware_chunk, args=(cuda_idx, method, test_info, loss_func, learnware_chunk, queue))
processes.append(p)
p.start()
all_results = []
while any(p.is_alive() for p in processes) or not queue.empty():
try:
result = queue.get(timeout=0.1)
all_results.append(result)
bar.update(1)
except Empty:
time.sleep(0.1)
continue
bar.close()

process_single_aug(user, idx, scores, recorders, save_root_path)
for p in processes:
p.join()
all_results.sort(key=lambda x: x[0])
all_scores = [result[1] for result in all_results]
recorder.record(user, all_scores)
# process_single_aug(user, idx, all_scores, recorders, save_root_path)
recorder.save(save_path)
else:
process_single_aug(user, idx, recorder.data[user], recorders, save_root_path)
# else:
process_single_aug(user, idx, recorder.data[user][idx], recorders, save_root_path)
else:
if test_info["force"] or recorder.should_test_method(user, idx, save_path):
scores = self._limited_data(test_methods[method_name_full], test_info, loss_func)


+ 145
- 194
examples/dataset_table_workflow/hetero.py View File

@@ -6,220 +6,171 @@ warnings.filterwarnings("ignore")

import json
import numpy as np
from matplotlib import pyplot as plt
import learnware.specification as specification
from learnware.logger import get_module_logger
from learnware.specification import generate_stat_spec
from learnware.market import instantiate_learnware_market, BaseUserInfo
from learnware.reuse import AveragingReuser, FeatureAlignLearnware
from multiprocessing import Pool
from learnware.reuse import AveragingReuser, JobSelectorReuser, FeatureAlignLearnware

from benchmarks import DataLoader
from examples.dataset_table_workflow.methods import *
from base import TableWorkflow, user_semantic
from methods import *
from base import TableWorkflow
from config import align_model_params
from utils import Recorder, plot_performance_curves, analyze_performance
from utils import Recorder, plot_performance_curves

logger = get_module_logger("hetero_test", level="INFO")
learnware_market = ["pfs_default", "pfs_denis", "corporacion_bojan", "corporacion_lee", "corporacion_lingzhi"]
default_users = ["m5_default"] # "m5_default", "m5_kkiller", "m5_rana"
n_labeled_list = [100, 200, 500, 1000, 2000]
n_repeat_list = [10, 10, 10, 3, 3]

class HeterogeneousWorkflow(TableWorkflow):
def __init__(self, reload_market=False, regenerate_flag=False):
super(HeterogeneousWorkflow, self).__init__(learnware_market)
self.curves_result_path = os.path.join(self.result_path, 'curves')
self.unlabeled_res_path = os.path.join(self.result_path, 'unlabeled')
self.figs_result_path = os.path.join(self.result_path, "figs")

os.makedirs(self.curves_result_path, exist_ok=True)
os.makedirs(self.unlabeled_res_path, exist_ok=True)
os.makedirs(self.figs_result_path, exist_ok=True)

if reload_market:
self.prepare_market(name="hetero", market_id="heterogeneous", regenerate_flag=regenerate_flag)

def test_hetero_unlabeled(self):
hetero_market = instantiate_learnware_market(market_id="heterogeneous", name="hetero")
logger.info("Total Item: %d" % len(hetero_market))
n_labeled_list = [50, 75, 100, 200]
n_repeat_list = [10, 10, 10, 10]

class HeterogeneousDatasetWorkflow(TableWorkflow):
def unlabeled_hetero_table_example(self):
logger.info("Total Item: %d" % len(self.market))
learnware_rmse_list = []
single_score_list = []
job_selector_score_list = []
ensemble_score_list = []
all_learnwares = self.market.get_learnwares()
select_list = defaultdict(list)
avg_list = defaultdict(list)
oracle_list = defaultdict(list)
improve_list = defaultdict(list)
ensemble_score_list = defaultdict(list)
user_model_score_lists = defaultdict(list)

user_model_method = partial(self._limited_data, HeteroMethods.user_model_score)

for user in default_users:
user_unlabeld_res_path = os.path.join(self.unlabeled_res_path, f"{user}.json")
if os.path.exists(user_unlabeld_res_path):
with open(user_unlabeld_res_path, "rb") as file:
unlabeld_res = json.load(file)
select_list[user] = unlabeld_res["select_list"]
avg_list[user] = unlabeld_res["avg_list"]
oracle_list[user] = unlabeld_res["oracle_list"]
ensemble_score_list[user] = unlabeld_res["ensemble_score_list"]
user_model_score_lists[user] = unlabeld_res["user_model_score_lists"]
else:
data_loader = DataLoader(user)
idx_list = data_loader.get_shop_ids()
for idx in idx_list:
_, _, test_x, test_y, feature_descriptions = data_loader.get_raw_data(idx)
train_x_list, _ = data_loader.get_labeled_training_data(idx, size_list=n_labeled_list, n_repeat_list=n_repeat_list)
user_stat_spec = specification.RKMETableSpecification()
user_stat_spec.generate_stat_spec_from_data(X=test_x)

feature_dim = len(feature_descriptions)
feature_descriptions_dict = {str(i): feature_descriptions[i] for i in range(feature_dim)}
input_description = {
"Dimension": feature_dim,
"Description": feature_descriptions_dict
}
user_semantic["Input"] = input_description
user_info = BaseUserInfo(semantic_spec=user_semantic, stat_info={"RKMETableSpecification": user_stat_spec})
logger.info(f"Searching Market for user: {user}_{idx}")
search_result = hetero_market.search_learnware(user_info)
single_result = search_result.get_single_results()
multiple_result = search_result.get_multiple_results()
logger.info(f"hetero search result of user {user}_{idx}:")
logger.info(
f"single model num: {len(single_result)}, max_score: {single_result[0].score}, min_score: {single_result[-1].score}"
)

l = len(single_result)
rmse_list = []
for idx in range(l):
hetero_learnware = FeatureAlignLearnware(single_result[idx].learnware, **align_model_params)
hetero_learnware.align(user_rkme=user_stat_spec)
pred_y = hetero_learnware.predict(test_x)
rmse_list.append(loss_func_rmse(pred_y, test_y))
logger.info(
f"Top1-score: {single_result[0].score}, learnware_id: {single_result[0].learnware.id}, rmse: {rmse_list[0]}"
)
if len(multiple_result) > 0:
mixture_id = " ".join([learnware.id for learnware in multiple_result[0].learnwares])
logger.info(f"mixture_score: {multiple_result[0].score}, mixture_learnware: {mixture_id}")
mixture_learnware_list = []
for learnware in multiple_result[0].learnwares:
hetero_learnware = FeatureAlignLearnware(learnware, **align_model_params)
hetero_learnware.align(user_rkme=user_stat_spec)
mixture_learnware_list.append(hetero_learnware)
else:
hetero_learnware = FeatureAlignLearnware(single_result[0].learnware, **align_model_params)
hetero_learnware.align(user_rkme=user_stat_spec)
mixture_learnware_list = [hetero_learnware]
# test user model
user_model_score_list = user_model_method(train_x_list, test_x, test_y, data_loader)

# test reuse (ensemble)
reuse_ensemble = AveragingReuser(learnware_list=mixture_learnware_list, mode="mean")
ensemble_predict_y = reuse_ensemble.predict(user_data=test_x)
ensemble_score = loss_func_rmse(ensemble_predict_y, test_y)
ensemble_score_list[user].append(ensemble_score)
logger.info(f"mixture reuse rmse (ensemble): {ensemble_score}")

select_list[user].append(rmse_list[0])
avg_list[user].append(np.mean(rmse_list))
oracle_list[user].append(np.min(rmse_list))
improve_list[user].append((np.mean(rmse_list) - rmse_list[0]) / np.mean(rmse_list))
user_model_score_lists[user].append(user_model_score_list)
logger.info(f"Saving unlabeled results for User: {user}")
res = {
"select_list": select_list[user],
"avg_list": avg_list[user],
"oracle_list": oracle_list[user],
"ensemble_score_list": ensemble_score_list[user],
"user_model_score_lists": user_model_score_lists[user]
}
with open(user_unlabeld_res_path, "w") as file:
json.dump(res, file, indent=4)

for user in default_users:
logger.info(f"User Dataset: {user}")
user = self.benchmark.name
for idx in range(self.benchmark.user_num):
test_x, test_y = self.benchmark.get_test_data(user_ids=idx)
test_x, test_y = test_x.values, test_y.values
user_stat_spec = generate_stat_spec(type="table", X=test_x)
user_info = BaseUserInfo(
semantic_spec=self.user_semantic, stat_info={user_stat_spec.type: user_stat_spec}
)
logger.info(f"Searching Market for user: {user}_{idx}")
search_result = self.market.search_learnware(user_info)
single_result = search_result.get_single_results()
multiple_result = search_result.get_multiple_results()
logger.info(f"hetero search result of user {user}_{idx}:")
logger.info(
"RMSE of selected learnware: %.3f +/- %.3f, Average performance: %.3f +/- %.3f, Oracle performace: %.3f +/- %.3f"
% (np.mean(select_list[user]), np.std(select_list[user]), np.mean(avg_list[user]), np.std(avg_list[user]), np.mean(oracle_list[user]), np.std(oracle_list[user]))
f"single model num: {len(single_result)}, max_score: {single_result[0].score}, min_score: {single_result[-1].score}"
)
pred_y = single_result[0].learnware.predict(test_x)
single_score_list.append(loss_func_rmse(pred_y, test_y))

rmse_list = []
for learnware in all_learnwares:
hetero_learnware = FeatureAlignLearnware(learnware, **align_model_params)
hetero_learnware.align(user_rkme=user_stat_spec)
pred_y = hetero_learnware.predict(test_x)
rmse_list.append(loss_func_rmse(pred_y, test_y))
logger.info(
"Averaging Ensemble Reuse Performance: %.3f +/- %.3f"
% (np.mean(ensemble_score_list[user]), np.std(ensemble_score_list[user]))
f"Top1-score: {single_result[0].score}, learnware_id: {single_result[0].learnware.id}, rmse: {single_score_list[0]}"
)
for idx, n_labeled in enumerate(n_labeled_list):
n_labeled_score_list = [lst[idx] for lst in user_model_score_lists[user]]
logger.info(
"User Model with %d data: %.3f +/ %.3f"
% (n_labeled, np.mean(n_labeled_score_list), np.std(n_labeled_score_list))
)

def test_hetero_labeled(self):
hetero_market = instantiate_learnware_market(market_id="heterogeneous", name="hetero")
logger.info("Total Items: %d" % len(hetero_market))

methods = ["user_model", "hetero_single_aug", "hetero_multiple_aug", "hetero_multiple_avg", "hetero_ensemble_pruning"]
methods_to_test = ["hetero_multiple_avg", "hetero_ensemble_pruning"]
if len(multiple_result) > 0:
mixture_id = " ".join([learnware.id for learnware in multiple_result[0].learnwares])
logger.info(f"mixture_score: {multiple_result[0].score}, mixture_learnware: {mixture_id}")
mixture_learnware_list = []
for learnware in multiple_result[0].learnwares:
hetero_learnware = FeatureAlignLearnware(learnware, **align_model_params)
hetero_learnware.align(user_rkme=user_stat_spec)
mixture_learnware_list.append(hetero_learnware)
else:
hetero_learnware = FeatureAlignLearnware(single_result[0].learnware, **align_model_params)
hetero_learnware.align(user_rkme=user_stat_spec)
mixture_learnware_list = [hetero_learnware]
# test reuse (job selector)
resue_baseline = JobSelectorReuser(learnware_list=mixture_learnware_list, herding_num=100)
reuse_predict = resue_baseline.predict(user_data=test_x)
reuse_score = loss_func_rmse(reuse_predict, test_y)
job_selector_score_list.append(reuse_score)
logger.info(f"mixture reuse rmse (job selector): {reuse_score}")

# test reuse (ensemble)
reuse_ensemble = AveragingReuser(learnware_list=mixture_learnware_list, mode="mean")
ensemble_predict_y = reuse_ensemble.predict(user_data=test_x)
ensemble_score = loss_func_rmse(ensemble_predict_y, test_y)
ensemble_score_list[user].append(ensemble_score)
logger.info(f"mixture reuse rmse (ensemble): {ensemble_score}")

learnware_rmse_list.append(rmse_list)
single_list = np.array(learnware_rmse_list)
avg_score_list = [np.mean(lst, axis=0) for lst in single_list]
oracle_score_list = [np.min(lst, axis=0) for lst in single_list]
logger.info(
"RMSE of selected learnware: %.3f +/- %.3f, Average performance: %.3f +/- %.3f, Oracle performace: %.3f +/- %.3f"
% (
np.mean(single_score_list),
np.std(single_score_list),
np.mean(avg_score_list),
np.std(avg_score_list),
np.mean(oracle_score_list),
np.std(oracle_score_list),
)
)
logger.info(
"Average Job Selector Reuse Performance: %.3f +/- %.3f"
% (np.mean(job_selector_score_list), np.std(job_selector_score_list))
)
logger.info(
"Averaging Ensemble Reuse Performance: %.3f +/- %.3f"
% (np.mean(ensemble_score_list), np.std(ensemble_score_list))
)

def labeled_hetero_table_example(self):
logger.info("Total Items: %d" % len(self.market))
methods = ["user_model", "hetero_single_aug", "hetero_multiple_avg", "hetero_ensemble_pruning"]
methods_to_test = []
recorders = {method: Recorder() for method in methods + ["select_score", "oracle_score", "mean_score"]}

for user in default_users:
data_loader = DataLoader(user)
idx_list = data_loader.get_shop_ids()
for idx in idx_list:
_, _, test_x, test_y, feature_descriptions = data_loader.get_raw_data(idx)
train_subsets = data_loader.get_labeled_training_data(
idx,
size_list=n_labeled_list,
n_repeat_list=n_repeat_list
)
user_stat_spec = specification.RKMETableSpecification()
user_stat_spec.generate_stat_spec(X=test_x)

input_description = {
"Dimension": len(feature_descriptions),
"Description": {str(i): feature_descriptions[i] for i in range(len(feature_descriptions))}
}
user_semantic["Input"] = input_description
user_info = BaseUserInfo(semantic_spec=user_semantic, stat_info={"RKMETableSpecification": user_stat_spec})
logger.info(f"Searching Market for user: {user}_{idx}")

search_result = hetero_market.search_learnware(user_info, max_search_num=10)
single_result = search_result.get_single_results()
multiple_result = search_result.get_multiple_results()

if len(multiple_result) > 0:
mixture_id = " ".join([learnware.id for learnware in multiple_result[0].learnwares])
logger.info(f"Mixture score: {multiple_result[0].score}, Mixture learnware: {mixture_id}")
mixture_learnware_list = multiple_result[0].learnwares
else:
mixture_learnware_list = [single_result[0].learnware]
logger.info(f"Hetero search result of user {user}_{idx}: mixture learnware num: {len(mixture_learnware_list)}")
user = self.benchmark.name
for idx in range(self.benchmark.user_num):
test_x, test_y = self.benchmark.get_test_data(user_ids=idx)
test_x, test_y = test_x.values, test_y.values
train_x, train_y = self.benchmark.get_train_data(user_ids=idx)
train_x, train_y = train_x.values, train_y.values
train_subsets = self.get_train_subsets(idx, train_x, train_y)
user_stat_spec = generate_stat_spec(type="table", X=test_x)
user_semantic = self.get_semantic_specifications(user_ids=idx)
user_info = BaseUserInfo(
semantic_spec=user_semantic, stat_info={user_stat_spec.type: user_stat_spec}
)
logger.info(f"Searching Market for user: {user}_{idx}")

test_info = {"user": user, "idx": idx, "train_subsets": train_subsets, "test_x": test_x, "test_y": test_y}
common_config = {"user_rkme": user_stat_spec, "multiple_learnwares": mixture_learnware_list}
method_configs = {
"user_model": {"data_loader": data_loader},
"hetero_single_aug": {"user_rkme": user_stat_spec, "learnwares": [single_result[0].learnware]},
"hetero_multiple_aug": common_config,
"hetero_multiple_avg": common_config,
"hetero_ensemble_pruning": common_config
}
search_result = self.market.search_learnware(user_info, max_search_num=10)
single_result = search_result.get_single_results()
multiple_result = search_result.get_multiple_results()

for method_name in methods:
test_info["method_name"] = method_name
test_info["force"] = method_name in methods_to_test
test_info.update(method_configs[method_name])
self.test_method(test_info, recorders, loss_func=loss_func_rmse)
if len(multiple_result) > 0:
mixture_id = " ".join([learnware.id for learnware in multiple_result[0].learnwares])
logger.info(f"Mixture score: {multiple_result[0].score}, Mixture learnware: {mixture_id}")
mixture_learnware_list = multiple_result[0].learnwares
else:
mixture_learnware_list = [single_result[0].learnware]
logger.info(f"Hetero search result of user {user}_{idx}: mixture learnware num: {len(mixture_learnware_list)}")

test_info = {"user": user, "idx": idx, "train_subsets": train_subsets, "test_x": test_x, "test_y": test_y}
common_config = {"user_rkme": user_stat_spec, "multiple_learnwares": mixture_learnware_list}
method_configs = {
"user_model": {"dataset": self.benchmark.name, "mode_type": "lgb"},
"hetero_single_aug": {"user_rkme": user_stat_spec, "learnwares": [single_result[0].learnware]},
"hetero_multiple_aug": common_config,
"hetero_multiple_avg": common_config,
"hetero_ensemble_pruning": common_config
}

for method_name in methods:
logger.info(f"Testing method {method_name}")
test_info["method_name"] = method_name
test_info["force"] = method_name in methods_to_test
test_info.update(method_configs[method_name])
self.test_method(test_info, recorders, loss_func=loss_func_rmse)
for method, recorder in recorders.items():
recorder.save(os.path.join(self.curves_result_path, f"{user}/{user}_{method}_performance.json"))
methods_to_plot = ["user_model", "select_score", "hetero_multiple_avg", "hetero_ensemble_pruning"]
plot_performance_curves(user, {method: recorders[method] for method in methods_to_plot}, n_labeled_list=n_labeled_list)
plot_performance_curves(user, {method: recorders[method] for method in methods_to_plot}, task="Hetero", n_labeled_list=n_labeled_list)

Loading…
Cancel
Save