|
|
|
@@ -56,12 +56,12 @@ public class ExperimentInsServiceImpl implements ExperimentInsService { |
|
|
|
* @return 实例对象 |
|
|
|
*/ |
|
|
|
@Override |
|
|
|
public ExperimentIns queryById(Integer id) throws IOException { |
|
|
|
public ExperimentIns queryById(Integer id) { |
|
|
|
ExperimentIns experimentIns = this.experimentInsDao.queryById(id); |
|
|
|
if (experimentIns != null && (StringUtils.isEmpty(experimentIns.getStatus())) || !isTerminatedState(experimentIns)) { |
|
|
|
if (experimentIns != null && (StringUtils.isEmpty(experimentIns.getStatus())) || !isTerminatedState(experimentIns.getStatus())) { |
|
|
|
experimentIns = this.queryStatusFromArgo(experimentIns); |
|
|
|
//只有当新状态是终止态时才更新数据库 |
|
|
|
if (isTerminatedState(experimentIns)) { |
|
|
|
if (isTerminatedState(experimentIns.getStatus())) { |
|
|
|
//同时更新各个节点 |
|
|
|
this.update(experimentIns); |
|
|
|
} |
|
|
|
@@ -78,17 +78,17 @@ public class ExperimentInsServiceImpl implements ExperimentInsService { |
|
|
|
* @return 实验列表 |
|
|
|
*/ |
|
|
|
@Override |
|
|
|
public List<ExperimentIns> getByExperimentId(Integer experimentId) throws IOException { |
|
|
|
public List<ExperimentIns> getByExperimentId(Integer experimentId) { |
|
|
|
List<ExperimentIns> experimentInsList = experimentInsDao.getByExperimentId(experimentId); |
|
|
|
|
|
|
|
List<ExperimentIns> result = new ArrayList<ExperimentIns>(); |
|
|
|
if (experimentInsList!=null&&experimentInsList.size()>0) { |
|
|
|
for (ExperimentIns experimentIns : experimentInsList) { |
|
|
|
//当原本状态为null或非终止态时才调用argo接口 |
|
|
|
if (experimentIns != null && (StringUtils.isEmpty(experimentIns.getStatus())) || !isTerminatedState(experimentIns)) { |
|
|
|
if (experimentIns != null && (StringUtils.isEmpty(experimentIns.getStatus())) || !isTerminatedState(experimentIns.getStatus())) { |
|
|
|
experimentIns = this.queryStatusFromArgo(experimentIns); |
|
|
|
//只有当新状态是终止态时才更新数据库 |
|
|
|
if (isTerminatedState(experimentIns)) { |
|
|
|
if (isTerminatedState(experimentIns.getStatus())) { |
|
|
|
//同时更新各个节点 |
|
|
|
this.update(experimentIns); |
|
|
|
} |
|
|
|
@@ -109,7 +109,7 @@ public class ExperimentInsServiceImpl implements ExperimentInsService { |
|
|
|
* @return 查询结果 |
|
|
|
*/ |
|
|
|
@Override |
|
|
|
public Page<ExperimentIns> queryByPage(ExperimentIns experimentIns, PageRequest pageRequest) throws IOException { |
|
|
|
public Page<ExperimentIns> queryByPage(ExperimentIns experimentIns, PageRequest pageRequest) { |
|
|
|
long total = this.experimentInsDao.count(experimentIns); |
|
|
|
List<ExperimentIns> experimentInsList = this.experimentInsDao.queryAllByLimit(experimentIns, pageRequest); |
|
|
|
if (experimentInsList!=null && experimentInsList.size()>0) { |
|
|
|
@@ -152,7 +152,7 @@ public class ExperimentInsServiceImpl implements ExperimentInsService { |
|
|
|
* @return 实例对象 |
|
|
|
*/ |
|
|
|
@Override |
|
|
|
public ExperimentIns update(ExperimentIns experimentIns) throws IOException { |
|
|
|
public ExperimentIns update(ExperimentIns experimentIns) { |
|
|
|
LoginUser loginUser = SecurityUtils.getLoginUser(); |
|
|
|
experimentIns.setUpdateBy(loginUser.getUsername()); |
|
|
|
experimentIns.setUpdateTime(new Date()); |
|
|
|
@@ -212,8 +212,8 @@ public class ExperimentInsServiceImpl implements ExperimentInsService { |
|
|
|
String namespace = ins.getArgoInsNs(); |
|
|
|
String name = ins.getArgoInsName(); |
|
|
|
Integer id = ins.getId(); |
|
|
|
// 创建请求数据map |
|
|
|
ExperimentIns experimentIns = this.experimentInsDao.queryById(id); |
|
|
|
// 创建请求数据map |
|
|
|
Map<String,Object> requestData = new HashMap<>(); |
|
|
|
requestData.put("namespace", namespace); |
|
|
|
requestData.put("name", name); |
|
|
|
@@ -241,6 +241,10 @@ public class ExperimentInsServiceImpl implements ExperimentInsService { |
|
|
|
if (status == null || status.isEmpty()) { |
|
|
|
throw new RuntimeException("工作流状态为空。"); |
|
|
|
} |
|
|
|
//解析流水线开始时间,开始时间一定存在,所以不需要判断 |
|
|
|
Date startTime = DateUtils.convertUTCtoShanghaiDate((String) status.get("startedAt")); |
|
|
|
experimentIns.setStartTime(startTime); |
|
|
|
|
|
|
|
|
|
|
|
//解析流水线结束时间 |
|
|
|
String finishedAtString = (String) status.get("finishedAt"); |
|
|
|
@@ -251,23 +255,21 @@ public class ExperimentInsServiceImpl implements ExperimentInsService { |
|
|
|
|
|
|
|
// 解析nodes字段,提取节点状态并转换为JSON字符串 |
|
|
|
Map<String, Object> nodes = (Map<String, Object>) status.get("nodes"); |
|
|
|
Map<String, Object> modifiedNodes = new LinkedHashMap<>(); |
|
|
|
if (nodes != null ) { |
|
|
|
for (Map.Entry<String, Object> nodeEntry : nodes.entrySet()) { |
|
|
|
Map<String,Object> nodeDetails = (Map<String, Object>) nodeEntry.getValue(); |
|
|
|
String templateName = (String) nodeDetails.get("displayName"); |
|
|
|
modifiedNodes.put(templateName, nodeDetails); |
|
|
|
} |
|
|
|
if (nodes == null || nodes.isEmpty()) { |
|
|
|
throw new RuntimeException("工作流的节点数据为空。"); |
|
|
|
} |
|
|
|
|
|
|
|
String nodeStatusJson = JsonUtils.mapToJson(modifiedNodes); |
|
|
|
experimentIns.setNodesStatus(nodeStatusJson); |
|
|
|
Map<String, Object> modifiedNodes = new LinkedHashMap<>(); |
|
|
|
|
|
|
|
//终止态为终止不改 |
|
|
|
if (!StringUtils.equals(experimentIns.getStatus(),"Terminated")) { |
|
|
|
experimentIns.setStatus(StringUtils.isNotEmpty((String) status.get("phase"))?(String) status.get("phase"):"Pending"); |
|
|
|
for (Map.Entry<String, Object> nodeEntry : nodes.entrySet()) { |
|
|
|
Map<String,Object> nodeDetails = (Map<String, Object>) nodeEntry.getValue(); |
|
|
|
String templateName = (String) nodeDetails.get("displayName"); |
|
|
|
modifiedNodes.put(templateName, nodeDetails); |
|
|
|
} |
|
|
|
|
|
|
|
String nodeStatusJson = JsonUtils.mapToJson(modifiedNodes); |
|
|
|
experimentIns.setNodesStatus(nodeStatusJson); |
|
|
|
experimentIns.setStatus((String) status.get("phase")); |
|
|
|
|
|
|
|
return experimentIns; |
|
|
|
|
|
|
|
@@ -276,6 +278,7 @@ public class ExperimentInsServiceImpl implements ExperimentInsService { |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
/** |
|
|
|
@@ -326,11 +329,9 @@ public class ExperimentInsServiceImpl implements ExperimentInsService { |
|
|
|
// 从响应Map中直接获取"errCode"的值 |
|
|
|
Integer errCode = (Integer) runResMap.get("errCode"); |
|
|
|
if (errCode != null && errCode == 0) { |
|
|
|
experimentIns.setStatus("Terminated"); |
|
|
|
//更新experimentIns,确保状态更新被保存到数据库 |
|
|
|
ExperimentIns ins = queryStatusFromArgo(experimentIns); |
|
|
|
ins.setStatus("Terminated"); |
|
|
|
ins.setFinishTime(new Date()); |
|
|
|
this.experimentInsDao.update(ins); |
|
|
|
this.experimentInsDao.update(experimentIns); |
|
|
|
return true; |
|
|
|
} else { |
|
|
|
return false; |
|
|
|
@@ -394,27 +395,10 @@ public class ExperimentInsServiceImpl implements ExperimentInsService { |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
private boolean isTerminatedState(ExperimentIns ins) throws IOException { |
|
|
|
private boolean isTerminatedState(String state) { |
|
|
|
// 定义终止态的列表,例如 "Succeeded", "Failed" 等 |
|
|
|
String status = ins.getStatus(); |
|
|
|
boolean flag = true; |
|
|
|
List<String> terminatedStates = Arrays.asList("Succeeded", "Failed"); |
|
|
|
flag = terminatedStates.contains(status); |
|
|
|
if (StringUtils.equals(status, "Terminated")){ |
|
|
|
//如果跟node_status里面不一样,就要去更新node_status的信息 |
|
|
|
String nodesStatus = ins.getNodesStatus(); |
|
|
|
Map<String, Object> nodeMap = JsonUtils.jsonToMap(nodesStatus); |
|
|
|
String keyStartsWithWorkflow = nodeMap.keySet().stream() |
|
|
|
.filter(key -> key.startsWith("workflow-")) |
|
|
|
.findFirst() |
|
|
|
.orElse(null); |
|
|
|
Map workflowMap = (Map) nodeMap.get(keyStartsWithWorkflow); |
|
|
|
if (workflowMap != null){ |
|
|
|
flag = StringUtils.equals("Terminated", (String) workflowMap.get("phase")); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
return flag; |
|
|
|
List<String> terminatedStates = Arrays.asList("Succeeded", "Failed", "Terminated"); |
|
|
|
return terminatedStates.contains(state); |
|
|
|
} |
|
|
|
|
|
|
|
} |