"""Evaluation metrics, a name-based evaluator registry and prediction helpers."""
import typing as _typing

import numpy as np
from sklearn.metrics import (
    f1_score,
    log_loss,
    accuracy_score,
    roc_auc_score,
    label_ranking_average_precision_score,
)


class Evaluation:
    """
    Base interface of an evaluation method.

    Every method is a static method that raises ``NotImplementedError`` here;
    concrete evaluators override all three of them.
    """

    @staticmethod
    def get_eval_name() -> str:
        """ Expected to return the name of this evaluation method """
        raise NotImplementedError

    @staticmethod
    def is_higher_better() -> bool:
        """ Expected to return whether this evaluation method is higher better (bool) """
        raise NotImplementedError

    @staticmethod
    def evaluate(predict, label) -> float:
        """ Expected to return the evaluation result (float) """
        raise NotImplementedError


class EvaluatorUtility:
    """ Auxiliary utilities for evaluation """

    class PredictionBatchCumulativeBuilder:
        """
        Batch-cumulative builder for prediction

        For a large graph it is infeasible to predict all the nodes of the
        validation set and test set in a single batch, and layer-wise
        prediction is a practical evaluation approach. This collector
        accumulates predictions produced in a mini-batch manner, together
        with the index of every predicted item in the integral data.
        """

        def __init__(self):
            # Indexes (in the integral data) of all collected predictions,
            # kept in the order in which they were added.
            self.__indexes_in_integral_data: _typing.Optional[np.ndarray] = None
            # Collected predictions; row ``i`` belongs to index ``i`` above.
            self.__prediction: _typing.Optional[np.ndarray] = None

        def clear_batches(
            self, *__args, **__kwargs
        ) -> "EvaluatorUtility.PredictionBatchCumulativeBuilder":
            """
            Drop every collected batch.

            Positional and keyword arguments are accepted and ignored.

            :return: this builder, to allow call chaining
            """
            self.__indexes_in_integral_data = None
            self.__prediction = None
            return self

        def add_batch(
            self, indexes_in_integral_data: np.ndarray, batch_prediction: np.ndarray
        ) -> "EvaluatorUtility.PredictionBatchCumulativeBuilder":
            """
            Append one batch of predictions.

            :param indexes_in_integral_data: 1-D array holding, for every row
                of ``batch_prediction``, its index in the integral data
            :param batch_prediction: array whose first dimension matches
                ``indexes_in_integral_data``
            :return: this builder, to allow call chaining
            :raises TypeError: if an argument is not a ``np.ndarray``, the
                indexes are not 1-D, or the prediction is 0-D
            :raises ValueError: if the two first dimensions differ, or an index
                is duplicated within the batch or against earlier batches
            """
            if not (
                isinstance(indexes_in_integral_data, np.ndarray)
                and isinstance(batch_prediction, np.ndarray)
                and indexes_in_integral_data.ndim == 1
                and batch_prediction.ndim >= 1
            ):
                raise TypeError(
                    "indexes_in_integral_data must be a 1-D np.ndarray and "
                    "batch_prediction must be a np.ndarray with at least one dimension"
                )
            if indexes_in_integral_data.shape[0] != batch_prediction.shape[0]:
                raise ValueError(
                    "indexes_in_integral_data and batch_prediction "
                    "must have the same first dimension"
                )
            if (
                indexes_in_integral_data.shape
                != np.unique(indexes_in_integral_data).shape
            ):
                raise ValueError(
                    f"There exists duplicate index "
                    f"in the argument indexes_in_integral_data {indexes_in_integral_data}"
                )

            if self.__indexes_in_integral_data is None:
                # Keep the caller's order. np.unique() returns *sorted* values,
                # so storing its result would detach the indexes from the
                # (unsorted) prediction rows they describe.
                merged_indexes = indexes_in_integral_data.copy()
            else:
                merged_indexes = np.concatenate(
                    (self.__indexes_in_integral_data, indexes_in_integral_data)
                )
                if merged_indexes.shape != np.unique(merged_indexes).shape:
                    raise ValueError(
                        "indexes_in_integral_data contains an index "
                        "which has already been added by a previous batch"
                    )

            if self.__prediction is None:
                merged_prediction = batch_prediction
            else:
                merged_prediction = np.concatenate(
                    (self.__prediction, batch_prediction)
                )

            # Commit only after everything above succeeded, so a failing call
            # never leaves indexes and predictions out of step.
            self.__indexes_in_integral_data = merged_indexes
            self.__prediction = merged_prediction
            return self

        def compose(
            self, __sorted: bool = True, **__kwargs
        ) -> _typing.Tuple[np.ndarray, np.ndarray]:
            """
            Return everything collected so far as ``(indexes, predictions)``.

            NOTE: ``__sorted`` is subject to Python's private name mangling
            because it is declared inside a class body, so in practice it can
            only be supplied positionally. Extra keyword arguments are ignored.

            :param __sorted: if true, both arrays are ordered by ascending
                index; otherwise they are returned in insertion order
            :return: tuple of the index array and the matching prediction
                array; ``(None, None)`` when unsorted output is requested
                before any batch was added
            """
            if not __sorted:
                return self.__indexes_in_integral_data, self.__prediction
            order = np.argsort(self.__indexes_in_integral_data)
            return (
                self.__indexes_in_integral_data[order],
                self.__prediction[order],
            )


# Registry mapping an evaluator name to its Evaluation subclass.
EVALUATE_DICT: _typing.Dict[str, _typing.Type[Evaluation]] = {}


def _register_evaluator_names(names, evaluator_cls):
    """
    Register ``evaluator_cls`` in ``EVALUATE_DICT`` under every name in ``names``.

    All names are validated before the registry is touched, so a rejected
    call leaves ``EVALUATE_DICT`` unchanged.

    :raises ValueError: if a name is already registered (or repeated within
        ``names``), or if ``evaluator_cls`` does not extend ``Evaluation``
    """
    accepted = []
    for n in names:
        if n in EVALUATE_DICT or n in accepted:
            raise ValueError("Cannot register duplicate evaluator ({})".format(n))
        if not issubclass(evaluator_cls, Evaluation):
            raise ValueError(
                "Evaluator ({}: {}) must extend Evaluation".format(
                    n, evaluator_cls.__name__
                )
            )
        accepted.append(n)
    for n in accepted:
        EVALUATE_DICT[n] = evaluator_cls
    return evaluator_cls


def register_evaluate(*name):
    """
    Class decorator registering an ``Evaluation`` subclass under each given name.

    :param name: one or more names to register the decorated class under
    :return: a decorator that registers the class and returns it unchanged
    :raises ValueError: (when the decorator is applied) on a duplicate name
        or a class which does not extend ``Evaluation``
    """

    def register_evaluate_cls(cls):
        return _register_evaluator_names(name, cls)

    return register_evaluate_cls


def get_feval(feval):
    """
    Resolve an evaluator specification to ``Evaluation`` class(es).

    :param feval: a registered name, an ``Evaluation`` subclass, or a
        sequence of such specifications (resolved recursively into a list)
    :return: the matching ``Evaluation`` subclass, or a list of them
    :raises KeyError: if a name is not present in ``EVALUATE_DICT``
    :raises ValueError: if ``feval`` is of an unsupported type
    """
    # str must be tested before Sequence, since a str is itself a Sequence.
    if isinstance(feval, str):
        return EVALUATE_DICT[feval]
    if isinstance(feval, type) and issubclass(feval, Evaluation):
        return feval
    if isinstance(feval, _typing.Sequence):
        return [get_feval(f) for f in feval]
    raise ValueError(
        "feval argument of type {} is not supported!".format(type(feval))
    )


class EvaluationUniversalRegistry:
    """ Class-based access to the same registry as ``register_evaluate`` """

    @classmethod
    def register_evaluation(
        cls, *names
    ) -> _typing.Callable[[_typing.Type[Evaluation]], _typing.Type[Evaluation]]:
        """
        Class decorator registering an ``Evaluation`` subclass under each name.

        :param names: one or more names to register the decorated class under
        :return: a decorator that registers the class and returns it unchanged
        :raises ValueError: (when the decorator is applied) on a duplicate
            name or a class which does not extend ``Evaluation``
        """

        def _register_evaluation(
            _class: _typing.Type[Evaluation],
        ) -> _typing.Type[Evaluation]:
            # Errors report the decorated class, not this registry class.
            return _register_evaluator_names(names, _class)

        return _register_evaluation


@register_evaluate("logloss")
class Logloss(Evaluation):
    """ Log loss, computed with ``sklearn.metrics.log_loss`` (lower is better) """

    @staticmethod
    def get_eval_name():
        return "logloss"

    @staticmethod
    def is_higher_better():
        """
        Should return whether this evaluation method is higher better (bool)
        """
        return False

    @staticmethod
    def evaluate(predict, label):
        """
        Should return: the evaluation result (float)
        """
        return log_loss(label, predict)


@register_evaluate("auc", "ROC-AUC")
class Auc(Evaluation):
    """ ROC AUC, computed with ``sklearn.metrics.roc_auc_score`` """

    @staticmethod
    def get_eval_name():
        return "auc"

    @staticmethod
    def is_higher_better():
        """
        Should return whether this evaluation method is higher better (bool)
        """
        return True

    @staticmethod
    def evaluate(predict, label):
        """
        Should return: the evaluation result (float)

        A 1-D ``predict`` is used as the score directly; otherwise ``predict``
        must have exactly two columns and column 1 is used as the score.
        """
        if len(predict.shape) == 1:
            pos_predict = predict
        else:
            assert (
                predict.shape[1] == 2
            ), "Cannot use auc on given data with %d classes!" % (predict.shape[1])
            pos_predict = predict[:, 1]
        return roc_auc_score(label, pos_predict)


@register_evaluate("acc", "Accuracy")
class Acc(Evaluation):
    """ Accuracy, computed with ``sklearn.metrics.accuracy_score`` """

    @staticmethod
    def get_eval_name():
        return "acc"

    @staticmethod
    def is_higher_better():
        """
        Should return whether this evaluation method is higher better (bool)
        """
        return True

    @staticmethod
    def evaluate(predict, label):
        """
        Should return: the evaluation result (float)

        A 2-D ``predict`` is reduced with argmax over axis 1; any other
        ``predict`` is thresholded at 0.5 into 0/1 labels.
        """
        if len(predict.shape) == 2:
            predict = np.argmax(predict, axis=1)
        else:
            # Vectorized form of ``1 if p > 0.5 else 0`` for every element.
            predict = (np.asarray(predict) > 0.5).astype(int)
        return accuracy_score(label, predict)


@register_evaluate("mrr")
class Mrr(Evaluation):
    """
    Evaluator registered as "mrr", computed with
    ``sklearn.metrics.label_ranking_average_precision_score``
    """

    @staticmethod
    def get_eval_name():
        return "mrr"

    @staticmethod
    def is_higher_better():
        """
        Should return whether this evaluation method is higher better (bool)
        """
        return True

    @staticmethod
    def evaluate(predict, label):
        """
        Should return: the evaluation result (float)

        A 2-D ``predict`` must have exactly two columns, of which column 1 is
        used as the score; any other ``predict`` is used as the score directly.
        """
        if len(predict.shape) == 2:
            assert (
                predict.shape[1] == 2
            ), "Cannot use mrr on given data with %d classes!" % (predict.shape[1])
            pos_predict = predict[:, 1]
        else:
            pos_predict = predict
        # NOTE(review): the score passed here is 1-D, while sklearn documents
        # 2-D (n_samples, n_labels) inputs for this function -- confirm that
        # callers provide compatible shapes.
        return label_ranking_average_precision_score(label, pos_predict)


@register_evaluate("MicroF1")
class MicroF1(Evaluation):
    """ Micro-averaged F1, computed with ``sklearn.metrics.f1_score`` """

    @staticmethod
    def get_eval_name() -> str:
        return "MicroF1"

    @staticmethod
    def is_higher_better() -> bool:
        return True

    @staticmethod
    def evaluate(predict, label) -> float:
        """ Return the micro-F1 of the argmax (over axis 1) of ``predict`` """
        return f1_score(label, np.argmax(predict, axis=1), average="micro")