Browse Source

把状态刷新改成job

pull/41/head
fanshuai 1 year ago
parent
commit
900c711e68
7 changed files with 164 additions and 55 deletions
  1. +1
    -3
      ruoyi-modules/management-platform/src/main/java/com/ruoyi/platform/RuoYiManagementPlatformApplication.java
  2. +2
    -0
      ruoyi-modules/management-platform/src/main/java/com/ruoyi/platform/mapper/ExperimentInsDao.java
  3. +88
    -0
      ruoyi-modules/management-platform/src/main/java/com/ruoyi/platform/scheduling/ExperimentInstanceStatusTask.java
  4. +5
    -0
      ruoyi-modules/management-platform/src/main/java/com/ruoyi/platform/service/ExperimentInsService.java
  5. +59
    -50
      ruoyi-modules/management-platform/src/main/java/com/ruoyi/platform/service/impl/ExperimentInsServiceImpl.java
  6. +2
    -1
      ruoyi-modules/management-platform/src/main/java/com/ruoyi/platform/service/impl/ExperimentServiceImpl.java
  7. +7
    -1
      ruoyi-modules/management-platform/src/main/resources/mapper/managementPlatform/ExperimentInsDaoMapper.xml

+ 1
- 3
ruoyi-modules/management-platform/src/main/java/com/ruoyi/platform/RuoYiManagementPlatformApplication.java View File

@@ -17,9 +17,7 @@ import org.springframework.scheduling.annotation.EnableScheduling;
@EnableRyFeignClients
@SpringBootApplication
@EnableScheduling
public class

RuoYiManagementPlatformApplication {
public class RuoYiManagementPlatformApplication {
public static void main(String[] args) {
SpringApplication.run(RuoYiManagementPlatformApplication.class, args);
System.out.println("(♥◠‿◠)ノ゙ 复杂智能软件管理平台启动成功 ლ(´ڡ`ლ)゙ \n" +


+ 2
- 0
ruoyi-modules/management-platform/src/main/java/com/ruoyi/platform/mapper/ExperimentInsDao.java View File

@@ -92,5 +92,7 @@ public interface ExperimentInsDao {
List<ExperimentIns> queryByExperiment(@Param("experimentIns") ExperimentIns experimentIns);

List<ExperimentIns> queryByExperimentId(Integer id);

List<ExperimentIns> queryByExperimentIsNotTerminated();
}


+ 88
- 0
ruoyi-modules/management-platform/src/main/java/com/ruoyi/platform/scheduling/ExperimentInstanceStatusTask.java View File

@@ -0,0 +1,88 @@
package com.ruoyi.platform.scheduling;

import com.ruoyi.common.security.utils.SecurityUtils;
import com.ruoyi.platform.domain.Experiment;
import com.ruoyi.platform.domain.ExperimentIns;
import com.ruoyi.platform.mapper.ExperimentDao;
import com.ruoyi.platform.mapper.ExperimentInsDao;
import com.ruoyi.platform.service.ExperimentInsService;
import com.ruoyi.platform.service.ExperimentService;
import com.ruoyi.platform.utils.JsonUtils;
import com.ruoyi.system.api.model.LoginUser;
import io.swagger.models.auth.In;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.io.IOException;
import java.util.*;
import java.util.stream.Collectors;

@Component()
public class ExperimentInstanceStatusTask {
@Autowired
private ExperimentInsService experimentInsService;
@Resource
private ExperimentDao experimentDao;
@Resource
private ExperimentInsDao experimentInsDao;

private List<Integer> experimentIds = new ArrayList<>();
@Scheduled(cron = "0/30 * * * * ?") // 每30S执行一次
public void executeExperimentInsStatus() throws IOException {
// 查到所有非终止态的实例
List<ExperimentIns> experimentInsList = experimentInsService.queryByExperimentIsNotTerminated();
// 去argo查询状态
List<ExperimentIns> updateList = new ArrayList<>();
if (experimentInsList != null && experimentInsList.size() > 0) {
for (ExperimentIns experimentIns : experimentInsList) {
//当原本状态为null或非终止态时才调用argo接口
String oldStatus = experimentIns.getStatus();
try {
experimentIns = experimentInsService.queryStatusFromArgo(experimentIns);
}catch (Exception e){
experimentIns.setStatus("Failed");
}
if (!StringUtils.equals(oldStatus,experimentIns.getStatus())){
experimentIns.setUpdateTime(new Date());
if (!experimentIds.contains(experimentIns.getExperimentId())){
experimentIds.add(experimentIns.getExperimentId());
}
updateList.add(experimentIns);
}
experimentInsDao.update(experimentIns);
}
}
if (updateList.size() > 0){
experimentInsDao.insertOrUpdateBatch(updateList);
}

}
@Scheduled(cron = "0/30 * * * * ?") // / 每30S执行一次
public void executeExperimentStatus() throws IOException {
if (experimentIds.size()==0){
return;
}
List<Experiment> updateexperiments = new ArrayList<>();
for (Integer experimentId : experimentIds){
List<ExperimentIns> insList = experimentInsService.getByExperimentId(experimentId);
List<String> statusList = new ArrayList<String>();
// 更新实验状态列表
for (int i=0;i<insList.size();i++){
statusList.add(insList.get(i).getStatus());
}
String subStatus = statusList.toString().substring(1, statusList.toString().length() - 1);
Experiment experiment = experimentDao.queryById(experimentId);
if (!StringUtils.equals(subStatus,experiment.getStatusList())){
updateexperiments.add(experiment);
}
}
if (updateexperiments.size() > 0){
experimentDao.insertOrUpdateBatch(updateexperiments);
}
experimentIds.clear();
System.out.println(experimentIds);
}
}

+ 5
- 0
ruoyi-modules/management-platform/src/main/java/com/ruoyi/platform/service/ExperimentInsService.java View File

@@ -98,4 +98,9 @@ public interface ExperimentInsService {

String getRealtimePodLogFromPod(PodLogVo podLogVo);

/**
* 查询非终止态的实例
* @return
*/
List<ExperimentIns> queryByExperimentIsNotTerminated();
}

+ 59
- 50
ruoyi-modules/management-platform/src/main/java/com/ruoyi/platform/service/impl/ExperimentInsServiceImpl.java View File

@@ -71,14 +71,16 @@ public class ExperimentInsServiceImpl implements ExperimentInsService {
@Override
public ExperimentIns queryById(Integer id) throws IOException {
ExperimentIns experimentIns = this.experimentInsDao.queryById(id);
if (experimentIns != null && (StringUtils.isEmpty(experimentIns.getStatus())) || !isTerminatedState(experimentIns)) {
experimentIns = this.queryStatusFromArgo(experimentIns);
//只有当新状态是终止态时才更新数据库
if (isTerminatedState(experimentIns)) {
//同时更新各个节点
this.update(experimentIns);
}
}

//已经迁移至定时任务进行更新操作
// if (experimentIns != null && (StringUtils.isEmpty(experimentIns.getStatus())) || !isTerminatedState(experimentIns)) {
// experimentIns = this.queryStatusFromArgo(experimentIns);
// //只有当新状态是终止态时才更新数据库
// if (isTerminatedState(experimentIns)) {
// //同时更新各个节点
// this.update(experimentIns);
// }
// }
return experimentIns;
}

@@ -93,40 +95,42 @@ public class ExperimentInsServiceImpl implements ExperimentInsService {
@Override
public List<ExperimentIns> getByExperimentId(Integer experimentId) throws IOException {
List<ExperimentIns> experimentInsList = experimentInsDao.getByExperimentId(experimentId);
//搞个标记,当状态改变才去改表
boolean flag = false;
List<ExperimentIns> result = new ArrayList<ExperimentIns>();
if (experimentInsList!=null && experimentInsList.size()>0) {
for (ExperimentIns experimentIns : experimentInsList) {
//当原本状态为null或非终止态时才调用argo接口
if (experimentIns != null && (StringUtils.isEmpty(experimentIns.getStatus())) || !isTerminatedState(experimentIns)) {
experimentIns = this.queryStatusFromArgo(experimentIns);
if (!flag){
flag = true;
}
//只有当新状态是终止态时才更新数据库
if (isTerminatedState(experimentIns)) {
//同时更新各个节点
this.update(experimentIns);
}
}

//新增查询tensorBoard容器状态
result.add(experimentIns);
}
}
if (flag) {
List<String> statusList = new ArrayList<String>();
// 更新实验状态列表
for (int i=0;i<result.size();i++){
statusList.add(result.get(i).getStatus());
}
Experiment experiment = experimentDao.queryById(experimentId);
experiment.setStatusList(statusList.toString().substring(1, statusList.toString().length()-1));
experimentDao.update(experiment);
}

return result;
//代码全部迁移至定时任务
//搞个标记,当状态改变才去改表
// boolean flag = false;
// List<ExperimentIns> result = new ArrayList<ExperimentIns>();
// if (experimentInsList!=null && experimentInsList.size()>0) {
// for (ExperimentIns experimentIns : experimentInsList) {
// //当原本状态为null或非终止态时才调用argo接口
// if (experimentIns != null && (StringUtils.isEmpty(experimentIns.getStatus())) || !isTerminatedState(experimentIns)) {
// experimentIns = this.queryStatusFromArgo(experimentIns);
// if (!flag){
// flag = true;
// }
// //只有当新状态是终止态时才更新数据库
// if (isTerminatedState(experimentIns)) {
// //同时更新各个节点
// this.update(experimentIns);
// }
// }
//
// //新增查询tensorBoard容器状态
// result.add(experimentIns);
// }
// }
// if (flag) {
// List<String> statusList = new ArrayList<String>();
// // 更新实验状态列表
// for (int i=0;i<result.size();i++){
// statusList.add(result.get(i).getStatus());
// }
// Experiment experiment = experimentDao.queryById(experimentId);
// experiment.setStatusList(statusList.toString().substring(1, statusList.toString().length()-1));
// experimentDao.update(experiment);
// }

return experimentInsList;

}

@@ -141,15 +145,15 @@ public class ExperimentInsServiceImpl implements ExperimentInsService {
public Page<ExperimentIns> queryByPage(ExperimentIns experimentIns, PageRequest pageRequest) throws IOException {
long total = this.experimentInsDao.count(experimentIns);
List<ExperimentIns> experimentInsList = this.experimentInsDao.queryAllByLimit(experimentIns, pageRequest);
if (experimentInsList!=null && experimentInsList.size()>0) {
for (ExperimentIns ins : experimentInsList) {
//如果实验实例不为空或者
if (ins != null && StringUtils.isEmpty(ins.getStatus())) {
ins = this.queryStatusFromArgo(ins);
this.update(ins);
}
}
}
// if (experimentInsList!=null && experimentInsList.size()>0) {
// for (ExperimentIns ins : experimentInsList) {
// //如果实验实例不为空或者
// if (ins != null && StringUtils.isEmpty(ins.getStatus())) {
// ins = this.queryStatusFromArgo(ins);
// this.update(ins);
// }
// }
// }
return new PageImpl<>(experimentInsList, pageRequest, total);
}

@@ -524,6 +528,11 @@ public class ExperimentInsServiceImpl implements ExperimentInsService {
return k8sClientUtil.getPodLogs(podLogVo.getPodName(), podLogVo.getNamespace(),podLogVo.getContainerName(), logsLines);
}

@Override
public List<ExperimentIns> queryByExperimentIsNotTerminated() {
return experimentInsDao.queryByExperimentIsNotTerminated();
}

private boolean isTerminatedState(ExperimentIns ins) throws IOException {
// 定义终止态的列表,例如 "Succeeded", "Failed" 等
String status = ins.getStatus();


+ 2
- 1
ruoyi-modules/management-platform/src/main/java/com/ruoyi/platform/service/impl/ExperimentServiceImpl.java View File

@@ -256,8 +256,9 @@ public class ExperimentServiceImpl implements ExperimentService {
experimentIns.setExperimentId(experiment.getId());
experimentIns.setArgoInsNs((String) metadata.get("namespace"));
experimentIns.setArgoInsName((String) metadata.get("name"));
//传入实验全局参数
experimentIns.setStatus("Pending");

//传入实验全局参数

experimentIns.setGlobalParam(experiment.getGlobalParam());



+ 7
- 1
ruoyi-modules/management-platform/src/main/resources/mapper/managementPlatform/ExperimentInsDaoMapper.xml View File

@@ -21,7 +21,13 @@
<result property="state" column="state" jdbcType="INTEGER"/>
</resultMap>


<!--查询非终止态的实例-->
<select id="queryByExperimentIsNotTerminated" resultMap="ExperimentInsMap">
select id, experiment_id, argo_ins_name, argo_ins_ns, status, nodes_status,nodes_result, nodes_logs,global_param, start_time, finish_time, create_by, create_time, update_by, update_time, state
from experiment_ins
where (status NOT IN ('Terminated', 'Succeeded', 'Failed')
OR status IS NULL) and state = 1
</select>

<!--查询单个-->
<select id="queryById" resultMap="ExperimentInsMap">


Loading…
Cancel
Save