|
- #!/usr/bin/env python3
- # -*- coding: utf-8; mode: python; tab-width: 4; indent-tabs-mode: nil -*-
- # vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4 fileencoding=utf-8
-
- import docker
-
- class DockerResult:
- def __init__(self, code, result):
- self._code = code
- self._result = result
-
- @property
- def code(self):
- return self._code
-
- @property
- def result(self):
- if isinstance(self._result, tuple):
- return self._result.output
- return self._result
-
- def __str__(self):
- return "DockerResult(code: {},result: {})".format(self.code, self.result)
-
- def __repr__(self):
- return self.__str__()
-
- class DockerUtil:
- def __init__(self, debug=False):
- try:
- self._client = docker.from_env()
- except:
- self._client = None
- self._debug = debug
- self._ctr = None
-
- def refresh_client(self):
- if self._client:
- self._client.close()
- del self._client
- self._client = docker.from_env()
-
- def available(self):
- return self._client != None
-
- def has_image(self, image_name):
- if self._client == None:
- return False
-
- try:
- image = self._client.images.get(image_name)
- if self._debug:
- print("Image <{}>(id: {}) founded".format(image_name, image.id))
- return True
- except:
- if self._debug:
- print("Image <{}> is not existent!".format(image_name))
- return False
-
- def get(self, container_name):
- if self._ctr == None:
- try:
- self._ctr = self._client.containers.get(container_name)
- except:
- pass
- elif self._ctr.name != container_name:
- try:
- self._ctr = self._client.containers.get(container_name)
- except:
- pass
-
- return self
-
-
- def __getattr__(self, attr_str):
- def func(*list_params, **dict_params):
- if self._client == None or self._ctr == None:
- return DockerResult(-128, "Docker client or container object is invalid")
-
- if not hasattr(self._ctr, attr_str):
- return DockerResult(-127, "Invalid container attribute")
-
- cmd = getattr(self._ctr, attr_str)
-
- if callable(cmd):
- try:
- return DockerResult(0, cmd(*list_params, **dict_params))
- except:
- return DockerResult(-1, "Failed to run function {}".format(attr_str))
- else:
- return DockerResult(0, cmd)
-
- return func
-
- def __setattr__(self, s, v):
- self.__dict__[s] = v
-
- if __name__ == '__main__':
- du = DockerUtil()
-
- if du.available():
- print("pyfastapi:3.8 -> ", du.has_image('pyfastapi:3.8'))
- r = du.get('jenkins').exec_run("/bin/bash -c 'echo $PATH'")
- print(r)
- print("Status: ", du.get("jenkins").status().result)
- else:
- print("Docker is not available")
|