You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

basic_nn.py 20 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547
  1. from __future__ import annotations
  2. import logging
  3. import os
  4. from typing import Any, Callable, List, Optional, Tuple, Union
  5. import numpy
  6. import torch
  7. from torch.utils.data import DataLoader
  8. from ..utils.logger import print_log
  9. from .torch_dataset import ClassificationDataset, PredictionDataset
  10. class BasicNN:
  11. """
  12. Wrap NN models into the form of an sklearn estimator.
  13. Parameters
  14. ----------
  15. model : torch.nn.Module
  16. The PyTorch model to be trained or used for prediction.
  17. loss_fn : torch.nn.Module
  18. The loss function used for training.
  19. optimizer : torch.optim.Optimizer
  20. The optimizer used for training.
  21. scheduler : Callable[..., Any], optional
  22. The learning rate scheduler used for training, which will be called
  23. at the end of each run of the ``fit`` method. It should implement the
  24. ``step`` method, by default None.
  25. device : Union[torch.device, str]
  26. The device on which the model will be trained or used for prediction,
  27. by default torch.device("cpu").
  28. batch_size : int, optional
  29. The batch size used for training, by default 32.
  30. num_epochs : int, optional
  31. The number of epochs used for training, by default 1.
  32. stop_loss : float, optional
  33. The loss value at which to stop training, by default 0.0001.
  34. num_workers : int
  35. The number of workers used for loading data, by default 0.
  36. save_interval : int, optional
  37. The model will be saved every ``save_interval`` epoch during training, by default None.
  38. save_dir : str, optional
  39. The directory in which to save the model during training, by default None.
  40. train_transform : Callable[..., Any], optional
  41. A function/transform that takes an object and returns a transformed version used
  42. in the ``fit`` and ``train_epoch`` methods, by default None.
  43. test_transform : Callable[..., Any], optional
  44. A function/transform that takes an object and returns a transformed version in the
  45. ``predict``, ``predict_proba`` and ``score`` methods, , by default None.
  46. collate_fn : Callable[[List[T]], Any], optional
  47. The function used to collate data, by default None.
  48. """
  49. def __init__(
  50. self,
  51. model: torch.nn.Module,
  52. loss_fn: torch.nn.Module,
  53. optimizer: torch.optim.Optimizer,
  54. scheduler: Optional[Callable[..., Any]] = None,
  55. device: Union[torch.device, str] = torch.device("cpu"),
  56. batch_size: int = 32,
  57. num_epochs: int = 1,
  58. stop_loss: Optional[float] = 0.0001,
  59. num_workers: int = 0,
  60. save_interval: Optional[int] = None,
  61. save_dir: Optional[str] = None,
  62. train_transform: Optional[Callable[..., Any]] = None,
  63. test_transform: Optional[Callable[..., Any]] = None,
  64. collate_fn: Optional[Callable[[List[Any]], Any]] = None,
  65. ) -> None:
  66. if not isinstance(model, torch.nn.Module):
  67. raise TypeError("model must be an instance of torch.nn.Module")
  68. if not isinstance(loss_fn, torch.nn.Module):
  69. raise TypeError("loss_fn must be an instance of torch.nn.Module")
  70. if not isinstance(optimizer, torch.optim.Optimizer):
  71. raise TypeError("optimizer must be an instance of torch.optim.Optimizer")
  72. if scheduler is not None and not hasattr(scheduler, "step"):
  73. raise NotImplementedError("scheduler should implement the ``step`` method")
  74. if not isinstance(device, torch.device):
  75. if not isinstance(device, str):
  76. raise TypeError(
  77. "device must be an instance of torch.device or a str indicating "
  78. + "the target device"
  79. )
  80. else:
  81. device = torch.device(device)
  82. if not isinstance(batch_size, int):
  83. raise TypeError("batch_size must be an integer")
  84. if not isinstance(num_epochs, int):
  85. raise TypeError("num_epochs must be an integer")
  86. if stop_loss is not None and not isinstance(stop_loss, float):
  87. raise TypeError("stop_loss must be a float")
  88. if not isinstance(num_workers, int):
  89. raise TypeError("num_workers must be an integer")
  90. if save_interval is not None and not isinstance(save_interval, int):
  91. raise TypeError("save_interval must be an integer")
  92. if save_dir is not None and not isinstance(save_dir, str):
  93. raise TypeError("save_dir must be a string")
  94. if train_transform is not None and not callable(train_transform):
  95. raise TypeError("train_transform must be callable")
  96. if test_transform is not None and not callable(test_transform):
  97. raise TypeError("test_transform must be callable")
  98. if collate_fn is not None and not callable(collate_fn):
  99. raise TypeError("collate_fn must be callable")
  100. self.model = model.to(device)
  101. self.loss_fn = loss_fn
  102. self.optimizer = optimizer
  103. self.scheduler = scheduler
  104. self.device = device
  105. self.batch_size = batch_size
  106. self.num_epochs = num_epochs
  107. self.stop_loss = stop_loss
  108. self.num_workers = num_workers
  109. self.save_interval = save_interval
  110. self.save_dir = save_dir
  111. self.train_transform = train_transform
  112. self.test_transform = test_transform
  113. self.collate_fn = collate_fn
  114. if self.save_interval is not None and self.save_dir is None:
  115. raise ValueError("save_dir should not be None if save_interval is not None.")
  116. if self.train_transform is not None and self.test_transform is None:
  117. print_log(
  118. "Transform used in the training phase will be used in prediction.",
  119. logger="current",
  120. level=logging.WARNING,
  121. )
  122. self.test_transform = self.train_transform
  123. def _fit(self, data_loader: DataLoader) -> BasicNN:
  124. """
  125. Internal method to fit the model on data for ``self.num_epochs`` times,
  126. with early stopping.
  127. Parameters
  128. ----------
  129. data_loader : DataLoader
  130. Data loader providing training samples.
  131. Returns
  132. -------
  133. BasicNN
  134. The model itself after training.
  135. """
  136. if not isinstance(data_loader, DataLoader):
  137. raise TypeError(
  138. f"data_loader must be an instance of torch.utils.data.DataLoader, "
  139. f"but got {type(data_loader)}"
  140. )
  141. for epoch in range(self.num_epochs):
  142. loss_value = self.train_epoch(data_loader)
  143. if self.save_interval is not None and (epoch + 1) % self.save_interval == 0:
  144. self.save(epoch + 1)
  145. if self.stop_loss is not None and loss_value < self.stop_loss:
  146. break
  147. if self.scheduler is not None:
  148. self.scheduler.step()
  149. print_log(f"model loss: {loss_value:.5f}", logger="current")
  150. return self
  151. def fit(
  152. self,
  153. data_loader: Optional[DataLoader] = None,
  154. X: Optional[List[Any]] = None,
  155. y: Optional[List[int]] = None,
  156. ) -> BasicNN:
  157. """
  158. Train the model for self.num_epochs times or until the average loss on one epoch
  159. is less than self.stop_loss. It supports training with either a DataLoader
  160. object (data_loader) or a pair of input data (X) and target labels (y). If both
  161. data_loader and (X, y) are provided, the method will prioritize using the data_loader.
  162. Parameters
  163. ----------
  164. data_loader : DataLoader, optional
  165. The data loader used for training, by default None.
  166. X : List[Any], optional
  167. The input data, by default None.
  168. y : List[int], optional
  169. The target data, by default None.
  170. Returns
  171. -------
  172. BasicNN
  173. The model itself after training.
  174. """
  175. if data_loader is not None and X is not None:
  176. print_log(
  177. "data_loader will be used to train the model instead of X and y.",
  178. logger="current",
  179. level=logging.WARNING,
  180. )
  181. if data_loader is None:
  182. if X is None:
  183. raise ValueError("data_loader and X can not be None simultaneously.")
  184. else:
  185. data_loader = self._data_loader(X, y)
  186. return self._fit(data_loader)
  187. def train_epoch(self, data_loader: DataLoader) -> float:
  188. """
  189. Train the model with an instance of DataLoader (data_loader) for one epoch.
  190. Parameters
  191. ----------
  192. data_loader : DataLoader
  193. The data loader used for training.
  194. Returns
  195. -------
  196. float
  197. The average loss on one epoch.
  198. """
  199. model = self.model
  200. loss_fn = self.loss_fn
  201. optimizer = self.optimizer
  202. device = self.device
  203. model.train()
  204. total_loss, total_num = 0.0, 0
  205. for data, target in data_loader:
  206. data, target = data.to(device), target.to(device)
  207. out = model(data)
  208. loss = loss_fn(out, target)
  209. optimizer.zero_grad()
  210. loss.backward()
  211. optimizer.step()
  212. total_loss += loss.item() * data.size(0)
  213. total_num += data.size(0)
  214. return total_loss / total_num
  215. def _predict(self, data_loader: DataLoader) -> torch.Tensor:
  216. """
  217. Internal method to predict the outputs given a DataLoader.
  218. Parameters
  219. ----------
  220. data_loader : DataLoader
  221. The DataLoader providing input samples.
  222. Returns
  223. -------
  224. torch.Tensor
  225. Raw output from the model.
  226. """
  227. if not isinstance(data_loader, DataLoader):
  228. raise TypeError(
  229. f"data_loader must be an instance of torch.utils.data.DataLoader, "
  230. f"but got {type(data_loader)}"
  231. )
  232. model = self.model
  233. device = self.device
  234. model.eval()
  235. with torch.no_grad():
  236. results = []
  237. for data in data_loader:
  238. data = data.to(device)
  239. out = model(data)
  240. results.append(out)
  241. return torch.cat(results, axis=0)
  242. def predict(
  243. self,
  244. data_loader: Optional[DataLoader] = None,
  245. X: Optional[List[Any]] = None,
  246. ) -> numpy.ndarray:
  247. """
  248. Predict the class of the input data. This method supports prediction with either
  249. a DataLoader object (data_loader) or a list of input data (X). If both data_loader
  250. and X are provided, the method will predict the input data in data_loader
  251. instead of X.
  252. Parameters
  253. ----------
  254. data_loader : DataLoader, optional
  255. The data loader used for prediction, by default None.
  256. X : List[Any], optional
  257. The input data, by default None.
  258. Returns
  259. -------
  260. numpy.ndarray
  261. The predicted class of the input data.
  262. """
  263. if data_loader is not None and X is not None:
  264. print_log(
  265. "Predict the class of input data in data_loader instead of X.",
  266. logger="current",
  267. level=logging.WARNING,
  268. )
  269. if data_loader is None:
  270. dataset = PredictionDataset(X, self.test_transform)
  271. data_loader = DataLoader(
  272. dataset,
  273. batch_size=self.batch_size,
  274. num_workers=self.num_workers,
  275. collate_fn=self.collate_fn,
  276. pin_memory=torch.cuda.is_available(),
  277. )
  278. return self._predict(data_loader).argmax(axis=1).cpu().numpy()
  279. def predict_proba(
  280. self,
  281. data_loader: Optional[DataLoader] = None,
  282. X: Optional[List[Any]] = None,
  283. ) -> numpy.ndarray:
  284. """
  285. Predict the probability of each class for the input data. This method supports
  286. prediction with either a DataLoader object (data_loader) or a list of input data (X).
  287. If both data_loader and X are provided, the method will predict the input data in
  288. data_loader instead of X.
  289. Parameters
  290. ----------
  291. data_loader : DataLoader, optional
  292. The data loader used for prediction, by default None.
  293. X : List[Any], optional
  294. The input data, by default None.
  295. Returns
  296. -------
  297. numpy.ndarray
  298. The predicted probability of each class for the input data.
  299. """
  300. if data_loader is not None and X is not None:
  301. print_log(
  302. "Predict the class probability of input data in data_loader instead of X.",
  303. logger="current",
  304. level=logging.WARNING,
  305. )
  306. if data_loader is None:
  307. dataset = PredictionDataset(X, self.test_transform)
  308. data_loader = DataLoader(
  309. dataset,
  310. batch_size=self.batch_size,
  311. num_workers=self.num_workers,
  312. collate_fn=self.collate_fn,
  313. pin_memory=torch.cuda.is_available(),
  314. )
  315. return self._predict(data_loader).softmax(axis=1).cpu().numpy()
  316. def _score(self, data_loader: DataLoader) -> Tuple[float, float]:
  317. """
  318. Internal method to compute loss and accuracy for the data provided through a DataLoader.
  319. Parameters
  320. ----------
  321. data_loader : DataLoader
  322. Data loader to use for evaluation.
  323. Returns
  324. -------
  325. Tuple[float, float]
  326. mean_loss: float, The mean loss of the model on the provided data.
  327. accuracy: float, The accuracy of the model on the provided data.
  328. """
  329. if not isinstance(data_loader, DataLoader):
  330. raise TypeError(
  331. f"data_loader must be an instance of torch.utils.data.DataLoader, "
  332. f"but got {type(data_loader)}"
  333. )
  334. model = self.model
  335. loss_fn = self.loss_fn
  336. device = self.device
  337. model.eval()
  338. total_correct_num, total_num, total_loss = 0, 0, 0.0
  339. with torch.no_grad():
  340. for data, target in data_loader:
  341. data, target = data.to(device), target.to(device)
  342. out = model(data)
  343. if len(out.shape) > 1:
  344. correct_num = (target == out.argmax(axis=1)).sum().item()
  345. else:
  346. correct_num = (target == (out > 0.5)).sum().item()
  347. loss = loss_fn(out, target)
  348. total_loss += loss.item() * data.size(0)
  349. total_correct_num += correct_num
  350. total_num += data.size(0)
  351. mean_loss = total_loss / total_num
  352. accuracy = total_correct_num / total_num
  353. return mean_loss, accuracy
  354. def score(
  355. self,
  356. data_loader: Optional[DataLoader] = None,
  357. X: Optional[List[Any]] = None,
  358. y: Optional[List[int]] = None,
  359. ) -> float:
  360. """
  361. Validate the model. It supports validation with either a DataLoader object (data_loader)
  362. or a pair of input data (X) and ground truth labels (y). If both data_loader and
  363. (X, y) are provided, the method will prioritize using the data_loader.
  364. Parameters
  365. ----------
  366. data_loader : DataLoader, optional
  367. The data loader used for scoring, by default None.
  368. X : List[Any], optional
  369. The input data, by default None.
  370. y : List[int], optional
  371. The target data, by default None.
  372. Returns
  373. -------
  374. float
  375. The accuracy of the model.
  376. """
  377. print_log("Start machine learning model validation", logger="current")
  378. if data_loader is not None and X is not None:
  379. print_log(
  380. "data_loader will be used to validate the model instead of X and y.",
  381. logger="current",
  382. level=logging.WARNING,
  383. )
  384. if data_loader is None:
  385. if X is None or y is None:
  386. raise ValueError("data_loader and (X, y) can not be None simultaneously.")
  387. else:
  388. data_loader = self._data_loader(X, y)
  389. mean_loss, accuracy = self._score(data_loader)
  390. print_log(f"mean loss: {mean_loss:.3f}, accuray: {accuracy:.3f}", logger="current")
  391. return accuracy
  392. def _data_loader(
  393. self,
  394. X: Optional[List[Any]],
  395. y: Optional[List[int]] = None,
  396. shuffle: Optional[bool] = True,
  397. ) -> DataLoader:
  398. """
  399. Generate a DataLoader for user-provided input data and target labels.
  400. Parameters
  401. ----------
  402. X : List[Any]
  403. Input samples.
  404. y : List[int], optional
  405. Target labels. If None, dummy labels are created, by default None.
  406. shuffle : bool, optional
  407. Whether to shuffle the data, by default True.
  408. Returns
  409. -------
  410. DataLoader
  411. A DataLoader providing batches of (X, y) pairs.
  412. """
  413. if X is None:
  414. raise ValueError("X should not be None.")
  415. if y is None:
  416. y = [0] * len(X)
  417. if not (len(y) == len(X)):
  418. raise ValueError("X and y should have equal length.")
  419. dataset = ClassificationDataset(X, y, transform=self.train_transform)
  420. data_loader = DataLoader(
  421. dataset,
  422. batch_size=self.batch_size,
  423. shuffle=shuffle,
  424. num_workers=self.num_workers,
  425. collate_fn=self.collate_fn,
  426. pin_memory=torch.cuda.is_available(),
  427. )
  428. return data_loader
  429. def save(self, epoch_id: int = 0, save_path: Optional[str] = None) -> None:
  430. """
  431. Save the model and the optimizer. User can either provide a save_path or specify
  432. the epoch_id at which the model and optimizer is saved. if both save_path and
  433. epoch_id are provided, save_path will be used. If only epoch_id is specified,
  434. model and optimizer will be saved to the path f"model_checkpoint_epoch_{epoch_id}.pth"
  435. under ``self.save_dir``. save_path and epoch_id can not be None simultaneously.
  436. Parameters
  437. ----------
  438. epoch_id : int
  439. The epoch id.
  440. save_path : str, optional
  441. The path to save the model, by default None.
  442. """
  443. if self.save_dir is None and save_path is None:
  444. raise ValueError("'save_dir' and 'save_path' should not be None simultaneously.")
  445. if save_path is not None:
  446. if not os.path.exists(os.path.dirname(save_path)):
  447. os.makedirs(os.path.dirname(save_path))
  448. else:
  449. save_path = os.path.join(self.save_dir, f"model_checkpoint_epoch_{epoch_id}.pth")
  450. if not os.path.exists(self.save_dir):
  451. os.makedirs(self.save_dir)
  452. print_log(f"Checkpoints will be saved to {save_path}", logger="current")
  453. save_parma_dic = {
  454. "model": self.model.state_dict(),
  455. "optimizer": self.optimizer.state_dict(),
  456. }
  457. torch.save(save_parma_dic, save_path)
  458. def load(self, load_path: str) -> None:
  459. """
  460. Load the model and the optimizer.
  461. Parameters
  462. ----------
  463. load_path : str
  464. The directory to load the model, by default "".
  465. """
  466. if load_path is None:
  467. raise ValueError("Load path should not be None.")
  468. print_log(
  469. f"Loads checkpoint by local backend from path: {load_path}",
  470. logger="current",
  471. )
  472. param_dic = torch.load(load_path)
  473. self.model.load_state_dict(param_dic["model"])
  474. if "optimizer" in param_dic.keys():
  475. self.optimizer.load_state_dict(param_dic["optimizer"])

An efficient Python toolkit for Abductive Learning (ABL), a novel paradigm that integrates machine learning and logical reasoning in a unified framework.