diff --git a/learnware/market/easy/organizer.py b/learnware/market/easy/organizer.py index 412664b..3165fd6 100644 --- a/learnware/market/easy/organizer.py +++ b/learnware/market/easy/organizer.py @@ -17,12 +17,12 @@ logger = get_module_logger("easy_organizer") class EasyOrganizer(BaseOrganizer): def reload_market(self, rebuild=False) -> bool: - """Reload the learnware organizer when server restared. + """Reload the learnware organizer when server restarted. Returns ------- bool - A flag indicating whether the market is reload successfully. + A flag indicating whether the market is reloaded successfully. """ self.market_store_path = os.path.join(conf.market_root_path, self.market_id) self.learnware_pool_path = os.path.join(self.market_store_path, "learnware_pool") @@ -234,7 +234,7 @@ class EasyOrganizer(BaseOrganizer): ---------- ids : Union[str, List[str]] Give a id or a list of ids - str: id of targer learware + str: id of target learware List[str]: A list of ids of target learnwares Returns diff --git a/learnware/market/heterogeneous/organizer/__init__.py b/learnware/market/heterogeneous/organizer/__init__.py index 758570e..16d66fb 100644 --- a/learnware/market/heterogeneous/organizer/__init__.py +++ b/learnware/market/heterogeneous/organizer/__init__.py @@ -14,7 +14,14 @@ logger = get_module_logger("hetero_map_table_organizer") class HeteroMapTableOrganizer(EasyOrganizer): - def reload_market(self, rebuild=False): + def reload_market(self, rebuild=False) -> bool: + """Reload the heterogeneous learnware organizer when server restarted. + + Returns + ------- + bool + A flag indicating whether the heterogeneous market is reloaded successfully. + """ super(HeteroMapTableOrganizer, self).reload_market(rebuild=rebuild) hetero_folder_path = os.path.join(self.market_store_path, "hetero") @@ -44,6 +51,19 @@ class HeteroMapTableOrganizer(EasyOrganizer): self.market_mapping = HeteroMap() def reset(self, market_id, rebuild=False, auto_update=False, auto_update_limit=100, **training_args): + """Reset the heterogeneous market with specified settings. + + Parameters + ---------- + market_id : str + the heterogeneous market's id + rebuild : bool, optional + A flag indicating whether to reload market, by default False + auto_update : bool, optional + A flag indicating whether to enable automatic updating of market mapping, by default False + auto_update_limit : int, optional + The threshold for the number of learnwares required to trigger an automatic market mapping update, by default 100 + """ self.auto_update = auto_update self.auto_update_limit = auto_update_limit self.count_down = auto_update_limit @@ -54,6 +74,26 @@ class HeteroMapTableOrganizer(EasyOrganizer): def add_learnware( self, zip_path: str, semantic_spec: dict, check_status: int, learnware_id: str = None ) -> Tuple[str, int]: + """Add a learnware into the heterogeneous learnware market. + Initiates an update of the market mapping if `auto_update` is True and the number of learnwares supporting training reaches `auto_update_limit`. + + Parameters + ---------- + zip_path : str + Filepath for learnware model, a zipped file. + semantic_spec : dict + semantic_spec for new learnware, in dictionary format. + check_status : int + A flag indicating whether the learnware is usable. + learnware_id : str, optional + A id in database for learnware + + Returns + ------- + Tuple[str, int] + - str indicating model_id + - int indicating the final learnware check_status + """ learnware_id, learnwere_status = super(HeteroMapTableOrganizer, self).add_learnware( zip_path, semantic_spec, check_status, learnware_id ) @@ -83,6 +123,20 @@ class HeteroMapTableOrganizer(EasyOrganizer): return learnware_id, learnwere_status def delete_learnware(self, id: str) -> bool: + """Delete learnware from heterogeneous learnware market. + If a corresponding HeteroMapTableSpecification exists, it is also removed. + + Parameters + ---------- + id : str + Learnware to be deleted + + Returns + ------- + bool + True for successful operation. + False for id not found. + """ flag = super(HeteroMapTableOrganizer, self).delete_learnware(id) if flag: hetero_spec_path = os.path.join(self.hetero_specs_path, f"{id}.json") @@ -92,13 +146,40 @@ class HeteroMapTableOrganizer(EasyOrganizer): pass return flag - def update_learnware(self, id: str, zip_path: str = None, semantic_spec: dict = None, check_status: int = None): + def update_learnware(self, id: str, zip_path: str = None, semantic_spec: dict = None, check_status: int = None) -> bool: + """Update learnware with zip_path, semantic_specification and check_status. + If the learnware supports heterogeneous market training, its HeteroMapTableSpecification is also updated. + + Parameters + ---------- + id : str + Learnware id + zip_path : str, optional + Filepath for learnware model, a zipped file. + semantic_spec : dict, optional + semantic_spec for new learnware, in dictionary format. + check_status : int, optional + A flag indicating whether the learnware is usable. + + Returns + ------- + int + The final learnware check_status. + """ final_status = super(HeteroMapTableOrganizer, self).update_learnware(id, zip_path, semantic_spec, check_status) if final_status == BaseChecker.USABLE_LEARWARE and len(self._get_hetero_learnware_ids(id)): self._update_learnware_by_ids(id) return final_status def reload_learnware(self, learnware_id: str): + """Reload learnware into heterogeneous learnware market. + If a corresponding HeteroMapTableSpecification exists, it is also reloaded. + + Parameters + ---------- + learnware_id : str + Learnware to be reloaded + """ super(HeteroMapTableOrganizer, self).reload_learnware(learnware_id) try: hetero_spec_path = os.path.join(self.hetero_specs_path, f"{learnware_id}.json") @@ -110,6 +191,15 @@ class HeteroMapTableOrganizer(EasyOrganizer): logger.warning(f"Learnware {learnware_id} hetero spec loaded failed!") def _update_learnware_by_ids(self, ids: Union[str, List[str]]): + """Update learnware by ids, attempting to generate HeteroMapTableSpecification for them. + + Parameters + ---------- + ids : Union[str, List[str]] + Give a id or a list of ids + str: id of target learware + List[str]: A list of ids of target learnwares + """ ids = self._get_hetero_learnware_ids(ids) for idx in ids: try: @@ -126,6 +216,20 @@ class HeteroMapTableOrganizer(EasyOrganizer): logger.warning(f"Learnware {idx} generate HeteroMapTableSpecification failed! Due to {err}") def _get_hetero_learnware_ids(self, ids: Union[str, List[str]]) -> List[str]: + """Get learnware ids that supports heterogeneous market training and search. + + Parameters + ---------- + ids : Union[str, List[str]] + Give a id or a list of ids + str: id of target learware + List[str]: A list of ids of target learnwares + + Returns + ------- + List[str] + Learnware ids + """ if isinstance(ids, str): ids = [ids] @@ -141,6 +245,18 @@ class HeteroMapTableOrganizer(EasyOrganizer): return ret def generate_hetero_map_spec(self, user_info: BaseUserInfo) -> HeteroMapTableSpecification: + """Generate HeteroMapTableSpecificaion based on user's input description and statistical information. + + Parameters + ---------- + user_info : BaseUserInfo + user_info contains semantic_spec and stat_info + + Returns + ------- + HeteroMapTableSpecification + The generated HeteroMapTableSpecification for user + """ user_stat_spec = user_info.stat_info["RKMETableSpecification"] user_features = user_info.get_semantic_spec()["Input"]["Description"] user_hetero_spec = self.market_mapping.hetero_mapping(user_stat_spec, user_features) @@ -148,7 +264,20 @@ class HeteroMapTableOrganizer(EasyOrganizer): @staticmethod def train(learnware_list: List[Learnware], save_dir: str, **kwargs) -> HeteroMap: - # Convert learnware to dataframe + """Build the market mapping model using learnwares that supports heterogeneous market training. + + Parameters + ---------- + learnware_list : List[Learnware] + The learnware list to train the market mapping + save_dir : str + Filepath where the trained market mapping will be saved + + Returns + ------- + HeteroMap + The trained market mapping model + """ learnware_df_dict = defaultdict(list) for learnware in learnware_list: spec = learnware.get_specification() diff --git a/learnware/market/heterogeneous/organizer/hetero_map/__init__.py b/learnware/market/heterogeneous/organizer/hetero_map/__init__.py index 2f849a3..e16f46a 100644 --- a/learnware/market/heterogeneous/organizer/hetero_map/__init__.py +++ b/learnware/market/heterogeneous/organizer/hetero_map/__init__.py @@ -1,10 +1,11 @@ import os import numpy as np import pandas as pd -from typing import List, Optional +from typing import List, Optional, Union, Callable import torch import torch.nn.functional as F from torch import Tensor, nn +from loguru import logger from .....specification import HeteroMapTableSpecification, RKMETableSpecification from .feature_extractor import * @@ -28,22 +29,24 @@ class HeteroMap(nn.Module): def __init__( self, - feature_tokenizer=None, - hidden_dim=128, - num_layer=2, - num_attention_head=8, - hidden_dropout_prob=0, - ffn_dim=256, - projection_dim=128, - overlap_ratio=0.5, - num_partition=3, - temperature=10, - base_temperature=10, - activation="relu", - device="cuda:0", + feature_tokenizer: FeatureTokenizer = None, + hidden_dim: int = 128, + num_layer: int = 2, + num_attention_head: int = 8, + hidden_dropout_prob: float = 0, + ffn_dim: int = 256, + projection_dim: int = 128, + overlap_ratio: float = 0.5, + num_partition: int = 3, + temperature: int = 10, + base_temperature: int = 10, + activation: Union[str, Callable] = "relu", + device: Union[str, torch.device] = "cuda:0", **kwargs, ): """ + The initialization method for hetero map. + Parameters ---------- feature_tokenizer : FeatureTokenizer, optional @@ -68,12 +71,12 @@ class HeteroMap(nn.Module): Temperature parameter for contrastive learnin, by default 10 base_temperature : int, optional Base temperature paramete, by default 10 - activation : str, optional + activation : Union[str, Callable], optional Activation function for transformer layer, by default "relu" - device : str, optional + device : Union[str, torch.device], optional Device to run the model on, by default "cuda:0" - cache_dir : str, optional - The cache directory, by default None + kwargs: + Additional arguments to be passed to the feature tokenizer """ super(HeteroMap, self).__init__() @@ -126,9 +129,8 @@ class HeteroMap(nn.Module): self.to(device) @staticmethod - def load(checkpoint=None): - """Load the model state_dict and feature_tokenizer configuration - from the ``checkpoint``. + def load(checkpoint: str = None): + """Load the model state_dict and architecture configuration from the specified checkpoint. Parameters ---------- @@ -141,9 +143,8 @@ class HeteroMap(nn.Module): model.load_state_dict(model_info["model_state_dict"], strict=False) return model - def save(self, checkpoint): - """Save the model state_dict and feature_tokenizer configuration - to the ``checkpoint``. + def save(self, checkpoint: str): + """Save the model state_dict and architecture configuration to the specified checkpoint. Parameters ---------- @@ -154,8 +155,19 @@ class HeteroMap(nn.Module): model_info = {"model_state_dict": self.state_dict(), "model_args": self.model_args} torch.save(model_info, checkpoint) - def forward(self, x, y=None): - # do positive sampling + def forward(self, x: dict): + """Processes the input data 'x', performs positive sampling, and computes contrastive loss. + + Parameters + ---------- + x : dict + Pre-tokenized input tabular data in the form of a dictionary + + Returns + ------- + torch.Tensor + The self-supervised VPCL loss + """ feat_x_list = [] if isinstance(x, dict): # pretokenized inputs @@ -175,6 +187,20 @@ class HeteroMap(nn.Module): return loss def hetero_mapping(self, rkme_spec: RKMETableSpecification, features: dict) -> HeteroMapTableSpecification: + """Generate HeteroMapTableSpecification from given tabular data's statistical specification and descriptions of features. + + Parameters + ---------- + rkme_spec : RKMETableSpecification + The RKME specification from the tabular data + features : dict + A dictionary mapping each feature's numerical identifier to its semantic description. + + Returns + ------- + HeteroMapTableSpecification + The resulting HeteroMapTableSpecification + """ hetero_spec = HeteroMapTableSpecification() data = rkme_spec.get_z() cols = [features.get(str(i), "") for i in range(data.shape[1])] @@ -183,7 +209,22 @@ class HeteroMap(nn.Module): hetero_spec.generate_stat_spec_from_system(hetero_embedding, rkme_spec) return hetero_spec - def _build_positive_pairs(self, x, n): + def _build_positive_pairs(self, x: pd.DataFrame, n: int): + """ + Builds positive pairs by splitting the input DataFrame into 'n' parts with some overlap. + + Parameters + ---------- + x : pd.DataFrame + The input DataFrame to be split. + n : int + The number of partitions to divide the DataFrame into. + + Returns + ------- + List[pd.DataFrame] + A list of DataFrames, each representing a partition of the input DataFrame with some overlap. + """ x_cols = x.columns.tolist() sub_col_list = np.array_split(np.array(x_cols), n) len_cols = len(sub_col_list[0]) @@ -198,24 +239,21 @@ class HeteroMap(nn.Module): sub_x_list.append(sub_x) return sub_x_list - def _extract_features(self, x, cols=None): - """Make forward pass given the input feature ``x``. + def _extract_features(self, x: Union[dict, pd.DataFrame], cols=None): + """Performs a forward pass with the given input feature `x`, and extracts features. Parameters ---------- - x: pd.DataFrame or dict - pd.DataFrame: a batch of raw tabular samples; dict: the output of feature_tokenizer + x: Union[dict, pd.DataFrame] + pd.DataFrame: A batch of raw tabular samples + dict: The output of feature_tokenizer Returns ------- output_features: numpy.ndarray - the [CLS] embedding at the end of transformer encoder. + The [CLS] embedding at the end of transformer encoder """ - if isinstance(x, dict): - # input is the pre-tokenized encoded inputs - inputs = x - elif isinstance(x, pd.DataFrame): - # input is dataframe + if isinstance(x, pd.DataFrame): inputs = self.feature_tokenizer(x) elif isinstance(x, torch.Tensor): inputs = self.feature_tokenizer.forward(cols, x) @@ -231,7 +269,21 @@ class HeteroMap(nn.Module): return output_features - def _extract_batch_features(self, x_test, eval_batch_size=256) -> np.ndarray: + def _extract_batch_features(self, x_test: pd.DataFrame, eval_batch_size=256) -> np.ndarray: + """Performs forward passes on a batch of input features `x_test`, extracting and returning features as an array. + + Parameters + ---------- + x_test : pd.DataFrame + A batch of raw tabular samples + eval_batch_size : int, optional + The size of each batch for processing, by default 256 + + Returns + ------- + np.ndarray + An array containing the extracted features from all batches + """ self.eval() output_feas_list = [] for i in range(0, len(x_test), eval_batch_size): @@ -243,18 +295,19 @@ class HeteroMap(nn.Module): all_output_features = np.concatenate(output_feas_list, 0) return all_output_features - def _self_supervised_contrastive_loss(self, features): - """Compute the self-supervised VPCL loss. + def _self_supervised_contrastive_loss(self, features: torch.Tensor): + """ + Compute the self-supervised VPCL loss. Parameters ---------- - features: torch.Tensor - the encoded features of multiple partitions of input tables, with shape ``(bs, n_partition, proj_dim)``. + features : torch.Tensor + The encoded features of multiple partitions of input tables, with shape (bs, n_partition, proj_dim). Returns ------- - loss: torch.Tensor - the computed self-supervised VPCL loss. + torch.Tensor + The computed self-supervised VPCL loss. """ batch_size = features.shape[0] labels = torch.arange(batch_size, dtype=torch.long, device=self.device).view(-1, 1) @@ -286,35 +339,53 @@ class HeteroMap(nn.Module): return loss -def _get_activation_fn(activation): - if activation == "relu": - return F.relu - elif activation == "gelu": - return F.gelu - elif activation == "selu": - return F.selu - elif activation == "leakyrelu": - return F.leaky_relu - raise RuntimeError("activation should be relu/gelu/selu/leakyrelu, not {}".format(activation)) - - class TransformerLayer(nn.Module): + """A custom Transformer layer implemented as a PyTorch module. + """ __config__ = ["batch_first", "norm_first"] def __init__( self, - d_model, - nhead, - dim_feedforward=2048, - dropout=0.1, - activation=F.relu, - layer_norm_eps=1e-5, - batch_first=True, - norm_first=False, - device=None, - dtype=None, - use_layer_norm=True, + d_model: int, + nhead: int, + dim_feedforward: int = 2048, + dropout: float = 0.1, + activation: Union[str, Callable] = F.relu, + layer_norm_eps: float = 1e-5, + batch_first: bool = True, + norm_first: bool = False, + device: Union[str, torch.device] = None, + dtype: torch.dtype = None, + use_layer_norm: bool = True, ): + """ + The initialization method for transformer layer. + + Parameters + ---------- + d_model : int + The number of expected features in the input + nhead : int + The number of heads in the multiheadattention models + dim_feedforward : int, optional + The dimension of the feedforward network model, by default 2048 + dropout : float, optional + The dropout value, by default 0.1 + activation : Union[str, Callable], optional + The activation function to use, by default F.relu + layer_norm_eps : float, optional + The epsilon used for layer normalization, by default 1e-5 + batch_first : bool, optional + Whether to use (batch, seq, feature) format for input and output tensors, by default True + norm_first : bool, optional + Whether to perform layer normalization before attention and feedforward operations, by default False + device : Union[str, torch.device], optional + The device on which the layer is to be run, by default None + dtype : torch.dtype, optional + The data type of the layer's parameters, by default None + use_layer_norm : bool, optional + Whether to use layer normalization, by default True + """ factory_kwargs = {"device": device, "dtype": dtype} super().__init__() self.self_attn = nn.MultiheadAttention(d_model, nhead, batch_first=batch_first, **factory_kwargs) @@ -338,12 +409,29 @@ class TransformerLayer(nn.Module): # Legacy string support for activation function. if isinstance(activation, str): - self.activation = _get_activation_fn(activation) + self.activation = self._get_activation_fn(activation) else: self.activation = activation # self-attention block - def _sa_block(self, x: Tensor, attn_mask: Optional[Tensor], key_padding_mask: Optional[Tensor]) -> Tensor: + def _sa_block(self, x: torch.Tensor, attn_mask: torch.Tensor, key_padding_mask: torch.Tensor) -> torch.Tensor: + """ + Applies a self-attention block to the input tensor. + + Parameters + ---------- + x : torch.Tensor + The input tensor for the self-attention block. + attn_mask : torch.Tensor + The attention mask for the self-attention operation. + key_padding_mask : torch.Tensor + The key padding mask for the self-attention operation. + + Returns + ------- + torch.Tensor + The output tensor after applying the self-attention block. + """ key_padding_mask = ~key_padding_mask.bool() x = self.self_attn( x, @@ -355,7 +443,20 @@ class TransformerLayer(nn.Module): return self.dropout1(x) # feed forward block - def _ff_block(self, x: Tensor) -> Tensor: + def _ff_block(self, x: torch.Tensor) -> torch.Tensor: + """ + Applies a feed-forward block to the input tensor. + + Parameters + ---------- + x : torch.Tensor + The input tensor for the feed-forward block. + + Returns + ------- + torch.Tensor + The output tensor after applying the feed-forward block. + """ g = self.gate_act(self.gate_linear(x)) h = self.linear1(x) h = h * g # add gate @@ -367,19 +468,56 @@ class TransformerLayer(nn.Module): state["activation"] = F.relu super().__setstate__(state) - def forward(self, src, src_mask=None, src_key_padding_mask=None, is_causal=None, **kwargs) -> Tensor: + @staticmethod + def _get_activation_fn(activation: str) -> Callable: + """ + Retrieves the activation function based on the provided activation name. + + Parameters + ---------- + activation : str + Name of the activation function. Supported values are "relu", "gelu", "selu", and "leakyrelu". + + Returns + ------- + Callable + The corresponding activation function from torch.nn.functional. + """ + if activation == "relu": + return F.relu + elif activation == "gelu": + return F.gelu + elif activation == "selu": + return F.selu + elif activation == "leakyrelu": + return F.leaky_relu + raise RuntimeError("activation should be relu/gelu/selu/leakyrelu, not {}".format(activation)) + + def forward(self, + src: torch.Tensor, + src_mask: torch.Tensor = None, + src_key_padding_mask: torch.Tensor = None, + is_causal: torch.Tensor = None, + **kwargs + ) -> torch.Tensor: """Pass the input through the encoder layer. Parameters ---------- - src : Any + src : torch.Tensor The sequence to the encoder layer. - src_mask : Any, optional + src_mask : torch.Tensor, optional The mask for the src sequence, by default None - src_key_padding_mask : Any, optional + src_key_padding_mask : torch.Tensor, optional The mask for the src keys per batch, by default None + is_causal : torch.Tensor, optional + A flag indicating whether the layer should be causal, by default None + + Returns + ------- + torch.Tensor + The output tensor after passing through the encoder layer. """ - # see Fig. 1 of https://arxiv.org/pdf/2002.04745v1.pdf x = src if self.use_layer_norm: @@ -397,15 +535,35 @@ class TransformerLayer(nn.Module): class TransformerMultiLayer(nn.Module): + """A custom multi-layer Transformer module. + """ def __init__( self, - hidden_dim=128, - num_layer=2, - num_attention_head=2, - hidden_dropout_prob=0, - ffn_dim=256, - activation="relu", + hidden_dim: int = 128, + num_layer: int = 2, + num_attention_head: int = 2, + hidden_dropout_prob: float = 0, + ffn_dim: int = 256, + activation: Union[str, Callable] = "relu", ): + """ + The initialization method for align transformer multilayer. + + Parameters + ---------- + hidden_dim : int, optional + Dimension of the hidden layer in the Transformer, by default 128. + num_layer : int, optional + Number of Transformer layers, by default 2. + num_attention_head : int, optional + Number of attention heads in each Transformer layer, by default 2. + hidden_dropout_prob : float, optional + Dropout probability for the hidden layers, by default 0. + ffn_dim : int, optional + Dimension of the feedforward network model, by default 256. + activation : Union[str, Callable], optional + The activation function to be used, by default "relu". + """ super().__init__() self.transformer_encoder = nn.ModuleList( [ @@ -437,14 +595,23 @@ class TransformerMultiLayer(nn.Module): stacked_transformer = nn.TransformerEncoder(encoder_layer, num_layers=num_layer - 1) self.transformer_encoder.append(stacked_transformer) - def forward(self, embedding, attention_mask=None, **kwargs) -> Tensor: + def forward(self, embedding: torch.Tensor, attention_mask: torch.Tensor = None, **kwargs) -> torch.Tensor: """ + Passes the input embedding through the Transformer encoder layers. + Parameters ---------- - embedding : Any - bs, num_token, hidden_dim + embedding : torch.Tensor + The input embedding tensor with shape (batch size, number of tokens, hidden dimension). + attention_mask : torch.Tensor, optional + The attention mask for the input tensor, by default None. + + Returns + ------- + Tensor + The output tensor after processing through Transformer encoder layers. """ outputs = embedding for i, mod in enumerate(self.transformer_encoder): outputs = mod(outputs, src_key_padding_mask=attention_mask) - return outputs + return outputs \ No newline at end of file diff --git a/learnware/market/heterogeneous/organizer/hetero_map/feature_extractor.py b/learnware/market/heterogeneous/organizer/hetero_map/feature_extractor.py index ef27344..d424a72 100644 --- a/learnware/market/heterogeneous/organizer/hetero_map/feature_extractor.py +++ b/learnware/market/heterogeneous/organizer/hetero_map/feature_extractor.py @@ -1,6 +1,6 @@ import os import math -from typing import Dict +from typing import Dict, Callable import numpy as np import torch @@ -12,23 +12,53 @@ from .....config import C as conf class WordEmbedding(nn.Module): - """Encode tokens drawn from column names""" + """Encodes tokens drawn from column names into word embeddings. + """ def __init__( self, - vocab_size, - hidden_dim, - padding_idx=0, - hidden_dropout_prob=0, - layer_norm_eps=1e-5, + vocab_size: int, + hidden_dim: int, + padding_idx: int = 0, + hidden_dropout_prob: float = 0, + layer_norm_eps: float = 1e-5, ): + """ + The initialization method for word embedding. + + Parameters + ---------- + vocab_size : int + The size of the vocabulary. + hidden_dim : int + The dimension of the hidden layer. + padding_idx : int, optional + The index of the padding token, by default 0. + hidden_dropout_prob : float, optional + The dropout probability for the hidden layer, by default 0. + layer_norm_eps : float, optional + The epsilon value for layer normalization, by default 1e-5. + """ super().__init__() self.word_embeddings = nn.Embedding(vocab_size, hidden_dim, padding_idx) nn_init.kaiming_normal_(self.word_embeddings.weight) self.norm = nn.LayerNorm(hidden_dim, eps=layer_norm_eps) self.dropout = nn.Dropout(hidden_dropout_prob) - def forward(self, input_ids) -> Tensor: + def forward(self, input_ids: torch.Tensor) -> torch.Tensor: + """ + Performs the forward pass of the WordEmbedding module. + + Parameters + ---------- + input_ids : torch.Tensor + The input token IDs. + + Returns + ------- + torch.Tensor + The word embeddings corresponding to the input token IDs. + """ embeddings = self.word_embeddings(input_ids) embeddings = self.norm(embeddings) embeddings = self.dropout(embeddings) @@ -38,20 +68,35 @@ class WordEmbedding(nn.Module): class NumEmbedding(nn.Module): """Encode tokens drawn from column names and the corresponding numerical features.""" - def __init__(self, hidden_dim): + def __init__(self, hidden_dim: int): + """ + The initialization method for num embedding. + + Parameters + ---------- + hidden_dim : int + The dimension of the hidden layer. + """ super().__init__() self.norm = nn.LayerNorm(hidden_dim) self.num_bias = nn.Parameter(Tensor(1, 1, hidden_dim)) # add bias nn_init.uniform_(self.num_bias, a=-1 / math.sqrt(hidden_dim), b=1 / math.sqrt(hidden_dim)) - def forward(self, col_emb, x_ts) -> Tensor: + def forward(self, col_emb: torch.Tensor, x_ts: torch.Tensor) -> torch.Tensor: """ + Performs the forward pass of the NumEmbedding module. + Parameters ---------- - col_emb : Any - numerical column embedding, (# numerical columns, emb_dim) - x_ts : Any - numerical features, (bs, emb_dim) + col_emb : torch.Tensor + The numerical column embeddings with shape (# numerical columns, emb_dim). + x_ts : torch.Tensor + The numerical features with shape (bs, emb_dim). + + Returns + ------- + torch.Tensor + The combined feature embeddings. """ col_emb = col_emb.unsqueeze(0).expand((x_ts.shape[0], -1, -1)) feat_emb = col_emb * x_ts.unsqueeze(-1).float() + self.num_bias @@ -63,14 +108,16 @@ class FeatureTokenizer: def __init__( self, - disable_tokenizer_parallel=True, + disable_tokenizer_parallel: bool = True, **kwargs, ): """ + The initialization method for feature tokenizer. + . Parameters ---------- disable_tokenizer_parallel : bool, optional - true if use extractor for collator function in torch.DataLoader + Whether to disable tokenizer parallelism, by default True. """ cache_dir = conf["cache_path"] os.makedirs(cache_dir, exist_ok=True) @@ -81,22 +128,23 @@ class FeatureTokenizer: self.vocab_size = self.tokenizer.vocab_size self.pad_token_id = self.tokenizer.pad_token_id - def __call__(self, x, shuffle=False, keep_input_grad=False) -> Dict: + def __call__(self, x: pd.DataFrame, shuffle: bool = False, keep_input_grad: bool = False) -> Dict: """ + Tokenizes the input DataFrame. + Parameters ---------- - x: pd.DataFrame - with column names and features. - - shuffle: bool - if shuffle column order during the training. + x : pd.DataFrame + The input DataFrame with column names and features. + shuffle : bool, optional + Whether to shuffle column order during training, by default False. + keep_input_grad : bool, optional + Whether to keep input gradients, by default False. Returns ------- - encoded_inputs: a dict with { - 'x_num': tensor contains numerical features, - 'num_col_input_ids': tensor contains numerical column tokenized ids, - } + Dict + A dictionary with tokenized inputs. """ encoded_inputs = {"x_num": None, "num_col_input_ids": None} num_cols = x.columns.tolist() if not shuffle else np.random.shuffle(x.columns.tolist()) @@ -120,21 +168,24 @@ class FeatureTokenizer: return encoded_inputs - def forward(self, cols, x) -> Dict: + def forward(self, cols: List[str], x: torch.Tensor) -> Dict: """ + Processes the input data and generates encoded inputs suitable for model encoding. + Parameters ---------- cols: List[str] - Contain all column names in order. + A list containing all column names in order. x: torch.Tensor + The tensor containing numerical features. Returns ------- - encoded_inputs: a dict with { - 'x_num': tensor contains numerical features, - 'num_col_input_ids': tensor contains numerical column tokenized ids, - } + Dict + - 'x_num': Tensor containing numerical features. + - 'num_col_input_ids': Tensor containing tokenized IDs of numerical columns. + - 'num_att_mask': Attention mask for the numerical column tokens. """ encoded_inputs = { "x_num": None, @@ -156,18 +207,32 @@ class FeatureTokenizer: class FeatureProcessor(nn.Module): - """ - Process inputs from feature extractor to map them to embeddings. - """ + """Process inputs from feature extractor to map them to embeddings.""" def __init__( self, - vocab_size=None, - hidden_dim=128, - hidden_dropout_prob=0, - pad_token_id=0, - device="cuda:0", + vocab_size: int = None, + hidden_dim: int = 128, + hidden_dropout_prob: float = 0, + pad_token_id: int = 0, + device: Union[str, torch.device] = "cuda:0", ): + """ + The initialization method for feature processor. + + Parameters + ---------- + vocab_size : int, optional + The size of the vocabulary. + hidden_dim : int, optional + The dimension of the hidden layer, by default 128. + hidden_dropout_prob : float, optional + The dropout probability for the hidden layer, by default 0. + pad_token_id : int, optional + The index of the padding token, by default 0. + device : Union[str, torch.device], optional + The device to run the module on, by default "cuda:0". + """ super().__init__() self.word_embedding = WordEmbedding( vocab_size=vocab_size, @@ -179,7 +244,22 @@ class FeatureProcessor(nn.Module): self.align_layer = nn.Linear(hidden_dim, hidden_dim, bias=False) self.device = device - def _avg_embedding_by_mask(self, embs, att_mask=None): + def _avg_embedding_by_mask(self, embs: torch.Tensor, att_mask: torch.Tensor = None) -> torch.Tensor: + """ + Averages the embeddings based on the attention mask. + + Parameters + ---------- + embs : torch.Tensor + The embeddings tensor. + att_mask : torch.Tensor, optional + The attention mask to apply on the embeddings. If None, the mean of the embeddings is returned, by default None. + + Returns + ------- + torch.Tensor + The resulting averaged embeddings. + """ if att_mask is None: return embs.mean(1) else: @@ -189,11 +269,28 @@ class FeatureProcessor(nn.Module): def forward( self, - x_num=None, - num_col_input_ids=None, - num_att_mask=None, + x_num: torch.Tensor = None, + num_col_input_ids: torch.Tensor = None, + num_att_mask: torch.Tensor = None, **kwargs, - ) -> Tensor: + ) -> torch.Tensor: + """ + Performs the forward pass of the FeatureProcessor module. + + Parameters + ---------- + x_num : torch.Tensor, optional + The numerical features. + num_col_input_ids : torch.Tensor, optional + The input IDs for numerical columns. + num_att_mask : torch.Tensor, optional + The attention mask. + + Returns + ------- + torch.Tensor + The processed feature embeddings. + """ x_num = x_num.to(self.device) num_col_emb = self.word_embedding(num_col_input_ids.to(self.device)) @@ -209,22 +306,58 @@ class FeatureProcessor(nn.Module): class CLSToken(nn.Module): - """add a learnable cls token embedding at the end of each sequence.""" + """Add a learnable cls token embedding at the end of each sequence.""" - def __init__(self, hidden_dim): + def __init__(self, hidden_dim: int): + """ + The initialization method for CLSToken. + + Parameters + ---------- + hidden_dim : int + The dimension of the hidden layer. + """ super().__init__() self.weight = nn.Parameter(Tensor(hidden_dim)) nn_init.uniform_(self.weight, a=-1 / math.sqrt(hidden_dim), b=1 / math.sqrt(hidden_dim)) self.hidden_dim = hidden_dim - def expand(self, *leading_dimensions): + def expand(self, *leading_dimensions) -> torch.Tensor: + """ + Expands the CLS token embedding to match the leading dimensions of the input. + + Parameters + ---------- + leading_dimensions : tuple + A variable number of integer arguments representing the leading dimensions to which the CLS token embedding will be expanded. + + Returns + ------- + torch.Tensor + Expanded CLS token embedding. + """ new_dims = (1,) * (len(leading_dimensions) - 1) # cls token (128,) -> view(*new_dims, -1) -> (1, 128) # (1, 128) -> expand(*leading_dimensions, -1) -> (64, 1, 128) # here expand means "shared", the cls token embedding remains the same for each sample return self.weight.view(*new_dims, -1).expand(*leading_dimensions, -1) - def forward(self, embedding, attention_mask=None, **kwargs) -> Tensor: + def forward(self, embedding: torch.Tensor, attention_mask: torch.Tensor = None, **kwargs) -> torch.Tensor: + """ + Performs a forward pass by adding a learnable CLS token to the embedding. + + Parameters + ---------- + embedding : torch.Tensor + The input embedding tensor. + attention_mask : torch.Tensor, optional + The attention mask for the input tensor, by default None. + + Returns + ------- + torch.Tensor + Output embedding with the CLS token added. + """ # embedding shape: (64, 11, 128), where 11 is the largest sequence length after tokenizing # after concat, learnable cls token [self.weight] is added to each semantic embedding # embedding shape: (64, d+1, 128) diff --git a/learnware/market/heterogeneous/organizer/hetero_map/trainer.py b/learnware/market/heterogeneous/organizer/hetero_map/trainer.py index c4a85e6..3194b71 100644 --- a/learnware/market/heterogeneous/organizer/hetero_map/trainer.py +++ b/learnware/market/heterogeneous/organizer/hetero_map/trainer.py @@ -1,7 +1,8 @@ -import json +import json_get_parameter_names import math import os import time +from typings import Any, Callable, List, Dict import numpy as np import pandas as pd @@ -19,17 +20,44 @@ logger = get_module_logger("hetero_mapping_trainer") class Trainer: def __init__( self, - model, - train_set_list, - collate_fn=None, - output_dir="./ckpt", - num_epoch=10, - batch_size=64, - lr=1e-4, - weight_decay=0, - eval_batch_size=256, + model: Any, + train_set_list: List[Any], + collate_fn: Callable = None, + output_dir: str = "./ckpt", + num_epoch: int = 10, + batch_size: int = 64, + lr: float = 1e-4, + weight_decay: float = 0, + eval_batch_size: int = 256, **kwargs, ): + """ + The initialization method for trainer. + + Parameters + ---------- + model : Any + The model to be trained. + train_set_list : List[Any] + A list of training datasets. + collate_fn : Callable, optional + The collate function to be used, by default None. + output_dir : str, optional + The directory where the trained model checkpoints will be saved, by default "./ckpt". + num_epoch : int, optional + Number of epochs for training, by default 10. + batch_size : int, optional + Batch size for training, by default 64. + lr : float, optional + Learning rate, by default 1e-4. + weight_decay : float, optional + Weight decay, by default 0. + eval_batch_size : int, optional + Batch size for evaluation, by default 256. + kwargs : dict + Additional keyword arguments. + """ + self.model = model if isinstance(train_set_list, tuple): train_set_list = [train_set_list] @@ -52,7 +80,21 @@ class Trainer: self.args["steps_per_epoch"] = int(self.args["num_training_steps"] / (num_epoch * len(self.train_set_list))) self.optimizer = None - def train(self, verbose: bool = True): + def train(self, verbose: bool = True) -> float: + """ + Trains the model using the provided training data. + + Parameters + ---------- + verbose : bool, optional + Whether to display verbose output, by default True. + + Returns + ------- + float + The final training loss. + """ + self._create_optimizer() start_time = time.time() final_train_loss = 0 @@ -82,11 +124,22 @@ class Trainer: logger.info("training complete, cost {:.1f} secs.".format(time.time() - start_time)) return final_train_loss - def save_model(self, output_dir=None): + def save_model(self, output_dir: str = None): + """ + Saves the trained model to the specified directory. + + Parameters + ---------- + output_dir : str, optional + The directory where the model will be saved, by default None. + """ + logger.info(f"saving model checkpoint to {output_dir}") self.model.save(output_dir) def _create_optimizer(self): + """Creates an optimizer for training the model.""" + if self.optimizer is None: decay_parameters = self._get_parameter_names(self.model, [nn.LayerNorm]) decay_parameters = [name for name in decay_parameters if "bias" not in name] @@ -104,7 +157,25 @@ class Trainer: self.optimizer = torch.optim.Adam(optimizer_grouped_parameters, lr=self.args["lr"]) - def _get_num_train_steps(self, train_set_list, num_epoch, batch_size): + def _get_num_train_steps(self, train_set_list: List[Any], num_epoch: int, batch_size: int) -> int: + """ + Calculates the total number of training steps. + + Parameters + ---------- + train_set_list : List[Any] + A list of training datasets. + num_epoch : int + Number of training epochs. + batch_size : int + Batch size for training. + + Returns + ------- + int + The total number of training steps. + """ + total_step = 0 for trainset in train_set_list: x_train = trainset @@ -112,7 +183,29 @@ class Trainer: total_step *= num_epoch return total_step - def _build_dataloader(self, trainset, batch_size, collator, shuffle=True): + def _build_dataloader( + self, trainset: Any, batch_size: int, collator: Callable, shuffle: bool = True + ) -> torch.utils.data.DataLoader: + """ + Builds a DataLoader for training. + + Parameters + ---------- + trainset : Any + The training dataset. + batch_size : int + Batch size for the DataLoader. + collator : Callable + Collate function for the DataLoader. + shuffle : bool, optional + Whether to shuffle the data, by default True. + + Returns + ------- + torch.utils.data.DataLoader + The DataLoader for the training data. + """ + trainloader = DataLoader( TrainDataset(trainset), collate_fn=collator, @@ -123,8 +216,22 @@ class Trainer: ) return trainloader - def _get_parameter_names(self, model, forbidden_layer_types): - """Returns the names of the model parameters that are not inside a forbidden layer.""" + def _get_parameter_names(self, model: Any, forbidden_layer_types: List[torch.dtype]) -> List[str]: + """ + Retrieves the names of parameters not inside forbidden layers. + + Parameters + ---------- + model : Any + The model from which to retrieve parameters. + forbidden_layer_types : List[torch.dtype] + A list of layer types to exclude. + + Returns + ------- + List[str] + A list of parameter names not inside the forbidden layers. + """ result = [] for name, child in model.named_children(): result += [ @@ -150,15 +257,27 @@ class TrainDataset(Dataset): class TransTabCollatorForCL: - """support positive pair sampling for contrastive learning.""" + """Collator class supporting positive pair sampling for contrastive learning.""" def __init__( self, - feature_tokenizer=None, - overlap_ratio=0.5, - num_partition=3, + feature_tokenizer: Callable = None, + overlap_ratio: float = 0.5, + num_partition: int = 3, **kwargs, ): + """ + The initialization method for TransTabCollatorForCL. + + Parameters + ---------- + feature_tokenizer : Callable, optional + The tokenizer used to process data, by default None. + overlap_ratio : float, optional + The ratio of overlap between partitions, must be between 0 and 1 (exclusive), by default 0.5. + num_partition : int, optional + The number of partitions to create from the data for contrastive learning, by default 3. + """ self.feature_tokenizer = feature_tokenizer or FeatureTokenizer(disable_tokenizer_parallel=True) assert num_partition > 0, f"number of contrastive subsets must be greater than 0, got {num_partition}" assert isinstance(num_partition, int), f"number of constrative subsets must be int, got {type(num_partition)}" @@ -166,10 +285,20 @@ class TransTabCollatorForCL: self.overlap_ratio = overlap_ratio self.num_partition = num_partition - def __call__(self, data): - """Take a list of subsets (views) from the original tests.""" - # 1. build positive pairs - # 2. encode each pair using feature extractor + def __call__(self, data: List[Any]) -> Dict[str, Any]: + """ + Processes the data into subsets for contrastive learning. + + Parameters + ---------- + data : List[Any] + The input data to be processed. + + Returns + ------- + Dict[str, Any] + A dictionary containing the processed data subsets. + """ df_x = pd.concat([row for row in data]) if self.num_partition > 1: sub_x_list = self._build_positive_pairs(df_x, self.num_partition) @@ -182,20 +311,21 @@ class TransTabCollatorForCL: res = {"input_sub_x": input_x_list} return res - def _build_positive_pairs(self, x, n): - """Builds positive pairs of sub-dataframes from the input dataframe x. + def _build_positive_pairs(self, x: pd.DataFrame, n: int) -> List[pd.DataFrame]: + """ + Builds positive pairs of sub-dataframes from the input dataframe. Parameters ---------- - x : pandas.DataFrame - Input dataframe. + x : pd.DataFrame + The input dataframe. n : int - Number of sub-dataframes to split x into. + The number of sub-dataframes to create. Returns ------- - List - List of sub-dataframes, each containing a positive pair of columns from x. + List[pd.DataFrame] + A list of sub-dataframes, each containing a positive pair of columns. """ x_cols = x.columns.tolist() sub_col_list = np.array_split(np.array(x_cols), n) @@ -211,18 +341,19 @@ class TransTabCollatorForCL: sub_x_list.append(sub_x) return sub_x_list - def _build_positive_pairs_single_view(self, x): - """Builds positive pairs for a single view of data by corrupting half of the columns and shuffling the corrupted columns. + def _build_positive_pairs_single_view(self, x: pd.DataFrame) -> List[pd.DataFrame]: + """ + Builds positive pairs for a single view of data by corrupting half of the columns and shuffling the corrupted columns.. Parameters ---------- - x : pandas.DataFrame - The input data. + x : pd.DataFrame + The input dataframe. Returns ------- - List - A list of two pandas DataFrames, where each DataFrame contains the original data with half of the columns corrupted and shuffled. + List[pd.DataFrame] + A list containing two dataframes, one with original data and one with shuffled columns. """ x_cols = x.columns.tolist() sub_x_list = [x] @@ -231,4 +362,4 @@ class TransTabCollatorForCL: x_corrupt = x.copy()[corrupt_cols] np.random.shuffle(x_corrupt.values) sub_x_list.append(pd.concat([x.copy().drop(corrupt_cols, axis=1), x_corrupt], axis=1)) - return sub_x_list + return sub_x_list \ No newline at end of file diff --git a/learnware/market/heterogeneous/searcher.py b/learnware/market/heterogeneous/searcher.py index 3605609..f8ef680 100644 --- a/learnware/market/heterogeneous/searcher.py +++ b/learnware/market/heterogeneous/searcher.py @@ -11,7 +11,19 @@ logger = get_module_logger("hetero_searcher") class HeteroSearcher(EasySearcher): @staticmethod - def check_user_info(user_info: BaseUserInfo): + def check_user_info(user_info: BaseUserInfo) -> bool: + """Check if user_info satifies all the criteria required for enabling heterogeneous learnware search + + Parameters + ---------- + user_info : BaseUserInfo + user_info contains semantic_spec and stat_info + + Returns + ------- + bool + A flag indicating whether heterogeneous search is enabled for user_info + """ try: user_stat_spec = user_info.get_stat_info("RKMETableSpecification") user_input_shape = user_stat_spec.get_z().shape[1] @@ -42,6 +54,27 @@ class HeteroSearcher(EasySearcher): def __call__( self, user_info: BaseUserInfo, check_status: int = None, max_search_num: int = 5, search_method: str = "greedy" ) -> Tuple[List[float], List[Learnware], float, List[Learnware]]: + """Search learnwares based on user_info from learnwares with check_status. + Employs heterogeneous learnware search if specific requirements are met, otherwise resorts to homogeneous search methods. + + Parameters + ---------- + user_info : BaseUserInfo + user_info contains semantic_spec and stat_info + max_search_num : int + The maximum number of the returned learnwares + check_status : int, optional + - None: search from all learnwares + - Others: search from learnwares with check_status + + Returns + ------- + Tuple[List[float], List[Learnware], float, List[Learnware]] + the first is the sorted list of rkme dist + the second is the sorted list of Learnware (single) by the rkme dist + the third is the score of Learnware (mixture) + the fourth is the list of Learnware (mixture), the size is search_num + """ learnware_list = self.learnware_organizer.get_learnwares(check_status=check_status) learnware_list = self.semantic_searcher(learnware_list, user_info) diff --git a/learnware/reuse/job_selector.py b/learnware/reuse/job_selector.py index 8077106..91ad512 100644 --- a/learnware/reuse/job_selector.py +++ b/learnware/reuse/job_selector.py @@ -166,7 +166,7 @@ class JobSelectorReuser(BaseReuser): def _calculate_rkme_spec_mixture_weight( self, user_data: np.ndarray, task_rkme_list: List[RKMETableSpecification], task_rkme_matrix: np.ndarray ) -> List[float]: - """_summary_ + """Calculate mixture weight for the learnware_list based on user's data Parameters ----------