|
|
|
@@ -14,6 +14,7 @@ |
|
|
|
# ============================================================================ |
|
|
|
"""This file is used to define the model lineage python api.""" |
|
|
|
import os |
|
|
|
import pandas as pd |
|
|
|
|
|
|
|
from mindinsight.lineagemgr.common.exceptions.exceptions import LineageParamValueError, \ |
|
|
|
LineageQuerySummaryDataError, LineageParamSummaryPathError, \ |
|
|
|
@@ -28,6 +29,9 @@ from mindinsight.lineagemgr.lineage_parser import LineageParser, LineageOrganize |
|
|
|
from mindinsight.lineagemgr.querier.querier import Querier |
|
|
|
from mindinsight.utils.exceptions import MindInsightException |
|
|
|
|
|
|
|
_METRIC_PREFIX = "[M]" |
|
|
|
_USER_DEFINED_PREFIX = "[U]" |
|
|
|
|
|
|
|
|
|
|
|
def get_summary_lineage(data_manager=None, summary_dir=None, keys=None): |
|
|
|
""" |
|
|
|
@@ -183,3 +187,46 @@ def _convert_relative_path_to_abspath(summary_base_dir, search_condition): |
|
|
|
search_condition.get('summary_dir')['eq'] = abs_dir |
|
|
|
|
|
|
|
return search_condition |
|
|
|
|
|
|
|
|
|
|
|
def get_lineage_table(data_manager): |
|
|
|
"""Get lineage data in a table from data manager.""" |
|
|
|
lineages = filter_summary_lineage(data_manager=data_manager) |
|
|
|
lineage_objects = lineages.get("object", []) |
|
|
|
cnt_lineages = len(lineage_objects) |
|
|
|
metric_prefix = _METRIC_PREFIX |
|
|
|
user_defined_prefix = _USER_DEFINED_PREFIX |
|
|
|
# Step 1, get column names |
|
|
|
column_names = set() |
|
|
|
for lineage in lineage_objects: |
|
|
|
model_lineage = lineage.get("model_lineage", {}) |
|
|
|
metric = model_lineage.get("metric", {}) |
|
|
|
metric_names = tuple('{}{}'.format(metric_prefix, key) for key in metric.keys()) |
|
|
|
user_defined = model_lineage.get("user_defined", {}) |
|
|
|
user_defined_names = tuple('{}{}'.format(metric_prefix, key) for key in user_defined.keys()) |
|
|
|
model_lineage_temp = list(model_lineage.keys()) |
|
|
|
for key in model_lineage_temp: |
|
|
|
if key in ["metric", "user_defined"]: |
|
|
|
model_lineage_temp.remove(key) |
|
|
|
column_names.update(model_lineage_temp) |
|
|
|
column_names.update(metric_names) |
|
|
|
column_names.update(user_defined_names) |
|
|
|
# Step 2, collect data |
|
|
|
column_data = {key: [None] * cnt_lineages for key in column_names} |
|
|
|
for ind, lineage in enumerate(lineage_objects): |
|
|
|
model_lineage = lineage.get("model_lineage", {}) |
|
|
|
metric = model_lineage.pop("metric", {}) |
|
|
|
metric_content = { |
|
|
|
'{}{}'.format(metric_prefix, key): val for key, val in metric.items() |
|
|
|
} |
|
|
|
user_defined = model_lineage.pop("user_defined", {}) |
|
|
|
user_defined_content = { |
|
|
|
'{}{}'.format(user_defined_prefix, key): val for key, val in user_defined.items() |
|
|
|
} |
|
|
|
final_content = {} |
|
|
|
final_content.update(model_lineage) |
|
|
|
final_content.update(metric_content) |
|
|
|
final_content.update(user_defined_content) |
|
|
|
for key, val in final_content.items(): |
|
|
|
column_data[key][ind] = val |
|
|
|
return pd.DataFrame(column_data) |