|
- #!/usr/bin/env python3
- # -*- coding: utf-8; mode: python; tab-width: 4; indent-tabs-mode: nil -*-
- # vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4 fileencoding=utf-8
-
-
- class TaskResult:
- def __init__(self, name):
- self._name = name
- self._status = ITask.NONE
- self._output = {}
-
- @property
- def name(self) -> str:
- return self._name
-
- @property
- def output(self) -> dict:
- return self._output
-
- @output.setter
- def output(self, val: dict) -> None:
- self._output = val
-
- @property
- def status(self) -> int:
- return self._status
-
- @status.setter
- def status(self, val: int) -> None:
- self._status = val
-
- class ITask:
-
- (NONE, ERROR, RUNNING, DONE) = range(4)
-
- def __init__(self) -> None:
- self._status = ITask.NONE
- self._output = {}
-
- def name(self) -> str:
- """Use the class name as the default task name"""
- return self.__class__.__name__
-
- def status(self) -> int:
- return self._status
-
- def output(self) -> dict:
- # Return a dict to ddescribe the output
- return self._output
-
- def next_tasks(self) -> list:
- return []
-
- def prev_tasks(self) -> list:
- return []
-
- def update(self, tr: TaskResult):
- self._status = tr.status
- self._output.update(tr.output)
-
- def run(self, params: list) -> TaskResult:
- raise NotImplementedError()
-
- def __str__(self) -> str:
- return f"prev_tasks: {self.prev_tasks()}, next_tasks: {self.next_tasks()}"
-
- __repr__ = __str__
|