import torch
import torch.nn.functional as F
from torch.nn import Linear, ReLU, Sequential, LeakyReLU, Tanh, ELU, BatchNorm1d
from torch_geometric.nn import GINConv, global_add_pool
from . import register_model
from .base import BaseAutoModel, activate_func
from copy import deepcopy
from ....utils import get_logger

LOGGER = get_logger("GINModel")


def set_default(args, d):
    for k, v in d.items():
        if k not in args:
            args[k] = v
    return args


class GIN(torch.nn.Module):
    def __init__(self, args):
        super(GIN, self).__init__()
        self.args = args
        self.num_layer = int(self.args["num_layers"])
        assert self.num_layer > 2, "Number of layers in GIN must be at least 3"

        missing_keys = list(
            set(
                [
                    "features_num",
                    "num_class",
                    "num_graph_features",
                    "num_layers",
                    "hidden",
                    "dropout",
                    "act",
                    "mlp_layers",
                    "eps",
                ]
            )
            - set(self.args.keys())
        )
        if len(missing_keys) > 0:
            raise Exception("Missing keys: %s." % ",".join(missing_keys))
        if self.num_layer != len(self.args["hidden"]) + 1:
            LOGGER.warning("Number of layers does not match the length of hidden units")
        self.num_graph_features = self.args["num_graph_features"]

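        # Resolve the configured activation name into a module instance; since these
        # activations are stateless, the same instance is reused in every GINConv MLP.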
        if self.args["act"] == "leaky_relu":
            act = LeakyReLU()
        elif self.args["act"] == "relu":
            act = ReLU()
        elif self.args["act"] == "elu":
            act = ELU()
        elif self.args["act"] == "tanh":
            act = Tanh()
        else:
            act = ReLU()

        train_eps = self.args["eps"] == "True"

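        # Stack of (num_layer - 2) GINConv message-passing layers, each followed by
        # BatchNorm; two fully connected layers on top produce the class scores.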
        self.convs = torch.nn.ModuleList()
        self.bns = torch.nn.ModuleList()

        nn = [Linear(self.args["features_num"], self.args["hidden"][0])]
        for _ in range(self.args["mlp_layers"] - 1):
            nn.append(act)
            nn.append(Linear(self.args["hidden"][0], self.args["hidden"][0]))
            # nn.append(BatchNorm1d(self.args['hidden'][0]))
        self.convs.append(GINConv(Sequential(*nn), train_eps=train_eps))
        self.bns.append(BatchNorm1d(self.args["hidden"][0]))

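        # Remaining (num_layer - 3) message-passing layers map hidden[i] -> hidden[i + 1].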
        for i in range(self.num_layer - 3):
            nn = [Linear(self.args["hidden"][i], self.args["hidden"][i + 1])]
            for _ in range(self.args["mlp_layers"] - 1):
                nn.append(act)
                nn.append(
                    Linear(self.args["hidden"][i + 1], self.args["hidden"][i + 1])
                )
                # nn.append(BatchNorm1d(self.args['hidden'][i+1]))
            self.convs.append(GINConv(Sequential(*nn), train_eps=train_eps))
            self.bns.append(BatchNorm1d(self.args["hidden"][i + 1]))

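        # Graph-level classifier; graph features (if any) are concatenated before fc1.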
        self.fc1 = Linear(
            self.args["hidden"][self.num_layer - 3] + self.num_graph_features,
            self.args["hidden"][self.num_layer - 2],
        )
        self.fc2 = Linear(
            self.args["hidden"][self.num_layer - 2], self.args["num_class"]
        )

    def forward(self, data):
        x, edge_index, batch = data.x, data.edge_index, data.batch

        if self.num_graph_features > 0:
            graph_feature = data.gf

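        # Node-level message passing: GINConv -> activation -> BatchNorm.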
        for i in range(self.num_layer - 2):
            x = self.convs[i](x, edge_index)
            x = activate_func(x, self.args["act"])
            x = self.bns[i](x)

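        # Graph-level readout: sum node embeddings, then append graph-level features.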
        x = global_add_pool(x, batch)
        if self.num_graph_features > 0:
            x = torch.cat([x, graph_feature], dim=-1)
        x = self.fc1(x)
        x = activate_func(x, self.args["act"])
        x = F.dropout(x, p=self.args["dropout"], training=self.training)

        x = self.fc2(x)

        return F.log_softmax(x, dim=1)


@register_model("gin-model")
class AutoGIN(BaseAutoModel):
    r"""
    AutoGIN. The model used in this automodel is GIN, i.e., the graph isomorphism network from the `"How Powerful are
    Graph Neural Networks?" <https://arxiv.org/abs/1810.00826>`_ paper. The layer is

    .. math::
        \mathbf{x}^{\prime}_i = h_{\mathbf{\Theta}} \left( (1 + \epsilon) \cdot
        \mathbf{x}_i + \sum_{j \in \mathcal{N}(i)} \mathbf{x}_j \right)

    or

    .. math::
        \mathbf{X}^{\prime} = h_{\mathbf{\Theta}} \left( \left( \mathbf{A} +
        (1 + \epsilon) \cdot \mathbf{I} \right) \cdot \mathbf{X} \right),

    where :math:`h_{\mathbf{\Theta}}` denotes a neural network, *i.e.*, an MLP.

    Parameters
    ----------
    num_features: `int`.
        The dimension of node features.

    num_classes: `int`.
        The number of classes.

    num_graph_features: `int`.
        The number of graph-level features; set to 0 if none are used.

    device: `torch.device` or `str`.
        The device on which the model will run.

    init: `bool`.
        Whether to initialize the model immediately.
    """

    def __init__(
        self,
        num_features=None,
        num_classes=None,
        device=None,
        init=False,
        num_graph_features=0,
        **args
    ):
        super().__init__(num_features, num_classes, device, num_graph_features=num_graph_features, **args)
        self.num_graph_features = num_graph_features

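        # Search space consumed by AutoGL's HPO module; the "hidden" list is intended
        # to be cut down to (num_layers - 1) entries through cutPara / cutFunc, matching
        # the length check in GIN.__init__.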
        self.hyper_parameter_space = [
            {
                "parameterName": "num_layers",
                "type": "DISCRETE",
                "feasiblePoints": "4,5,6",
            },
            {
                "parameterName": "hidden",
                "type": "NUMERICAL_LIST",
                "numericalType": "INTEGER",
                "length": 5,
                "minValue": [8, 8, 8, 8, 8],
                "maxValue": [64, 64, 64, 64, 64],
                "scalingType": "LOG",
                "cutPara": ("num_layers",),
                "cutFunc": lambda x: x[0] - 1,
            },
            {
                "parameterName": "dropout",
                "type": "DOUBLE",
                "maxValue": 0.9,
                "minValue": 0.1,
                "scalingType": "LINEAR",
            },
            {
                "parameterName": "act",
                "type": "CATEGORICAL",
                "feasiblePoints": ["leaky_relu", "relu", "elu", "tanh"],
            },
            {
                "parameterName": "eps",
                "type": "CATEGORICAL",
                "feasiblePoints": ["True", "False"],
            },
            {
                "parameterName": "mlp_layers",
                "type": "DISCRETE",
                "feasiblePoints": "2,3,4",
            },
        ]

        self.hyper_parameters = {
            "num_layers": 3,
            "hidden": [64, 32],
            "dropout": 0.5,
            "act": "relu",
            "eps": "True",
            "mlp_layers": 2,
        }

    def from_hyper_parameter(self, hp, **kwargs):
        return super().from_hyper_parameter(hp, num_graph_features=self.num_graph_features, **kwargs)

    def _initialize(self):
        """Initialize model."""
        self._model = GIN({
            "features_num": self.input_dimension,
            "num_class": self.output_dimension,
            "num_graph_features": self.num_graph_features,
            **self.hyper_parameters
        }).to(self.device)
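

# Minimal usage sketch, kept as a comment so nothing runs on import. `dataset` is a
# placeholder for a PyG graph-classification dataset; in normal use the AutoGL solver
# constructs and tunes this automodel instead of calling it directly.
#
#   automodel = AutoGIN(
#       num_features=dataset.num_node_features,
#       num_classes=dataset.num_classes,
#       num_graph_features=0,
#       device="cpu",
#   ).from_hyper_parameter(
#       {"num_layers": 3, "hidden": [64, 32], "dropout": 0.5,
#        "act": "relu", "eps": "False", "mlp_layers": 2}
#   )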