You cannot select more than 25 topics. Topics must start with a Chinese character, a letter or a number, can include dashes ('-'), and can be up to 35 characters long.

master.py 4.1 kB

5 years ago
5 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118
  1. # -*- coding: UTF-8 -*-
  2. """
  3. Copyright 2020 Tianshu AI Platform. All Rights Reserved.
  4. Licensed under the Apache License, Version 2.0 (the "License");
  5. you may not use this file except in compliance with the License.
  6. You may obtain a copy of the License at
  7. http://www.apache.org/licenses/LICENSE-2.0
  8. Unless required by applicable law or agreed to in writing, software
  9. distributed under the License is distributed on an "AS IS" BASIS,
  10. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  11. See the License for the specific language governing permissions and
  12. limitations under the License.
  13. =============================================================
  14. """
  15. import sys
  16. sys.path.append('../service_utils')
  17. from python_io import logfile_utils
  18. from python_io.lazy_load import LazyLoad
  19. from multiprocessing import Process
  20. from pathlib import Path
  21. from shutil import rmtree
  22. from utils.redis_utils import RedisInstance
  23. import json
  24. def load_logs(uid, run_dirs, cache_path):
  25. msg = '({}) starts successfully'.format(uid)
  26. RedisInstance.lpush('parser_statu' + uid, json.dumps(
  27. {'code': 200, 'msg': msg}
  28. ))
  29. print(msg)
  30. for key, val in run_dirs.items():
  31. LazyLoad(key, val).init_load(uid=uid, cache_path=cache_path)
  32. def set_cache_path(cache_dir):
  33. cache_dir = Path(cache_dir).absolute()
  34. if cache_dir.exists():
  35. rmtree(cache_dir)
  36. return cache_dir.absolute()
  37. class Master:
  38. def __init__(self):
  39. self.file_parsers = {}
  40. self.r = RedisInstance
  41. self.r.flushdb()
  42. def set_parser(self, uid, log_dir, cache_dir):
  43. if Path(log_dir).exists():
  44. run_dirs = logfile_utils.get_runinfo(log_dir)
  45. if run_dirs:
  46. if uid in self.file_parsers.keys():
  47. msg = "User {} has already started".format(uid)
  48. RedisInstance.lpush('parser_statu' + uid, json.dumps(
  49. {'code': 200, 'msg': msg}
  50. ))
  51. print(msg)
  52. return
  53. cache_path = set_cache_path(cache_dir)
  54. self.r.set(uid, str(cache_path))
  55. p = Process(target=load_logs,
  56. args=(uid, run_dirs, cache_path))
  57. p.start()
  58. self.file_parsers[uid] = p
  59. else:
  60. msg = 'No related logs found'
  61. RedisInstance.lpush('parser_statu' + uid, json.dumps(
  62. {'code': 500, 'msg': msg}
  63. ))
  64. print(msg)
  65. else:
  66. msg = 'User does not exist or log path not found error: {}'\
  67. .format(log_dir)
  68. RedisInstance.lpush('parser_statu' + uid, json.dumps(
  69. {'code': 500, 'msg': msg}
  70. ))
  71. print(msg)
  72. def kill_parser(self, uid):
  73. if uid in self.file_parsers.keys():
  74. cache_path = Path(self.r.get(uid))
  75. self.file_parsers[uid].terminate()
  76. # 清除redis缓存
  77. for key in self.r.keys(uid + '*'):
  78. self.r.delete(key)
  79. import time
  80. time.sleep(2) # 等待file_parsers线程关闭
  81. if not self.file_parsers[uid].is_alive():
  82. self.file_parsers.pop(uid)
  83. if cache_path.exists():
  84. rmtree(cache_path)
  85. print('({}) terminates successfully'.format(uid))
  86. def run_server(self):
  87. while True:
  88. _, request = self.r.brpop('sessions')
  89. request = json.loads(request)
  90. if request['type'] == 'run':
  91. uid = request['uid']
  92. logdir = request['logdir']
  93. cachedir = request['cachedir']
  94. self.set_parser(uid, logdir, cachedir)
  95. elif request['type'] == 'kill':
  96. uid = request['uid']
  97. self.kill_parser(uid)
  98. else:
  99. print('Unrecognized request')
  100. if __name__ == '__main__':
  101. # logdir = '../../demo_logs'
  102. # Master().set_parser('a',logdir)
  103. print("Master running...")
  104. Master().run_server()

一站式算法开发平台、高性能分布式深度学习框架、先进算法模型库、视觉模型炼知平台、数据可视化分析平台等一系列平台及工具，在模型高效分布式训练、数据处理和可视分析、模型炼知和轻量化等技术上形成独特优势，目前已为产学研等各领域近千家单位及个人提供AI应用赋能。