| @@ -17,9 +17,7 @@ import org.springframework.scheduling.annotation.EnableScheduling; | |||||
| @EnableRyFeignClients | @EnableRyFeignClients | ||||
| @SpringBootApplication | @SpringBootApplication | ||||
| @EnableScheduling | @EnableScheduling | ||||
| public class | |||||
| RuoYiManagementPlatformApplication { | |||||
| public class RuoYiManagementPlatformApplication { | |||||
| public static void main(String[] args) { | public static void main(String[] args) { | ||||
| SpringApplication.run(RuoYiManagementPlatformApplication.class, args); | SpringApplication.run(RuoYiManagementPlatformApplication.class, args); | ||||
| System.out.println("(♥◠‿◠)ノ゙ 复杂智能软件管理平台启动成功 ლ(´ڡ`ლ)゙ \n" + | System.out.println("(♥◠‿◠)ノ゙ 复杂智能软件管理平台启动成功 ლ(´ڡ`ლ)゙ \n" + | ||||
| @@ -92,5 +92,7 @@ public interface ExperimentInsDao { | |||||
| List<ExperimentIns> queryByExperiment(@Param("experimentIns") ExperimentIns experimentIns); | List<ExperimentIns> queryByExperiment(@Param("experimentIns") ExperimentIns experimentIns); | ||||
| List<ExperimentIns> queryByExperimentId(Integer id); | List<ExperimentIns> queryByExperimentId(Integer id); | ||||
| List<ExperimentIns> queryByExperimentIsNotTerminated(); | |||||
| } | } | ||||
| @@ -0,0 +1,88 @@ | |||||
| package com.ruoyi.platform.scheduling; | |||||
import com.ruoyi.common.security.utils.SecurityUtils;
import com.ruoyi.platform.domain.Experiment;
import com.ruoyi.platform.domain.ExperimentIns;
import com.ruoyi.platform.mapper.ExperimentDao;
import com.ruoyi.platform.mapper.ExperimentInsDao;
import com.ruoyi.platform.service.ExperimentInsService;
import com.ruoyi.platform.service.ExperimentService;
import com.ruoyi.platform.utils.JsonUtils;
import com.ruoyi.system.api.model.LoginUser;
import io.swagger.models.auth.In;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
| @Component() | |||||
| public class ExperimentInstanceStatusTask { | |||||
| @Autowired | |||||
| private ExperimentInsService experimentInsService; | |||||
| @Resource | |||||
| private ExperimentDao experimentDao; | |||||
| @Resource | |||||
| private ExperimentInsDao experimentInsDao; | |||||
| private List<Integer> experimentIds = new ArrayList<>(); | |||||
| @Scheduled(cron = "0/30 * * * * ?") // 每30S执行一次 | |||||
| public void executeExperimentInsStatus() throws IOException { | |||||
| // 查到所有非终止态的实例 | |||||
| List<ExperimentIns> experimentInsList = experimentInsService.queryByExperimentIsNotTerminated(); | |||||
| // 去argo查询状态 | |||||
| List<ExperimentIns> updateList = new ArrayList<>(); | |||||
| if (experimentInsList != null && experimentInsList.size() > 0) { | |||||
| for (ExperimentIns experimentIns : experimentInsList) { | |||||
| //当原本状态为null或非终止态时才调用argo接口 | |||||
| String oldStatus = experimentIns.getStatus(); | |||||
| try { | |||||
| experimentIns = experimentInsService.queryStatusFromArgo(experimentIns); | |||||
| }catch (Exception e){ | |||||
| experimentIns.setStatus("Failed"); | |||||
| } | |||||
| if (!StringUtils.equals(oldStatus,experimentIns.getStatus())){ | |||||
| experimentIns.setUpdateTime(new Date()); | |||||
| if (!experimentIds.contains(experimentIns.getExperimentId())){ | |||||
| experimentIds.add(experimentIns.getExperimentId()); | |||||
| } | |||||
| updateList.add(experimentIns); | |||||
| } | |||||
| experimentInsDao.update(experimentIns); | |||||
| } | |||||
| } | |||||
| if (updateList.size() > 0){ | |||||
| experimentInsDao.insertOrUpdateBatch(updateList); | |||||
| } | |||||
| } | |||||
| @Scheduled(cron = "0/30 * * * * ?") // / 每30S执行一次 | |||||
| public void executeExperimentStatus() throws IOException { | |||||
| if (experimentIds.size()==0){ | |||||
| return; | |||||
| } | |||||
| List<Experiment> updateexperiments = new ArrayList<>(); | |||||
| for (Integer experimentId : experimentIds){ | |||||
| List<ExperimentIns> insList = experimentInsService.getByExperimentId(experimentId); | |||||
| List<String> statusList = new ArrayList<String>(); | |||||
| // 更新实验状态列表 | |||||
| for (int i=0;i<insList.size();i++){ | |||||
| statusList.add(insList.get(i).getStatus()); | |||||
| } | |||||
| String subStatus = statusList.toString().substring(1, statusList.toString().length() - 1); | |||||
| Experiment experiment = experimentDao.queryById(experimentId); | |||||
| if (!StringUtils.equals(subStatus,experiment.getStatusList())){ | |||||
| updateexperiments.add(experiment); | |||||
| } | |||||
| } | |||||
| if (updateexperiments.size() > 0){ | |||||
| experimentDao.insertOrUpdateBatch(updateexperiments); | |||||
| } | |||||
| experimentIds.clear(); | |||||
| System.out.println(experimentIds); | |||||
| } | |||||
| } | |||||
| @@ -98,4 +98,9 @@ public interface ExperimentInsService { | |||||
| String getRealtimePodLogFromPod(PodLogVo podLogVo); | String getRealtimePodLogFromPod(PodLogVo podLogVo); | ||||
| /** | |||||
| * Queries the experiment instances that are not in a terminal state. | |||||
| * | |||||
| * @return the matching experiment instances | |||||
| */ | |||||
| List<ExperimentIns> queryByExperimentIsNotTerminated(); | |||||
| } | } | ||||
| @@ -71,14 +71,16 @@ public class ExperimentInsServiceImpl implements ExperimentInsService { | |||||
| @Override | @Override | ||||
| public ExperimentIns queryById(Integer id) throws IOException { | public ExperimentIns queryById(Integer id) throws IOException { | ||||
| ExperimentIns experimentIns = this.experimentInsDao.queryById(id); | ExperimentIns experimentIns = this.experimentInsDao.queryById(id); | ||||
| if (experimentIns != null && (StringUtils.isEmpty(experimentIns.getStatus())) || !isTerminatedState(experimentIns)) { | |||||
| experimentIns = this.queryStatusFromArgo(experimentIns); | |||||
| //只有当新状态是终止态时才更新数据库 | |||||
| if (isTerminatedState(experimentIns)) { | |||||
| //同时更新各个节点 | |||||
| this.update(experimentIns); | |||||
| } | |||||
| } | |||||
| //已经迁移至定时任务进行更新操作 | |||||
| // if (experimentIns != null && (StringUtils.isEmpty(experimentIns.getStatus())) || !isTerminatedState(experimentIns)) { | |||||
| // experimentIns = this.queryStatusFromArgo(experimentIns); | |||||
| // //只有当新状态是终止态时才更新数据库 | |||||
| // if (isTerminatedState(experimentIns)) { | |||||
| // //同时更新各个节点 | |||||
| // this.update(experimentIns); | |||||
| // } | |||||
| // } | |||||
| return experimentIns; | return experimentIns; | ||||
| } | } | ||||
| @@ -93,40 +95,42 @@ public class ExperimentInsServiceImpl implements ExperimentInsService { | |||||
| @Override | @Override | ||||
| public List<ExperimentIns> getByExperimentId(Integer experimentId) throws IOException { | public List<ExperimentIns> getByExperimentId(Integer experimentId) throws IOException { | ||||
| List<ExperimentIns> experimentInsList = experimentInsDao.getByExperimentId(experimentId); | List<ExperimentIns> experimentInsList = experimentInsDao.getByExperimentId(experimentId); | ||||
| //搞个标记,当状态改变才去改表 | |||||
| boolean flag = false; | |||||
| List<ExperimentIns> result = new ArrayList<ExperimentIns>(); | |||||
| if (experimentInsList!=null && experimentInsList.size()>0) { | |||||
| for (ExperimentIns experimentIns : experimentInsList) { | |||||
| //当原本状态为null或非终止态时才调用argo接口 | |||||
| if (experimentIns != null && (StringUtils.isEmpty(experimentIns.getStatus())) || !isTerminatedState(experimentIns)) { | |||||
| experimentIns = this.queryStatusFromArgo(experimentIns); | |||||
| if (!flag){ | |||||
| flag = true; | |||||
| } | |||||
| //只有当新状态是终止态时才更新数据库 | |||||
| if (isTerminatedState(experimentIns)) { | |||||
| //同时更新各个节点 | |||||
| this.update(experimentIns); | |||||
| } | |||||
| } | |||||
| //新增查询tensorBoard容器状态 | |||||
| result.add(experimentIns); | |||||
| } | |||||
| } | |||||
| if (flag) { | |||||
| List<String> statusList = new ArrayList<String>(); | |||||
| // 更新实验状态列表 | |||||
| for (int i=0;i<result.size();i++){ | |||||
| statusList.add(result.get(i).getStatus()); | |||||
| } | |||||
| Experiment experiment = experimentDao.queryById(experimentId); | |||||
| experiment.setStatusList(statusList.toString().substring(1, statusList.toString().length()-1)); | |||||
| experimentDao.update(experiment); | |||||
| } | |||||
| return result; | |||||
| //代码全部迁移至定时任务 | |||||
| //搞个标记,当状态改变才去改表 | |||||
| // boolean flag = false; | |||||
| // List<ExperimentIns> result = new ArrayList<ExperimentIns>(); | |||||
| // if (experimentInsList!=null && experimentInsList.size()>0) { | |||||
| // for (ExperimentIns experimentIns : experimentInsList) { | |||||
| // //当原本状态为null或非终止态时才调用argo接口 | |||||
| // if (experimentIns != null && (StringUtils.isEmpty(experimentIns.getStatus())) || !isTerminatedState(experimentIns)) { | |||||
| // experimentIns = this.queryStatusFromArgo(experimentIns); | |||||
| // if (!flag){ | |||||
| // flag = true; | |||||
| // } | |||||
| // //只有当新状态是终止态时才更新数据库 | |||||
| // if (isTerminatedState(experimentIns)) { | |||||
| // //同时更新各个节点 | |||||
| // this.update(experimentIns); | |||||
| // } | |||||
| // } | |||||
| // | |||||
| // //新增查询tensorBoard容器状态 | |||||
| // result.add(experimentIns); | |||||
| // } | |||||
| // } | |||||
| // if (flag) { | |||||
| // List<String> statusList = new ArrayList<String>(); | |||||
| // // 更新实验状态列表 | |||||
| // for (int i=0;i<result.size();i++){ | |||||
| // statusList.add(result.get(i).getStatus()); | |||||
| // } | |||||
| // Experiment experiment = experimentDao.queryById(experimentId); | |||||
| // experiment.setStatusList(statusList.toString().substring(1, statusList.toString().length()-1)); | |||||
| // experimentDao.update(experiment); | |||||
| // } | |||||
| return experimentInsList; | |||||
| } | } | ||||
| @@ -141,15 +145,15 @@ public class ExperimentInsServiceImpl implements ExperimentInsService { | |||||
| public Page<ExperimentIns> queryByPage(ExperimentIns experimentIns, PageRequest pageRequest) throws IOException { | public Page<ExperimentIns> queryByPage(ExperimentIns experimentIns, PageRequest pageRequest) throws IOException { | ||||
| long total = this.experimentInsDao.count(experimentIns); | long total = this.experimentInsDao.count(experimentIns); | ||||
| List<ExperimentIns> experimentInsList = this.experimentInsDao.queryAllByLimit(experimentIns, pageRequest); | List<ExperimentIns> experimentInsList = this.experimentInsDao.queryAllByLimit(experimentIns, pageRequest); | ||||
| if (experimentInsList!=null && experimentInsList.size()>0) { | |||||
| for (ExperimentIns ins : experimentInsList) { | |||||
| //如果实验实例不为空或者 | |||||
| if (ins != null && StringUtils.isEmpty(ins.getStatus())) { | |||||
| ins = this.queryStatusFromArgo(ins); | |||||
| this.update(ins); | |||||
| } | |||||
| } | |||||
| } | |||||
| // if (experimentInsList!=null && experimentInsList.size()>0) { | |||||
| // for (ExperimentIns ins : experimentInsList) { | |||||
| // //如果实验实例不为空或者 | |||||
| // if (ins != null && StringUtils.isEmpty(ins.getStatus())) { | |||||
| // ins = this.queryStatusFromArgo(ins); | |||||
| // this.update(ins); | |||||
| // } | |||||
| // } | |||||
| // } | |||||
| return new PageImpl<>(experimentInsList, pageRequest, total); | return new PageImpl<>(experimentInsList, pageRequest, total); | ||||
| } | } | ||||
| @@ -524,6 +528,11 @@ public class ExperimentInsServiceImpl implements ExperimentInsService { | |||||
| return k8sClientUtil.getPodLogs(podLogVo.getPodName(), podLogVo.getNamespace(),podLogVo.getContainerName(), logsLines); | return k8sClientUtil.getPodLogs(podLogVo.getPodName(), podLogVo.getNamespace(),podLogVo.getContainerName(), logsLines); | ||||
| } | } | ||||
| @Override | |||||
| public List<ExperimentIns> queryByExperimentIsNotTerminated() { | |||||
| return experimentInsDao.queryByExperimentIsNotTerminated(); | |||||
| } | |||||
| private boolean isTerminatedState(ExperimentIns ins) throws IOException { | private boolean isTerminatedState(ExperimentIns ins) throws IOException { | ||||
| // 定义终止态的列表,例如 "Succeeded", "Failed" 等 | // 定义终止态的列表,例如 "Succeeded", "Failed" 等 | ||||
| String status = ins.getStatus(); | String status = ins.getStatus(); | ||||
| @@ -256,8 +256,9 @@ public class ExperimentServiceImpl implements ExperimentService { | |||||
| experimentIns.setExperimentId(experiment.getId()); | experimentIns.setExperimentId(experiment.getId()); | ||||
| experimentIns.setArgoInsNs((String) metadata.get("namespace")); | experimentIns.setArgoInsNs((String) metadata.get("namespace")); | ||||
| experimentIns.setArgoInsName((String) metadata.get("name")); | experimentIns.setArgoInsName((String) metadata.get("name")); | ||||
| //传入实验全局参数 | |||||
| experimentIns.setStatus("Pending"); | |||||
| //传入实验全局参数 | |||||
| experimentIns.setGlobalParam(experiment.getGlobalParam()); | experimentIns.setGlobalParam(experiment.getGlobalParam()); | ||||
| @@ -21,7 +21,13 @@ | |||||
| <result property="state" column="state" jdbcType="INTEGER"/> | <result property="state" column="state" jdbcType="INTEGER"/> | ||||
| </resultMap> | </resultMap> | ||||
| <!--查询非终止态的实例--> | |||||
| <select id="queryByExperimentIsNotTerminated" resultMap="ExperimentInsMap"> | |||||
| select id, experiment_id, argo_ins_name, argo_ins_ns, status, nodes_status,nodes_result, nodes_logs,global_param, start_time, finish_time, create_by, create_time, update_by, update_time, state | |||||
| from experiment_ins | |||||
| where (status NOT IN ('Terminated', 'Succeeded', 'Failed') | |||||
| OR status IS NULL) and state = 1 | |||||
| </select> | |||||
| <!--查询单个--> | <!--查询单个--> | ||||
| <select id="queryById" resultMap="ExperimentInsMap"> | <select id="queryById" resultMap="ExperimentInsMap"> | ||||