| @@ -17,9 +17,7 @@ import org.springframework.scheduling.annotation.EnableScheduling; | |||
| @EnableRyFeignClients | |||
| @SpringBootApplication | |||
| @EnableScheduling | |||
| public class | |||
| RuoYiManagementPlatformApplication { | |||
| public class RuoYiManagementPlatformApplication { | |||
| public static void main(String[] args) { | |||
| SpringApplication.run(RuoYiManagementPlatformApplication.class, args); | |||
| System.out.println("(♥◠‿◠)ノ゙ 复杂智能软件管理平台启动成功 ლ(´ڡ`ლ)゙ \n" + | |||
| @@ -92,5 +92,7 @@ public interface ExperimentInsDao { | |||
| List<ExperimentIns> queryByExperiment(@Param("experimentIns") ExperimentIns experimentIns); | |||
| List<ExperimentIns> queryByExperimentId(Integer id); | |||
| List<ExperimentIns> queryByExperimentIsNotTerminated(); | |||
| } | |||
| @@ -0,0 +1,88 @@ | |||
| package com.ruoyi.platform.scheduling; | |||
| import com.ruoyi.common.security.utils.SecurityUtils; | |||
| import com.ruoyi.platform.domain.Experiment; | |||
| import com.ruoyi.platform.domain.ExperimentIns; | |||
| import com.ruoyi.platform.mapper.ExperimentDao; | |||
| import com.ruoyi.platform.mapper.ExperimentInsDao; | |||
| import com.ruoyi.platform.service.ExperimentInsService; | |||
| import com.ruoyi.platform.service.ExperimentService; | |||
| import com.ruoyi.platform.utils.JsonUtils; | |||
| import com.ruoyi.system.api.model.LoginUser; | |||
| import io.swagger.models.auth.In; | |||
| import org.apache.commons.lang3.StringUtils; | |||
| import org.springframework.beans.factory.annotation.Autowired; | |||
| import org.springframework.scheduling.annotation.Scheduled; | |||
| import org.springframework.stereotype.Component; | |||
| import javax.annotation.Resource; | |||
| import java.io.IOException; | |||
| import java.util.*; | |||
| import java.util.stream.Collectors; | |||
| @Component() | |||
| public class ExperimentInstanceStatusTask { | |||
| @Autowired | |||
| private ExperimentInsService experimentInsService; | |||
| @Resource | |||
| private ExperimentDao experimentDao; | |||
| @Resource | |||
| private ExperimentInsDao experimentInsDao; | |||
| private List<Integer> experimentIds = new ArrayList<>(); | |||
| @Scheduled(cron = "0/30 * * * * ?") // 每30S执行一次 | |||
| public void executeExperimentInsStatus() throws IOException { | |||
| // 查到所有非终止态的实例 | |||
| List<ExperimentIns> experimentInsList = experimentInsService.queryByExperimentIsNotTerminated(); | |||
| // 去argo查询状态 | |||
| List<ExperimentIns> updateList = new ArrayList<>(); | |||
| if (experimentInsList != null && experimentInsList.size() > 0) { | |||
| for (ExperimentIns experimentIns : experimentInsList) { | |||
| //当原本状态为null或非终止态时才调用argo接口 | |||
| String oldStatus = experimentIns.getStatus(); | |||
| try { | |||
| experimentIns = experimentInsService.queryStatusFromArgo(experimentIns); | |||
| }catch (Exception e){ | |||
| experimentIns.setStatus("Failed"); | |||
| } | |||
| if (!StringUtils.equals(oldStatus,experimentIns.getStatus())){ | |||
| experimentIns.setUpdateTime(new Date()); | |||
| if (!experimentIds.contains(experimentIns.getExperimentId())){ | |||
| experimentIds.add(experimentIns.getExperimentId()); | |||
| } | |||
| updateList.add(experimentIns); | |||
| } | |||
| experimentInsDao.update(experimentIns); | |||
| } | |||
| } | |||
| if (updateList.size() > 0){ | |||
| experimentInsDao.insertOrUpdateBatch(updateList); | |||
| } | |||
| } | |||
| @Scheduled(cron = "0/30 * * * * ?") // / 每30S执行一次 | |||
| public void executeExperimentStatus() throws IOException { | |||
| if (experimentIds.size()==0){ | |||
| return; | |||
| } | |||
| List<Experiment> updateexperiments = new ArrayList<>(); | |||
| for (Integer experimentId : experimentIds){ | |||
| List<ExperimentIns> insList = experimentInsService.getByExperimentId(experimentId); | |||
| List<String> statusList = new ArrayList<String>(); | |||
| // 更新实验状态列表 | |||
| for (int i=0;i<insList.size();i++){ | |||
| statusList.add(insList.get(i).getStatus()); | |||
| } | |||
| String subStatus = statusList.toString().substring(1, statusList.toString().length() - 1); | |||
| Experiment experiment = experimentDao.queryById(experimentId); | |||
| if (!StringUtils.equals(subStatus,experiment.getStatusList())){ | |||
| updateexperiments.add(experiment); | |||
| } | |||
| } | |||
| if (updateexperiments.size() > 0){ | |||
| experimentDao.insertOrUpdateBatch(updateexperiments); | |||
| } | |||
| experimentIds.clear(); | |||
| System.out.println(experimentIds); | |||
| } | |||
| } | |||
| @@ -98,4 +98,9 @@ public interface ExperimentInsService { | |||
| String getRealtimePodLogFromPod(PodLogVo podLogVo); | |||
| /** | |||
| * 查询非终止态的实例 | |||
| * @return | |||
| */ | |||
| List<ExperimentIns> queryByExperimentIsNotTerminated(); | |||
| } | |||
| @@ -71,14 +71,16 @@ public class ExperimentInsServiceImpl implements ExperimentInsService { | |||
| @Override | |||
| public ExperimentIns queryById(Integer id) throws IOException { | |||
| ExperimentIns experimentIns = this.experimentInsDao.queryById(id); | |||
| if (experimentIns != null && (StringUtils.isEmpty(experimentIns.getStatus())) || !isTerminatedState(experimentIns)) { | |||
| experimentIns = this.queryStatusFromArgo(experimentIns); | |||
| //只有当新状态是终止态时才更新数据库 | |||
| if (isTerminatedState(experimentIns)) { | |||
| //同时更新各个节点 | |||
| this.update(experimentIns); | |||
| } | |||
| } | |||
| //已经迁移至定时任务进行更新操作 | |||
| // if (experimentIns != null && (StringUtils.isEmpty(experimentIns.getStatus())) || !isTerminatedState(experimentIns)) { | |||
| // experimentIns = this.queryStatusFromArgo(experimentIns); | |||
| // //只有当新状态是终止态时才更新数据库 | |||
| // if (isTerminatedState(experimentIns)) { | |||
| // //同时更新各个节点 | |||
| // this.update(experimentIns); | |||
| // } | |||
| // } | |||
| return experimentIns; | |||
| } | |||
| @@ -93,40 +95,42 @@ public class ExperimentInsServiceImpl implements ExperimentInsService { | |||
| @Override | |||
| public List<ExperimentIns> getByExperimentId(Integer experimentId) throws IOException { | |||
| List<ExperimentIns> experimentInsList = experimentInsDao.getByExperimentId(experimentId); | |||
| //搞个标记,当状态改变才去改表 | |||
| boolean flag = false; | |||
| List<ExperimentIns> result = new ArrayList<ExperimentIns>(); | |||
| if (experimentInsList!=null && experimentInsList.size()>0) { | |||
| for (ExperimentIns experimentIns : experimentInsList) { | |||
| //当原本状态为null或非终止态时才调用argo接口 | |||
| if (experimentIns != null && (StringUtils.isEmpty(experimentIns.getStatus())) || !isTerminatedState(experimentIns)) { | |||
| experimentIns = this.queryStatusFromArgo(experimentIns); | |||
| if (!flag){ | |||
| flag = true; | |||
| } | |||
| //只有当新状态是终止态时才更新数据库 | |||
| if (isTerminatedState(experimentIns)) { | |||
| //同时更新各个节点 | |||
| this.update(experimentIns); | |||
| } | |||
| } | |||
| //新增查询tensorBoard容器状态 | |||
| result.add(experimentIns); | |||
| } | |||
| } | |||
| if (flag) { | |||
| List<String> statusList = new ArrayList<String>(); | |||
| // 更新实验状态列表 | |||
| for (int i=0;i<result.size();i++){ | |||
| statusList.add(result.get(i).getStatus()); | |||
| } | |||
| Experiment experiment = experimentDao.queryById(experimentId); | |||
| experiment.setStatusList(statusList.toString().substring(1, statusList.toString().length()-1)); | |||
| experimentDao.update(experiment); | |||
| } | |||
| return result; | |||
| //代码全部迁移至定时任务 | |||
| //搞个标记,当状态改变才去改表 | |||
| // boolean flag = false; | |||
| // List<ExperimentIns> result = new ArrayList<ExperimentIns>(); | |||
| // if (experimentInsList!=null && experimentInsList.size()>0) { | |||
| // for (ExperimentIns experimentIns : experimentInsList) { | |||
| // //当原本状态为null或非终止态时才调用argo接口 | |||
| // if (experimentIns != null && (StringUtils.isEmpty(experimentIns.getStatus())) || !isTerminatedState(experimentIns)) { | |||
| // experimentIns = this.queryStatusFromArgo(experimentIns); | |||
| // if (!flag){ | |||
| // flag = true; | |||
| // } | |||
| // //只有当新状态是终止态时才更新数据库 | |||
| // if (isTerminatedState(experimentIns)) { | |||
| // //同时更新各个节点 | |||
| // this.update(experimentIns); | |||
| // } | |||
| // } | |||
| // | |||
| // //新增查询tensorBoard容器状态 | |||
| // result.add(experimentIns); | |||
| // } | |||
| // } | |||
| // if (flag) { | |||
| // List<String> statusList = new ArrayList<String>(); | |||
| // // 更新实验状态列表 | |||
| // for (int i=0;i<result.size();i++){ | |||
| // statusList.add(result.get(i).getStatus()); | |||
| // } | |||
| // Experiment experiment = experimentDao.queryById(experimentId); | |||
| // experiment.setStatusList(statusList.toString().substring(1, statusList.toString().length()-1)); | |||
| // experimentDao.update(experiment); | |||
| // } | |||
| return experimentInsList; | |||
| } | |||
| @@ -141,15 +145,15 @@ public class ExperimentInsServiceImpl implements ExperimentInsService { | |||
| public Page<ExperimentIns> queryByPage(ExperimentIns experimentIns, PageRequest pageRequest) throws IOException { | |||
| long total = this.experimentInsDao.count(experimentIns); | |||
| List<ExperimentIns> experimentInsList = this.experimentInsDao.queryAllByLimit(experimentIns, pageRequest); | |||
| if (experimentInsList!=null && experimentInsList.size()>0) { | |||
| for (ExperimentIns ins : experimentInsList) { | |||
| //如果实验实例不为空或者 | |||
| if (ins != null && StringUtils.isEmpty(ins.getStatus())) { | |||
| ins = this.queryStatusFromArgo(ins); | |||
| this.update(ins); | |||
| } | |||
| } | |||
| } | |||
| // if (experimentInsList!=null && experimentInsList.size()>0) { | |||
| // for (ExperimentIns ins : experimentInsList) { | |||
| // //如果实验实例不为空或者 | |||
| // if (ins != null && StringUtils.isEmpty(ins.getStatus())) { | |||
| // ins = this.queryStatusFromArgo(ins); | |||
| // this.update(ins); | |||
| // } | |||
| // } | |||
| // } | |||
| return new PageImpl<>(experimentInsList, pageRequest, total); | |||
| } | |||
| @@ -524,6 +528,11 @@ public class ExperimentInsServiceImpl implements ExperimentInsService { | |||
| return k8sClientUtil.getPodLogs(podLogVo.getPodName(), podLogVo.getNamespace(),podLogVo.getContainerName(), logsLines); | |||
| } | |||
| @Override | |||
| public List<ExperimentIns> queryByExperimentIsNotTerminated() { | |||
| return experimentInsDao.queryByExperimentIsNotTerminated(); | |||
| } | |||
| private boolean isTerminatedState(ExperimentIns ins) throws IOException { | |||
| // 定义终止态的列表,例如 "Succeeded", "Failed" 等 | |||
| String status = ins.getStatus(); | |||
| @@ -256,8 +256,9 @@ public class ExperimentServiceImpl implements ExperimentService { | |||
| experimentIns.setExperimentId(experiment.getId()); | |||
| experimentIns.setArgoInsNs((String) metadata.get("namespace")); | |||
| experimentIns.setArgoInsName((String) metadata.get("name")); | |||
| //传入实验全局参数 | |||
| experimentIns.setStatus("Pending"); | |||
| //传入实验全局参数 | |||
| experimentIns.setGlobalParam(experiment.getGlobalParam()); | |||
| @@ -21,7 +21,13 @@ | |||
| <result property="state" column="state" jdbcType="INTEGER"/> | |||
| </resultMap> | |||
| <!--查询非终止态的实例--> | |||
| <select id="queryByExperimentIsNotTerminated" resultMap="ExperimentInsMap"> | |||
| select id, experiment_id, argo_ins_name, argo_ins_ns, status, nodes_status,nodes_result, nodes_logs,global_param, start_time, finish_time, create_by, create_time, update_by, update_time, state | |||
| from experiment_ins | |||
| where (status NOT IN ('Terminated', 'Succeeded', 'Failed') | |||
| OR status IS NULL) and state = 1 | |||
| </select> | |||
| <!--查询单个--> | |||
| <select id="queryById" resultMap="ExperimentInsMap"> | |||