diff --git a/client/internal/http/bucket.go b/client/internal/http/bucket.go index 7e749b0..ed944f2 100644 --- a/client/internal/http/bucket.go +++ b/client/internal/http/bucket.go @@ -59,3 +59,25 @@ func (s *BucketService) Delete(ctx *gin.Context) { ctx.JSON(http.StatusOK, OK(nil)) } + +func (s *BucketService) ListUserBuckets(ctx *gin.Context) { + log := logger.WithField("HTTP", "Bucket.ListUserBuckets") + + var req cdssdk.BucketListUserBucketsReq + if err := ctx.ShouldBindQuery(&req); err != nil { + log.Warnf("binding query: %s", err.Error()) + ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) + return + } + + buckets, err := s.svc.BucketSvc().GetUserBuckets(req.UserID) + if err != nil { + log.Warnf("getting user buckets: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get user buckets failed")) + return + } + + ctx.JSON(http.StatusOK, OK(cdssdk.BucketListUserBucketsResp{ + Buckets: buckets, + })) +} diff --git a/client/internal/http/object.go b/client/internal/http/object.go index d7dba18..fb2c1f1 100644 --- a/client/internal/http/object.go +++ b/client/internal/http/object.go @@ -71,22 +71,17 @@ func (s *ObjectService) Upload(ctx *gin.Context) { } } -type ObjectDownloadReq struct { - UserID *cdssdk.UserID `form:"userID" binding:"required"` - ObjectID *cdssdk.ObjectID `form:"objectID" binding:"required"` -} - func (s *ObjectService) Download(ctx *gin.Context) { log := logger.WithField("HTTP", "Object.Download") - var req ObjectDownloadReq + var req cdssdk.ObjectDownloadReq if err := ctx.ShouldBindQuery(&req); err != nil { log.Warnf("binding body: %s", err.Error()) ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) return } - file, err := s.svc.ObjectSvc().Download(*req.UserID, *req.ObjectID) + file, err := s.svc.ObjectSvc().Download(req.UserID, req.ObjectID) if err != nil { log.Warnf("downloading object: %s", err.Error()) ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "download object failed")) @@ -124,28 +119,62 @@ func (s *ObjectService) Download(ctx *gin.Context) { }) } -type GetPackageObjectsReq struct { - UserID *cdssdk.UserID `form:"userID" binding:"required"` - PackageID *cdssdk.PackageID `form:"packageID" binding:"required"` +func (s *ObjectService) UpdateInfo(ctx *gin.Context) { + log := logger.WithField("HTTP", "Object.UpdateInfo") + + var req cdssdk.ObjectUpdateInfoReq + if err := ctx.ShouldBindJSON(&req); err != nil { + log.Warnf("binding body: %s", err.Error()) + ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) + return + } + + err := s.svc.ObjectSvc().UpdateInfo(req.UserID, req.Updatings) + if err != nil { + log.Warnf("updating objects: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "update objects failed")) + return + } + + ctx.JSON(http.StatusOK, OK(nil)) +} + +func (s *ObjectService) Delete(ctx *gin.Context) { + log := logger.WithField("HTTP", "Object.Delete") + + var req cdssdk.ObjectDeleteReq + if err := ctx.ShouldBindJSON(&req); err != nil { + log.Warnf("binding body: %s", err.Error()) + ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) + return + } + + err := s.svc.ObjectSvc().Delete(req.UserID, req.ObjectIDs) + if err != nil { + log.Warnf("deleting objects: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "delete objects failed")) + return + } + + ctx.JSON(http.StatusOK, OK(nil)) } -type GetPackageObjectsResp = cdssdk.ObjectGetPackageObjectsResp func (s *ObjectService) GetPackageObjects(ctx *gin.Context) { log := logger.WithField("HTTP", "Object.GetPackageObjects") - var req GetPackageObjectsReq + var req cdssdk.ObjectGetPackageObjectsReq if err := ctx.ShouldBindQuery(&req); err != nil { log.Warnf("binding body: %s", err.Error()) ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) return } - objs, err := s.svc.ObjectSvc().GetPackageObjects(*req.UserID, *req.PackageID) + objs, err := s.svc.ObjectSvc().GetPackageObjects(req.UserID, req.PackageID) if err != nil { log.Warnf("getting package objects: %s", err.Error()) ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get package object failed")) return } - ctx.JSON(http.StatusOK, OK(GetPackageObjectsResp{Objects: objs})) + ctx.JSON(http.StatusOK, OK(cdssdk.ObjectGetPackageObjectsResp{Objects: objs})) } diff --git a/client/internal/http/package.go b/client/internal/http/package.go index cdaf3e0..c18559d 100644 --- a/client/internal/http/package.go +++ b/client/internal/http/package.go @@ -10,7 +10,6 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/logger" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" stgiter "gitlink.org.cn/cloudream/storage/common/pkgs/iterator" ) @@ -24,32 +23,24 @@ func (s *Server) Package() *PackageService { } } -type PackageGetReq struct { - UserID *cdssdk.UserID `form:"userID" binding:"required"` - PackageID *cdssdk.PackageID `form:"packageID" binding:"required"` -} -type PackageGetResp struct { - model.Package -} - func (s *PackageService) Get(ctx *gin.Context) { log := logger.WithField("HTTP", "Package.Get") - var req PackageGetReq + var req cdssdk.PackageGetReq if err := ctx.ShouldBindQuery(&req); err != nil { log.Warnf("binding body: %s", err.Error()) ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) return } - pkg, err := s.svc.PackageSvc().Get(*req.UserID, *req.PackageID) + pkg, err := s.svc.PackageSvc().Get(req.UserID, req.PackageID) if err != nil { log.Warnf("getting package: %s", err.Error()) ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get package failed")) return } - ctx.JSON(http.StatusOK, OK(PackageGetResp{Package: *pkg})) + ctx.JSON(http.StatusOK, OK(cdssdk.PackageGetResp{Package: *pkg})) } func (s *PackageService) Create(ctx *gin.Context) { @@ -73,22 +64,17 @@ func (s *PackageService) Create(ctx *gin.Context) { })) } -type PackageDeleteReq struct { - UserID *cdssdk.UserID `json:"userID" binding:"required"` - PackageID *cdssdk.PackageID `json:"packageID" binding:"required"` -} - func (s *PackageService) Delete(ctx *gin.Context) { log := logger.WithField("HTTP", "Package.Delete") - var req PackageDeleteReq + var req cdssdk.PackageDeleteReq if err := ctx.ShouldBindJSON(&req); err != nil { log.Warnf("binding body: %s", err.Error()) ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) return } - err := s.svc.PackageSvc().DeletePackage(*req.UserID, *req.PackageID) + err := s.svc.PackageSvc().DeletePackage(req.UserID, req.PackageID) if err != nil { log.Warnf("deleting package: %s", err.Error()) ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "delete package failed")) @@ -98,61 +84,66 @@ func (s *PackageService) Delete(ctx *gin.Context) { ctx.JSON(http.StatusOK, OK(nil)) } -type GetCachedNodesReq struct { - UserID *cdssdk.UserID `json:"userID" binding:"required"` - PackageID *cdssdk.PackageID `json:"packageID" binding:"required"` -} -type GetCachedNodesResp struct { - cdssdk.PackageCachingInfo +func (s *PackageService) ListBucketPackages(ctx *gin.Context) { + log := logger.WithField("HTTP", "Package.ListBucketPackages") + + var req cdssdk.PackageListBucketPackagesReq + if err := ctx.ShouldBindQuery(&req); err != nil { + log.Warnf("binding query: %s", err.Error()) + ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) + return + } + + pkgs, err := s.svc.PackageSvc().GetBucketPackages(req.UserID, req.BucketID) + if err != nil { + log.Warnf("getting bucket packages: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get bucket packages failed")) + return + } + + ctx.JSON(http.StatusOK, OK(cdssdk.PackageListBucketPackagesResp{ + Packages: pkgs, + })) } func (s *PackageService) GetCachedNodes(ctx *gin.Context) { log := logger.WithField("HTTP", "Package.GetCachedNodes") - var req GetCachedNodesReq - if err := ctx.ShouldBindJSON(&req); err != nil { - log.Warnf("binding body: %s", err.Error()) + var req cdssdk.PackageGetCachedNodesReq + if err := ctx.ShouldBindQuery(&req); err != nil { + log.Warnf("binding query: %s", err.Error()) ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) return } - resp, err := s.svc.PackageSvc().GetCachedNodes(*req.UserID, *req.PackageID) + resp, err := s.svc.PackageSvc().GetCachedNodes(req.UserID, req.PackageID) if err != nil { log.Warnf("get package cached nodes failed: %s", err.Error()) ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get package cached nodes failed")) return } - ctx.JSON(http.StatusOK, OK(GetCachedNodesResp{resp})) -} - -type GetLoadedNodesReq struct { - UserID *cdssdk.UserID `json:"userID" binding:"required"` - PackageID *cdssdk.PackageID `json:"packageID" binding:"required"` -} - -type GetLoadedNodesResp struct { - NodeIDs []cdssdk.NodeID `json:"nodeIDs"` + ctx.JSON(http.StatusOK, OK(cdssdk.PackageGetCachedNodesResp{PackageCachingInfo: resp})) } func (s *PackageService) GetLoadedNodes(ctx *gin.Context) { log := logger.WithField("HTTP", "Package.GetLoadedNodes") - var req GetLoadedNodesReq - if err := ctx.ShouldBindJSON(&req); err != nil { - log.Warnf("binding body: %s", err.Error()) + var req cdssdk.PackageGetLoadedNodesReq + if err := ctx.ShouldBindQuery(&req); err != nil { + log.Warnf("binding query: %s", err.Error()) ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) return } - nodeIDs, err := s.svc.PackageSvc().GetLoadedNodes(*req.UserID, *req.PackageID) + nodeIDs, err := s.svc.PackageSvc().GetLoadedNodes(req.UserID, req.PackageID) if err != nil { log.Warnf("get package loaded nodes failed: %s", err.Error()) ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get package loaded nodes failed")) return } - ctx.JSON(http.StatusOK, OK(GetLoadedNodesResp{ + ctx.JSON(http.StatusOK, OK(cdssdk.PackageGetLoadedNodesResp{ NodeIDs: nodeIDs, })) } diff --git a/client/internal/http/server.go b/client/internal/http/server.go index 462f76a..ce4e276 100644 --- a/client/internal/http/server.go +++ b/client/internal/http/server.go @@ -42,12 +42,15 @@ func (s *Server) initRouters() { s.engine.GET(cdssdk.ObjectDownloadPath, s.Object().Download) s.engine.POST(cdssdk.ObjectUploadPath, s.Object().Upload) s.engine.GET(cdssdk.ObjectGetPackageObjectsPath, s.Object().GetPackageObjects) + s.engine.POST(cdssdk.ObjectUpdateInfoPath, s.Object().UpdateInfo) + s.engine.POST(cdssdk.ObjectDeletePath, s.Object().Delete) s.engine.GET(cdssdk.PackageGetPath, s.Package().Get) s.engine.POST(cdssdk.PackageCreatePath, s.Package().Create) - s.engine.POST("/package/delete", s.Package().Delete) - s.engine.GET("/package/getCachedNodes", s.Package().GetCachedNodes) - s.engine.GET("/package/getLoadedNodes", s.Package().GetLoadedNodes) + s.engine.POST(cdssdk.PackageDeletePath, s.Package().Delete) + s.engine.GET(cdssdk.PackageListBucketPackagesPath, s.Package().ListBucketPackages) + s.engine.GET(cdssdk.PackageGetCachedNodesPath, s.Package().GetCachedNodes) + s.engine.GET(cdssdk.PackageGetLoadedNodesPath, s.Package().GetLoadedNodes) s.engine.POST("/storage/loadPackage", s.Storage().LoadPackage) s.engine.POST("/storage/createPackage", s.Storage().CreatePackage) @@ -57,4 +60,5 @@ func (s *Server) initRouters() { s.engine.POST(cdssdk.BucketCreatePath, s.Bucket().Create) s.engine.POST(cdssdk.BucketDeletePath, s.Bucket().Delete) + s.engine.GET(cdssdk.BucketListUserBucketsPath, s.Bucket().ListUserBuckets) } diff --git a/client/internal/services/bucket.go b/client/internal/services/bucket.go index 4e297ce..d402b54 100644 --- a/client/internal/services/bucket.go +++ b/client/internal/services/bucket.go @@ -74,8 +74,6 @@ func (svc *BucketService) DeleteBucket(userID cdssdk.UserID, bucketID cdssdk.Buc } defer stgglb.CoordinatorMQPool.Release(coorCli) - // TODO 检查用户是否有删除这个Bucket的权限。检查的时候可以只上UserBucket的Read锁 - _, err = coorCli.DeleteBucket(coormq.NewDeleteBucket(userID, bucketID)) if err != nil { return fmt.Errorf("request to coordinator failed, err: %w", err) diff --git a/client/internal/services/object.go b/client/internal/services/object.go index 0dc7374..f4f24ab 100644 --- a/client/internal/services/object.go +++ b/client/internal/services/object.go @@ -35,10 +35,40 @@ func (svc *ObjectService) WaitUploading(taskID string, waitTimeout time.Duration return false, nil, nil } +func (svc *ObjectService) UpdateInfo(userID cdssdk.UserID, updatings []cdssdk.UpdatingObject) error { + coorCli, err := stgglb.CoordinatorMQPool.Acquire() + if err != nil { + return fmt.Errorf("new coordinator client: %w", err) + } + defer stgglb.CoordinatorMQPool.Release(coorCli) + + _, err = coorCli.UpdateObjectInfos(coormq.ReqUpdateObjectInfos(userID, updatings)) + if err != nil { + return fmt.Errorf("requsting to coodinator: %w", err) + } + + return nil +} + func (svc *ObjectService) Download(userID cdssdk.UserID, objectID cdssdk.ObjectID) (io.ReadCloser, error) { panic("not implement yet!") } +func (svc *ObjectService) Delete(userID cdssdk.UserID, objectIDs []cdssdk.ObjectID) error { + coorCli, err := stgglb.CoordinatorMQPool.Acquire() + if err != nil { + return fmt.Errorf("new coordinator client: %w", err) + } + defer stgglb.CoordinatorMQPool.Release(coorCli) + + _, err = coorCli.DeleteObjects(coormq.ReqDeleteObjects(userID, objectIDs)) + if err != nil { + return fmt.Errorf("requsting to coodinator: %w", err) + } + + return nil +} + func (svc *ObjectService) GetPackageObjects(userID cdssdk.UserID, packageID cdssdk.PackageID) ([]model.Object, error) { coorCli, err := stgglb.CoordinatorMQPool.Acquire() if err != nil { diff --git a/client/internal/services/package.go b/client/internal/services/package.go index 9667356..405d53f 100644 --- a/client/internal/services/package.go +++ b/client/internal/services/package.go @@ -34,6 +34,21 @@ func (svc *PackageService) Get(userID cdssdk.UserID, packageID cdssdk.PackageID) return &getResp.Package, nil } +func (svc *PackageService) GetBucketPackages(userID cdssdk.UserID, bucketID cdssdk.BucketID) ([]cdssdk.Package, error) { + coorCli, err := stgglb.CoordinatorMQPool.Acquire() + if err != nil { + return nil, fmt.Errorf("new coordinator client: %w", err) + } + defer stgglb.CoordinatorMQPool.Release(coorCli) + + getResp, err := coorCli.GetBucketPackages(coormq.NewGetBucketPackages(userID, bucketID)) + if err != nil { + return nil, fmt.Errorf("requsting to coodinator: %w", err) + } + + return getResp.Packages, nil +} + func (svc *PackageService) Create(userID cdssdk.UserID, bucketID cdssdk.BucketID, name string) (cdssdk.PackageID, error) { coorCli, err := stgglb.CoordinatorMQPool.Acquire() if err != nil { diff --git a/common/pkgs/db/object.go b/common/pkgs/db/object.go index 3655c98..ec9a669 100644 --- a/common/pkgs/db/object.go +++ b/common/pkgs/db/object.go @@ -26,25 +26,46 @@ func (db *ObjectDB) GetByID(ctx SQLContext, objectID cdssdk.ObjectID) (model.Obj return ret.ToObject(), err } -func (db *ObjectDB) BatchGetPackageObjectIDs(ctx SQLContext, pkgID cdssdk.PackageID, pathes []string) ([]cdssdk.ObjectID, error) { +func (db *ObjectDB) BatchGet(ctx SQLContext, objectIDs []cdssdk.ObjectID) ([]model.Object, error) { + if len(objectIDs) == 0 { + return nil, nil + } + + // TODO In语句 + stmt, args, err := sqlx.In("select * from Object where ObjectID in (?) order by ObjectID asc", objectIDs) + if err != nil { + return nil, err + } + stmt = ctx.Rebind(stmt) + + objs := make([]model.TempObject, 0, len(objectIDs)) + err = sqlx.Select(ctx, &objs, stmt, args...) + if err != nil { + return nil, err + } + + return lo.Map(objs, func(o model.TempObject, idx int) cdssdk.Object { return o.ToObject() }), nil +} + +func (db *ObjectDB) BatchByPackagePath(ctx SQLContext, pkgID cdssdk.PackageID, pathes []string) ([]cdssdk.Object, error) { if len(pathes) == 0 { return nil, nil } // TODO In语句 - stmt, args, err := sqlx.In("select ObjectID from Object force index(PackagePath) where PackageID=? and Path in (?)", pkgID, pathes) + stmt, args, err := sqlx.In("select * from Object force index(PackagePath) where PackageID=? and Path in (?)", pkgID, pathes) if err != nil { return nil, err } stmt = ctx.Rebind(stmt) - objIDs := make([]cdssdk.ObjectID, 0, len(pathes)) - err = sqlx.Select(ctx, &objIDs, stmt, args...) + objs := make([]model.TempObject, 0, len(pathes)) + err = sqlx.Select(ctx, &objs, stmt, args...) if err != nil { return nil, err } - return objIDs, nil + return lo.Map(objs, func(o model.TempObject, idx int) cdssdk.Object { return o.ToObject() }), nil } func (db *ObjectDB) Create(ctx SQLContext, obj cdssdk.Object) (cdssdk.ObjectID, error) { @@ -63,8 +84,8 @@ func (db *ObjectDB) Create(ctx SQLContext, obj cdssdk.Object) (cdssdk.ObjectID, return cdssdk.ObjectID(objectID), nil } -// 可以用于批量创建或者更新记录 -// 用于创建时,需要额外检查PackageID+Path的唯一性 +// 可以用于批量创建或者更新记录。 +// 用于创建时,需要额外检查PackageID+Path的唯一性。 // 用于更新时,需要额外检查现存的PackageID+Path对应的ObjectID是否与待更新的ObjectID相同。不会更新CreateTime。 func (db *ObjectDB) BatchCreateOrUpdate(ctx SQLContext, objs []cdssdk.Object) error { if len(objs) == 0 { @@ -163,17 +184,22 @@ func (db *ObjectDB) BatchAdd(ctx SQLContext, packageID cdssdk.PackageID, adds [] for _, add := range adds { pathes = append(pathes, add.Path) } - objIDs, err := db.BatchGetPackageObjectIDs(ctx, packageID, pathes) + // 这里可以不用检查查询结果是否与pathes的数量相同 + addedObjs, err := db.BatchByPackagePath(ctx, packageID, pathes) if err != nil { return nil, fmt.Errorf("batch get object ids: %w", err) } + addedObjIDs := make([]cdssdk.ObjectID, len(addedObjs)) + for i := range addedObjs { + addedObjIDs[i] = addedObjs[i].ObjectID + } - err = db.ObjectBlock().BatchDeleteByObjectID(ctx, objIDs) + err = db.ObjectBlock().BatchDeleteByObjectID(ctx, addedObjIDs) if err != nil { return nil, fmt.Errorf("batch delete object blocks: %w", err) } - err = db.PinnedObject().BatchDeleteByObjectID(ctx, objIDs) + err = db.PinnedObject().BatchDeleteByObjectID(ctx, addedObjIDs) if err != nil { return nil, fmt.Errorf("batch delete pinned objects: %w", err) } @@ -181,7 +207,7 @@ func (db *ObjectDB) BatchAdd(ctx SQLContext, packageID cdssdk.PackageID, adds [] objBlocks := make([]stgmod.ObjectBlock, 0, len(adds)) for i, add := range adds { objBlocks = append(objBlocks, stgmod.ObjectBlock{ - ObjectID: objIDs[i], + ObjectID: addedObjIDs[i], Index: 0, NodeID: add.NodeID, FileHash: add.FileHash, @@ -206,10 +232,10 @@ func (db *ObjectDB) BatchAdd(ctx SQLContext, packageID cdssdk.PackageID, adds [] return nil, fmt.Errorf("batch create caches: %w", err) } - return objIDs, nil + return addedObjIDs, nil } -func (db *ObjectDB) BatchUpdateRedundancy(ctx SQLContext, objs []coormq.ChangeObjectRedundancyEntry) error { +func (db *ObjectDB) BatchUpdateRedundancy(ctx SQLContext, objs []coormq.UpdatingObjectRedundancy) error { if len(objs) == 0 { return nil } diff --git a/common/pkgs/mq/coordinator/object.go b/common/pkgs/mq/coordinator/object.go index 45b5b24..5ec380d 100644 --- a/common/pkgs/mq/coordinator/object.go +++ b/common/pkgs/mq/coordinator/object.go @@ -13,7 +13,11 @@ type ObjectService interface { GetPackageObjectDetails(msg *GetPackageObjectDetails) (*GetPackageObjectDetailsResp, *mq.CodeMessage) - ChangeObjectRedundancy(msg *ChangeObjectRedundancy) (*ChangeObjectRedundancyResp, *mq.CodeMessage) + UpdateObjectRedundancy(msg *UpdateObjectRedundancy) (*UpdateObjectRedundancyResp, *mq.CodeMessage) + + UpdateObjectInfos(msg *UpdateObjectInfos) (*UpdateObjectInfosResp, *mq.CodeMessage) + + DeleteObjects(msg *DeleteObjects) (*DeleteObjectsResp, *mq.CodeMessage) } // 查询Package中的所有Object,返回的Objects会按照ObjectID升序 @@ -71,30 +75,82 @@ func (client *Client) GetPackageObjectDetails(msg *GetPackageObjectDetails) (*Ge } // 更新Object的冗余方式 -var _ = Register(Service.ChangeObjectRedundancy) +var _ = Register(Service.UpdateObjectRedundancy) -type ChangeObjectRedundancy struct { +type UpdateObjectRedundancy struct { mq.MessageBodyBase - Entries []ChangeObjectRedundancyEntry `json:"entries"` + Updatings []UpdatingObjectRedundancy `json:"updatings"` } -type ChangeObjectRedundancyResp struct { +type UpdateObjectRedundancyResp struct { mq.MessageBodyBase } -type ChangeObjectRedundancyEntry struct { +type UpdatingObjectRedundancy struct { ObjectID cdssdk.ObjectID `json:"objectID" db:"ObjectID"` Redundancy cdssdk.Redundancy `json:"redundancy" db:"Redundancy"` PinnedAt []cdssdk.NodeID `json:"pinnedAt"` Blocks []stgmod.ObjectBlock `json:"blocks"` } -func ReqChangeObjectRedundancy(entries []ChangeObjectRedundancyEntry) *ChangeObjectRedundancy { - return &ChangeObjectRedundancy{ - Entries: entries, +func ReqUpdateObjectRedundancy(updatings []UpdatingObjectRedundancy) *UpdateObjectRedundancy { + return &UpdateObjectRedundancy{ + Updatings: updatings, + } +} +func RespUpdateObjectRedundancy() *UpdateObjectRedundancyResp { + return &UpdateObjectRedundancyResp{} +} +func (client *Client) UpdateObjectRedundancy(msg *UpdateObjectRedundancy) (*UpdateObjectRedundancyResp, error) { + return mq.Request(Service.UpdateObjectRedundancy, client.rabbitCli, msg) +} + +// 更新Object元数据 +var _ = Register(Service.UpdateObjectInfos) + +type UpdateObjectInfos struct { + mq.MessageBodyBase + UserID cdssdk.UserID `json:"userID"` + Updatings []cdssdk.UpdatingObject `json:"updatings"` +} + +type UpdateObjectInfosResp struct { + mq.MessageBodyBase +} + +func ReqUpdateObjectInfos(userID cdssdk.UserID, updatings []cdssdk.UpdatingObject) *UpdateObjectInfos { + return &UpdateObjectInfos{ + UserID: userID, + Updatings: updatings, + } +} +func RespUpdateObjectInfos() *UpdateObjectInfosResp { + return &UpdateObjectInfosResp{} +} +func (client *Client) UpdateObjectInfos(msg *UpdateObjectInfos) (*UpdateObjectInfosResp, error) { + return mq.Request(Service.UpdateObjectInfos, client.rabbitCli, msg) +} + +// 删除Object +var _ = Register(Service.DeleteObjects) + +type DeleteObjects struct { + mq.MessageBodyBase + UserID cdssdk.UserID `json:"userID"` + ObjectIDs []cdssdk.ObjectID `json:"objectIDs"` +} + +type DeleteObjectsResp struct { + mq.MessageBodyBase +} + +func ReqDeleteObjects(userID cdssdk.UserID, objectIDs []cdssdk.ObjectID) *DeleteObjects { + return &DeleteObjects{ + UserID: userID, + ObjectIDs: objectIDs, } } -func RespChangeObjectRedundancy() *ChangeObjectRedundancyResp { - return &ChangeObjectRedundancyResp{} +func RespDeleteObjects() *DeleteObjectsResp { + return &DeleteObjectsResp{} } -func (client *Client) ChangeObjectRedundancy(msg *ChangeObjectRedundancy) (*ChangeObjectRedundancyResp, error) { - return mq.Request(Service.ChangeObjectRedundancy, client.rabbitCli, msg) +func (client *Client) DeleteObjects(msg *DeleteObjects) (*DeleteObjectsResp, error) { + return mq.Request(Service.DeleteObjects, client.rabbitCli, msg) } diff --git a/coordinator/internal/mq/object.go b/coordinator/internal/mq/object.go index 2af1ccc..b3171eb 100644 --- a/coordinator/internal/mq/object.go +++ b/coordinator/internal/mq/object.go @@ -5,9 +5,12 @@ import ( "fmt" "github.com/jmoiron/sqlx" + "github.com/samber/lo" "gitlink.org.cn/cloudream/common/consts/errorcode" "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/mq" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "gitlink.org.cn/cloudream/common/utils/sort2" stgmod "gitlink.org.cn/cloudream/storage/common/models" coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" ) @@ -51,14 +54,162 @@ func (svc *Service) GetPackageObjectDetails(msg *coormq.GetPackageObjectDetails) return mq.ReplyOK(coormq.NewGetPackageObjectDetailsResp(details)) } -func (svc *Service) ChangeObjectRedundancy(msg *coormq.ChangeObjectRedundancy) (*coormq.ChangeObjectRedundancyResp, *mq.CodeMessage) { +func (svc *Service) UpdateObjectRedundancy(msg *coormq.UpdateObjectRedundancy) (*coormq.UpdateObjectRedundancyResp, *mq.CodeMessage) { err := svc.db.DoTx(sql.LevelSerializable, func(tx *sqlx.Tx) error { - return svc.db.Object().BatchUpdateRedundancy(tx, msg.Entries) + return svc.db.Object().BatchUpdateRedundancy(tx, msg.Updatings) }) if err != nil { logger.Warnf("batch updating redundancy: %s", err.Error()) return nil, mq.Failed(errorcode.OperationFailed, "batch update redundancy failed") } - return mq.ReplyOK(coormq.RespChangeObjectRedundancy()) + return mq.ReplyOK(coormq.RespUpdateObjectRedundancy()) +} + +func (svc *Service) UpdateObjectInfos(msg *coormq.UpdateObjectInfos) (*coormq.UpdateObjectInfosResp, *mq.CodeMessage) { + err := svc.db.DoTx(sql.LevelSerializable, func(tx *sqlx.Tx) error { + msg.Updatings = sort2.Sort(msg.Updatings, func(o1, o2 cdssdk.UpdatingObject) int { + return sort2.Cmp(o1.ObjectID, o2.ObjectID) + }) + + objIDs := make([]cdssdk.ObjectID, len(msg.Updatings)) + for i, obj := range msg.Updatings { + objIDs[i] = obj.ObjectID + } + + oldObjs, err := svc.db.Object().BatchGet(tx, objIDs) + if err != nil { + return fmt.Errorf("batch getting objects: %w", err) + } + oldObjIDs := make([]cdssdk.ObjectID, len(oldObjs)) + for i, obj := range oldObjs { + oldObjIDs[i] = obj.ObjectID + } + + avaiUpdatings, notExistsObjs := pickByObjectIDs(msg.Updatings, oldObjIDs) + if len(notExistsObjs) > 0 { + // TODO 部分对象已经不存在 + } + + // 筛选出PackageID变化、Path变化的对象,这两种对象要检测改变后是否有冲突 + // 否则,直接更新即可 + //var pkgIDChangedObjs []cdssdk.Object + //var pathChangedObjs []cdssdk.Object + //var infoChangedObjs []cdssdk.Object + //for i := range willUpdateObjs { + // if willUpdateObjs[i].PackageID != oldObjs[i].PackageID { + // newObj := oldObjs[i] + // willUpdateObjs[i].ApplyTo(&newObj) + // pkgIDChangedObjs = append(pkgIDChangedObjs, newObj) + // } else if willUpdateObjs[i].Path != oldObjs[i].Path { + // newObj := oldObjs[i] + // willUpdateObjs[i].ApplyTo(&newObj) + // pathChangedObjs = append(pathChangedObjs, newObj) + // } else { + // newObj := oldObjs[i] + // willUpdateObjs[i].ApplyTo(&newObj) + // infoChangedObjs = append(infoChangedObjs, newObj) + // } + //} + + newObjs := make([]cdssdk.Object, len(avaiUpdatings)) + for i := range newObjs { + newObj := oldObjs[i] + avaiUpdatings[i].ApplyTo(&newObj) + } + + err = svc.db.Object().BatchCreateOrUpdate(tx, newObjs) + if err != nil { + return fmt.Errorf("batch create or update: %w", err) + } + + return nil + }) + + if err != nil { + logger.Warnf("batch updating objects: %s", err.Error()) + return nil, mq.Failed(errorcode.OperationFailed, "batch update objects failed") + } + + return mq.ReplyOK(coormq.RespUpdateObjectInfos()) +} + +func pickByObjectIDs(objs []cdssdk.UpdatingObject, objIDs []cdssdk.ObjectID) (pickedObjs []cdssdk.UpdatingObject, notFoundObjs []cdssdk.ObjectID) { + objIdx := 0 + IDIdx := 0 + + for IDIdx < len(objIDs) { + if objs[objIdx].ObjectID == objIDs[IDIdx] { + pickedObjs = append(pickedObjs, objs[objIdx]) + IDIdx++ + objIdx++ + } else if objs[objIdx].ObjectID < objIDs[IDIdx] { + objIdx++ + } else { + notFoundObjs = append(notFoundObjs, objIDs[IDIdx]) + IDIdx++ + } + } + + return +} + +func (svc *Service) ensurePackageChangedObjects(tx *sqlx.Tx, objs []cdssdk.Object) ([]cdssdk.Object, error) { + type PackageObjects struct { + PackageID cdssdk.PackageID + ObjectByPath map[string]*cdssdk.Object + } + + packages := make(map[cdssdk.PackageID]*PackageObjects) + for _, obj := range objs { + pkg, ok := packages[obj.PackageID] + if !ok { + pkg = &PackageObjects{ + PackageID: obj.PackageID, + ObjectByPath: make(map[string]*cdssdk.Object), + } + packages[obj.PackageID] = pkg + } + + if pkg.ObjectByPath[obj.Path] == nil { + o := obj + pkg.ObjectByPath[obj.Path] = &o + } else { + // TODO 有冲突 + } + } + + var willUpdateObjs []cdssdk.Object + for _, pkg := range packages { + existsObjs, err := svc.db.Object().BatchByPackagePath(tx, pkg.PackageID, lo.Keys(pkg.ObjectByPath)) + if err != nil { + return nil, fmt.Errorf("batch getting objects by package path: %w", err) + } + + for _, obj := range existsObjs { + pkg.ObjectByPath[obj.Path] = nil + } + + for _, obj := range pkg.ObjectByPath { + if obj == nil { + continue + } + willUpdateObjs = append(willUpdateObjs, *obj) + } + + } + + return willUpdateObjs, nil +} + +func (svc *Service) DeleteObjects(msg *coormq.DeleteObjects) (*coormq.DeleteObjectsResp, *mq.CodeMessage) { + err := svc.db.DoTx(sql.LevelSerializable, func(tx *sqlx.Tx) error { + return svc.db.Object().BatchDelete(tx, msg.ObjectIDs) + }) + if err != nil { + logger.Warnf("batch deleting objects: %s", err.Error()) + return nil, mq.Failed(errorcode.OperationFailed, "batch delete objects failed") + } + + return mq.ReplyOK(coormq.RespDeleteObjects()) } diff --git a/scanner/internal/event/check_package_redundancy.go b/scanner/internal/event/check_package_redundancy.go index 0a6eee0..880aa26 100644 --- a/scanner/internal/event/check_package_redundancy.go +++ b/scanner/internal/event/check_package_redundancy.go @@ -109,7 +109,7 @@ func (t *CheckPackageRedundancy) Execute(execCtx ExecuteContext) { } } - var changedObjects []coormq.ChangeObjectRedundancyEntry + var changedObjects []coormq.UpdatingObjectRedundancy defRep := cdssdk.DefaultRepRedundancy defEC := cdssdk.DefaultECRedundancy @@ -136,7 +136,7 @@ func (t *CheckPackageRedundancy) Execute(execCtx ExecuteContext) { defer mutex.Unlock() for _, obj := range getObjs.Objects { - var entry *coormq.ChangeObjectRedundancyEntry + var updating *coormq.UpdatingObjectRedundancy var err error shouldUseEC := obj.Object.Size > config.Cfg().ECFileSizeThreshold @@ -145,32 +145,32 @@ func (t *CheckPackageRedundancy) Execute(execCtx ExecuteContext) { case *cdssdk.NoneRedundancy: if shouldUseEC { log.WithField("ObjectID", obj.Object.ObjectID).Debugf("redundancy: none -> ec") - entry, err = t.noneToEC(obj, &defEC, newECNodes) + updating, err = t.noneToEC(obj, &defEC, newECNodes) } else { log.WithField("ObjectID", obj.Object.ObjectID).Debugf("redundancy: none -> rep") - entry, err = t.noneToRep(obj, &defRep, newRepNodes) + updating, err = t.noneToRep(obj, &defRep, newRepNodes) } case *cdssdk.RepRedundancy: if shouldUseEC { log.WithField("ObjectID", obj.Object.ObjectID).Debugf("redundancy: rep -> ec") - entry, err = t.repToEC(obj, &defEC, newECNodes) + updating, err = t.repToEC(obj, &defEC, newECNodes) } else { - entry, err = t.repToRep(obj, &defRep, rechoosedRepNodes) + updating, err = t.repToRep(obj, &defRep, rechoosedRepNodes) } case *cdssdk.ECRedundancy: if shouldUseEC { uploadNodes := t.rechooseNodesForEC(obj, red, allNodes) - entry, err = t.ecToEC(obj, red, &defEC, uploadNodes) + updating, err = t.ecToEC(obj, red, &defEC, uploadNodes) } else { log.WithField("ObjectID", obj.Object.ObjectID).Debugf("redundancy: ec -> rep") - entry, err = t.ecToRep(obj, red, &defRep, newRepNodes) + updating, err = t.ecToRep(obj, red, &defRep, newRepNodes) } } - if entry != nil { - changedObjects = append(changedObjects, *entry) + if updating != nil { + changedObjects = append(changedObjects, *updating) } if err != nil { @@ -182,7 +182,7 @@ func (t *CheckPackageRedundancy) Execute(execCtx ExecuteContext) { return } - _, err = coorCli.ChangeObjectRedundancy(coormq.ReqChangeObjectRedundancy(changedObjects)) + _, err = coorCli.UpdateObjectRedundancy(coormq.ReqUpdateObjectRedundancy(changedObjects)) if err != nil { log.Warnf("requesting to change object redundancy: %s", err.Error()) return @@ -367,7 +367,7 @@ func (t *CheckPackageRedundancy) chooseSoManyNodes(count int, nodes []*NodeLoadI return chosen } -func (t *CheckPackageRedundancy) noneToRep(obj stgmod.ObjectDetail, red *cdssdk.RepRedundancy, uploadNodes []*NodeLoadInfo) (*coormq.ChangeObjectRedundancyEntry, error) { +func (t *CheckPackageRedundancy) noneToRep(obj stgmod.ObjectDetail, red *cdssdk.RepRedundancy, uploadNodes []*NodeLoadInfo) (*coormq.UpdatingObjectRedundancy, error) { if len(obj.Blocks) == 0 { return nil, fmt.Errorf("object is not cached on any nodes, cannot change its redundancy to rep") } @@ -389,14 +389,14 @@ func (t *CheckPackageRedundancy) noneToRep(obj stgmod.ObjectDetail, red *cdssdk. }) } - return &coormq.ChangeObjectRedundancyEntry{ + return &coormq.UpdatingObjectRedundancy{ ObjectID: obj.Object.ObjectID, Redundancy: red, Blocks: blocks, }, nil } -func (t *CheckPackageRedundancy) noneToEC(obj stgmod.ObjectDetail, red *cdssdk.ECRedundancy, uploadNodes []*NodeLoadInfo) (*coormq.ChangeObjectRedundancyEntry, error) { +func (t *CheckPackageRedundancy) noneToEC(obj stgmod.ObjectDetail, red *cdssdk.ECRedundancy, uploadNodes []*NodeLoadInfo) (*coormq.UpdatingObjectRedundancy, error) { coorCli, err := stgglb.CoordinatorMQPool.Acquire() if err != nil { return nil, fmt.Errorf("new coordinator client: %w", err) @@ -443,14 +443,14 @@ func (t *CheckPackageRedundancy) noneToEC(obj stgmod.ObjectDetail, red *cdssdk.E }) } - return &coormq.ChangeObjectRedundancyEntry{ + return &coormq.UpdatingObjectRedundancy{ ObjectID: obj.Object.ObjectID, Redundancy: red, Blocks: blocks, }, nil } -func (t *CheckPackageRedundancy) repToRep(obj stgmod.ObjectDetail, red *cdssdk.RepRedundancy, uploadNodes []*NodeLoadInfo) (*coormq.ChangeObjectRedundancyEntry, error) { +func (t *CheckPackageRedundancy) repToRep(obj stgmod.ObjectDetail, red *cdssdk.RepRedundancy, uploadNodes []*NodeLoadInfo) (*coormq.UpdatingObjectRedundancy, error) { if len(obj.Blocks) == 0 { return nil, fmt.Errorf("object is not cached on any nodes, cannot change its redundancy to rep") } @@ -479,18 +479,18 @@ func (t *CheckPackageRedundancy) repToRep(obj stgmod.ObjectDetail, red *cdssdk.R }) } - return &coormq.ChangeObjectRedundancyEntry{ + return &coormq.UpdatingObjectRedundancy{ ObjectID: obj.Object.ObjectID, Redundancy: red, Blocks: blocks, }, nil } -func (t *CheckPackageRedundancy) repToEC(obj stgmod.ObjectDetail, red *cdssdk.ECRedundancy, uploadNodes []*NodeLoadInfo) (*coormq.ChangeObjectRedundancyEntry, error) { +func (t *CheckPackageRedundancy) repToEC(obj stgmod.ObjectDetail, red *cdssdk.ECRedundancy, uploadNodes []*NodeLoadInfo) (*coormq.UpdatingObjectRedundancy, error) { return t.noneToEC(obj, red, uploadNodes) } -func (t *CheckPackageRedundancy) ecToRep(obj stgmod.ObjectDetail, srcRed *cdssdk.ECRedundancy, tarRed *cdssdk.RepRedundancy, uploadNodes []*NodeLoadInfo) (*coormq.ChangeObjectRedundancyEntry, error) { +func (t *CheckPackageRedundancy) ecToRep(obj stgmod.ObjectDetail, srcRed *cdssdk.ECRedundancy, tarRed *cdssdk.RepRedundancy, uploadNodes []*NodeLoadInfo) (*coormq.UpdatingObjectRedundancy, error) { coorCli, err := stgglb.CoordinatorMQPool.Acquire() if err != nil { return nil, fmt.Errorf("new coordinator client: %w", err) @@ -556,14 +556,14 @@ func (t *CheckPackageRedundancy) ecToRep(obj stgmod.ObjectDetail, srcRed *cdssdk }) } - return &coormq.ChangeObjectRedundancyEntry{ + return &coormq.UpdatingObjectRedundancy{ ObjectID: obj.Object.ObjectID, Redundancy: tarRed, Blocks: blocks, }, nil } -func (t *CheckPackageRedundancy) ecToEC(obj stgmod.ObjectDetail, srcRed *cdssdk.ECRedundancy, tarRed *cdssdk.ECRedundancy, uploadNodes []*NodeLoadInfo) (*coormq.ChangeObjectRedundancyEntry, error) { +func (t *CheckPackageRedundancy) ecToEC(obj stgmod.ObjectDetail, srcRed *cdssdk.ECRedundancy, tarRed *cdssdk.ECRedundancy, uploadNodes []*NodeLoadInfo) (*coormq.UpdatingObjectRedundancy, error) { coorCli, err := stgglb.CoordinatorMQPool.Acquire() if err != nil { return nil, fmt.Errorf("new coordinator client: %w", err) @@ -654,7 +654,7 @@ func (t *CheckPackageRedundancy) ecToEC(obj stgmod.ObjectDetail, srcRed *cdssdk. newBlocks[idx].FileHash = v.(string) } - return &coormq.ChangeObjectRedundancyEntry{ + return &coormq.UpdatingObjectRedundancy{ ObjectID: obj.Object.ObjectID, Redundancy: tarRed, Blocks: newBlocks, diff --git a/scanner/internal/event/clean_pinned.go b/scanner/internal/event/clean_pinned.go index 67e4599..9b30fc0 100644 --- a/scanner/internal/event/clean_pinned.go +++ b/scanner/internal/event/clean_pinned.go @@ -109,7 +109,7 @@ func (t *CleanPinned) Execute(execCtx ExecuteContext) { pinPlans := make(map[cdssdk.NodeID]*[]string) // 对于rep对象,统计出所有对象块分布最多的两个节点,用这两个节点代表所有rep对象块的分布,去进行退火算法 - var repObjectsUpdateEntries []coormq.ChangeObjectRedundancyEntry + var repObjectsUpdating []coormq.UpdatingObjectRedundancy repMostNodeIDs := t.summaryRepObjectBlockNodes(repObjects) solu := t.startAnnealing(allNodeInfos, readerNodeIDs, annealingObject{ totalBlockCount: 1, @@ -118,11 +118,11 @@ func (t *CleanPinned) Execute(execCtx ExecuteContext) { blocks: nil, }) for _, obj := range repObjects { - repObjectsUpdateEntries = append(repObjectsUpdateEntries, t.makePlansForRepObject(solu, obj, pinPlans)) + repObjectsUpdating = append(repObjectsUpdating, t.makePlansForRepObject(solu, obj, pinPlans)) } // 对于ec对象,则每个对象单独进行退火算法 - var ecObjectsUpdateEntries []coormq.ChangeObjectRedundancyEntry + var ecObjectsUpdating []coormq.UpdatingObjectRedundancy for _, obj := range ecObjects { ecRed := obj.Object.Redundancy.(*cdssdk.ECRedundancy) solu := t.startAnnealing(allNodeInfos, readerNodeIDs, annealingObject{ @@ -131,7 +131,7 @@ func (t *CleanPinned) Execute(execCtx ExecuteContext) { pinnedAt: obj.PinnedAt, blocks: obj.Blocks, }) - ecObjectsUpdateEntries = append(ecObjectsUpdateEntries, t.makePlansForECObject(allNodeInfos, solu, obj, &planBld)) + ecObjectsUpdating = append(ecObjectsUpdating, t.makePlansForECObject(allNodeInfos, solu, obj, &planBld)) } ioSwRets, err := t.executePlans(execCtx, pinPlans, &planBld) @@ -141,13 +141,13 @@ func (t *CleanPinned) Execute(execCtx ExecuteContext) { } // 根据按照方案进行调整的结果,填充更新元数据的命令 - for i := range ecObjectsUpdateEntries { - t.populateECObjectEntry(&ecObjectsUpdateEntries[i], ecObjects[i], ioSwRets) + for i := range ecObjectsUpdating { + t.populateECObjectEntry(&ecObjectsUpdating[i], ecObjects[i], ioSwRets) } - finalEntries := append(repObjectsUpdateEntries, ecObjectsUpdateEntries...) + finalEntries := append(repObjectsUpdating, ecObjectsUpdating...) if len(finalEntries) > 0 { - _, err = coorCli.ChangeObjectRedundancy(coormq.ReqChangeObjectRedundancy(finalEntries)) + _, err = coorCli.UpdateObjectRedundancy(coormq.ReqUpdateObjectRedundancy(finalEntries)) if err != nil { log.Warnf("changing object redundancy: %s", err.Error()) return @@ -715,8 +715,8 @@ func (t *CleanPinned) alwaysAccept(curTemp float64, dScore float64, coolingRate return v > rand.Float64() } -func (t *CleanPinned) makePlansForRepObject(solu annealingSolution, obj stgmod.ObjectDetail, pinPlans map[cdssdk.NodeID]*[]string) coormq.ChangeObjectRedundancyEntry { - entry := coormq.ChangeObjectRedundancyEntry{ +func (t *CleanPinned) makePlansForRepObject(solu annealingSolution, obj stgmod.ObjectDetail, pinPlans map[cdssdk.NodeID]*[]string) coormq.UpdatingObjectRedundancy { + entry := coormq.UpdatingObjectRedundancy{ ObjectID: obj.Object.ObjectID, Redundancy: obj.Object.Redundancy, } @@ -748,8 +748,8 @@ func (t *CleanPinned) makePlansForRepObject(solu annealingSolution, obj stgmod.O return entry } -func (t *CleanPinned) makePlansForECObject(allNodeInfos map[cdssdk.NodeID]*cdssdk.Node, solu annealingSolution, obj stgmod.ObjectDetail, planBld *plans.PlanBuilder) coormq.ChangeObjectRedundancyEntry { - entry := coormq.ChangeObjectRedundancyEntry{ +func (t *CleanPinned) makePlansForECObject(allNodeInfos map[cdssdk.NodeID]*cdssdk.Node, solu annealingSolution, obj stgmod.ObjectDetail, planBld *plans.PlanBuilder) coormq.UpdatingObjectRedundancy { + entry := coormq.UpdatingObjectRedundancy{ ObjectID: obj.Object.ObjectID, Redundancy: obj.Object.Redundancy, } @@ -871,7 +871,7 @@ func (t *CleanPinned) executePlans(execCtx ExecuteContext, pinPlans map[cdssdk.N return ioSwRets, nil } -func (t *CleanPinned) populateECObjectEntry(entry *coormq.ChangeObjectRedundancyEntry, obj stgmod.ObjectDetail, ioRets map[string]any) { +func (t *CleanPinned) populateECObjectEntry(entry *coormq.UpdatingObjectRedundancy, obj stgmod.ObjectDetail, ioRets map[string]any) { for i := range entry.Blocks { if entry.Blocks[i].FileHash != "" { continue