# -*- coding: UTF-8 -*- """ Copyright 2020 Tianshu AI Platform. All Rights Reserved. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. ============================================================= """ import sys sys.path.append('../service_utils') from python_io import logfile_utils from python_io.lazy_load import LazyLoad from multiprocessing import Process from pathlib import Path from shutil import rmtree from utils.redis_utils import RedisInstance import json def load_logs(uid, run_dirs, cache_path): msg = '({}) starts successfully'.format(uid) RedisInstance.lpush('parser_statu' + uid, json.dumps( {'code': 200, 'msg': msg} )) print(msg) for key, val in run_dirs.items(): LazyLoad(key, val).init_load(uid=uid, cache_path=cache_path) def set_cache_path(cache_dir): cache_dir = Path(cache_dir).absolute() if cache_dir.exists(): rmtree(cache_dir) return cache_dir.absolute() class Master: def __init__(self): self.file_parsers = {} self.r = RedisInstance self.r.flushdb() def set_parser(self, uid, log_dir, cache_dir): if Path(log_dir).exists(): run_dirs = logfile_utils.get_runinfo(log_dir) if run_dirs: if uid in self.file_parsers.keys(): msg = "User {} has already started".format(uid) RedisInstance.lpush('parser_statu' + uid, json.dumps( {'code': 200, 'msg': msg} )) print(msg) return cache_path = set_cache_path(cache_dir) self.r.set(uid, str(cache_path)) p = Process(target=load_logs, args=(uid, run_dirs, cache_path)) p.start() self.file_parsers[uid] = p else: msg = 'No related logs found' RedisInstance.lpush('parser_statu' + uid, json.dumps( {'code': 500, 'msg': msg} )) print(msg) else: msg = 'User does not exist or log path not found error: {}'\ .format(log_dir) RedisInstance.lpush('parser_statu' + uid, json.dumps( {'code': 500, 'msg': msg} )) print(msg) def kill_parser(self, uid): if uid in self.file_parsers.keys(): cache_path = Path(self.r.get(uid)) self.file_parsers[uid].terminate() # 清除redis缓存 for key in self.r.keys(uid + '*'): self.r.delete(key) import time time.sleep(2) # 等待file_parsers线程关闭 if not self.file_parsers[uid].is_alive(): self.file_parsers.pop(uid) if cache_path.exists(): rmtree(cache_path) print('({}) terminates successfully'.format(uid)) def run_server(self): while True: _, request = self.r.brpop('sessions') request = json.loads(request) if request['type'] == 'run': uid = request['uid'] logdir = request['logdir'] cachedir = request['cachedir'] self.set_parser(uid, logdir, cachedir) elif request['type'] == 'kill': uid = request['uid'] self.kill_parser(uid) else: print('Unrecognized request') if __name__ == '__main__': # logdir = '../../demo_logs' # Master().set_parser('a',logdir) print("Master running...") Master().run_server()