diff --git a/ruoyi-modules/management-platform/pom.xml b/ruoyi-modules/management-platform/pom.xml index 9eedb568..4cb1ae7d 100644 --- a/ruoyi-modules/management-platform/pom.xml +++ b/ruoyi-modules/management-platform/pom.xml @@ -201,6 +201,12 @@ 0.1.55 + + org.springframework.boot + spring-boot-starter-websocket + + + diff --git a/ruoyi-modules/management-platform/src/main/java/com/ruoyi/platform/config/WebSocketConfig.java b/ruoyi-modules/management-platform/src/main/java/com/ruoyi/platform/config/WebSocketConfig.java new file mode 100644 index 00000000..9f946af1 --- /dev/null +++ b/ruoyi-modules/management-platform/src/main/java/com/ruoyi/platform/config/WebSocketConfig.java @@ -0,0 +1,17 @@ +package com.ruoyi.platform.config; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.web.socket.server.standard.ServerEndpointExporter; + +@Configuration +public class WebSocketConfig { + /** + * 注入 ServerEndpointExporter, + * 这个 bean 会自动注册使用了 @ServerEndpoint 注解声明的 WebSocket endpoint + */ + @Bean + public ServerEndpointExporter serverEndpointExporter() { + return new ServerEndpointExporter(); + } +} diff --git a/ruoyi-modules/management-platform/src/main/java/com/ruoyi/platform/controller/experiment/ExperimentInsController.java b/ruoyi-modules/management-platform/src/main/java/com/ruoyi/platform/controller/experiment/ExperimentInsController.java index b31e1316..4b5fc804 100644 --- a/ruoyi-modules/management-platform/src/main/java/com/ruoyi/platform/controller/experiment/ExperimentInsController.java +++ b/ruoyi-modules/management-platform/src/main/java/com/ruoyi/platform/controller/experiment/ExperimentInsController.java @@ -143,6 +143,15 @@ public class ExperimentInsController extends BaseController { return genericsSuccess(this.experimentInsService.getRealtimePodLog(podName,startTime)); } + + @GetMapping("/pods/realtimelog") + @ApiOperation("获取pod实时日志请求") + public GenericsAjaxResult getRealtimePodLogFromPod(@RequestParam("pod_name") String podName, + @RequestParam("workspace") String workspace){ + return genericsSuccess(this.experimentInsService.getRealtimePodLogFromPod(podName,workspace)); + } + + /** * 查询实验实例实时日志 * diff --git a/ruoyi-modules/management-platform/src/main/java/com/ruoyi/platform/domain/Workflow.java b/ruoyi-modules/management-platform/src/main/java/com/ruoyi/platform/domain/Workflow.java index a6774593..f4ddff72 100644 --- a/ruoyi-modules/management-platform/src/main/java/com/ruoyi/platform/domain/Workflow.java +++ b/ruoyi-modules/management-platform/src/main/java/com/ruoyi/platform/domain/Workflow.java @@ -2,6 +2,7 @@ package com.ruoyi.platform.domain; import com.baomidou.mybatisplus.annotation.TableId; import com.fasterxml.jackson.annotation.JsonFormat; +import com.fasterxml.jackson.annotation.JsonRawValue; import com.fasterxml.jackson.databind.PropertyNamingStrategy; import com.fasterxml.jackson.databind.annotation.JsonNaming; import com.ruoyi.platform.handler.BaseMetaObjectHandler; @@ -48,6 +49,7 @@ public class Workflow extends BaseMetaObjectHandler implements Serializable { private String dag; // @ApiModelProperty(name = "global_param") + @JsonRawValue private String globalParam; /** diff --git a/ruoyi-modules/management-platform/src/main/java/com/ruoyi/platform/service/ExperimentInsService.java b/ruoyi-modules/management-platform/src/main/java/com/ruoyi/platform/service/ExperimentInsService.java index 96552f83..13ff922d 100644 --- a/ruoyi-modules/management-platform/src/main/java/com/ruoyi/platform/service/ExperimentInsService.java +++ b/ruoyi-modules/management-platform/src/main/java/com/ruoyi/platform/service/ExperimentInsService.java @@ -94,4 +94,6 @@ public interface ExperimentInsService { Map getRealtimeWorkflowLog(LogRequestVo logRequest); Map getRealtimePodLog(String podName, String startTime); + + String getRealtimePodLogFromPod(String podName, String namespace); } diff --git a/ruoyi-modules/management-platform/src/main/java/com/ruoyi/platform/service/WorkflowService.java b/ruoyi-modules/management-platform/src/main/java/com/ruoyi/platform/service/WorkflowService.java index 7520c562..41397348 100644 --- a/ruoyi-modules/management-platform/src/main/java/com/ruoyi/platform/service/WorkflowService.java +++ b/ruoyi-modules/management-platform/src/main/java/com/ruoyi/platform/service/WorkflowService.java @@ -1,5 +1,6 @@ package com.ruoyi.platform.service; +import com.fasterxml.jackson.core.JsonProcessingException; import com.ruoyi.platform.domain.Workflow; import org.springframework.data.domain.Page; import org.springframework.data.domain.PageRequest; diff --git a/ruoyi-modules/management-platform/src/main/java/com/ruoyi/platform/service/impl/ExperimentInsServiceImpl.java b/ruoyi-modules/management-platform/src/main/java/com/ruoyi/platform/service/impl/ExperimentInsServiceImpl.java index c4ffd05c..ee45a36e 100644 --- a/ruoyi-modules/management-platform/src/main/java/com/ruoyi/platform/service/impl/ExperimentInsServiceImpl.java +++ b/ruoyi-modules/management-platform/src/main/java/com/ruoyi/platform/service/impl/ExperimentInsServiceImpl.java @@ -51,6 +51,9 @@ public class ExperimentInsServiceImpl implements ExperimentInsService { private String argoWorkflowRealTimeLog; @Value("${argo.workflowPodLog}") private String argoWorkflowPodLog; + @Value("${argo.ins.logsLines}") + private int logsLines; + private final MinioUtil minioUtil; public ExperimentInsServiceImpl(MinioUtil minioUtil) { @@ -518,6 +521,11 @@ public class ExperimentInsServiceImpl implements ExperimentInsService { } + @Override + public String getRealtimePodLogFromPod(String podName, String namespace) { + return K8sClientUtil.getPodLogs(podName, namespace, logsLines); + } + private boolean isTerminatedState(ExperimentIns ins) throws IOException { // 定义终止态的列表,例如 "Succeeded", "Failed" 等 String status = ins.getStatus(); diff --git a/ruoyi-modules/management-platform/src/main/java/com/ruoyi/platform/service/impl/ExperimentServiceImpl.java b/ruoyi-modules/management-platform/src/main/java/com/ruoyi/platform/service/impl/ExperimentServiceImpl.java index a6ac4eeb..650598ce 100644 --- a/ruoyi-modules/management-platform/src/main/java/com/ruoyi/platform/service/impl/ExperimentServiceImpl.java +++ b/ruoyi-modules/management-platform/src/main/java/com/ruoyi/platform/service/impl/ExperimentServiceImpl.java @@ -226,6 +226,7 @@ public class ExperimentServiceImpl implements ExperimentService { Map runReqMap = new HashMap<>(); runReqMap.put("data", converMap.get("data")); runReqMap.put("params", JsonUtils.jsonToMap(StringUtils.isEmpty(experiment.getGlobalParam())?"{}":experiment.getGlobalParam())); + runReqMap.put("experiment", new HashMap().put("name", "experiment-"+experiment.getId())); Map output = (Map) converMap.get("output"); // 调argo运行接口 String runRes = HttpUtils.sendPost(argoUrl + argoWorkflowRun, JsonUtils.mapToJson(runReqMap)); diff --git a/ruoyi-modules/management-platform/src/main/java/com/ruoyi/platform/utils/K8sClientUtil.java b/ruoyi-modules/management-platform/src/main/java/com/ruoyi/platform/utils/K8sClientUtil.java index 4bc187d1..c0b59127 100644 --- a/ruoyi-modules/management-platform/src/main/java/com/ruoyi/platform/utils/K8sClientUtil.java +++ b/ruoyi-modules/management-platform/src/main/java/com/ruoyi/platform/utils/K8sClientUtil.java @@ -1,13 +1,16 @@ package com.ruoyi.platform.utils; +import com.alibaba.nacos.shaded.com.google.gson.reflect.TypeToken; import io.kubernetes.client.Exec; import io.kubernetes.client.custom.IntOrString; import io.kubernetes.client.custom.Quantity; import io.kubernetes.client.openapi.ApiClient; import io.kubernetes.client.openapi.ApiException; +import io.kubernetes.client.openapi.ApiResponse; import io.kubernetes.client.openapi.apis.CoreV1Api; import io.kubernetes.client.openapi.models.*; import io.kubernetes.client.util.ClientBuilder; +import io.kubernetes.client.util.Watch; import io.kubernetes.client.util.credentials.AccessTokenAuthentication; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang.StringUtils; @@ -15,7 +18,6 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import java.io.BufferedReader; -import java.io.IOException; import java.io.InputStreamReader; import java.util.HashMap; import java.util.LinkedHashMap; @@ -405,10 +407,11 @@ public class K8sClientUtil { public static String getPodLogs(String podName,String namespace,int line) { CoreV1Api api = new CoreV1Api(apiClient); try { - String log = api.readNamespacedPodLog(podName, namespace, null, null, null, null, null,null, null, line, null); + String log = api.readNamespacedPodLog(podName, namespace, null, null, null, null, null,null, null, line, null); return log; } catch (ApiException e) { throw new RuntimeException("获取Pod日志异常", e); } + } } diff --git a/ruoyi-modules/management-platform/src/main/java/com/ruoyi/platform/webSocket/WebSocket.java b/ruoyi-modules/management-platform/src/main/java/com/ruoyi/platform/webSocket/WebSocket.java new file mode 100644 index 00000000..b9b60ac8 --- /dev/null +++ b/ruoyi-modules/management-platform/src/main/java/com/ruoyi/platform/webSocket/WebSocket.java @@ -0,0 +1,70 @@ +package com.ruoyi.platform.webSocket; + +import javax.annotation.Resource; +import javax.websocket.*; +import javax.websocket.server.PathParam; +import javax.websocket.server.ServerEndpoint; + +import com.ruoyi.platform.service.ExperimentInsService; +import com.ruoyi.platform.utils.K8sClientUtil; +import org.springframework.stereotype.Component; +import lombok.extern.slf4j.Slf4j; +import org.springframework.web.bind.annotation.RestController; + +import java.io.IOException; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.ConcurrentHashMap; + +@Component +@Slf4j +@ServerEndpoint("/websocket/workflowLogs") +@RestController +public class WebSocket { + private Session session; + private String userId; + private static CopyOnWriteArraySet webSockets = new CopyOnWriteArraySet<>(); + private static ConcurrentHashMap sessionPool = new ConcurrentHashMap<>(); + + @OnOpen + public void onOpen(Session session, @PathParam("userId") String userId) { + try { + this.session = session; + this.userId = "workflowLogs"; + webSockets.add(this); + sessionPool.put(userId, session); + log.info("【WebSocket 消息】有新的连接,总数为:" + webSockets.size()); + } catch (Exception e) { + // 异常处理 + } + } + + @OnClose + public void onClose() { + try { + webSockets.remove(this); + sessionPool.remove(this.userId); + log.info("【WebSocket 消息】连接断开,总数为:" + webSockets.size()); + } catch (Exception e) { + // 异常处理 + } + } + + @OnMessage + public void onMessage(String message, Session session) { + log.info("【WebSocket 消息】收到客户端消息:" + message); + // 处理收到的消息,例如获取实时日志数据 + + // 推送日志数据给客户端 + try { + K8sClientUtil.watchPodLog("workflow-controller-7c6f89997b-9lr8z", "argo"); + session.getBasicRemote().sendText("nihao"); + } catch (Exception e) { + log.error("【WebSocket 消息】推送日志数据失败:" + e.getMessage()); + } + } + + @OnError + public void onError(Session session, Throwable error) { + log.error("【WebSocket 消息】发生错误:" + error.getMessage()); + } +}