Browse Source

[DOC] add docs for hetero_organizer, searcher

tags/v0.3.2
liuht 2 years ago
parent
commit
b5d442d243
7 changed files with 773 additions and 180 deletions
  1. +3
    -3
      learnware/market/easy/organizer.py
  2. +132
    -3
      learnware/market/heterogeneous/organizer/__init__.py
  3. +252
    -85
      learnware/market/heterogeneous/organizer/hetero_map/__init__.py
  4. +182
    -49
      learnware/market/heterogeneous/organizer/hetero_map/feature_extractor.py
  5. +169
    -38
      learnware/market/heterogeneous/organizer/hetero_map/trainer.py
  6. +34
    -1
      learnware/market/heterogeneous/searcher.py
  7. +1
    -1
      learnware/reuse/job_selector.py

+ 3
- 3
learnware/market/easy/organizer.py View File

@@ -17,12 +17,12 @@ logger = get_module_logger("easy_organizer")

class EasyOrganizer(BaseOrganizer):
def reload_market(self, rebuild=False) -> bool:
"""Reload the learnware organizer when server restared.
"""Reload the learnware organizer when server restarted.

Returns
-------
bool
A flag indicating whether the market is reload successfully.
A flag indicating whether the market is reloaded successfully.
"""
self.market_store_path = os.path.join(conf.market_root_path, self.market_id)
self.learnware_pool_path = os.path.join(self.market_store_path, "learnware_pool")
@@ -234,7 +234,7 @@ class EasyOrganizer(BaseOrganizer):
----------
ids : Union[str, List[str]]
Give an id or a list of ids
str: id of targer learware
str: id of target learnware
List[str]: A list of ids of target learnwares

Returns


+ 132
- 3
learnware/market/heterogeneous/organizer/__init__.py View File

@@ -14,7 +14,14 @@ logger = get_module_logger("hetero_map_table_organizer")


class HeteroMapTableOrganizer(EasyOrganizer):
def reload_market(self, rebuild=False):
def reload_market(self, rebuild=False) -> bool:
"""Reload the heterogeneous learnware organizer when server restarted.

Returns
-------
bool
A flag indicating whether the heterogeneous market is reloaded successfully.
"""
super(HeteroMapTableOrganizer, self).reload_market(rebuild=rebuild)

hetero_folder_path = os.path.join(self.market_store_path, "hetero")
@@ -44,6 +51,19 @@ class HeteroMapTableOrganizer(EasyOrganizer):
self.market_mapping = HeteroMap()

def reset(self, market_id, rebuild=False, auto_update=False, auto_update_limit=100, **training_args):
"""Reset the heterogeneous market with specified settings.

Parameters
----------
market_id : str
the heterogeneous market's id
rebuild : bool, optional
A flag indicating whether to reload market, by default False
auto_update : bool, optional
A flag indicating whether to enable automatic updating of market mapping, by default False
auto_update_limit : int, optional
The threshold for the number of learnwares required to trigger an automatic market mapping update, by default 100
"""
self.auto_update = auto_update
self.auto_update_limit = auto_update_limit
self.count_down = auto_update_limit
@@ -54,6 +74,26 @@ class HeteroMapTableOrganizer(EasyOrganizer):
def add_learnware(
self, zip_path: str, semantic_spec: dict, check_status: int, learnware_id: str = None
) -> Tuple[str, int]:
"""Add a learnware into the heterogeneous learnware market.
Initiates an update of the market mapping if `auto_update` is True and the number of learnwares supporting training reaches `auto_update_limit`.

Parameters
----------
zip_path : str
Filepath for learnware model, a zipped file.
semantic_spec : dict
semantic_spec for new learnware, in dictionary format.
check_status : int
A flag indicating whether the learnware is usable.
learnware_id : str, optional
An id in database for learnware

Returns
-------
Tuple[str, int]
- str indicating model_id
- int indicating the final learnware check_status
"""
learnware_id, learnwere_status = super(HeteroMapTableOrganizer, self).add_learnware(
zip_path, semantic_spec, check_status, learnware_id
)
@@ -83,6 +123,20 @@ class HeteroMapTableOrganizer(EasyOrganizer):
return learnware_id, learnwere_status

def delete_learnware(self, id: str) -> bool:
"""Delete learnware from heterogeneous learnware market.
If a corresponding HeteroMapTableSpecification exists, it is also removed.

Parameters
----------
id : str
Learnware to be deleted

Returns
-------
bool
True for successful operation.
False for id not found.
"""
flag = super(HeteroMapTableOrganizer, self).delete_learnware(id)
if flag:
hetero_spec_path = os.path.join(self.hetero_specs_path, f"{id}.json")
@@ -92,13 +146,40 @@ class HeteroMapTableOrganizer(EasyOrganizer):
pass
return flag

def update_learnware(self, id: str, zip_path: str = None, semantic_spec: dict = None, check_status: int = None):
def update_learnware(self, id: str, zip_path: str = None, semantic_spec: dict = None, check_status: int = None) -> bool:
"""Update learnware with zip_path, semantic_specification and check_status.
If the learnware supports heterogeneous market training, its HeteroMapTableSpecification is also updated.

Parameters
----------
id : str
Learnware id
zip_path : str, optional
Filepath for learnware model, a zipped file.
semantic_spec : dict, optional
semantic_spec for new learnware, in dictionary format.
check_status : int, optional
A flag indicating whether the learnware is usable.

Returns
-------
int
The final learnware check_status.
"""
final_status = super(HeteroMapTableOrganizer, self).update_learnware(id, zip_path, semantic_spec, check_status)
if final_status == BaseChecker.USABLE_LEARWARE and len(self._get_hetero_learnware_ids(id)):
self._update_learnware_by_ids(id)
return final_status

def reload_learnware(self, learnware_id: str):
"""Reload learnware into heterogeneous learnware market.
If a corresponding HeteroMapTableSpecification exists, it is also reloaded.

Parameters
----------
learnware_id : str
Learnware to be reloaded
"""
super(HeteroMapTableOrganizer, self).reload_learnware(learnware_id)
try:
hetero_spec_path = os.path.join(self.hetero_specs_path, f"{learnware_id}.json")
@@ -110,6 +191,15 @@ class HeteroMapTableOrganizer(EasyOrganizer):
logger.warning(f"Learnware {learnware_id} hetero spec loaded failed!")

def _update_learnware_by_ids(self, ids: Union[str, List[str]]):
"""Update learnware by ids, attempting to generate HeteroMapTableSpecification for them.

Parameters
----------
ids : Union[str, List[str]]
Give an id or a list of ids
str: id of target learnware
List[str]: A list of ids of target learnwares
"""
ids = self._get_hetero_learnware_ids(ids)
for idx in ids:
try:
@@ -126,6 +216,20 @@ class HeteroMapTableOrganizer(EasyOrganizer):
logger.warning(f"Learnware {idx} generate HeteroMapTableSpecification failed! Due to {err}")

def _get_hetero_learnware_ids(self, ids: Union[str, List[str]]) -> List[str]:
"""Get learnware ids that support heterogeneous market training and search.

Parameters
----------
ids : Union[str, List[str]]
Give an id or a list of ids
str: id of target learnware
List[str]: A list of ids of target learnwares

Returns
-------
List[str]
Learnware ids
"""
if isinstance(ids, str):
ids = [ids]

@@ -141,6 +245,18 @@ class HeteroMapTableOrganizer(EasyOrganizer):
return ret

def generate_hetero_map_spec(self, user_info: BaseUserInfo) -> HeteroMapTableSpecification:
"""Generate HeteroMapTableSpecification based on user's input description and statistical information.

Parameters
----------
user_info : BaseUserInfo
user_info contains semantic_spec and stat_info

Returns
-------
HeteroMapTableSpecification
The generated HeteroMapTableSpecification for user
"""
user_stat_spec = user_info.stat_info["RKMETableSpecification"]
user_features = user_info.get_semantic_spec()["Input"]["Description"]
user_hetero_spec = self.market_mapping.hetero_mapping(user_stat_spec, user_features)
@@ -148,7 +264,20 @@ class HeteroMapTableOrganizer(EasyOrganizer):

@staticmethod
def train(learnware_list: List[Learnware], save_dir: str, **kwargs) -> HeteroMap:
# Convert learnware to dataframe
"""Build the market mapping model using learnwares that support heterogeneous market training.

Parameters
----------
learnware_list : List[Learnware]
The learnware list to train the market mapping
save_dir : str
Filepath where the trained market mapping will be saved

Returns
-------
HeteroMap
The trained market mapping model
"""
learnware_df_dict = defaultdict(list)
for learnware in learnware_list:
spec = learnware.get_specification()


+ 252
- 85
learnware/market/heterogeneous/organizer/hetero_map/__init__.py View File

@@ -1,10 +1,11 @@
import os
import numpy as np
import pandas as pd
from typing import List, Optional
from typing import List, Optional, Union, Callable
import torch
import torch.nn.functional as F
from torch import Tensor, nn
from loguru import logger

from .....specification import HeteroMapTableSpecification, RKMETableSpecification
from .feature_extractor import *
@@ -28,22 +29,24 @@ class HeteroMap(nn.Module):

def __init__(
self,
feature_tokenizer=None,
hidden_dim=128,
num_layer=2,
num_attention_head=8,
hidden_dropout_prob=0,
ffn_dim=256,
projection_dim=128,
overlap_ratio=0.5,
num_partition=3,
temperature=10,
base_temperature=10,
activation="relu",
device="cuda:0",
feature_tokenizer: FeatureTokenizer = None,
hidden_dim: int = 128,
num_layer: int = 2,
num_attention_head: int = 8,
hidden_dropout_prob: float = 0,
ffn_dim: int = 256,
projection_dim: int = 128,
overlap_ratio: float = 0.5,
num_partition: int = 3,
temperature: int = 10,
base_temperature: int = 10,
activation: Union[str, Callable] = "relu",
device: Union[str, torch.device] = "cuda:0",
**kwargs,
):
"""
The initialization method for hetero map.
Parameters
----------
feature_tokenizer : FeatureTokenizer, optional
@@ -68,12 +71,12 @@ class HeteroMap(nn.Module):
Temperature parameter for contrastive learning, by default 10
base_temperature : int, optional
Base temperature parameter, by default 10
activation : str, optional
activation : Union[str, Callable], optional
Activation function for transformer layer, by default "relu"
device : str, optional
device : Union[str, torch.device], optional
Device to run the model on, by default "cuda:0"
cache_dir : str, optional
The cache directory, by default None
kwargs:
Additional arguments to be passed to the feature tokenizer
"""
super(HeteroMap, self).__init__()

@@ -126,9 +129,8 @@ class HeteroMap(nn.Module):
self.to(device)

@staticmethod
def load(checkpoint=None):
"""Load the model state_dict and feature_tokenizer configuration
from the ``checkpoint``.
def load(checkpoint: str = None):
"""Load the model state_dict and architecture configuration from the specified checkpoint.

Parameters
----------
@@ -141,9 +143,8 @@ class HeteroMap(nn.Module):
model.load_state_dict(model_info["model_state_dict"], strict=False)
return model

def save(self, checkpoint):
"""Save the model state_dict and feature_tokenizer configuration
to the ``checkpoint``.
def save(self, checkpoint: str):
"""Save the model state_dict and architecture configuration to the specified checkpoint.

Parameters
----------
@@ -154,8 +155,19 @@ class HeteroMap(nn.Module):
model_info = {"model_state_dict": self.state_dict(), "model_args": self.model_args}
torch.save(model_info, checkpoint)

def forward(self, x, y=None):
# do positive sampling
def forward(self, x: dict):
"""Processes the input data 'x', performs positive sampling, and computes contrastive loss.

Parameters
----------
x : dict
Pre-tokenized input tabular data in the form of a dictionary

Returns
-------
torch.Tensor
The self-supervised VPCL loss
"""
feat_x_list = []
if isinstance(x, dict):
# pretokenized inputs
@@ -175,6 +187,20 @@ class HeteroMap(nn.Module):
return loss

def hetero_mapping(self, rkme_spec: RKMETableSpecification, features: dict) -> HeteroMapTableSpecification:
"""Generate HeteroMapTableSpecification from given tabular data's statistical specification and descriptions of features.

Parameters
----------
rkme_spec : RKMETableSpecification
The RKME specification from the tabular data
features : dict
A dictionary mapping each feature's numerical identifier to its semantic description.

Returns
-------
HeteroMapTableSpecification
The resulting HeteroMapTableSpecification
"""
hetero_spec = HeteroMapTableSpecification()
data = rkme_spec.get_z()
cols = [features.get(str(i), "") for i in range(data.shape[1])]
@@ -183,7 +209,22 @@ class HeteroMap(nn.Module):
hetero_spec.generate_stat_spec_from_system(hetero_embedding, rkme_spec)
return hetero_spec

def _build_positive_pairs(self, x, n):
def _build_positive_pairs(self, x: pd.DataFrame, n: int):
"""
Builds positive pairs by splitting the input DataFrame into 'n' parts with some overlap.

Parameters
----------
x : pd.DataFrame
The input DataFrame to be split.
n : int
The number of partitions to divide the DataFrame into.

Returns
-------
List[pd.DataFrame]
A list of DataFrames, each representing a partition of the input DataFrame with some overlap.
"""
x_cols = x.columns.tolist()
sub_col_list = np.array_split(np.array(x_cols), n)
len_cols = len(sub_col_list[0])
@@ -198,24 +239,21 @@ class HeteroMap(nn.Module):
sub_x_list.append(sub_x)
return sub_x_list

def _extract_features(self, x, cols=None):
"""Make forward pass given the input feature ``x``.
def _extract_features(self, x: Union[dict, pd.DataFrame], cols=None):
"""Performs a forward pass with the given input feature `x`, and extracts features.

Parameters
----------
x: pd.DataFrame or dict
pd.DataFrame: a batch of raw tabular samples; dict: the output of feature_tokenizer
x: Union[dict, pd.DataFrame]
pd.DataFrame: A batch of raw tabular samples
dict: The output of feature_tokenizer

Returns
-------
output_features: numpy.ndarray
the [CLS] embedding at the end of transformer encoder.
The [CLS] embedding at the end of transformer encoder
"""
if isinstance(x, dict):
# input is the pre-tokenized encoded inputs
inputs = x
elif isinstance(x, pd.DataFrame):
# input is dataframe
if isinstance(x, pd.DataFrame):
inputs = self.feature_tokenizer(x)
elif isinstance(x, torch.Tensor):
inputs = self.feature_tokenizer.forward(cols, x)
@@ -231,7 +269,21 @@ class HeteroMap(nn.Module):

return output_features

def _extract_batch_features(self, x_test, eval_batch_size=256) -> np.ndarray:
def _extract_batch_features(self, x_test: pd.DataFrame, eval_batch_size=256) -> np.ndarray:
"""Performs forward passes on a batch of input features `x_test`, extracting and returning features as an array.

Parameters
----------
x_test : pd.DataFrame
A batch of raw tabular samples
eval_batch_size : int, optional
The size of each batch for processing, by default 256

Returns
-------
np.ndarray
An array containing the extracted features from all batches
"""
self.eval()
output_feas_list = []
for i in range(0, len(x_test), eval_batch_size):
@@ -243,18 +295,19 @@ class HeteroMap(nn.Module):
all_output_features = np.concatenate(output_feas_list, 0)
return all_output_features

def _self_supervised_contrastive_loss(self, features):
"""Compute the self-supervised VPCL loss.
def _self_supervised_contrastive_loss(self, features: torch.Tensor):
"""
Compute the self-supervised VPCL loss.

Parameters
----------
features: torch.Tensor
the encoded features of multiple partitions of input tables, with shape ``(bs, n_partition, proj_dim)``.
features : torch.Tensor
The encoded features of multiple partitions of input tables, with shape (bs, n_partition, proj_dim).

Returns
-------
loss: torch.Tensor
the computed self-supervised VPCL loss.
torch.Tensor
The computed self-supervised VPCL loss.
"""
batch_size = features.shape[0]
labels = torch.arange(batch_size, dtype=torch.long, device=self.device).view(-1, 1)
@@ -286,35 +339,53 @@ class HeteroMap(nn.Module):
return loss


def _get_activation_fn(activation):
if activation == "relu":
return F.relu
elif activation == "gelu":
return F.gelu
elif activation == "selu":
return F.selu
elif activation == "leakyrelu":
return F.leaky_relu
raise RuntimeError("activation should be relu/gelu/selu/leakyrelu, not {}".format(activation))


class TransformerLayer(nn.Module):
"""A custom Transformer layer implemented as a PyTorch module.
"""
__config__ = ["batch_first", "norm_first"]

def __init__(
self,
d_model,
nhead,
dim_feedforward=2048,
dropout=0.1,
activation=F.relu,
layer_norm_eps=1e-5,
batch_first=True,
norm_first=False,
device=None,
dtype=None,
use_layer_norm=True,
d_model: int,
nhead: int,
dim_feedforward: int = 2048,
dropout: float = 0.1,
activation: Union[str, Callable] = F.relu,
layer_norm_eps: float = 1e-5,
batch_first: bool = True,
norm_first: bool = False,
device: Union[str, torch.device] = None,
dtype: torch.dtype = None,
use_layer_norm: bool = True,
):
"""
The initialization method for transformer layer.

Parameters
----------
d_model : int
The number of expected features in the input
nhead : int
The number of heads in the multiheadattention models
dim_feedforward : int, optional
The dimension of the feedforward network model, by default 2048
dropout : float, optional
The dropout value, by default 0.1
activation : Union[str, Callable], optional
The activation function to use, by default F.relu
layer_norm_eps : float, optional
The epsilon used for layer normalization, by default 1e-5
batch_first : bool, optional
Whether to use (batch, seq, feature) format for input and output tensors, by default True
norm_first : bool, optional
Whether to perform layer normalization before attention and feedforward operations, by default False
device : Union[str, torch.device], optional
The device on which the layer is to be run, by default None
dtype : torch.dtype, optional
The data type of the layer's parameters, by default None
use_layer_norm : bool, optional
Whether to use layer normalization, by default True
"""
factory_kwargs = {"device": device, "dtype": dtype}
super().__init__()
self.self_attn = nn.MultiheadAttention(d_model, nhead, batch_first=batch_first, **factory_kwargs)
@@ -338,12 +409,29 @@ class TransformerLayer(nn.Module):

# Legacy string support for activation function.
if isinstance(activation, str):
self.activation = _get_activation_fn(activation)
self.activation = self._get_activation_fn(activation)
else:
self.activation = activation

# self-attention block
def _sa_block(self, x: Tensor, attn_mask: Optional[Tensor], key_padding_mask: Optional[Tensor]) -> Tensor:
def _sa_block(self, x: torch.Tensor, attn_mask: torch.Tensor, key_padding_mask: torch.Tensor) -> torch.Tensor:
"""
Applies a self-attention block to the input tensor.

Parameters
----------
x : torch.Tensor
The input tensor for the self-attention block.
attn_mask : torch.Tensor
The attention mask for the self-attention operation.
key_padding_mask : torch.Tensor
The key padding mask for the self-attention operation.

Returns
-------
torch.Tensor
The output tensor after applying the self-attention block.
"""
key_padding_mask = ~key_padding_mask.bool()
x = self.self_attn(
x,
@@ -355,7 +443,20 @@ class TransformerLayer(nn.Module):
return self.dropout1(x)

# feed forward block
def _ff_block(self, x: Tensor) -> Tensor:
def _ff_block(self, x: torch.Tensor) -> torch.Tensor:
"""
Applies a feed-forward block to the input tensor.

Parameters
----------
x : torch.Tensor
The input tensor for the feed-forward block.

Returns
-------
torch.Tensor
The output tensor after applying the feed-forward block.
"""
g = self.gate_act(self.gate_linear(x))
h = self.linear1(x)
h = h * g # add gate
@@ -367,19 +468,56 @@ class TransformerLayer(nn.Module):
state["activation"] = F.relu
super().__setstate__(state)

def forward(self, src, src_mask=None, src_key_padding_mask=None, is_causal=None, **kwargs) -> Tensor:
@staticmethod
def _get_activation_fn(activation: str) -> Callable:
"""
Retrieves the activation function based on the provided activation name.

Parameters
----------
activation : str
Name of the activation function. Supported values are "relu", "gelu", "selu", and "leakyrelu".

Returns
-------
Callable
The corresponding activation function from torch.nn.functional.
"""
if activation == "relu":
return F.relu
elif activation == "gelu":
return F.gelu
elif activation == "selu":
return F.selu
elif activation == "leakyrelu":
return F.leaky_relu
raise RuntimeError("activation should be relu/gelu/selu/leakyrelu, not {}".format(activation))

def forward(self,
src: torch.Tensor,
src_mask: torch.Tensor = None,
src_key_padding_mask: torch.Tensor = None,
is_causal: torch.Tensor = None,
**kwargs
) -> torch.Tensor:
"""Pass the input through the encoder layer.

Parameters
----------
src : Any
src : torch.Tensor
The sequence to the encoder layer.
src_mask : Any, optional
src_mask : torch.Tensor, optional
The mask for the src sequence, by default None
src_key_padding_mask : Any, optional
src_key_padding_mask : torch.Tensor, optional
The mask for the src keys per batch, by default None
is_causal : torch.Tensor, optional
A flag indicating whether the layer should be causal, by default None
Returns
-------
torch.Tensor
The output tensor after passing through the encoder layer.
"""

# see Fig. 1 of https://arxiv.org/pdf/2002.04745v1.pdf
x = src
if self.use_layer_norm:
@@ -397,15 +535,35 @@ class TransformerLayer(nn.Module):


class TransformerMultiLayer(nn.Module):
"""A custom multi-layer Transformer module.
"""
def __init__(
self,
hidden_dim=128,
num_layer=2,
num_attention_head=2,
hidden_dropout_prob=0,
ffn_dim=256,
activation="relu",
hidden_dim: int = 128,
num_layer: int = 2,
num_attention_head: int = 2,
hidden_dropout_prob: float = 0,
ffn_dim: int = 256,
activation: Union[str, Callable] = "relu",
):
"""
The initialization method for align transformer multilayer.
Parameters
----------
hidden_dim : int, optional
Dimension of the hidden layer in the Transformer, by default 128.
num_layer : int, optional
Number of Transformer layers, by default 2.
num_attention_head : int, optional
Number of attention heads in each Transformer layer, by default 2.
hidden_dropout_prob : float, optional
Dropout probability for the hidden layers, by default 0.
ffn_dim : int, optional
Dimension of the feedforward network model, by default 256.
activation : Union[str, Callable], optional
The activation function to be used, by default "relu".
"""
super().__init__()
self.transformer_encoder = nn.ModuleList(
[
@@ -437,14 +595,23 @@ class TransformerMultiLayer(nn.Module):
stacked_transformer = nn.TransformerEncoder(encoder_layer, num_layers=num_layer - 1)
self.transformer_encoder.append(stacked_transformer)

def forward(self, embedding, attention_mask=None, **kwargs) -> Tensor:
def forward(self, embedding: torch.Tensor, attention_mask: torch.Tensor = None, **kwargs) -> torch.Tensor:
"""
Passes the input embedding through the Transformer encoder layers.

Parameters
----------
embedding : Any
bs, num_token, hidden_dim
embedding : torch.Tensor
The input embedding tensor with shape (batch size, number of tokens, hidden dimension).
attention_mask : torch.Tensor, optional
The attention mask for the input tensor, by default None.

Returns
-------
Tensor
The output tensor after processing through Transformer encoder layers.
"""
outputs = embedding
for i, mod in enumerate(self.transformer_encoder):
outputs = mod(outputs, src_key_padding_mask=attention_mask)
return outputs
return outputs

+ 182
- 49
learnware/market/heterogeneous/organizer/hetero_map/feature_extractor.py View File

@@ -1,6 +1,6 @@
import os
import math
from typing import Dict
from typing import Dict, Callable

import numpy as np
import torch
@@ -12,23 +12,53 @@ from .....config import C as conf


class WordEmbedding(nn.Module):
"""Encode tokens drawn from column names"""
"""Encodes tokens drawn from column names into word embeddings.
"""

def __init__(
self,
vocab_size,
hidden_dim,
padding_idx=0,
hidden_dropout_prob=0,
layer_norm_eps=1e-5,
vocab_size: int,
hidden_dim: int,
padding_idx: int = 0,
hidden_dropout_prob: float = 0,
layer_norm_eps: float = 1e-5,
):
"""
The initialization method for word embedding.

Parameters
----------
vocab_size : int
The size of the vocabulary.
hidden_dim : int
The dimension of the hidden layer.
padding_idx : int, optional
The index of the padding token, by default 0.
hidden_dropout_prob : float, optional
The dropout probability for the hidden layer, by default 0.
layer_norm_eps : float, optional
The epsilon value for layer normalization, by default 1e-5.
"""
super().__init__()
self.word_embeddings = nn.Embedding(vocab_size, hidden_dim, padding_idx)
nn_init.kaiming_normal_(self.word_embeddings.weight)
self.norm = nn.LayerNorm(hidden_dim, eps=layer_norm_eps)
self.dropout = nn.Dropout(hidden_dropout_prob)

def forward(self, input_ids) -> Tensor:
def forward(self, input_ids: torch.Tensor) -> torch.Tensor:
"""
Performs the forward pass of the WordEmbedding module.

Parameters
----------
input_ids : torch.Tensor
The input token IDs.

Returns
-------
torch.Tensor
The word embeddings corresponding to the input token IDs.
"""
embeddings = self.word_embeddings(input_ids)
embeddings = self.norm(embeddings)
embeddings = self.dropout(embeddings)
@@ -38,20 +68,35 @@ class WordEmbedding(nn.Module):
class NumEmbedding(nn.Module):
"""Encode tokens drawn from column names and the corresponding numerical features."""

def __init__(self, hidden_dim):
def __init__(self, hidden_dim: int):
"""
The initialization method for num embedding.

Parameters
----------
hidden_dim : int
The dimension of the hidden layer.
"""
super().__init__()
self.norm = nn.LayerNorm(hidden_dim)
self.num_bias = nn.Parameter(Tensor(1, 1, hidden_dim)) # add bias
nn_init.uniform_(self.num_bias, a=-1 / math.sqrt(hidden_dim), b=1 / math.sqrt(hidden_dim))

def forward(self, col_emb, x_ts) -> Tensor:
def forward(self, col_emb: torch.Tensor, x_ts: torch.Tensor) -> torch.Tensor:
"""
Performs the forward pass of the NumEmbedding module.

Parameters
----------
col_emb : Any
numerical column embedding, (# numerical columns, emb_dim)
x_ts : Any
numerical features, (bs, emb_dim)
col_emb : torch.Tensor
The numerical column embeddings with shape (# numerical columns, emb_dim).
x_ts : torch.Tensor
The numerical features with shape (bs, # numerical columns).

Returns
-------
torch.Tensor
The combined feature embeddings.
"""
col_emb = col_emb.unsqueeze(0).expand((x_ts.shape[0], -1, -1))
feat_emb = col_emb * x_ts.unsqueeze(-1).float() + self.num_bias
@@ -63,14 +108,16 @@ class FeatureTokenizer:

def __init__(
self,
disable_tokenizer_parallel=True,
disable_tokenizer_parallel: bool = True,
**kwargs,
):
"""
The initialization method for feature tokenizer.

Parameters
----------
disable_tokenizer_parallel : bool, optional
true if use extractor for collator function in torch.DataLoader
Whether to disable tokenizer parallelism, by default True.
"""
cache_dir = conf["cache_path"]
os.makedirs(cache_dir, exist_ok=True)
@@ -81,22 +128,23 @@ class FeatureTokenizer:
self.vocab_size = self.tokenizer.vocab_size
self.pad_token_id = self.tokenizer.pad_token_id

def __call__(self, x, shuffle=False, keep_input_grad=False) -> Dict:
def __call__(self, x: pd.DataFrame, shuffle: bool = False, keep_input_grad: bool = False) -> Dict:
"""
Tokenizes the input DataFrame.

Parameters
----------
x: pd.DataFrame
with column names and features.

shuffle: bool
if shuffle column order during the training.
x : pd.DataFrame
The input DataFrame with column names and features.
shuffle : bool, optional
Whether to shuffle column order during training, by default False.
keep_input_grad : bool, optional
Whether to keep input gradients, by default False.

Returns
-------
encoded_inputs: a dict with {
'x_num': tensor contains numerical features,
'num_col_input_ids': tensor contains numerical column tokenized ids,
}
Dict
A dictionary with tokenized inputs.
"""
encoded_inputs = {"x_num": None, "num_col_input_ids": None}
num_cols = x.columns.tolist() if not shuffle else np.random.shuffle(x.columns.tolist())
@@ -120,21 +168,24 @@ class FeatureTokenizer:

return encoded_inputs

def forward(self, cols, x) -> Dict:
def forward(self, cols: List[str], x: torch.Tensor) -> Dict:
"""
Processes the input data and generates encoded inputs suitable for model encoding.

Parameters
----------
cols: List[str]
Contain all column names in order.
A list containing all column names in order.

x: torch.Tensor
The tensor containing numerical features.

Returns
-------
encoded_inputs: a dict with {
'x_num': tensor contains numerical features,
'num_col_input_ids': tensor contains numerical column tokenized ids,
}
Dict
- 'x_num': Tensor containing numerical features.
- 'num_col_input_ids': Tensor containing tokenized IDs of numerical columns.
- 'num_att_mask': Attention mask for the numerical column tokens.
"""
encoded_inputs = {
"x_num": None,
@@ -156,18 +207,32 @@ class FeatureTokenizer:


class FeatureProcessor(nn.Module):
"""
Process inputs from feature extractor to map them to embeddings.
"""
"""Process inputs from feature extractor to map them to embeddings."""

def __init__(
self,
vocab_size=None,
hidden_dim=128,
hidden_dropout_prob=0,
pad_token_id=0,
device="cuda:0",
vocab_size: int = None,
hidden_dim: int = 128,
hidden_dropout_prob: float = 0,
pad_token_id: int = 0,
device: Union[str, torch.device] = "cuda:0",
):
"""
The initialization method for feature processor.

Parameters
----------
vocab_size : int, optional
The size of the vocabulary.
hidden_dim : int, optional
The dimension of the hidden layer, by default 128.
hidden_dropout_prob : float, optional
The dropout probability for the hidden layer, by default 0.
pad_token_id : int, optional
The index of the padding token, by default 0.
device : Union[str, torch.device], optional
The device to run the module on, by default "cuda:0".
"""
super().__init__()
self.word_embedding = WordEmbedding(
vocab_size=vocab_size,
@@ -179,7 +244,22 @@ class FeatureProcessor(nn.Module):
self.align_layer = nn.Linear(hidden_dim, hidden_dim, bias=False)
self.device = device

def _avg_embedding_by_mask(self, embs, att_mask=None):
def _avg_embedding_by_mask(self, embs: torch.Tensor, att_mask: torch.Tensor = None) -> torch.Tensor:
"""
Averages the embeddings based on the attention mask.

Parameters
----------
embs : torch.Tensor
The embeddings tensor.
att_mask : torch.Tensor, optional
The attention mask to apply on the embeddings. If None, the mean of the embeddings is returned, by default None.

Returns
-------
torch.Tensor
The resulting averaged embeddings.
"""
if att_mask is None:
return embs.mean(1)
else:
@@ -189,11 +269,28 @@ class FeatureProcessor(nn.Module):

def forward(
self,
x_num=None,
num_col_input_ids=None,
num_att_mask=None,
x_num: torch.Tensor = None,
num_col_input_ids: torch.Tensor = None,
num_att_mask: torch.Tensor = None,
**kwargs,
) -> Tensor:
) -> torch.Tensor:
"""
Performs the forward pass of the FeatureProcessor module.

Parameters
----------
x_num : torch.Tensor, optional
The numerical features.
num_col_input_ids : torch.Tensor, optional
The input IDs for numerical columns.
num_att_mask : torch.Tensor, optional
The attention mask.

Returns
-------
torch.Tensor
The processed feature embeddings.
"""
x_num = x_num.to(self.device)

num_col_emb = self.word_embedding(num_col_input_ids.to(self.device))
@@ -209,22 +306,58 @@ class FeatureProcessor(nn.Module):


class CLSToken(nn.Module):
"""add a learnable cls token embedding at the end of each sequence."""
"""Add a learnable cls token embedding at the end of each sequence."""

def __init__(self, hidden_dim):
def __init__(self, hidden_dim: int):
"""
The initialization method for CLSToken.

Parameters
----------
hidden_dim : int
The dimension of the hidden layer.
"""
super().__init__()
self.weight = nn.Parameter(Tensor(hidden_dim))
nn_init.uniform_(self.weight, a=-1 / math.sqrt(hidden_dim), b=1 / math.sqrt(hidden_dim))
self.hidden_dim = hidden_dim

def expand(self, *leading_dimensions):
def expand(self, *leading_dimensions) -> torch.Tensor:
"""
Expands the CLS token embedding to match the leading dimensions of the input.

Parameters
----------
leading_dimensions : tuple
A variable number of integer arguments representing the leading dimensions to which the CLS token embedding will be expanded.

Returns
-------
torch.Tensor
Expanded CLS token embedding.
"""
new_dims = (1,) * (len(leading_dimensions) - 1)
# cls token (128,) -> view(*new_dims, -1) -> (1, 128)
# (1, 128) -> expand(*leading_dimensions, -1) -> (64, 1, 128)
# here expand means "shared", the cls token embedding remains the same for each sample
return self.weight.view(*new_dims, -1).expand(*leading_dimensions, -1)

def forward(self, embedding, attention_mask=None, **kwargs) -> Tensor:
def forward(self, embedding: torch.Tensor, attention_mask: torch.Tensor = None, **kwargs) -> torch.Tensor:
"""
Performs a forward pass by adding a learnable CLS token to the embedding.

Parameters
----------
embedding : torch.Tensor
The input embedding tensor.
attention_mask : torch.Tensor, optional
The attention mask for the input tensor, by default None.

Returns
-------
torch.Tensor
Output embedding with the CLS token added.
"""
# embedding shape: (64, 11, 128), where 11 is the largest sequence length after tokenizing
# after concat, learnable cls token [self.weight] is added to each semantic embedding
# embedding shape: (64, d+1, 128)


+ 169
- 38
learnware/market/heterogeneous/organizer/hetero_map/trainer.py View File

@@ -1,7 +1,8 @@
import json
import math
import os
import time
from typing import Any, Callable, List, Dict

import numpy as np
import pandas as pd
@@ -19,17 +20,44 @@ logger = get_module_logger("hetero_mapping_trainer")
class Trainer:
def __init__(
self,
model,
train_set_list,
collate_fn=None,
output_dir="./ckpt",
num_epoch=10,
batch_size=64,
lr=1e-4,
weight_decay=0,
eval_batch_size=256,
model: Any,
train_set_list: List[Any],
collate_fn: Callable = None,
output_dir: str = "./ckpt",
num_epoch: int = 10,
batch_size: int = 64,
lr: float = 1e-4,
weight_decay: float = 0,
eval_batch_size: int = 256,
**kwargs,
):
"""
The initialization method for trainer.

Parameters
----------
model : Any
The model to be trained.
train_set_list : List[Any]
A list of training datasets.
collate_fn : Callable, optional
The collate function to be used, by default None.
output_dir : str, optional
The directory where the trained model checkpoints will be saved, by default "./ckpt".
num_epoch : int, optional
Number of epochs for training, by default 10.
batch_size : int, optional
Batch size for training, by default 64.
lr : float, optional
Learning rate, by default 1e-4.
weight_decay : float, optional
Weight decay, by default 0.
eval_batch_size : int, optional
Batch size for evaluation, by default 256.
kwargs : dict
Additional keyword arguments.
"""

self.model = model
if isinstance(train_set_list, tuple):
train_set_list = [train_set_list]
@@ -52,7 +80,21 @@ class Trainer:
self.args["steps_per_epoch"] = int(self.args["num_training_steps"] / (num_epoch * len(self.train_set_list)))
self.optimizer = None

def train(self, verbose: bool = True):
def train(self, verbose: bool = True) -> float:
"""
Trains the model using the provided training data.

Parameters
----------
verbose : bool, optional
Whether to display verbose output, by default True.

Returns
-------
float
The final training loss.
"""

self._create_optimizer()
start_time = time.time()
final_train_loss = 0
@@ -82,11 +124,22 @@ class Trainer:
logger.info("training complete, cost {:.1f} secs.".format(time.time() - start_time))
return final_train_loss

def save_model(self, output_dir=None):
def save_model(self, output_dir: str = None):
"""
Saves the trained model to the specified directory.

Parameters
----------
output_dir : str, optional
The directory where the model will be saved, by default None.
The value is passed unchanged to ``self.model.save``; no fallback directory is substituted here.
"""

logger.info(f"saving model checkpoint to {output_dir}")
self.model.save(output_dir)

def _create_optimizer(self):
"""Creates an optimizer for training the model."""
if self.optimizer is None:
decay_parameters = self._get_parameter_names(self.model, [nn.LayerNorm])
decay_parameters = [name for name in decay_parameters if "bias" not in name]
@@ -104,7 +157,25 @@ class Trainer:

self.optimizer = torch.optim.Adam(optimizer_grouped_parameters, lr=self.args["lr"])

def _get_num_train_steps(self, train_set_list, num_epoch, batch_size):
def _get_num_train_steps(self, train_set_list: List[Any], num_epoch: int, batch_size: int) -> int:
"""
Calculates the total number of training steps.

Parameters
----------
train_set_list : List[Any]
A list of training datasets.
num_epoch : int
Number of training epochs.
batch_size : int
Batch size for training.

Returns
-------
int
The total number of training steps.
"""

total_step = 0
for trainset in train_set_list:
x_train = trainset
@@ -112,7 +183,29 @@ class Trainer:
total_step *= num_epoch
return total_step

def _build_dataloader(self, trainset, batch_size, collator, shuffle=True):
def _build_dataloader(
self, trainset: Any, batch_size: int, collator: Callable, shuffle: bool = True
) -> torch.utils.data.DataLoader:
"""
Builds a DataLoader for training.

Parameters
----------
trainset : Any
The training dataset.
batch_size : int
Batch size for the DataLoader.
collator : Callable
Collate function for the DataLoader.
shuffle : bool, optional
Whether to shuffle the data, by default True.

Returns
-------
torch.utils.data.DataLoader
The DataLoader for the training data.
"""

trainloader = DataLoader(
TrainDataset(trainset),
collate_fn=collator,
@@ -123,8 +216,22 @@ class Trainer:
)
return trainloader

def _get_parameter_names(self, model, forbidden_layer_types):
"""Returns the names of the model parameters that are not inside a forbidden layer."""
def _get_parameter_names(self, model: Any, forbidden_layer_types: List[type]) -> List[str]:
"""
Retrieves the names of parameters not inside forbidden layers.

Parameters
----------
model : Any
The model from which to retrieve parameters.
forbidden_layer_types : List[type]
A list of layer classes to exclude (the trainer passes ``[nn.LayerNorm]``).

Returns
-------
List[str]
A list of parameter names not inside the forbidden layers.
"""
result = []
for name, child in model.named_children():
result += [
@@ -150,15 +257,27 @@ class TrainDataset(Dataset):


class TransTabCollatorForCL:
"""support positive pair sampling for contrastive learning."""
"""Collator class supporting positive pair sampling for contrastive learning."""

def __init__(
self,
feature_tokenizer=None,
overlap_ratio=0.5,
num_partition=3,
feature_tokenizer: Callable = None,
overlap_ratio: float = 0.5,
num_partition: int = 3,
**kwargs,
):
"""
The initialization method for TransTabCollatorForCL.

Parameters
----------
feature_tokenizer : Callable, optional
The tokenizer used to process data, by default None.
overlap_ratio : float, optional
The ratio of overlap between partitions, must be between 0 and 1 (exclusive), by default 0.5.
num_partition : int, optional
The number of partitions to create from the data for contrastive learning, by default 3.
"""
self.feature_tokenizer = feature_tokenizer or FeatureTokenizer(disable_tokenizer_parallel=True)
assert num_partition > 0, f"number of contrastive subsets must be greater than 0, got {num_partition}"
assert isinstance(num_partition, int), f"number of constrative subsets must be int, got {type(num_partition)}"
@@ -166,10 +285,20 @@ class TransTabCollatorForCL:
self.overlap_ratio = overlap_ratio
self.num_partition = num_partition

def __call__(self, data):
"""Take a list of subsets (views) from the original tests."""
# 1. build positive pairs
# 2. encode each pair using feature extractor
def __call__(self, data: List[Any]) -> Dict[str, Any]:
"""
Processes the data into subsets for contrastive learning.

Parameters
----------
data : List[Any]
The input data to be processed.

Returns
-------
Dict[str, Any]
A dictionary containing the processed data subsets.
"""
df_x = pd.concat([row for row in data])
if self.num_partition > 1:
sub_x_list = self._build_positive_pairs(df_x, self.num_partition)
@@ -182,20 +311,21 @@ class TransTabCollatorForCL:
res = {"input_sub_x": input_x_list}
return res

def _build_positive_pairs(self, x, n):
"""Builds positive pairs of sub-dataframes from the input dataframe x.
def _build_positive_pairs(self, x: pd.DataFrame, n: int) -> List[pd.DataFrame]:
"""
Builds positive pairs of sub-dataframes from the input dataframe.

Parameters
----------
x : pandas.DataFrame
Input dataframe.
x : pd.DataFrame
The input dataframe.
n : int
Number of sub-dataframes to split x into.
The number of sub-dataframes to create.

Returns
-------
List
List of sub-dataframes, each containing a positive pair of columns from x.
List[pd.DataFrame]
A list of sub-dataframes, each containing a positive pair of columns.
"""
x_cols = x.columns.tolist()
sub_col_list = np.array_split(np.array(x_cols), n)
@@ -211,18 +341,19 @@ class TransTabCollatorForCL:
sub_x_list.append(sub_x)
return sub_x_list

def _build_positive_pairs_single_view(self, x):
"""Builds positive pairs for a single view of data by corrupting half of the columns and shuffling the corrupted columns.
def _build_positive_pairs_single_view(self, x: pd.DataFrame) -> List[pd.DataFrame]:
"""
Builds positive pairs for a single view of data by corrupting half of the columns and shuffling the corrupted columns.

Parameters
----------
x : pandas.DataFrame
The input data.
x : pd.DataFrame
The input dataframe.

Returns
-------
List
A list of two pandas DataFrames, where each DataFrame contains the original data with half of the columns corrupted and shuffled.
List[pd.DataFrame]
A list containing two dataframes, one with original data and one with shuffled columns.
"""
x_cols = x.columns.tolist()
sub_x_list = [x]
@@ -231,4 +362,4 @@ class TransTabCollatorForCL:
x_corrupt = x.copy()[corrupt_cols]
np.random.shuffle(x_corrupt.values)
sub_x_list.append(pd.concat([x.copy().drop(corrupt_cols, axis=1), x_corrupt], axis=1))
return sub_x_list
return sub_x_list

+ 34
- 1
learnware/market/heterogeneous/searcher.py View File

@@ -11,7 +11,19 @@ logger = get_module_logger("hetero_searcher")

class HeteroSearcher(EasySearcher):
@staticmethod
def check_user_info(user_info: BaseUserInfo):
def check_user_info(user_info: BaseUserInfo) -> bool:
"""Check if user_info satisfies all the criteria required for enabling heterogeneous learnware search.

Parameters
----------
user_info : BaseUserInfo
user_info contains semantic_spec and stat_info

Returns
-------
bool
A flag indicating whether heterogeneous search is enabled for user_info
"""
try:
user_stat_spec = user_info.get_stat_info("RKMETableSpecification")
user_input_shape = user_stat_spec.get_z().shape[1]
@@ -42,6 +54,27 @@ class HeteroSearcher(EasySearcher):
def __call__(
self, user_info: BaseUserInfo, check_status: int = None, max_search_num: int = 5, search_method: str = "greedy"
) -> Tuple[List[float], List[Learnware], float, List[Learnware]]:
"""Search learnwares based on user_info from learnwares with check_status.
Employs heterogeneous learnware search if specific requirements are met, otherwise resorts to homogeneous search methods.

Parameters
----------
user_info : BaseUserInfo
user_info contains semantic_spec and stat_info
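max_search_num : int, optional
The maximum number of the returned learnwares, by default 5
check_status : int, optional
- None: search from all learnwares
- Others: search from learnwares with check_status
search_method : str, optional
The search method to use, by default "greedy"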

Returns
-------
Tuple[List[float], List[Learnware], float, List[Learnware]]
the first is the sorted list of rkme dist
the second is the sorted list of Learnware (single) by the rkme dist
the third is the score of Learnware (mixture)
the fourth is the list of Learnware (mixture), the size is search_num
"""
learnware_list = self.learnware_organizer.get_learnwares(check_status=check_status)
learnware_list = self.semantic_searcher(learnware_list, user_info)



+ 1
- 1
learnware/reuse/job_selector.py View File

@@ -166,7 +166,7 @@ class JobSelectorReuser(BaseReuser):
def _calculate_rkme_spec_mixture_weight(
self, user_data: np.ndarray, task_rkme_list: List[RKMETableSpecification], task_rkme_matrix: np.ndarray
) -> List[float]:
"""_summary_
"""Calculate mixture weight for the learnware_list based on user's data

Parameters
----------


Loading…
Cancel
Save