| @@ -0,0 +1,251 @@ | |||||
| package com.ruoyi.platform.service.impl; | |||||
| import com.ruoyi.platform.domain.TextClassification; | |||||
| import com.ruoyi.platform.domain.TextClassificationIns; | |||||
| import com.ruoyi.platform.mapper.TextClassificationDao; | |||||
| import com.ruoyi.platform.mapper.TextClassificationInsDao; | |||||
| import com.ruoyi.platform.service.TextClassificationInsService; | |||||
| import com.ruoyi.platform.utils.DateUtils; | |||||
| import com.ruoyi.platform.utils.HttpUtils; | |||||
| import com.ruoyi.platform.utils.JsonUtils; | |||||
| import com.ruoyi.system.api.constant.Constant; | |||||
| import org.apache.commons.lang3.StringUtils; | |||||
| import org.springframework.beans.factory.annotation.Value; | |||||
| import org.springframework.data.domain.Page; | |||||
| import org.springframework.data.domain.PageImpl; | |||||
| import org.springframework.data.domain.PageRequest; | |||||
| import org.springframework.stereotype.Service; | |||||
| import javax.annotation.Resource; | |||||
| import java.util.*; | |||||
| @Service | |||||
| public class TextClassificationInsServiceImpl implements TextClassificationInsService { | |||||
| @Value("${argo.url}") | |||||
| private String argoUrl; | |||||
| @Value("${argo.workflowStatus}") | |||||
| private String argoWorkflowStatus; | |||||
| @Value("${argo.workflowTermination}") | |||||
| private String argoWorkflowTermination; | |||||
| @Resource | |||||
| private TextClassificationDao textClassificationDao; | |||||
| @Resource | |||||
| private TextClassificationInsDao textClassificationInsDao; | |||||
| @Override | |||||
| public Page<TextClassificationIns> queryByPage(TextClassificationIns textClassificationIns, PageRequest pageRequest) { | |||||
| long total = this.textClassificationInsDao.count(textClassificationIns); | |||||
| List<TextClassificationIns> textClassificationInsList = this.textClassificationInsDao.queryAllByLimit(textClassificationIns, pageRequest); | |||||
| return new PageImpl<>(textClassificationInsList, pageRequest, total); | |||||
| } | |||||
| @Override | |||||
| public TextClassificationIns insert(TextClassificationIns textClassificationIns) { | |||||
| this.textClassificationInsDao.insert(textClassificationIns); | |||||
| return textClassificationIns; | |||||
| } | |||||
| @Override | |||||
| public String removeById(Long id) { | |||||
| TextClassificationIns textClassificationIns = textClassificationInsDao.queryById(id); | |||||
| if (textClassificationIns == null) { | |||||
| return "实验实例不存在"; | |||||
| } | |||||
| if (StringUtils.isEmpty(textClassificationIns.getStatus())) { | |||||
| textClassificationIns = queryStatusFromArgo(textClassificationIns); | |||||
| } | |||||
| if (StringUtils.equals(textClassificationIns.getStatus(), Constant.Running)) { | |||||
| return "实验实例正在运行,不可删除"; | |||||
| } | |||||
| textClassificationIns.setState(Constant.State_invalid); | |||||
| int update = textClassificationInsDao.update(textClassificationIns); | |||||
| if (update > 0) { | |||||
| updateTextClassificationStatus(textClassificationIns.getTextClassificationId()); | |||||
| return "删除成功"; | |||||
| } else { | |||||
| return "删除失败"; | |||||
| } | |||||
| } | |||||
| @Override | |||||
| public String batchDelete(List<Long> ids) { | |||||
| for (Long id : ids) { | |||||
| String result = removeById(id); | |||||
| if (!"删除成功".equals(result)) { | |||||
| return result; | |||||
| } | |||||
| } | |||||
| return "删除成功"; | |||||
| } | |||||
| @Override | |||||
| public boolean terminateTextClassificationIns(Long id) throws Exception { | |||||
| TextClassificationIns textClassificationIns = textClassificationInsDao.queryById(id); | |||||
| if (textClassificationIns == null) { | |||||
| throw new IllegalStateException("实验实例未查询到,id: " + id); | |||||
| } | |||||
| String currentStatus = textClassificationIns.getStatus(); | |||||
| String name = textClassificationIns.getArgoInsName(); | |||||
| String namespace = textClassificationIns.getArgoInsNs(); | |||||
| // 获取当前状态,如果为空,则从Argo查询 | |||||
| if (StringUtils.isEmpty(currentStatus)) { | |||||
| currentStatus = queryStatusFromArgo(textClassificationIns).getStatus(); | |||||
| } | |||||
| // 只有状态是"Running"时才能终止实例 | |||||
| if (!currentStatus.equalsIgnoreCase(Constant.Running)) { | |||||
| throw new Exception("终止错误,只有运行状态的实例才能终止"); // 如果不是"Running"状态,则不执行终止操作 | |||||
| } | |||||
| // 创建请求数据map | |||||
| Map<String, Object> requestData = new HashMap<>(); | |||||
| requestData.put("namespace", namespace); | |||||
| requestData.put("name", name); | |||||
| // 创建发送数据map,将请求数据作为"data"键的值 | |||||
| Map<String, Object> res = new HashMap<>(); | |||||
| res.put("data", requestData); | |||||
| try { | |||||
| // 发送POST请求到Argo工作流状态查询接口,并将请求数据转换为JSON | |||||
| String req = HttpUtils.sendPost(argoUrl + argoWorkflowTermination, null, JsonUtils.mapToJson(res)); | |||||
| // 检查响应是否为空或无内容 | |||||
| if (StringUtils.isEmpty(req)) { | |||||
| throw new RuntimeException("终止响应内容为空"); | |||||
| } | |||||
| // 将响应的JSON字符串转换为Map对象 | |||||
| Map<String, Object> runResMap = JsonUtils.jsonToMap(req); | |||||
| // 从响应Map中直接获取"errCode"的值 | |||||
| Integer errCode = (Integer) runResMap.get("errCode"); | |||||
| if (errCode != null && errCode == 0) { | |||||
| //更新autoMlIns,确保状态更新被保存到数据库 | |||||
| TextClassificationIns ins = queryStatusFromArgo(textClassificationIns); | |||||
| String nodeStatus = ins.getNodeStatus(); | |||||
| Map<String, Object> nodeMap = JsonUtils.jsonToMap(nodeStatus); | |||||
| // 遍历 map | |||||
| for (Map.Entry<String, Object> entry : nodeMap.entrySet()) { | |||||
| // 获取每个 Map 中的值并强制转换为 Map | |||||
| Map<String, Object> innerMap = (Map<String, Object>) entry.getValue(); | |||||
| // 检查 phase 的值 | |||||
| if (innerMap.containsKey("phase")) { | |||||
| String phaseValue = (String) innerMap.get("phase"); | |||||
| // 如果值不等于 Succeeded,则赋值为 Failed | |||||
| if (!StringUtils.equals(Constant.Succeeded, phaseValue)) { | |||||
| innerMap.put("phase", Constant.Failed); | |||||
| } | |||||
| } | |||||
| } | |||||
| ins.setNodeStatus(JsonUtils.mapToJson(nodeMap)); | |||||
| ins.setStatus(Constant.Terminated); | |||||
| ins.setUpdateTime(new Date()); | |||||
| ins.setFinishTime(new Date()); | |||||
| this.textClassificationInsDao.update(ins); | |||||
| updateTextClassificationStatus(textClassificationIns.getTextClassificationId()); | |||||
| return true; | |||||
| }else { | |||||
| return false; | |||||
| } | |||||
| } catch (Exception e) { | |||||
| throw new RuntimeException("终止实例错误: " + e.getMessage(), e); | |||||
| } | |||||
| } | |||||
| @Override | |||||
| public TextClassificationIns getDetailById(Long id) { | |||||
| TextClassificationIns textClassificationIns = textClassificationInsDao.queryById(id); | |||||
| if (Constant.Running.equals(textClassificationIns.getStatus()) || Constant.Pending.equals(textClassificationIns.getStatus())) { | |||||
| textClassificationIns = queryStatusFromArgo(textClassificationIns); | |||||
| } | |||||
| return textClassificationIns; | |||||
| } | |||||
| @Override | |||||
| public List<TextClassificationIns> queryByNotTerminated() { | |||||
| return textClassificationInsDao.queryByNotTerminated(); | |||||
| } | |||||
| @Override | |||||
| public TextClassificationIns queryStatusFromArgo(TextClassificationIns ins) { | |||||
| String namespace = ins.getArgoInsNs(); | |||||
| String name = ins.getArgoInsName(); | |||||
| // 创建请求数据map | |||||
| Map<String, Object> requestData = new HashMap<>(); | |||||
| requestData.put("namespace", namespace); | |||||
| requestData.put("name", name); | |||||
| // 创建发送数据map,将请求数据作为"data"键的值 | |||||
| Map<String, Object> res = new HashMap<>(); | |||||
| res.put("data", requestData); | |||||
| try { | |||||
| // 发送POST请求到Argo工作流状态查询接口,并将请求数据转换为JSON | |||||
| String req = HttpUtils.sendPost(argoUrl + argoWorkflowStatus, null, JsonUtils.mapToJson(res)); | |||||
| // 检查响应是否为空或无内容 | |||||
| if (req == null || StringUtils.isEmpty(req)) { | |||||
| throw new RuntimeException("工作流状态响应为空"); | |||||
| } | |||||
| // 将响应的JSON字符串转换为Map对象 | |||||
| Map<String, Object> runResMap = JsonUtils.jsonToMap(req); | |||||
| // 从响应Map中获取"data"部分 | |||||
| Map<String, Object> data = (Map<String, Object>) runResMap.get("data"); | |||||
| if (data == null || data.isEmpty()) { | |||||
| throw new RuntimeException("工作流数据为空"); | |||||
| } | |||||
| // 从"data"中获取"status"部分,并返回"phase"的值 | |||||
| Map<String, Object> status = (Map<String, Object>) data.get("status"); | |||||
| if (status == null || status.isEmpty()) { | |||||
| throw new RuntimeException("工作流状态为空"); | |||||
| } | |||||
| //解析流水线结束时间 | |||||
| String finishedAtString = (String) status.get("finishedAt"); | |||||
| if (finishedAtString != null && !finishedAtString.isEmpty()) { | |||||
| Date finishTime = DateUtils.convertUTCtoShanghaiDate(finishedAtString); | |||||
| ins.setFinishTime(finishTime); | |||||
| } | |||||
| // 解析nodes字段,提取节点状态并转换为JSON字符串 | |||||
| Map<String, Object> nodes = (Map<String, Object>) status.get("nodes"); | |||||
| Map<String, Object> modifiedNodes = new LinkedHashMap<>(); | |||||
| if (nodes != null) { | |||||
| for (Map.Entry<String, Object> nodeEntry : nodes.entrySet()) { | |||||
| Map<String, Object> nodeDetails = (Map<String, Object>) nodeEntry.getValue(); | |||||
| String templateName = (String) nodeDetails.get("displayName"); | |||||
| modifiedNodes.put(templateName, nodeDetails); | |||||
| } | |||||
| } | |||||
| String nodeStatusJson = JsonUtils.mapToJson(modifiedNodes); | |||||
| ins.setNodeStatus(nodeStatusJson); | |||||
| //终止态为终止不改 | |||||
| if (!StringUtils.equals(ins.getStatus(), Constant.Terminated)) { | |||||
| ins.setStatus(StringUtils.isNotEmpty((String) status.get("phase")) ? (String) status.get("phase") : Constant.Pending); | |||||
| } | |||||
| if (StringUtils.equals(ins.getStatus(), Constant.Error)) { | |||||
| ins.setStatus(Constant.Failed); | |||||
| } | |||||
| return ins; | |||||
| } catch (Exception e) { | |||||
| throw new RuntimeException("查询状态失败: " + e.getMessage(), e); | |||||
| } | |||||
| } | |||||
| @Override | |||||
| public void updateTextClassificationStatus(Long textClassificationId) { | |||||
| List<TextClassificationIns> insList = textClassificationInsDao.getByTextClassificationId(textClassificationId); | |||||
| List<String> statusList = new ArrayList<>(); | |||||
| // 更新实验状态列表 | |||||
| for (int i = 0; i < insList.size(); i++) { | |||||
| statusList.add(insList.get(i).getStatus()); | |||||
| } | |||||
| String subStatus = statusList.toString().substring(1, statusList.toString().length() - 1); | |||||
| TextClassification textClassification = textClassificationDao.getById(new Long(textClassificationId)); | |||||
| if (!StringUtils.equals(textClassification.getStatusList(), subStatus)) { | |||||
| textClassification.setStatusList(subStatus); | |||||
| textClassificationDao.edit(textClassification); | |||||
| } | |||||
| } | |||||
| } | |||||