|
- import torch
- import torch.nn.functional
- import typing as _typing
-
- from torch_geometric.nn.conv import GCNConv
- import autogl.data
- from . import register_model
- from .base import BaseAutoModel, activate_func, ClassificationSupportedSequentialModel
- from ....utils import get_logger
-
# Module-level logger named "GCNModel", created through the project's get_logger helper.
- LOGGER = get_logger("GCNModel")
-
-
class GCN(ClassificationSupportedSequentialModel):
    """
    Sequential stack of graph-convolution layers.

    The network holds ``len(hidden_features) + 1`` layers: one per entry of
    ``hidden_features`` plus an output layer producing ``num_classes`` channels.
    Hidden layers apply the configured activation; the output layer does not.

    Parameters
    ----------
    num_features: ``int``
        Number of input channels of the first layer.

    num_classes: ``int``
        Number of output channels of the last layer.

    hidden_features: sequence of ``int``
        Output channels of every hidden layer; may be empty.

    activation_name: ``str``
        Name handed to ``activate_func`` after every hidden layer.

    dropout: number, ``None``, or sequence of (number or ``None``)
        A single number is clamped to ``[0, 1]`` and used for every hidden
        layer, while the output layer gets no dropout. A sequence must hold
        exactly one entry per layer (including the output layer).
        ``None`` disables dropout everywhere.

    add_self_loops: ``bool``
        Forwarded to every ``GCNConv``.

    normalize: ``bool``
        Forwarded to every ``GCNConv``.

    Raises
    ------
    TypeError
        If ``dropout`` is of an unsupported type, is a sequence of the wrong
        length, or is a sequence containing an unsupported item.
    """

    class _GCNLayer(torch.nn.Module):
        """A ``GCNConv`` followed by an optional activation and an optional dropout."""

        def __init__(
            self,
            input_channels: int,
            output_channels: int,
            add_self_loops: bool = True,
            normalize: bool = True,
            activation_name: _typing.Optional[str] = ...,
            dropout_probability: _typing.Optional[float] = ...,
        ):
            """
            Parameters
            ----------
            input_channels: ``int``
                Input channels of the convolution.
            output_channels: ``int``
                Output channels of the convolution.
            add_self_loops: ``bool``
                Forwarded to ``GCNConv`` after conversion with ``bool``.
            normalize: ``bool``
                Forwarded to ``GCNConv`` after conversion with ``bool``.
            activation_name: ``str``, optional
                Activation name; any non-string value (including the ``...``
                default) disables the activation.
            dropout_probability: number, optional
                Dropout probability, clamped to ``[0, 1]``; any non-numeric
                value (including the ``...`` default) disables dropout.
            """
            super().__init__()
            self._convolution: GCNConv = GCNConv(
                input_channels,
                output_channels,
                add_self_loops=bool(add_self_loops),
                normalize=bool(normalize),
            )
            self._activation_name: _typing.Optional[str] = (
                activation_name if isinstance(activation_name, str) else None
            )
            # bool is a subclass of int but is never a meaningful probability.
            is_probability = isinstance(
                dropout_probability, (int, float)
            ) and not isinstance(dropout_probability, bool)
            if is_probability:
                # Clamp with float bounds so the stored probability is always a float.
                clamped = min(max(float(dropout_probability), 0.0), 1.0)
                self._dropout: _typing.Optional[torch.nn.Dropout] = torch.nn.Dropout(
                    clamped
                )
            else:
                self._dropout: _typing.Optional[torch.nn.Dropout] = None

        def forward(self, data, enable_activation: bool = True) -> torch.Tensor:
            """
            Apply the convolution, then the activation (if enabled), then dropout.

            Parameters
            ----------
            data
                Object exposing ``x`` and ``edge_index`` tensors and, optionally,
                ``edge_weight``. The edge weight is ignored unless it is a tensor
                whose first dimension matches the number of columns of a
                ``(2, E)`` shaped ``edge_index``.
            enable_activation: ``bool``
                When false the activation is skipped; dropout is still applied.

            Raises
            ------
            TypeError
                If ``x`` or ``edge_index`` is not a ``torch.Tensor``.
            """
            x = getattr(data, "x")
            edge_index = getattr(data, "edge_index")
            edge_weight = getattr(data, "edge_weight", None)
            if not (
                isinstance(x, torch.Tensor) and isinstance(edge_index, torch.Tensor)
            ):
                raise TypeError(
                    "Both x and edge_index must be torch.Tensor, got %s and %s"
                    % (type(x), type(edge_index))
                )
            if edge_weight is not None and (
                not isinstance(edge_weight, torch.Tensor)
                or edge_index.size() != (2, edge_weight.size(0))
            ):
                # An unusable edge weight is dropped rather than treated as an error.
                edge_weight = None
            # Call the modules themselves (not .forward) so registered hooks run.
            x = self._convolution(x, edge_index, edge_weight)
            if self._activation_name is not None and enable_activation:
                x = activate_func(x, self._activation_name)
            if self._dropout is not None:
                x = self._dropout(x)
            return x

    @staticmethod
    def __resolve_dropout_list(
        dropout, num_layers: int
    ) -> _typing.List[_typing.Optional[float]]:
        """Expand the ``dropout`` constructor argument into one entry per layer."""

        def _is_number(value) -> bool:
            return isinstance(value, (int, float)) and not isinstance(value, bool)

        if isinstance(dropout, _typing.Sequence):
            if len(dropout) != num_layers:
                raise TypeError(
                    "When the dropout argument is a sequence, "
                    "The sequence length must equal to the number of layers to construct."
                )
            for item in dropout:
                if item is not None and not _is_number(item):
                    raise TypeError(
                        "When the dropout argument is a sequence, "
                        "every item in the sequence must be float or None"
                    )
            return [None if item is None else float(item) for item in dropout]
        if _is_number(dropout):
            probability = min(max(float(dropout), 0.0), 1.0)
            # A scalar applies to hidden layers only; the output layer gets none.
            return [probability] * (num_layers - 1) + [None]
        if dropout is None or dropout is Ellipsis:
            return [None] * num_layers
        raise TypeError(
            "The provided dropout argument must be a float number or None or "
            "a sequence in which each item is either a float Number or None."
        )

    def __init__(
        self,
        num_features: int,
        num_classes: int,
        hidden_features: _typing.Sequence[int],
        activation_name: str,
        dropout: _typing.Union[
            _typing.Optional[float], _typing.Sequence[_typing.Optional[float]]
        ] = None,
        add_self_loops: bool = True,
        normalize: bool = True,
    ):
        # Validate before touching the module state so bad input fails early.
        dropout_list = self.__resolve_dropout_list(dropout, len(hidden_features) + 1)
        super().__init__()

        # Consecutive pairs of this list are the (input, output) sizes of each layer.
        dimensions = [num_features, *hidden_features, num_classes]
        output_layer_index = len(dimensions) - 2
        layers = torch.nn.ModuleList()
        for index, (in_channels, out_channels) in enumerate(
            zip(dimensions[:-1], dimensions[1:])
        ):
            layers.append(
                self._GCNLayer(
                    in_channels,
                    out_channels,
                    add_self_loops,
                    normalize,
                    # The output layer never applies an activation.
                    activation_name=(
                        None if index == output_layer_index else activation_name
                    ),
                    # Passed by keyword so it cannot land in the activation slot.
                    dropout_probability=dropout_list[index],
                )
            )
        self.__sequential_encoding_layers: torch.nn.ModuleList = layers

    @property
    def sequential_encoding_layers(self) -> torch.nn.ModuleList:
        """All layers of the network in application order."""
        return self.__sequential_encoding_layers

    def __extract_edge_indexes_and_weights(
        self, data
    ) -> _typing.Union[
        _typing.Sequence[
            _typing.Tuple[torch.LongTensor, _typing.Optional[torch.Tensor]]
        ],
        _typing.Tuple[torch.LongTensor, _typing.Optional[torch.Tensor]],
    ]:
        """
        Collect the graph structure to use for each layer.

        Returns a list with one ``(edge_index, edge_weight)`` pair per layer when
        ``data.edge_indexes`` is a sequence of int64 tensors with one entry per
        layer; otherwise returns a single pair built from ``data.edge_index``
        and the optional ``data.edge_weight``.

        Raises
        ------
        TypeError
            If the edge index that ends up being used is not an int64 tensor.
        """

        def _is_int64_tensor(candidate) -> bool:
            return (
                isinstance(candidate, torch.Tensor) and candidate.dtype == torch.int64
            )

        def _compose(edge_index, edge_weight=None):
            if not _is_int64_tensor(edge_index):
                raise TypeError(
                    "edge_index must be a torch.Tensor of dtype torch.int64, got %s"
                    % type(edge_index)
                )
            if edge_weight is not None and (
                not isinstance(edge_weight, torch.Tensor)
                or edge_index.size() != (2, edge_weight.size(0))
            ):
                # An unusable edge weight is dropped rather than treated as an error.
                edge_weight = None
            return edge_index, edge_weight

        number_of_layers = len(self.__sequential_encoding_layers)
        edge_indexes = getattr(data, "edge_indexes", None)
        has_per_layer_structure = (
            isinstance(edge_indexes, _typing.Sequence)
            and len(edge_indexes) == number_of_layers
            and all(_is_int64_tensor(item) for item in edge_indexes)
        )
        if not has_per_layer_structure:
            return _compose(
                getattr(data, "edge_index"), getattr(data, "edge_weight", None)
            )

        edge_weights = getattr(data, "edge_weights", None)
        if (
            isinstance(edge_weights, _typing.Sequence)
            and len(edge_weights) == number_of_layers
        ):
            return [
                _compose(edge_index, edge_weight)
                for edge_index, edge_weight in zip(edge_indexes, edge_weights)
            ]
        return [_compose(edge_index) for edge_index in edge_indexes]

    def forward(self, data):
        """Run ``cls_encode`` followed by ``cls_decode``."""
        return self.cls_decode(self.cls_encode(data))

    def cls_encode(self, data) -> torch.Tensor:
        """Pass ``data.x`` through every layer and return the final output."""
        extracted = self.__extract_edge_indexes_and_weights(data)
        if isinstance(extracted, tuple):
            # A single (edge_index, edge_weight) pair is shared by all layers.
            per_layer = [extracted] * len(self.__sequential_encoding_layers)
        else:
            # Already one (edge_index, edge_weight) pair per layer.
            per_layer = extracted

        x: torch.Tensor = getattr(data, "x")
        for (edge_index, edge_weight), layer in zip(
            per_layer, self.__sequential_encoding_layers
        ):
            layer_input = autogl.data.Data(x=x, edge_index=edge_index)
            layer_input.edge_weight = edge_weight
            x = layer(layer_input)
        return x

    def cls_decode(self, x: torch.Tensor) -> torch.Tensor:
        """Return the log-softmax of ``x`` over dimension 1."""
        return torch.nn.functional.log_softmax(x, dim=1)

    def lp_encode(self, data):
        """
        Compute embeddings with every layer except the last one.

        The second-to-last layer is applied with its activation disabled.
        Indexing that layer fails when the network has a single layer.
        """
        edge_index = getattr(data, "edge_index")
        layers = list(self.__sequential_encoding_layers)
        x: torch.Tensor = getattr(data, "x")
        for layer in layers[:-2]:
            x = layer(autogl.data.Data(x=x, edge_index=edge_index))
        x = self.__sequential_encoding_layers[-2](
            autogl.data.Data(x=x, edge_index=edge_index), enable_activation=False
        )
        return x

    def lp_decode(self, z, pos_edge_index, neg_edge_index):
        """Score each positive and negative edge by the dot product of its endpoints in ``z``."""
        edge_index = torch.cat([pos_edge_index, neg_edge_index], dim=-1)
        logits = (z[edge_index[0]] * z[edge_index[1]]).sum(dim=-1)
        return logits

    def lp_decode_all(self, z):
        """Return, as a ``(2, K)`` tensor, every index pair whose entry in ``z @ z.t()`` is positive."""
        prob_adj = z @ z.t()
        return (prob_adj > 0).nonzero(as_tuple=False).t()
-
-
@register_model("gcn-model")
class AutoGCN(BaseAutoModel):
    r"""
    AutoGCN.
    The model used in this automodel is GCN, i.e., the graph convolutional network from the
    `"Semi-supervised Classification with Graph Convolutional
    Networks" <https://arxiv.org/abs/1609.02907>`_ paper. The layer is

    .. math::

        \mathbf{X}^{\prime} = \mathbf{\hat{D}}^{-1/2} \mathbf{\hat{A}}
        \mathbf{\hat{D}}^{-1/2} \mathbf{X} \mathbf{\Theta},

    where :math:`\mathbf{\hat{A}} = \mathbf{A} + \mathbf{I}` denotes the
    adjacency matrix with inserted self-loops and
    :math:`\hat{D}_{ii} = \sum_{j=0} \hat{A}_{ij}` its diagonal degree matrix.

    Parameters
    ----------
    num_features: ``int``
        The dimension of features.

    num_classes: ``int``
        The number of classes.

    device: ``torch.device`` or ``str``
        The device where model will be running on.

    **kwargs
        Forwarded unchanged to ``BaseAutoModel.__init__``. Presumably this is
        where ``init`` (``bool``: if True(False), the model will (not) be
        initialized) is accepted -- confirm against the base class.
    """

    def __init__(
        self,
        num_features: int = ...,
        num_classes: int = ...,
        device: _typing.Union[str, torch.device] = ...,
        **kwargs
    ) -> None:
        super().__init__(num_features, num_classes, device, **kwargs)
        # Search space description of every tunable hyper-parameter.
        self.hyper_parameter_space = [
            {
                "parameterName": "add_self_loops",
                "type": "CATEGORICAL",
                "feasiblePoints": [1],
            },
            {
                "parameterName": "normalize",
                "type": "CATEGORICAL",
                "feasiblePoints": [1],
            },
            {
                "parameterName": "num_layers",
                "type": "DISCRETE",
                "feasiblePoints": "2,3,4",
            },
            {
                "parameterName": "hidden",
                "type": "NUMERICAL_LIST",
                "numericalType": "INTEGER",
                "length": 3,
                "minValue": [8, 8, 8],
                "maxValue": [128, 128, 128],
                "scalingType": "LOG",
                "cutPara": ("num_layers",),
                # One hidden size per layer except the output layer.
                "cutFunc": lambda x: x[0] - 1,
            },
            {
                "parameterName": "dropout",
                "type": "DOUBLE",
                "maxValue": 0.8,
                "minValue": 0.2,
                "scalingType": "LINEAR",
            },
            {
                "parameterName": "act",
                "type": "CATEGORICAL",
                "feasiblePoints": ["leaky_relu", "relu", "elu", "tanh"],
            },
        ]

        # Defaults used when no hyper-parameter search has been run.
        # "dropout" is a float on purpose: the GCN constructor type-checks for float.
        self.hyper_parameters = {
            "num_layers": 3,
            "hidden": [128, 64],
            "dropout": 0.0,
            "act": "relu",
        }

    def _initialize(self):
        """Build the ``GCN`` from the current hyper-parameters and move it to ``self.device``."""
        dropout = self.hyper_parameters.get("dropout", None)
        if isinstance(dropout, int) and not isinstance(dropout, bool):
            # Normalise integral values such as 0 so the model receives a float probability.
            dropout = float(dropout)
        self._model = GCN(
            self.input_dimension,
            self.output_dimension,
            self.hyper_parameters.get("hidden"),
            self.hyper_parameters.get("act"),
            dropout,
            bool(self.hyper_parameters.get("add_self_loops", True)),
            bool(self.hyper_parameters.get("normalize", True)),
        ).to(self.device)
|