Browse Source

增加接口

gitlink
Sydonian 1 year ago
parent
commit
2390728af8
5 changed files with 278 additions and 34 deletions
  1. +27
    -21
      client/internal/http/server.go
  2. +231
    -0
      client/internal/http/temp.go
  3. +16
    -0
      client/internal/services/object.go
  4. +3
    -6
      coordinator/internal/mq/service.go
  5. +1
    -7
      coordinator/main.go

+ 27
- 21
client/internal/http/server.go View File

@@ -39,29 +39,35 @@ func (s *Server) Serve() error {
}

func (s *Server) initRouters() {
s.engine.GET(cdssdk.ObjectDownloadPath, s.Object().Download)
s.engine.POST(cdssdk.ObjectUploadPath, s.Object().Upload)
s.engine.GET(cdssdk.ObjectGetPackageObjectsPath, s.Object().GetPackageObjects)
s.engine.POST(cdssdk.ObjectUpdateInfoPath, s.Object().UpdateInfo)
s.engine.POST(cdssdk.ObjectMovePath, s.Object().Move)
s.engine.POST(cdssdk.ObjectDeletePath, s.Object().Delete)
rt := s.engine.Use(auth)

s.engine.GET(cdssdk.PackageGetPath, s.Package().Get)
s.engine.GET(cdssdk.PackageGetByNamePath, s.Package().GetByName)
s.engine.POST(cdssdk.PackageCreatePath, s.Package().Create)
s.engine.POST(cdssdk.PackageDeletePath, s.Package().Delete)
s.engine.GET(cdssdk.PackageListBucketPackagesPath, s.Package().ListBucketPackages)
s.engine.GET(cdssdk.PackageGetCachedNodesPath, s.Package().GetCachedNodes)
s.engine.GET(cdssdk.PackageGetLoadedNodesPath, s.Package().GetLoadedNodes)
rt.GET(cdssdk.ObjectDownloadPath, s.Object().Download)
rt.POST(cdssdk.ObjectUploadPath, s.Object().Upload)
rt.GET(cdssdk.ObjectGetPackageObjectsPath, s.Object().GetPackageObjects)
rt.POST(cdssdk.ObjectUpdateInfoPath, s.Object().UpdateInfo)
rt.POST(cdssdk.ObjectMovePath, s.Object().Move)
rt.POST(cdssdk.ObjectDeletePath, s.Object().Delete)

s.engine.POST(cdssdk.StorageLoadPackagePath, s.Storage().LoadPackage)
s.engine.POST(cdssdk.StorageCreatePackagePath, s.Storage().CreatePackage)
s.engine.GET(cdssdk.StorageGetInfoPath, s.Storage().GetInfo)
rt.GET(cdssdk.PackageGetPath, s.Package().Get)
rt.GET(cdssdk.PackageGetByNamePath, s.Package().GetByName)
rt.POST(cdssdk.PackageCreatePath, s.Package().Create)
rt.POST(cdssdk.PackageDeletePath, s.Package().Delete)
rt.GET(cdssdk.PackageListBucketPackagesPath, s.Package().ListBucketPackages)
rt.GET(cdssdk.PackageGetCachedNodesPath, s.Package().GetCachedNodes)
rt.GET(cdssdk.PackageGetLoadedNodesPath, s.Package().GetLoadedNodes)

s.engine.POST(cdssdk.CacheMovePackagePath, s.Cache().MovePackage)
rt.POST(cdssdk.StorageLoadPackagePath, s.Storage().LoadPackage)
rt.POST(cdssdk.StorageCreatePackagePath, s.Storage().CreatePackage)
rt.GET(cdssdk.StorageGetInfoPath, s.Storage().GetInfo)

s.engine.GET(cdssdk.BucketGetByNamePath, s.Bucket().GetByName)
s.engine.POST(cdssdk.BucketCreatePath, s.Bucket().Create)
s.engine.POST(cdssdk.BucketDeletePath, s.Bucket().Delete)
s.engine.GET(cdssdk.BucketListUserBucketsPath, s.Bucket().ListUserBuckets)
rt.POST(cdssdk.CacheMovePackagePath, s.Cache().MovePackage)

rt.GET(cdssdk.BucketGetByNamePath, s.Bucket().GetByName)
rt.POST(cdssdk.BucketCreatePath, s.Bucket().Create)
rt.POST(cdssdk.BucketDeletePath, s.Bucket().Delete)
rt.GET(cdssdk.BucketListUserBucketsPath, s.Bucket().ListUserBuckets)

rt.GET("/bucket/listDetails", s.Temp().ListDetails)
rt.GET("/bucket/getObjects", s.Temp().GetObjects)
rt.GET("/object/getDetail", s.Temp().GetObjectDetail)
}

+ 231
- 0
client/internal/http/temp.go View File

@@ -0,0 +1,231 @@
package http

import (
"net/http"

"github.com/gin-gonic/gin"
"github.com/samber/lo"
"gitlink.org.cn/cloudream/common/consts/errorcode"
"gitlink.org.cn/cloudream/common/pkgs/logger"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
)

type TempService struct {
*Server
}

func (s *Server) Temp() *TempService {
return &TempService{
Server: s,
}
}

type TempListDetailsResp struct {
Buckets []BucketDetail `json:"buckets"`
}
type BucketDetail struct {
BucketID cdssdk.BucketID `json:"bucketID"`
Name string `json:"name"`
ObjectCount int `json:"objectCount"`
}

func (s *TempService) ListDetails(ctx *gin.Context) {
log := logger.WithField("HTTP", "Bucket.ListBucketsDetails")

bkts, err := s.svc.BucketSvc().GetUserBuckets(1)
if err != nil {
log.Warnf("getting user buckets: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get user buckets failed"))
return
}

details := make([]BucketDetail, len(bkts))
for i := range bkts {
details[i].BucketID = bkts[i].BucketID
details[i].Name = bkts[i].Name
objs, err := s.getBucketObjects(bkts[i].BucketID)
if err != nil {
log.Warnf("getting bucket objects: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get bucket objects failed"))
return
}
details[i].ObjectCount = len(objs)
}

ctx.JSON(http.StatusOK, OK(TempListDetailsResp{
Buckets: details,
}))
}

type TempGetObjects struct {
BucketID cdssdk.BucketID `form:"bucketID"`
}
type BucketGetObjectsResp struct {
Objects []cdssdk.Object `json:"objects"`
}

func (s *TempService) GetObjects(ctx *gin.Context) {
log := logger.WithField("HTTP", "Bucket.ListBucketsDetails")

var req TempGetObjects
if err := ctx.ShouldBindQuery(&req); err != nil {
log.Warnf("binding query: %s", err.Error())
ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument"))
return
}

objs, err := s.getBucketObjects(req.BucketID)
if err != nil {
log.Warnf("getting bucket objects: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get bucket objects failed"))
return
}

ctx.JSON(http.StatusOK, OK(BucketGetObjectsResp{
Objects: objs,
}))
}

type TempGetObjectDetail struct {
ObjectID cdssdk.ObjectID `form:"objectID"`
}
type TempGetObjectDetailResp struct {
Blocks []ObjectBlockDetail `json:"blocks"`
}
type ObjectBlockDetail struct {
Type string `json:"type"`
FileHash string `json:"fileHash"`
LocationType string `json:"locationType"`
LocationName string `json:"locationName"`
}

func (s *TempService) GetObjectDetail(ctx *gin.Context) {
log := logger.WithField("HTTP", "Object.GetObjectDetail")

var req TempGetObjectDetail
if err := ctx.ShouldBindQuery(&req); err != nil {
log.Warnf("binding query: %s", err.Error())
ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument"))
return
}

details, err := s.svc.ObjectSvc().GetObjectDetail(req.ObjectID)
if err != nil {
log.Warnf("getting object detail: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get object detail failed"))
return
}

loadedNodeIDs, err := s.svc.PackageSvc().GetLoadedNodes(1, details.Object.PackageID)
if err != nil {
log.Warnf("getting loaded nodes: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get loaded nodes failed"))
return
}

var allNodeIDs []cdssdk.NodeID
allNodeIDs = append(allNodeIDs, details.PinnedAt...)
for _, b := range details.Blocks {
allNodeIDs = append(allNodeIDs, b.NodeID)
}
allNodeIDs = append(allNodeIDs, loadedNodeIDs...)

allNodeIDs = lo.Uniq(allNodeIDs)

getNodes, err := s.svc.NodeSvc().GetNodes(allNodeIDs)
if err != nil {
log.Warnf("getting nodes: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get nodes failed"))
return
}

allNodes := make(map[cdssdk.NodeID]*cdssdk.Node)
for _, n := range getNodes {
n2 := n
allNodes[n.NodeID] = &n2
}

var blocks []ObjectBlockDetail

for _, nodeID := range details.PinnedAt {
blocks = append(blocks, ObjectBlockDetail{
Type: "Rep",
FileHash: details.Object.FileHash,
LocationType: "Agent",
LocationName: allNodes[nodeID].Name,
})
}

switch details.Object.Redundancy.(type) {
case *cdssdk.NoneRedundancy:
for _, blk := range details.Blocks {
if !lo.Contains(details.PinnedAt, blk.NodeID) {
blocks = append(blocks, ObjectBlockDetail{
Type: "Rep",
FileHash: blk.FileHash,
LocationType: "Agent",
LocationName: allNodes[blk.NodeID].Name,
})
}
}
case *cdssdk.RepRedundancy:
for _, blk := range details.Blocks {
if !lo.Contains(details.PinnedAt, blk.NodeID) {
blocks = append(blocks, ObjectBlockDetail{
Type: "Rep",
FileHash: blk.FileHash,
LocationType: "Agent",
LocationName: allNodes[blk.NodeID].Name,
})
}
}

case *cdssdk.ECRedundancy:
for _, blk := range details.Blocks {
blocks = append(blocks, ObjectBlockDetail{
Type: "Block",
FileHash: blk.FileHash,
LocationType: "Agent",
LocationName: allNodes[blk.NodeID].Name,
})
}
}

for _, nodeID := range loadedNodeIDs {
blocks = append(blocks, ObjectBlockDetail{
Type: "Rep",
FileHash: details.Object.FileHash,
LocationType: "Storage",
LocationName: allNodes[nodeID].Name,
})
}

ctx.JSON(http.StatusOK, OK(TempGetObjectDetailResp{
Blocks: blocks,
}))
}

func (s *TempService) getBucketObjects(bktID cdssdk.BucketID) ([]cdssdk.Object, error) {
pkgs, err := s.svc.PackageSvc().GetBucketPackages(1, bktID)
if err != nil {
return nil, err
}

var allObjs []cdssdk.Object
for _, pkg := range pkgs {
objs, err := s.svc.ObjectSvc().GetPackageObjects(1, pkg.PackageID)
if err != nil {
return nil, err
}
allObjs = append(allObjs, objs...)
}

return allObjs, nil
}

func auth(ctx *gin.Context) {
token := ctx.Request.Header.Get("X-CDS-Auth")
if token != "cloudream@123" {
ctx.AbortWithStatus(http.StatusUnauthorized)
}
}

+ 16
- 0
client/internal/services/object.go View File

@@ -7,6 +7,7 @@ import (
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
mytask "gitlink.org.cn/cloudream/storage/client/internal/task"
stgglb "gitlink.org.cn/cloudream/storage/common/globals"
stgmod "gitlink.org.cn/cloudream/storage/common/models"
"gitlink.org.cn/cloudream/storage/common/pkgs/db/model"
"gitlink.org.cn/cloudream/storage/common/pkgs/downloader"
"gitlink.org.cn/cloudream/storage/common/pkgs/iterator"
@@ -109,3 +110,18 @@ func (svc *ObjectService) GetPackageObjects(userID cdssdk.UserID, packageID cdss

return getResp.Objects, nil
}

func (svc *ObjectService) GetObjectDetail(objectID cdssdk.ObjectID) (*stgmod.ObjectDetail, error) {
coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil {
return nil, fmt.Errorf("new coordinator client: %w", err)
}
defer stgglb.CoordinatorMQPool.Release(coorCli)

getResp, err := coorCli.GetObjectDetails(coormq.ReqGetObjectDetails([]cdssdk.ObjectID{objectID}))
if err != nil {
return nil, fmt.Errorf("requsting to coodinator: %w", err)
}

return getResp.Objects[0], nil
}

+ 3
- 6
coordinator/internal/mq/service.go View File

@@ -2,17 +2,14 @@ package mq

import (
mydb "gitlink.org.cn/cloudream/storage/common/pkgs/db"
scmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/scanner"
)

type Service struct {
db *mydb.DB
scanner *scmq.Client
db *mydb.DB
}

func NewService(db *mydb.DB, scanner *scmq.Client) *Service {
func NewService(db *mydb.DB) *Service {
return &Service{
db: db,
scanner: scanner,
db: db,
}
}

+ 1
- 7
coordinator/main.go View File

@@ -7,7 +7,6 @@ import (
"gitlink.org.cn/cloudream/common/pkgs/logger"
mydb "gitlink.org.cn/cloudream/storage/common/pkgs/db"
coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"
scmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/scanner"
"gitlink.org.cn/cloudream/storage/coordinator/internal/config"
"gitlink.org.cn/cloudream/storage/coordinator/internal/mq"
)
@@ -30,12 +29,7 @@ func main() {
logger.Fatalf("new db failed, err: %s", err.Error())
}

scanner, err := scmq.NewClient(&config.Cfg().RabbitMQ)
if err != nil {
logger.Fatalf("new scanner client failed, err: %s", err.Error())
}

coorSvr, err := coormq.NewServer(mq.NewService(db, scanner), &config.Cfg().RabbitMQ)
coorSvr, err := coormq.NewServer(mq.NewService(db), &config.Cfg().RabbitMQ)
if err != nil {
logger.Fatalf("new coordinator server failed, err: %s", err.Error())
}


Loading…
Cancel
Save