You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

basic_nn.py 19 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511
  1. from __future__ import annotations
  2. import logging
  3. import os
  4. from typing import Any, Callable, List, Optional, Tuple
  5. import numpy
  6. import torch
  7. from torch.utils.data import DataLoader
  8. from ..dataset import ClassificationDataset, PredictionDataset
  9. from ..utils.logger import print_log
  10. class BasicNN:
  11. """
  12. Wrap NN models into the form of an sklearn estimator.
  13. Parameters
  14. ----------
  15. model : torch.nn.Module
  16. The PyTorch model to be trained or used for prediction.
  17. loss_fn : torch.nn.Module
  18. The loss function used for training.
  19. optimizer : torch.optim.Optimizer
  20. The optimizer used for training.
  21. device : torch.device, optional
  22. The device on which the model will be trained or used for prediction,
  23. by default torch.device("cpu").
  24. batch_size : int, optional
  25. The batch size used for training, by default 32.
  26. num_epochs : int, optional
  27. The number of epochs used for training, by default 1.
  28. stop_loss : Optional[float], optional
  29. The loss value at which to stop training, by default 0.0001.
  30. num_workers : int
  31. The number of workers used for loading data, by default 0.
  32. save_interval : Optional[int], optional
  33. The model will be saved every ``save_interval`` epochs during training, by default None.
  34. save_dir : Optional[str], optional
  35. The directory in which to save the model during training, by default None.
  36. train_transform : Callable[..., Any], optional
  37. A function/transform that takes an object and returns a transformed version used
  38. in the `fit` and `train_epoch` methods, by default None.
  39. test_transform : Callable[..., Any], optional
  40. A function/transform that takes an object and returns a transformed version in the
  41. `predict`, `predict_proba` and `score` methods, , by default None.
  42. collate_fn : Callable[[List[T]], Any], optional
  43. The function used to collate data, by default None.
  44. """
  45. def __init__(
  46. self,
  47. model: torch.nn.Module,
  48. loss_fn: torch.nn.Module,
  49. optimizer: torch.optim.Optimizer,
  50. device: torch.device = torch.device("cpu"),
  51. batch_size: int = 32,
  52. num_epochs: int = 1,
  53. stop_loss: Optional[float] = 0.0001,
  54. num_workers: int = 0,
  55. save_interval: Optional[int] = None,
  56. save_dir: Optional[str] = None,
  57. train_transform: Callable[..., Any] = None,
  58. test_transform: Callable[..., Any] = None,
  59. collate_fn: Callable[[List[Any]], Any] = None,
  60. ) -> None:
  61. if not isinstance(model, torch.nn.Module):
  62. raise TypeError("model must be an instance of torch.nn.Module")
  63. if not isinstance(loss_fn, torch.nn.Module):
  64. raise TypeError("loss_fn must be an instance of torch.nn.Module")
  65. if not isinstance(optimizer, torch.optim.Optimizer):
  66. raise TypeError("optimizer must be an instance of torch.optim.Optimizer")
  67. if not isinstance(device, torch.device):
  68. raise TypeError("device must be an instance of torch.device")
  69. if not isinstance(batch_size, int):
  70. raise TypeError("batch_size must be an integer")
  71. if not isinstance(num_epochs, int):
  72. raise TypeError("num_epochs must be an integer")
  73. if stop_loss is not None and not isinstance(stop_loss, float):
  74. raise TypeError("stop_loss must be a float")
  75. if not isinstance(num_workers, int):
  76. raise TypeError("num_workers must be an integer")
  77. if save_interval is not None and not isinstance(save_interval, int):
  78. raise TypeError("save_interval must be an integer")
  79. if save_dir is not None and not isinstance(save_dir, str):
  80. raise TypeError("save_dir must be a string")
  81. if train_transform is not None and not callable(train_transform):
  82. raise TypeError("train_transform must be callable")
  83. if test_transform is not None and not callable(test_transform):
  84. raise TypeError("test_transform must be callable")
  85. if collate_fn is not None and not callable(collate_fn):
  86. raise TypeError("collate_fn must be callable")
  87. self.model = model.to(device)
  88. self.loss_fn = loss_fn
  89. self.optimizer = optimizer
  90. self.device = device
  91. self.batch_size = batch_size
  92. self.num_epochs = num_epochs
  93. self.stop_loss = stop_loss
  94. self.num_workers = num_workers
  95. self.save_interval = save_interval
  96. self.save_dir = save_dir
  97. self.train_transform = train_transform
  98. self.test_transform = test_transform
  99. self.collate_fn = collate_fn
  100. if self.save_interval is not None and self.save_dir is None:
  101. raise ValueError("save_dir should not be None if save_interval is not None.")
  102. if self.train_transform is not None and self.test_transform is None:
  103. print_log(
  104. "Transform used in the training phase will be used in prediction.",
  105. logger="current",
  106. level=logging.WARNING,
  107. )
  108. self.test_transform = self.train_transform
  109. def _fit(self, data_loader: DataLoader) -> BasicNN:
  110. """
  111. Internal method to fit the model on data for self.num_epochs times,
  112. with early stopping.
  113. Parameters
  114. ----------
  115. data_loader : DataLoader
  116. Data loader providing training samples.
  117. Returns
  118. -------
  119. BasicNN
  120. The model itself after training.
  121. """
  122. if not isinstance(data_loader, DataLoader):
  123. raise TypeError(
  124. f"data_loader must be an instance of torch.utils.data.DataLoader, "
  125. f"but got {type(data_loader)}"
  126. )
  127. for epoch in range(self.num_epochs):
  128. loss_value = self.train_epoch(data_loader)
  129. if self.save_interval is not None and (epoch + 1) % self.save_interval == 0:
  130. self.save(epoch + 1)
  131. if self.stop_loss is not None and loss_value < self.stop_loss:
  132. break
  133. print_log(f"model loss: {loss_value:.5f}", logger="current")
  134. return self
  135. def fit(
  136. self, data_loader: DataLoader = None, X: List[Any] = None, y: List[int] = None
  137. ) -> BasicNN:
  138. """
  139. Train the model for self.num_epochs times or until the average loss on one epoch
  140. is less than self.stop_loss. It supports training with either a DataLoader
  141. object (data_loader) or a pair of input data (X) and target labels (y). If both
  142. data_loader and (X, y) are provided, the method will prioritize using the data_loader.
  143. Parameters
  144. ----------
  145. data_loader : DataLoader, optional
  146. The data loader used for training, by default None.
  147. X : List[Any], optional
  148. The input data, by default None.
  149. y : List[int], optional
  150. The target data, by default None.
  151. Returns
  152. -------
  153. BasicNN
  154. The model itself after training.
  155. """
  156. if data_loader is not None and X is not None:
  157. print_log(
  158. "data_loader will be used to train the model instead of X and y.",
  159. logger="current",
  160. level=logging.WARNING,
  161. )
  162. if data_loader is None:
  163. if X is None:
  164. raise ValueError("data_loader and X can not be None simultaneously.")
  165. else:
  166. data_loader = self._data_loader(X, y)
  167. return self._fit(data_loader)
  168. def train_epoch(self, data_loader: DataLoader) -> float:
  169. """
  170. Train the model with an instance of DataLoader (data_loader) for one epoch.
  171. Parameters
  172. ----------
  173. data_loader : DataLoader
  174. The data loader used for training.
  175. Returns
  176. -------
  177. float
  178. The average loss on one epoch.
  179. """
  180. model = self.model
  181. loss_fn = self.loss_fn
  182. optimizer = self.optimizer
  183. device = self.device
  184. model.train()
  185. total_loss, total_num = 0.0, 0
  186. for data, target in data_loader:
  187. data, target = data.to(device), target.to(device)
  188. out = model(data)
  189. loss = loss_fn(out, target)
  190. optimizer.zero_grad()
  191. loss.backward()
  192. optimizer.step()
  193. total_loss += loss.item() * data.size(0)
  194. total_num += data.size(0)
  195. return total_loss / total_num
  196. def _predict(self, data_loader: DataLoader) -> torch.Tensor:
  197. """
  198. Internal method to predict the outputs given a DataLoader.
  199. Parameters
  200. ----------
  201. data_loader : DataLoader
  202. The DataLoader providing input samples.
  203. Returns
  204. -------
  205. torch.Tensor
  206. Raw output from the model.
  207. """
  208. if not isinstance(data_loader, DataLoader):
  209. raise TypeError(
  210. f"data_loader must be an instance of torch.utils.data.DataLoader, "
  211. f"but got {type(data_loader)}"
  212. )
  213. model = self.model
  214. device = self.device
  215. model.eval()
  216. with torch.no_grad():
  217. results = []
  218. for data in data_loader:
  219. data = data.to(device)
  220. out = model(data)
  221. results.append(out)
  222. return torch.cat(results, axis=0)
  223. def predict(self, data_loader: DataLoader = None, X: List[Any] = None) -> numpy.ndarray:
  224. """
  225. Predict the class of the input data. This method supports prediction with either
  226. a DataLoader object (data_loader) or a list of input data (X). If both data_loader
  227. and X are provided, the method will predict the input data in data_loader
  228. instead of X.
  229. Parameters
  230. ----------
  231. data_loader : DataLoader, optional
  232. The data loader used for prediction, by default None.
  233. X : List[Any], optional
  234. The input data, by default None.
  235. Returns
  236. -------
  237. numpy.ndarray
  238. The predicted class of the input data.
  239. """
  240. if data_loader is not None and X is not None:
  241. print_log(
  242. "Predict the class of input data in data_loader instead of X.",
  243. logger="current",
  244. level=logging.WARNING,
  245. )
  246. if data_loader is None:
  247. dataset = PredictionDataset(X, self.test_transform)
  248. data_loader = DataLoader(
  249. dataset,
  250. batch_size=self.batch_size,
  251. num_workers=int(self.num_workers),
  252. collate_fn=self.collate_fn,
  253. )
  254. return self._predict(data_loader).argmax(axis=1).cpu().numpy()
  255. def predict_proba(self, data_loader: DataLoader = None, X: List[Any] = None) -> numpy.ndarray:
  256. """
  257. Predict the probability of each class for the input data. This method supports
  258. prediction with either a DataLoader object (data_loader) or a list of input data (X).
  259. If both data_loader and X are provided, the method will predict the input data in
  260. data_loader instead of X.
  261. Parameters
  262. ----------
  263. data_loader : DataLoader, optional
  264. The data loader used for prediction, by default None.
  265. X : List[Any], optional
  266. The input data, by default None.
  267. Returns
  268. -------
  269. numpy.ndarray
  270. The predicted probability of each class for the input data.
  271. """
  272. if data_loader is not None and X is not None:
  273. print_log(
  274. "Predict the class probability of input data in data_loader instead of X.",
  275. logger="current",
  276. level=logging.WARNING,
  277. )
  278. if data_loader is None:
  279. dataset = PredictionDataset(X, self.test_transform)
  280. data_loader = DataLoader(
  281. dataset,
  282. batch_size=self.batch_size,
  283. num_workers=int(self.num_workers),
  284. collate_fn=self.collate_fn,
  285. )
  286. return self._predict(data_loader).softmax(axis=1).cpu().numpy()
  287. def _score(self, data_loader: DataLoader) -> Tuple[float, float]:
  288. """
  289. Internal method to compute loss and accuracy for the data provided through a DataLoader.
  290. Parameters
  291. ----------
  292. data_loader : DataLoader
  293. Data loader to use for evaluation.
  294. Returns
  295. -------
  296. Tuple[float, float]
  297. mean_loss: float, The mean loss of the model on the provided data.
  298. accuracy: float, The accuracy of the model on the provided data.
  299. """
  300. if not isinstance(data_loader, DataLoader):
  301. raise TypeError(
  302. f"data_loader must be an instance of torch.utils.data.DataLoader, "
  303. f"but got {type(data_loader)}"
  304. )
  305. model = self.model
  306. loss_fn = self.loss_fn
  307. device = self.device
  308. model.eval()
  309. total_correct_num, total_num, total_loss = 0, 0, 0.0
  310. with torch.no_grad():
  311. for data, target in data_loader:
  312. data, target = data.to(device), target.to(device)
  313. out = model(data)
  314. if len(out.shape) > 1:
  315. correct_num = (target == out.argmax(axis=1)).sum().item()
  316. else:
  317. correct_num = (target == (out > 0.5)).sum().item()
  318. loss = loss_fn(out, target)
  319. total_loss += loss.item() * data.size(0)
  320. total_correct_num += correct_num
  321. total_num += data.size(0)
  322. mean_loss = total_loss / total_num
  323. accuracy = total_correct_num / total_num
  324. return mean_loss, accuracy
  325. def score(
  326. self, data_loader: DataLoader = None, X: List[Any] = None, y: List[int] = None
  327. ) -> float:
  328. """
  329. Validate the model. It supports validation with either a DataLoader object (data_loader)
  330. or a pair of input data (X) and ground truth labels (y). If both data_loader and
  331. (X, y) are provided, the method will prioritize using the data_loader.
  332. Parameters
  333. ----------
  334. data_loader : DataLoader, optional
  335. The data loader used for scoring, by default None.
  336. X : List[Any], optional
  337. The input data, by default None.
  338. y : List[int], optional
  339. The target data, by default None.
  340. Returns
  341. -------
  342. float
  343. The accuracy of the model.
  344. """
  345. print_log("Start machine learning model validation", logger="current")
  346. if data_loader is not None and X is not None:
  347. print_log(
  348. "data_loader will be used to validate the model instead of X and y.",
  349. logger="current",
  350. level=logging.WARNING,
  351. )
  352. if data_loader is None:
  353. if X is None or y is None:
  354. raise ValueError("data_loader and (X, y) can not be None simultaneously.")
  355. else:
  356. data_loader = self._data_loader(X, y)
  357. mean_loss, accuracy = self._score(data_loader)
  358. print_log(f"mean loss: {mean_loss:.3f}, accuray: {accuracy:.3f}", logger="current")
  359. return accuracy
  360. def _data_loader(self, X: List[Any], y: List[int] = None, shuffle: bool = True) -> DataLoader:
  361. """
  362. Generate a DataLoader for user-provided input data and target labels.
  363. Parameters
  364. ----------
  365. X : List[Any]
  366. Input samples.
  367. y : List[int], optional
  368. Target labels. If None, dummy labels are created, by default None.
  369. shuffle : bool, optional
  370. Whether to shuffle the data, by default True.
  371. Returns
  372. -------
  373. DataLoader
  374. A DataLoader providing batches of (X, y) pairs.
  375. """
  376. if X is None:
  377. raise ValueError("X should not be None.")
  378. if y is None:
  379. y = [0] * len(X)
  380. if not (len(y) == len(X)):
  381. raise ValueError("X and y should have equal length.")
  382. dataset = ClassificationDataset(X, y, transform=self.train_transform)
  383. data_loader = DataLoader(
  384. dataset,
  385. batch_size=self.batch_size,
  386. shuffle=shuffle,
  387. num_workers=int(self.num_workers),
  388. collate_fn=self.collate_fn,
  389. )
  390. return data_loader
  391. def save(self, epoch_id: int = 0, save_path: str = None) -> None:
  392. """
  393. Save the model and the optimizer. User can either provide a save_path or specify
  394. the epoch_id at which the model and optimizer is saved. if both save_path and
  395. epoch_id are provided, save_path will be used. If only epoch_id is specified,
  396. model and optimizer will be saved to the path f"model_checkpoint_epoch_{epoch_id}.pth"
  397. under self.save_dir. save_path and epoch_id can not be None simultaneously.
  398. Parameters
  399. ----------
  400. epoch_id : int
  401. The epoch id.
  402. save_path : str, optional
  403. The path to save the model, by default None.
  404. """
  405. if self.save_dir is None and save_path is None:
  406. raise ValueError("'save_dir' and 'save_path' should not be None simultaneously.")
  407. if save_path is not None:
  408. if not os.path.exists(os.path.dirname(save_path)):
  409. os.makedirs(os.path.dirname(save_path))
  410. else:
  411. save_path = os.path.join(self.save_dir, f"model_checkpoint_epoch_{epoch_id}.pth")
  412. if not os.path.exists(self.save_dir):
  413. os.makedirs(self.save_dir)
  414. print_log(f"Checkpoints will be saved to {save_path}", logger="current")
  415. save_parma_dic = {
  416. "model": self.model.state_dict(),
  417. "optimizer": self.optimizer.state_dict(),
  418. }
  419. torch.save(save_parma_dic, save_path)
  420. def load(self, load_path: str) -> None:
  421. """
  422. Load the model and the optimizer.
  423. Parameters
  424. ----------
  425. load_path : str
  426. The directory to load the model, by default "".
  427. """
  428. if load_path is None:
  429. raise ValueError("Load path should not be None.")
  430. print_log(
  431. f"Loads checkpoint by local backend from path: {load_path}",
  432. logger="current",
  433. )
  434. param_dic = torch.load(load_path)
  435. self.model.load_state_dict(param_dic["model"])
  436. if "optimizer" in param_dic.keys():
  437. self.optimizer.load_state_dict(param_dic["optimizer"])

An efficient Python toolkit for Abductive Learning (ABL), a novel paradigm that integrates machine learning and logical reasoning in a unified framework.