From cb3fe4080e159f6dd7791c719ab20b98fd174f5f Mon Sep 17 00:00:00 2001 From: JeshuaRen <270813223@qq.com> Date: Tue, 22 Oct 2024 15:52:26 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9Egorm=E6=A1=86=E6=9E=B6?= =?UTF-8?q?=E9=85=8D=E7=BD=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- agent/internal/http/hub_io.go | 30 ++++++++------ agent/internal/http/service.go | 9 +++- agent/main.go | 6 +-- common/pkgs/db2/db2.go | 24 +++++++++++ common/pkgs/db2/node.go | 52 ++++++++++++++++++++++++ common/pkgs/ioswitch2/http_hub_worker.go | 10 ++--- common/pkgs/ioswitch2/parser/parser.go | 14 ++++++- coordinator/internal/mq/node.go | 4 +- coordinator/internal/mq/service.go | 9 ++-- coordinator/main.go | 8 +++- 10 files changed, 135 insertions(+), 31 deletions(-) create mode 100644 common/pkgs/db2/db2.go create mode 100644 common/pkgs/db2/node.go diff --git a/agent/internal/http/hub_io.go b/agent/internal/http/hub_io.go index f3a54fd..8772bbf 100644 --- a/agent/internal/http/hub_io.go +++ b/agent/internal/http/hub_io.go @@ -1,6 +1,7 @@ package http import ( + "bytes" "context" "fmt" "github.com/gin-gonic/gin" @@ -11,13 +12,13 @@ import ( cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/common/utils/serder" "io" + "io/ioutil" "net/http" "time" ) type IOService struct { *Server - swWorker *exec.Worker } func (s *Server) IOSvc() *IOService { @@ -27,12 +28,8 @@ func (s *Server) IOSvc() *IOService { } func (s *IOService) GetStream(ctx *gin.Context) { - //planID := ctx.Query("plan_id") - //varID := ctx.Query("var_id") - //signalData := ctx.Query("signal") - var req cdssdk.GetStreamReq - if err := ctx.ShouldBindQuery(&req); err != nil { + if err := ctx.ShouldBindJSON(&req); err != nil { logger.Warnf("binding body: %s", err.Error()) ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) return @@ -47,7 +44,7 @@ func (s *IOService) GetStream(ctx *gin.Context) { c, cancel := context.WithTimeout(ctx.Request.Context(), time.Second*30) defer cancel() - sw := s.swWorker.FindByIDContexted(c, req.PlanID) + sw := s.svc.swWorker.FindByIDContexted(c, req.PlanID) if sw == nil { ctx.JSON(http.StatusNotFound, gin.H{"error": "plan not found"}) return @@ -145,7 +142,7 @@ func (s *IOService) SendStream(ctx *gin.Context) { c, cancel := context.WithTimeout(ctx.Request.Context(), time.Second*30) defer cancel() - sw := s.swWorker.FindByIDContexted(c, req.PlanID) + sw := s.svc.swWorker.FindByIDContexted(c, req.PlanID) if sw == nil { ctx.JSON(http.StatusNotFound, gin.H{"error": "plan not found"}) return @@ -197,6 +194,15 @@ func (s *IOService) SendStream(ctx *gin.Context) { } func (s *IOService) ExecuteIOPlan(ctx *gin.Context) { + bodyBytes, err := ioutil.ReadAll(ctx.Request.Body) + if err != nil { + logger.Warnf("reading body: %s", err.Error()) + ctx.JSON(http.StatusInternalServerError, Failed("400", "internal error")) + return + } + println("Received body: %s", string(bodyBytes)) + ctx.Request.Body = ioutil.NopCloser(bytes.NewBuffer(bodyBytes)) // Reset body for subsequent reads + var req cdssdk.ExecuteIOPlanReq if err := ctx.ShouldBindJSON(&req); err != nil { logger.Warnf("binding body: %s", err.Error()) @@ -217,8 +223,8 @@ func (s *IOService) ExecuteIOPlan(ctx *gin.Context) { sw := exec.NewExecutor(plan) - s.swWorker.Add(sw) - defer s.swWorker.Remove(sw) + s.svc.swWorker.Add(sw) + defer s.svc.swWorker.Remove(sw) // 设置上下文超时 c, cancel := context.WithTimeout(ctx.Request.Context(), time.Second*30) @@ -244,7 +250,7 @@ func (s *IOService) SendVar(ctx *gin.Context) { c, cancel := context.WithTimeout(ctx.Request.Context(), time.Second*30) defer cancel() - sw := s.swWorker.FindByIDContexted(c, req.PlanID) + sw := s.svc.swWorker.FindByIDContexted(c, req.PlanID) if sw == nil { ctx.JSON(http.StatusNotFound, gin.H{"error": "plan not found"}) return @@ -272,7 +278,7 @@ func (s *IOService) GetVar(ctx *gin.Context) { c, cancel := context.WithTimeout(ctx.Request.Context(), time.Second*30) defer cancel() - sw := s.swWorker.FindByIDContexted(c, req.PlanID) + sw := s.svc.swWorker.FindByIDContexted(c, req.PlanID) if sw == nil { ctx.JSON(http.StatusNotFound, gin.H{"error": "plan not found"}) return diff --git a/agent/internal/http/service.go b/agent/internal/http/service.go index 56dc3d0..7188b0c 100644 --- a/agent/internal/http/service.go +++ b/agent/internal/http/service.go @@ -1,8 +1,13 @@ package http +import "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" + type Service struct { + swWorker *exec.Worker } -func NewService() (*Service, error) { - return &Service{}, nil +func NewService(swWorker *exec.Worker) *Service { + return &Service{ + swWorker: swWorker, + } } diff --git a/agent/main.go b/agent/main.go index f611727..f47600f 100644 --- a/agent/main.go +++ b/agent/main.go @@ -52,7 +52,9 @@ func main() { stgglb.InitAgentRPCPool(&agtrpc.PoolConfig{}) stgglb.InitIPFSPool(&config.Cfg().IPFS) - svc, err := http.NewService() + sw := exec.NewWorker() + + svc := http.NewService(&sw) if err != nil { logger.Fatalf("new http service failed, err: %s", err.Error()) } @@ -108,8 +110,6 @@ func main() { logger.Fatalf("new ipfs failed, err: %s", err.Error()) } - sw := exec.NewWorker() - dlder := downloader.NewDownloader(config.Cfg().Downloader, &conCol) taskMgr := task.NewManager(distlock, &conCol, &dlder, acStat) diff --git a/common/pkgs/db2/db2.go b/common/pkgs/db2/db2.go new file mode 100644 index 0000000..a0447b5 --- /dev/null +++ b/common/pkgs/db2/db2.go @@ -0,0 +1,24 @@ +package db2 + +import ( + _ "github.com/go-sql-driver/mysql" + "github.com/sirupsen/logrus" + "gitlink.org.cn/cloudream/storage/common/pkgs/db/config" + "gorm.io/driver/mysql" + "gorm.io/gorm" +) + +type DB struct { + db *gorm.DB +} + +func NewDB(cfg *config.Config) (*DB, error) { + mydb, err := gorm.Open(mysql.Open(cfg.MakeSourceString()), &gorm.Config{}) + if err != nil { + logrus.Fatalf("failed to connect to database: %v", err) + } + + return &DB{ + db: mydb, + }, nil +} diff --git a/common/pkgs/db2/node.go b/common/pkgs/db2/node.go new file mode 100644 index 0000000..baeb7d3 --- /dev/null +++ b/common/pkgs/db2/node.go @@ -0,0 +1,52 @@ +package db2 + +import ( + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "time" +) + +type NodeDB struct { + *DB +} + +func (nodeDB *DB) Node() *NodeDB { + return &NodeDB{DB: nodeDB} +} + +func (nodeDB *NodeDB) GetAllNodes() ([]cdssdk.Node, error) { + var ret []cdssdk.Node + + err := nodeDB.DB.db.Table("node").Find(&ret).Error + return ret, err +} + +func (nodeDB *NodeDB) GetByID(nodeID cdssdk.NodeID) (cdssdk.Node, error) { + var ret cdssdk.Node + err := nodeDB.DB.db.Table("node").Where("NodeID = ?", nodeID).Find(&ret).Error + + return ret, err +} + +// GetUserNodes 根据用户id查询可用node +func (nodeDB *NodeDB) GetUserNodes(userID cdssdk.UserID) ([]cdssdk.Node, error) { + var nodes []cdssdk.Node + err := nodeDB.DB.db. + Table("Node"). + Select("Node.*"). + Joins("JOIN UserNode ON UserNode.NodeID = Node.NodeID"). + Where("UserNode.UserID = ?", userID). + Find(&nodes).Error + return nodes, err +} + +// UpdateState 更新状态,并且设置上次上报时间为现在 +func (nodeDB *NodeDB) UpdateState(nodeID cdssdk.NodeID, state string) error { + err := nodeDB.DB.db. + Model(&cdssdk.Node{}). + Where("NodeID = ?", nodeID). + Updates(map[string]interface{}{ + "State": state, + "LastReportTime": time.Now(), + }).Error + return err +} diff --git a/common/pkgs/ioswitch2/http_hub_worker.go b/common/pkgs/ioswitch2/http_hub_worker.go index b83abb2..e121779 100644 --- a/common/pkgs/ioswitch2/http_hub_worker.go +++ b/common/pkgs/ioswitch2/http_hub_worker.go @@ -3,6 +3,7 @@ package ioswitch2 import ( "context" "io" + "strconv" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/common/pkgs/types" @@ -19,13 +20,10 @@ type HttpHubWorker struct { } func (w *HttpHubWorker) NewClient() (exec.WorkerClient, error) { - //cli, err := stgglb.AgentRPCPool.Acquire(stgglb.SelectGRPCAddress(&w.Node)) - //if err != nil { - // return nil, err - //} - + addressInfo := w.Node.Address.(*cdssdk.HttpAddressInfo) + baseUrl := "http://" + addressInfo.ExternalIP + ":" + strconv.Itoa(addressInfo.Port) config := cdssdk.Config{ - URL: "", + URL: baseUrl, } pool := cdssdk.NewPool(&config) cli, err := pool.Acquire() diff --git a/common/pkgs/ioswitch2/parser/parser.go b/common/pkgs/ioswitch2/parser/parser.go index ab5af49..dd5130d 100644 --- a/common/pkgs/ioswitch2/parser/parser.go +++ b/common/pkgs/ioswitch2/parser/parser.go @@ -253,8 +253,18 @@ func (p *DefaultParser) buildFromNode(ctx *ParseContext, f ioswitch2.From) (ops2 } if f.Node != nil { - t.Env().ToEnvWorker(&ioswitch2.AgentWorker{Node: *f.Node}) - t.Env().Pinned = true + switch typeInfo := f.Node.Address.(type) { + case *cdssdk.HttpAddressInfo: + t.Env().ToEnvWorker(&ioswitch2.HttpHubWorker{Node: *f.Node}) + t.Env().Pinned = true + + case *cdssdk.GRPCAddressInfo: + t.Env().ToEnvWorker(&ioswitch2.AgentWorker{Node: *f.Node}) + t.Env().Pinned = true + + default: + return nil, fmt.Errorf("unsupported node address type %T", typeInfo) + } } return t, nil diff --git a/coordinator/internal/mq/node.go b/coordinator/internal/mq/node.go index 3111ce6..fadea66 100644 --- a/coordinator/internal/mq/node.go +++ b/coordinator/internal/mq/node.go @@ -27,7 +27,7 @@ func (svc *Service) GetNodes(msg *coormq.GetNodes) (*coormq.GetNodesResp, *mq.Co if msg.NodeIDs == nil { var err error - nodes, err = svc.db.Node().GetAllNodes(svc.db.SQLCtx()) + nodes, err = svc.db2.Node().GetAllNodes() if err != nil { logger.Warnf("getting all nodes: %s", err.Error()) return nil, mq.Failed(errorcode.OperationFailed, "get all node failed") @@ -36,7 +36,7 @@ func (svc *Service) GetNodes(msg *coormq.GetNodes) (*coormq.GetNodesResp, *mq.Co } else { // 可以不用事务 for _, id := range msg.NodeIDs { - node, err := svc.db.Node().GetByID(svc.db.SQLCtx(), id) + node, err := svc.db2.Node().GetByID(id) if err != nil { logger.WithField("NodeID", id). Warnf("query node failed, err: %s", err.Error()) diff --git a/coordinator/internal/mq/service.go b/coordinator/internal/mq/service.go index ca44dcc..6a11e16 100644 --- a/coordinator/internal/mq/service.go +++ b/coordinator/internal/mq/service.go @@ -2,14 +2,17 @@ package mq import ( mydb "gitlink.org.cn/cloudream/storage/common/pkgs/db" + mydb2 "gitlink.org.cn/cloudream/storage/common/pkgs/db2" ) type Service struct { - db *mydb.DB + db *mydb.DB + db2 *mydb2.DB } -func NewService(db *mydb.DB) *Service { +func NewService(db *mydb.DB, db2 *mydb2.DB) *Service { return &Service{ - db: db, + db: db, + db2: db2, } } diff --git a/coordinator/main.go b/coordinator/main.go index 90b2d1b..d7a4798 100644 --- a/coordinator/main.go +++ b/coordinator/main.go @@ -6,6 +6,7 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/logger" mydb "gitlink.org.cn/cloudream/storage/common/pkgs/db" + mydb2 "gitlink.org.cn/cloudream/storage/common/pkgs/db2" coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" "gitlink.org.cn/cloudream/storage/coordinator/internal/config" "gitlink.org.cn/cloudream/storage/coordinator/internal/mq" @@ -29,7 +30,12 @@ func main() { logger.Fatalf("new db failed, err: %s", err.Error()) } - coorSvr, err := coormq.NewServer(mq.NewService(db), &config.Cfg().RabbitMQ) + db2, err := mydb2.NewDB(&config.Cfg().DB) + if err != nil { + logger.Fatalf("new db failed, err: %s", err.Error()) + } + + coorSvr, err := coormq.NewServer(mq.NewService(db, db2), &config.Cfg().RabbitMQ) if err != nil { logger.Fatalf("new coordinator server failed, err: %s", err.Error()) }