Browse Source

新增实时查询

pull/9/head
fanshuai 1 year ago
parent
commit
ed08068c92
10 changed files with 121 additions and 2 deletions
  1. +6
    -0
      ruoyi-modules/management-platform/pom.xml
  2. +17
    -0
      ruoyi-modules/management-platform/src/main/java/com/ruoyi/platform/config/WebSocketConfig.java
  3. +9
    -0
      ruoyi-modules/management-platform/src/main/java/com/ruoyi/platform/controller/experiment/ExperimentInsController.java
  4. +2
    -0
      ruoyi-modules/management-platform/src/main/java/com/ruoyi/platform/domain/Workflow.java
  5. +2
    -0
      ruoyi-modules/management-platform/src/main/java/com/ruoyi/platform/service/ExperimentInsService.java
  6. +1
    -0
      ruoyi-modules/management-platform/src/main/java/com/ruoyi/platform/service/WorkflowService.java
  7. +8
    -0
      ruoyi-modules/management-platform/src/main/java/com/ruoyi/platform/service/impl/ExperimentInsServiceImpl.java
  8. +1
    -0
      ruoyi-modules/management-platform/src/main/java/com/ruoyi/platform/service/impl/ExperimentServiceImpl.java
  9. +5
    -2
      ruoyi-modules/management-platform/src/main/java/com/ruoyi/platform/utils/K8sClientUtil.java
  10. +70
    -0
      ruoyi-modules/management-platform/src/main/java/com/ruoyi/platform/webSocket/WebSocket.java

+ 6
- 0
ruoyi-modules/management-platform/pom.xml View File

@@ -201,6 +201,12 @@
<version>0.1.55</version> <!-- 检查最新版本 -->
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>


</dependencies>

<build>


+ 17
- 0
ruoyi-modules/management-platform/src/main/java/com/ruoyi/platform/config/WebSocketConfig.java View File

@@ -0,0 +1,17 @@
package com.ruoyi.platform.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

@Configuration
public class WebSocketConfig {
/**
* 注入 ServerEndpointExporter,
* 这个 bean 会自动注册使用了 @ServerEndpoint 注解声明的 WebSocket endpoint
*/
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}

+ 9
- 0
ruoyi-modules/management-platform/src/main/java/com/ruoyi/platform/controller/experiment/ExperimentInsController.java View File

@@ -143,6 +143,15 @@ public class ExperimentInsController extends BaseController {
return genericsSuccess(this.experimentInsService.getRealtimePodLog(podName,startTime));
}


@GetMapping("/pods/realtimelog")
@ApiOperation("获取pod实时日志请求")
public GenericsAjaxResult<String> getRealtimePodLogFromPod(@RequestParam("pod_name") String podName,
@RequestParam("workspace") String workspace){
return genericsSuccess(this.experimentInsService.getRealtimePodLogFromPod(podName,workspace));
}


/**
* 查询实验实例实时日志
*


+ 2
- 0
ruoyi-modules/management-platform/src/main/java/com/ruoyi/platform/domain/Workflow.java View File

@@ -2,6 +2,7 @@ package com.ruoyi.platform.domain;

import com.baomidou.mybatisplus.annotation.TableId;
import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.annotation.JsonRawValue;
import com.fasterxml.jackson.databind.PropertyNamingStrategy;
import com.fasterxml.jackson.databind.annotation.JsonNaming;
import com.ruoyi.platform.handler.BaseMetaObjectHandler;
@@ -48,6 +49,7 @@ public class Workflow extends BaseMetaObjectHandler implements Serializable {
private String dag;

// @ApiModelProperty(name = "global_param")
@JsonRawValue
private String globalParam;

/**


+ 2
- 0
ruoyi-modules/management-platform/src/main/java/com/ruoyi/platform/service/ExperimentInsService.java View File

@@ -94,4 +94,6 @@ public interface ExperimentInsService {
Map<String, Object> getRealtimeWorkflowLog(LogRequestVo logRequest);

Map<String, Object> getRealtimePodLog(String podName, String startTime);

String getRealtimePodLogFromPod(String podName, String namespace);
}

+ 1
- 0
ruoyi-modules/management-platform/src/main/java/com/ruoyi/platform/service/WorkflowService.java View File

@@ -1,5 +1,6 @@
package com.ruoyi.platform.service;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.ruoyi.platform.domain.Workflow;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;


+ 8
- 0
ruoyi-modules/management-platform/src/main/java/com/ruoyi/platform/service/impl/ExperimentInsServiceImpl.java View File

@@ -51,6 +51,9 @@ public class ExperimentInsServiceImpl implements ExperimentInsService {
private String argoWorkflowRealTimeLog;
@Value("${argo.workflowPodLog}")
private String argoWorkflowPodLog;
@Value("${argo.ins.logsLines}")
private int logsLines;

private final MinioUtil minioUtil;

public ExperimentInsServiceImpl(MinioUtil minioUtil) {
@@ -518,6 +521,11 @@ public class ExperimentInsServiceImpl implements ExperimentInsService {

}

@Override
public String getRealtimePodLogFromPod(String podName, String namespace) {
return K8sClientUtil.getPodLogs(podName, namespace, logsLines);
}

private boolean isTerminatedState(ExperimentIns ins) throws IOException {
// 定义终止态的列表,例如 "Succeeded", "Failed" 等
String status = ins.getStatus();


+ 1
- 0
ruoyi-modules/management-platform/src/main/java/com/ruoyi/platform/service/impl/ExperimentServiceImpl.java View File

@@ -226,6 +226,7 @@ public class ExperimentServiceImpl implements ExperimentService {
Map<String, Object> runReqMap = new HashMap<>();
runReqMap.put("data", converMap.get("data"));
runReqMap.put("params", JsonUtils.jsonToMap(StringUtils.isEmpty(experiment.getGlobalParam())?"{}":experiment.getGlobalParam()));
runReqMap.put("experiment", new HashMap<String, Object>().put("name", "experiment-"+experiment.getId()));
Map<String ,Object> output = (Map<String, Object>) converMap.get("output");
// 调argo运行接口
String runRes = HttpUtils.sendPost(argoUrl + argoWorkflowRun, JsonUtils.mapToJson(runReqMap));


+ 5
- 2
ruoyi-modules/management-platform/src/main/java/com/ruoyi/platform/utils/K8sClientUtil.java View File

@@ -1,13 +1,16 @@
package com.ruoyi.platform.utils;

import com.alibaba.nacos.shaded.com.google.gson.reflect.TypeToken;
import io.kubernetes.client.Exec;
import io.kubernetes.client.custom.IntOrString;
import io.kubernetes.client.custom.Quantity;
import io.kubernetes.client.openapi.ApiClient;
import io.kubernetes.client.openapi.ApiException;
import io.kubernetes.client.openapi.ApiResponse;
import io.kubernetes.client.openapi.apis.CoreV1Api;
import io.kubernetes.client.openapi.models.*;
import io.kubernetes.client.util.ClientBuilder;
import io.kubernetes.client.util.Watch;
import io.kubernetes.client.util.credentials.AccessTokenAuthentication;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
@@ -15,7 +18,6 @@ import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.LinkedHashMap;
@@ -405,10 +407,11 @@ public class K8sClientUtil {
public static String getPodLogs(String podName,String namespace,int line) {
CoreV1Api api = new CoreV1Api(apiClient);
try {
String log = api.readNamespacedPodLog(podName, namespace, null, null, null, null, null,null, null, line, null);
String log = api.readNamespacedPodLog(podName, namespace, null, null, null, null, null,null, null, line, null);
return log;
} catch (ApiException e) {
throw new RuntimeException("获取Pod日志异常", e);
}

}
}

+ 70
- 0
ruoyi-modules/management-platform/src/main/java/com/ruoyi/platform/webSocket/WebSocket.java View File

@@ -0,0 +1,70 @@
package com.ruoyi.platform.webSocket;

import javax.annotation.Resource;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;

import com.ruoyi.platform.service.ExperimentInsService;
import com.ruoyi.platform.utils.K8sClientUtil;
import org.springframework.stereotype.Component;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.RestController;

import java.io.IOException;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ConcurrentHashMap;

@Component
@Slf4j
@ServerEndpoint("/websocket/workflowLogs")
@RestController
public class WebSocket {
private Session session;
private String userId;
private static CopyOnWriteArraySet<WebSocket> webSockets = new CopyOnWriteArraySet<>();
private static ConcurrentHashMap<String, Session> sessionPool = new ConcurrentHashMap<>();

@OnOpen
public void onOpen(Session session, @PathParam("userId") String userId) {
try {
this.session = session;
this.userId = "workflowLogs";
webSockets.add(this);
sessionPool.put(userId, session);
log.info("【WebSocket 消息】有新的连接,总数为:" + webSockets.size());
} catch (Exception e) {
// 异常处理
}
}

@OnClose
public void onClose() {
try {
webSockets.remove(this);
sessionPool.remove(this.userId);
log.info("【WebSocket 消息】连接断开,总数为:" + webSockets.size());
} catch (Exception e) {
// 异常处理
}
}

@OnMessage
public void onMessage(String message, Session session) {
log.info("【WebSocket 消息】收到客户端消息:" + message);
// 处理收到的消息,例如获取实时日志数据

// 推送日志数据给客户端
try {
K8sClientUtil.watchPodLog("workflow-controller-7c6f89997b-9lr8z", "argo");
session.getBasicRemote().sendText("nihao");
} catch (Exception e) {
log.error("【WebSocket 消息】推送日志数据失败:" + e.getMessage());
}
}

@OnError
public void onError(Session session, Throwable error) {
log.error("【WebSocket 消息】发生错误:" + error.getMessage());
}
}

Loading…
Cancel
Save