You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number; they can include dashes ('-') and can be up to 35 characters long.

basic_nn.py 19 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521
  1. from __future__ import annotations
  2. import logging
  3. import os
  4. from typing import Any, Callable, List, Optional, Tuple
  5. import numpy
  6. import torch
  7. from torch.utils.data import DataLoader
  8. from ..utils.logger import print_log
  9. from .torch_dataset import ClassificationDataset, PredictionDataset
  10. class BasicNN:
  11. """
  12. Wrap NN models into the form of an sklearn estimator.
  13. Parameters
  14. ----------
  15. model : torch.nn.Module
  16. The PyTorch model to be trained or used for prediction.
  17. loss_fn : torch.nn.Module
  18. The loss function used for training.
  19. optimizer : torch.optim.Optimizer
  20. The optimizer used for training.
  21. scheduler : Callable[..., Any], optional
  22. The learning rate scheduler used for training, which will be called
  23. at the end of each run of the ``fit`` method. It should implement the
  24. ``step`` method, by default None.
  25. device : torch.device, optional
  26. The device on which the model will be trained or used for prediction,
  27. by default torch.device("cpu").
  28. batch_size : int, optional
  29. The batch size used for training, by default 32.
  30. num_epochs : int, optional
  31. The number of epochs used for training, by default 1.
  32. stop_loss : float, optional
  33. The loss value at which to stop training, by default 0.0001.
  34. num_workers : int
  35. The number of workers used for loading data, by default 0.
  36. save_interval : int, optional
  37. The model will be saved every ``save_interval`` epochs during training, by default None.
  38. save_dir : str, optional
  39. The directory in which to save the model during training, by default None.
  40. train_transform : Callable[..., Any], optional
  41. A function/transform that takes an object and returns a transformed version used
  42. in the `fit` and `train_epoch` methods, by default None.
  43. test_transform : Callable[..., Any], optional
  44. A function/transform that takes an object and returns a transformed version in the
  45. `predict`, `predict_proba` and `score` methods, , by default None.
  46. collate_fn : Callable[[List[T]], Any], optional
  47. The function used to collate data, by default None.
  48. """
  49. def __init__(
  50. self,
  51. model: torch.nn.Module,
  52. loss_fn: torch.nn.Module,
  53. optimizer: torch.optim.Optimizer,
  54. scheduler: Optional[Callable[..., Any]] = None,
  55. device: torch.device = torch.device("cpu"),
  56. batch_size: int = 32,
  57. num_epochs: int = 1,
  58. stop_loss: Optional[float] = 0.0001,
  59. num_workers: int = 0,
  60. save_interval: Optional[int] = None,
  61. save_dir: Optional[str] = None,
  62. train_transform: Callable[..., Any] = None,
  63. test_transform: Callable[..., Any] = None,
  64. collate_fn: Callable[[List[Any]], Any] = None,
  65. ) -> None:
  66. if not isinstance(model, torch.nn.Module):
  67. raise TypeError("model must be an instance of torch.nn.Module")
  68. if not isinstance(loss_fn, torch.nn.Module):
  69. raise TypeError("loss_fn must be an instance of torch.nn.Module")
  70. if not isinstance(optimizer, torch.optim.Optimizer):
  71. raise TypeError("optimizer must be an instance of torch.optim.Optimizer")
  72. if scheduler is not None and not hasattr(scheduler, "step"):
  73. raise NotImplementedError("scheduler should implement the ``step`` method")
  74. if not isinstance(device, torch.device):
  75. raise TypeError("device must be an instance of torch.device")
  76. if not isinstance(batch_size, int):
  77. raise TypeError("batch_size must be an integer")
  78. if not isinstance(num_epochs, int):
  79. raise TypeError("num_epochs must be an integer")
  80. if stop_loss is not None and not isinstance(stop_loss, float):
  81. raise TypeError("stop_loss must be a float")
  82. if not isinstance(num_workers, int):
  83. raise TypeError("num_workers must be an integer")
  84. if save_interval is not None and not isinstance(save_interval, int):
  85. raise TypeError("save_interval must be an integer")
  86. if save_dir is not None and not isinstance(save_dir, str):
  87. raise TypeError("save_dir must be a string")
  88. if train_transform is not None and not callable(train_transform):
  89. raise TypeError("train_transform must be callable")
  90. if test_transform is not None and not callable(test_transform):
  91. raise TypeError("test_transform must be callable")
  92. if collate_fn is not None and not callable(collate_fn):
  93. raise TypeError("collate_fn must be callable")
  94. self.model = model.to(device)
  95. self.loss_fn = loss_fn
  96. self.optimizer = optimizer
  97. self.scheduler = scheduler
  98. self.device = device
  99. self.batch_size = batch_size
  100. self.num_epochs = num_epochs
  101. self.stop_loss = stop_loss
  102. self.num_workers = num_workers
  103. self.save_interval = save_interval
  104. self.save_dir = save_dir
  105. self.train_transform = train_transform
  106. self.test_transform = test_transform
  107. self.collate_fn = collate_fn
  108. if self.save_interval is not None and self.save_dir is None:
  109. raise ValueError("save_dir should not be None if save_interval is not None.")
  110. if self.train_transform is not None and self.test_transform is None:
  111. print_log(
  112. "Transform used in the training phase will be used in prediction.",
  113. logger="current",
  114. level=logging.WARNING,
  115. )
  116. self.test_transform = self.train_transform
  117. def _fit(self, data_loader: DataLoader) -> BasicNN:
  118. """
  119. Internal method to fit the model on data for ``self.num_epochs`` times,
  120. with early stopping.
  121. Parameters
  122. ----------
  123. data_loader : DataLoader
  124. Data loader providing training samples.
  125. Returns
  126. -------
  127. BasicNN
  128. The model itself after training.
  129. """
  130. if not isinstance(data_loader, DataLoader):
  131. raise TypeError(
  132. f"data_loader must be an instance of torch.utils.data.DataLoader, "
  133. f"but got {type(data_loader)}"
  134. )
  135. for epoch in range(self.num_epochs):
  136. loss_value = self.train_epoch(data_loader)
  137. if self.save_interval is not None and (epoch + 1) % self.save_interval == 0:
  138. self.save(epoch + 1)
  139. if self.stop_loss is not None and loss_value < self.stop_loss:
  140. break
  141. if self.scheduler is not None:
  142. self.scheduler.step()
  143. print_log(f"model loss: {loss_value:.5f}", logger="current")
  144. return self
  145. def fit(
  146. self, data_loader: DataLoader = None, X: List[Any] = None, y: List[int] = None
  147. ) -> BasicNN:
  148. """
  149. Train the model for self.num_epochs times or until the average loss on one epoch
  150. is less than self.stop_loss. It supports training with either a DataLoader
  151. object (data_loader) or a pair of input data (X) and target labels (y). If both
  152. data_loader and (X, y) are provided, the method will prioritize using the data_loader.
  153. Parameters
  154. ----------
  155. data_loader : DataLoader, optional
  156. The data loader used for training, by default None.
  157. X : List[Any], optional
  158. The input data, by default None.
  159. y : List[int], optional
  160. The target data, by default None.
  161. Returns
  162. -------
  163. BasicNN
  164. The model itself after training.
  165. """
  166. if data_loader is not None and X is not None:
  167. print_log(
  168. "data_loader will be used to train the model instead of X and y.",
  169. logger="current",
  170. level=logging.WARNING,
  171. )
  172. if data_loader is None:
  173. if X is None:
  174. raise ValueError("data_loader and X can not be None simultaneously.")
  175. else:
  176. data_loader = self._data_loader(X, y)
  177. return self._fit(data_loader)
  178. def train_epoch(self, data_loader: DataLoader) -> float:
  179. """
  180. Train the model with an instance of DataLoader (data_loader) for one epoch.
  181. Parameters
  182. ----------
  183. data_loader : DataLoader
  184. The data loader used for training.
  185. Returns
  186. -------
  187. float
  188. The average loss on one epoch.
  189. """
  190. model = self.model
  191. loss_fn = self.loss_fn
  192. optimizer = self.optimizer
  193. device = self.device
  194. model.train()
  195. total_loss, total_num = 0.0, 0
  196. for data, target in data_loader:
  197. data, target = data.to(device), target.to(device)
  198. out = model(data)
  199. loss = loss_fn(out, target)
  200. optimizer.zero_grad()
  201. loss.backward()
  202. optimizer.step()
  203. total_loss += loss.item() * data.size(0)
  204. total_num += data.size(0)
  205. return total_loss / total_num
  206. def _predict(self, data_loader: DataLoader) -> torch.Tensor:
  207. """
  208. Internal method to predict the outputs given a DataLoader.
  209. Parameters
  210. ----------
  211. data_loader : DataLoader
  212. The DataLoader providing input samples.
  213. Returns
  214. -------
  215. torch.Tensor
  216. Raw output from the model.
  217. """
  218. if not isinstance(data_loader, DataLoader):
  219. raise TypeError(
  220. f"data_loader must be an instance of torch.utils.data.DataLoader, "
  221. f"but got {type(data_loader)}"
  222. )
  223. model = self.model
  224. device = self.device
  225. model.eval()
  226. with torch.no_grad():
  227. results = []
  228. for data in data_loader:
  229. data = data.to(device)
  230. out = model(data)
  231. results.append(out)
  232. return torch.cat(results, axis=0)
  233. def predict(self, data_loader: DataLoader = None, X: List[Any] = None) -> numpy.ndarray:
  234. """
  235. Predict the class of the input data. This method supports prediction with either
  236. a DataLoader object (data_loader) or a list of input data (X). If both data_loader
  237. and X are provided, the method will predict the input data in data_loader
  238. instead of X.
  239. Parameters
  240. ----------
  241. data_loader : DataLoader, optional
  242. The data loader used for prediction, by default None.
  243. X : List[Any], optional
  244. The input data, by default None.
  245. Returns
  246. -------
  247. numpy.ndarray
  248. The predicted class of the input data.
  249. """
  250. if data_loader is not None and X is not None:
  251. print_log(
  252. "Predict the class of input data in data_loader instead of X.",
  253. logger="current",
  254. level=logging.WARNING,
  255. )
  256. if data_loader is None:
  257. dataset = PredictionDataset(X, self.test_transform)
  258. data_loader = DataLoader(
  259. dataset,
  260. batch_size=self.batch_size,
  261. num_workers=int(self.num_workers),
  262. collate_fn=self.collate_fn,
  263. )
  264. return self._predict(data_loader).argmax(axis=1).cpu().numpy()
  265. def predict_proba(self, data_loader: DataLoader = None, X: List[Any] = None) -> numpy.ndarray:
  266. """
  267. Predict the probability of each class for the input data. This method supports
  268. prediction with either a DataLoader object (data_loader) or a list of input data (X).
  269. If both data_loader and X are provided, the method will predict the input data in
  270. data_loader instead of X.
  271. Parameters
  272. ----------
  273. data_loader : DataLoader, optional
  274. The data loader used for prediction, by default None.
  275. X : List[Any], optional
  276. The input data, by default None.
  277. Returns
  278. -------
  279. numpy.ndarray
  280. The predicted probability of each class for the input data.
  281. """
  282. if data_loader is not None and X is not None:
  283. print_log(
  284. "Predict the class probability of input data in data_loader instead of X.",
  285. logger="current",
  286. level=logging.WARNING,
  287. )
  288. if data_loader is None:
  289. dataset = PredictionDataset(X, self.test_transform)
  290. data_loader = DataLoader(
  291. dataset,
  292. batch_size=self.batch_size,
  293. num_workers=int(self.num_workers),
  294. collate_fn=self.collate_fn,
  295. )
  296. return self._predict(data_loader).softmax(axis=1).cpu().numpy()
  297. def _score(self, data_loader: DataLoader) -> Tuple[float, float]:
  298. """
  299. Internal method to compute loss and accuracy for the data provided through a DataLoader.
  300. Parameters
  301. ----------
  302. data_loader : DataLoader
  303. Data loader to use for evaluation.
  304. Returns
  305. -------
  306. Tuple[float, float]
  307. mean_loss: float, The mean loss of the model on the provided data.
  308. accuracy: float, The accuracy of the model on the provided data.
  309. """
  310. if not isinstance(data_loader, DataLoader):
  311. raise TypeError(
  312. f"data_loader must be an instance of torch.utils.data.DataLoader, "
  313. f"but got {type(data_loader)}"
  314. )
  315. model = self.model
  316. loss_fn = self.loss_fn
  317. device = self.device
  318. model.eval()
  319. total_correct_num, total_num, total_loss = 0, 0, 0.0
  320. with torch.no_grad():
  321. for data, target in data_loader:
  322. data, target = data.to(device), target.to(device)
  323. out = model(data)
  324. if len(out.shape) > 1:
  325. correct_num = (target == out.argmax(axis=1)).sum().item()
  326. else:
  327. correct_num = (target == (out > 0.5)).sum().item()
  328. loss = loss_fn(out, target)
  329. total_loss += loss.item() * data.size(0)
  330. total_correct_num += correct_num
  331. total_num += data.size(0)
  332. mean_loss = total_loss / total_num
  333. accuracy = total_correct_num / total_num
  334. return mean_loss, accuracy
  335. def score(
  336. self, data_loader: DataLoader = None, X: List[Any] = None, y: List[int] = None
  337. ) -> float:
  338. """
  339. Validate the model. It supports validation with either a DataLoader object (data_loader)
  340. or a pair of input data (X) and ground truth labels (y). If both data_loader and
  341. (X, y) are provided, the method will prioritize using the data_loader.
  342. Parameters
  343. ----------
  344. data_loader : DataLoader, optional
  345. The data loader used for scoring, by default None.
  346. X : List[Any], optional
  347. The input data, by default None.
  348. y : List[int], optional
  349. The target data, by default None.
  350. Returns
  351. -------
  352. float
  353. The accuracy of the model.
  354. """
  355. print_log("Start machine learning model validation", logger="current")
  356. if data_loader is not None and X is not None:
  357. print_log(
  358. "data_loader will be used to validate the model instead of X and y.",
  359. logger="current",
  360. level=logging.WARNING,
  361. )
  362. if data_loader is None:
  363. if X is None or y is None:
  364. raise ValueError("data_loader and (X, y) can not be None simultaneously.")
  365. else:
  366. data_loader = self._data_loader(X, y)
  367. mean_loss, accuracy = self._score(data_loader)
  368. print_log(f"mean loss: {mean_loss:.3f}, accuray: {accuracy:.3f}", logger="current")
  369. return accuracy
  370. def _data_loader(self, X: List[Any], y: List[int] = None, shuffle: bool = True) -> DataLoader:
  371. """
  372. Generate a DataLoader for user-provided input data and target labels.
  373. Parameters
  374. ----------
  375. X : List[Any]
  376. Input samples.
  377. y : List[int], optional
  378. Target labels. If None, dummy labels are created, by default None.
  379. shuffle : bool, optional
  380. Whether to shuffle the data, by default True.
  381. Returns
  382. -------
  383. DataLoader
  384. A DataLoader providing batches of (X, y) pairs.
  385. """
  386. if X is None:
  387. raise ValueError("X should not be None.")
  388. if y is None:
  389. y = [0] * len(X)
  390. if not (len(y) == len(X)):
  391. raise ValueError("X and y should have equal length.")
  392. dataset = ClassificationDataset(X, y, transform=self.train_transform)
  393. data_loader = DataLoader(
  394. dataset,
  395. batch_size=self.batch_size,
  396. shuffle=shuffle,
  397. num_workers=int(self.num_workers),
  398. collate_fn=self.collate_fn,
  399. )
  400. return data_loader
  401. def save(self, epoch_id: int = 0, save_path: str = None) -> None:
  402. """
  403. Save the model and the optimizer. User can either provide a save_path or specify
  404. the epoch_id at which the model and optimizer is saved. if both save_path and
  405. epoch_id are provided, save_path will be used. If only epoch_id is specified,
  406. model and optimizer will be saved to the path f"model_checkpoint_epoch_{epoch_id}.pth"
  407. under ``self.save_dir``. save_path and epoch_id can not be None simultaneously.
  408. Parameters
  409. ----------
  410. epoch_id : int
  411. The epoch id.
  412. save_path : str, optional
  413. The path to save the model, by default None.
  414. """
  415. if self.save_dir is None and save_path is None:
  416. raise ValueError("'save_dir' and 'save_path' should not be None simultaneously.")
  417. if save_path is not None:
  418. if not os.path.exists(os.path.dirname(save_path)):
  419. os.makedirs(os.path.dirname(save_path))
  420. else:
  421. save_path = os.path.join(self.save_dir, f"model_checkpoint_epoch_{epoch_id}.pth")
  422. if not os.path.exists(self.save_dir):
  423. os.makedirs(self.save_dir)
  424. print_log(f"Checkpoints will be saved to {save_path}", logger="current")
  425. save_parma_dic = {
  426. "model": self.model.state_dict(),
  427. "optimizer": self.optimizer.state_dict(),
  428. }
  429. torch.save(save_parma_dic, save_path)
  430. def load(self, load_path: str) -> None:
  431. """
  432. Load the model and the optimizer.
  433. Parameters
  434. ----------
  435. load_path : str
  436. The directory to load the model, by default "".
  437. """
  438. if load_path is None:
  439. raise ValueError("Load path should not be None.")
  440. print_log(
  441. f"Loads checkpoint by local backend from path: {load_path}",
  442. logger="current",
  443. )
  444. param_dic = torch.load(load_path)
  445. self.model.load_state_dict(param_dic["model"])
  446. if "optimizer" in param_dic.keys():
  447. self.optimizer.load_state_dict(param_dic["optimizer"])

An efficient Python toolkit for Abductive Learning (ABL), a novel paradigm that integrates machine learning and logical reasoning in a unified framework.