浏览代码

增加更新Object元数据、获取Package列表等接口

gitlink
Sydonian 1年前
父节点
当前提交
24856fd23f
共有 12 个文件被更改,包括 449 次插入127 次删除
  1. +22
    -0
      client/internal/http/bucket.go
  2. +43
    -14
      client/internal/http/object.go
  3. +35
    -44
      client/internal/http/package.go
  4. +7
    -3
      client/internal/http/server.go
  5. +0
    -2
      client/internal/services/bucket.go
  6. +30
    -0
      client/internal/services/object.go
  7. +15
    -0
      client/internal/services/package.go
  8. +39
    -13
      common/pkgs/db/object.go
  9. +69
    -13
      common/pkgs/mq/coordinator/object.go
  10. +154
    -3
      coordinator/internal/mq/object.go
  11. +22
    -22
      scanner/internal/event/check_package_redundancy.go
  12. +13
    -13
      scanner/internal/event/clean_pinned.go

+ 22
- 0
client/internal/http/bucket.go 查看文件

@@ -59,3 +59,25 @@ func (s *BucketService) Delete(ctx *gin.Context) {


ctx.JSON(http.StatusOK, OK(nil)) ctx.JSON(http.StatusOK, OK(nil))
} }

func (s *BucketService) ListUserBuckets(ctx *gin.Context) {
log := logger.WithField("HTTP", "Bucket.ListUserBuckets")

var req cdssdk.BucketListUserBucketsReq
if err := ctx.ShouldBindQuery(&req); err != nil {
log.Warnf("binding query: %s", err.Error())
ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument"))
return
}

buckets, err := s.svc.BucketSvc().GetUserBuckets(req.UserID)
if err != nil {
log.Warnf("getting user buckets: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get user buckets failed"))
return
}

ctx.JSON(http.StatusOK, OK(cdssdk.BucketListUserBucketsResp{
Buckets: buckets,
}))
}

+ 43
- 14
client/internal/http/object.go 查看文件

@@ -71,22 +71,17 @@ func (s *ObjectService) Upload(ctx *gin.Context) {
} }
} }


type ObjectDownloadReq struct {
UserID *cdssdk.UserID `form:"userID" binding:"required"`
ObjectID *cdssdk.ObjectID `form:"objectID" binding:"required"`
}

func (s *ObjectService) Download(ctx *gin.Context) { func (s *ObjectService) Download(ctx *gin.Context) {
log := logger.WithField("HTTP", "Object.Download") log := logger.WithField("HTTP", "Object.Download")


var req ObjectDownloadReq
var req cdssdk.ObjectDownloadReq
if err := ctx.ShouldBindQuery(&req); err != nil { if err := ctx.ShouldBindQuery(&req); err != nil {
log.Warnf("binding body: %s", err.Error()) log.Warnf("binding body: %s", err.Error())
ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument"))
return return
} }


file, err := s.svc.ObjectSvc().Download(*req.UserID, *req.ObjectID)
file, err := s.svc.ObjectSvc().Download(req.UserID, req.ObjectID)
if err != nil { if err != nil {
log.Warnf("downloading object: %s", err.Error()) log.Warnf("downloading object: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "download object failed")) ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "download object failed"))
@@ -124,28 +119,62 @@ func (s *ObjectService) Download(ctx *gin.Context) {
}) })
} }


type GetPackageObjectsReq struct {
UserID *cdssdk.UserID `form:"userID" binding:"required"`
PackageID *cdssdk.PackageID `form:"packageID" binding:"required"`
func (s *ObjectService) UpdateInfo(ctx *gin.Context) {
log := logger.WithField("HTTP", "Object.UpdateInfo")

var req cdssdk.ObjectUpdateInfoReq
if err := ctx.ShouldBindJSON(&req); err != nil {
log.Warnf("binding body: %s", err.Error())
ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument"))
return
}

err := s.svc.ObjectSvc().UpdateInfo(req.UserID, req.Updatings)
if err != nil {
log.Warnf("updating objects: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "update objects failed"))
return
}

ctx.JSON(http.StatusOK, OK(nil))
}

func (s *ObjectService) Delete(ctx *gin.Context) {
log := logger.WithField("HTTP", "Object.Delete")

var req cdssdk.ObjectDeleteReq
if err := ctx.ShouldBindJSON(&req); err != nil {
log.Warnf("binding body: %s", err.Error())
ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument"))
return
}

err := s.svc.ObjectSvc().Delete(req.UserID, req.ObjectIDs)
if err != nil {
log.Warnf("deleting objects: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "delete objects failed"))
return
}

ctx.JSON(http.StatusOK, OK(nil))
} }
type GetPackageObjectsResp = cdssdk.ObjectGetPackageObjectsResp


func (s *ObjectService) GetPackageObjects(ctx *gin.Context) { func (s *ObjectService) GetPackageObjects(ctx *gin.Context) {
log := logger.WithField("HTTP", "Object.GetPackageObjects") log := logger.WithField("HTTP", "Object.GetPackageObjects")


var req GetPackageObjectsReq
var req cdssdk.ObjectGetPackageObjectsReq
if err := ctx.ShouldBindQuery(&req); err != nil { if err := ctx.ShouldBindQuery(&req); err != nil {
log.Warnf("binding body: %s", err.Error()) log.Warnf("binding body: %s", err.Error())
ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument"))
return return
} }


objs, err := s.svc.ObjectSvc().GetPackageObjects(*req.UserID, *req.PackageID)
objs, err := s.svc.ObjectSvc().GetPackageObjects(req.UserID, req.PackageID)
if err != nil { if err != nil {
log.Warnf("getting package objects: %s", err.Error()) log.Warnf("getting package objects: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get package object failed")) ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get package object failed"))
return return
} }


ctx.JSON(http.StatusOK, OK(GetPackageObjectsResp{Objects: objs}))
ctx.JSON(http.StatusOK, OK(cdssdk.ObjectGetPackageObjectsResp{Objects: objs}))
} }

+ 35
- 44
client/internal/http/package.go 查看文件

@@ -10,7 +10,6 @@ import (
"gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/logger"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"


"gitlink.org.cn/cloudream/storage/common/pkgs/db/model"
stgiter "gitlink.org.cn/cloudream/storage/common/pkgs/iterator" stgiter "gitlink.org.cn/cloudream/storage/common/pkgs/iterator"
) )


@@ -24,32 +23,24 @@ func (s *Server) Package() *PackageService {
} }
} }


type PackageGetReq struct {
UserID *cdssdk.UserID `form:"userID" binding:"required"`
PackageID *cdssdk.PackageID `form:"packageID" binding:"required"`
}
type PackageGetResp struct {
model.Package
}

func (s *PackageService) Get(ctx *gin.Context) { func (s *PackageService) Get(ctx *gin.Context) {
log := logger.WithField("HTTP", "Package.Get") log := logger.WithField("HTTP", "Package.Get")


var req PackageGetReq
var req cdssdk.PackageGetReq
if err := ctx.ShouldBindQuery(&req); err != nil { if err := ctx.ShouldBindQuery(&req); err != nil {
log.Warnf("binding body: %s", err.Error()) log.Warnf("binding body: %s", err.Error())
ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument"))
return return
} }


pkg, err := s.svc.PackageSvc().Get(*req.UserID, *req.PackageID)
pkg, err := s.svc.PackageSvc().Get(req.UserID, req.PackageID)
if err != nil { if err != nil {
log.Warnf("getting package: %s", err.Error()) log.Warnf("getting package: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get package failed")) ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get package failed"))
return return
} }


ctx.JSON(http.StatusOK, OK(PackageGetResp{Package: *pkg}))
ctx.JSON(http.StatusOK, OK(cdssdk.PackageGetResp{Package: *pkg}))
} }


func (s *PackageService) Create(ctx *gin.Context) { func (s *PackageService) Create(ctx *gin.Context) {
@@ -73,22 +64,17 @@ func (s *PackageService) Create(ctx *gin.Context) {
})) }))
} }


type PackageDeleteReq struct {
UserID *cdssdk.UserID `json:"userID" binding:"required"`
PackageID *cdssdk.PackageID `json:"packageID" binding:"required"`
}

func (s *PackageService) Delete(ctx *gin.Context) { func (s *PackageService) Delete(ctx *gin.Context) {
log := logger.WithField("HTTP", "Package.Delete") log := logger.WithField("HTTP", "Package.Delete")


var req PackageDeleteReq
var req cdssdk.PackageDeleteReq
if err := ctx.ShouldBindJSON(&req); err != nil { if err := ctx.ShouldBindJSON(&req); err != nil {
log.Warnf("binding body: %s", err.Error()) log.Warnf("binding body: %s", err.Error())
ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument"))
return return
} }


err := s.svc.PackageSvc().DeletePackage(*req.UserID, *req.PackageID)
err := s.svc.PackageSvc().DeletePackage(req.UserID, req.PackageID)
if err != nil { if err != nil {
log.Warnf("deleting package: %s", err.Error()) log.Warnf("deleting package: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "delete package failed")) ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "delete package failed"))
@@ -98,61 +84,66 @@ func (s *PackageService) Delete(ctx *gin.Context) {
ctx.JSON(http.StatusOK, OK(nil)) ctx.JSON(http.StatusOK, OK(nil))
} }


type GetCachedNodesReq struct {
UserID *cdssdk.UserID `json:"userID" binding:"required"`
PackageID *cdssdk.PackageID `json:"packageID" binding:"required"`
}
type GetCachedNodesResp struct {
cdssdk.PackageCachingInfo
func (s *PackageService) ListBucketPackages(ctx *gin.Context) {
log := logger.WithField("HTTP", "Package.ListBucketPackages")

var req cdssdk.PackageListBucketPackagesReq
if err := ctx.ShouldBindQuery(&req); err != nil {
log.Warnf("binding query: %s", err.Error())
ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument"))
return
}

pkgs, err := s.svc.PackageSvc().GetBucketPackages(req.UserID, req.BucketID)
if err != nil {
log.Warnf("getting bucket packages: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get bucket packages failed"))
return
}

ctx.JSON(http.StatusOK, OK(cdssdk.PackageListBucketPackagesResp{
Packages: pkgs,
}))
} }


func (s *PackageService) GetCachedNodes(ctx *gin.Context) { func (s *PackageService) GetCachedNodes(ctx *gin.Context) {
log := logger.WithField("HTTP", "Package.GetCachedNodes") log := logger.WithField("HTTP", "Package.GetCachedNodes")


var req GetCachedNodesReq
if err := ctx.ShouldBindJSON(&req); err != nil {
log.Warnf("binding body: %s", err.Error())
var req cdssdk.PackageGetCachedNodesReq
if err := ctx.ShouldBindQuery(&req); err != nil {
log.Warnf("binding query: %s", err.Error())
ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument"))
return return
} }


resp, err := s.svc.PackageSvc().GetCachedNodes(*req.UserID, *req.PackageID)
resp, err := s.svc.PackageSvc().GetCachedNodes(req.UserID, req.PackageID)
if err != nil { if err != nil {
log.Warnf("get package cached nodes failed: %s", err.Error()) log.Warnf("get package cached nodes failed: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get package cached nodes failed")) ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get package cached nodes failed"))
return return
} }


ctx.JSON(http.StatusOK, OK(GetCachedNodesResp{resp}))
}

type GetLoadedNodesReq struct {
UserID *cdssdk.UserID `json:"userID" binding:"required"`
PackageID *cdssdk.PackageID `json:"packageID" binding:"required"`
}

type GetLoadedNodesResp struct {
NodeIDs []cdssdk.NodeID `json:"nodeIDs"`
ctx.JSON(http.StatusOK, OK(cdssdk.PackageGetCachedNodesResp{PackageCachingInfo: resp}))
} }


func (s *PackageService) GetLoadedNodes(ctx *gin.Context) { func (s *PackageService) GetLoadedNodes(ctx *gin.Context) {
log := logger.WithField("HTTP", "Package.GetLoadedNodes") log := logger.WithField("HTTP", "Package.GetLoadedNodes")


var req GetLoadedNodesReq
if err := ctx.ShouldBindJSON(&req); err != nil {
log.Warnf("binding body: %s", err.Error())
var req cdssdk.PackageGetLoadedNodesReq
if err := ctx.ShouldBindQuery(&req); err != nil {
log.Warnf("binding query: %s", err.Error())
ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument"))
return return
} }


nodeIDs, err := s.svc.PackageSvc().GetLoadedNodes(*req.UserID, *req.PackageID)
nodeIDs, err := s.svc.PackageSvc().GetLoadedNodes(req.UserID, req.PackageID)
if err != nil { if err != nil {
log.Warnf("get package loaded nodes failed: %s", err.Error()) log.Warnf("get package loaded nodes failed: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get package loaded nodes failed")) ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get package loaded nodes failed"))
return return
} }


ctx.JSON(http.StatusOK, OK(GetLoadedNodesResp{
ctx.JSON(http.StatusOK, OK(cdssdk.PackageGetLoadedNodesResp{
NodeIDs: nodeIDs, NodeIDs: nodeIDs,
})) }))
} }


+ 7
- 3
client/internal/http/server.go 查看文件

@@ -42,12 +42,15 @@ func (s *Server) initRouters() {
s.engine.GET(cdssdk.ObjectDownloadPath, s.Object().Download) s.engine.GET(cdssdk.ObjectDownloadPath, s.Object().Download)
s.engine.POST(cdssdk.ObjectUploadPath, s.Object().Upload) s.engine.POST(cdssdk.ObjectUploadPath, s.Object().Upload)
s.engine.GET(cdssdk.ObjectGetPackageObjectsPath, s.Object().GetPackageObjects) s.engine.GET(cdssdk.ObjectGetPackageObjectsPath, s.Object().GetPackageObjects)
s.engine.POST(cdssdk.ObjectUpdateInfoPath, s.Object().UpdateInfo)
s.engine.POST(cdssdk.ObjectDeletePath, s.Object().Delete)


s.engine.GET(cdssdk.PackageGetPath, s.Package().Get) s.engine.GET(cdssdk.PackageGetPath, s.Package().Get)
s.engine.POST(cdssdk.PackageCreatePath, s.Package().Create) s.engine.POST(cdssdk.PackageCreatePath, s.Package().Create)
s.engine.POST("/package/delete", s.Package().Delete)
s.engine.GET("/package/getCachedNodes", s.Package().GetCachedNodes)
s.engine.GET("/package/getLoadedNodes", s.Package().GetLoadedNodes)
s.engine.POST(cdssdk.PackageDeletePath, s.Package().Delete)
s.engine.GET(cdssdk.PackageListBucketPackagesPath, s.Package().ListBucketPackages)
s.engine.GET(cdssdk.PackageGetCachedNodesPath, s.Package().GetCachedNodes)
s.engine.GET(cdssdk.PackageGetLoadedNodesPath, s.Package().GetLoadedNodes)


s.engine.POST("/storage/loadPackage", s.Storage().LoadPackage) s.engine.POST("/storage/loadPackage", s.Storage().LoadPackage)
s.engine.POST("/storage/createPackage", s.Storage().CreatePackage) s.engine.POST("/storage/createPackage", s.Storage().CreatePackage)
@@ -57,4 +60,5 @@ func (s *Server) initRouters() {


s.engine.POST(cdssdk.BucketCreatePath, s.Bucket().Create) s.engine.POST(cdssdk.BucketCreatePath, s.Bucket().Create)
s.engine.POST(cdssdk.BucketDeletePath, s.Bucket().Delete) s.engine.POST(cdssdk.BucketDeletePath, s.Bucket().Delete)
s.engine.GET(cdssdk.BucketListUserBucketsPath, s.Bucket().ListUserBuckets)
} }

+ 0
- 2
client/internal/services/bucket.go 查看文件

@@ -74,8 +74,6 @@ func (svc *BucketService) DeleteBucket(userID cdssdk.UserID, bucketID cdssdk.Buc
} }
defer stgglb.CoordinatorMQPool.Release(coorCli) defer stgglb.CoordinatorMQPool.Release(coorCli)


// TODO 检查用户是否有删除这个Bucket的权限。检查的时候可以只上UserBucket的Read锁

_, err = coorCli.DeleteBucket(coormq.NewDeleteBucket(userID, bucketID)) _, err = coorCli.DeleteBucket(coormq.NewDeleteBucket(userID, bucketID))
if err != nil { if err != nil {
return fmt.Errorf("request to coordinator failed, err: %w", err) return fmt.Errorf("request to coordinator failed, err: %w", err)


+ 30
- 0
client/internal/services/object.go 查看文件

@@ -35,10 +35,40 @@ func (svc *ObjectService) WaitUploading(taskID string, waitTimeout time.Duration
return false, nil, nil return false, nil, nil
} }


func (svc *ObjectService) UpdateInfo(userID cdssdk.UserID, updatings []cdssdk.UpdatingObject) error {
coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil {
return fmt.Errorf("new coordinator client: %w", err)
}
defer stgglb.CoordinatorMQPool.Release(coorCli)

_, err = coorCli.UpdateObjectInfos(coormq.ReqUpdateObjectInfos(userID, updatings))
if err != nil {
return fmt.Errorf("requsting to coodinator: %w", err)
}

return nil
}

func (svc *ObjectService) Download(userID cdssdk.UserID, objectID cdssdk.ObjectID) (io.ReadCloser, error) { func (svc *ObjectService) Download(userID cdssdk.UserID, objectID cdssdk.ObjectID) (io.ReadCloser, error) {
panic("not implement yet!") panic("not implement yet!")
} }


func (svc *ObjectService) Delete(userID cdssdk.UserID, objectIDs []cdssdk.ObjectID) error {
coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil {
return fmt.Errorf("new coordinator client: %w", err)
}
defer stgglb.CoordinatorMQPool.Release(coorCli)

_, err = coorCli.DeleteObjects(coormq.ReqDeleteObjects(userID, objectIDs))
if err != nil {
return fmt.Errorf("requsting to coodinator: %w", err)
}

return nil
}

func (svc *ObjectService) GetPackageObjects(userID cdssdk.UserID, packageID cdssdk.PackageID) ([]model.Object, error) { func (svc *ObjectService) GetPackageObjects(userID cdssdk.UserID, packageID cdssdk.PackageID) ([]model.Object, error) {
coorCli, err := stgglb.CoordinatorMQPool.Acquire() coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil { if err != nil {


+ 15
- 0
client/internal/services/package.go 查看文件

@@ -34,6 +34,21 @@ func (svc *PackageService) Get(userID cdssdk.UserID, packageID cdssdk.PackageID)
return &getResp.Package, nil return &getResp.Package, nil
} }


func (svc *PackageService) GetBucketPackages(userID cdssdk.UserID, bucketID cdssdk.BucketID) ([]cdssdk.Package, error) {
coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil {
return nil, fmt.Errorf("new coordinator client: %w", err)
}
defer stgglb.CoordinatorMQPool.Release(coorCli)

getResp, err := coorCli.GetBucketPackages(coormq.NewGetBucketPackages(userID, bucketID))
if err != nil {
return nil, fmt.Errorf("requsting to coodinator: %w", err)
}

return getResp.Packages, nil
}

func (svc *PackageService) Create(userID cdssdk.UserID, bucketID cdssdk.BucketID, name string) (cdssdk.PackageID, error) { func (svc *PackageService) Create(userID cdssdk.UserID, bucketID cdssdk.BucketID, name string) (cdssdk.PackageID, error) {
coorCli, err := stgglb.CoordinatorMQPool.Acquire() coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil { if err != nil {


+ 39
- 13
common/pkgs/db/object.go 查看文件

@@ -26,25 +26,46 @@ func (db *ObjectDB) GetByID(ctx SQLContext, objectID cdssdk.ObjectID) (model.Obj
return ret.ToObject(), err return ret.ToObject(), err
} }


func (db *ObjectDB) BatchGetPackageObjectIDs(ctx SQLContext, pkgID cdssdk.PackageID, pathes []string) ([]cdssdk.ObjectID, error) {
func (db *ObjectDB) BatchGet(ctx SQLContext, objectIDs []cdssdk.ObjectID) ([]model.Object, error) {
if len(objectIDs) == 0 {
return nil, nil
}

// TODO In语句
stmt, args, err := sqlx.In("select * from Object where ObjectID in (?) order by ObjectID asc", objectIDs)
if err != nil {
return nil, err
}
stmt = ctx.Rebind(stmt)

objs := make([]model.TempObject, 0, len(objectIDs))
err = sqlx.Select(ctx, &objs, stmt, args...)
if err != nil {
return nil, err
}

return lo.Map(objs, func(o model.TempObject, idx int) cdssdk.Object { return o.ToObject() }), nil
}

func (db *ObjectDB) BatchByPackagePath(ctx SQLContext, pkgID cdssdk.PackageID, pathes []string) ([]cdssdk.Object, error) {
if len(pathes) == 0 { if len(pathes) == 0 {
return nil, nil return nil, nil
} }


// TODO In语句 // TODO In语句
stmt, args, err := sqlx.In("select ObjectID from Object force index(PackagePath) where PackageID=? and Path in (?)", pkgID, pathes)
stmt, args, err := sqlx.In("select * from Object force index(PackagePath) where PackageID=? and Path in (?)", pkgID, pathes)
if err != nil { if err != nil {
return nil, err return nil, err
} }
stmt = ctx.Rebind(stmt) stmt = ctx.Rebind(stmt)


objIDs := make([]cdssdk.ObjectID, 0, len(pathes))
err = sqlx.Select(ctx, &objIDs, stmt, args...)
objs := make([]model.TempObject, 0, len(pathes))
err = sqlx.Select(ctx, &objs, stmt, args...)
if err != nil { if err != nil {
return nil, err return nil, err
} }


return objIDs, nil
return lo.Map(objs, func(o model.TempObject, idx int) cdssdk.Object { return o.ToObject() }), nil
} }


func (db *ObjectDB) Create(ctx SQLContext, obj cdssdk.Object) (cdssdk.ObjectID, error) { func (db *ObjectDB) Create(ctx SQLContext, obj cdssdk.Object) (cdssdk.ObjectID, error) {
@@ -63,8 +84,8 @@ func (db *ObjectDB) Create(ctx SQLContext, obj cdssdk.Object) (cdssdk.ObjectID,
return cdssdk.ObjectID(objectID), nil return cdssdk.ObjectID(objectID), nil
} }


// 可以用于批量创建或者更新记录
// 用于创建时,需要额外检查PackageID+Path的唯一性
// 可以用于批量创建或者更新记录
// 用于创建时,需要额外检查PackageID+Path的唯一性
// 用于更新时,需要额外检查现存的PackageID+Path对应的ObjectID是否与待更新的ObjectID相同。不会更新CreateTime。 // 用于更新时,需要额外检查现存的PackageID+Path对应的ObjectID是否与待更新的ObjectID相同。不会更新CreateTime。
func (db *ObjectDB) BatchCreateOrUpdate(ctx SQLContext, objs []cdssdk.Object) error { func (db *ObjectDB) BatchCreateOrUpdate(ctx SQLContext, objs []cdssdk.Object) error {
if len(objs) == 0 { if len(objs) == 0 {
@@ -163,17 +184,22 @@ func (db *ObjectDB) BatchAdd(ctx SQLContext, packageID cdssdk.PackageID, adds []
for _, add := range adds { for _, add := range adds {
pathes = append(pathes, add.Path) pathes = append(pathes, add.Path)
} }
objIDs, err := db.BatchGetPackageObjectIDs(ctx, packageID, pathes)
// 这里可以不用检查查询结果是否与pathes的数量相同
addedObjs, err := db.BatchByPackagePath(ctx, packageID, pathes)
if err != nil { if err != nil {
return nil, fmt.Errorf("batch get object ids: %w", err) return nil, fmt.Errorf("batch get object ids: %w", err)
} }
addedObjIDs := make([]cdssdk.ObjectID, len(addedObjs))
for i := range addedObjs {
addedObjIDs[i] = addedObjs[i].ObjectID
}


err = db.ObjectBlock().BatchDeleteByObjectID(ctx, objIDs)
err = db.ObjectBlock().BatchDeleteByObjectID(ctx, addedObjIDs)
if err != nil { if err != nil {
return nil, fmt.Errorf("batch delete object blocks: %w", err) return nil, fmt.Errorf("batch delete object blocks: %w", err)
} }


err = db.PinnedObject().BatchDeleteByObjectID(ctx, objIDs)
err = db.PinnedObject().BatchDeleteByObjectID(ctx, addedObjIDs)
if err != nil { if err != nil {
return nil, fmt.Errorf("batch delete pinned objects: %w", err) return nil, fmt.Errorf("batch delete pinned objects: %w", err)
} }
@@ -181,7 +207,7 @@ func (db *ObjectDB) BatchAdd(ctx SQLContext, packageID cdssdk.PackageID, adds []
objBlocks := make([]stgmod.ObjectBlock, 0, len(adds)) objBlocks := make([]stgmod.ObjectBlock, 0, len(adds))
for i, add := range adds { for i, add := range adds {
objBlocks = append(objBlocks, stgmod.ObjectBlock{ objBlocks = append(objBlocks, stgmod.ObjectBlock{
ObjectID: objIDs[i],
ObjectID: addedObjIDs[i],
Index: 0, Index: 0,
NodeID: add.NodeID, NodeID: add.NodeID,
FileHash: add.FileHash, FileHash: add.FileHash,
@@ -206,10 +232,10 @@ func (db *ObjectDB) BatchAdd(ctx SQLContext, packageID cdssdk.PackageID, adds []
return nil, fmt.Errorf("batch create caches: %w", err) return nil, fmt.Errorf("batch create caches: %w", err)
} }


return objIDs, nil
return addedObjIDs, nil
} }


func (db *ObjectDB) BatchUpdateRedundancy(ctx SQLContext, objs []coormq.ChangeObjectRedundancyEntry) error {
func (db *ObjectDB) BatchUpdateRedundancy(ctx SQLContext, objs []coormq.UpdatingObjectRedundancy) error {
if len(objs) == 0 { if len(objs) == 0 {
return nil return nil
} }


+ 69
- 13
common/pkgs/mq/coordinator/object.go 查看文件

@@ -13,7 +13,11 @@ type ObjectService interface {


GetPackageObjectDetails(msg *GetPackageObjectDetails) (*GetPackageObjectDetailsResp, *mq.CodeMessage) GetPackageObjectDetails(msg *GetPackageObjectDetails) (*GetPackageObjectDetailsResp, *mq.CodeMessage)


ChangeObjectRedundancy(msg *ChangeObjectRedundancy) (*ChangeObjectRedundancyResp, *mq.CodeMessage)
UpdateObjectRedundancy(msg *UpdateObjectRedundancy) (*UpdateObjectRedundancyResp, *mq.CodeMessage)

UpdateObjectInfos(msg *UpdateObjectInfos) (*UpdateObjectInfosResp, *mq.CodeMessage)

DeleteObjects(msg *DeleteObjects) (*DeleteObjectsResp, *mq.CodeMessage)
} }


// 查询Package中的所有Object,返回的Objects会按照ObjectID升序 // 查询Package中的所有Object,返回的Objects会按照ObjectID升序
@@ -71,30 +75,82 @@ func (client *Client) GetPackageObjectDetails(msg *GetPackageObjectDetails) (*Ge
} }


// 更新Object的冗余方式 // 更新Object的冗余方式
var _ = Register(Service.ChangeObjectRedundancy)
var _ = Register(Service.UpdateObjectRedundancy)


type ChangeObjectRedundancy struct {
type UpdateObjectRedundancy struct {
mq.MessageBodyBase mq.MessageBodyBase
Entries []ChangeObjectRedundancyEntry `json:"entries"`
Updatings []UpdatingObjectRedundancy `json:"updatings"`
} }
type ChangeObjectRedundancyResp struct {
type UpdateObjectRedundancyResp struct {
mq.MessageBodyBase mq.MessageBodyBase
} }
type ChangeObjectRedundancyEntry struct {
type UpdatingObjectRedundancy struct {
ObjectID cdssdk.ObjectID `json:"objectID" db:"ObjectID"` ObjectID cdssdk.ObjectID `json:"objectID" db:"ObjectID"`
Redundancy cdssdk.Redundancy `json:"redundancy" db:"Redundancy"` Redundancy cdssdk.Redundancy `json:"redundancy" db:"Redundancy"`
PinnedAt []cdssdk.NodeID `json:"pinnedAt"` PinnedAt []cdssdk.NodeID `json:"pinnedAt"`
Blocks []stgmod.ObjectBlock `json:"blocks"` Blocks []stgmod.ObjectBlock `json:"blocks"`
} }


func ReqChangeObjectRedundancy(entries []ChangeObjectRedundancyEntry) *ChangeObjectRedundancy {
return &ChangeObjectRedundancy{
Entries: entries,
func ReqUpdateObjectRedundancy(updatings []UpdatingObjectRedundancy) *UpdateObjectRedundancy {
return &UpdateObjectRedundancy{
Updatings: updatings,
}
}
func RespUpdateObjectRedundancy() *UpdateObjectRedundancyResp {
return &UpdateObjectRedundancyResp{}
}
func (client *Client) UpdateObjectRedundancy(msg *UpdateObjectRedundancy) (*UpdateObjectRedundancyResp, error) {
return mq.Request(Service.UpdateObjectRedundancy, client.rabbitCli, msg)
}

// 更新Object元数据
var _ = Register(Service.UpdateObjectInfos)

type UpdateObjectInfos struct {
mq.MessageBodyBase
UserID cdssdk.UserID `json:"userID"`
Updatings []cdssdk.UpdatingObject `json:"updatings"`
}

type UpdateObjectInfosResp struct {
mq.MessageBodyBase
}

func ReqUpdateObjectInfos(userID cdssdk.UserID, updatings []cdssdk.UpdatingObject) *UpdateObjectInfos {
return &UpdateObjectInfos{
UserID: userID,
Updatings: updatings,
}
}
func RespUpdateObjectInfos() *UpdateObjectInfosResp {
return &UpdateObjectInfosResp{}
}
func (client *Client) UpdateObjectInfos(msg *UpdateObjectInfos) (*UpdateObjectInfosResp, error) {
return mq.Request(Service.UpdateObjectInfos, client.rabbitCli, msg)
}

// 删除Object
var _ = Register(Service.DeleteObjects)

type DeleteObjects struct {
mq.MessageBodyBase
UserID cdssdk.UserID `json:"userID"`
ObjectIDs []cdssdk.ObjectID `json:"objectIDs"`
}

type DeleteObjectsResp struct {
mq.MessageBodyBase
}

func ReqDeleteObjects(userID cdssdk.UserID, objectIDs []cdssdk.ObjectID) *DeleteObjects {
return &DeleteObjects{
UserID: userID,
ObjectIDs: objectIDs,
} }
} }
func RespChangeObjectRedundancy() *ChangeObjectRedundancyResp {
return &ChangeObjectRedundancyResp{}
func RespDeleteObjects() *DeleteObjectsResp {
return &DeleteObjectsResp{}
} }
func (client *Client) ChangeObjectRedundancy(msg *ChangeObjectRedundancy) (*ChangeObjectRedundancyResp, error) {
return mq.Request(Service.ChangeObjectRedundancy, client.rabbitCli, msg)
func (client *Client) DeleteObjects(msg *DeleteObjects) (*DeleteObjectsResp, error) {
return mq.Request(Service.DeleteObjects, client.rabbitCli, msg)
} }

+ 154
- 3
coordinator/internal/mq/object.go 查看文件

@@ -5,9 +5,12 @@ import (
"fmt" "fmt"


"github.com/jmoiron/sqlx" "github.com/jmoiron/sqlx"
"github.com/samber/lo"
"gitlink.org.cn/cloudream/common/consts/errorcode" "gitlink.org.cn/cloudream/common/consts/errorcode"
"gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/pkgs/mq" "gitlink.org.cn/cloudream/common/pkgs/mq"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/common/utils/sort2"
stgmod "gitlink.org.cn/cloudream/storage/common/models" stgmod "gitlink.org.cn/cloudream/storage/common/models"
coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"
) )
@@ -51,14 +54,162 @@ func (svc *Service) GetPackageObjectDetails(msg *coormq.GetPackageObjectDetails)
return mq.ReplyOK(coormq.NewGetPackageObjectDetailsResp(details)) return mq.ReplyOK(coormq.NewGetPackageObjectDetailsResp(details))
} }


func (svc *Service) ChangeObjectRedundancy(msg *coormq.ChangeObjectRedundancy) (*coormq.ChangeObjectRedundancyResp, *mq.CodeMessage) {
func (svc *Service) UpdateObjectRedundancy(msg *coormq.UpdateObjectRedundancy) (*coormq.UpdateObjectRedundancyResp, *mq.CodeMessage) {
err := svc.db.DoTx(sql.LevelSerializable, func(tx *sqlx.Tx) error { err := svc.db.DoTx(sql.LevelSerializable, func(tx *sqlx.Tx) error {
return svc.db.Object().BatchUpdateRedundancy(tx, msg.Entries)
return svc.db.Object().BatchUpdateRedundancy(tx, msg.Updatings)
}) })
if err != nil { if err != nil {
logger.Warnf("batch updating redundancy: %s", err.Error()) logger.Warnf("batch updating redundancy: %s", err.Error())
return nil, mq.Failed(errorcode.OperationFailed, "batch update redundancy failed") return nil, mq.Failed(errorcode.OperationFailed, "batch update redundancy failed")
} }


return mq.ReplyOK(coormq.RespChangeObjectRedundancy())
return mq.ReplyOK(coormq.RespUpdateObjectRedundancy())
}

func (svc *Service) UpdateObjectInfos(msg *coormq.UpdateObjectInfos) (*coormq.UpdateObjectInfosResp, *mq.CodeMessage) {
err := svc.db.DoTx(sql.LevelSerializable, func(tx *sqlx.Tx) error {
msg.Updatings = sort2.Sort(msg.Updatings, func(o1, o2 cdssdk.UpdatingObject) int {
return sort2.Cmp(o1.ObjectID, o2.ObjectID)
})

objIDs := make([]cdssdk.ObjectID, len(msg.Updatings))
for i, obj := range msg.Updatings {
objIDs[i] = obj.ObjectID
}

oldObjs, err := svc.db.Object().BatchGet(tx, objIDs)
if err != nil {
return fmt.Errorf("batch getting objects: %w", err)
}
oldObjIDs := make([]cdssdk.ObjectID, len(oldObjs))
for i, obj := range oldObjs {
oldObjIDs[i] = obj.ObjectID
}

avaiUpdatings, notExistsObjs := pickByObjectIDs(msg.Updatings, oldObjIDs)
if len(notExistsObjs) > 0 {
// TODO 部分对象已经不存在
}

// 筛选出PackageID变化、Path变化的对象,这两种对象要检测改变后是否有冲突
// 否则,直接更新即可
//var pkgIDChangedObjs []cdssdk.Object
//var pathChangedObjs []cdssdk.Object
//var infoChangedObjs []cdssdk.Object
//for i := range willUpdateObjs {
// if willUpdateObjs[i].PackageID != oldObjs[i].PackageID {
// newObj := oldObjs[i]
// willUpdateObjs[i].ApplyTo(&newObj)
// pkgIDChangedObjs = append(pkgIDChangedObjs, newObj)
// } else if willUpdateObjs[i].Path != oldObjs[i].Path {
// newObj := oldObjs[i]
// willUpdateObjs[i].ApplyTo(&newObj)
// pathChangedObjs = append(pathChangedObjs, newObj)
// } else {
// newObj := oldObjs[i]
// willUpdateObjs[i].ApplyTo(&newObj)
// infoChangedObjs = append(infoChangedObjs, newObj)
// }
//}

newObjs := make([]cdssdk.Object, len(avaiUpdatings))
for i := range newObjs {
newObj := oldObjs[i]
avaiUpdatings[i].ApplyTo(&newObj)
}

err = svc.db.Object().BatchCreateOrUpdate(tx, newObjs)
if err != nil {
return fmt.Errorf("batch create or update: %w", err)
}

return nil
})

if err != nil {
logger.Warnf("batch updating objects: %s", err.Error())
return nil, mq.Failed(errorcode.OperationFailed, "batch update objects failed")
}

return mq.ReplyOK(coormq.RespUpdateObjectInfos())
}

func pickByObjectIDs(objs []cdssdk.UpdatingObject, objIDs []cdssdk.ObjectID) (pickedObjs []cdssdk.UpdatingObject, notFoundObjs []cdssdk.ObjectID) {
objIdx := 0
IDIdx := 0

for IDIdx < len(objIDs) {
if objs[objIdx].ObjectID == objIDs[IDIdx] {
pickedObjs = append(pickedObjs, objs[objIdx])
IDIdx++
objIdx++
} else if objs[objIdx].ObjectID < objIDs[IDIdx] {
objIdx++
} else {
notFoundObjs = append(notFoundObjs, objIDs[IDIdx])
IDIdx++
}
}

return
}

func (svc *Service) ensurePackageChangedObjects(tx *sqlx.Tx, objs []cdssdk.Object) ([]cdssdk.Object, error) {
type PackageObjects struct {
PackageID cdssdk.PackageID
ObjectByPath map[string]*cdssdk.Object
}

packages := make(map[cdssdk.PackageID]*PackageObjects)
for _, obj := range objs {
pkg, ok := packages[obj.PackageID]
if !ok {
pkg = &PackageObjects{
PackageID: obj.PackageID,
ObjectByPath: make(map[string]*cdssdk.Object),
}
packages[obj.PackageID] = pkg
}

if pkg.ObjectByPath[obj.Path] == nil {
o := obj
pkg.ObjectByPath[obj.Path] = &o
} else {
// TODO 有冲突
}
}

var willUpdateObjs []cdssdk.Object
for _, pkg := range packages {
existsObjs, err := svc.db.Object().BatchByPackagePath(tx, pkg.PackageID, lo.Keys(pkg.ObjectByPath))
if err != nil {
return nil, fmt.Errorf("batch getting objects by package path: %w", err)
}

for _, obj := range existsObjs {
pkg.ObjectByPath[obj.Path] = nil
}

for _, obj := range pkg.ObjectByPath {
if obj == nil {
continue
}
willUpdateObjs = append(willUpdateObjs, *obj)
}

}

return willUpdateObjs, nil
}

func (svc *Service) DeleteObjects(msg *coormq.DeleteObjects) (*coormq.DeleteObjectsResp, *mq.CodeMessage) {
err := svc.db.DoTx(sql.LevelSerializable, func(tx *sqlx.Tx) error {
return svc.db.Object().BatchDelete(tx, msg.ObjectIDs)
})
if err != nil {
logger.Warnf("batch deleting objects: %s", err.Error())
return nil, mq.Failed(errorcode.OperationFailed, "batch delete objects failed")
}

return mq.ReplyOK(coormq.RespDeleteObjects())
} }

+ 22
- 22
scanner/internal/event/check_package_redundancy.go 查看文件

@@ -109,7 +109,7 @@ func (t *CheckPackageRedundancy) Execute(execCtx ExecuteContext) {
} }
} }


var changedObjects []coormq.ChangeObjectRedundancyEntry
var changedObjects []coormq.UpdatingObjectRedundancy


defRep := cdssdk.DefaultRepRedundancy defRep := cdssdk.DefaultRepRedundancy
defEC := cdssdk.DefaultECRedundancy defEC := cdssdk.DefaultECRedundancy
@@ -136,7 +136,7 @@ func (t *CheckPackageRedundancy) Execute(execCtx ExecuteContext) {
defer mutex.Unlock() defer mutex.Unlock()


for _, obj := range getObjs.Objects { for _, obj := range getObjs.Objects {
var entry *coormq.ChangeObjectRedundancyEntry
var updating *coormq.UpdatingObjectRedundancy
var err error var err error


shouldUseEC := obj.Object.Size > config.Cfg().ECFileSizeThreshold shouldUseEC := obj.Object.Size > config.Cfg().ECFileSizeThreshold
@@ -145,32 +145,32 @@ func (t *CheckPackageRedundancy) Execute(execCtx ExecuteContext) {
case *cdssdk.NoneRedundancy: case *cdssdk.NoneRedundancy:
if shouldUseEC { if shouldUseEC {
log.WithField("ObjectID", obj.Object.ObjectID).Debugf("redundancy: none -> ec") log.WithField("ObjectID", obj.Object.ObjectID).Debugf("redundancy: none -> ec")
entry, err = t.noneToEC(obj, &defEC, newECNodes)
updating, err = t.noneToEC(obj, &defEC, newECNodes)
} else { } else {
log.WithField("ObjectID", obj.Object.ObjectID).Debugf("redundancy: none -> rep") log.WithField("ObjectID", obj.Object.ObjectID).Debugf("redundancy: none -> rep")
entry, err = t.noneToRep(obj, &defRep, newRepNodes)
updating, err = t.noneToRep(obj, &defRep, newRepNodes)
} }


case *cdssdk.RepRedundancy: case *cdssdk.RepRedundancy:
if shouldUseEC { if shouldUseEC {
log.WithField("ObjectID", obj.Object.ObjectID).Debugf("redundancy: rep -> ec") log.WithField("ObjectID", obj.Object.ObjectID).Debugf("redundancy: rep -> ec")
entry, err = t.repToEC(obj, &defEC, newECNodes)
updating, err = t.repToEC(obj, &defEC, newECNodes)
} else { } else {
entry, err = t.repToRep(obj, &defRep, rechoosedRepNodes)
updating, err = t.repToRep(obj, &defRep, rechoosedRepNodes)
} }


case *cdssdk.ECRedundancy: case *cdssdk.ECRedundancy:
if shouldUseEC { if shouldUseEC {
uploadNodes := t.rechooseNodesForEC(obj, red, allNodes) uploadNodes := t.rechooseNodesForEC(obj, red, allNodes)
entry, err = t.ecToEC(obj, red, &defEC, uploadNodes)
updating, err = t.ecToEC(obj, red, &defEC, uploadNodes)
} else { } else {
log.WithField("ObjectID", obj.Object.ObjectID).Debugf("redundancy: ec -> rep") log.WithField("ObjectID", obj.Object.ObjectID).Debugf("redundancy: ec -> rep")
entry, err = t.ecToRep(obj, red, &defRep, newRepNodes)
updating, err = t.ecToRep(obj, red, &defRep, newRepNodes)
} }
} }


if entry != nil {
changedObjects = append(changedObjects, *entry)
if updating != nil {
changedObjects = append(changedObjects, *updating)
} }


if err != nil { if err != nil {
@@ -182,7 +182,7 @@ func (t *CheckPackageRedundancy) Execute(execCtx ExecuteContext) {
return return
} }


_, err = coorCli.ChangeObjectRedundancy(coormq.ReqChangeObjectRedundancy(changedObjects))
_, err = coorCli.UpdateObjectRedundancy(coormq.ReqUpdateObjectRedundancy(changedObjects))
if err != nil { if err != nil {
log.Warnf("requesting to change object redundancy: %s", err.Error()) log.Warnf("requesting to change object redundancy: %s", err.Error())
return return
@@ -367,7 +367,7 @@ func (t *CheckPackageRedundancy) chooseSoManyNodes(count int, nodes []*NodeLoadI
return chosen return chosen
} }


func (t *CheckPackageRedundancy) noneToRep(obj stgmod.ObjectDetail, red *cdssdk.RepRedundancy, uploadNodes []*NodeLoadInfo) (*coormq.ChangeObjectRedundancyEntry, error) {
func (t *CheckPackageRedundancy) noneToRep(obj stgmod.ObjectDetail, red *cdssdk.RepRedundancy, uploadNodes []*NodeLoadInfo) (*coormq.UpdatingObjectRedundancy, error) {
if len(obj.Blocks) == 0 { if len(obj.Blocks) == 0 {
return nil, fmt.Errorf("object is not cached on any nodes, cannot change its redundancy to rep") return nil, fmt.Errorf("object is not cached on any nodes, cannot change its redundancy to rep")
} }
@@ -389,14 +389,14 @@ func (t *CheckPackageRedundancy) noneToRep(obj stgmod.ObjectDetail, red *cdssdk.
}) })
} }


return &coormq.ChangeObjectRedundancyEntry{
return &coormq.UpdatingObjectRedundancy{
ObjectID: obj.Object.ObjectID, ObjectID: obj.Object.ObjectID,
Redundancy: red, Redundancy: red,
Blocks: blocks, Blocks: blocks,
}, nil }, nil
} }


func (t *CheckPackageRedundancy) noneToEC(obj stgmod.ObjectDetail, red *cdssdk.ECRedundancy, uploadNodes []*NodeLoadInfo) (*coormq.ChangeObjectRedundancyEntry, error) {
func (t *CheckPackageRedundancy) noneToEC(obj stgmod.ObjectDetail, red *cdssdk.ECRedundancy, uploadNodes []*NodeLoadInfo) (*coormq.UpdatingObjectRedundancy, error) {
coorCli, err := stgglb.CoordinatorMQPool.Acquire() coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil { if err != nil {
return nil, fmt.Errorf("new coordinator client: %w", err) return nil, fmt.Errorf("new coordinator client: %w", err)
@@ -443,14 +443,14 @@ func (t *CheckPackageRedundancy) noneToEC(obj stgmod.ObjectDetail, red *cdssdk.E
}) })
} }


return &coormq.ChangeObjectRedundancyEntry{
return &coormq.UpdatingObjectRedundancy{
ObjectID: obj.Object.ObjectID, ObjectID: obj.Object.ObjectID,
Redundancy: red, Redundancy: red,
Blocks: blocks, Blocks: blocks,
}, nil }, nil
} }


func (t *CheckPackageRedundancy) repToRep(obj stgmod.ObjectDetail, red *cdssdk.RepRedundancy, uploadNodes []*NodeLoadInfo) (*coormq.ChangeObjectRedundancyEntry, error) {
func (t *CheckPackageRedundancy) repToRep(obj stgmod.ObjectDetail, red *cdssdk.RepRedundancy, uploadNodes []*NodeLoadInfo) (*coormq.UpdatingObjectRedundancy, error) {
if len(obj.Blocks) == 0 { if len(obj.Blocks) == 0 {
return nil, fmt.Errorf("object is not cached on any nodes, cannot change its redundancy to rep") return nil, fmt.Errorf("object is not cached on any nodes, cannot change its redundancy to rep")
} }
@@ -479,18 +479,18 @@ func (t *CheckPackageRedundancy) repToRep(obj stgmod.ObjectDetail, red *cdssdk.R
}) })
} }


return &coormq.ChangeObjectRedundancyEntry{
return &coormq.UpdatingObjectRedundancy{
ObjectID: obj.Object.ObjectID, ObjectID: obj.Object.ObjectID,
Redundancy: red, Redundancy: red,
Blocks: blocks, Blocks: blocks,
}, nil }, nil
} }


func (t *CheckPackageRedundancy) repToEC(obj stgmod.ObjectDetail, red *cdssdk.ECRedundancy, uploadNodes []*NodeLoadInfo) (*coormq.ChangeObjectRedundancyEntry, error) {
func (t *CheckPackageRedundancy) repToEC(obj stgmod.ObjectDetail, red *cdssdk.ECRedundancy, uploadNodes []*NodeLoadInfo) (*coormq.UpdatingObjectRedundancy, error) {
return t.noneToEC(obj, red, uploadNodes) return t.noneToEC(obj, red, uploadNodes)
} }


func (t *CheckPackageRedundancy) ecToRep(obj stgmod.ObjectDetail, srcRed *cdssdk.ECRedundancy, tarRed *cdssdk.RepRedundancy, uploadNodes []*NodeLoadInfo) (*coormq.ChangeObjectRedundancyEntry, error) {
func (t *CheckPackageRedundancy) ecToRep(obj stgmod.ObjectDetail, srcRed *cdssdk.ECRedundancy, tarRed *cdssdk.RepRedundancy, uploadNodes []*NodeLoadInfo) (*coormq.UpdatingObjectRedundancy, error) {
coorCli, err := stgglb.CoordinatorMQPool.Acquire() coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil { if err != nil {
return nil, fmt.Errorf("new coordinator client: %w", err) return nil, fmt.Errorf("new coordinator client: %w", err)
@@ -556,14 +556,14 @@ func (t *CheckPackageRedundancy) ecToRep(obj stgmod.ObjectDetail, srcRed *cdssdk
}) })
} }


return &coormq.ChangeObjectRedundancyEntry{
return &coormq.UpdatingObjectRedundancy{
ObjectID: obj.Object.ObjectID, ObjectID: obj.Object.ObjectID,
Redundancy: tarRed, Redundancy: tarRed,
Blocks: blocks, Blocks: blocks,
}, nil }, nil
} }


func (t *CheckPackageRedundancy) ecToEC(obj stgmod.ObjectDetail, srcRed *cdssdk.ECRedundancy, tarRed *cdssdk.ECRedundancy, uploadNodes []*NodeLoadInfo) (*coormq.ChangeObjectRedundancyEntry, error) {
func (t *CheckPackageRedundancy) ecToEC(obj stgmod.ObjectDetail, srcRed *cdssdk.ECRedundancy, tarRed *cdssdk.ECRedundancy, uploadNodes []*NodeLoadInfo) (*coormq.UpdatingObjectRedundancy, error) {
coorCli, err := stgglb.CoordinatorMQPool.Acquire() coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil { if err != nil {
return nil, fmt.Errorf("new coordinator client: %w", err) return nil, fmt.Errorf("new coordinator client: %w", err)
@@ -654,7 +654,7 @@ func (t *CheckPackageRedundancy) ecToEC(obj stgmod.ObjectDetail, srcRed *cdssdk.
newBlocks[idx].FileHash = v.(string) newBlocks[idx].FileHash = v.(string)
} }


return &coormq.ChangeObjectRedundancyEntry{
return &coormq.UpdatingObjectRedundancy{
ObjectID: obj.Object.ObjectID, ObjectID: obj.Object.ObjectID,
Redundancy: tarRed, Redundancy: tarRed,
Blocks: newBlocks, Blocks: newBlocks,


+ 13
- 13
scanner/internal/event/clean_pinned.go 查看文件

@@ -109,7 +109,7 @@ func (t *CleanPinned) Execute(execCtx ExecuteContext) {
pinPlans := make(map[cdssdk.NodeID]*[]string) pinPlans := make(map[cdssdk.NodeID]*[]string)


// 对于rep对象,统计出所有对象块分布最多的两个节点,用这两个节点代表所有rep对象块的分布,去进行退火算法 // 对于rep对象,统计出所有对象块分布最多的两个节点,用这两个节点代表所有rep对象块的分布,去进行退火算法
var repObjectsUpdateEntries []coormq.ChangeObjectRedundancyEntry
var repObjectsUpdating []coormq.UpdatingObjectRedundancy
repMostNodeIDs := t.summaryRepObjectBlockNodes(repObjects) repMostNodeIDs := t.summaryRepObjectBlockNodes(repObjects)
solu := t.startAnnealing(allNodeInfos, readerNodeIDs, annealingObject{ solu := t.startAnnealing(allNodeInfos, readerNodeIDs, annealingObject{
totalBlockCount: 1, totalBlockCount: 1,
@@ -118,11 +118,11 @@ func (t *CleanPinned) Execute(execCtx ExecuteContext) {
blocks: nil, blocks: nil,
}) })
for _, obj := range repObjects { for _, obj := range repObjects {
repObjectsUpdateEntries = append(repObjectsUpdateEntries, t.makePlansForRepObject(solu, obj, pinPlans))
repObjectsUpdating = append(repObjectsUpdating, t.makePlansForRepObject(solu, obj, pinPlans))
} }


// 对于ec对象,则每个对象单独进行退火算法 // 对于ec对象,则每个对象单独进行退火算法
var ecObjectsUpdateEntries []coormq.ChangeObjectRedundancyEntry
var ecObjectsUpdating []coormq.UpdatingObjectRedundancy
for _, obj := range ecObjects { for _, obj := range ecObjects {
ecRed := obj.Object.Redundancy.(*cdssdk.ECRedundancy) ecRed := obj.Object.Redundancy.(*cdssdk.ECRedundancy)
solu := t.startAnnealing(allNodeInfos, readerNodeIDs, annealingObject{ solu := t.startAnnealing(allNodeInfos, readerNodeIDs, annealingObject{
@@ -131,7 +131,7 @@ func (t *CleanPinned) Execute(execCtx ExecuteContext) {
pinnedAt: obj.PinnedAt, pinnedAt: obj.PinnedAt,
blocks: obj.Blocks, blocks: obj.Blocks,
}) })
ecObjectsUpdateEntries = append(ecObjectsUpdateEntries, t.makePlansForECObject(allNodeInfos, solu, obj, &planBld))
ecObjectsUpdating = append(ecObjectsUpdating, t.makePlansForECObject(allNodeInfos, solu, obj, &planBld))
} }


ioSwRets, err := t.executePlans(execCtx, pinPlans, &planBld) ioSwRets, err := t.executePlans(execCtx, pinPlans, &planBld)
@@ -141,13 +141,13 @@ func (t *CleanPinned) Execute(execCtx ExecuteContext) {
} }


// 根据按照方案进行调整的结果,填充更新元数据的命令 // 根据按照方案进行调整的结果,填充更新元数据的命令
for i := range ecObjectsUpdateEntries {
t.populateECObjectEntry(&ecObjectsUpdateEntries[i], ecObjects[i], ioSwRets)
for i := range ecObjectsUpdating {
t.populateECObjectEntry(&ecObjectsUpdating[i], ecObjects[i], ioSwRets)
} }


finalEntries := append(repObjectsUpdateEntries, ecObjectsUpdateEntries...)
finalEntries := append(repObjectsUpdating, ecObjectsUpdating...)
if len(finalEntries) > 0 { if len(finalEntries) > 0 {
_, err = coorCli.ChangeObjectRedundancy(coormq.ReqChangeObjectRedundancy(finalEntries))
_, err = coorCli.UpdateObjectRedundancy(coormq.ReqUpdateObjectRedundancy(finalEntries))
if err != nil { if err != nil {
log.Warnf("changing object redundancy: %s", err.Error()) log.Warnf("changing object redundancy: %s", err.Error())
return return
@@ -715,8 +715,8 @@ func (t *CleanPinned) alwaysAccept(curTemp float64, dScore float64, coolingRate
return v > rand.Float64() return v > rand.Float64()
} }


func (t *CleanPinned) makePlansForRepObject(solu annealingSolution, obj stgmod.ObjectDetail, pinPlans map[cdssdk.NodeID]*[]string) coormq.ChangeObjectRedundancyEntry {
entry := coormq.ChangeObjectRedundancyEntry{
func (t *CleanPinned) makePlansForRepObject(solu annealingSolution, obj stgmod.ObjectDetail, pinPlans map[cdssdk.NodeID]*[]string) coormq.UpdatingObjectRedundancy {
entry := coormq.UpdatingObjectRedundancy{
ObjectID: obj.Object.ObjectID, ObjectID: obj.Object.ObjectID,
Redundancy: obj.Object.Redundancy, Redundancy: obj.Object.Redundancy,
} }
@@ -748,8 +748,8 @@ func (t *CleanPinned) makePlansForRepObject(solu annealingSolution, obj stgmod.O
return entry return entry
} }


func (t *CleanPinned) makePlansForECObject(allNodeInfos map[cdssdk.NodeID]*cdssdk.Node, solu annealingSolution, obj stgmod.ObjectDetail, planBld *plans.PlanBuilder) coormq.ChangeObjectRedundancyEntry {
entry := coormq.ChangeObjectRedundancyEntry{
func (t *CleanPinned) makePlansForECObject(allNodeInfos map[cdssdk.NodeID]*cdssdk.Node, solu annealingSolution, obj stgmod.ObjectDetail, planBld *plans.PlanBuilder) coormq.UpdatingObjectRedundancy {
entry := coormq.UpdatingObjectRedundancy{
ObjectID: obj.Object.ObjectID, ObjectID: obj.Object.ObjectID,
Redundancy: obj.Object.Redundancy, Redundancy: obj.Object.Redundancy,
} }
@@ -871,7 +871,7 @@ func (t *CleanPinned) executePlans(execCtx ExecuteContext, pinPlans map[cdssdk.N
return ioSwRets, nil return ioSwRets, nil
} }


func (t *CleanPinned) populateECObjectEntry(entry *coormq.ChangeObjectRedundancyEntry, obj stgmod.ObjectDetail, ioRets map[string]any) {
func (t *CleanPinned) populateECObjectEntry(entry *coormq.UpdatingObjectRedundancy, obj stgmod.ObjectDetail, ioRets map[string]any) {
for i := range entry.Blocks { for i := range entry.Blocks {
if entry.Blocks[i].FileHash != "" { if entry.Blocks[i].FileHash != "" {
continue continue


正在加载...
取消
保存