| @@ -3,6 +3,7 @@ package com.ruoyi.platform.scheduling; | |||
| import com.ruoyi.platform.constant.Constant; | |||
| import com.ruoyi.platform.domain.Experiment; | |||
| import com.ruoyi.platform.domain.ExperimentIns; | |||
| import com.ruoyi.platform.domain.ResourceOccupy; | |||
| import com.ruoyi.platform.mapper.ExperimentDao; | |||
| import com.ruoyi.platform.mapper.ExperimentInsDao; | |||
| import com.ruoyi.platform.service.AimService; | |||
| @@ -18,7 +19,6 @@ import org.springframework.stereotype.Component; | |||
| import javax.annotation.Resource; | |||
| import java.io.IOException; | |||
| import java.text.SimpleDateFormat; | |||
| import java.time.Instant; | |||
| import java.util.*; | |||
| @@ -62,11 +62,14 @@ public class ExperimentInstanceStatusTask { | |||
| String startedAt = (String) value.get("startedAt"); | |||
| Instant instant = Instant.parse(startedAt); | |||
| Date startTime = Date.from(instant); | |||
| String phase = (String) value.get("phase"); | |||
| String finishedAt = (String) value.get("finishedAt"); | |||
| if (StringUtils.isEmpty(finishedAt)) { | |||
| if (StringUtils.isEmpty(finishedAt) && Constant.Running.equals(phase)) { | |||
| ResourceOccupy resourceOccupy = new ResourceOccupy(); | |||
| resourceOccupy.setState(Constant.State_valid); | |||
| resourceOccupyService.deducing(Constant.TaskType_Workflow, null, Long.valueOf(experimentIns.getId()), key, startTime); | |||
| } else { | |||
| } else if (StringUtils.isNotEmpty(finishedAt)) { | |||
| resourceOccupyService.endDeduce(Constant.TaskType_Workflow, null, Long.valueOf(experimentIns.getId()), key, startTime); | |||
| } | |||
| } | |||
| @@ -9,9 +9,9 @@ import java.util.Map; | |||
| public interface ResourceOccupyService { | |||
| Boolean haveResource(Integer computingResourceId) throws Exception; | |||
| Boolean haveResource(Integer computingResourceId, Integer replicas) throws Exception; | |||
| void startDeduce(Integer computingResourceId, String taskType, Long taskId, Long taskInsId, Long workflowId, String taskName, String nodeId); | |||
| void startDeduce(Integer computingResourceId, Integer replicas, String taskType, Long taskId, Long taskInsId, Long workflowId, String taskName, String nodeId, Integer state); | |||
| void endDeduce(String taskType, Long taskId, Long taskInsId, String nodeId, Date nodeStartTime); | |||
| @@ -20,4 +20,6 @@ public interface ResourceOccupyService { | |||
| Page<ResourceOccupy> queryByPage(PageRequest pageRequest); | |||
| Map<String, Double> queryCredit(); | |||
| void update(Integer computingResourceId, Integer replicas); | |||
| } | |||
| @@ -38,7 +38,7 @@ public interface ServiceService { | |||
| String stopServiceVersion(Long id); | |||
| String updateServiceVersion(ServiceVersion serviceVersion) throws Exception; | |||
| String updateServiceVersion(ServiceVersion serviceVersion, Boolean reRun) throws Exception; | |||
| HashMap<String, String> getServiceVersionLog(Long id, String startTime, String endTime); | |||
| @@ -239,10 +239,9 @@ public class ExperimentServiceImpl implements ExperimentService { | |||
| Map<String, Map<String, Object>> resourceInfo = (Map<String, Map<String, Object>>) converMap.get("resource_info"); | |||
| for (Map.Entry<String, Map<String, Object>> entry : resourceInfo.entrySet()) { | |||
| Map<String, Object> node = entry.getValue(); | |||
| resourceOccupyService.haveResource((Integer) node.get("computing_resource_id")); | |||
| resourceOccupyService.haveResource((Integer) node.get("computing_resource_id"), 1); | |||
| } | |||
| // 组装运行接口json | |||
| Map<String, Object> runReqMap = new HashMap<>(); | |||
| runReqMap.put("data", converMap.get("data")); | |||
| @@ -313,7 +312,7 @@ public class ExperimentServiceImpl implements ExperimentService { | |||
| // 记录开始扣积分 | |||
| for (Map.Entry<String, Map<String, Object>> entry : resourceInfo.entrySet()) { | |||
| Map<String, Object> node = entry.getValue(); | |||
| resourceOccupyService.startDeduce((Integer) node.get("computing_resource_id"), Constant.TaskType_Workflow, Long.valueOf(id), Long.valueOf(insert.getId()), experiment.getWorkflowId(), experiment.getName(), entry.getKey()); | |||
| resourceOccupyService.startDeduce((Integer) node.get("computing_resource_id"), 1, Constant.TaskType_Workflow, Long.valueOf(id), Long.valueOf(insert.getId()), experiment.getWorkflowId(), experiment.getName(), entry.getKey(), Constant.State_building); | |||
| } | |||
| } catch (Exception e) { | |||
| @@ -153,10 +153,10 @@ public class RayServiceImpl implements RayService { | |||
| } | |||
| // 记录开始扣积分 | |||
| if (resourceOccupyService.haveResource(ray.getComputingResourceId())) { | |||
| if (resourceOccupyService.haveResource(ray.getComputingResourceId(), 1)) { | |||
| RayParamVo rayParamVo = new RayParamVo(); | |||
| BeanUtils.copyProperties(ray, rayParamVo); | |||
| rayParamVo.setResource(ray.getComputingResourceId()); | |||
| rayParamVo.setComputingResourceId(ray.getComputingResourceId()); | |||
| rayParamVo.setCodeConfig(JsonUtils.jsonToMap(ray.getCodeConfig())); | |||
| rayParamVo.setDataset(JsonUtils.jsonToMap(ray.getDataset())); | |||
| rayParamVo.setModel(JsonUtils.jsonToMap(ray.getModel())); | |||
| @@ -206,7 +206,7 @@ public class RayServiceImpl implements RayService { | |||
| rayInsDao.insert(rayIns); | |||
| rayInsService.updateRayStatus(id); | |||
| // 记录开始扣除积分 | |||
| resourceOccupyService.startDeduce(ray.getComputingResourceId(), Constant.TaskType_Ray, id, rayIns.getId(), null, ray.getName(), null); | |||
| resourceOccupyService.startDeduce(ray.getComputingResourceId(), 1, Constant.TaskType_Ray, id, rayIns.getId(), null, ray.getName(), null, null); | |||
| } catch (Exception e) { | |||
| throw new RuntimeException(e); | |||
| } | |||
| @@ -29,21 +29,21 @@ public class ResourceOccupyServiceImpl implements ResourceOccupyService { | |||
| private ComputingResourceDao computingResourceDao; | |||
| @Override | |||
| public Boolean haveResource(Integer computingResourceId) throws Exception { | |||
| public Boolean haveResource(Integer computingResourceId, Integer replicas) throws Exception { | |||
| ComputingResource computingResource = computingResourceDao.queryById(computingResourceId); | |||
| LoginUser loginUser = SecurityUtils.getLoginUser(); | |||
| if (loginUser.getSysUser().getCredit() < computingResource.getCreditPerHour()) { | |||
| throw new Exception("积分不足"); | |||
| if (loginUser.getSysUser().getCredit() == 0) { | |||
| throw new Exception("当前积分为零"); | |||
| } | |||
| if (Constant.Computing_Resource_GPU.equals(computingResource.getComputingResource())) { | |||
| if (resourceOccupyDao.haveResource(computingResource.getResourceId(), computingResource.getGpuNums())) { | |||
| if (resourceOccupyDao.haveResource(computingResource.getResourceId(), computingResource.getGpuNums() * replicas)) { | |||
| return true; | |||
| } else { | |||
| throw new Exception("资源不足,GPU资源已被占用"); | |||
| } | |||
| } else { | |||
| if (resourceOccupyDao.haveResource(computingResource.getResourceId(), computingResource.getCpuCores())) { | |||
| if (resourceOccupyDao.haveResource(computingResource.getResourceId(), computingResource.getCpuCores() * replicas)) { | |||
| return true; | |||
| } else { | |||
| throw new Exception("资源不足,CPU资源已被占用完"); | |||
| @@ -53,26 +53,31 @@ public class ResourceOccupyServiceImpl implements ResourceOccupyService { | |||
| @Override | |||
| @Transactional | |||
| public void startDeduce(Integer computingResourceId, String taskType, Long taskId, Long taskInsId, Long workflowId, String taskName, String nodeId) { | |||
| public void startDeduce(Integer computingResourceId, Integer replicas, String taskType, Long taskId, Long taskInsId, Long workflowId, String taskName, String nodeId, Integer state) { | |||
| ResourceOccupy resourceOccupy = new ResourceOccupy(); | |||
| ComputingResource computingResource = computingResourceDao.queryById(computingResourceId); | |||
| resourceOccupy.setComputingResourceId(computingResourceId); | |||
| LoginUser loginUser = SecurityUtils.getLoginUser(); | |||
| resourceOccupy.setUserId(loginUser.getUserid()); | |||
| resourceOccupy.setCreditPerHour(computingResource.getCreditPerHour()); | |||
| resourceOccupy.setDescription(computingResource.getDescription()); | |||
| resourceOccupy.setCreditPerHour(computingResource.getCreditPerHour() * replicas); | |||
| if (replicas > 1) { | |||
| resourceOccupy.setDescription(replicas + " * [" + computingResource.getDescription() + "]"); | |||
| } else { | |||
| resourceOccupy.setDescription(computingResource.getDescription()); | |||
| } | |||
| resourceOccupy.setTaskType(taskType); | |||
| resourceOccupy.setTaskId(taskId); | |||
| resourceOccupy.setTaskInsId(taskInsId); | |||
| resourceOccupy.setWorkflowId(workflowId); | |||
| resourceOccupy.setTaskName(taskName); | |||
| resourceOccupy.setNodeId(nodeId); | |||
| resourceOccupy.setState(state); | |||
| resourceOccupyDao.save(resourceOccupy); | |||
| if (Constant.Computing_Resource_GPU.equals(computingResource.getComputingResource())) { | |||
| resourceOccupyDao.updateUsed(computingResource.getResourceId(), computingResource.getGpuNums()); | |||
| resourceOccupyDao.updateUsed(computingResource.getResourceId(), computingResource.getGpuNums() * replicas); | |||
| } else { | |||
| resourceOccupyDao.updateUsed(computingResource.getResourceId(), computingResource.getCpuCores()); | |||
| resourceOccupyDao.updateUsed(computingResource.getResourceId(), computingResource.getCpuCores() * replicas); | |||
| } | |||
| } | |||
| @@ -86,10 +91,11 @@ public class ResourceOccupyServiceImpl implements ResourceOccupyService { | |||
| resourceOccupyDao.edit(resourceOccupy); | |||
| ComputingResource computingResource = computingResourceDao.queryById(resourceOccupy.getComputingResourceId()); | |||
| int occupy_num = (int) (resourceOccupy.getCreditPerHour() / computingResource.getCreditPerHour()); | |||
| if (Constant.Computing_Resource_GPU.equals(computingResource.getComputingResource())) { | |||
| resourceOccupyDao.updateUnUsed(computingResource.getResourceId(), computingResource.getGpuNums()); | |||
| resourceOccupyDao.updateUnUsed(computingResource.getResourceId(), computingResource.getGpuNums() * occupy_num); | |||
| } else { | |||
| resourceOccupyDao.updateUnUsed(computingResource.getResourceId(), computingResource.getCpuCores()); | |||
| resourceOccupyDao.updateUnUsed(computingResource.getResourceId(), computingResource.getCpuCores() * occupy_num); | |||
| } | |||
| } | |||
| } | |||
| @@ -112,6 +118,7 @@ public class ResourceOccupyServiceImpl implements ResourceOccupyService { | |||
| resourceOccupy.setDeduceCredit(resourceOccupy.getDeduceCredit() + deduceCredit); | |||
| resourceOccupy.setDeduceLastTime(now); | |||
| resourceOccupy.setState(Constant.State_valid); | |||
| resourceOccupyDao.edit(resourceOccupy); | |||
| } | |||
| } | |||
| @@ -131,4 +138,35 @@ public class ResourceOccupyServiceImpl implements ResourceOccupyService { | |||
| result.put("deduceCredit", deduceCredit); | |||
| return result; | |||
| } | |||
| /** | |||
| * Switches a resource-occupancy record to a new computing resource and replica count. | |||
| * Releases the units held under the previous computing resource, reserves units for the | |||
| * new one, then rewrites the record's description, credit-per-hour and computing resource id. | |||
| * | |||
| * @param computingResourceId id of the computing resource to switch to | |||
| * @param replicas replica count; multiplies both the reserved units and the hourly credit | |||
| */ | |||
| @Override | |||
| @Transactional | |||
| public void update(Integer computingResourceId, Integer replicas) { | |||
| // NOTE(review): resourceOccupy is freshly constructed and never loaded from storage, so the | |||
| // getComputingResourceId()/getCreditPerHour() reads below and the id used by edit() only hold | |||
| // values if the ResourceOccupy constructor sets defaults (not visible here). The existing row | |||
| // most likely has to be looked up by task identifiers, which this signature does not carry. | |||
| // TODO confirm and extend the lookup before relying on this method. | |||
| ResourceOccupy resourceOccupy = new ResourceOccupy(); | |||
| ComputingResource oldComputingResource = computingResourceDao.queryById(resourceOccupy.getComputingResourceId()); | |||
| ComputingResource computingResource = computingResourceDao.queryById(computingResourceId); | |||
| // Previous replica count, recovered as stored hourly credit divided by the old resource's unit credit. | |||
| int occupy_num = (int) (resourceOccupy.getCreditPerHour() / oldComputingResource.getCreditPerHour()); | |||
| // Release what was reserved under the OLD resource; both branches must size the release from it. | |||
| if (Constant.Computing_Resource_GPU.equals(oldComputingResource.getComputingResource())) { | |||
| resourceOccupyDao.updateUnUsed(oldComputingResource.getResourceId(), oldComputingResource.getGpuNums() * occupy_num); | |||
| } else { | |||
| // Fixed: previously used the new resource's CPU core count when releasing the old reservation. | |||
| resourceOccupyDao.updateUnUsed(oldComputingResource.getResourceId(), oldComputingResource.getCpuCores() * occupy_num); | |||
| } | |||
| // Reserve units for the NEW resource, scaled by the requested replica count. | |||
| if (Constant.Computing_Resource_GPU.equals(computingResource.getComputingResource())) { | |||
| resourceOccupyDao.updateUsed(computingResource.getResourceId(), computingResource.getGpuNums() * replicas); | |||
| } else { | |||
| resourceOccupyDao.updateUsed(computingResource.getResourceId(), computingResource.getCpuCores() * replicas); | |||
| } | |||
| // Description carries a replica-count prefix only when more than one replica is requested. | |||
| if (replicas > 1) { | |||
| resourceOccupy.setDescription(replicas + " * [" + computingResource.getDescription() + "]"); | |||
| } else { | |||
| resourceOccupy.setDescription(computingResource.getDescription()); | |||
| } | |||
| resourceOccupy.setCreditPerHour(computingResource.getCreditPerHour() * replicas); | |||
| resourceOccupy.setComputingResourceId(computingResourceId); | |||
| resourceOccupyDao.edit(resourceOccupy); | |||
| } | |||
| } | |||
| @@ -151,7 +151,7 @@ public class ServiceServiceImpl implements ServiceService { | |||
| ServiceVersion oldServiceVersion = serviceDao.getServiceVersionById(serviceVersionVo.getId()); | |||
| if (!oldServiceVersion.getReplicas().equals(serviceVersionVo.getReplicas()) || !oldServiceVersion.getComputingResourceId().equals(serviceVersionVo.getComputingResourceId()) | |||
| || serviceVersionVo.getRerun()) { | |||
| updateServiceVersion(serviceVersion); | |||
| updateServiceVersion(serviceVersion, serviceVersionVo.getRerun()); | |||
| } | |||
| LoginUser loginUser = SecurityUtils.getLoginUser(); | |||
| serviceVersion.setUpdateBy(loginUser.getUsername()); | |||
| @@ -251,7 +251,7 @@ public class ServiceServiceImpl implements ServiceService { | |||
| HashMap<String, Object> paramMap = new HashMap<>(); | |||
| paramMap.put("service_name", service.getServiceName()); | |||
| paramMap.put("description", serviceVersion.getDescription()); | |||
| paramMap.put("resource", serviceVersion.getComputingResourceId()); | |||
| paramMap.put("computing_resource_id", serviceVersion.getComputingResourceId()); | |||
| paramMap.put("mount_path", serviceVersion.getMountPath()); | |||
| paramMap.put("replicas", serviceVersion.getReplicas()); | |||
| paramMap.put("env", JSONObject.parseObject(serviceVersion.getEnvVariables())); | |||
| @@ -262,7 +262,7 @@ public class ServiceServiceImpl implements ServiceService { | |||
| paramMap.put("deploy_type", serviceVersion.getDeployType()); | |||
| // 判断是否有资源 | |||
| if (resourceOccupyService.haveResource(serviceVersion.getComputingResourceId())) { | |||
| if (resourceOccupyService.haveResource(serviceVersion.getComputingResourceId(), serviceVersion.getReplicas())) { | |||
| String req = HttpUtils.sendPost(argoUrl + modelService + "/create", JSON.toJSONString(paramMap)); | |||
| if (StringUtils.isNotEmpty(req)) { | |||
| Map<String, Object> reqMap = JacksonUtil.parseJSONStr2Map(req); | |||
| @@ -275,7 +275,7 @@ public class ServiceServiceImpl implements ServiceService { | |||
| serviceDao.updateServiceVersion(serviceVersion); | |||
| // 记录开始扣积分 | |||
| resourceOccupyService.startDeduce(serviceVersion.getComputingResourceId(), Constant.TaskType_Service, serviceVersion.getServiceId(), serviceVersion.getId(), null, service.getServiceName(), null); | |||
| resourceOccupyService.startDeduce(serviceVersion.getComputingResourceId(), serviceVersion.getReplicas(), Constant.TaskType_Service, serviceVersion.getServiceId(), serviceVersion.getId(), null, service.getServiceName(), null, null); | |||
| return "启动成功"; | |||
| } else { | |||
| throw new RuntimeException("启动失败"); | |||
| @@ -305,20 +305,24 @@ public class ServiceServiceImpl implements ServiceService { | |||
| } | |||
| @Override | |||
| public String updateServiceVersion(ServiceVersion serviceVersion) throws Exception { | |||
| public String updateServiceVersion(ServiceVersion serviceVersion, Boolean reRun) throws Exception { | |||
| // 判断是否有资源 | |||
| if (resourceOccupyService.haveResource(serviceVersion.getComputingResourceId())) { | |||
| if (resourceOccupyService.haveResource(serviceVersion.getComputingResourceId(), serviceVersion.getReplicas())) { | |||
| HashMap<String, Object> paramMap = new HashMap<>(); | |||
| paramMap.put("deployment_name", serviceVersion.getDeploymentName()); | |||
| HashMap<String, Object> updateMap = new HashMap<>(); | |||
| updateMap.put("replicas", serviceVersion.getReplicas()); | |||
| updateMap.put("resource", serviceVersion.getResource()); | |||
| updateMap.put("computing_resource_id", serviceVersion.getComputingResourceId()); | |||
| paramMap.put("update_model", new JSONObject(updateMap)); | |||
| String req = HttpUtils.sendPost(argoUrl + modelService + "/update", JSON.toJSONString(paramMap)); | |||
| if (StringUtils.isNotEmpty(req)) { | |||
| com.ruoyi.platform.domain.Service service = serviceDao.getServiceById(serviceVersion.getServiceId()); | |||
| // 记录开始扣积分 | |||
| resourceOccupyService.startDeduce(serviceVersion.getComputingResourceId(), Constant.TaskType_Service, serviceVersion.getServiceId(), serviceVersion.getId(), null, service.getServiceName(), null); | |||
| if (reRun) { | |||
| resourceOccupyService.startDeduce(serviceVersion.getComputingResourceId(), serviceVersion.getReplicas(), Constant.TaskType_Service, serviceVersion.getServiceId(), serviceVersion.getId(), null, service.getServiceName(), null, null); | |||
| } else { | |||
| resourceOccupyService.update(serviceVersion.getComputingResourceId(), serviceVersion.getReplicas()); | |||
| } | |||
| return "修改成功"; | |||
| } else { | |||
| throw new RuntimeException("更新失败"); | |||
| @@ -512,12 +512,12 @@ public class K8sClientUtil { | |||
| try { | |||
| // 记录开始扣积分 | |||
| if (resourceOccupyService.haveResource(devEnvironment.getComputingResourceId())) { | |||
| if (resourceOccupyService.haveResource(devEnvironment.getComputingResourceId(), 1)) { | |||
| pod = api.createNamespacedPod(namespace, pod, null, null, null); | |||
| String nodeName = getNodeName(podName, namespace); | |||
| // 记录开始扣除积分 | |||
| resourceOccupyService.startDeduce(devEnvironment.getComputingResourceId(), Constant.TaskType_Dev, Long.valueOf(devEnvironment.getId()), null, null, devEnvironment.getName(), null); | |||
| resourceOccupyService.startDeduce(devEnvironment.getComputingResourceId(), 1, Constant.TaskType_Dev, Long.valueOf(devEnvironment.getId()), null, null, devEnvironment.getName(), null, null); | |||
| } | |||
| } catch (ApiException e) { | |||
| throw new RuntimeException("创建pod异常:" + e.getResponseBody()); | |||
| @@ -44,5 +44,5 @@ public class RayParamVo { | |||
| private Integer minSamplesRequired; | |||
| private Integer resource; | |||
| private Integer computingResourceId; | |||
| } | |||
| @@ -3,12 +3,18 @@ | |||
| <mapper namespace="com.ruoyi.platform.mapper.ResourceOccupyDao"> | |||
| <insert id="save"> | |||
| insert into resource_occupy (user_id, computing_resource_id, credit_per_hour, description, task_type, task_id, | |||
| task_ins_id, workflow_id, task_name, | |||
| node_id) | |||
| task_ins_id, workflow_id, task_name, | |||
| <if test="resourceOccupy.state != null"> | |||
| state, | |||
| </if> | |||
| node_id) | |||
| values (#{resourceOccupy.userId}, #{resourceOccupy.computingResourceId}, #{resourceOccupy.creditPerHour}, | |||
| #{resourceOccupy.description}, #{resourceOccupy.taskType}, #{resourceOccupy.taskId}, | |||
| #{resourceOccupy.taskInsId}, #{resourceOccupy.workflowId}, #{resourceOccupy.taskName}, | |||
| #{resourceOccupy.nodeId}) | |||
| #{resourceOccupy.description}, #{resourceOccupy.taskType}, #{resourceOccupy.taskId}, | |||
| #{resourceOccupy.taskInsId}, #{resourceOccupy.workflowId}, #{resourceOccupy.taskName}, | |||
| <if test="resourceOccupy.state != null"> | |||
| #{resourceOccupy.state}, | |||
| </if> | |||
| #{resourceOccupy.nodeId}) | |||
| </insert> | |||
| <update id="edit"> | |||
| @@ -23,6 +29,9 @@ | |||
| <if test="resourceOccupy.deduceCredit != null"> | |||
| deduce_credit = #{resourceOccupy.deduceCredit}, | |||
| </if> | |||
| <if test="resourceOccupy.description != null and resourceOccupy.description !=''"> | |||
| description = #{resourceOccupy.description}, | |||
| </if> | |||
| </set> | |||
| where id = #{resourceOccupy.id} | |||
| </update> | |||