| @@ -201,6 +201,12 @@ | |||||
| <version>0.1.55</version> <!-- 检查最新版本 --> | <version>0.1.55</version> <!-- 检查最新版本 --> | ||||
| </dependency> | </dependency> | ||||
| <dependency> | |||||
| <groupId>org.springframework.boot</groupId> | |||||
| <artifactId>spring-boot-starter-websocket</artifactId> | |||||
| </dependency> | |||||
| </dependencies> | </dependencies> | ||||
| <build> | <build> | ||||
| @@ -0,0 +1,17 @@ | |||||
| package com.ruoyi.platform.config; | |||||
| import org.springframework.context.annotation.Bean; | |||||
| import org.springframework.context.annotation.Configuration; | |||||
| import org.springframework.web.socket.server.standard.ServerEndpointExporter; | |||||
| @Configuration | |||||
| public class WebSocketConfig { | |||||
| /** | |||||
| * 注入 ServerEndpointExporter, | |||||
| * 这个 bean 会自动注册使用了 @ServerEndpoint 注解声明的 WebSocket endpoint | |||||
| */ | |||||
| @Bean | |||||
| public ServerEndpointExporter serverEndpointExporter() { | |||||
| return new ServerEndpointExporter(); | |||||
| } | |||||
| } | |||||
| @@ -143,6 +143,15 @@ public class ExperimentInsController extends BaseController { | |||||
| return genericsSuccess(this.experimentInsService.getRealtimePodLog(podName,startTime)); | return genericsSuccess(this.experimentInsService.getRealtimePodLog(podName,startTime)); | ||||
| } | } | ||||
| @GetMapping("/pods/realtimelog") | |||||
| @ApiOperation("获取pod实时日志请求") | |||||
| public GenericsAjaxResult<String> getRealtimePodLogFromPod(@RequestParam("pod_name") String podName, | |||||
| @RequestParam("workspace") String workspace){ | |||||
| return genericsSuccess(this.experimentInsService.getRealtimePodLogFromPod(podName,workspace)); | |||||
| } | |||||
| /** | /** | ||||
| * 查询实验实例实时日志 | * 查询实验实例实时日志 | ||||
| * | * | ||||
| @@ -2,6 +2,7 @@ package com.ruoyi.platform.domain; | |||||
| import com.baomidou.mybatisplus.annotation.TableId; | import com.baomidou.mybatisplus.annotation.TableId; | ||||
| import com.fasterxml.jackson.annotation.JsonFormat; | import com.fasterxml.jackson.annotation.JsonFormat; | ||||
| import com.fasterxml.jackson.annotation.JsonRawValue; | |||||
| import com.fasterxml.jackson.databind.PropertyNamingStrategy; | import com.fasterxml.jackson.databind.PropertyNamingStrategy; | ||||
| import com.fasterxml.jackson.databind.annotation.JsonNaming; | import com.fasterxml.jackson.databind.annotation.JsonNaming; | ||||
| import com.ruoyi.platform.handler.BaseMetaObjectHandler; | import com.ruoyi.platform.handler.BaseMetaObjectHandler; | ||||
| @@ -48,6 +49,7 @@ public class Workflow extends BaseMetaObjectHandler implements Serializable { | |||||
| private String dag; | private String dag; | ||||
| // @ApiModelProperty(name = "global_param") | // @ApiModelProperty(name = "global_param") | ||||
| @JsonRawValue | |||||
| private String globalParam; | private String globalParam; | ||||
| /** | /** | ||||
| @@ -94,4 +94,6 @@ public interface ExperimentInsService { | |||||
| Map<String, Object> getRealtimeWorkflowLog(LogRequestVo logRequest); | Map<String, Object> getRealtimeWorkflowLog(LogRequestVo logRequest); | ||||
| Map<String, Object> getRealtimePodLog(String podName, String startTime); | Map<String, Object> getRealtimePodLog(String podName, String startTime); | ||||
| String getRealtimePodLogFromPod(String podName, String namespace); | |||||
| } | } | ||||
| @@ -1,5 +1,6 @@ | |||||
| package com.ruoyi.platform.service; | package com.ruoyi.platform.service; | ||||
| import com.fasterxml.jackson.core.JsonProcessingException; | |||||
| import com.ruoyi.platform.domain.Workflow; | import com.ruoyi.platform.domain.Workflow; | ||||
| import org.springframework.data.domain.Page; | import org.springframework.data.domain.Page; | ||||
| import org.springframework.data.domain.PageRequest; | import org.springframework.data.domain.PageRequest; | ||||
| @@ -51,6 +51,9 @@ public class ExperimentInsServiceImpl implements ExperimentInsService { | |||||
| private String argoWorkflowRealTimeLog; | private String argoWorkflowRealTimeLog; | ||||
| @Value("${argo.workflowPodLog}") | @Value("${argo.workflowPodLog}") | ||||
| private String argoWorkflowPodLog; | private String argoWorkflowPodLog; | ||||
| @Value("${argo.ins.logsLines}") | |||||
| private int logsLines; | |||||
| private final MinioUtil minioUtil; | private final MinioUtil minioUtil; | ||||
| public ExperimentInsServiceImpl(MinioUtil minioUtil) { | public ExperimentInsServiceImpl(MinioUtil minioUtil) { | ||||
| @@ -518,6 +521,11 @@ public class ExperimentInsServiceImpl implements ExperimentInsService { | |||||
| } | } | ||||
| @Override | |||||
| public String getRealtimePodLogFromPod(String podName, String namespace) { | |||||
| return K8sClientUtil.getPodLogs(podName, namespace, logsLines); | |||||
| } | |||||
| private boolean isTerminatedState(ExperimentIns ins) throws IOException { | private boolean isTerminatedState(ExperimentIns ins) throws IOException { | ||||
| // 定义终止态的列表,例如 "Succeeded", "Failed" 等 | // 定义终止态的列表,例如 "Succeeded", "Failed" 等 | ||||
| String status = ins.getStatus(); | String status = ins.getStatus(); | ||||
| @@ -226,6 +226,7 @@ public class ExperimentServiceImpl implements ExperimentService { | |||||
| Map<String, Object> runReqMap = new HashMap<>(); | Map<String, Object> runReqMap = new HashMap<>(); | ||||
| runReqMap.put("data", converMap.get("data")); | runReqMap.put("data", converMap.get("data")); | ||||
| runReqMap.put("params", JsonUtils.jsonToMap(StringUtils.isEmpty(experiment.getGlobalParam())?"{}":experiment.getGlobalParam())); | runReqMap.put("params", JsonUtils.jsonToMap(StringUtils.isEmpty(experiment.getGlobalParam())?"{}":experiment.getGlobalParam())); | ||||
| runReqMap.put("experiment", new HashMap<String, Object>().put("name", "experiment-"+experiment.getId())); | |||||
| Map<String ,Object> output = (Map<String, Object>) converMap.get("output"); | Map<String ,Object> output = (Map<String, Object>) converMap.get("output"); | ||||
| // 调argo运行接口 | // 调argo运行接口 | ||||
| String runRes = HttpUtils.sendPost(argoUrl + argoWorkflowRun, JsonUtils.mapToJson(runReqMap)); | String runRes = HttpUtils.sendPost(argoUrl + argoWorkflowRun, JsonUtils.mapToJson(runReqMap)); | ||||
| @@ -1,13 +1,16 @@ | |||||
| package com.ruoyi.platform.utils; | package com.ruoyi.platform.utils; | ||||
| import com.alibaba.nacos.shaded.com.google.gson.reflect.TypeToken; | |||||
| import io.kubernetes.client.Exec; | import io.kubernetes.client.Exec; | ||||
| import io.kubernetes.client.custom.IntOrString; | import io.kubernetes.client.custom.IntOrString; | ||||
| import io.kubernetes.client.custom.Quantity; | import io.kubernetes.client.custom.Quantity; | ||||
| import io.kubernetes.client.openapi.ApiClient; | import io.kubernetes.client.openapi.ApiClient; | ||||
| import io.kubernetes.client.openapi.ApiException; | import io.kubernetes.client.openapi.ApiException; | ||||
| import io.kubernetes.client.openapi.ApiResponse; | |||||
| import io.kubernetes.client.openapi.apis.CoreV1Api; | import io.kubernetes.client.openapi.apis.CoreV1Api; | ||||
| import io.kubernetes.client.openapi.models.*; | import io.kubernetes.client.openapi.models.*; | ||||
| import io.kubernetes.client.util.ClientBuilder; | import io.kubernetes.client.util.ClientBuilder; | ||||
| import io.kubernetes.client.util.Watch; | |||||
| import io.kubernetes.client.util.credentials.AccessTokenAuthentication; | import io.kubernetes.client.util.credentials.AccessTokenAuthentication; | ||||
| import lombok.extern.slf4j.Slf4j; | import lombok.extern.slf4j.Slf4j; | ||||
| import org.apache.commons.lang.StringUtils; | import org.apache.commons.lang.StringUtils; | ||||
| @@ -15,7 +18,6 @@ import org.springframework.beans.factory.annotation.Value; | |||||
| import org.springframework.stereotype.Component; | import org.springframework.stereotype.Component; | ||||
| import java.io.BufferedReader; | import java.io.BufferedReader; | ||||
| import java.io.IOException; | |||||
| import java.io.InputStreamReader; | import java.io.InputStreamReader; | ||||
| import java.util.HashMap; | import java.util.HashMap; | ||||
| import java.util.LinkedHashMap; | import java.util.LinkedHashMap; | ||||
| @@ -405,10 +407,11 @@ public class K8sClientUtil { | |||||
| public static String getPodLogs(String podName,String namespace,int line) { | public static String getPodLogs(String podName,String namespace,int line) { | ||||
| CoreV1Api api = new CoreV1Api(apiClient); | CoreV1Api api = new CoreV1Api(apiClient); | ||||
| try { | try { | ||||
| String log = api.readNamespacedPodLog(podName, namespace, null, null, null, null, null,null, null, line, null); | |||||
| String log = api.readNamespacedPodLog(podName, namespace, null, null, null, null, null,null, null, line, null); | |||||
| return log; | return log; | ||||
| } catch (ApiException e) { | } catch (ApiException e) { | ||||
| throw new RuntimeException("获取Pod日志异常", e); | throw new RuntimeException("获取Pod日志异常", e); | ||||
| } | } | ||||
| } | } | ||||
| } | } | ||||
| @@ -0,0 +1,70 @@ | |||||
| package com.ruoyi.platform.webSocket; | |||||
| import javax.annotation.Resource; | |||||
| import javax.websocket.*; | |||||
| import javax.websocket.server.PathParam; | |||||
| import javax.websocket.server.ServerEndpoint; | |||||
| import com.ruoyi.platform.service.ExperimentInsService; | |||||
| import com.ruoyi.platform.utils.K8sClientUtil; | |||||
| import org.springframework.stereotype.Component; | |||||
| import lombok.extern.slf4j.Slf4j; | |||||
| import org.springframework.web.bind.annotation.RestController; | |||||
| import java.io.IOException; | |||||
| import java.util.concurrent.CopyOnWriteArraySet; | |||||
| import java.util.concurrent.ConcurrentHashMap; | |||||
| @Component | |||||
| @Slf4j | |||||
| @ServerEndpoint("/websocket/workflowLogs") | |||||
| @RestController | |||||
| public class WebSocket { | |||||
| private Session session; | |||||
| private String userId; | |||||
| private static CopyOnWriteArraySet<WebSocket> webSockets = new CopyOnWriteArraySet<>(); | |||||
| private static ConcurrentHashMap<String, Session> sessionPool = new ConcurrentHashMap<>(); | |||||
| @OnOpen | |||||
| public void onOpen(Session session, @PathParam("userId") String userId) { | |||||
| try { | |||||
| this.session = session; | |||||
| this.userId = "workflowLogs"; | |||||
| webSockets.add(this); | |||||
| sessionPool.put(userId, session); | |||||
| log.info("【WebSocket 消息】有新的连接,总数为:" + webSockets.size()); | |||||
| } catch (Exception e) { | |||||
| // 异常处理 | |||||
| } | |||||
| } | |||||
| @OnClose | |||||
| public void onClose() { | |||||
| try { | |||||
| webSockets.remove(this); | |||||
| sessionPool.remove(this.userId); | |||||
| log.info("【WebSocket 消息】连接断开,总数为:" + webSockets.size()); | |||||
| } catch (Exception e) { | |||||
| // 异常处理 | |||||
| } | |||||
| } | |||||
| @OnMessage | |||||
| public void onMessage(String message, Session session) { | |||||
| log.info("【WebSocket 消息】收到客户端消息:" + message); | |||||
| // 处理收到的消息,例如获取实时日志数据 | |||||
| // 推送日志数据给客户端 | |||||
| try { | |||||
| K8sClientUtil.watchPodLog("workflow-controller-7c6f89997b-9lr8z", "argo"); | |||||
| session.getBasicRemote().sendText("nihao"); | |||||
| } catch (Exception e) { | |||||
| log.error("【WebSocket 消息】推送日志数据失败:" + e.getMessage()); | |||||
| } | |||||
| } | |||||
| @OnError | |||||
| public void onError(Session session, Throwable error) { | |||||
| log.error("【WebSocket 消息】发生错误:" + error.getMessage()); | |||||
| } | |||||
| } | |||||