{ "cells": [ { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import numpy as np\n", "import torch.nn as nn\n", "import torch\n", "\n", "from abl.reasoning import ReasonerBase, prolog_KB\n", "from abl.learning import BasicNN, ABLModel\n", "from abl.evaluation import SymbolMetric, SemanticsMetric\n", "from abl.utils import ABLLogger, reform_idx\n", "\n", "from examples.hed.hed_bridge import HEDBridge\n", "from models.nn import SymbolNet\n", "from datasets.get_hed import get_hed, split_equation" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Initialize logger\n", "logger = ABLLogger.get_instance(\"abl\")" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "### Logic Part" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Initialize knowledge base and abducer\n", "class HED_prolog_KB(prolog_KB):\n", " def __init__(self, pseudo_label_list, pl_file):\n", " super().__init__(pseudo_label_list, pl_file)\n", " \n", " def consist_rule(self, exs, rules):\n", " rules = str(rules).replace(\"\\'\",\"\")\n", " return len(list(self.prolog.query(\"eval_inst_feature(%s, %s).\" % (exs, rules)))) != 0\n", "\n", " def abduce_rules(self, pred_res):\n", " prolog_result = list(self.prolog.query(\"consistent_inst_feature(%s, X).\" % pred_res))\n", " if len(prolog_result) == 0:\n", " return None\n", " prolog_rules = prolog_result[0]['X']\n", " rules = [rule.value for rule in prolog_rules]\n", " return rules\n", " \n", "\n", "kb = HED_prolog_KB(pseudo_label_list=[1, 0, '+', '='], pl_file='./datasets/learn_add.pl')\n", "\n", "class HED_Abducer(ReasonerBase):\n", " def __init__(self, kb, dist_func='hamming'):\n", " super().__init__(kb, dist_func, zoopt=True)\n", " \n", " def _revise_by_idxs(self, pred_res, key, all_address_flag, idxs):\n", " pred = []\n", " k = []\n", " address_flag = []\n", " for idx in idxs:\n", " pred.append(pred_res[idx])\n", " k.append(key[idx])\n", " address_flag += list(all_address_flag[idx])\n", " address_idx = np.where(np.array(address_flag) != 0)[0] \n", " candidate = self.revise_by_idx(pred, k, address_idx)\n", " return candidate\n", " \n", " def zoopt_revision_score(self, pred_res, pseudo_label, pred_res_prob, key, sol): \n", " all_address_flag = reform_idx(sol.get_x(), pseudo_label)\n", " lefted_idxs = [i for i in range(len(pred_res))]\n", " candidate_size = [] \n", " while lefted_idxs:\n", " idxs = []\n", " idxs.append(lefted_idxs.pop(0))\n", " max_candidate_idxs = []\n", " found = False\n", " for idx in range(-1, len(pred_res)):\n", " if (not idx in idxs) and (idx >= 0):\n", " idxs.append(idx)\n", " candidate = self._revise_by_idxs(pseudo_label, key, all_address_flag, idxs)\n", " if len(candidate) == 0:\n", " if len(idxs) > 1:\n", " idxs.pop()\n", " else:\n", " if len(idxs) > len(max_candidate_idxs):\n", " found = True\n", " max_candidate_idxs = idxs.copy() \n", " removed = [i for i in lefted_idxs if i in max_candidate_idxs]\n", " if found:\n", " candidate_size.append(len(removed) + 1)\n", " lefted_idxs = [i for i in lefted_idxs if i not in max_candidate_idxs]\n", " candidate_size.sort()\n", " score = 0\n", " import math\n", " for i in range(0, len(candidate_size)):\n", " score -= math.exp(-i) * candidate_size[i]\n", " return score\n", " \n", " def abduce(self, data, max_revision=-1, require_more_revision=0):\n", " batch_pred_label, batch_pred_prob, batch_pred_pseudo_label, batch_y = data\n", "\n", " solution = self.zoopt_get_solution(\n", " batch_pred_label, batch_pred_pseudo_label, batch_pred_prob, batch_y, max_revision\n", " )\n", " batch_revision_idx = reform_idx(solution.astype(np.int32), batch_pred_label)\n", " \n", " batch_abduced_pseudo_label = []\n", " for pred_pseudo_label, pred_prob, revision_idx in zip(batch_pred_pseudo_label, batch_pred_prob, batch_revision_idx):\n", " candidates = self.revise_by_idx([pred_pseudo_label], None, list(np.nonzero(np.array(revision_idx))[0]))\n", " if len(candidates) == 0:\n", " batch_abduced_pseudo_label.append([])\n", " else:\n", " batch_abduced_pseudo_label.append(candidates[0][0])\n", " # batch_abduced_pseudo_label.append(self._get_one_candidate(pred_pseudo_label, pred_prob, candidates)[0])\n", " return batch_abduced_pseudo_label\n", "\n", " def abduce_rules(self, pred_res):\n", " return self.kb.abduce_rules(pred_res)\n", " \n", "abducer = HED_Abducer(kb)" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "### Machine Learning Part" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Initialize necessary component for machine learning part\n", "cls = SymbolNet(\n", " num_classes=len(kb.pseudo_label_list),\n", " image_size=(28, 28, 1),\n", ")\n", "device = torch.device(\"cuda:0\" if torch.cuda.is_available() else \"cpu\")\n", "criterion = nn.CrossEntropyLoss()\n", "optimizer = torch.optim.RMSprop(cls.parameters(), lr=0.001, weight_decay=1e-6)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Initialize BasicNN\n", "# The function of BasicNN is to wrap NN models into the form of an sklearn estimator\n", "base_model = BasicNN(\n", " cls,\n", " criterion,\n", " optimizer,\n", " device,\n", " save_interval=1,\n", " save_dir=logger.save_dir,\n", " batch_size=32,\n", " num_epochs=1,\n", ")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Initialize ABL model\n", "# The main function of the ABL model is to serialize data and \n", "# provide a unified interface for different machine learning models\n", "model = ABLModel(base_model)" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "### Metric" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Add metric\n", "metric = [SymbolMetric(prefix=\"hed\"), SemanticsMetric(prefix=\"hed\")]" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "### Bridge Machine Learning and Logic Reasoning" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "bridge = HEDBridge(model, abducer, metric)" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "### Dataset" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "total_train_data = get_hed(train=True)\n", "train_data, val_data = split_equation(total_train_data, 3, 1)\n", "test_data = get_hed(train=False)" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "### Train and Test" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "bridge.pretrain(\"./weights/\")\n", "bridge.train(train_data, val_data)" ] } ], "metadata": { "kernelspec": { "display_name": "ABL", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.16" }, "orig_nbformat": 4, "vscode": { "interpreter": { "hash": "fb6f4ceeabb9a733f366948eb80109f83aedf798cc984df1e68fb411adb27d58" } } }, "nbformat": 4, "nbformat_minor": 2 }