| @@ -49,7 +49,7 @@ public class ComputingResource implements Serializable { | |||
| private Integer gpuNums; | |||
| @ApiModelProperty("积分/小时") | |||
| private Float creditPerHour; | |||
| private Double creditPerHour; | |||
| @ApiModelProperty("标签") | |||
| private String labels; | |||
| @@ -23,7 +23,10 @@ public class ResourceOccupy { | |||
| private String description; | |||
| @ApiModelProperty("积分/小时") | |||
| private Float creditPerHour; | |||
| private Double creditPerHour; | |||
| @ApiModelProperty("扣除的积分") | |||
| private Double deduceCredit; | |||
| @ApiModelProperty("上一次扣分时间") | |||
| private Date deduceLastTime; | |||
| @@ -18,6 +18,8 @@ import org.springframework.stereotype.Component; | |||
| import javax.annotation.Resource; | |||
| import java.io.IOException; | |||
| import java.text.SimpleDateFormat; | |||
| import java.time.Instant; | |||
| import java.util.*; | |||
| @Component() | |||
| @@ -47,19 +49,26 @@ public class ExperimentInstanceStatusTask { | |||
| try { | |||
| experimentIns = experimentInsService.queryStatusFromArgo(experimentIns); | |||
| } catch (Exception e) { | |||
| experimentIns.setStatus("Failed"); | |||
| resourceOccupyService.endDeduce(Constant.TaskType_Workflow, Long.valueOf(experimentIns.getId()), null); | |||
| experimentIns.setStatus(Constant.Failed); | |||
| // 结束扣除积分 | |||
| resourceOccupyService.endDeduce(Constant.TaskType_Workflow, Long.valueOf(experimentIns.getId()), null, null); | |||
| } | |||
| // 扣除积分 | |||
| Map<String, Object> nodesStatusMap = JsonUtils.jsonToMap(experimentIns.getNodesStatus()); | |||
| for (String key : nodesStatusMap.keySet()) { | |||
| Map<String, Object> value = (Map<String, Object>) nodesStatusMap.get(key); | |||
| Date finishedAt = (Date) value.get("finishedAt"); | |||
| if (finishedAt == null) { | |||
| resourceOccupyService.deducing(Constant.TaskType_Workflow, Long.valueOf(experimentIns.getId()), key); | |||
| } else { | |||
| resourceOccupyService.endDeduce(Constant.TaskType_Workflow, Long.valueOf(experimentIns.getId()), key); | |||
| if (StringUtils.isNotEmpty(experimentIns.getNodesStatus())) { | |||
| Map<String, Object> nodesStatusMap = JsonUtils.jsonToMap(experimentIns.getNodesStatus()); | |||
| for (String key : nodesStatusMap.keySet()) { | |||
| Map<String, Object> value = (Map<String, Object>) nodesStatusMap.get(key); | |||
| String startedAt = (String) value.get("startedAt"); | |||
| Instant instant = Instant.parse(startedAt); | |||
| Date startTime = Date.from(instant); | |||
| String finishedAt = (String) value.get("finishedAt"); | |||
| if (StringUtils.isEmpty(finishedAt)) { | |||
| resourceOccupyService.deducing(Constant.TaskType_Workflow, Long.valueOf(experimentIns.getId()), key, startTime); | |||
| } else { | |||
| resourceOccupyService.endDeduce(Constant.TaskType_Workflow, Long.valueOf(experimentIns.getId()), key, startTime); | |||
| } | |||
| } | |||
| } | |||
| //运行成功的实验实例记录指标数值 | |||
| @@ -42,15 +42,18 @@ public class RayInsStatusTask { | |||
| //当原本状态为null或非终止态时才调用argo接口 | |||
| try { | |||
| rayIns = rayInsService.queryStatusFromArgo(rayIns); | |||
| // 扣除积分 | |||
| if (Constant.Running.equals(rayIns.getStatus())) { | |||
| resourceOccupyService.deducing(Constant.TaskType_Ray, rayIns.getId(), null); | |||
| resourceOccupyService.deducing(Constant.TaskType_Ray, rayIns.getId(), null,null); | |||
| } else if (Constant.Failed.equals(rayIns.getStatus()) || Constant.Terminated.equals(rayIns.getStatus()) | |||
| || Constant.Succeeded.equals(rayIns.getStatus())) { | |||
| resourceOccupyService.endDeduce(Constant.TaskType_Ray, rayIns.getId(), null); | |||
| resourceOccupyService.endDeduce(Constant.TaskType_Ray, rayIns.getId(), null,null); | |||
| } | |||
| } catch (Exception e) { | |||
| rayIns.setStatus(Constant.Failed); | |||
| resourceOccupyService.endDeduce(Constant.TaskType_Ray, rayIns.getId(), null); | |||
| // 结束扣除积分 | |||
| resourceOccupyService.endDeduce(Constant.TaskType_Ray, rayIns.getId(), null, null); | |||
| } | |||
| // 线程安全的添加操作 | |||
| synchronized (rayIds) { | |||
| @@ -25,11 +25,11 @@ public class ResourceOccupyTask { | |||
| private ServiceDao serviceDao; | |||
| // 开发环境功能扣除积分 | |||
| @Scheduled(cron = "0 0/10 * * * ?") // 每10分钟执行一次 | |||
| @Scheduled(cron = "0 0/1 * * * ?") // 每10分钟执行一次 | |||
| public void devDeduceCredit() { | |||
| List<DevEnvironment> devEnvironments = devEnvironmentDao.getRunning(); | |||
| for (DevEnvironment devEnvironment : devEnvironments) { | |||
| resourceOccupyService.deducing(Constant.TaskType_Dev, Long.valueOf(devEnvironment.getId()), null); | |||
| resourceOccupyService.deducing(Constant.TaskType_Dev, Long.valueOf(devEnvironment.getId()), null, null); | |||
| } | |||
| } | |||
| @@ -38,7 +38,7 @@ public class ResourceOccupyTask { | |||
| public void serviceDeduceCredit() { | |||
| List<ServiceVersion> serviceVersions = serviceDao.getRunning(); | |||
| for (ServiceVersion serviceVersion : serviceVersions) { | |||
| resourceOccupyService.deducing(Constant.TaskType_Service, serviceVersion.getId(), null); | |||
| resourceOccupyService.deducing(Constant.TaskType_Service, serviceVersion.getId(), null, null); | |||
| } | |||
| } | |||
| } | |||
| @@ -4,15 +4,17 @@ import com.ruoyi.platform.domain.ResourceOccupy; | |||
| import org.springframework.data.domain.Page; | |||
| import org.springframework.data.domain.PageRequest; | |||
| import java.util.Date; | |||
| public interface ResourceOccupyService { | |||
| Boolean haveResource(Integer computingResourceId) throws Exception; | |||
| void startDeduce(Integer computingResourceId, String taskType, Long taskId, String nodeId); | |||
| void endDeduce(String taskType, Long taskId, String nodeId); | |||
| void endDeduce(String taskType, Long taskId, String nodeId, Date nodeStartTime); | |||
| void deducing(String taskType, Long taskId, String nodeId); | |||
| void deducing(String taskType, Long taskId, String nodeId, Date nodeStartTime); | |||
| Page<ResourceOccupy> queryByPage(PageRequest pageRequest); | |||
| } | |||
| @@ -3,6 +3,7 @@ package com.ruoyi.platform.service.impl; | |||
| import cn.hutool.json.JSONUtil; | |||
| import com.alibaba.fastjson2.JSON; | |||
| import com.ruoyi.common.security.utils.SecurityUtils; | |||
| import com.ruoyi.platform.constant.Constant; | |||
| import com.ruoyi.platform.domain.DatasetTempStorage; | |||
| import com.ruoyi.platform.domain.Experiment; | |||
| import com.ruoyi.platform.domain.ExperimentIns; | |||
| @@ -12,6 +13,7 @@ import com.ruoyi.platform.mapper.ExperimentDao; | |||
| import com.ruoyi.platform.mapper.ExperimentInsDao; | |||
| import com.ruoyi.platform.mapper.ModelDependency1Dao; | |||
| import com.ruoyi.platform.service.ExperimentInsService; | |||
| import com.ruoyi.platform.service.ResourceOccupyService; | |||
| import com.ruoyi.platform.utils.*; | |||
| import com.ruoyi.platform.vo.LogRequestVo; | |||
| import com.ruoyi.platform.vo.PodLogVo; | |||
| @@ -66,6 +68,9 @@ public class ExperimentInsServiceImpl implements ExperimentInsService { | |||
| @Resource | |||
| private DatasetTempStorageDao datasetTempStorageDao; | |||
| @Resource | |||
| private ResourceOccupyService resourceOccupyService; | |||
| private final MinioUtil minioUtil; | |||
| public ExperimentInsServiceImpl(MinioUtil minioUtil) { | |||
| @@ -410,6 +415,8 @@ public class ExperimentInsServiceImpl implements ExperimentInsService { | |||
| //修改实验状态 | |||
| updateExperimentStatus(experimentIns.getExperimentId()); | |||
| // 结束扣除积分 | |||
| resourceOccupyService.endDeduce(Constant.TaskType_Workflow, Long.valueOf(experimentIns.getId()), null, null); | |||
| return true; | |||
| } else { | |||
| throw new Exception("终止错误"); | |||
| @@ -133,9 +133,6 @@ public class JupyterServiceImpl implements JupyterService { | |||
| return "pod不存在!"; | |||
| } | |||
| // 结束扣积分 | |||
| resourceOccupyService.endDeduce(Constant.TaskType_Dev, Long.valueOf(id), null); | |||
| // 使用 Kubernetes API 删除 Pod | |||
| String deleteResult = k8sClientUtil.deletePod(podName, namespace); | |||
| // 删除service | |||
| @@ -143,6 +140,9 @@ public class JupyterServiceImpl implements JupyterService { | |||
| devEnvironment.setStatus(Constant.Terminated); | |||
| this.devEnvironmentService.update(devEnvironment); | |||
| // 结束扣积分 | |||
| resourceOccupyService.endDeduce(Constant.TaskType_Dev, Long.valueOf(id), null, null); | |||
| return deleteResult + ",编辑器已停止"; | |||
| } | |||
| @@ -168,7 +168,7 @@ public class RayInsServiceImpl implements RayInsService { | |||
| rayInsDao.update(ins); | |||
| updateRayStatus(rayIns.getRayId()); | |||
| // 结束扣积分 | |||
| resourceOccupyService.endDeduce(Constant.TaskType_Ray, id, null); | |||
| resourceOccupyService.endDeduce(Constant.TaskType_Ray, id, null, null); | |||
| return true; | |||
| } else { | |||
| return false; | |||
| @@ -205,6 +205,7 @@ public class RayServiceImpl implements RayService { | |||
| rayIns.setResultPath(outputPath); | |||
| rayInsDao.insert(rayIns); | |||
| rayInsService.updateRayStatus(id); | |||
| // 记录开始扣除积分 | |||
| resourceOccupyService.startDeduce(ray.getComputingResourceId(), Constant.TaskType_Ray, rayIns.getId(), null); | |||
| } catch (Exception e) { | |||
| throw new RuntimeException(e); | |||
| @@ -12,6 +12,7 @@ import org.springframework.data.domain.Page; | |||
| import org.springframework.data.domain.PageImpl; | |||
| import org.springframework.data.domain.PageRequest; | |||
| import org.springframework.stereotype.Service; | |||
| import org.springframework.transaction.annotation.Transactional; | |||
| import javax.annotation.Resource; | |||
| import java.util.Date; | |||
| @@ -49,6 +50,7 @@ public class ResourceOccupyServiceImpl implements ResourceOccupyService { | |||
| } | |||
| @Override | |||
| @Transactional | |||
| public void startDeduce(Integer computingResourceId, String taskType, Long taskId, String nodeId) { | |||
| ResourceOccupy resourceOccupy = new ResourceOccupy(); | |||
| ComputingResource computingResource = computingResourceDao.queryById(computingResourceId); | |||
| @@ -70,10 +72,11 @@ public class ResourceOccupyServiceImpl implements ResourceOccupyService { | |||
| } | |||
| @Override | |||
| public void endDeduce(String taskType, Long taskId, String nodeId) { | |||
| @Transactional | |||
| public void endDeduce(String taskType, Long taskId, String nodeId, Date nodeStartTime) { | |||
| List<ResourceOccupy> resourceOccupys = resourceOccupyDao.getResourceOccupyByTask(taskType, taskId, nodeId); | |||
| for (ResourceOccupy resourceOccupy : resourceOccupys) { | |||
| deducing(taskType, taskId, nodeId); | |||
| deducing(taskType, taskId, nodeId, nodeStartTime); | |||
| resourceOccupy.setState(Constant.State_invalid); | |||
| resourceOccupyDao.edit(resourceOccupy); | |||
| @@ -87,16 +90,23 @@ public class ResourceOccupyServiceImpl implements ResourceOccupyService { | |||
| } | |||
| @Override | |||
| public void deducing(String taskType, Long taskId, String nodeId) { | |||
| @Transactional | |||
| public void deducing(String taskType, Long taskId, String nodeId, Date nodeStartTime) { | |||
| List<ResourceOccupy> resourceOccupys = resourceOccupyDao.getResourceOccupyByTask(taskType, taskId, nodeId); | |||
| for (ResourceOccupy resourceOccupy : resourceOccupys) { | |||
| long timeDifferenceMillis = new Date().getTime() - resourceOccupy.getDeduceLastTime().getTime(); | |||
| Date now = new Date(); | |||
| long timeDifferenceMillis; | |||
| if (nodeStartTime != null && resourceOccupy.getDeduceLastTime().before(nodeStartTime)) { | |||
| timeDifferenceMillis = now.getTime() - nodeStartTime.getTime(); | |||
| } else { | |||
| timeDifferenceMillis = now.getTime() - resourceOccupy.getDeduceLastTime().getTime(); | |||
| } | |||
| Double hours = (double) timeDifferenceMillis / (1000 * 60 * 60); | |||
| Double deduceCredit = resourceOccupy.getCreditPerHour() * hours; | |||
| resourceOccupyDao.deduceCredit(deduceCredit, resourceOccupy.getUserId()); | |||
| resourceOccupy.setDeduceLastTime(new Date()); | |||
| resourceOccupy.setDeduceCredit(resourceOccupy.getDeduceCredit() + deduceCredit); | |||
| resourceOccupy.setDeduceLastTime(now); | |||
| resourceOccupyDao.edit(resourceOccupy); | |||
| } | |||
| } | |||
| @@ -252,19 +252,21 @@ public class ServiceServiceImpl implements ServiceService { | |||
| paramMap.put("service_type", service.getServiceType()); | |||
| paramMap.put("deploy_type", serviceVersion.getDeployType()); | |||
| // 记录开始扣积分 | |||
| // 判断是否有资源 | |||
| if (resourceOccupyService.haveResource(serviceVersion.getComputingResourceId())) { | |||
| String req = HttpUtils.sendPost(argoUrl + modelService + "/create", JSON.toJSONString(paramMap)); | |||
| if (StringUtils.isNotEmpty(req)) { | |||
| Map<String, Object> reqMap = JacksonUtil.parseJSONStr2Map(req); | |||
| if ((Integer) reqMap.get("code") == 200) { | |||
| resourceOccupyService.startDeduce(serviceVersion.getComputingResourceId(), Constant.TaskType_Service, serviceVersion.getId(), null); | |||
| Map<String, String> data = (Map<String, String>) reqMap.get("data"); | |||
| serviceVersion.setUrl(data.get("url")); | |||
| serviceVersion.setDeploymentName(data.get("deployment_name")); | |||
| serviceVersion.setSvcName(data.get("svc_name")); | |||
| serviceVersion.setRunState(Constant.Pending); | |||
| serviceDao.updateServiceVersion(serviceVersion); | |||
| // 记录开始扣积分 | |||
| resourceOccupyService.startDeduce(serviceVersion.getComputingResourceId(), Constant.TaskType_Service, serviceVersion.getId(), null); | |||
| return "启动成功"; | |||
| } else { | |||
| throw new RuntimeException("启动失败"); | |||
| @@ -286,7 +288,7 @@ public class ServiceServiceImpl implements ServiceService { | |||
| serviceVersion.setRunState(Constant.Stopped); | |||
| serviceDao.updateServiceVersion(serviceVersion); | |||
| // 结束扣积分 | |||
| resourceOccupyService.endDeduce(Constant.TaskType_Service, id, null); | |||
| resourceOccupyService.endDeduce(Constant.TaskType_Service, id, null, null); | |||
| return "停止成功"; | |||
| } else { | |||
| throw new RuntimeException("停止失败"); | |||
| @@ -516,6 +516,7 @@ public class K8sClientUtil { | |||
| pod = api.createNamespacedPod(namespace, pod, null, null, null); | |||
| String nodeName = getNodeName(podName, namespace); | |||
| // 记录开始扣除积分 | |||
| resourceOccupyService.startDeduce(devEnvironment.getComputingResourceId(), Constant.TaskType_Dev, Long.valueOf(devEnvironment.getId()), null); | |||
| } | |||
| } catch (ApiException e) { | |||
| @@ -696,8 +697,7 @@ public class K8sClientUtil { | |||
| ComputingResource computingResource = computingResourceDao.queryById(computingResourceId); | |||
| //配置pod资源 | |||
| String memory = computingResource.getMemoryGb().toString(); | |||
| memory = memory.substring(0, memory.length() - 1).concat("i"); | |||
| String memory = computingResource.getMemoryGb().toString().concat("Gi"); | |||
| HashMap<String, Quantity> limitMap = new HashMap<>(); | |||
| if (computingResource.getGpuNums() != null && computingResource.getGpuNums() != 0) { | |||
| limitMap.put("nvidia.com/gpu", new Quantity(String.valueOf(computingResource.getGpuNums()))); | |||
| @@ -18,6 +18,9 @@ | |||
| <if test="resourceOccupy.deduceLastTime != null"> | |||
| deduce_last_time = #{resourceOccupy.deduceLastTime}, | |||
| </if> | |||
| <if test="resourceOccupy.deduceCredit != null"> | |||
| deduce_credit = #{resourceOccupy.deduceCredit}, | |||
| </if> | |||
| </set> | |||
| where id = #{resourceOccupy.id} | |||
| </update> | |||
| @@ -52,13 +55,13 @@ | |||
| where task_type = #{taskType} | |||
| and task_id = #{taskId} | |||
| <if test="nodeId != null and nodeId !=''"> | |||
| node_id = #{nodeId}, | |||
| and node_id = #{nodeId} | |||
| </if> | |||
| and state = 1 | |||
| </select> | |||
| <select id="count" resultType="java.lang.Long"> | |||
| select count(1) resource_occupy | |||
| select count(1) from resource_occupy | |||
| </select> | |||
| <select id="queryByPage" resultType="com.ruoyi.platform.domain.ResourceOccupy"> | |||