Browse Source

Merge pull request #48 from Learnware-LAMDA/feature/hetero

[ENH] add heterogeneous market support
tags/v0.3.2
bxdd GitHub 2 years ago
parent
commit
c2d5604f8e
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
58 changed files with 3383 additions and 313 deletions
  1. +2
    -1
      .gitignore
  2. +3
    -3
      examples/dataset_m5_workflow/main.py
  3. +3
    -3
      examples/dataset_pfs_workflow/main.py
  4. +3
    -3
      examples/workflow_by_code/main.py
  5. +1
    -8
      learnware/client/learnware_client.py
  6. +0
    -1
      learnware/client/package_utils.py
  7. +3
    -2
      learnware/config.py
  8. +1
    -1
      learnware/market/__init__.py
  9. +2
    -2
      learnware/market/anchor/user_info.py
  10. +62
    -27
      learnware/market/base.py
  11. +1
    -1
      learnware/market/easy/__init__.py
  12. +4
    -40
      learnware/market/easy/database_ops.py
  13. +8
    -13
      learnware/market/easy/organizer.py
  14. +2
    -7
      learnware/market/easy/searcher.py
  15. +0
    -1
      learnware/market/hetergeneous/__init__.py
  16. +0
    -96
      learnware/market/hetergeneous/organizer.py
  17. +2
    -0
      learnware/market/heterogeneous/__init__.py
  18. +306
    -0
      learnware/market/heterogeneous/organizer/__init__.py
  19. +19
    -0
      learnware/market/heterogeneous/organizer/hetero_map/README.md
  20. +635
    -0
      learnware/market/heterogeneous/organizer/hetero_map/__init__.py
  21. +378
    -0
      learnware/market/heterogeneous/organizer/hetero_map/feature_extractor.py
  22. +364
    -0
      learnware/market/heterogeneous/organizer/hetero_map/trainer.py
  23. +52
    -0
      learnware/market/heterogeneous/searcher.py
  24. +44
    -0
      learnware/market/heterogeneous/utils.py
  25. +42
    -15
      learnware/market/module.py
  26. +7
    -1
      learnware/market/utils.py
  27. +11
    -1
      learnware/reuse/__init__.py
  28. +25
    -0
      learnware/reuse/align.py
  29. +3
    -3
      learnware/reuse/averaging.py
  30. +2
    -2
      learnware/reuse/ensemble_pruning.py
  31. +111
    -0
      learnware/reuse/feature_augment.py
  32. +2
    -0
      learnware/reuse/hetero/__init__.py
  33. +331
    -0
      learnware/reuse/hetero/feature_align.py
  34. +92
    -0
      learnware/reuse/hetero/hetero_map.py
  35. +4
    -6
      learnware/reuse/job_selector.py
  36. +32
    -0
      learnware/reuse/utils.py
  37. +6
    -3
      learnware/specification/__init__.py
  38. +10
    -5
      learnware/specification/module.py
  39. +1
    -1
      learnware/specification/regular/__init__.py
  40. +1
    -1
      learnware/specification/regular/base.py
  41. +4
    -3
      learnware/specification/regular/image/rkme.py
  42. +3
    -42
      learnware/specification/regular/table/rkme.py
  43. +1
    -0
      learnware/specification/regular/text/rkme.py
  44. +1
    -0
      learnware/specification/system/__init__.py
  45. +13
    -0
      learnware/specification/system/base.py
  46. +0
    -15
      learnware/specification/system/heter_table.py
  47. +160
    -0
      learnware/specification/system/hetero_table.py
  48. +1
    -0
      learnware/utils/__init__.py
  49. +46
    -0
      learnware/utils/gpu.py
  50. +1
    -0
      setup.py
  51. +100
    -0
      tests/test_hetero_market/example_learnwares/config.py
  52. +8
    -0
      tests/test_hetero_market/example_learnwares/learnware.yaml
  53. +16
    -0
      tests/test_hetero_market/example_learnwares/model0.py
  54. +16
    -0
      tests/test_hetero_market/example_learnwares/model1.py
  55. +1
    -0
      tests/test_hetero_market/example_learnwares/requirements.txt
  56. +425
    -0
      tests/test_hetero_market/test_hetero.py
  57. +2
    -2
      tests/test_specification/test_rkme.py
  58. +10
    -4
      tests/test_workflow/test_workflow.py

+ 2
- 1
.gitignore View File

@@ -27,6 +27,7 @@ dist/
*.db
*.json
*.zip
*.bin

# special software
.pytest_cache/
@@ -42,4 +43,4 @@ cache/
tmp/
learnware_pool/
PFS/
data/
data/

+ 3
- 3
examples/dataset_m5_workflow/main.py View File

@@ -9,7 +9,7 @@ from shutil import copyfile, rmtree
import learnware
from learnware.market import instantiate_learnware_market, BaseUserInfo
from learnware.reuse import JobSelectorReuser, AveragingReuser
from learnware.specification import generate_rkme_spec
from learnware.specification import generate_rkme_table_spec
from m5 import DataLoader
from learnware.logger import get_module_logger

@@ -98,7 +98,7 @@ class M5DatasetWorkflow:
for idx in tqdm(idx_list):
train_x, train_y, test_x, test_y = m5.get_idx_data(idx)
st = time.time()
spec = generate_rkme_spec(X=train_x, gamma=0.1, cuda_idx=0)
spec = generate_rkme_table_spec(X=train_x, gamma=0.1, cuda_idx=0)
ed = time.time()
logger.info("Stat spec generated in %.3f s" % (ed - st))

@@ -150,7 +150,7 @@ class M5DatasetWorkflow:

for idx in idx_list:
train_x, train_y, test_x, test_y = m5.get_idx_data(idx)
user_spec = generate_rkme_spec(X=test_x, gamma=0.1, cuda_idx=0)
user_spec = generate_rkme_table_spec(X=test_x, gamma=0.1, cuda_idx=0)
user_spec_path = f"./user_spec/user_{idx}.json"
user_spec.save(user_spec_path)



+ 3
- 3
examples/dataset_pfs_workflow/main.py View File

@@ -9,7 +9,7 @@ from shutil import copyfile, rmtree
import learnware
from learnware.market import instantiate_learnware_market, BaseUserInfo
from learnware.reuse import JobSelectorReuser, AveragingReuser
from learnware.specification import generate_rkme_spec
from learnware.specification import generate_rkme_table_spec
from pfs import Dataloader
from learnware.logger import get_module_logger

@@ -95,7 +95,7 @@ class PFSDatasetWorkflow:
for idx in tqdm(idx_list):
train_x, train_y, test_x, test_y = pfs.get_idx_data(idx)
st = time.time()
spec = generate_rkme_spec(X=train_x, gamma=0.1, cuda_idx=0)
spec = generate_rkme_table_spec(X=train_x, gamma=0.1, cuda_idx=0)
ed = time.time()
logger.info("Stat spec generated in %.3f s" % (ed - st))

@@ -147,7 +147,7 @@ class PFSDatasetWorkflow:

for idx in idx_list:
train_x, train_y, test_x, test_y = pfs.get_idx_data(idx)
user_spec = generate_rkme_spec(X=test_x, gamma=0.1, cuda_idx=0)
user_spec = generate_rkme_table_spec(X=test_x, gamma=0.1, cuda_idx=0)
user_spec_path = f"./user_spec/user_{idx}.json"
user_spec.save(user_spec_path)



+ 3
- 3
examples/workflow_by_code/main.py View File

@@ -12,7 +12,7 @@ from shutil import copyfile, rmtree
import learnware
from learnware.market import instantiate_learnware_market, BaseUserInfo
from learnware.reuse import JobSelectorReuser, AveragingReuser
from learnware.specification import generate_rkme_spec, RKMETableSpecification
from learnware.specification import generate_rkme_table_spec, RKMETableSpecification

curr_root = os.path.dirname(os.path.abspath(__file__))

@@ -53,7 +53,7 @@ class LearnwareMarketWorkflow:

joblib.dump(clf, os.path.join(dir_path, "svm.pkl"))

spec = generate_rkme_spec(X=data_X, gamma=0.1, cuda_idx=0)
spec = generate_rkme_table_spec(X=data_X, gamma=0.1, cuda_idx=0)
spec.save(os.path.join(dir_path, "svm.json"))

init_file = os.path.join(dir_path, "__init__.py")
@@ -173,7 +173,7 @@ class LearnwareMarketWorkflow:
X, y = load_digits(return_X_y=True)
_, data_X, _, data_y = train_test_split(X, y, test_size=0.3, shuffle=True)

stat_spec = generate_rkme_spec(X=data_X, gamma=0.1, cuda_idx=0)
stat_spec = generate_rkme_table_spec(X=data_X, gamma=0.1, cuda_idx=0)
user_info = BaseUserInfo(semantic_spec=user_semantic, stat_info={"RKMETableSpecification": stat_spec})

_, _, _, mixture_learnware_list = easy_market.search_learnware(user_info)


+ 1
- 8
learnware/client/learnware_client.py View File

@@ -199,9 +199,6 @@ class LearnwareClient:
if semantic_specification is None:
semantic_specification = {}

semantic_specification.pop("Input", None)
semantic_specification.pop("Output", None)

if stat_spec is None:
files = None
else:
@@ -265,11 +262,7 @@ class LearnwareClient:
semantic_specification["Input"] = input_description
semantic_specification["Output"] = output_description

if self._check_semantic_specification(semantic_specification):
return semantic_specification
else:
logger.error("The parameters passed in create_semantic_specification() are illegal!")
return None
return semantic_specification

def list_semantic_specification_values(self, key: SemanticSpecificationKey):
url = f"{self.host}/engine/semantic_specification"


+ 0
- 1
learnware/client/package_utils.py View File

@@ -111,7 +111,6 @@ def filter_nonexist_conda_packages(packages: list) -> Tuple[List[str], List[str]
last_bracket = stdout.rfind("\n{")
if last_bracket != -1:
stdout = stdout[last_bracket:]
pass
print(stdout)
output = json.loads(stdout).get("bad_deps", [])



+ 3
- 2
learnware/config.py View File

@@ -13,8 +13,6 @@ class Config:
if os.path.exists(config_file):
with open(config_file, "r") as f:
self.__dict__["_config"].update(json.load(f))
pass
pass

def __getitem__(self, key):
return self.__dict__["_config"][key]
@@ -65,11 +63,13 @@ LEARNWARE_FOLDER_POOL_PATH = os.path.join(LEARNWARE_POOL_PATH, "learnwares")

DATABASE_PATH = os.path.join(ROOT_DIRPATH, "database")
STDOUT_PATH = os.path.join(ROOT_DIRPATH, "stdout")
CACHE_PATH = os.path.join(ROOT_DIRPATH, "cache")

# TODO: Delete them later
os.makedirs(ROOT_DIRPATH, exist_ok=True)
os.makedirs(DATABASE_PATH, exist_ok=True)
os.makedirs(STDOUT_PATH, exist_ok=True)
os.makedirs(CACHE_PATH, exist_ok=True)

semantic_config = {
"Data": {
@@ -125,6 +125,7 @@ _DEFAULT_CONFIG = {
"root_path": ROOT_DIRPATH,
"package_path": PACKAGE_DIRPATH,
"stdout_path": STDOUT_PATH,
"cache_path": CACHE_PATH,
"logging_level": logging.INFO,
"logging_outfile": None,
"semantic_specs": semantic_config,


+ 1
- 1
learnware/market/__init__.py View File

@@ -3,7 +3,7 @@ from .base import BaseUserInfo, LearnwareMarket, BaseChecker, BaseOrganizer, Bas
from .evolve_anchor import EvolvedAnchoredOrganizer
from .evolve import EvolvedOrganizer
from .easy import EasyOrganizer, EasySearcher, EasySemanticChecker, EasyStatChecker
from .hetergeneous import HeterogeneousOrganizer, MappingFunction
from .heterogeneous import HeteroMapTableOrganizer, HeteroSearcher

from .classes import CondaChecker
from .module import instantiate_learnware_market

+ 2
- 2
learnware/market/anchor/user_info.py View File

@@ -29,13 +29,13 @@ class AnchoredUserInfo(BaseUserInfo):
self.anchor_learnware_ids += learnware_ids

def update_stat_info(self, name: str, item: Any):
"""Update stat_info based on anchor learnwares
"""Update stat_info by market or user with anchor learnwares

Parameters
----------
name : str
Name of stat_info
item : Any
Statistical information calculated on anchor learnwares
Statistical information calculated by market or user with anchor learnwares
"""
self.stat_info[name] = item

+ 62
- 27
learnware/market/base.py View File

@@ -42,32 +42,59 @@ class BaseUserInfo:
def get_stat_info(self, name: str):
return self.stat_info.get(name, None)

def update_stat_info(self, name: str, item: Any):
"""Update stat_info by market

Parameters
----------
name : str
Name of stat_info
item : Any
Statistical information calculated by market
"""
self.stat_info[name] = item


class LearnwareMarket:
"""Base interface for market, it provide the interface of search/add/detele/update learnwares"""

def __init__(
self,
market_id: str = "default",
organizer: BaseOrganizer = None,
searcher: BaseSearcher = None,
organizer: BaseOrganizer,
searcher: BaseSearcher,
checker_list: List[BaseChecker] = None,
rebuild=False,
**kwargs,
):
self.market_id = market_id
self.learnware_organizer = BaseOrganizer() if organizer is None else organizer
self.learnware_organizer.reset(market_id=market_id)
self.learnware_organizer.reload_market(rebuild=rebuild)
self.learnware_searcher = BaseSearcher() if searcher is None else searcher
self.learnware_searcher.reset(organizer=self.learnware_organizer)

if checker_list is None:
self.learnware_checker = {"BaseChecker": BaseChecker()}
else:
self.learnware_checker = {checker.__class__.__name__: checker for checker in checker_list}
for name, checker in self.learnware_checker.items():
self.learnware_organizer = organizer
self.learnware_searcher = searcher
checker_list = [] if checker_list is None else checker_list
self.learnware_checker = {checker.__class__.__name__: checker for checker in checker_list}

for checker in self.learnware_checker.values():
checker.reset(organizer=self.learnware_organizer)

@property
def market_id(self):
return self.learnware_organizer.market_id

def reset(self, organizer_kwargs=None, searcher_kwargs=None, checker_kwargs=None, **kwargs):
if organizer_kwargs is not None:
self.learnware_organizer.reset(**organizer_kwargs)

if searcher_kwargs is not None:
self.learnware_searcher.reset(**searcher_kwargs)

if checker_kwargs is not None:
if len(set(checker_kwargs) & set(self.learnware_checker)):
for name, checker in self.learnware_checker.items():
checker.reset(**checker_kwargs.get(name, {}))
else:
for checker in self.learnware_checker.values():
checker.reset(**checker_kwargs)

for _k, _v in kwargs.items():
setattr(self, _k, _v)

def reload_market(self, **kwargs) -> bool:
self.learnware_organizer.reload_market(**kwargs)

@@ -242,11 +269,12 @@ class LearnwareMarket:


class BaseOrganizer:
def __init__(self, market_id=None):
self.reset(market_id=market_id)
def __init__(self, market_id, **kwargs):
self.reset(market_id=market_id, **kwargs)

def reset(self, market_id=None, **kwargs):
def reset(self, market_id, rebuild=False, **kwargs):
self.market_id = market_id
self.reload_market(rebuild=rebuild, **kwargs)

def reload_market(self, rebuild=False, **kwargs) -> bool:
"""Reload the learnware organizer when server restared.
@@ -416,10 +444,10 @@ class BaseOrganizer:


class BaseSearcher:
def __init__(self, organizer: BaseOrganizer = None):
self.learnware_organizer = organizer
def __init__(self, organizer: BaseOrganizer, **kwargs):
self.reset(organizer=organizer, **kwargs)

def reset(self, organizer):
def reset(self, organizer: BaseOrganizer, **kwargs):
self.learnware_organizer = organizer

def __call__(self, user_info: BaseUserInfo, check_status: int = None):
@@ -441,11 +469,8 @@ class BaseChecker:
NONUSABLE_LEARNWARE = 0
USABLE_LEARWARE = 1

def __init__(self, organizer: BaseOrganizer = None):
self.learnware_organizer = organizer

def reset(self, organizer):
self.learnware_organizer = organizer
def reset(self, **kwargs):
pass

def __call__(self, learnware: Learnware) -> Tuple[int, str]:
"""Check the utility of a learnware
@@ -468,3 +493,13 @@ class BaseChecker:
"""

raise NotImplementedError("'__call__' method is not implemented in BaseChecker")


class OrganizerRelatedChecker(BaseChecker):
"""Here this is the interface for checker who is related to the organizer"""

def __init__(self, organizer: BaseOrganizer, **kwargs):
self.reset(organizer=organizer, **kwargs)

def reset(self, organizer: BaseOrganizer, **kwargs):
self.learnware_organizer = organizer

+ 1
- 1
learnware/market/easy/__init__.py View File

@@ -11,5 +11,5 @@ if not is_torch_avaliable(verbose=False):
EasyStatChecker = None
logger.warning("EasySeacher and EasyChecker are skipped because 'torch' is not installed!")
else:
from .searcher import EasySearcher
from .searcher import EasySearcher, EasyStatSearcher, EasyFuzzSemanticSearcher, EasyExactSemanticSearcher
from .checker import EasySemanticChecker, EasyStatChecker

+ 4
- 40
learnware/market/easy/database_ops.py View File

@@ -19,8 +19,6 @@ class Learnware(DeclarativeBase):
folder_path = Column(Text, nullable=False)
use_flag = Column(Text, nullable=False)

pass


class DatabaseOperations(object):
def __init__(self, url: str, database_name: str):
@@ -28,13 +26,10 @@ class DatabaseOperations(object):
url = os.path.join(url, f"{database_name}.db")
else:
url = f"{url}/{database_name}"
pass

self.url = url
self.create_database_if_not_exists(url)

pass

def create_database_if_not_exists(self, url):
database_exists = True

@@ -44,12 +39,10 @@ class DatabaseOperations(object):
path = url[start + 4 :]
if os.path.exists(path):
database_exists = True
pass
else:
database_exists = False
os.makedirs(os.path.dirname(path), exist_ok=True)
pass
pass

elif self.url.startswith("postgresql"):
# it is postgresql
dbname_start = url.rfind("/")
@@ -63,37 +56,27 @@ class DatabaseOperations(object):

for row in result.fetchall():
db_list.add(row[0].lower())
pass

if dbname.lower() not in db_list:
database_exists = False
conn.execution_options(isolation_level="AUTOCOMMIT").execute(
text("CREATE DATABASE {0};".format(dbname))
)
pass
else:
database_exists = True
pass
pass
engine.dispose()
pass
else:
raise Exception(f"Unsupported database url: {self.url}")
pass

self.engine = create_engine(url, future=True)

if not database_exists:
DeclarativeBase.metadata.create_all(self.engine)
pass
pass

def clear_learnware_table(self):
with self.engine.connect() as conn:
conn.execute(text("DELETE FROM tb_learnware;"))
conn.commit()
pass
pass

def add_learnware(self, id: str, semantic_spec: dict, zip_path, folder_path, use_flag: str):
with self.engine.connect() as conn:
@@ -114,15 +97,11 @@ class DatabaseOperations(object):
),
)
conn.commit()
pass
pass

def delete_learnware(self, id: str):
with self.engine.connect() as conn:
conn.execute(text("DELETE FROM tb_learnware WHERE id=:id;"), dict(id=id))
conn.commit()
pass
pass

def update_learnware_semantic_specification(self, id: str, semantic_spec: dict):
with self.engine.connect() as conn:
@@ -132,8 +111,6 @@ class DatabaseOperations(object):
dict(id=id, semantic_spec=semantic_spec_str),
)
conn.commit()
pass
pass

def update_learnware_use_flag(self, id: str, use_flag: str):
with self.engine.connect() as conn:
@@ -142,8 +119,6 @@ class DatabaseOperations(object):
dict(id=id, use_flag=use_flag),
)
conn.commit()
pass
pass

def get_learnware_semantic_specification(self, id: str):
with self.engine.connect() as conn:
@@ -153,8 +128,6 @@ class DatabaseOperations(object):
return None
else:
return json.loads(row[0])
pass
pass

def get_learnware_use_flag(self, id: str):
with self.engine.connect() as conn:
@@ -164,8 +137,6 @@ class DatabaseOperations(object):
return None
else:
return int(row[0])
pass
pass

def get_learnware_info(self, id: str):
with self.engine.connect() as conn:
@@ -187,8 +158,6 @@ class DatabaseOperations(object):
"folder_path": folder_path,
"use_flag": use_flag,
}
pass
pass

def load_market(self):
with self.engine.connect() as conn:
@@ -205,12 +174,10 @@ class DatabaseOperations(object):
semantic_spec_dict = json.loads(semantic_spec)
try:
new_learnware = get_learnware_from_dirpath(
id=id, semantic_spec=semantic_spec_dict, learnware_dirpath=folder_path
id=id, semantic_spec=semantic_spec_dict, learnware_dirpath=folder_path, ignore_error=False
)
assert new_learnware is not None, "learnware must not be None"
logger.info(f"Load learnware: {id}")
logger.info(f"Load learnware {id} succeed!")
except Exception as err:
logger.error(f"Load learnware {id} failed due to {err}!")
continue

learnware_list[id] = new_learnware
@@ -218,8 +185,5 @@ class DatabaseOperations(object):
folder_list[id] = folder_path
use_flags[id] = int(use_flag)
max_count = max(max_count, int(id))
pass
return learnware_list, zip_list, folder_list, use_flags, max_count + 1
pass

pass
return learnware_list, zip_list, folder_list, use_flags, max_count + 1

+ 8
- 13
learnware/market/easy/organizer.py View File

@@ -17,12 +17,12 @@ logger = get_module_logger("easy_organizer")

class EasyOrganizer(BaseOrganizer):
def reload_market(self, rebuild=False) -> bool:
"""Reload the learnware organizer when server restared.
"""Reload the learnware organizer when server restarted.

Returns
-------
bool
A flag indicating whether the market is reload successfully.
A flag indicating whether the market is reloaded successfully.
"""
self.market_store_path = os.path.join(conf.market_root_path, self.market_id)
self.learnware_pool_path = os.path.join(self.market_store_path, "learnware_pool")
@@ -41,8 +41,8 @@ class EasyOrganizer(BaseOrganizer):
try:
self.dbops.clear_learnware_table()
rmtree(self.learnware_pool_path)
except:
pass
except Exception as err:
logger.error(f"Clear current database failed due to {err}!!")

os.makedirs(self.learnware_pool_path, exist_ok=True)
os.makedirs(self.learnware_zip_pool_path, exist_ok=True)
@@ -97,6 +97,7 @@ class EasyOrganizer(BaseOrganizer):
id=learnware_id, semantic_spec=semantic_spec, learnware_dirpath=target_folder_dir
)
except:
logger.warning("New learnware is not properly added!")
try:
os.remove(target_zip_dir)
rmtree(target_folder_dir)
@@ -145,7 +146,7 @@ class EasyOrganizer(BaseOrganizer):
zip_dir = self.learnware_zip_list[id]
if os.path.exists(zip_dir):
os.remove(zip_dir)
pass
folder_dir = self.learnware_folder_list[id]
rmtree(folder_dir, ignore_errors=True)
self.learnware_list.pop(id)
@@ -233,7 +234,7 @@ class EasyOrganizer(BaseOrganizer):
----------
ids : Union[str, List[str]]
Give a id or a list of ids
str: id of targer learware
str: id of target learware
List[str]: A list of ids of target learnwares

Returns
@@ -373,13 +374,8 @@ class EasyOrganizer(BaseOrganizer):
return [self.learnware_list[idx] for idx in learnware_ids]

def reload_learnware(self, learnware_id: str):
current_learnware = self.learnware_list.get(learnware_id)

if current_learnware is None:
# add learnware
if learnware_id not in self.learnware_list:
self.count += 1
else:
pass

target_zip_dir = os.path.join(self.learnware_zip_pool_path, "%s.zip" % (learnware_id))
target_folder_dir = os.path.join(self.learnware_folder_pool_path, learnware_id)
@@ -390,7 +386,6 @@ class EasyOrganizer(BaseOrganizer):
id=learnware_id, semantic_spec=semantic_spec, learnware_dirpath=target_folder_dir
)
self.use_flags[learnware_id] = self.dbops.get_learnware_use_flag(learnware_id)
pass

def get_learnware_info_from_storage(self, learnware_id: str) -> Dict:
"""return learnware zip path and semantic_specification from storage


+ 2
- 7
learnware/market/easy/searcher.py View File

@@ -38,7 +38,6 @@ class EasyExactSemanticSearcher(BaseSearcher):
v1 = v1.lower()
if v1 not in name2 and v1 not in description2:
return False
pass
else:
if len(v2) == 0:
# user input contains some key that is not in database
@@ -54,9 +53,6 @@ class EasyExactSemanticSearcher(BaseSearcher):
elif semantic_spec1[key]["Type"] == "Tag":
if not (set(v1) & set(v2)):
return False
pass
pass
pass

return True

@@ -435,7 +431,6 @@ class EasyStatSearcher(BaseSearcher):
):
continue

# TODO: must we check dim for Text and Image specification?
rkme_dim = str(list(rkme.get_z().shape)[1:])
if rkme_dim == user_rkme_dim:
filtered_learnware_list.append(learnware)
@@ -598,10 +593,10 @@ class EasyStatSearcher(BaseSearcher):


class EasySearcher(BaseSearcher):
def __init__(self, organizer: EasyOrganizer = None):
super(EasySearcher, self).__init__(organizer)
def __init__(self, organizer: EasyOrganizer):
self.semantic_searcher = EasyFuzzSemanticSearcher(organizer)
self.stat_searcher = EasyStatSearcher(organizer)
super(EasySearcher, self).__init__(organizer)

def reset(self, organizer):
self.learnware_organizer = organizer


+ 0
- 1
learnware/market/hetergeneous/__init__.py View File

@@ -1 +0,0 @@
from .organizer import MappingFunction, HeterogeneousOrganizer

+ 0
- 96
learnware/market/hetergeneous/organizer.py View File

@@ -1,96 +0,0 @@
import numpy as np
from typing import List

from ..evolve.organizer import EvolvedOrganizer
from ...learnware import Learnware


class MappingFunction:
def __init__(self) -> None:
pass

def transform(X: np.ndarray) -> np.ndarray:
"""transform the data in one feature space to another feature space.

Parameters
----------
X : np.ndarray
data in one feature space

Returns
-------
np.ndarray
transformed data in other feature space
"""
pass


class HeterogeneousOrganizer(EvolvedOrganizer):
"""Organize learnwares with heterogeneous feature spaces, organizer version with evolved learnwares"""

def __init__(self, *args, **kwargs):
super(HeterogeneousOrganizer, self).__init__(*args, **kwargs)
self.mapping_function_list = {}

def _mapping_function_list_initialization(self, learnware_list: List[Learnware]):
"""Initialize mapping functions with all submitted learnwares

Parameters
----------
learnware_list : List[Learnware]
list of learnwares
"""
self.mapping_function_list = self.learn_mapping_functions(learnware_list)

def learn_mapping_functions(self, learnware_list: List[Learnware]) -> List[MappingFunction]:
"""Use all statistical specifications of submitted learnwares to generate mapping functions from each original feature space to subsapce and vice verse.

Parameters
----------
learnware_list : List[Learnware]
list of learnwares

Returns
-------
List[MappingFunction]
list of mapping functions
"""
pass

def transform_original_to_subspace(
self, original_feature_space_idx: int, original_feature: np.ndarray
) -> np.ndarray:
"""Transform feature in a original feature space to the subspace.

Parameters
----------
original_feature_space_idx : int
index of the original feature space
original_feature : np.ndarray
data in the original feature space

Returns
-------
np.ndarray
mapped data in the subspace
"""
pass

def transform_subspace_to_original(
self, original_feature_space_idx: int, subspace_feature: np.ndarray
) -> np.ndarray:
"""Transform feature in the subspace to a original feature space.

Parameters
----------
original_feature_space_idx : int
index of the original feature space
subspace_feature : np.ndarray
data in the subspace

Returns
-------
np.ndarray
mapped data in the original feature space
"""
pass

+ 2
- 0
learnware/market/heterogeneous/__init__.py View File

@@ -0,0 +1,2 @@
from .organizer import HeteroMapTableOrganizer
from .searcher import HeteroSearcher

+ 306
- 0
learnware/market/heterogeneous/organizer/__init__.py View File

@@ -0,0 +1,306 @@
import os
import traceback
import pandas as pd
from collections import defaultdict
from typing import List, Tuple, Union

import pandas as pd

from .hetero_map import HeteroMap, Trainer
from ..utils import is_hetero
from ...base import BaseChecker, BaseUserInfo
from ...easy import EasyOrganizer
from ....learnware import Learnware
from ....logger import get_module_logger
from ....specification import HeteroMapTableSpecification, RKMETableSpecification


logger = get_module_logger("hetero_map_table_organizer")


class HeteroMapTableOrganizer(EasyOrganizer):
def reload_market(self, rebuild=False) -> bool:
"""Reload the heterogeneous learnware organizer when server restarted.

Returns
-------
bool
A flag indicating whether the heterogeneous market is reloaded successfully.
"""
super(HeteroMapTableOrganizer, self).reload_market(rebuild=rebuild)

hetero_folder_path = os.path.join(self.market_store_path, "hetero")
os.makedirs(hetero_folder_path, exist_ok=True)
self.market_mapping_path = os.path.join(hetero_folder_path, "model.bin")
self.hetero_specs_path = os.path.join(hetero_folder_path, "hetero_specifications")
os.makedirs(self.hetero_specs_path, exist_ok=True)

if os.path.exists(self.market_mapping_path):
logger.info(f"Reload market mapping from checkpoint {self.market_mapping_path}")
self.market_mapping = HeteroMap.load(checkpoint=self.market_mapping_path)
if not rebuild:
usable_ids = self.get_learnware_ids(check_status=BaseChecker.USABLE_LEARWARE)
hetero_ids = self._get_hetero_learnware_ids(usable_ids)
for hetero_id in hetero_ids:
self._reload_learnware_hetero_spec(hetero_id)
else:
logger.warning(f"No market mapping to reload!")
self.market_mapping = HeteroMap()

def reset(self, market_id, rebuild=False, auto_update=False, auto_update_limit=100, **training_args):
"""Reset the heterogeneous market with specified settings.

Parameters
----------
market_id : str
the heterogeneous market's id
rebuild : bool, optional
A flag indicating whether to reload market, by default False
auto_update : bool, optional
A flag indicating whether to enable automatic updating of market mapping, by default False
auto_update_limit : int, optional
The threshold for the number of learnwares required to trigger an automatic market mapping update, by default 100
"""
self.auto_update = auto_update
self.auto_update_limit = auto_update_limit
self.count_down = auto_update_limit
self.training_args = training_args

super(HeteroMapTableOrganizer, self).reset(market_id, rebuild)

def add_learnware(
self, zip_path: str, semantic_spec: dict, check_status: int, learnware_id: str = None
) -> Tuple[str, int]:
"""Add a learnware into the heterogeneous learnware market.
Initiates an update of the market mapping if `auto_update` is True and the number of learnwares supporting training reaches `auto_update_limit`.

Parameters
----------
zip_path : str
Filepath for learnware model, a zipped file.
semantic_spec : dict
semantic_spec for new learnware, in dictionary format.
check_status : int
A flag indicating whether the learnware is usable.
learnware_id : str, optional
A id in database for learnware

Returns
-------
Tuple[str, int]
- str indicating model_id
- int indicating the final learnware check_status
"""
learnware_id, learnwere_status = super(HeteroMapTableOrganizer, self).add_learnware(
zip_path, semantic_spec, check_status, learnware_id
)

if learnwere_status == BaseChecker.USABLE_LEARWARE and len(self._get_hetero_learnware_ids(learnware_id)):
self._update_learware_hetero_sepc(learnware_id)

if self.auto_update:
self.count_down -= 1
if self.count_down == 0:
training_learnware_ids = self._get_hetero_learnware_ids(
self.get_learnware_ids(check_status=BaseChecker.USABLE_LEARWARE)
)
training_learnwares = self.get_learnware_by_ids(training_learnware_ids)
logger.info(f"Verified leanwares for training: {training_learnware_ids}")
updated_market_mapping = self.train(
learnware_list=training_learnwares, save_dir=self.market_mapping_path, **self.training_args
)
logger.info(
f"Market mapping train completed. Now update HeteroMapTableSpecification for {training_learnware_ids}"
)
self.market_mapping = updated_market_mapping
self._update_learware_hetero_sepc(training_learnware_ids)

self.count_down = self.auto_update_limit

return learnware_id, learnwere_status

def delete_learnware(self, id: str) -> bool:
"""Delete learnware from heterogeneous learnware market.
If a corresponding HeteroMapTableSpecification exists, it is also removed.

Parameters
----------
id : str
Learnware to be deleted

Returns
-------
bool
True for successful operation.
False for id not found.
"""
flag = super(HeteroMapTableOrganizer, self).delete_learnware(id)
if flag:
hetero_spec_path = os.path.join(self.hetero_specs_path, f"{id}.json")
try:
os.remove(hetero_spec_path)
except FileNotFoundError:
pass
return flag

def update_learnware(
self, id: str, zip_path: str = None, semantic_spec: dict = None, check_status: int = None
) -> bool:
"""Update learnware with zip_path, semantic_specification and check_status.
If the learnware supports heterogeneous market training, its HeteroMapTableSpecification is also updated.

Parameters
----------
id : str
Learnware id
zip_path : str, optional
Filepath for learnware model, a zipped file.
semantic_spec : dict, optional
semantic_spec for new learnware, in dictionary format.
check_status : int, optional
A flag indicating whether the learnware is usable.

Returns
-------
int
The final learnware check_status.
"""
final_status = super(HeteroMapTableOrganizer, self).update_learnware(id, zip_path, semantic_spec, check_status)
if final_status == BaseChecker.USABLE_LEARWARE and len(self._get_hetero_learnware_ids(id)):
self._update_learware_hetero_sepc(id)
return final_status

def _reload_learnware_hetero_spec(self, learnware_id):
try:
hetero_spec_path = os.path.join(self.hetero_specs_path, f"{learnware_id}.json")
if os.path.exists(hetero_spec_path):
hetero_spec = HeteroMapTableSpecification()
hetero_spec.load(hetero_spec_path)
self.learnware_list[learnware_id].update_stat_spec(hetero_spec.type, hetero_spec)
else:
self._update_learware_hetero_sepc(learnware_id)
logger.info(f"Reload HeteroMapTableSpecification for hetero spec {learnware_id} succeed!")
except Exception as err:
logger.error(f"Reload HeteroMapTableSpecification for hetero spec {learnware_id} failed! due to {err}.")

def reload_learnware(self, learnware_id: str):
"""Reload learnware into heterogeneous learnware market.
If a corresponding HeteroMapTableSpecification exists, it is also reloaded.

Parameters
----------
learnware_id : str
Learnware to be reloaded
"""
super(HeteroMapTableOrganizer, self).reload_learnware(learnware_id)
if len(self._get_hetero_learnware_ids(learnware_id)):
self._reload_learnware_hetero_spec(learnware_id)

def _update_learware_hetero_sepc(self, ids: Union[str, List[str]]):
"""Update learnware by ids, attempting to generate HeteroMapTableSpecification for them.

Parameters
----------
ids : Union[str, List[str]]
Give a id or a list of ids
str: id of target learware
List[str]: A list of ids of target learnwares
"""
if isinstance(ids, str):
ids = [ids]

for idx in ids:
try:
spec = self.learnware_list[idx].get_specification()
semantic_spec, stat_spec = spec.get_semantic_spec(), spec.get_stat_spec()["RKMETableSpecification"]
features = semantic_spec["Input"]["Description"]
save_path = os.path.join(self.hetero_specs_path, f"{idx}.json")

hetero_spec = self.market_mapping.hetero_mapping(stat_spec, features)
self.learnware_list[idx].update_stat_spec(hetero_spec.type, hetero_spec)
hetero_spec.save(save_path)

except Exception as err:
traceback.print_exc()
logger.warning(f"Learnware {idx} generate HeteroMapTableSpecification failed!")

def _get_hetero_learnware_ids(self, ids: Union[str, List[str]]) -> List[str]:
"""Get learnware ids that supports heterogeneous market training and search.

Parameters
----------
ids : Union[str, List[str]]
Give a id or a list of ids
str: id of target learware
List[str]: A list of ids of target learnwares

Returns
-------
List[str]
Learnware ids
"""
if isinstance(ids, str):
ids = [ids]

ret = []
for idx in ids:
spec = self.learnware_list[idx].get_specification()
if is_hetero(stat_specs=spec.get_stat_spec(), semantic_spec=spec.get_semantic_spec()):
ret.append(idx)
return ret

def generate_hetero_map_spec(self, user_info: BaseUserInfo) -> HeteroMapTableSpecification:
"""Generate HeteroMapTableSpecificaion based on user's input description and statistical information.

Parameters
----------
user_info : BaseUserInfo
user_info contains semantic_spec and stat_info

Returns
-------
HeteroMapTableSpecification
The generated HeteroMapTableSpecification for user
"""
user_stat_spec = user_info.stat_info["RKMETableSpecification"]
user_features = user_info.get_semantic_spec()["Input"]["Description"]
user_hetero_spec = self.market_mapping.hetero_mapping(user_stat_spec, user_features)
return user_hetero_spec

@staticmethod
def train(learnware_list: List[Learnware], save_dir: str, **kwargs) -> HeteroMap:
"""Build the market mapping model using learnwares that supports heterogeneous market training.

Parameters
----------
learnware_list : List[Learnware]
The learnware list to train the market mapping
save_dir : str
Filepath where the trained market mapping will be saved

Returns
-------
HeteroMap
The trained market mapping model
"""
learnware_df_dict = defaultdict(list)
for learnware in learnware_list:
spec = learnware.get_specification()
stat_spec = spec.get_stat_spec()["RKMETableSpecification"]
features = spec.get_semantic_spec()["Input"]["Description"]
learnware_df = pd.DataFrame(data=stat_spec.get_z(), columns=features.values())
learnware_df_dict[tuple(sorted(features))].append(learnware_df)
allset = [pd.concat(dfs) for dfs in learnware_df_dict.values()]

# Train market mapping
market_mapping = HeteroMap(**kwargs)
market_mapping_trainer = Trainer(
model=market_mapping,
train_set_list=allset,
collate_fn=market_mapping.collate_fn,
**kwargs,
)
market_mapping_trainer.train()
market_mapping_trainer.save_model(output_dir=save_dir)

return market_mapping

+ 19
- 0
learnware/market/heterogeneous/organizer/hetero_map/README.md View File

@@ -0,0 +1,19 @@
# README

## Overview

This package contains code modified from the paper "TransTab: A Flexible Transferable Tabular Learning Framework." The original project, available at [TransTab GitHub Repository](https://github.com/RyanWangZf/transtab), is under the BSD 2-Clause license. The code here has been modified to focus specifically on numerical features, retaining only methods relevant to these features. The training approach is limited to unsupervised training. Differing from the original paper's usage of TransTab for final predictions, this code is utilized for feature extraction.

## Contents

- `__init__.py`: The `__init__.py` file defines the `HeteroMap` class, which forms the main network structure of the market engine. It includes methods for handling heterogeneous tabular data, focusing on mapping data from diverse feature spaces into a unified "specification world".
- `trainer.py`: The `trainer.py` file focuses on the unsupervised training process of the market engine. The `TransTabCollatorForCL` class is used for generating positive and negative samples from tabular vertical partitions for unsupervised learning.
- `feature_extractor.py`: This file is utilized for the purpose of tokenizing feature descriptions and transforming them into word embeddings.

## Handling heterogeneous learnwares

The code is used for finding a unified specification space for learnwares generated from table data with heterogeneous feature spaces and assigning new specifications accordingly. When the market receives some leanrwares, it utilize existing learnware specifications to train an engine. This engine integrates the specifications from various spaces into a unified "specification world", assigning new market-specific specifications to the learnware. As more learnwares are uploaded, the engine continuously updates, refining the specification world and updating the specifications of the learnware.

## License

The hetero_map package, based on the TransTab project, adheres to the BSD 2-Clause license.

+ 635
- 0
learnware/market/heterogeneous/organizer/hetero_map/__init__.py View File

@@ -0,0 +1,635 @@
from typing import Callable, List, Optional, Union

import numpy as np
import pandas as pd
import torch
import torch.nn.functional as F
from torch import Tensor, nn

from .....specification import HeteroMapTableSpecification, RKMETableSpecification
from .feature_extractor import CLSToken, FeatureProcessor, FeatureTokenizer
from .trainer import Trainer, TransTabCollatorForCL


class HeteroMap(nn.Module):
"""
This class is based on 'TransTab' project as described in the paper
"TransTab: A flexible transferable tabular learning framework". The original project is available at
https://github.com/RyanWangZf/transtab and is licensed under the BSD 2-Clause License.

Modifications:
- Simplified the original code to focus primarily on methods related to numerical features.
- Retained only the unsupervised training method.
- While the original paper and the TransTab framework utilized the module for final predictions, this version
is modified for feature extraction purposes only.

The class implements a neural network module for processing tabular data, specifically tuned for numerical features.
"""

def __init__(
self,
feature_tokenizer: FeatureTokenizer = None,
hidden_dim: int = 128,
num_layer: int = 2,
num_attention_head: int = 8,
hidden_dropout_prob: float = 0,
ffn_dim: int = 256,
projection_dim: int = 128,
overlap_ratio: float = 0.5,
num_partition: int = 3,
temperature: int = 10,
base_temperature: int = 10,
activation: Union[str, Callable] = "relu",
device: Union[str, torch.device] = "cpu",
**kwargs,
):
"""
The initialization method for hetero map.

Parameters
----------
feature_tokenizer : FeatureTokenizer, optional
Tokenizer for feature representation, by default None
hidden_dim : int, optional
Dimension of hidden layer, by default 128
num_layer : int, optional
Number of layers in the transformer encoder, by default 2
num_attention_head : int, optional
Number of attention heads in the transformer, by default 8
hidden_dropout_prob : int, optional
Dropout probability for hidden layers, by default 0
ffn_dim : int, optional
Dimension of feedforward network, by default 256
projection_dim : int, optional
Dimension for projection head, by default 128
overlap_ratio : float, optional
Overlap ratio for tokenizatio, by default 0.5
num_partition : int, optional
Number of partitions for collatio, by default 3
temperature : int, optional
Temperature parameter for contrastive learnin, by default 10
base_temperature : int, optional
Base temperature paramete, by default 10
activation : Union[str, Callable], optional
Activation function for transformer layer, by default "relu"
device : Union[str, torch.device], optional
Device to run the model on, by default "cpu"
kwargs:
Additional arguments to be passed to the feature tokenizer
"""
super(HeteroMap, self).__init__()

self.model_args = {
"num_partition": num_partition,
"overlap_ratio": overlap_ratio,
"hidden_dim": hidden_dim,
"num_layer": num_layer,
"num_attention_head": num_attention_head,
"hidden_dropout_prob": hidden_dropout_prob,
"ffn_dim": ffn_dim,
"projection_dim": projection_dim,
"activation": activation,
}
self.model_args.update(kwargs)

if feature_tokenizer is None:
feature_tokenizer = FeatureTokenizer(**kwargs)

self.feature_tokenizer = feature_tokenizer

self.feature_processor = FeatureProcessor(
vocab_size=feature_tokenizer.vocab_size,
pad_token_id=feature_tokenizer.pad_token_id,
hidden_dim=hidden_dim,
hidden_dropout_prob=hidden_dropout_prob,
device=device,
)

self.encoder = TransformerMultiLayer(
hidden_dim=hidden_dim,
num_layer=num_layer,
num_attention_head=num_attention_head,
hidden_dropout_prob=hidden_dropout_prob,
ffn_dim=ffn_dim,
activation=activation,
)
self.cls_token = CLSToken(hidden_dim=hidden_dim)
self.collate_fn = TransTabCollatorForCL(
feature_tokenizer=feature_tokenizer, overlap_ratio=overlap_ratio, num_partition=num_partition
)

self.projection_head = nn.Linear(hidden_dim, projection_dim, bias=False)
self.cross_entropy_loss = nn.CrossEntropyLoss()
self.temperature = temperature
self.base_temperature = base_temperature
self.num_partition = num_partition
self.overlap_ratio = overlap_ratio
self.to(device)

def to(self, device: Union[str, torch.device]):
"""Moves the model and all its submodules to the specified device

Parameters
----------
device : Union[str, torch.device]
The target device to which the model and its components should be moved.

Returns
-------
HeteroMap
The instance of HeteroMap after moving to the specified device.
"""
super(HeteroMap, self).to(device)
if hasattr(self, "feature_processor"):
self.feature_processor.device = device
self.device = device
return self

@staticmethod
def load(checkpoint: str = None):
"""Load the model state_dict and architecture configuration from the specified checkpoint.

Parameters
----------
checkpoint: str
the directory path to load.
"""
# load model weight state dict
model_info = torch.load(checkpoint, map_location="cpu")
model = HeteroMap(**model_info["model_args"])
model.load_state_dict(model_info["model_state_dict"], strict=False)
return model

def save(self, checkpoint: str):
"""Save the model state_dict and architecture configuration to the specified checkpoint.

Parameters
----------
checkpoint: str
the directory path to save.
"""
# save model weight state dict
model_info = {"model_state_dict": self.state_dict(), "model_args": self.model_args}
torch.save(model_info, checkpoint)

def forward(self, x: dict):
"""Processes the input data 'x', performs positive sampling, and computes contrastive loss.

Parameters
----------
x : dict
Pre-tokenized input tabular data in the form of a dictionary

Returns
-------
torch.Tensor
The self-supervised VPCL loss
"""
feat_x_list = []
if isinstance(x, dict):
# pretokenized inputs
for input_x in x["input_sub_x"]:
feat_x = self.feature_processor(**input_x)
feat_x = self.cls_token(**feat_x)
feat_x = self.encoder(**feat_x)
feat_x_proj = feat_x[:, 0, :]
feat_x_proj = self.projection_head(feat_x_proj)
feat_x_list.append(feat_x_proj)
else:
raise ValueError(f"expect input x to be dict(pretokenized), get {type(x)} instead")

# compute cl loss (multi-view InfoNCE loss)
feat_x_multiview = torch.stack(feat_x_list, axis=1) # bs, n_view, emb_dim
loss = self._self_supervised_contrastive_loss(feat_x_multiview)
return loss

def hetero_mapping(self, rkme_spec: RKMETableSpecification, features: dict) -> HeteroMapTableSpecification:
"""Generate HeteroMapTableSpecification from given tabular data's statistical specification and descriptions of features.

Parameters
----------
rkme_spec : RKMETableSpecification
The RKME specification from the tabular data
features : dict
A dictionary mapping each feature's numerical identifier to its semantic description.

Returns
-------
HeteroMapTableSpecification
The resulting HeteroMapTableSpecification
"""
hetero_spec = HeteroMapTableSpecification()
data = rkme_spec.get_z()
cols = [features.get(str(i), "Unknown Feature") for i in range(data.shape[1])]
hetero_input_df = pd.DataFrame(data=data, columns=cols)
hetero_embedding = self._extract_batch_features(hetero_input_df)
hetero_spec.generate_stat_spec_from_system(hetero_embedding, rkme_spec)
return hetero_spec

def _build_positive_pairs(self, x: pd.DataFrame, n: int):
"""
Builds positive pairs by splitting the input DataFrame into 'n' parts with some overlap.

Parameters
----------
x : pd.DataFrame
The input DataFrame to be split.
n : int
The number of partitions to divide the DataFrame into.

Returns
-------
List[pd.DataFrame]
A list of DataFrames, each representing a partition of the input DataFrame with some overlap.
"""
x_cols = x.columns.tolist()
sub_col_list = np.array_split(np.array(x_cols), n)
len_cols = len(sub_col_list[0])
overlap = int(np.ceil(len_cols * (self.overlap_ratio)))
sub_x_list = []
for i, sub_col in enumerate(sub_col_list):
if overlap > 0 and i < n - 1:
sub_col = np.concatenate([sub_col, sub_col_list[i + 1][:overlap]])
elif overlap > 0 and i == n - 1:
sub_col = np.concatenate([sub_col, sub_col_list[i - 1][-overlap:]])
sub_x = x.copy()[sub_col]
sub_x_list.append(sub_x)
return sub_x_list

def _extract_features(self, x: Union[dict, pd.DataFrame], cols=None):
"""Performs a forward pass with the given input feature `x`, and extracts features.

Parameters
----------
x: Union[dict, pd.DataFrame]
pd.DataFrame: A batch of raw tabular samples
dict: The output of feature_tokenizer

Returns
-------
output_features: numpy.ndarray
The [CLS] embedding at the end of transformer encoder
"""
if isinstance(x, pd.DataFrame):
inputs = self.feature_tokenizer(x)
elif isinstance(x, torch.Tensor):
inputs = self.feature_tokenizer.forward(cols, x)
else:
raise ValueError(f"feature_tokenizer takes inputs with dict or pd.DataFrame, find {type(x)}.")

outputs = self.feature_processor(**inputs) # outputs is dict, "embedding" and "mask"
outputs = self.cls_token(**outputs) # add the cls embedding

# go through transformers, get the first cls embedding
encoder_output = self.encoder(**outputs) # bs, seqlen+1, hidden_dim
output_features = encoder_output[:, 0, :]

return output_features

def _extract_batch_features(self, x_test: pd.DataFrame, eval_batch_size=256) -> np.ndarray:
"""Performs forward passes on a batch of input features `x_test`, extracting and returning features as an array.

Parameters
----------
x_test : pd.DataFrame
A batch of raw tabular samples
eval_batch_size : int, optional
The size of each batch for processing, by default 256

Returns
-------
np.ndarray
An array containing the extracted features from all batches
"""
self.eval()
output_feas_list = []
for i in range(0, len(x_test), eval_batch_size):
bs_x_test = x_test.iloc[i : i + eval_batch_size]
with torch.no_grad():
output_features = self._extract_features(bs_x_test).detach().cpu().numpy()
output_feas_list.append(output_features)

all_output_features = np.concatenate(output_feas_list, 0)
return all_output_features

def _self_supervised_contrastive_loss(self, features: torch.Tensor):
"""
Compute the self-supervised VPCL loss.

Parameters
----------
features : torch.Tensor
The encoded features of multiple partitions of input tables, with shape (bs, n_partition, proj_dim).

Returns
-------
torch.Tensor
The computed self-supervised VPCL loss.
"""
batch_size = features.shape[0]
labels = torch.arange(batch_size, dtype=torch.long, device=self.device).view(-1, 1)
mask = torch.eq(labels, labels.T).float().to(labels.device)

contrast_count = features.shape[1]
# [[0,1],[2,3]] -> [0,2,1,3]
contrast_feature = torch.cat(torch.unbind(features, dim=1), dim=0)
anchor_feature = contrast_feature
anchor_count = contrast_count
anchor_dot_contrast = torch.div(torch.matmul(anchor_feature, contrast_feature.T), self.temperature)
logits_max, _ = torch.max(anchor_dot_contrast, dim=1, keepdim=True)
logits = anchor_dot_contrast - logits_max.detach()
mask = mask.repeat(anchor_count, contrast_count)
logits_mask = torch.scatter(
torch.ones_like(mask),
1,
torch.arange(batch_size * anchor_count).view(-1, 1).to(features.device),
0,
)
mask = mask * logits_mask
# compute log_prob
exp_logits = torch.exp(logits) * logits_mask
log_prob = logits - torch.log(exp_logits.sum(1, keepdim=True))
# compute mean of log-likelihood over positive
mean_log_prob_pos = (mask * log_prob).sum(1) / mask.sum(1)
loss = -(self.temperature / self.base_temperature) * mean_log_prob_pos
loss = loss.view(anchor_count, batch_size).mean()
return loss


class TransformerLayer(nn.Module):
"""A custom Transformer layer implemented as a PyTorch module."""

__config__ = ["batch_first", "norm_first"]

def __init__(
self,
d_model: int,
nhead: int,
dim_feedforward: int = 2048,
dropout: float = 0.1,
activation: Union[str, Callable] = F.relu,
layer_norm_eps: float = 1e-5,
batch_first: bool = True,
norm_first: bool = False,
device: Union[str, torch.device] = None,
dtype: torch.dtype = None,
use_layer_norm: bool = True,
):
"""
The initialization method for transformer layer.

Parameters
----------
d_model : int
The number of expected features in the input
nhead : int
The number of heads in the multiheadattention models
dim_feedforward : int, optional
The dimension of the feedforward network model, by default 2048
dropout : float, optional
The dropout value, by default 0.1
activation : Union[str, Callable], optional
The activation function to use, by default F.relu
layer_norm_eps : float, optional
The epsilon used for layer normalization, by default 1e-5
batch_first : bool, optional
Whether to use (batch, seq, feature) format for input and output tensors, by default True
norm_first : bool, optional
Whether to perform layer normalization before attention and feedforward operations, by default False
device : Union[str, torch.device], optional
The device on which the layer is to be run, by default None
dtype : torch.dtype, optional
The data type of the layer's parameters, by default None
use_layer_norm : bool, optional
Whether to use layer normalization, by default True
"""
factory_kwargs = {"device": device, "dtype": dtype}
super().__init__()
self.self_attn = nn.MultiheadAttention(d_model, nhead, batch_first=batch_first, **factory_kwargs)
# Implementation of Feedforward model
self.linear1 = nn.Linear(d_model, dim_feedforward, **factory_kwargs)
self.dropout = nn.Dropout(dropout)
self.linear2 = nn.Linear(dim_feedforward, d_model, **factory_kwargs)

# Implementation of gates
self.gate_linear = nn.Linear(d_model, 1, bias=False)
self.gate_act = nn.Sigmoid()

self.norm_first = norm_first
self.use_layer_norm = use_layer_norm

if self.use_layer_norm:
self.norm1 = nn.LayerNorm(d_model, eps=layer_norm_eps, **factory_kwargs)
self.norm2 = nn.LayerNorm(d_model, eps=layer_norm_eps, **factory_kwargs)
self.dropout1 = nn.Dropout(dropout)
self.dropout2 = nn.Dropout(dropout)

# Legacy string support for activation function.
if isinstance(activation, str):
self.activation = self._get_activation_fn(activation)
else:
self.activation = activation

# self-attention block
def _sa_block(self, x: torch.Tensor, attn_mask: torch.Tensor, key_padding_mask: torch.Tensor) -> torch.Tensor:
"""
Applies a self-attention block to the input tensor.

Parameters
----------
x : torch.Tensor
The input tensor for the self-attention block.
attn_mask : torch.Tensor
The attention mask for the self-attention operation.
key_padding_mask : torch.Tensor
The key padding mask for the self-attention operation.

Returns
-------
torch.Tensor
The output tensor after applying the self-attention block.
"""
key_padding_mask = ~key_padding_mask.bool()
x = self.self_attn(
x,
x,
x,
attn_mask=attn_mask,
key_padding_mask=key_padding_mask,
)[0]
return self.dropout1(x)

# feed forward block
def _ff_block(self, x: torch.Tensor) -> torch.Tensor:
"""
Applies a feed-forward block to the input tensor.

Parameters
----------
x : torch.Tensor
The input tensor for the feed-forward block.

Returns
-------
torch.Tensor
The output tensor after applying the feed-forward block.
"""
g = self.gate_act(self.gate_linear(x))
h = self.linear1(x)
h = h * g # add gate
h = self.linear2(self.dropout(self.activation(h)))
return self.dropout2(h)

def __setstate__(self, state):
if "activation" not in state:
state["activation"] = F.relu
super().__setstate__(state)

@staticmethod
def _get_activation_fn(activation: str) -> Callable:
"""
Retrieves the activation function based on the provided activation name.

Parameters
----------
activation : str
Name of the activation function. Supported values are "relu", "gelu", "selu", and "leakyrelu".

Returns
-------
Callable
The corresponding activation function from torch.nn.functional.
"""
if activation == "relu":
return F.relu
elif activation == "gelu":
return F.gelu
elif activation == "selu":
return F.selu
elif activation == "leakyrelu":
return F.leaky_relu
raise RuntimeError("activation should be relu/gelu/selu/leakyrelu, not {}".format(activation))

def forward(
self,
src: torch.Tensor,
src_mask: torch.Tensor = None,
src_key_padding_mask: torch.Tensor = None,
is_causal: torch.Tensor = None,
**kwargs,
) -> torch.Tensor:
"""Pass the input through the encoder layer.

Parameters
----------
src : torch.Tensor
The sequence to the encoder layer.
src_mask : torch.Tensor, optional
The mask for the src sequence, by default None
src_key_padding_mask : torch.Tensor, optional
The mask for the src keys per batch, by default None
is_causal : torch.Tensor, optional
A flag indicating whether the layer should be causal, by default None

Returns
-------
torch.Tensor
The output tensor after passing through the encoder layer.
"""
# see Fig. 1 of https://arxiv.org/pdf/2002.04745v1.pdf
x = src
if self.use_layer_norm:
if self.norm_first:
x = x + self._sa_block(self.norm1(x), src_mask, src_key_padding_mask)
x = x + self._ff_block(self.norm2(x))
else:
x = self.norm1(x + self._sa_block(x, src_mask, src_key_padding_mask))
x = self.norm2(x + self._ff_block(x))

else: # do not use layer norm
x = x + self._sa_block(x, src_mask, src_key_padding_mask)
x = x + self._ff_block(x)
return x


class TransformerMultiLayer(nn.Module):
"""A custom multi-layer Transformer module."""

def __init__(
self,
hidden_dim: int = 128,
num_layer: int = 2,
num_attention_head: int = 2,
hidden_dropout_prob: float = 0,
ffn_dim: int = 256,
activation: Union[str, Callable] = "relu",
):
"""
The initialization method for align transformer multilayer.

Parameters
----------
hidden_dim : int, optional
Dimension of the hidden layer in the Transformer, by default 128.
num_layer : int, optional
Number of Transformer layers, by default 2.
num_attention_head : int, optional
Number of attention heads in each Transformer layer, by default 2.
hidden_dropout_prob : float, optional
Dropout probability for the hidden layers, by default 0.
ffn_dim : int, optional
Dimension of the feedforward network model, by default 256.
activation : Union[str, Callable], optional
The activation function to be used, by default "relu".
"""
super().__init__()
self.transformer_encoder = nn.ModuleList(
[
TransformerLayer(
d_model=hidden_dim,
nhead=num_attention_head,
dropout=hidden_dropout_prob,
dim_feedforward=ffn_dim,
batch_first=True,
layer_norm_eps=1e-5,
norm_first=False,
use_layer_norm=True,
activation=activation,
)
]
)
if num_layer > 1:
encoder_layer = TransformerLayer(
d_model=hidden_dim,
nhead=num_attention_head,
dropout=hidden_dropout_prob,
dim_feedforward=ffn_dim,
batch_first=True,
layer_norm_eps=1e-5,
norm_first=False,
use_layer_norm=True,
activation=activation,
)
stacked_transformer = nn.TransformerEncoder(encoder_layer, num_layers=num_layer - 1)
self.transformer_encoder.append(stacked_transformer)

def forward(self, embedding: torch.Tensor, attention_mask: torch.Tensor = None, **kwargs) -> torch.Tensor:
"""
Passes the input embedding through the Transformer encoder layers.

Parameters
----------
embedding : torch.Tensor
The input embedding tensor with shape (batch size, number of tokens, hidden dimension).
attention_mask : torch.Tensor, optional
The attention mask for the input tensor, by default None.

Returns
-------
Tensor
The output tensor after processing through Transformer encoder layers.
"""
outputs = embedding
for i, mod in enumerate(self.transformer_encoder):
outputs = mod(outputs, src_key_padding_mask=attention_mask)
return outputs

+ 378
- 0
learnware/market/heterogeneous/organizer/hetero_map/feature_extractor.py View File

@@ -0,0 +1,378 @@
import math
import os
from typing import Callable, Dict, List, Union

import numpy as np
import pandas as pd
import torch
import torch.nn.init as nn_init
from torch import Tensor, nn
from transformers import BertTokenizerFast

from .....config import C as conf


class WordEmbedding(nn.Module):
"""Encodes tokens drawn from column names into word embeddings."""

def __init__(
self,
vocab_size: int,
hidden_dim: int,
padding_idx: int = 0,
hidden_dropout_prob: float = 0,
layer_norm_eps: float = 1e-5,
):
"""
The initialization method for word embedding.

Parameters
----------
vocab_size : int
The size of the vocabulary.
hidden_dim : int
The dimension of the hidden layer.
padding_idx : int, optional
The index of the padding token, by default 0.
hidden_dropout_prob : float, optional
The dropout probability for the hidden layer, by default 0.
layer_norm_eps : float, optional
The epsilon value for layer normalization, by default 1e-5.
"""
super().__init__()
self.word_embeddings = nn.Embedding(vocab_size, hidden_dim, padding_idx)
nn_init.kaiming_normal_(self.word_embeddings.weight)
self.norm = nn.LayerNorm(hidden_dim, eps=layer_norm_eps)
self.dropout = nn.Dropout(hidden_dropout_prob)

def forward(self, input_ids: torch.Tensor) -> torch.Tensor:
"""
Performs the forward pass of the WordEmbedding module.

Parameters
----------
input_ids : torch.Tensor
The input token IDs.

Returns
-------
torch.Tensor
The word embeddings corresponding to the input token IDs.
"""
embeddings = self.word_embeddings(input_ids)
embeddings = self.norm(embeddings)
embeddings = self.dropout(embeddings)
return embeddings


class NumEmbedding(nn.Module):
"""Encode tokens drawn from column names and the corresponding numerical features."""

def __init__(self, hidden_dim: int):
"""
The initialization method for num embedding.

Parameters
----------
hidden_dim : int
The dimension of the hidden layer.
"""
super().__init__()
self.norm = nn.LayerNorm(hidden_dim)
self.num_bias = nn.Parameter(Tensor(1, 1, hidden_dim)) # add bias
nn_init.uniform_(self.num_bias, a=-1 / math.sqrt(hidden_dim), b=1 / math.sqrt(hidden_dim))

def forward(self, col_emb: torch.Tensor, x_ts: torch.Tensor) -> torch.Tensor:
"""
Performs the forward pass of the NumEmbedding module.

Parameters
----------
col_emb : torch.Tensor
The numerical column embeddings with shape (# numerical columns, emb_dim).
x_ts : torch.Tensor
The numerical features with shape (bs, emb_dim).

Returns
-------
torch.Tensor
The combined feature embeddings.
"""
col_emb = col_emb.unsqueeze(0).expand((x_ts.shape[0], -1, -1))
feat_emb = col_emb * x_ts.unsqueeze(-1).float() + self.num_bias
return feat_emb


class FeatureTokenizer:
"""Process input dataframe to input indices towards encoder, usually used to build dataloader for paralleling loading."""

def __init__(
self,
disable_tokenizer_parallel: bool = True,
**kwargs,
):
"""
The initialization method for feature tokenizer.
.
Parameters
----------
disable_tokenizer_parallel : bool, optional
Whether to disable tokenizer parallelism, by default True.
"""
cache_dir = conf.cache_path
os.makedirs(cache_dir, exist_ok=True)
self.tokenizer = BertTokenizerFast.from_pretrained("bert-base-uncased", cache_dir=cache_dir)
self.tokenizer.__dict__["model_max_length"] = 512
if disable_tokenizer_parallel: # disable tokenizer parallel
os.environ["TOKENIZERS_PARALLELISM"] = "false"
self.vocab_size = self.tokenizer.vocab_size
self.pad_token_id = self.tokenizer.pad_token_id

def __call__(self, x: pd.DataFrame, shuffle: bool = False, keep_input_grad: bool = False) -> Dict:
"""
Tokenizes the input DataFrame.

Parameters
----------
x : pd.DataFrame
The input DataFrame with column names and features.
shuffle : bool, optional
Whether to shuffle column order during training, by default False.
keep_input_grad : bool, optional
Whether to keep input gradients, by default False.

Returns
-------
Dict
A dictionary with tokenized inputs.
"""
encoded_inputs = {"x_num": None, "num_col_input_ids": None}

index_cols = (
[i for i in range(len(x.columns))] if not shuffle else np.random.shuffle([i for i in range(len(x.columns))])
)
num_cols = [x.columns[i] for i in index_cols]
x_num = x.iloc(axis=1)[index_cols].fillna(0)
if keep_input_grad:
x_num_ts = torch.tensor(x_num.values, dtype=float, requires_grad=True) # keep the grad
else:
x_num_ts = torch.tensor(x_num.values, dtype=float)

num_col_ts = self.tokenizer(
num_cols,
padding=True,
truncation=True,
add_special_tokens=False,
return_tensors="pt",
)

encoded_inputs["x_num"] = x_num_ts
encoded_inputs["num_col_input_ids"] = num_col_ts["input_ids"]
encoded_inputs["num_att_mask"] = num_col_ts["attention_mask"] # mask out attention

return encoded_inputs

def forward(self, cols: List[str], x: torch.Tensor) -> Dict:
"""
Processes the input data and generates encoded inputs suitable for model encoding.

Parameters
----------
cols: List[str]
A list containing all column names in order.

x: torch.Tensor
The tensor containing numerical features.

Returns
-------
Dict
- 'x_num': Tensor containing numerical features.
- 'num_col_input_ids': Tensor containing tokenized IDs of numerical columns.
- 'num_att_mask': Attention mask for the numerical column tokens.
"""
encoded_inputs = {
"x_num": None,
"num_col_input_ids": None,
}
num_cols = cols
num_col_ts = self.tokenizer(
num_cols,
padding=True,
truncation=True,
add_special_tokens=False,
return_tensors="pt",
)
encoded_inputs["x_num"] = x
encoded_inputs["num_col_input_ids"] = num_col_ts["input_ids"]
encoded_inputs["num_att_mask"] = num_col_ts["attention_mask"] # mask out attention

return encoded_inputs


class FeatureProcessor(nn.Module):
"""Process inputs from feature extractor to map them to embeddings."""

def __init__(
self,
vocab_size: int = None,
hidden_dim: int = 128,
hidden_dropout_prob: float = 0,
pad_token_id: int = 0,
device: Union[str, torch.device] = "cuda:0",
):
"""
The initialization method for feature processor.

Parameters
----------
vocab_size : int, optional
The size of the vocabulary.
hidden_dim : int, optional
The dimension of the hidden layer, by default 128.
hidden_dropout_prob : float, optional
The dropout probability for the hidden layer, by default 0.
pad_token_id : int, optional
The index of the padding token, by default 0.
device : Union[str, torch.device], optional
The device to run the module on, by default "cuda:0".
"""
super().__init__()
self.word_embedding = WordEmbedding(
vocab_size=vocab_size,
hidden_dim=hidden_dim,
hidden_dropout_prob=hidden_dropout_prob,
padding_idx=pad_token_id,
)
self.num_embedding = NumEmbedding(hidden_dim)
self.align_layer = nn.Linear(hidden_dim, hidden_dim, bias=False)
self.device = device

def _avg_embedding_by_mask(self, embs: torch.Tensor, att_mask: torch.Tensor = None) -> torch.Tensor:
"""
Averages the embeddings based on the attention mask.

Parameters
----------
embs : torch.Tensor
The embeddings tensor.
att_mask : torch.Tensor, optional
The attention mask to apply on the embeddings. If None, the mean of the embeddings is returned, by default None.

Returns
-------
torch.Tensor
The resulting averaged embeddings.
"""
if att_mask is None:
return embs.mean(1)
else:
embs[att_mask == 0] = 0
embs = embs.sum(1) / att_mask.sum(1, keepdim=True).to(embs.device)
return embs

def forward(
self,
x_num: torch.Tensor = None,
num_col_input_ids: torch.Tensor = None,
num_att_mask: torch.Tensor = None,
**kwargs,
) -> torch.Tensor:
"""
Performs the forward pass of the FeatureProcessor module.

Parameters
----------
x_num : torch.Tensor, optional
The numerical features.
num_col_input_ids : torch.Tensor, optional
The input IDs for numerical columns.
num_att_mask : torch.Tensor, optional
The attention mask.

Returns
-------
torch.Tensor
The processed feature embeddings.
"""
x_num = x_num.to(self.device)
num_col_emb = self.word_embedding(num_col_input_ids.to(self.device))
num_col_emb = self._avg_embedding_by_mask(num_col_emb, num_att_mask)

num_feat_embedding = self.num_embedding(num_col_emb, x_num)
num_feat_embedding = self.align_layer(num_feat_embedding).float()

attention_mask = torch.ones(num_feat_embedding.shape[0], num_feat_embedding.shape[1]).to(
num_feat_embedding.device
)
return {"embedding": num_feat_embedding, "attention_mask": attention_mask}


class CLSToken(nn.Module):
"""Add a learnable cls token embedding at the end of each sequence."""

def __init__(self, hidden_dim: int):
"""
The initialization method for CLSToken.

Parameters
----------
hidden_dim : int
The dimension of the hidden layer.
"""
super().__init__()
self.weight = nn.Parameter(Tensor(hidden_dim))
nn_init.uniform_(self.weight, a=-1 / math.sqrt(hidden_dim), b=1 / math.sqrt(hidden_dim))
self.hidden_dim = hidden_dim

def expand(self, *leading_dimensions) -> torch.Tensor:
"""
Expands the CLS token embedding to match the leading dimensions of the input.

Parameters
----------
leading_dimensions : tuple
A variable number of integer arguments representing the leading dimensions to which the CLS token embedding will be expanded.

Returns
-------
torch.Tensor
Expanded CLS token embedding.
"""
new_dims = (1,) * (len(leading_dimensions) - 1)
# cls token (128,) -> view(*new_dims, -1) -> (1, 128)
# (1, 128) -> expand(*leading_dimensions, -1) -> (64, 1, 128)
# here expand means "shared", the cls token embedding remains the same for each sample
return self.weight.view(*new_dims, -1).expand(*leading_dimensions, -1)

def forward(self, embedding: torch.Tensor, attention_mask: torch.Tensor = None, **kwargs) -> torch.Tensor:
"""
Performs a forward pass by adding a learnable CLS token to the embedding.

Parameters
----------
embedding : torch.Tensor
The input embedding tensor.
attention_mask : torch.Tensor, optional
The attention mask for the input tensor, by default None.

Returns
-------
torch.Tensor
Output embedding with the CLS token added.
"""
# embedding shape: (64, 11, 128), where 11 is the largest sequence length after tokenizing
# after concat, learnable cls token [self.weight] is added to each semantic embedding
# embedding shape: (64, d+1, 128)
embedding = torch.cat([self.expand(len(embedding), 1), embedding], dim=1)
outputs = {"embedding": embedding}
if attention_mask is not None:
attention_mask = torch.cat(
[
torch.ones(attention_mask.shape[0], 1).to(attention_mask.device),
attention_mask,
],
1,
)
outputs["attention_mask"] = attention_mask
return outputs

+ 364
- 0
learnware/market/heterogeneous/organizer/hetero_map/trainer.py View File

@@ -0,0 +1,364 @@
import math
import os
import time
from typing import Any, Callable, Dict, List

import numpy as np
import pandas as pd
import torch
from torch import nn
from torch.utils.data import DataLoader, Dataset
from tqdm.autonotebook import trange

from .....logger import get_module_logger
from .feature_extractor import FeatureTokenizer

logger = get_module_logger("hetero_mapping_trainer")


class Trainer:
def __init__(
self,
model: Any,
train_set_list: List[Any],
collate_fn: Callable = None,
output_dir: str = "./ckpt",
num_epoch: int = 10,
batch_size: int = 64,
lr: float = 1e-4,
weight_decay: float = 0,
eval_batch_size: int = 256,
**kwargs,
):
"""
The initialization method for trainer.

Parameters
----------
model : Any
The model to be trained.
train_set_list : List[Any]
A list of training datasets.
collate_fn : Callable, optional
The collate function to be used, by default None.
output_dir : str, optional
The directory where the trained model checkpoints will be saved, by default "./ckpt".
num_epoch : int, optional
Number of epochs for training, by default 10.
batch_size : int, optional
Batch size for training, by default 64.
lr : float, optional
Learning rate, by default 1e-4.
weight_decay : float, optional
Weight decay, by default 0.
eval_batch_size : int, optional
Batch size for evaluation, by default 256.
kwargs : dict
Additional keyword arguments.
"""

self.model = model
if isinstance(train_set_list, tuple):
train_set_list = [train_set_list]

self.train_set_list = train_set_list
self.collate_fn = collate_fn
self.trainloader_list = [
self._build_dataloader(trainset, batch_size, collator=self.collate_fn) for trainset in train_set_list
]
self.output_dir = output_dir
# os.makedirs(self.output_dir, exist_ok=True)
self.args = {
"lr": lr,
"weight_decay": weight_decay,
"batch_size": batch_size,
"num_epoch": num_epoch,
"eval_batch_size": eval_batch_size,
"num_training_steps": self._get_num_train_steps(train_set_list, num_epoch, batch_size),
}
self.args["steps_per_epoch"] = int(self.args["num_training_steps"] / (num_epoch * len(self.train_set_list)))
self.optimizer = None

def train(self, verbose: bool = True) -> float:
"""
Trains the model using the provided training data.

Parameters
----------
verbose : bool, optional
Whether to display verbose output, by default True.

Returns
-------
float
The final training loss.
"""

self._create_optimizer()
start_time = time.time()
final_train_loss = 0
for epoch in trange(self.args["num_epoch"], desc="Epoch"):
ite = 0
train_loss_all = 0
for dataindex in range(len(self.trainloader_list)):
for data in self.trainloader_list[dataindex]:
self.optimizer.zero_grad()
loss = self.model(data)
loss.backward()
self.optimizer.step()
train_loss_all += loss.item()
ite += 1

if verbose:
logger.info(
"epoch: {}, train loss: {:.4f}, lr: {:.6f}, spent: {:.1f} secs".format(
epoch,
train_loss_all,
self.optimizer.param_groups[0]["lr"],
time.time() - start_time,
)
)
final_train_loss = train_loss_all

logger.info("training complete, cost {:.1f} secs.".format(time.time() - start_time))
return final_train_loss

def save_model(self, output_dir: str = None):
"""
Saves the trained model to the specified directory.

Parameters
----------
output_dir : str, optional
The directory where the model will be saved, by default None.
"""

logger.info(f"saving model checkpoint to {output_dir}")
self.model.save(output_dir)

def _create_optimizer(self):
"""Creates an optimizer for training the model."""

if self.optimizer is None:
decay_parameters = self._get_parameter_names(self.model, [nn.LayerNorm])
decay_parameters = [name for name in decay_parameters if "bias" not in name]

decay_params_dict = {n: p for n, p in self.model.named_parameters() if n in decay_parameters}
no_decay_params_dict = {n: p for n, p in self.model.named_parameters() if n not in decay_parameters}

optimizer_grouped_parameters = [
{
"params": list(decay_params_dict.values()),
"weight_decay": self.args["weight_decay"],
},
{"params": list(no_decay_params_dict.values()), "weight_decay": 0.0},
]

self.optimizer = torch.optim.Adam(optimizer_grouped_parameters, lr=self.args["lr"])

def _get_num_train_steps(self, train_set_list: List[Any], num_epoch: int, batch_size: int) -> int:
"""
Calculates the total number of training steps.

Parameters
----------
train_set_list : List[Any]
A list of training datasets.
num_epoch : int
Number of training epochs.
batch_size : int
Batch size for training.

Returns
-------
int
The total number of training steps.
"""

total_step = 0
for trainset in train_set_list:
x_train = trainset
total_step += np.ceil(len(x_train) / batch_size)
total_step *= num_epoch
return total_step

def _build_dataloader(
self, trainset: Any, batch_size: int, collator: Callable, shuffle: bool = True
) -> torch.utils.data.DataLoader:
"""
Builds a DataLoader for training.

Parameters
----------
trainset : Any
The training dataset.
batch_size : int
Batch size for the DataLoader.
collator : Callable
Collate function for the DataLoader.
shuffle : bool, optional
Whether to shuffle the data, by default True.

Returns
-------
torch.utils.data.DataLoader
The DataLoader for the training data.
"""

trainloader = DataLoader(
TrainDataset(trainset),
collate_fn=collator,
batch_size=batch_size,
shuffle=shuffle,
pin_memory=True,
drop_last=False,
)
return trainloader

def _get_parameter_names(self, model: Any, forbidden_layer_types: List[torch.dtype]) -> List[str]:
"""
Retrieves the names of parameters not inside forbidden layers.

Parameters
----------
model : Any
The model from which to retrieve parameters.
forbidden_layer_types : List[torch.dtype]
A list of layer types to exclude.

Returns
-------
List[str]
A list of parameter names not inside the forbidden layers.
"""
result = []
for name, child in model.named_children():
result += [
f"{name}.{n}"
for n in self._get_parameter_names(child, forbidden_layer_types)
if not isinstance(child, tuple(forbidden_layer_types))
]
# Add model specific parameters (defined with nn.Parameter) since they are not in any child.
result += list(model._parameters.keys())
return result


class TrainDataset(Dataset):
def __init__(self, trainset):
self.x = trainset

def __len__(self):
return len(self.x)

def __getitem__(self, index):
x = self.x.iloc[index - 1 : index]
return x


class TransTabCollatorForCL:
"""Collator class supporting positive pair sampling for contrastive learning."""

def __init__(
self,
feature_tokenizer: Callable = None,
overlap_ratio: float = 0.5,
num_partition: int = 3,
**kwargs,
):
"""
The initialization method for TransTabCollatorForCL.

Parameters
----------
feature_tokenizer : Callable, optional
The tokenizer used to process data, by default None.
overlap_ratio : float, optional
The ratio of overlap between partitions, must be between 0 and 1 (exclusive), by default 0.5.
num_partition : int, optional
The number of partitions to create from the data for contrastive learning, by default 3.
"""
self.feature_tokenizer = feature_tokenizer or FeatureTokenizer(disable_tokenizer_parallel=True)
assert num_partition > 0, f"number of contrastive subsets must be greater than 0, got {num_partition}"
assert isinstance(num_partition, int), f"number of constrative subsets must be int, got {type(num_partition)}"
assert overlap_ratio >= 0 and overlap_ratio < 1, f"overlap_ratio must be in [0, 1), got {overlap_ratio}"
self.overlap_ratio = overlap_ratio
self.num_partition = num_partition

def __call__(self, data: List[Any]) -> Dict[str, Any]:
"""
Processes the data into subsets for contrastive learning.

Parameters
----------
data : List[Any]
The input data to be processed.

Returns
-------
Dict[str, Any]
A dictionary containing the processed data subsets.
"""
df_x = pd.concat([row for row in data])
if self.num_partition > 1:
sub_x_list = self._build_positive_pairs(df_x, self.num_partition)
else:
sub_x_list = self._build_positive_pairs_single_view(df_x)
input_x_list = []
for sub_x in sub_x_list:
inputs = self.feature_tokenizer(sub_x)
input_x_list.append(inputs)
res = {"input_sub_x": input_x_list}
return res

def _build_positive_pairs(self, x: pd.DataFrame, n: int) -> List[pd.DataFrame]:
"""
Builds positive pairs of sub-dataframes from the input dataframe.

Parameters
----------
x : pd.DataFrame
The input dataframe.
n : int
The number of sub-dataframes to create.

Returns
-------
List[pd.DataFrame]
A list of sub-dataframes, each containing a positive pair of columns.
"""
x_cols = x.columns.tolist()
sub_col_list = np.array_split(np.array(x_cols), n)
len_cols = len(sub_col_list[0])
overlap = int(math.ceil(len_cols * (self.overlap_ratio)))
sub_x_list = []
for i, sub_col in enumerate(sub_col_list):
if overlap > 0 and i < n - 1:
sub_col = np.concatenate([sub_col, sub_col_list[i + 1][:overlap]])
elif overlap > 0 and i == n - 1:
sub_col = np.concatenate([sub_col, sub_col_list[i - 1][-overlap:]])
sub_x = x.copy()[sub_col]
sub_x_list.append(sub_x)
return sub_x_list

def _build_positive_pairs_single_view(self, x: pd.DataFrame) -> List[pd.DataFrame]:
"""
Builds positive pairs for a single view of data by corrupting half of the columns and shuffling the corrupted columns..

Parameters
----------
x : pd.DataFrame
The input dataframe.

Returns
-------
List[pd.DataFrame]
A list containing two dataframes, one with original data and one with shuffled columns.
"""
x_cols = x.columns.tolist()
sub_x_list = [x]
n_corrupt = int(len(x_cols) * 0.5)
corrupt_cols = x_cols[:n_corrupt]
x_corrupt = x.copy()[corrupt_cols]
np.random.shuffle(x_corrupt.values)
sub_x_list.append(pd.concat([x.copy().drop(corrupt_cols, axis=1), x_corrupt], axis=1))
return sub_x_list

+ 52
- 0
learnware/market/heterogeneous/searcher.py View File

@@ -0,0 +1,52 @@
import traceback
from typing import Tuple, List

from .utils import is_hetero
from ..base import BaseUserInfo
from ..easy import EasySearcher
from ..utils import parse_specification_type
from ...learnware import Learnware
from ...logger import get_module_logger


logger = get_module_logger("hetero_searcher")


class HeteroSearcher(EasySearcher):
def __call__(
self, user_info: BaseUserInfo, check_status: int = None, max_search_num: int = 5, search_method: str = "greedy"
) -> Tuple[List[float], List[Learnware], float, List[Learnware]]:
"""Search learnwares based on user_info from learnwares with check_status.
Employs heterogeneous learnware search if specific requirements are met, otherwise resorts to homogeneous search methods.

Parameters
----------
user_info : BaseUserInfo
user_info contains semantic_spec and stat_info
max_search_num : int
The maximum number of the returned learnwares
check_status : int, optional
- None: search from all learnwares
- Others: search from learnwares with check_status

Returns
-------
Tuple[List[float], List[Learnware], float, List[Learnware]]
the first is the sorted list of rkme dist
the second is the sorted list of Learnware (single) by the rkme dist
the third is the score of Learnware (mixture)
the fourth is the list of Learnware (mixture), the size is search_num
"""
learnware_list = self.learnware_organizer.get_learnwares(check_status=check_status)
learnware_list = self.semantic_searcher(learnware_list, user_info)

if len(learnware_list) == 0:
return [], [], 0.0, []

if parse_specification_type(stat_specs=user_info.stat_info) is not None:
if is_hetero(stat_specs=user_info.stat_info, semantic_spec=user_info.semantic_spec):
user_hetero_spec = self.learnware_organizer.generate_hetero_map_spec(user_info)
user_info.update_stat_info(user_hetero_spec.type, user_hetero_spec)
return self.stat_searcher(learnware_list, user_info, max_search_num, search_method)
else:
return None, learnware_list, 0.0, None

+ 44
- 0
learnware/market/heterogeneous/utils.py View File

@@ -0,0 +1,44 @@
from ...logger import get_module_logger

logger = get_module_logger("hetero_utils")


def is_hetero(stat_specs: dict, semantic_spec: dict) -> bool:
"""Check if user_info satifies all the criteria required for enabling heterogeneous learnware search

Parameters
----------
user_info : BaseUserInfo
user_info contains semantic_spec and stat_info

Returns
-------
bool
A flag indicating whether heterogeneous search is enabled for user_info
"""
try:
table_stat_spec = stat_specs["RKMETableSpecification"]
table_input_shape = table_stat_spec.get_z().shape[1]

semantic_task_type = semantic_spec["Task"]["Values"]
if semantic_task_type not in [["Classification"], ["Regression"]]:
logger.warning("User doesn't provide correct task type, it must be either Classification or Regression.")
return False

semantic_input_description = semantic_spec["Input"]
semantic_description_dim = int(semantic_input_description["Dimension"])
semantic_decription_feature_num = len(semantic_input_description["Description"])

if semantic_decription_feature_num <= 0:
logger.warning("At least one of Input.Description in semantic spec should be provides.")
return False

if table_input_shape != semantic_description_dim:
logger.warning("User data feature dimensions mismatch with semantic specification.")
return False

return True

except Exception as e:
logger.warning(f"Invalid heterogeneous search information provided due to {e}. Use homogeneous search instead.")
return False

+ 42
- 15
learnware/market/module.py View File

@@ -1,24 +1,51 @@
from .base import LearnwareMarket
from .easy import EasyOrganizer, EasySearcher, EasySemanticChecker, EasyStatChecker
from .heterogeneous import HeteroMapTableOrganizer, HeteroSearcher


def get_market_config():
market_config = {
"easy": {
"organizer": EasyOrganizer(),
"searcher": EasySearcher(),
"checker_list": [EasySemanticChecker(), EasyStatChecker()],
def get_market_component(name, market_id, rebuild, organizer_kwargs=None, searcher_kwargs=None, checker_kwargs=None):
organizer_kwargs = {} if organizer_kwargs is None else organizer_kwargs
searcher_kwargs = {} if searcher_kwargs is None else searcher_kwargs
checker_kwargs = {} if checker_kwargs is None else checker_kwargs

if name == "easy":
easy_organizer = EasyOrganizer(market_id=market_id, rebuild=rebuild)
easy_searcher = EasySearcher(organizer=easy_organizer)
easy_checker_list = [EasySemanticChecker(), EasyStatChecker()]
market_component = {
"organizer": easy_organizer,
"searcher": easy_searcher,
"checker_list": easy_checker_list,
}
elif name == "hetero":
hetero_organizer = HeteroMapTableOrganizer(market_id=market_id, rebuild=rebuild, **organizer_kwargs)
hetero_searcher = HeteroSearcher(organizer=hetero_organizer)
hetero_checker_list = [EasySemanticChecker(), EasyStatChecker()]

market_component = {
"organizer": hetero_organizer,
"searcher": hetero_searcher,
"checker_list": hetero_checker_list,
}
}
return market_config
else:
raise ValueError(f"name {name} is not supported for market")

return market_component


def instantiate_learnware_market(market_id="default", name="easy", **kwargs):
market_config = get_market_config()
def instantiate_learnware_market(
market_id="default",
name="easy",
rebuild=False,
organizer_kwargs: dict = None,
searcher_kwargs: dict = None,
checker_kwargs: dict = None,
**kwargs,
):
market_componets = get_market_component(name, market_id, rebuild, organizer_kwargs, searcher_kwargs, checker_kwargs)
return LearnwareMarket(
market_id=market_id,
organizer=market_config[name]["organizer"],
searcher=market_config[name]["searcher"],
checker_list=market_config[name]["checker_list"],
**kwargs
organizer=market_componets["organizer"],
searcher=market_componets["searcher"],
checker_list=market_componets["checker_list"],
**kwargs,
)

+ 7
- 1
learnware/market/utils.py View File

@@ -2,7 +2,13 @@ from ..specification import Specification


def parse_specification_type(
stat_specs: dict, spec_list=["RKMETableSpecification", "RKMETextSpecification", "RKMEImageSpecification"]
stat_specs: dict,
spec_list=[
"HeteroMapTableSpecification",
"RKMETableSpecification",
"RKMETextSpecification",
"RKMEImageSpecification",
],
):
for spec in spec_list:
if spec in stat_specs:


+ 11
- 1
learnware/reuse/__init__.py View File

@@ -1,3 +1,6 @@
from .base import BaseReuser
from .align import AlignLearnware

from ..logger import get_module_logger
from ..utils import is_torch_avaliable
from .utils import is_geatpy_avaliable, is_lightgbm_avaliable
@@ -12,9 +15,16 @@ else:

if not is_torch_avaliable(verbose=False):
AveragingReuser = None
logger.warning("AveragingReuser is skipped due to 'torch' is not installed!")
FeatureAugmentReuser = None
HeteroMapAlignLearnware = None
FeatureAlignLearnware = None
logger.warning(
"[AveragingReuser, FeatureAugmentReuser, HeteroMapAlignLearnware, FeatureAlignLearnware] is skipped due to 'torch' is not installed!"
)
else:
from .averaging import AveragingReuser
from .feature_augment import FeatureAugmentReuser
from .hetero import HeteroMapAlignLearnware, FeatureAlignLearnware

if not is_lightgbm_avaliable(verbose=False) or not is_torch_avaliable(verbose=False):
JobSelectorReuser = None


+ 25
- 0
learnware/reuse/align.py View File

@@ -0,0 +1,25 @@
from ..learnware import Learnware


class AlignLearnware(Learnware):
"""The aligned learnware class, providing the interfaces to align learnware and make predictions"""

def __init__(self, learnware: Learnware):
"""The initialization method for align learnware

Parameters
----------
learnware : Learnware
The learnware list to reuse and make predictions
"""
super(AlignLearnware, self).__init__(
id=learnware.id,
model=learnware.get_model(),
specification=learnware.get_specification(),
learnware_dirpath=learnware.get_dirpath(),
)

def align(self):
"""Align the learnware with specification or data"""

raise NotImplementedError("The align method is not implemented!")

+ 3
- 3
learnware/reuse/averaging.py View File

@@ -1,10 +1,10 @@
import torch
import numpy as np
from typing import List
from typing import List, Union
from scipy.special import softmax


from learnware.learnware import Learnware
from ..learnware import Learnware
from .base import BaseReuser
from ..logger import get_module_logger

@@ -20,7 +20,7 @@ class AveragingReuser(BaseReuser):
Parameters
----------
learnware_list : List[Learnware]
The learnware list
The list contains learnwares.
mode : str
- "mean": average the output of all learnwares for regression task (learnware output is a real number)
- "vote_by_label": vote by labels for classification task, learnware output belongs to the set {0, 1, ..., class_num}


+ 2
- 2
learnware/reuse/ensemble_pruning.py View File

@@ -4,7 +4,7 @@ import numpy as np
import geatpy as ea
from typing import List

from learnware.learnware import Learnware
from ..learnware import Learnware
from .base import BaseReuser
from ..logger import get_module_logger

@@ -24,7 +24,7 @@ class EnsemblePruningReuser(BaseReuser):
Parameters
----------
learnware_list : List[Learnware]
The learnware list
The list contains learnwares
mode : str
- "regression" for regression task (learnware output is a real number)
- "classification" for classification task (learnware output is a logitis vector or belongs to the set {0, 1, ..., class_num})


+ 111
- 0
learnware/reuse/feature_augment.py View File

@@ -0,0 +1,111 @@
import torch
import numpy as np
from typing import List
from sklearn.linear_model import RidgeCV, LogisticRegressionCV

from .base import BaseReuser
from .utils import fill_data_with_mean
from ..learnware import Learnware


class FeatureAugmentReuser(BaseReuser):
"""
FeatureAugmentReuser is a class for augmenting features using predictions of a given learnware model and applying regression or classification on the augmented dataset.

This class supports two modes:
- "regression": Uses RidgeCV for regression tasks.
- "classification": Uses LogisticRegressionCV for classification tasks.
"""

def __init__(self, learnware_list: List[Learnware] = None, mode: str = None):
"""
Initialize the FeatureAugmentReuser with a learnware model and a mode.

Parameters
----------
learnware : List[Learnware]
The list contains learnwares.
mode : str
The mode of operation, either "regression" or "classification".
"""
super(FeatureAugmentReuser, self).__init__(learnware_list)
assert mode in ["classification", "regression"], "Mode must be either 'classification' or 'regression'"
self.mode = mode
self.augment_reuser = None

def predict(self, user_data: np.ndarray) -> np.ndarray:
"""
Predict the output for user data using the trained output aligner model.

Parameters
----------
user_data : np.ndarray
Input data for making predictions.

Returns
-------
np.ndarray
Predicted output from the output aligner model.
"""
assert self.augment_reuser is not None, "FeatureAugmentReuser is not trained by labeled data yet."

user_data = fill_data_with_mean(user_data)
user_data_aug = self._get_augment_data(user_data)
y_pred_aug = self.augment_reuser.predict(user_data_aug)

return y_pred_aug

def fit(self, x_train: np.ndarray, y_train: np.ndarray):
"""
Train the output aligner model using the training data augmented with predictions from the learnware model.

Parameters
----------
x_train : np.ndarray
Training data features.
y_train : np.ndarray
Training data labels.
"""
x_train = fill_data_with_mean(x_train)
x_train_aug = self._get_augment_data(x_train)

if self.mode == "regression":
alpha_list = [0.01, 0.1, 1.0, 10, 100]
ridge_cv = RidgeCV(alphas=alpha_list, store_cv_values=True)
ridge_cv.fit(x_train_aug, y_train)
self.augment_reuser = ridge_cv
else:
self.augment_reuser = LogisticRegressionCV(cv=5, max_iter=1000, random_state=0, multi_class="auto")
self.augment_reuser.fit(x_train_aug, y_train)

def _get_augment_data(self, X: np.ndarray) -> np.ndarray:
"""Get the augmented data with model output.

Parameters
----------
X : np.ndarray
Input data.

Returns
-------
np.ndarray
Augment data with model output.

Raises
------
TypeError
If the type of model output not in [np.ndarray, torch.Tensor].
"""
y_preds = []
for learnware in self.learnware_list:
y_pred = learnware.predict(X)
if isinstance(y_pred, torch.Tensor):
y_pred = y_pred.detach().cpu().numpy()
if not isinstance(y_pred, np.ndarray):
raise TypeError(f"Model output must be np.ndarray or torch.Tensor")
if len(y_pred.shape) == 1:
y_pred = y_pred.reshape(-1, 1)
y_preds.append(y_pred)
y_preds = np.concatenate(y_preds, axis=1)

return np.concatenate((X, y_preds), axis=1)

+ 2
- 0
learnware/reuse/hetero/__init__.py View File

@@ -0,0 +1,2 @@
from .feature_align import FeatureAlignLearnware
from .hetero_map import HeteroMapAlignLearnware

+ 331
- 0
learnware/reuse/hetero/feature_align.py View File

@@ -0,0 +1,331 @@
import time
import torch
import numpy as np
import torch.nn as nn
from typing import List
from tqdm import trange
import torch.nn.functional as F

from ..align import AlignLearnware
from ..utils import fill_data_with_mean
from ...utils import choose_device
from ...logger import get_module_logger
from ...learnware import Learnware
from ...specification import RKMETableSpecification

logger = get_module_logger("feature_align")


class FeatureAlignLearnware(AlignLearnware):
"""
FeatureAlignLearnware is a class for aligning features from a user dataset with a target dataset using a learnware model.
It supports both classification and regression tasks and uses a feature alignment trainer for alignment.

Attributes
----------
learnware : Learnware
The learnware model used for final prediction.
align_arguments : dict
Additional arguments for the feature alignment trainer.
cuda_idx : int
Index of the CUDA device to be used for computations.
device : torch.device
The device (CPU or CUDA) on which computations will be performed.
"""

def __init__(self, learnware: Learnware = None, cuda_idx=0, **align_arguments):
"""
Initialize the FeatureAlignLearnware with a learnware model, mode, CUDA device index, and alignment arguments.

Parameters
----------
learnware : Learnware
A learnware model used for initial predictions.
cuda_idx : int
The index of the CUDA device for computations.
align_arguments : dict
Additional arguments to be passed to the feature alignment trainer.
"""
super(FeatureAlignLearnware, self).__init__(learnware)
self.align_arguments = align_arguments
self.cuda_idx = cuda_idx
self.device = choose_device(cuda_idx=cuda_idx)
self.align_model = None

def align(self, user_rkme: RKMETableSpecification):
"""
Train the align model using the RKME specifications from the user and the learnware.

Parameters
----------
user_rkme : RKMETableSpecification
The RKME specification from the user dataset.
"""
target_rkme = self.specification.get_stat_spec()["RKMETableSpecification"]
trainer = FeatureAlignTrainer(
target_rkme=target_rkme, user_rkme=user_rkme, cuda_idx=self.cuda_idx, **self.align_arguments
)
self.align_model = trainer.model
self.align_model.eval()

def predict(self, user_data: np.ndarray) -> np.ndarray:
"""
Predict the output for user data using the aligned model and learnware model.

Parameters
----------
user_data : np.ndarray
Input data for making predictions.

Returns
-------
np.ndarray
Predicted output from the learnware model after alignment.
"""
assert self.align_model is not None, "FeatureAlignLearnware must be aligned before making predictions."
user_data = fill_data_with_mean(user_data)
transformed_user_data = (
self.align_model(torch.tensor(user_data, device=self.device).float()).detach().cpu().numpy()
)
y_pred = super(FeatureAlignLearnware, self).predict(transformed_user_data)
return y_pred


class FeatureAlignModel(nn.Module):
"""
FeatureAlignModel is a neural network module designed for feature alignment tasks.
It consists of multiple fully connected (dense) layers, optional dropout and batch normalization layers,
and supports different activation functions.
"""

def __init__(
self,
input_dim: int,
output_dim: int,
hidden_dims: list = [1024],
activation: str = "relu",
dropout_ratio: float = 0,
use_bn: bool = False,
):
"""
Initialize the FeatureAlignModel.

Parameters
----------
input_dim : int
The dimensionality of the input features.
output_dim : int
The dimensionality of the output features.
hidden_dims : List[int], optional
A list specifying the number of units in each hidden layer.
activation : str, optional
The activation function to use. Supported options are "relu", "gelu", "selu", and "leakyrelu".
dropout_ratio : float, optional
The dropout ratio applied to each layer (0 means no dropout).
use_bn : bool, optional
Whether to use batch normalization after each fully connected layer.
"""
super().__init__()
dims = [input_dim] + hidden_dims + [output_dim]
self.fc_list = nn.ModuleList()
self.drop_list = nn.ModuleList()

if len(hidden_dims) > 0:
for i in range(len(dims) - 2):
self.drop_list.append(nn.Dropout(dropout_ratio))
if use_bn:
self.fc_list.append(nn.Sequential(nn.Linear(dims[i], dims[i + 1]), nn.BatchNorm1d(dims[i + 1])))
else:
self.fc_list.append(nn.Linear(dims[i], dims[i + 1]))

self.final_fc = nn.Linear(dims[-2], dims[-1])

if activation == "gelu":
self.activation = F.gelu
elif activation == "selu":
self.activation = F.selu
elif activation == "leakyrelu":
self.activation = F.leaky_relu
else:
self.activation = F.relu

def forward(self, x: torch.Tensor) -> torch.Tensor:
"""
Forward pass of the model.

Parameters
----------
x : torch.Tensor
Input tensor.

Returns
-------
torch.Tensor
Output tensor after passing through the model.
"""
if len(self.fc_list) > 0:
for fc, drop in zip(self.fc_list, self.drop_list):
x = fc(x) # Apply fully connected layer
x = self.activation(x) # Apply activation function
x = drop(x) # Apply dropout

# Return output from final fully connected layer
return self.final_fc(x)


class FeatureAlignTrainer:
"""
FeatureAlignTrainer is a class designed to train a neural network for aligning features from a user dataset
to a target dataset. It utilizes Maximum Mean Discrepancy (MMD) as the loss function for training.

Attributes
----------
target_rkme : RKMETableSpecification
The RKME specification of the target dataset.
user_rkme : RKMETableSpecification
The RKME specification of the user dataset.
num_epoch : int
The number of training epochs.
lr : float
Learning rate for the optimizer.
gamma : float
The gamma parameter for the Gaussian kernel in MMD computation.
network_type : str
Type of the neural network used for feature alignment.
optimizer_type : str
Type of optimizer to be used in training ('Adam' or 'SGD').
hidden_dims : List[int]
A list specifying the number of units in each hidden layer.
activation : str
The activation function to use in the network.
dropout_ratio : float
The dropout ratio applied to each layer.
use_bn : bool
Whether to use batch normalization after each fully connected layer.
const : float
A constant value used in training.
cuda_idx : int
Index of the CUDA device to be used for computations.
"""

def __init__(
self,
target_rkme: RKMETableSpecification,
user_rkme: RKMETableSpecification,
num_epoch: int = 50,
lr: float = 1e-3,
gamma: float = 0.1,
network_type: str = "ArbitraryMapping",
optimizer_type: str = "Adam",
hidden_dims: List[int] = [1024],
activation: str = "relu",
dropout_ratio: float = 0,
use_bn: bool = False,
const: float = 1e1,
cuda_idx: int = 0,
):
"""
Initialize the FeatureAlignTrainer with the specified parameters.
"""
self.target_rkme = target_rkme
self.user_rkme = user_rkme
self.args = {
"lr": lr,
"num_epoch": num_epoch,
"gamma": gamma,
"hidden_dims": hidden_dims,
"activation": activation,
"dropout_ratio": dropout_ratio,
"use_bn": use_bn,
}
self.network_type = network_type
self.optimizer_type = optimizer_type
self.const = const
self.device = choose_device(cuda_idx=cuda_idx)
self.train()

def gaussian_kernel(self, x1: torch.Tensor, x2: torch.Tensor):
"""
Compute the Gaussian kernel between two sets of samples.

Parameters
----------
x1 : torch.Tensor
First set of samples.
x2 : torch.Tensor
Second set of samples.

Returns
-------
torch.Tensor
The computed Gaussian kernel matrix.
"""
x1 = x1.double()
x2 = x2.double()
X12norm = torch.sum(x1**2, 1, keepdim=True) - 2 * x1 @ x2.T + torch.sum(x2**2, 1, keepdim=True).T
return torch.exp(-X12norm * self.args["gamma"])

def compute_mmd(
self, user_X: torch.Tensor, user_weight: torch.Tensor, target_X: torch.Tensor, target_weight: torch.Tensor
) -> torch.Tensor:
"""
Compute the Maximum Mean Discrepancy (MMD) between the user and target datasets.

Parameters
----------
user_X : torch.Tensor
Transformed user data.
user_weight : torch.Tensor
Weights of the user data.
target_X : torch.Tensor
Target data.
target_weight : torch.Tensor
Weights of the target data.

Returns
-------
torch.Tensor
The computed MMD loss.
"""
term1 = torch.sum(self.gaussian_kernel(user_X, user_X) * (user_weight.T @ user_weight))
term2 = torch.sum(self.gaussian_kernel(user_X, target_X) * (user_weight.T @ target_weight))
term3 = torch.sum(self.gaussian_kernel(target_X, target_X) * (target_weight.T @ target_weight))
return term1 - 2 * term2 + term3

def train(self):
"""
Train the feature alignment model using MMD as the loss function.
"""
args = self.args
input_dim = self.user_rkme.get_z().shape[1]
output_dim = self.target_rkme.get_z().shape[1]

user_model = FeatureAlignModel(
input_dim, output_dim, args["hidden_dims"], args["activation"], args["dropout_ratio"], args["use_bn"]
)
user_model.to(self.device)
user_data_x = torch.tensor(self.user_rkme.get_z(), device=self.device).float()
user_data_weight = torch.tensor(self.user_rkme.get_beta(), device=self.device).view(1, -1).double()
target_data_x = torch.tensor(self.target_rkme.get_z(), device=self.device)
target_data_weight = torch.tensor(self.target_rkme.get_beta(), device=self.device).view(1, -1).double()
if self.optimizer_type == "Adam":
optimizer = torch.optim.Adam(user_model.parameters(), lr=args["lr"])
else:
optimizer = torch.optim.SGD(user_model.parameters(), lr=args["lr"])

start_time = time.time()
for epoch in trange(args["num_epoch"], desc="Epoch"):
transformed_user_data_x = user_model(user_data_x)
mmd_loss = self.compute_mmd(transformed_user_data_x, user_data_weight, target_data_x, target_data_weight)

optimizer.zero_grad()
mmd_loss.backward()
optimizer.step()
logger.info(
"epoch: {}, train mmd_loss: {:.4f}, lr: {:.6f}, spent: {:.1f} secs".format(
epoch, mmd_loss.item(), optimizer.param_groups[0]["lr"], time.time() - start_time
)
)

self.model = user_model
logger.info("training complete, cost {:.1f} secs.".format(time.time() - start_time))

+ 92
- 0
learnware/reuse/hetero/hetero_map.py View File

@@ -0,0 +1,92 @@
import numpy as np

from ..align import AlignLearnware
from ...learnware import Learnware
from ...logger import get_module_logger
from .feature_align import FeatureAlignLearnware
from ..feature_augment import FeatureAugmentReuser
from ...specification import RKMETableSpecification

logger = get_module_logger("hetero_map_align")


class HeteroMapAlignLearnware(AlignLearnware):
"""
HeteroMapAlignLearnware is a class designed for reusing learnware models with feature alignment and augmentation.
It can handle both classification and regression tasks and supports fine-tuning on additional training data.

Attributes
----------
learnware : Learnware
The learnware model to be reused.
mode : str
The mode of operation, either "classification" or "regression".
cuda_idx : int
Index of the CUDA device to be used for computations.
align_arguments : dict
Additional arguments for feature alignment.
"""

def __init__(self, learnware: Learnware = None, mode: str = None, cuda_idx=0, **align_arguments):
"""
Initialize the HeteroMapAlignLearnware with a learnware model, mode, CUDA device index, and alignment arguments.

Parameters
----------
learnware : Learnware
A learnware model used for initial predictions.
mode : str
The mode of operation, either "regression" or "classification".
cuda_idx : int
The index of the CUDA device for computations.
align_arguments : dict
Additional arguments to be passed to the feature alignment process.
"""
super(HeteroMapAlignLearnware, self).__init__(learnware)
assert mode in ["classification", "regression"], "Mode must be either 'classification' or 'regression'"
self.mode = mode
self.cuda_idx = cuda_idx
self.align_arguments = align_arguments
self.reuser = None

def align(self, user_rkme: RKMETableSpecification, x_train: np.ndarray = None, y_train: np.ndarray = None):
"""
Align the hetero learnware using the user RKME specification and labeled data.

Parameters
----------
user_rkme : RKMETableSpecification
The RKME specification from the user dataset.
x_train : ndarray
Training data features.
y_train : ndarray
Training data labels.
"""
self.feature_align_learnware = FeatureAlignLearnware(
learnware=self, cuda_idx=self.cuda_idx, **self.align_arguments
)
self.feature_align_learnware.align(user_rkme)

if x_train is None or y_train is None:
logger.warning("Hetero learnware may not perform well as labeled data alignment is not provided!")
self.reuser = self.feature_align_learnware
else:
self.reuser = FeatureAugmentReuser(learnware_list=[self.feature_align_learnware], mode=self.mode)
self.reuser.fit(x_train, y_train)

def predict(self, user_data):
"""
Predict the output for user data using the feature aligner or the fine-tuned model.

Parameters
----------
user_data : ndarray
Input data for making predictions.

Returns
-------
ndarray
Predicted output from the model.
"""
assert self.reuser is not None, "HeteroMapAlignLearnware must be aligned before making predictions."
return self.reuser.predict(user_data)

+ 4
- 6
learnware/reuse/job_selector.py View File

@@ -2,16 +2,14 @@ import torch
import numpy as np

from typing import List, Union
from qpsolvers import solve_qp
from lightgbm import LGBMClassifier, early_stopping
from sklearn.metrics import accuracy_score


from .base import BaseReuser
from ..market.utils import parse_specification_type
from ..learnware import Learnware
from ..specification import RKMETableSpecification, RKMETextSpecification
from ..specification import generate_rkme_spec, rkme_solve_qp
from ..specification import generate_rkme_table_spec, rkme_solve_qp
from ..logger import get_module_logger

logger = get_module_logger("job_selector_reuse")
@@ -39,7 +37,7 @@ class JobSelectorReuser(BaseReuser):

Parameters
----------
user_data : np.ndarray
user_data : Union[np.ndarray, List[str]]
User's unlabeled raw data.

Returns
@@ -168,7 +166,7 @@ class JobSelectorReuser(BaseReuser):
def _calculate_rkme_spec_mixture_weight(
self, user_data: np.ndarray, task_rkme_list: List[RKMETableSpecification], task_rkme_matrix: np.ndarray
) -> List[float]:
"""_summary_
"""Calculate mixture weight for the learnware_list based on user's data

Parameters
----------
@@ -180,7 +178,7 @@ class JobSelectorReuser(BaseReuser):
Inner product matrix calculated from task_rkme_list.
"""
task_num = len(task_rkme_list)
user_rkme_spec = generate_rkme_spec(X=user_data, reduce=False)
user_rkme_spec = generate_rkme_table_spec(X=user_data, reduce=False)
K = task_rkme_matrix
v = np.array([user_rkme_spec.inner_prod(task_rkme) for task_rkme in task_rkme_list])



+ 32
- 0
learnware/reuse/utils.py View File

@@ -1,3 +1,4 @@
import numpy as np
from ..logger import get_module_logger

logger = get_module_logger("reuse_utils")
@@ -23,3 +24,34 @@ def is_lightgbm_avaliable(verbose=False):
logger.warning("ModuleNotFoundError: lightgbm is not installed, please install lightgbm!")
return False
return True


def fill_data_with_mean(X: np.ndarray) -> np.ndarray:
"""
Fill missing data (NaN, Inf) in the input array with the mean of the column.

Parameters
----------
X : np.ndarray
Input data array that may contain missing values.

Returns
-------
np.ndarray
Data array with missing values filled.

Raises
------
ValueError
If a column in X contains only exceptional values (NaN, Inf).
"""
X[np.isinf(X) | np.isneginf(X) | np.isposinf(X) | np.isneginf(X)] = np.nan
if np.any(np.isnan(X)):
for col in range(X.shape[1]):
is_nan = np.isnan(X[:, col])
if np.any(is_nan):
if np.all(is_nan):
raise ValueError(f"All values in column {col} are exceptional, e.g., NaN and Inf.")
col_mean = np.nanmean(X[:, col])
X[:, col] = np.where(is_nan, col_mean, X[:, col])
return X

+ 6
- 3
learnware/specification/__init__.py View File

@@ -1,18 +1,21 @@
from .base import Specification, BaseStatSpecification
from .regular import (
RegularStatsSpecification,
RegularStatSpecification,
RKMEStatSpecification,
RKMETableSpecification,
RKMEImageSpecification,
RKMETextSpecification,
rkme_solve_qp,
)

from .system import HeteroMapTableSpecification

from ..utils import is_torch_avaliable

if not is_torch_avaliable(verbose=False):
generate_stat_spec = None
generate_rkme_spec = None
generate_rkme_table_spec = None
generate_rkme_image_spec = None
generate_rkme_text_spec = None
else:
from .module import generate_stat_spec, generate_rkme_spec, generate_rkme_image_spec, generate_rkme_text_spec
from .module import generate_stat_spec, generate_rkme_table_spec, generate_rkme_image_spec, generate_rkme_text_spec

+ 10
- 5
learnware/specification/module.py View File

@@ -9,7 +9,7 @@ from .regular import RKMETableSpecification, RKMEImageSpecification, RKMETextSpe
from ..config import C


def generate_rkme_spec(
def generate_rkme_table_spec(
X: Union[np.ndarray, pd.DataFrame, torch.Tensor],
gamma: float = 0.1,
reduced_set_size: int = 100,
@@ -197,13 +197,18 @@ def generate_rkme_text_spec(
return rkme_text_spec


def generate_stat_spec(type="table", *args, **kwargs) -> BaseStatSpecification:
def generate_stat_spec(
type: str, X: Union[np.ndarray, pd.DataFrame, torch.Tensor, List[str]], *args, **kwargs
) -> BaseStatSpecification:
"""
Interface for users to generate statistical specification.
Return a StatSpecification object, use .save() method to save as npy file.

Parameters
----------
type: str
Type of statistical specification.
Supported types: "table", "text", "image"
X : np.ndarray
Raw data in np.ndarray format.
Size of array: (n*d)
@@ -214,10 +219,10 @@ def generate_stat_spec(type="table", *args, **kwargs) -> BaseStatSpecification:
A StatSpecification object
"""
if type == "table":
return generate_rkme_spec(*args, **kwargs)
return generate_rkme_table_spec(X=X, *args, **kwargs)
elif type == "text":
return generate_rkme_text_spec(*args, **kwargs)
return generate_rkme_text_spec(X=X, *args, **kwargs)
elif type == "image":
return generate_rkme_image_spec(*args, **kwargs)
return generate_rkme_image_spec(X=X, *args, **kwargs)
else:
raise TypeError(f"type {type} is not supported!")

+ 1
- 1
learnware/specification/regular/__init__.py View File

@@ -1,4 +1,4 @@
from .base import RegularStatsSpecification
from .base import RegularStatSpecification
from ...utils import is_torch_avaliable

from .text import RKMETextSpecification


+ 1
- 1
learnware/specification/regular/base.py View File

@@ -3,7 +3,7 @@ from __future__ import annotations
from ..base import BaseStatSpecification


class RegularStatsSpecification(BaseStatSpecification):
class RegularStatSpecification(BaseStatSpecification):
def generate_stat_spec(self, **kwargs):
self.generate_stat_spec_from_data(**kwargs)



+ 4
- 3
learnware/specification/regular/image/rkme.py View File

@@ -17,11 +17,12 @@ from torchvision.transforms import Resize
from tqdm import tqdm

from . import cnn_gp
from ..base import RegularStatsSpecification
from ..table.rkme import rkme_solve_qp, choose_device, setup_seed
from ..base import RegularStatSpecification
from ..table.rkme import rkme_solve_qp
from ....utils import choose_device, setup_seed


class RKMEImageSpecification(RegularStatsSpecification):
class RKMEImageSpecification(RegularStatSpecification):
# INNER_PRODUCT_COUNT = 0
IMAGE_WIDTH = 32



+ 3
- 42
learnware/specification/regular/table/rkme.py View File

@@ -13,13 +13,14 @@ from typing import Tuple, Any, List, Union, Dict
import scipy
from sklearn.cluster import MiniBatchKMeans

from ..base import RegularStatsSpecification
from ..base import RegularStatSpecification
from ....logger import get_module_logger
from ....utils import setup_seed, choose_device

logger = get_module_logger("rkme")


class RKMETableSpecification(RegularStatsSpecification):
class RKMETableSpecification(RegularStatSpecification):
"""Reduced Kernel Mean Embedding (RKME) Specification"""

def __init__(self, gamma: float = 0.1, cuda_idx: int = -1):
@@ -461,46 +462,6 @@ class RKMEStatSpecification(RKMETableSpecification):
super(RKMETableSpecification, self).__init__(type=RKMETableSpecification.__name__)


def setup_seed(seed):
"""Fix a random seed for addressing reproducibility issues.

Parameters
----------
seed : int
Random seed for torch, torch.cuda, numpy, random and cudnn libraries.
"""
torch.manual_seed(seed)
torch.cuda.manual_seed_all(seed)
np.random.seed(seed)
random.seed(seed)
torch.backends.cudnn.deterministic = True


def choose_device(cuda_idx=-1):
"""Let users choose compuational device between CPU or GPU.

Parameters
----------
cuda_idx : int, optional
GPU index, by default -1 which stands for using CPU instead.

Returns
-------
torch.device
A torch.device object
"""
cuda_idx = int(cuda_idx)
if cuda_idx == -1 or not torch.cuda.is_available():
device = torch.device("cpu")
else:
device_count = torch.cuda.device_count()
if cuda_idx >= 0 and cuda_idx < device_count:
device = torch.device(f"cuda:{cuda_idx}")
else:
device = torch.device("cuda:0")
return device


def torch_rbf_kernel(x1, x2, gamma) -> torch.Tensor:
"""Use pytorch to compute rbf_kernel function at faster speed.



+ 1
- 0
learnware/specification/regular/text/rkme.py View File

@@ -2,6 +2,7 @@ import os
import langdetect
import numpy as np
from sentence_transformers import SentenceTransformer

from ..table import RKMETableSpecification
from ....logger import get_module_logger



+ 1
- 0
learnware/specification/system/__init__.py View File

@@ -0,0 +1 @@
from .hetero_table import HeteroMapTableSpecification

+ 13
- 0
learnware/specification/system/base.py View File

@@ -0,0 +1,13 @@
from ..base import BaseStatSpecification


class SystemStatSpecification(BaseStatSpecification):
def generate_stat_spec(self, **kwargs):
self.generate_stat_spec_from_system(**kwargs)

def generate_stat_spec_from_system(self, **kwargs):
"""Construct statistical specification from raw dataset
- kwargs may include the feature, label and model
- kwargs also can include hyperparameters of specific method for specifaction generation
"""
raise NotImplementedError("generate_stat_spec_from_data is not implemented")

+ 0
- 15
learnware/specification/system/heter_table.py View File

@@ -1,15 +0,0 @@
from ..base import BaseStatSpecification


class HeterMapTableSpecification(BaseStatSpecification):
def generate_stat_spec(self, **kwargs):
pass

def save(self, filepath: str):
pass

def load(self, filepath: str):
pass

def dist(self, other_spec):
pass

+ 160
- 0
learnware/specification/system/hetero_table.py View File

@@ -0,0 +1,160 @@
from __future__ import annotations

import os
import copy
import json
import torch
import codecs
import numpy as np

from .base import SystemStatSpecification
from ..regular import RKMETableSpecification
from ..regular.table.rkme import torch_rbf_kernel
from ...utils import choose_device, setup_seed


class HeteroMapTableSpecification(SystemStatSpecification):
"""Heterogeneous Map-Table Specification"""

def __init__(self, gamma: float = 0.1, cuda_idx: int = -1):
"""Initializing HeteroMapTableSpecification parameters.

Parameters
----------
gamma : float
Bandwidth in gaussian kernel, by default 0.1.
cuda_idx : int
A flag indicating whether use CUDA during RKME computation. -1 indicates CUDA not used.
"""
self.z = None
self.beta = None
self.embedding = None
self.weight = None
self.gamma = gamma
self.cuda_idx = cuda_idx
torch.cuda.empty_cache()
self.device = choose_device(cuda_idx=cuda_idx)
setup_seed(0)
super(HeteroMapTableSpecification, self).__init__(type=self.__class__.__name__)

def get_z(self) -> np.ndarray:
"""Move z(RKME reduced set points) back to memory accessible to the CPU.

Returns
-------
np.ndarray
A copy of z in CPU memory.
"""
return self.z.detach().cpu().numpy()

def get_beta(self) -> np.ndarray:
"""Move beta(RKME weights weights) back to memory accessible to the CPU.

Returns
-------
np.ndarray
A copy of beta in CPU memory.
"""
return self.beta.detach().cpu().numpy()

def generate_stat_spec_from_system(self, heter_embedding: np.ndarray, rkme_spec: RKMETableSpecification):
"""Construct heterogeneous map-table specification from RKME specification and embedding genereated by heterogeneous market mapping.

Parameters
----------
heter_embedding : np.ndarray
Embedding genereated by the heterogeneous market mapping.
rkme_spec : RKMETableSpecification
The RKME specification.
"""
self.beta = rkme_spec.beta.to(self.device)
self.z = torch.from_numpy(heter_embedding).double().to(self.device)

def inner_prod(self, Embed2: HeteroMapTableSpecification) -> float:
"""Compute the inner product between two HeteroMapTableSpecifications

Parameters
----------
Embed2 : HeteroMapTableSpecification
The other HeteroMapTableSpecification.

Returns
-------
float
The inner product between two HeteroMapTableSpecifications.
"""
beta_1 = self.beta.reshape(1, -1).double().to(self.device)
beta_2 = Embed2.beta.reshape(1, -1).double().to(self.device)
Z1 = self.z.double().reshape(self.z.shape[0], -1).to(self.device)
Z2 = Embed2.z.double().reshape(Embed2.z.shape[0], -1).to(self.device)
v = torch.sum(torch_rbf_kernel(Z1, Z2, self.gamma) * (beta_1.T @ beta_2))

return float(v)

def dist(self, Embed2: HeteroMapTableSpecification, omit_term1: bool = False) -> float:
"""Compute the Maximum-Mean-Discrepancy(MMD) between two HeteroMapTableSpecifications

Parameters
----------
Phi2 : HeteroMapTableSpecification
The other HeteroMapTableSpecification.
omit_term1 : bool, optional
True if the inner product of self with itself can be omitted, by default False.
"""
term1 = 0 if omit_term1 else self.inner_prod(self)
term2 = self.inner_prod(Embed2)
term3 = Embed2.inner_prod(Embed2)

return float(term1 - 2 * term2 + term3)

def load(self, filepath: str) -> bool:
"""Load a HeteroMapTableSpecification file in JSON format from the specified path.

Parameters
----------
filepath : str
The specified loading path.

Returns
-------
bool
True if the HeteroMapTableSpecification is loaded successfully.
"""
load_path = filepath
if os.path.exists(load_path):
with codecs.open(load_path, "r", encoding="utf-8") as fin:
obj_text = fin.read()
embedding_load = json.loads(obj_text)
embedding_load["device"] = choose_device(embedding_load["cuda_idx"])
embedding_load["z"] = torch.from_numpy(np.array(embedding_load["z"]))
embedding_load["beta"] = torch.from_numpy(np.array(embedding_load["beta"]))

for d in self.__dir__():
if d in embedding_load.keys():
setattr(self, d, embedding_load[d])
return True
else:
return False

def save(self, filepath: str) -> bool:
"""Save the computed HeteroMapTableSpecification to a specified path in JSON format.

Parameters
----------
filepath : str
The specified saving path.
"""
save_path = filepath
embedding_to_save = copy.deepcopy(self.__dict__)
if torch.is_tensor(embedding_to_save["z"]):
embedding_to_save["z"] = embedding_to_save["z"].detach().cpu().numpy()
embedding_to_save["z"] = embedding_to_save["z"].tolist()
if torch.is_tensor(embedding_to_save["beta"]):
embedding_to_save["beta"] = embedding_to_save["beta"].detach().cpu().numpy()
embedding_to_save["beta"] = embedding_to_save["beta"].tolist()
embedding_to_save["device"] = "gpu" if embedding_to_save["cuda_idx"] != -1 else "cpu"
json.dump(
embedding_to_save,
codecs.open(save_path, "w", encoding="utf-8"),
separators=(",", ":"),
)

+ 1
- 0
learnware/utils/__init__.py View File

@@ -4,6 +4,7 @@ import zipfile
from .import_utils import is_torch_avaliable
from .module import get_module_by_module_path
from .file import read_yaml_to_dict, save_dict_to_yaml
from .gpu import setup_seed, choose_device


def zip_learnware_folder(path: str, output_name: str):


+ 46
- 0
learnware/utils/gpu.py View File

@@ -0,0 +1,46 @@
import random
import numpy as np


def setup_seed(seed):
import torch

"""Fix a random seed for addressing reproducibility issues.

Parameters
----------
seed : int
Random seed for torch, torch.cuda, numpy, random and cudnn libraries.
"""
torch.manual_seed(seed)
torch.cuda.manual_seed_all(seed)
np.random.seed(seed)
random.seed(seed)
torch.backends.cudnn.deterministic = True


def choose_device(cuda_idx=-1):
import torch

"""Let users choose compuational device between CPU or GPU.

Parameters
----------
cuda_idx : int, optional
GPU index, by default -1 which stands for using CPU instead.

Returns
-------
torch.device
A torch.device object
"""
cuda_idx = int(cuda_idx)
if cuda_idx == -1 or not torch.cuda.is_available():
device = torch.device("cpu")
else:
device_count = torch.cuda.device_count()
if cuda_idx >= 0 and cuda_idx < device_count:
device = torch.device(f"cuda:{cuda_idx}")
else:
device = torch.device("cuda:0")
return device

+ 1
- 0
setup.py View File

@@ -66,6 +66,7 @@ REQUIRED = [
"rapidfuzz>=3.4.0",
"langdetect>=1.0.9",
"huggingface-hub<0.18",
"transformers>=4.34.1",
"portalocker>=2.0.0",
"qpsolvers[clarabel]>=4.0.1",
]


+ 100
- 0
tests/test_hetero_market/example_learnwares/config.py View File

@@ -0,0 +1,100 @@
input_shape_list = [20, 30] # 20-input shape of example learnware 0, 30-input shape of example learnware 1

input_description_list = [
{
"Dimension": 20,
"Description": { # medical description
"0": "baseline value: Baseline Fetal Heart Rate (FHR)",
"1": "accelerations: Number of accelerations per second",
"2": "fetal_movement: Number of fetal movements per second",
"3": "uterine_contractions: Number of uterine contractions per second",
"4": "light_decelerations: Number of LDs per second",
"5": "severe_decelerations: Number of SDs per second",
"6": "prolongued_decelerations: Number of PDs per second",
"7": "abnormal_short_term_variability: Percentage of time with abnormal short term variability",
"8": "mean_value_of_short_term_variability: Mean value of short term variability",
"9": "percentage_of_time_with_abnormal_long_term_variability: Percentage of time with abnormal long term variability",
"10": "mean_value_of_long_term_variability: Mean value of long term variability",
"11": "histogram_width: Width of the histogram made using all values from a record",
"12": "histogram_min: Histogram minimum value",
"13": "histogram_max: Histogram maximum value",
"14": "histogram_number_of_peaks: Number of peaks in the exam histogram",
"15": "histogram_number_of_zeroes: Number of zeroes in the exam histogram",
"16": "histogram_mode: Hist mode",
"17": "histogram_mean: Hist mean",
"18": "histogram_median: Hist Median",
"19": "histogram_variance: Hist variance",
},
},
{
"Dimension": 30,
"Description": { # business description
"0": "This is a consecutive month number, used for convenience. For example, January 2013 is 0, February 2013 is 1,..., October 2015 is 33.",
"1": "This is the unique identifier for each shop.",
"2": "This is the unique identifier for each item.",
"3": "This is the code representing the city where the shop is located.",
"4": "This is the unique identifier for the category of the item.",
"5": "This is the code representing the type of the item.",
"6": "This is the code representing the subtype of the item.",
"7": "This is the number of this type of item sold in the shop one month ago.",
"8": "This is the number of this type of item sold in the shop two months ago.",
"9": "This is the number of this type of item sold in the shop three months ago.",
"10": "This is the number of this type of item sold in the shop six months ago.",
"11": "This is the number of this type of item sold in the shop twelve months ago.",
"12": "This is the average count of items sold one month ago.",
"13": "This is the average count of this type of item sold one month ago.",
"14": "This is the average count of this type of item sold two months ago.",
"15": "This is the average count of this type of item sold three months ago.",
"16": "This is the average count of this type of item sold six months ago.",
"17": "This is the average count of this type of item sold twelve months ago.",
"18": "This is the average count of items sold in the shop one month ago.",
"19": "This is the average count of items sold in the shop two months ago.",
"20": "This is the average count of items sold in the shop three months ago.",
"21": "This is the average count of items sold in the shop six months ago.",
"22": "This is the average count of items sold in the shop twelve months ago.",
"23": "This is the average count of items in the same category sold one month ago.",
"24": "This is the average count of items in the same category sold in the shop one month ago.",
"25": "This is the average count of items of the same type sold in the shop one month ago.",
"26": "This is the average count of items of the same subtype sold in the shop one month ago.",
"27": "This is the average count of items sold in the same city one month ago.",
"28": "This is the average count of this type of item sold in the same city one month ago.",
"29": "This is the average count of items of the same type sold one month ago.",
},
},
]

output_description_list = [
{
"Dimension": 1,
"Description": {"0": "length of stay: Length of hospital stay (days)"}, # medical description
},
{
"Dimension": 1,
"Description": { # business description
"0": "sales of the item in the next day: Number of items sold in the next day"
},
},
]

user_description_list = [
{
"Dimension": 15,
"Description": { # medical description
"0": "Whether the patient is on thyroxine medication (0: No, 1: Yes)",
"1": "Whether the patient has been queried about thyroxine medication (0: No, 1: Yes)",
"2": "Whether the patient is on antithyroid medication (0: No, 1: Yes)",
"3": "Whether the patient has undergone thyroid surgery (0: No, 1: Yes)",
"4": "Whether the patient has been queried about hypothyroidism (0: No, 1: Yes)",
"5": "Whether the patient has been queried about hyperthyroidism (0: No, 1: Yes)",
"6": "Whether the patient is pregnant (0: No, 1: Yes)",
"7": "Whether the patient is sick (0: No, 1: Yes)",
"8": "Whether the patient has a tumor (0: No, 1: Yes)",
"9": "Whether the patient is taking lithium (0: No, 1: Yes)",
"10": "Whether the patient has a goitre (enlarged thyroid gland) (0: No, 1: Yes)",
"11": "Whether TSH (Thyroid Stimulating Hormone) level has been measured (0: No, 1: Yes)",
"12": "Whether T3 (Triiodothyronine) level has been measured (0: No, 1: Yes)",
"13": "Whether TT4 (Total Thyroxine) level has been measured (0: No, 1: Yes)",
"14": "Whether T4U (Thyroxine Utilization) level has been measured (0: No, 1: Yes)",
},
}
]

+ 8
- 0
tests/test_hetero_market/example_learnwares/learnware.yaml View File

@@ -0,0 +1,8 @@
model:
class_name: MyModel
kwargs: {}
stat_specifications:
- module_path: learnware.specification
class_name: RKMETableSpecification
file_name: stat.json
kwargs: {}

+ 16
- 0
tests/test_hetero_market/example_learnwares/model0.py View File

@@ -0,0 +1,16 @@
from learnware.model import BaseModel
import numpy as np
import joblib
import os


class MyModel(BaseModel):
def __init__(self):
super(MyModel, self).__init__(input_shape=(20,), output_shape=(1,))
dir_path = os.path.dirname(os.path.abspath(__file__))
model_path = os.path.join(dir_path, "ridge.pkl")
model = joblib.load(model_path)
self.model = model

def predict(self, X: np.ndarray) -> np.ndarray:
return self.model.predict(X)

+ 16
- 0
tests/test_hetero_market/example_learnwares/model1.py View File

@@ -0,0 +1,16 @@
from learnware.model import BaseModel
import numpy as np
import joblib
import os


class MyModel(BaseModel):
def __init__(self):
super(MyModel, self).__init__(input_shape=(30,), output_shape=(1,))
dir_path = os.path.dirname(os.path.abspath(__file__))
model_path = os.path.join(dir_path, "ridge.pkl")
model = joblib.load(model_path)
self.model = model

def predict(self, X: np.ndarray) -> np.ndarray:
return self.model.predict(X)

+ 1
- 0
tests/test_hetero_market/example_learnwares/requirements.txt View File

@@ -0,0 +1 @@
learnware == 0.1.0.999

+ 425
- 0
tests/test_hetero_market/test_hetero.py View File

@@ -0,0 +1,425 @@
import torch
import unittest
import os
import copy
import joblib
import zipfile
import numpy as np
from sklearn.linear_model import Ridge
from sklearn.datasets import make_regression
from shutil import copyfile, rmtree
from multiprocessing import Pool
from learnware.client import LearnwareClient
from sklearn.metrics import mean_squared_error

import learnware
from learnware.market import instantiate_learnware_market, BaseUserInfo
from learnware.specification import RKMETableSpecification, generate_rkme_table_spec
from learnware.reuse import HeteroMapAlignLearnware, AveragingReuser, EnsemblePruningReuser
from example_learnwares.config import (
input_shape_list,
input_description_list,
output_description_list,
user_description_list,
)

curr_root = os.path.dirname(os.path.abspath(__file__))

user_semantic = {
"Data": {"Values": ["Table"], "Type": "Class"},
"Task": {
"Values": ["Regression"],
"Type": "Class",
},
"Library": {"Values": ["Scikit-learn"], "Type": "Class"},
"Scenario": {"Values": ["Education"], "Type": "Tag"},
"Description": {"Values": "", "Type": "String"},
"Name": {"Values": "", "Type": "String"},
}


def check_learnware(learnware_name, dir_path=os.path.join(curr_root, "learnware_pool")):
print(f"Checking Learnware: {learnware_name}")
zip_file_path = os.path.join(dir_path, learnware_name)
client = LearnwareClient()
# if check_learnware doesn't raise an exception, return True, otherwise, return false
try:
client.check_learnware(zip_file_path)
return True
except Exception as e:
print(f"Learnware {learnware_name} failed the check: {e}")
return False


class TestMarket(unittest.TestCase):
@classmethod
def setUpClass(cls) -> None:
np.random.seed(2023)
learnware.init()

def _init_learnware_market(self, organizer_kwargs=None):
"""initialize learnware market"""
hetero_market = instantiate_learnware_market(
market_id="hetero_toy", name="hetero", rebuild=True, organizer_kwargs=organizer_kwargs
)
return hetero_market

def test_prepare_learnware_randomly(self, learnware_num=5):
self.zip_path_list = []

for i in range(learnware_num):
dir_path = os.path.join(curr_root, "learnware_pool", "ridge_%d" % (i))
os.makedirs(dir_path, exist_ok=True)

print("Preparing Learnware: %d" % (i))

example_learnware_idx = i % 2
input_dim = input_shape_list[example_learnware_idx]
learnware_example_dir = "example_learnwares"

X, y = make_regression(n_samples=5000, n_informative=15, n_features=input_dim, noise=0.1, random_state=42)

clf = Ridge(alpha=1.0)
clf.fit(X, y)

joblib.dump(clf, os.path.join(dir_path, "ridge.pkl"))

spec = generate_rkme_table_spec(X=X, gamma=0.1, cuda_idx=0)
spec.save(os.path.join(dir_path, "stat.json"))

init_file = os.path.join(dir_path, "__init__.py")
copyfile(
os.path.join(curr_root, learnware_example_dir, f"model{example_learnware_idx}.py"), init_file
) # cp example_init.py init_file

yaml_file = os.path.join(dir_path, "learnware.yaml")
copyfile(
os.path.join(curr_root, learnware_example_dir, "learnware.yaml"), yaml_file
) # cp example.yaml yaml_file

env_file = os.path.join(dir_path, "requirements.txt")
copyfile(os.path.join(curr_root, learnware_example_dir, "requirements.txt"), env_file)

zip_file = dir_path + ".zip"
# zip -q -r -j zip_file dir_path
with zipfile.ZipFile(zip_file, "w") as zip_obj:
for foldername, subfolders, filenames in os.walk(dir_path):
for filename in filenames:
file_path = os.path.join(foldername, filename)
zip_info = zipfile.ZipInfo(filename)
zip_info.compress_type = zipfile.ZIP_STORED
with open(file_path, "rb") as file:
zip_obj.writestr(zip_info, file.read())

rmtree(dir_path) # rm -r dir_path

self.zip_path_list.append(zip_file)

def test_generated_learnwares(self):
curr_root = os.path.dirname(os.path.abspath(__file__))
dir_path = os.path.join(curr_root, "learnware_pool")

# Execute multi-process checking using Pool
with Pool() as pool:
results = pool.starmap(check_learnware, [(name, dir_path) for name in os.listdir(dir_path)])

# Use an assert statement to ensure that all checks return True
self.assertTrue(all(results), "Not all learnwares passed the check")

def test_upload_delete_learnware(self, learnware_num=5, delete=True):
hetero_market = self._init_learnware_market()
self.test_prepare_learnware_randomly(learnware_num)
self.learnware_num = learnware_num

print("Total Item:", len(hetero_market))
assert len(hetero_market) == 0, f"The market should be empty!"

for idx, zip_path in enumerate(self.zip_path_list):
semantic_spec = copy.deepcopy(user_semantic)
semantic_spec["Name"]["Values"] = "learnware_%d" % (idx)
semantic_spec["Description"]["Values"] = "test_learnware_number_%d" % (idx)
semantic_spec["Input"] = input_description_list[idx % 2]
semantic_spec["Output"] = output_description_list[idx % 2]
hetero_market.add_learnware(zip_path, semantic_spec)

print("Total Item:", len(hetero_market))
assert len(hetero_market) == self.learnware_num, f"The number of learnwares must be {self.learnware_num}!"
curr_inds = hetero_market.get_learnware_ids()
print("Available ids After Uploading Learnwares:", curr_inds)
assert len(curr_inds) == self.learnware_num, f"The number of learnwares must be {self.learnware_num}!"

if delete:
for learnware_id in curr_inds:
hetero_market.delete_learnware(learnware_id)
self.learnware_num -= 1
assert (
len(hetero_market) == self.learnware_num
), f"The number of learnwares must be {self.learnware_num}!"

curr_inds = hetero_market.get_learnware_ids()
print("Available ids After Deleting Learnwares:", curr_inds)
assert len(curr_inds) == 0, f"The market should be empty!"

return hetero_market

def test_train_market_model(self, learnware_num=5):
hetero_market = self._init_learnware_market(
organizer_kwargs={"auto_update": False, "auto_update_limit": learnware_num}
)
self.test_prepare_learnware_randomly(learnware_num)
self.learnware_num = learnware_num

print("Total Item:", len(hetero_market))
assert len(hetero_market) == 0, f"The market should be empty!"

for idx, zip_path in enumerate(self.zip_path_list):
semantic_spec = copy.deepcopy(user_semantic)
semantic_spec["Name"]["Values"] = "learnware_%d" % (idx)
semantic_spec["Description"]["Values"] = "test_learnware_number_%d" % (idx)
semantic_spec["Input"] = input_description_list[idx % 2]
semantic_spec["Output"] = output_description_list[idx % 2]
hetero_market.add_learnware(zip_path, semantic_spec)

print("Total Item:", len(hetero_market))
assert len(hetero_market) == self.learnware_num, f"The number of learnwares must be {self.learnware_num}!"
curr_inds = hetero_market.get_learnware_ids()
print("Available ids After Uploading Learnwares:", curr_inds)
assert len(curr_inds) == self.learnware_num, f"The number of learnwares must be {self.learnware_num}!"

# organizer=hetero_market.learnware_organizer
# organizer.train(hetero_market.learnware_organizer.learnware_list.values())
return hetero_market

def test_search_semantics(self, learnware_num=5):
hetero_market = self.test_upload_delete_learnware(learnware_num, delete=False)
print("Total Item:", len(hetero_market))
assert len(hetero_market) == self.learnware_num, f"The number of learnwares must be {self.learnware_num}!"

semantic_spec = copy.deepcopy(user_semantic)
semantic_spec["Name"]["Values"] = f"learnware_{learnware_num - 1}"

user_info = BaseUserInfo(semantic_spec=semantic_spec)
_, single_learnware_list, _, _ = hetero_market.search_learnware(user_info)

print("User info:", user_info.get_semantic_spec())
print(f"Search result:")
assert len(single_learnware_list) == 1, f"Exact semantic search failed!"
for learnware in single_learnware_list:
semantic_spec1 = learnware.get_specification().get_semantic_spec()
print("Choose learnware:", learnware.id, semantic_spec1)
assert semantic_spec1["Name"]["Values"] == semantic_spec["Name"]["Values"], f"Exact semantic search failed!"

semantic_spec["Name"]["Values"] = "laernwaer"
user_info = BaseUserInfo(semantic_spec=semantic_spec)
_, single_learnware_list, _, _ = hetero_market.search_learnware(user_info)

print("User info:", user_info.get_semantic_spec())
print(f"Search result:")
assert len(single_learnware_list) == self.learnware_num, f"Fuzzy semantic search failed!"
for learnware in single_learnware_list:
semantic_spec1 = learnware.get_specification().get_semantic_spec()
print("Choose learnware:", learnware.id, semantic_spec1)

def test_stat_search(self, learnware_num=5):
hetero_market = self.test_train_market_model(learnware_num)
print("Total Item:", len(hetero_market))

# hetero test
print("+++++ HETERO TEST ++++++")
user_dim = 15

test_folder = os.path.join(curr_root, "test_stat")

for idx, zip_path in enumerate(self.zip_path_list):
unzip_dir = os.path.join(test_folder, f"{idx}")

# unzip -o -q zip_path -d unzip_dir
if os.path.exists(unzip_dir):
rmtree(unzip_dir)
os.makedirs(unzip_dir, exist_ok=True)
with zipfile.ZipFile(zip_path, "r") as zip_obj:
zip_obj.extractall(path=unzip_dir)

user_spec = RKMETableSpecification()
user_spec.load(os.path.join(unzip_dir, "stat.json"))
z = user_spec.get_z()
z = z[:, :user_dim]
device = user_spec.device
z = torch.tensor(z, device=device)
user_spec.z = z

print(">> normal case test:")
semantic_spec = copy.deepcopy(user_semantic)
semantic_spec["Input"] = copy.deepcopy(input_description_list[idx % 2])
semantic_spec["Input"]["Dimension"] = user_dim
# keep only the first user_dim descriptions
semantic_spec["Input"]["Description"] = {
str(key): semantic_spec["Input"]["Description"][str(key)] for key in range(user_dim)
}

user_info = BaseUserInfo(semantic_spec=semantic_spec, stat_info={"RKMETableSpecification": user_spec})
(
sorted_score_list,
single_learnware_list,
mixture_score,
mixture_learnware_list,
) = hetero_market.search_learnware(user_info)

print(f"search result of user{idx}:")
for score, learnware in zip(sorted_score_list, single_learnware_list):
print(f"score: {score}, learnware_id: {learnware.id}")
print(
f"mixture_score: {mixture_score}, mixture_learnware_ids: {[item.id for item in mixture_learnware_list]}"
)

# empty value of key "Task" in semantic_spec, use homo search and print invalid semantic_spec
print(">> test for key 'Task' has empty 'Values':")
semantic_spec["Task"] = {"Values": {}}

user_info = BaseUserInfo(semantic_spec=semantic_spec, stat_info={"RKMETableSpecification": user_spec})
(
sorted_score_list,
single_learnware_list,
mixture_score,
mixture_learnware_list,
) = hetero_market.search_learnware(user_info)

assert len(single_learnware_list) == 0, f"Statistical search failed!"

# delete key "Task" in semantic_spec, use homo search and print WARNING INFO with "User doesn't provide correct task type"
print(">> delele key 'Task' test:")
semantic_spec.pop("Task")

user_info = BaseUserInfo(semantic_spec=semantic_spec, stat_info={"RKMETableSpecification": user_spec})
(
sorted_score_list,
single_learnware_list,
mixture_score,
mixture_learnware_list,
) = hetero_market.search_learnware(user_info)

assert len(single_learnware_list) == 0, f"Statistical search failed!"

# modify semantic info with mismatch dim, use homo search and print "User data feature dimensions mismatch with semantic specification."
print(">> mismatch dim test")
semantic_spec = copy.deepcopy(user_semantic)
semantic_spec["Input"] = copy.deepcopy(input_description_list[idx % 2])
semantic_spec["Input"]["Dimension"] = user_dim - 2
semantic_spec["Input"]["Description"] = {
str(key): semantic_spec["Input"]["Description"][str(key)] for key in range(user_dim)
}

user_info = BaseUserInfo(semantic_spec=semantic_spec, stat_info={"RKMETableSpecification": user_spec})
(
sorted_score_list,
single_learnware_list,
mixture_score,
mixture_learnware_list,
) = hetero_market.search_learnware(user_info)

assert len(single_learnware_list) == 0, f"Statistical search failed!"

rmtree(test_folder) # rm -r test_folder

# homo test
print("\n+++++ HOMO TEST ++++++")
test_folder = os.path.join(curr_root, "test_stat")

for idx, zip_path in enumerate(self.zip_path_list):
unzip_dir = os.path.join(test_folder, f"{idx}")

# unzip -o -q zip_path -d unzip_dir
if os.path.exists(unzip_dir):
rmtree(unzip_dir)
os.makedirs(unzip_dir, exist_ok=True)
with zipfile.ZipFile(zip_path, "r") as zip_obj:
zip_obj.extractall(path=unzip_dir)

user_spec = RKMETableSpecification()
user_spec.load(os.path.join(unzip_dir, "stat.json"))
user_info = BaseUserInfo(semantic_spec=user_semantic, stat_info={"RKMETableSpecification": user_spec})
(
sorted_score_list,
single_learnware_list,
mixture_score,
mixture_learnware_list,
) = hetero_market.search_learnware(user_info)

target_spec_num = 3 if idx % 2 == 0 else 2
assert len(single_learnware_list) == target_spec_num, f"Statistical search failed!"
print(f"search result of user{idx}:")
for score, learnware in zip(sorted_score_list, single_learnware_list):
print(f"score: {score}, learnware_id: {learnware.id}")
print(f"mixture_score: {mixture_score}\n")
mixture_id = " ".join([learnware.id for learnware in mixture_learnware_list])
print(f"mixture_learnware: {mixture_id}\n")

rmtree(test_folder) # rm -r test_folder

def test_model_reuse(self, learnware_num=5):
# generate toy regression problem
X, y = make_regression(n_samples=5000, n_informative=10, n_features=15, noise=0.1, random_state=0)

# generate rkme
user_spec = generate_rkme_table_spec(X=X, gamma=0.1, cuda_idx=0)

# generate specification
semantic_spec = copy.deepcopy(user_semantic)
semantic_spec["Input"] = user_description_list[0]
user_info = BaseUserInfo(semantic_spec=semantic_spec, stat_info={"RKMETableSpecification": user_spec})

# learnware market search
hetero_market = self.test_train_market_model(learnware_num)
(
sorted_score_list,
single_learnware_list,
mixture_score,
mixture_learnware_list,
) = hetero_market.search_learnware(user_info)

# print search results
for score, learnware in zip(sorted_score_list, single_learnware_list):
print(f"score: {score}, learnware_id: {learnware.id}")
print(f"mixture_score: {mixture_score}, mixture_learnware_ids: {[item.id for item in mixture_learnware_list]}")

# single model reuse
hetero_learnware = HeteroMapAlignLearnware(single_learnware_list[0], mode="regression")
hetero_learnware.align(user_spec, X[:100], y[:100])
single_predict_y = hetero_learnware.predict(X)

# multi model reuse
hetero_learnware_list = []
for learnware in mixture_learnware_list:
hetero_learnware = HeteroMapAlignLearnware(learnware, mode="regression")
hetero_learnware.align(user_spec, X[:100], y[:100])
hetero_learnware_list.append(hetero_learnware)

# Use averaging ensemble reuser to reuse the searched learnwares to make prediction
reuse_ensemble = AveragingReuser(learnware_list=hetero_learnware_list, mode="mean")
ensemble_predict_y = reuse_ensemble.predict(user_data=X)

# Use ensemble pruning reuser to reuse the searched learnwares to make prediction
reuse_ensemble = EnsemblePruningReuser(learnware_list=hetero_learnware_list, mode="regression")
reuse_ensemble.fit(X[:100], y[:100])
ensemble_pruning_predict_y = reuse_ensemble.predict(user_data=X)

print("Single model RMSE by finetune:", mean_squared_error(y, single_predict_y, squared=False))
print("Averaging Reuser RMSE:", mean_squared_error(y, ensemble_predict_y, squared=False))
print("Ensemble Pruning Reuser RMSE:", mean_squared_error(y, ensemble_pruning_predict_y, squared=False))


def suite():
_suite = unittest.TestSuite()
_suite.addTest(TestMarket("test_prepare_learnware_randomly"))
_suite.addTest(TestMarket("test_generated_learnwares"))
_suite.addTest(TestMarket("test_upload_delete_learnware"))
_suite.addTest(TestMarket("test_train_market_model"))
_suite.addTest(TestMarket("test_search_semantics"))
_suite.addTest(TestMarket("test_stat_search"))
_suite.addTest(TestMarket("test_model_reuse"))
return _suite


if __name__ == "__main__":
runner = unittest.TextTestRunner()
runner.run(suite())

+ 2
- 2
tests/test_specification/test_rkme.py View File

@@ -8,13 +8,13 @@ import tempfile
import numpy as np

from learnware.specification import RKMETableSpecification, RKMEImageSpecification, RKMETextSpecification
from learnware.specification import generate_rkme_image_spec, generate_rkme_spec, generate_rkme_text_spec
from learnware.specification import generate_rkme_image_spec, generate_rkme_table_spec, generate_rkme_text_spec


class TestRKME(unittest.TestCase):
def test_rkme(self):
X = np.random.uniform(-10000, 10000, size=(5000, 200))
rkme = generate_rkme_spec(X)
rkme = generate_rkme_table_spec(X)
rkme.generate_stat_spec_from_data(X)

with tempfile.TemporaryDirectory(prefix="learnware_") as tempdir:


+ 10
- 4
tests/test_workflow/test_workflow.py View File

@@ -12,8 +12,8 @@ from shutil import copyfile, rmtree

import learnware
from learnware.market import instantiate_learnware_market, BaseUserInfo
from learnware.specification import RKMETableSpecification, generate_rkme_spec
from learnware.reuse import JobSelectorReuser, AveragingReuser, EnsemblePruningReuser
from learnware.specification import RKMETableSpecification, generate_rkme_table_spec
from learnware.reuse import JobSelectorReuser, AveragingReuser, EnsemblePruningReuser, FeatureAugmentReuser

curr_root = os.path.dirname(os.path.abspath(__file__))

@@ -57,7 +57,7 @@ class TestWorkflow(unittest.TestCase):

joblib.dump(clf, os.path.join(dir_path, "svm.pkl"))

spec = generate_rkme_spec(X=data_X, gamma=0.1, cuda_idx=0)
spec = generate_rkme_table_spec(X=data_X, gamma=0.1, cuda_idx=0)
spec.save(os.path.join(dir_path, "svm.json"))

init_file = os.path.join(dir_path, "__init__.py")
@@ -200,7 +200,7 @@ class TestWorkflow(unittest.TestCase):
X, y = load_digits(return_X_y=True)
train_X, data_X, train_y, data_y = train_test_split(X, y, test_size=0.3, shuffle=True)

stat_spec = generate_rkme_spec(X=data_X, gamma=0.1, cuda_idx=0)
stat_spec = generate_rkme_table_spec(X=data_X, gamma=0.1, cuda_idx=0)
user_info = BaseUserInfo(semantic_spec=user_semantic, stat_info={"RKMETableSpecification": stat_spec})

_, _, _, mixture_learnware_list = easy_market.search_learnware(user_info)
@@ -219,9 +219,15 @@ class TestWorkflow(unittest.TestCase):
reuse_ensemble.fit(train_X[-200:], train_y[-200:])
ensemble_pruning_predict_y = reuse_ensemble.predict(user_data=data_X)

# Use feature augment reuser to reuse the searched learnwares to make prediction
reuse_feature_augment = FeatureAugmentReuser(learnware_list=mixture_learnware_list, mode="classification")
reuse_feature_augment.fit(train_X[-200:], train_y[-200:])
feature_augment_predict_y = reuse_feature_augment.predict(user_data=data_X)

print("Job Selector Acc:", np.sum(np.argmax(job_selector_predict_y, axis=1) == data_y) / len(data_y))
print("Averaging Reuser Acc:", np.sum(np.argmax(ensemble_predict_y, axis=1) == data_y) / len(data_y))
print("Ensemble Pruning Reuser Acc:", np.sum(ensemble_pruning_predict_y == data_y) / len(data_y))
print("Feature Augment Reuser Acc:", np.sum(feature_augment_predict_y == data_y) / len(data_y))


def suite():


Loading…
Cancel
Save