You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

master.py 4.2 kB

5 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121
  1. """
  2. /**
  3. * Copyright 2020 Zhejiang Lab. All Rights Reserved.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. * =============================================================
  17. */
  18. """
  19. # -*- coding: UTF-8 -*-
  20. import sys
  21. sys.path.append('../service_utils')
  22. from python_io import logfile_utils
  23. from python_io.lazy_load import LazyLoad
  24. from multiprocessing import Process
  25. from pathlib import Path
  26. from shutil import rmtree
  27. from utils.redis_utils import RedisInstance
  28. import json
  29. def load_logs(uid, run_dirs, cache_path):
  30. msg = '({}) starts successfully'.format(uid)
  31. RedisInstance.lpush('parser_statu' + uid, json.dumps(
  32. {'code': 200, 'msg': msg}
  33. ))
  34. print(msg)
  35. for key, val in run_dirs.items():
  36. LazyLoad(key, val).init_load(uid=uid, cache_path=cache_path)
  37. def set_cache_path(cache_dir):
  38. cache_dir = Path(cache_dir).absolute()
  39. if cache_dir.exists():
  40. rmtree(cache_dir)
  41. return cache_dir.absolute()
  42. class Master:
  43. def __init__(self):
  44. self.file_parsers = {}
  45. self.r = RedisInstance
  46. self.r.flushdb()
  47. def set_parser(self, uid, log_dir, cache_dir):
  48. if Path(log_dir).exists():
  49. run_dirs = logfile_utils.get_runinfo(log_dir)
  50. if run_dirs:
  51. if uid in self.file_parsers.keys():
  52. msg = "User {} has already started".format(uid)
  53. RedisInstance.lpush('parser_statu' + uid, json.dumps(
  54. {'code': 200, 'msg': msg}
  55. ))
  56. print(msg)
  57. return
  58. cache_path = set_cache_path(cache_dir)
  59. self.r.set(uid, str(cache_path))
  60. p = Process(target=load_logs,
  61. args=(uid, run_dirs, cache_path))
  62. p.start()
  63. self.file_parsers[uid] = p
  64. else:
  65. msg = 'No related logs found'
  66. RedisInstance.lpush('parser_statu' + uid, json.dumps(
  67. {'code': 500, 'msg': msg}
  68. ))
  69. print(msg)
  70. else:
  71. msg = 'User does not exist or log path not found error: {}'\
  72. .format(log_dir)
  73. RedisInstance.lpush('parser_statu' + uid, json.dumps(
  74. {'code': 500, 'msg': msg}
  75. ))
  76. print(msg)
  77. def kill_parser(self, uid):
  78. if uid in self.file_parsers.keys():
  79. cache_path = Path(self.r.get(uid))
  80. self.file_parsers[uid].terminate()
  81. # 清除redis缓存
  82. for key in self.r.keys(uid + '*'):
  83. self.r.delete(key)
  84. import time
  85. time.sleep(2) # 等待file_parsers线程关闭
  86. if not self.file_parsers[uid].is_alive():
  87. self.file_parsers.pop(uid)
  88. if cache_path.exists():
  89. rmtree(cache_path)
  90. print('({}) terminates successfully'.format(uid))
  91. def run_server(self):
  92. while True:
  93. _, request = self.r.brpop('sessions')
  94. request = json.loads(request)
  95. if request['type'] == 'run':
  96. uid = request['uid']
  97. logdir = request['logdir']
  98. cachedir = request['cachedir']
  99. self.set_parser(uid, logdir, cachedir)
  100. elif request['type'] == 'kill':
  101. uid = request['uid']
  102. self.kill_parser(uid)
  103. else:
  104. print('Unrecognized request')
  105. if __name__ == '__main__':
  106. # logdir = '../../demo_logs'
  107. # Master().set_parser('a',logdir)
  108. print("Master running...")
  109. Master().run_server()

一站式算法开发平台、高性能分布式深度学习框架、先进算法模型库、视觉模型炼知平台、数据可视化分析平台等一系列平台及工具,在模型高效分布式训练、数据处理和可视分析、模型炼知和轻量化等技术上形成独特优势,目前已在产学研等各领域近千家单位及个人提供AI应用赋能

Contributors (1)