You cannot select more than 25 topics. Topics must start with a Chinese character, a letter or a number, can include dashes ('-') and can be up to 35 characters long.

logfile_loader.py 8.3 kB

5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214
  1. # -*- coding: UTF-8 -*-
  2. """
  3. Copyright 2020 Tianshu AI Platform. All Rights Reserved.
  4. Licensed under the Apache License, Version 2.0 (the "License");
  5. you may not use this file except in compliance with the License.
  6. You may obtain a copy of the License at
  7. http://www.apache.org/licenses/LICENSE-2.0
  8. Unless required by applicable law or agreed to in writing, software
  9. distributed under the License is distributed on an "AS IS" BASIS,
  10. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  11. See the License for the specific language governing permissions and
  12. limitations under the License.
  13. =============================================================
  14. """
  15. import threading
  16. import time
  17. import json
  18. from io import BytesIO
  19. from pathlib import Path
  20. from tbparser import SummaryReader
  21. from tbparser import Projector_Reader
  22. from utils.cache_io import CacheIO
  23. from utils.path_utils import path_parser
  24. from utils.redis_utils import RedisInstance
  25. import pickle
  26. class Trace_Thread(threading.Thread):
  27. def __init__(self, runname, filename, current_size, uid, cache_path):
  28. threading.Thread.__init__(self, name=filename.name)
  29. self.uid = uid
  30. self.runname = runname
  31. self.cache_path = cache_path
  32. self.filename = filename
  33. self.current_size = current_size
  34. self.r = RedisInstance
  35. # 该日志中是否有超参数
  36. self.has_hparams = False
  37. self.first_write = False
  38. self.metrics = []
  39. # 是否完成初始化
  40. self._finish_init = 0
  41. self.redis_tag = []
  42. def run(self):
  43. print('监听文件 %s' % self.filename)
  44. self.trace(self.current_size)
  45. def trace(self, current_size):
  46. filename = Path(self.filename)
  47. if filename.suffix == ".json":
  48. self.load_model_file(filename)
  49. self.finish_init = 1
  50. return
  51. f = open(filename, "rb")
  52. # for event file
  53. if "event" in filename.name:
  54. _io = BytesIO(
  55. f.read(current_size)
  56. )
  57. self.load_event_file(_io)
  58. # 设置初始化完成标志
  59. self.finish_init = 1
  60. while True:
  61. rest = f.read()
  62. if not rest:
  63. time.sleep(2)
  64. continue
  65. _io = BytesIO(rest)
  66. self.load_event_file(_io)
  67. # for projector file
  68. elif "projector" in filename.name:
  69. self.load_projector_file(f)
  70. # 设置初始化完成标志
  71. self.finish_init = 1
  72. @property
  73. def finish_init(self):
  74. return self._finish_init
  75. # 设置标志
  76. @finish_init.setter
  77. def finish_init(self, is_finish):
  78. self.r.set("{}_{}_{}_is_finish".format(self.uid, self.runname,
  79. self.filename.name), 1)
  80. print(self.name + " is finish")
  81. self._finish_init = is_finish
  82. def set_redis_key(self, type, tag, file_path):
  83. _key = self.uid + '_' + self.runname + '_' + type + '_' + tag
  84. if _key in self.redis_tag:
  85. pass
  86. else:
  87. self.r.set(_key, str(file_path))
  88. self.redis_tag.append(_key)
  89. def set_cache(self, file_name, data):
  90. if not file_name.parent.exists():
  91. file_name.parent.mkdir(parents=True, exist_ok=True)
  92. with open(file_name, 'ab') as f:
  93. pickle.dump(data, f)
  94. f.close()
  95. def load_event_file(self, fileIO):
  96. reader = SummaryReader(fileIO, types=[
  97. 'scalar',
  98. 'graph',
  99. 'hist',
  100. 'text',
  101. 'image',
  102. 'audio',
  103. 'hparams'
  104. ])
  105. for items in reader:
  106. if items.type == "graph":
  107. file_path = path_parser(self.cache_path, self.runname,
  108. items.type, tag='c_graph')
  109. CacheIO(file_path).set_cache(data=items.value)
  110. self.set_redis_key(items.type, tag='c_graph',
  111. file_path=file_path)
  112. continue
  113. elif items.type == "hparams":
  114. file_path = path_parser(self.cache_path, self.runname,
  115. type='hyperparm',
  116. tag='hparams')
  117. self.set_cache(file_name=file_path, data=items.value)
  118. self.set_redis_key(type='hyperparm',
  119. tag='hparams',
  120. file_path=file_path)
  121. continue
  122. item_data = {
  123. 'step': items.step,
  124. 'wall_time': items.wall_time,
  125. 'value': items.value,
  126. 'type': items.type
  127. }
  128. file_path = path_parser(self.cache_path, self.runname,
  129. type=items.type,
  130. tag=items.tag)
  131. CacheIO(file_path).set_cache(data=item_data)
  132. self.set_redis_key(type=items.type, tag=items.tag,
  133. file_path=file_path)
  134. def load_projector_file(self, fileIO):
  135. p_reader = Projector_Reader(fileIO).read()
  136. for items in p_reader.projectors:
  137. item_data = {
  138. 'step': items.step,
  139. 'wall_time': items.wall_time,
  140. 'value': items.value.reshape(items.value.shape[0], -1)
  141. if items.value.ndim > 2 else items.value,
  142. 'label': items.label,
  143. }
  144. file_path = path_parser(self.cache_path, self.runname,
  145. type=p_reader.metadata.type,
  146. tag=items.tag)
  147. CacheIO(file_path).set_cache(data=item_data)
  148. self.set_redis_key(type=p_reader.metadata.type, tag=items.tag,
  149. file_path=file_path)
  150. if p_reader.sample:
  151. file_path = path_parser(self.cache_path, self.runname,
  152. type="embedding",
  153. tag="sample_" + items.tag)
  154. CacheIO(file_path).set_cache(data=p_reader.sample)
  155. self.set_redis_key(type="embedding", tag="sample_" + items.tag,
  156. file_path=file_path)
  157. def filter_graph(self, file):
  158. variable_names = {}
  159. graph = json.loads(file)
  160. for sub_graph in graph:
  161. cfg = sub_graph["config"]
  162. # 拷贝一份,用于循环
  163. cfg_copy = cfg["layers"].copy()
  164. for layer in cfg_copy:
  165. if layer["class_name"] == "variable":
  166. _name = layer["name"]
  167. variable_names[_name] = layer
  168. cfg["layers"].remove(layer)
  169. # 第二遍循环,删除`variable_names`出现在`inbound_nodes`中的名字
  170. for sub_graph in graph:
  171. cfg = sub_graph["config"]
  172. for layer in cfg["layers"]:
  173. in_nodes = layer["inbound_nodes"]
  174. in_nodes_copy = in_nodes.copy()
  175. for node in in_nodes_copy:
  176. # 在里面则删除
  177. if node in variable_names.keys():
  178. in_nodes.remove(node)
  179. graph_str = json.dumps(graph)
  180. return graph_str
  181. def load_model_file(self, file):
  182. with open(file, "r") as f:
  183. # 结构图内容
  184. _cg_content = f.read()
  185. _sg_content = self.filter_graph(_cg_content)
  186. # caclulate_graph.json
  187. sg_file_path = path_parser(self.cache_path, self.runname,
  188. type="graph",
  189. tag="s_graph")
  190. cg_file_path = path_parser(self.cache_path, self.runname,
  191. type="graph",
  192. tag="c_graph")
  193. CacheIO(sg_file_path).set_cache(data=_sg_content)
  194. CacheIO(cg_file_path).set_cache(data=_cg_content)
  195. self.set_redis_key(type="graph", tag="s_graph",
  196. file_path=sg_file_path)
  197. self.set_redis_key(type="graph", tag="c_graph",
  198. file_path=cg_file_path)

一站式算法开发平台、高性能分布式深度学习框架、先进算法模型库、视觉模型炼知平台、数据可视化分析平台等一系列平台及工具,在模型高效分布式训练、数据处理和可视分析、模型炼知和轻量化等技术上形成独特优势,目前已在产学研等各领域近千家单位及个人提供AI应用赋能