Browse Source

自动超参数寻优实验功能开发

dev-ray
chenzhihang 1 year ago
parent
commit
8d6f0e2cb5
8 changed files with 379 additions and 12 deletions
  1. +2
    -0
      ruoyi-modules/management-platform/src/main/java/com/ruoyi/platform/mapper/RayInsDao.java
  2. +2
    -1
      ruoyi-modules/management-platform/src/main/java/com/ruoyi/platform/scheduling/AutoMlInsStatusTask.java
  3. +95
    -0
      ruoyi-modules/management-platform/src/main/java/com/ruoyi/platform/scheduling/RayInsStatusTask.java
  4. +4
    -0
      ruoyi-modules/management-platform/src/main/java/com/ruoyi/platform/service/RayInsService.java
  5. +137
    -9
      ruoyi-modules/management-platform/src/main/java/com/ruoyi/platform/service/impl/RayInsServiceImpl.java
  6. +81
    -2
      ruoyi-modules/management-platform/src/main/java/com/ruoyi/platform/service/impl/RayServiceImpl.java
  7. +50
    -0
      ruoyi-modules/management-platform/src/main/java/com/ruoyi/platform/vo/RayParamVo.java
  8. +8
    -0
      ruoyi-modules/management-platform/src/main/resources/mapper/managementPlatform/RayInsDaoMapper.xml

+ 2
- 0
ruoyi-modules/management-platform/src/main/java/com/ruoyi/platform/mapper/RayInsDao.java View File

@@ -18,4 +18,6 @@ public interface RayInsDao {
int insert(@Param("rayIns") RayIns rayIns);

int update(@Param("rayIns") RayIns rayIns);

List<RayIns> queryByRayInsIsNotTerminated();
}

+ 2
- 1
ruoyi-modules/management-platform/src/main/java/com/ruoyi/platform/scheduling/AutoMlInsStatusTask.java View File

@@ -1,5 +1,6 @@
package com.ruoyi.platform.scheduling;

import com.ruoyi.platform.constant.Constant;
import com.ruoyi.platform.domain.AutoMl;
import com.ruoyi.platform.domain.AutoMlIns;
import com.ruoyi.platform.mapper.AutoMlDao;
@@ -41,7 +42,7 @@ public class AutoMlInsStatusTask {
try {
autoMlIns = autoMlInsService.queryStatusFromArgo(autoMlIns);
} catch (Exception e) {
autoMlIns.setStatus("Failed");
autoMlIns.setStatus(Constant.Failed);
}
// 线程安全的添加操作
synchronized (autoMlIds) {


+ 95
- 0
ruoyi-modules/management-platform/src/main/java/com/ruoyi/platform/scheduling/RayInsStatusTask.java View File

@@ -0,0 +1,95 @@
package com.ruoyi.platform.scheduling;

import com.ruoyi.platform.constant.Constant;
import com.ruoyi.platform.domain.Ray;
import com.ruoyi.platform.domain.RayIns;
import com.ruoyi.platform.mapper.RayDao;
import com.ruoyi.platform.mapper.RayInsDao;
import com.ruoyi.platform.service.RayInsService;
import org.apache.commons.lang3.StringUtils;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

@Component()
public class RayInsStatusTask {

@Resource
private RayInsService rayInsService;
@Resource
private RayInsDao rayInsDao;
@Resource
private RayDao rayDao;

private List<Long> rayIds = new ArrayList<>();

@Scheduled(cron = "0/30 * * * * ?") // 每30S执行一次
public void executeRayInsStatus() {
List<RayIns> rayInsList = rayInsService.queryByRayInsIsNotTerminated();

// 去argo查询状态
List<RayIns> updateList = new ArrayList<>();
if (rayInsList != null && rayInsList.size() > 0) {
for (RayIns rayIns : rayInsList) {
//当原本状态为null或非终止态时才调用argo接口
try {
rayIns = rayInsService.queryStatusFromArgo(rayIns);
} catch (Exception e) {
rayIns.setStatus(Constant.Failed);
}
// 线程安全的添加操作
synchronized (rayIds) {
rayIds.add(rayIns.getRayId());
}
updateList.add(rayIns);
}
if (updateList.size() > 0) {
for (RayIns rayIns : updateList) {
rayInsDao.update(rayIns);
}
}
}
}

@Scheduled(cron = "0/30 * * * * ?") // / 每30S执行一次
public void executeRayStatus() {
if (rayIds.size() == 0) {
return;
}
// 存储需要更新的实验对象列表
List<Ray> updateRays = new ArrayList<>();
for (Long rayId : rayIds) {
// 获取当前实验的所有实例列表
List<RayIns> insList = rayInsDao.getByRayId(rayId);
List<String> statusList = new ArrayList<>();
// 更新实验状态列表
for (int i = 0; i < insList.size(); i++) {
statusList.add(insList.get(i).getStatus());
}
String subStatus = statusList.toString().substring(1, statusList.toString().length() - 1);
Ray ray = rayDao.getRayById(rayId);
if (!StringUtils.equals(ray.getStatusList(), subStatus)) {
ray.setStatusList(subStatus);
updateRays.add(ray);
rayDao.edit(ray);
}
}

if (!updateRays.isEmpty()) {
// 使用Iterator进行安全的删除操作
Iterator<Long> iterator = rayIds.iterator();
while (iterator.hasNext()) {
Long rayId = iterator.next();
for (Ray ray : updateRays) {
if (ray.getId().equals(rayId)) {
iterator.remove();
}
}
}
}
}
}

+ 4
- 0
ruoyi-modules/management-platform/src/main/java/com/ruoyi/platform/service/RayInsService.java View File

@@ -18,4 +18,8 @@ public interface RayInsService {
RayIns getDetailById(Long id) throws IOException;

void updateRayStatus(Long rayId);

RayIns queryStatusFromArgo(RayIns ins);

List<RayIns> queryByRayInsIsNotTerminated();
}

+ 137
- 9
ruoyi-modules/management-platform/src/main/java/com/ruoyi/platform/service/impl/RayInsServiceImpl.java View File

@@ -6,8 +6,11 @@ import com.ruoyi.platform.domain.RayIns;
import com.ruoyi.platform.mapper.RayDao;
import com.ruoyi.platform.mapper.RayInsDao;
import com.ruoyi.platform.service.RayInsService;
import com.ruoyi.platform.utils.DateUtils;
import com.ruoyi.platform.utils.HttpUtils;
import com.ruoyi.platform.utils.JsonUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageImpl;
import org.springframework.data.domain.PageRequest;
@@ -23,6 +26,13 @@ import java.util.stream.Collectors;

@Service("rayInsService")
public class RayInsServiceImpl implements RayInsService {
@Value("${argo.url}")
private String argoUrl;
@Value("${argo.workflowStatus}")
private String argoWorkflowStatus;
@Value("${argo.workflowTermination}")
private String argoWorkflowTermination;

@Resource
private RayInsDao rayInsDao;

@@ -49,7 +59,7 @@ public class RayInsServiceImpl implements RayInsService {
return "实验实例不存在";
}
if (StringUtils.isEmpty(rayIns.getStatus())) {
//todo queryStatusFromArgo
rayIns = queryStatusFromArgo(rayIns);
}
if (StringUtils.equals(rayIns.getStatus(), Constant.Running)) {
return "实验实例正在运行,不可删除";
@@ -89,7 +99,7 @@ public class RayInsServiceImpl implements RayInsService {

// 获取当前状态,如果为空,则从Argo查询
if (StringUtils.isEmpty(currentStatus)) {
// todo queryStatusFromArgo
currentStatus = queryStatusFromArgo(rayIns).getStatus();
}

// 只有状态是"Running"时才能终止实例
@@ -97,20 +107,64 @@ public class RayInsServiceImpl implements RayInsService {
throw new Exception("终止错误,只有运行状态的实例才能终止"); // 如果不是"Running"状态,则不执行终止操作
}

//todo terminateFromArgo
// 创建请求数据map
Map<String, Object> requestData = new HashMap<>();
requestData.put("namespace", namespace);
requestData.put("name", name);
// 创建发送数据map,将请求数据作为"data"键的值
Map<String, Object> res = new HashMap<>();
res.put("data", requestData);

try {
// 发送POST请求到Argo工作流状态查询接口,并将请求数据转换为JSON
String req = HttpUtils.sendPost(argoUrl + argoWorkflowTermination, null, JsonUtils.mapToJson(res));
// 检查响应是否为空或无内容
if (StringUtils.isEmpty(req)) {
throw new RuntimeException("终止响应内容为空。");

}
// 将响应的JSON字符串转换为Map对象
Map<String, Object> runResMap = JsonUtils.jsonToMap(req);
// 从响应Map中直接获取"errCode"的值
Integer errCode = (Integer) runResMap.get("errCode");
if (errCode != null && errCode == 0) {
//更新autoMlIns,确保状态更新被保存到数据库
RayIns ins = queryStatusFromArgo(rayIns);
String nodeStatus = ins.getNodeStatus();
Map<String, Object> nodeMap = JsonUtils.jsonToMap(nodeStatus);

rayIns.setStatus(Constant.Terminated);
rayIns.setFinishTime(new Date());
this.rayInsDao.update(rayIns);
updateRayStatus(rayIns.getRayId());
return true;
// 遍历 map
for (Map.Entry<String, Object> entry : nodeMap.entrySet()) {
// 获取每个 Map 中的值并强制转换为 Map
Map<String, Object> innerMap = (Map<String, Object>) entry.getValue();
// 检查 phase 的值
if (innerMap.containsKey("phase")) {
String phaseValue = (String) innerMap.get("phase");
// 如果值不等于 Succeeded,则赋值为 Failed
if (!StringUtils.equals("Succeeded", phaseValue)) {
innerMap.put("phase", "Failed");
}
}
}
ins.setNodeStatus(JsonUtils.mapToJson(nodeMap));
ins.setStatus(Constant.Terminated);
ins.setUpdateTime(new Date());
rayInsDao.update(ins);
updateRayStatus(rayIns.getRayId());
return true;
} else {
return false;
}
} catch (Exception e) {
throw new RuntimeException("终止实例错误: " + e.getMessage(), e);
}
}

@Override
public RayIns getDetailById(Long id) throws IOException {
RayIns rayIns = rayInsDao.queryById(id);
if (Constant.Running.equals(rayIns.getStatus()) || Constant.Pending.equals(rayIns.getStatus())) {
//todo queryStatusFromArgo
rayIns = queryStatusFromArgo(rayIns);
}
rayIns.setTrialList(getTrialList(rayIns.getResultPath()));
return rayIns;
@@ -132,6 +186,80 @@ public class RayInsServiceImpl implements RayInsService {
}
}

@Override
public RayIns queryStatusFromArgo(RayIns ins) {
String namespace = ins.getArgoInsNs();
String name = ins.getArgoInsName();

// 创建请求数据map
Map<String, Object> requestData = new HashMap<>();
requestData.put("namespace", namespace);
requestData.put("name", name);

// 创建发送数据map,将请求数据作为"data"键的值
Map<String, Object> res = new HashMap<>();
res.put("data", requestData);

try {
// 发送POST请求到Argo工作流状态查询接口,并将请求数据转换为JSON
String req = HttpUtils.sendPost(argoUrl + argoWorkflowStatus, null, JsonUtils.mapToJson(res));
// 检查响应是否为空或无内容
if (req == null || StringUtils.isEmpty(req)) {
throw new RuntimeException("工作流状态响应为空。");
}
// 将响应的JSON字符串转换为Map对象
Map<String, Object> runResMap = JsonUtils.jsonToMap(req);
// 从响应Map中获取"data"部分
Map<String, Object> data = (Map<String, Object>) runResMap.get("data");
if (data == null || data.isEmpty()) {
throw new RuntimeException("工作流数据为空.");
}
// 从"data"中获取"status"部分,并返回"phase"的值
Map<String, Object> status = (Map<String, Object>) data.get("status");
if (status == null || status.isEmpty()) {
throw new RuntimeException("工作流状态为空。");
}

//解析流水线结束时间
String finishedAtString = (String) status.get("finishedAt");
if (finishedAtString != null && !finishedAtString.isEmpty()) {
Date finishTime = DateUtils.convertUTCtoShanghaiDate(finishedAtString);
ins.setFinishTime(finishTime);
}

// 解析nodes字段,提取节点状态并转换为JSON字符串
Map<String, Object> nodes = (Map<String, Object>) status.get("nodes");
Map<String, Object> modifiedNodes = new LinkedHashMap<>();
if (nodes != null) {
for (Map.Entry<String, Object> nodeEntry : nodes.entrySet()) {
Map<String, Object> nodeDetails = (Map<String, Object>) nodeEntry.getValue();
String templateName = (String) nodeDetails.get("displayName");
modifiedNodes.put(templateName, nodeDetails);
}
}


String nodeStatusJson = JsonUtils.mapToJson(modifiedNodes);
ins.setNodeStatus(nodeStatusJson);

//终止态为终止不改
if (!StringUtils.equals(ins.getStatus(), Constant.Terminated)) {
ins.setStatus(StringUtils.isNotEmpty((String) status.get("phase")) ? (String) status.get("phase") : Constant.Pending);
}
if (StringUtils.equals(ins.getStatus(), "Error")) {
ins.setStatus(Constant.Failed);
}
return ins;
} catch (Exception e) {
throw new RuntimeException("查询状态失败: " + e.getMessage(), e);
}
}

@Override
public List<RayIns> queryByRayInsIsNotTerminated() {
return rayInsDao.queryByRayInsIsNotTerminated();
}

public ArrayList<Map<String, Object>> getTrialList(String directoryPath) throws IOException {
// 获取指定路径下的所有文件
Path dirPath = Paths.get(directoryPath);


+ 81
- 2
ruoyi-modules/management-platform/src/main/java/com/ruoyi/platform/service/impl/RayServiceImpl.java View File

@@ -4,14 +4,22 @@ import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import com.ruoyi.common.security.utils.SecurityUtils;
import com.ruoyi.platform.constant.Constant;
import com.ruoyi.platform.domain.AutoMlIns;
import com.ruoyi.platform.domain.Ray;
import com.ruoyi.platform.domain.RayIns;
import com.ruoyi.platform.mapper.RayDao;
import com.ruoyi.platform.mapper.RayInsDao;
import com.ruoyi.platform.service.RayInsService;
import com.ruoyi.platform.service.RayService;
import com.ruoyi.platform.utils.HttpUtils;
import com.ruoyi.platform.utils.JacksonUtil;
import com.ruoyi.platform.utils.JsonUtils;
import com.ruoyi.platform.vo.RayParamVo;
import com.ruoyi.platform.vo.RayVo;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageImpl;
import org.springframework.data.domain.PageRequest;
@@ -20,13 +28,30 @@ import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.io.IOException;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Service("rayService")
public class RayServiceImpl implements RayService {

@Value("${argo.url}")
private String argoUrl;
@Value("${argo.convertRay}")
String convertRay;
@Value("${argo.workflowRun}")
private String argoWorkflowRun;

@Value("${minio.endpoint}")
private String minioEndpoint;

@Resource
private RayDao rayDao;
@Resource
private RayInsDao rayInsDao;
@Resource
private RayInsService rayInsService;

@Override
public Page<Ray> queryByPage(String name, PageRequest pageRequest) {
@@ -126,7 +151,61 @@ public class RayServiceImpl implements RayService {
if (ray == null) {
throw new Exception("自动超参数寻优配置不存在");
}
//todo argo
return null;

RayParamVo rayParamVo = new RayParamVo();
BeanUtils.copyProperties(ray, rayParamVo);
rayParamVo.setCodeConfig(JsonUtils.jsonToMap(ray.getCodeConfig()));
rayParamVo.setDataset(JsonUtils.jsonToMap(ray.getDataset()));
rayParamVo.setModel(JsonUtils.jsonToMap(ray.getModel()));
rayParamVo.setImage(JsonUtils.jsonToMap(ray.getImage()));
String param = JsonUtils.objectToJson(rayParamVo);

// 调argo转换接口
try {
String convertRes = HttpUtils.sendPost(argoUrl + convertRay, param);
if (convertRes == null || StringUtils.isEmpty(convertRes)) {
throw new RuntimeException("转换流水线失败");
}
Map<String, Object> converMap = JsonUtils.jsonToMap(convertRes);
// 组装运行接口json
Map<String, Object> output = (Map<String, Object>) converMap.get("output");
Map<String, Object> runReqMap = new HashMap<>();
runReqMap.put("data", converMap.get("data"));
// 调argo运行接口
String runRes = HttpUtils.sendPost(argoUrl + argoWorkflowRun, JsonUtils.mapToJson(runReqMap));
if (runRes == null || StringUtils.isEmpty(runRes)) {
throw new RuntimeException("Failed to run workflow.");
}
Map<String, Object> runResMap = JsonUtils.jsonToMap(runRes);
Map<String, Object> data = (Map<String, Object>) runResMap.get("data");
//判断data为空
if (data == null || MapUtils.isEmpty(data)) {
throw new RuntimeException("Failed to run workflow.");
}
Map<String, Object> metadata = (Map<String, Object>) data.get("metadata");

// 插入记录到实验实例表
RayIns rayIns = new RayIns();
rayIns.setRayId(ray.getId());
rayIns.setArgoInsNs((String) metadata.get("namespace"));
rayIns.setArgoInsName((String) metadata.get("name"));
rayIns.setParam(param);
rayIns.setStatus(Constant.Pending);
//替换argoInsName
String outputString = JsonUtils.mapToJson(output);
rayIns.setNodeResult(outputString.replace("{{workflow.name}}", (String) metadata.get("name")));

Map<String, Object> param_output = (Map<String, Object>) output.get("param_output");
List output1 = (ArrayList) param_output.values().toArray()[0];
Map<String, String> output2 = (Map<String, String>) output1.get(0);
String outputPath = minioEndpoint + "/" + output2.get("path").replace("{{workflow.name}}", (String) metadata.get("name")) + "/";

rayIns.setResultPath(outputPath);
rayInsDao.insert(rayIns);
rayInsService.updateRayStatus(id);
} catch (Exception e) {
throw new RuntimeException(e);
}
return "执行成功";
}
}

+ 50
- 0
ruoyi-modules/management-platform/src/main/java/com/ruoyi/platform/vo/RayParamVo.java View File

@@ -0,0 +1,50 @@
package com.ruoyi.platform.vo;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.PropertyNamingStrategy;
import com.fasterxml.jackson.databind.annotation.JsonNaming;
import io.swagger.annotations.ApiModel;
import lombok.Data;

import java.util.Map;

@Data
@JsonNaming(PropertyNamingStrategy.SnakeCaseStrategy.class)
@JsonInclude(JsonInclude.Include.NON_NULL)
@ApiModel(description = "超参数寻优参数")
public class RayParamVo {

private Map<String,Object> codeConfig;

private Map<String,Object> dataset;

private Map<String,Object> image;

private Map<String,Object> model;

private String mainPy;

private String name;

private Integer numSamples;

private String parameters;

private String pointsToEvaluate;

private String storagePath;

private String searchAlg;

private String scheduler;

private String metric;

private String mode;

private Integer maxT;

private Integer minSamplesRequired;

private String resource;
}

+ 8
- 0
ruoyi-modules/management-platform/src/main/resources/mapper/managementPlatform/RayInsDaoMapper.xml View File

@@ -66,4 +66,12 @@
and state = 1
order by update_time DESC limit 5
</select>

<select id="queryByRayInsIsNotTerminated" resultType="com.ruoyi.platform.domain.RayIns">
select *
from ray_ins
where (status NOT IN ('Terminated', 'Succeeded', 'Failed')
OR status IS NULL)
and state = 1
</select>
</mapper>

Loading…
Cancel
Save