diff --git a/client/internal/http/server.go b/client/internal/http/server.go index f727ac4..c4a1b91 100644 --- a/client/internal/http/server.go +++ b/client/internal/http/server.go @@ -39,29 +39,35 @@ func (s *Server) Serve() error { } func (s *Server) initRouters() { - s.engine.GET(cdssdk.ObjectDownloadPath, s.Object().Download) - s.engine.POST(cdssdk.ObjectUploadPath, s.Object().Upload) - s.engine.GET(cdssdk.ObjectGetPackageObjectsPath, s.Object().GetPackageObjects) - s.engine.POST(cdssdk.ObjectUpdateInfoPath, s.Object().UpdateInfo) - s.engine.POST(cdssdk.ObjectMovePath, s.Object().Move) - s.engine.POST(cdssdk.ObjectDeletePath, s.Object().Delete) + rt := s.engine.Use(auth) - s.engine.GET(cdssdk.PackageGetPath, s.Package().Get) - s.engine.GET(cdssdk.PackageGetByNamePath, s.Package().GetByName) - s.engine.POST(cdssdk.PackageCreatePath, s.Package().Create) - s.engine.POST(cdssdk.PackageDeletePath, s.Package().Delete) - s.engine.GET(cdssdk.PackageListBucketPackagesPath, s.Package().ListBucketPackages) - s.engine.GET(cdssdk.PackageGetCachedNodesPath, s.Package().GetCachedNodes) - s.engine.GET(cdssdk.PackageGetLoadedNodesPath, s.Package().GetLoadedNodes) + rt.GET(cdssdk.ObjectDownloadPath, s.Object().Download) + rt.POST(cdssdk.ObjectUploadPath, s.Object().Upload) + rt.GET(cdssdk.ObjectGetPackageObjectsPath, s.Object().GetPackageObjects) + rt.POST(cdssdk.ObjectUpdateInfoPath, s.Object().UpdateInfo) + rt.POST(cdssdk.ObjectMovePath, s.Object().Move) + rt.POST(cdssdk.ObjectDeletePath, s.Object().Delete) - s.engine.POST(cdssdk.StorageLoadPackagePath, s.Storage().LoadPackage) - s.engine.POST(cdssdk.StorageCreatePackagePath, s.Storage().CreatePackage) - s.engine.GET(cdssdk.StorageGetInfoPath, s.Storage().GetInfo) + rt.GET(cdssdk.PackageGetPath, s.Package().Get) + rt.GET(cdssdk.PackageGetByNamePath, s.Package().GetByName) + rt.POST(cdssdk.PackageCreatePath, s.Package().Create) + rt.POST(cdssdk.PackageDeletePath, s.Package().Delete) + rt.GET(cdssdk.PackageListBucketPackagesPath, s.Package().ListBucketPackages) + rt.GET(cdssdk.PackageGetCachedNodesPath, s.Package().GetCachedNodes) + rt.GET(cdssdk.PackageGetLoadedNodesPath, s.Package().GetLoadedNodes) - s.engine.POST(cdssdk.CacheMovePackagePath, s.Cache().MovePackage) + rt.POST(cdssdk.StorageLoadPackagePath, s.Storage().LoadPackage) + rt.POST(cdssdk.StorageCreatePackagePath, s.Storage().CreatePackage) + rt.GET(cdssdk.StorageGetInfoPath, s.Storage().GetInfo) - s.engine.GET(cdssdk.BucketGetByNamePath, s.Bucket().GetByName) - s.engine.POST(cdssdk.BucketCreatePath, s.Bucket().Create) - s.engine.POST(cdssdk.BucketDeletePath, s.Bucket().Delete) - s.engine.GET(cdssdk.BucketListUserBucketsPath, s.Bucket().ListUserBuckets) + rt.POST(cdssdk.CacheMovePackagePath, s.Cache().MovePackage) + + rt.GET(cdssdk.BucketGetByNamePath, s.Bucket().GetByName) + rt.POST(cdssdk.BucketCreatePath, s.Bucket().Create) + rt.POST(cdssdk.BucketDeletePath, s.Bucket().Delete) + rt.GET(cdssdk.BucketListUserBucketsPath, s.Bucket().ListUserBuckets) + + rt.GET("/bucket/listDetails", s.Temp().ListDetails) + rt.GET("/bucket/getObjects", s.Temp().GetObjects) + rt.GET("/object/getDetail", s.Temp().GetObjectDetail) } diff --git a/client/internal/http/temp.go b/client/internal/http/temp.go new file mode 100644 index 0000000..c7f3f86 --- /dev/null +++ b/client/internal/http/temp.go @@ -0,0 +1,231 @@ +package http + +import ( + "net/http" + + "github.com/gin-gonic/gin" + "github.com/samber/lo" + "gitlink.org.cn/cloudream/common/consts/errorcode" + "gitlink.org.cn/cloudream/common/pkgs/logger" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" +) + +type TempService struct { + *Server +} + +func (s *Server) Temp() *TempService { + return &TempService{ + Server: s, + } +} + +type TempListDetailsResp struct { + Buckets []BucketDetail `json:"buckets"` +} +type BucketDetail struct { + BucketID cdssdk.BucketID `json:"bucketID"` + Name string `json:"name"` + ObjectCount int `json:"objectCount"` +} + +func (s *TempService) ListDetails(ctx *gin.Context) { + log := logger.WithField("HTTP", "Bucket.ListBucketsDetails") + + bkts, err := s.svc.BucketSvc().GetUserBuckets(1) + if err != nil { + log.Warnf("getting user buckets: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get user buckets failed")) + return + } + + details := make([]BucketDetail, len(bkts)) + for i := range bkts { + details[i].BucketID = bkts[i].BucketID + details[i].Name = bkts[i].Name + objs, err := s.getBucketObjects(bkts[i].BucketID) + if err != nil { + log.Warnf("getting bucket objects: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get bucket objects failed")) + return + } + details[i].ObjectCount = len(objs) + } + + ctx.JSON(http.StatusOK, OK(TempListDetailsResp{ + Buckets: details, + })) +} + +type TempGetObjects struct { + BucketID cdssdk.BucketID `form:"bucketID"` +} +type BucketGetObjectsResp struct { + Objects []cdssdk.Object `json:"objects"` +} + +func (s *TempService) GetObjects(ctx *gin.Context) { + log := logger.WithField("HTTP", "Bucket.ListBucketsDetails") + + var req TempGetObjects + if err := ctx.ShouldBindQuery(&req); err != nil { + log.Warnf("binding query: %s", err.Error()) + ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) + return + } + + objs, err := s.getBucketObjects(req.BucketID) + if err != nil { + log.Warnf("getting bucket objects: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get bucket objects failed")) + return + } + + ctx.JSON(http.StatusOK, OK(BucketGetObjectsResp{ + Objects: objs, + })) +} + +type TempGetObjectDetail struct { + ObjectID cdssdk.ObjectID `form:"objectID"` +} +type TempGetObjectDetailResp struct { + Blocks []ObjectBlockDetail `json:"blocks"` +} +type ObjectBlockDetail struct { + Type string `json:"type"` + FileHash string `json:"fileHash"` + LocationType string `json:"locationType"` + LocationName string `json:"locationName"` +} + +func (s *TempService) GetObjectDetail(ctx *gin.Context) { + log := logger.WithField("HTTP", "Object.GetObjectDetail") + + var req TempGetObjectDetail + if err := ctx.ShouldBindQuery(&req); err != nil { + log.Warnf("binding query: %s", err.Error()) + ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) + return + } + + details, err := s.svc.ObjectSvc().GetObjectDetail(req.ObjectID) + if err != nil { + log.Warnf("getting object detail: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get object detail failed")) + return + } + + loadedNodeIDs, err := s.svc.PackageSvc().GetLoadedNodes(1, details.Object.PackageID) + if err != nil { + log.Warnf("getting loaded nodes: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get loaded nodes failed")) + return + } + + var allNodeIDs []cdssdk.NodeID + allNodeIDs = append(allNodeIDs, details.PinnedAt...) + for _, b := range details.Blocks { + allNodeIDs = append(allNodeIDs, b.NodeID) + } + allNodeIDs = append(allNodeIDs, loadedNodeIDs...) + + allNodeIDs = lo.Uniq(allNodeIDs) + + getNodes, err := s.svc.NodeSvc().GetNodes(allNodeIDs) + if err != nil { + log.Warnf("getting nodes: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get nodes failed")) + return + } + + allNodes := make(map[cdssdk.NodeID]*cdssdk.Node) + for _, n := range getNodes { + n2 := n + allNodes[n.NodeID] = &n2 + } + + var blocks []ObjectBlockDetail + + for _, nodeID := range details.PinnedAt { + blocks = append(blocks, ObjectBlockDetail{ + Type: "Rep", + FileHash: details.Object.FileHash, + LocationType: "Agent", + LocationName: allNodes[nodeID].Name, + }) + } + + switch details.Object.Redundancy.(type) { + case *cdssdk.NoneRedundancy: + for _, blk := range details.Blocks { + if !lo.Contains(details.PinnedAt, blk.NodeID) { + blocks = append(blocks, ObjectBlockDetail{ + Type: "Rep", + FileHash: blk.FileHash, + LocationType: "Agent", + LocationName: allNodes[blk.NodeID].Name, + }) + } + } + case *cdssdk.RepRedundancy: + for _, blk := range details.Blocks { + if !lo.Contains(details.PinnedAt, blk.NodeID) { + blocks = append(blocks, ObjectBlockDetail{ + Type: "Rep", + FileHash: blk.FileHash, + LocationType: "Agent", + LocationName: allNodes[blk.NodeID].Name, + }) + } + } + + case *cdssdk.ECRedundancy: + for _, blk := range details.Blocks { + blocks = append(blocks, ObjectBlockDetail{ + Type: "Block", + FileHash: blk.FileHash, + LocationType: "Agent", + LocationName: allNodes[blk.NodeID].Name, + }) + } + } + + for _, nodeID := range loadedNodeIDs { + blocks = append(blocks, ObjectBlockDetail{ + Type: "Rep", + FileHash: details.Object.FileHash, + LocationType: "Storage", + LocationName: allNodes[nodeID].Name, + }) + } + + ctx.JSON(http.StatusOK, OK(TempGetObjectDetailResp{ + Blocks: blocks, + })) +} + +func (s *TempService) getBucketObjects(bktID cdssdk.BucketID) ([]cdssdk.Object, error) { + pkgs, err := s.svc.PackageSvc().GetBucketPackages(1, bktID) + if err != nil { + return nil, err + } + + var allObjs []cdssdk.Object + for _, pkg := range pkgs { + objs, err := s.svc.ObjectSvc().GetPackageObjects(1, pkg.PackageID) + if err != nil { + return nil, err + } + allObjs = append(allObjs, objs...) + } + + return allObjs, nil +} + +func auth(ctx *gin.Context) { + token := ctx.Request.Header.Get("X-CDS-Auth") + if token != "cloudream@123" { + ctx.AbortWithStatus(http.StatusUnauthorized) + } +} diff --git a/client/internal/services/object.go b/client/internal/services/object.go index d057d06..b5f44aa 100644 --- a/client/internal/services/object.go +++ b/client/internal/services/object.go @@ -7,6 +7,7 @@ import ( cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" mytask "gitlink.org.cn/cloudream/storage/client/internal/task" stgglb "gitlink.org.cn/cloudream/storage/common/globals" + stgmod "gitlink.org.cn/cloudream/storage/common/models" "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" "gitlink.org.cn/cloudream/storage/common/pkgs/downloader" "gitlink.org.cn/cloudream/storage/common/pkgs/iterator" @@ -109,3 +110,18 @@ func (svc *ObjectService) GetPackageObjects(userID cdssdk.UserID, packageID cdss return getResp.Objects, nil } + +func (svc *ObjectService) GetObjectDetail(objectID cdssdk.ObjectID) (*stgmod.ObjectDetail, error) { + coorCli, err := stgglb.CoordinatorMQPool.Acquire() + if err != nil { + return nil, fmt.Errorf("new coordinator client: %w", err) + } + defer stgglb.CoordinatorMQPool.Release(coorCli) + + getResp, err := coorCli.GetObjectDetails(coormq.ReqGetObjectDetails([]cdssdk.ObjectID{objectID})) + if err != nil { + return nil, fmt.Errorf("requsting to coodinator: %w", err) + } + + return getResp.Objects[0], nil +} diff --git a/coordinator/internal/mq/service.go b/coordinator/internal/mq/service.go index 1b00486..ca44dcc 100644 --- a/coordinator/internal/mq/service.go +++ b/coordinator/internal/mq/service.go @@ -2,17 +2,14 @@ package mq import ( mydb "gitlink.org.cn/cloudream/storage/common/pkgs/db" - scmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/scanner" ) type Service struct { - db *mydb.DB - scanner *scmq.Client + db *mydb.DB } -func NewService(db *mydb.DB, scanner *scmq.Client) *Service { +func NewService(db *mydb.DB) *Service { return &Service{ - db: db, - scanner: scanner, + db: db, } } diff --git a/coordinator/main.go b/coordinator/main.go index 182e847..ccf65be 100644 --- a/coordinator/main.go +++ b/coordinator/main.go @@ -7,7 +7,6 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/logger" mydb "gitlink.org.cn/cloudream/storage/common/pkgs/db" coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" - scmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/scanner" "gitlink.org.cn/cloudream/storage/coordinator/internal/config" "gitlink.org.cn/cloudream/storage/coordinator/internal/mq" ) @@ -30,12 +29,7 @@ func main() { logger.Fatalf("new db failed, err: %s", err.Error()) } - scanner, err := scmq.NewClient(&config.Cfg().RabbitMQ) - if err != nil { - logger.Fatalf("new scanner client failed, err: %s", err.Error()) - } - - coorSvr, err := coormq.NewServer(mq.NewService(db, scanner), &config.Cfg().RabbitMQ) + coorSvr, err := coormq.NewServer(mq.NewService(db), &config.Cfg().RabbitMQ) if err != nil { logger.Fatalf("new coordinator server failed, err: %s", err.Error()) }