diff --git a/scanner/internal/event/clean_pinned_test.go b/client/internal/ticktock/redundancy_shrink_test.go similarity index 90% rename from scanner/internal/event/clean_pinned_test.go rename to client/internal/ticktock/redundancy_shrink_test.go index dd6388a..d09a015 100644 --- a/scanner/internal/event/clean_pinned_test.go +++ b/client/internal/ticktock/redundancy_shrink_test.go @@ -1,24 +1,24 @@ -package event +package ticktock import ( "testing" . "github.com/smartystreets/goconvey/convey" "gitlink.org.cn/cloudream/common/pkgs/bitmap" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "gitlink.org.cn/cloudream/jcs-pub/client/types" ) func newTreeTest(nodeBlocksMap []bitmap.Bitmap64) combinatorialTree { tree := combinatorialTree{ blocksMaps: make(map[int]bitmap.Bitmap64), - stgIDToLocalStgID: make(map[cdssdk.StorageID]int), + stgIDToLocalStgID: make(map[types.UserSpaceID]int), } tree.nodes = make([]combinatorialTreeNode, (1 << len(nodeBlocksMap))) for id, mp := range nodeBlocksMap { - tree.stgIDToLocalStgID[cdssdk.StorageID(id)] = len(tree.localStgIDToStgID) + tree.stgIDToLocalStgID[types.UserSpaceID(id)] = len(tree.localStgIDToStgID) tree.blocksMaps[len(tree.localStgIDToStgID)] = mp - tree.localStgIDToStgID = append(tree.localStgIDToStgID, cdssdk.StorageID(id)) + tree.localStgIDToStgID = append(tree.localStgIDToStgID, types.UserSpaceID(id)) } tree.nodes[0].localHubID = -1 @@ -125,7 +125,7 @@ func Test_UpdateBitmap(t *testing.T) { testcases := []struct { title string nodeBlocks []bitmap.Bitmap64 - updatedHubID cdssdk.StorageID + updatedHubID types.UserSpaceID updatedBitmap bitmap.Bitmap64 k int expectedTreeNodeBitmaps []int @@ -134,7 +134,7 @@ func Test_UpdateBitmap(t *testing.T) { { title: "4个节点,更新但值不变", nodeBlocks: []bitmap.Bitmap64{1, 2, 4, 8}, - updatedHubID: cdssdk.StorageID(0), + updatedHubID: types.UserSpaceID(0), updatedBitmap: bitmap.Bitmap64(1), k: 4, expectedTreeNodeBitmaps: []int{0, 1, 3, 7, 15, 11, 5, 13, 9, 2, 6, 14, 10, 4, 12, 8}, @@ -143,7 +143,7 @@ func Test_UpdateBitmap(t *testing.T) { { title: "4个节点,更新0", nodeBlocks: []bitmap.Bitmap64{1, 2, 4, 8}, - updatedHubID: cdssdk.StorageID(0), + updatedHubID: types.UserSpaceID(0), updatedBitmap: bitmap.Bitmap64(2), k: 4, expectedTreeNodeBitmaps: []int{0, 2, 2, 6, 14, 10, 6, 14, 10, 2, 6, 14, 10, 4, 12, 8}, @@ -152,7 +152,7 @@ func Test_UpdateBitmap(t *testing.T) { { title: "4个节点,更新1", nodeBlocks: []bitmap.Bitmap64{1, 2, 4, 8}, - updatedHubID: cdssdk.StorageID(1), + updatedHubID: types.UserSpaceID(1), updatedBitmap: bitmap.Bitmap64(1), k: 4, expectedTreeNodeBitmaps: []int{0, 1, 1, 5, 13, 9, 5, 13, 9, 1, 5, 13, 9, 4, 12, 8}, @@ -161,7 +161,7 @@ func Test_UpdateBitmap(t *testing.T) { { title: "4个节点,更新2", nodeBlocks: []bitmap.Bitmap64{1, 2, 4, 8}, - updatedHubID: cdssdk.StorageID(2), + updatedHubID: types.UserSpaceID(2), updatedBitmap: bitmap.Bitmap64(1), k: 4, expectedTreeNodeBitmaps: []int{0, 1, 3, 3, 11, 11, 1, 9, 9, 2, 3, 11, 10, 1, 9, 8}, @@ -170,7 +170,7 @@ func Test_UpdateBitmap(t *testing.T) { { title: "4个节点,更新3", nodeBlocks: []bitmap.Bitmap64{1, 2, 4, 8}, - updatedHubID: cdssdk.StorageID(3), + updatedHubID: types.UserSpaceID(3), updatedBitmap: bitmap.Bitmap64(1), k: 4, expectedTreeNodeBitmaps: []int{0, 1, 3, 7, 7, 3, 5, 5, 1, 2, 6, 7, 3, 4, 5, 1}, @@ -179,7 +179,7 @@ func Test_UpdateBitmap(t *testing.T) { { title: "4个节点,k<4,更新0,0之前没有k个块,现在拥有", nodeBlocks: []bitmap.Bitmap64{1, 2, 4, 8}, - updatedHubID: cdssdk.StorageID(0), + updatedHubID: types.UserSpaceID(0), updatedBitmap: bitmap.Bitmap64(3), k: 2, expectedTreeNodeBitmaps: []int{0, 3, 3, 7, 15, 11, 5, 13, 9, 2, 6, 14, 10, 4, 12, 8}, @@ -187,7 +187,7 @@ func Test_UpdateBitmap(t *testing.T) { { title: "4个节点,k<4,更新0,0之前有k个块,现在没有", nodeBlocks: []bitmap.Bitmap64{3, 4, 0, 0}, - updatedHubID: cdssdk.StorageID(0), + updatedHubID: types.UserSpaceID(0), updatedBitmap: bitmap.Bitmap64(0), k: 2, expectedTreeNodeBitmaps: []int{0, 0, 4, 4, 4, 4, 0, 0, 0, 4, 4, 4, 4, 0, 0, 0}, diff --git a/scanner/internal/event/check_package_redundancy.go b/scanner/internal/event/check_package_redundancy.go deleted file mode 100644 index f660a35..0000000 --- a/scanner/internal/event/check_package_redundancy.go +++ /dev/null @@ -1,1375 +0,0 @@ -package event - -import ( - "context" - "fmt" - "strconv" - "time" - - "github.com/samber/lo" - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" - "gitlink.org.cn/cloudream/common/pkgs/logger" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - "gitlink.org.cn/cloudream/common/utils/math2" - "gitlink.org.cn/cloudream/common/utils/sort2" - stgglb "gitlink.org.cn/cloudream/jcs-pub/common/globals" - stgmod "gitlink.org.cn/cloudream/jcs-pub/common/models" - "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/distlock/reqbuilder" - "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2" - "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/ops2" - "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/parser" - "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitchlrc" - lrcparser "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitchlrc/parser" - coormq "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/mq/coordinator" - scevt "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/mq/scanner/event" - "gitlink.org.cn/cloudream/jcs-pub/scanner/internal/config" -) - -type CheckPackageRedundancy struct { - *scevt.CheckPackageRedundancy -} - -func NewCheckPackageRedundancy(evt *scevt.CheckPackageRedundancy) *CheckPackageRedundancy { - return &CheckPackageRedundancy{ - CheckPackageRedundancy: evt, - } -} - -type StorageLoadInfo struct { - Storage stgmod.StorageDetail - AccessAmount float64 -} - -func (t *CheckPackageRedundancy) TryMerge(other Event) bool { - event, ok := other.(*CheckPackageRedundancy) - if !ok { - return false - } - - return event.PackageID == t.PackageID -} - -func (t *CheckPackageRedundancy) Execute(execCtx ExecuteContext) { - log := logger.WithType[CheckPackageRedundancy]("Event") - startTime := time.Now() - log.Debugf("begin with %v", logger.FormatStruct(t.CheckPackageRedundancy)) - defer func() { - log.Debugf("end, time: %v", time.Since(startTime)) - }() - - // TODO 应该像其他event一样直接读取数据库 - - coorCli, err := stgglb.CoordinatorMQPool.Acquire() - if err != nil { - log.Warnf("new coordinator client: %s", err.Error()) - return - } - defer stgglb.CoordinatorMQPool.Release(coorCli) - - getObjs, err := coorCli.GetPackageObjectDetails(coormq.ReqGetPackageObjectDetails(t.PackageID)) - if err != nil { - log.Warnf("getting package objects: %s", err.Error()) - return - } - - stats, err := execCtx.Args.DB.PackageAccessStat().GetByPackageID(execCtx.Args.DB.DefCtx(), t.PackageID) - if err != nil { - log.Warnf("getting package access stats: %s", err.Error()) - return - } - - // TODO UserID - getStgs, err := coorCli.GetUserStorageDetails(coormq.ReqGetUserStorageDetails(1)) - if err != nil { - log.Warnf("getting all storages: %s", err.Error()) - return - } - - if len(getStgs.Storages) == 0 { - log.Warnf("no available storages") - return - } - - userAllStorages := make(map[cdssdk.StorageID]*StorageLoadInfo) - for _, stg := range getStgs.Storages { - userAllStorages[stg.Storage.StorageID] = &StorageLoadInfo{ - Storage: stg, - } - } - - for _, stat := range stats { - info, ok := userAllStorages[stat.StorageID] - if !ok { - continue - } - info.AccessAmount = stat.Amount - } - - var changedObjects []coormq.UpdatingObjectRedundancy - - defRep := cdssdk.DefaultRepRedundancy - defEC := cdssdk.DefaultECRedundancy - - // TODO 目前rep的备份数量固定为2,所以这里直接选出两个节点 - // TODO 放到chooseRedundancy函数中 - mostBlockStgIDs := t.summaryRepObjectBlockStorages(getObjs.Objects, 2) - newRepStgs := t.chooseNewStoragesForRep(&defRep, userAllStorages) - rechoosedRepStgs := t.rechooseStoragesForRep(mostBlockStgIDs, &defRep, userAllStorages) - newECStgs := t.chooseNewStoragesForEC(&defEC, userAllStorages) - - // 加锁 - builder := reqbuilder.NewBuilder() - for _, storage := range newRepStgs { - builder.Shard().Buzy(storage.Storage.Storage.StorageID) - } - for _, storage := range newECStgs { - builder.Shard().Buzy(storage.Storage.Storage.StorageID) - } - mutex, err := builder.MutexLock(execCtx.Args.DistLock) - if err != nil { - log.Warnf("acquiring dist lock: %s", err.Error()) - return - } - defer mutex.Unlock() - - for _, obj := range getObjs.Objects { - var updating *coormq.UpdatingObjectRedundancy - var err error - - newRed, selectedStorages := t.chooseRedundancy(obj, userAllStorages) - - switch srcRed := obj.Object.Redundancy.(type) { - case *cdssdk.NoneRedundancy: - switch newRed := newRed.(type) { - case *cdssdk.RepRedundancy: - log.WithField("ObjectID", obj.Object.ObjectID).Debugf("redundancy: none -> rep") - updating, err = t.noneToRep(execCtx, obj, newRed, newRepStgs, userAllStorages) - - case *cdssdk.ECRedundancy: - log.WithField("ObjectID", obj.Object.ObjectID).Debugf("redundancy: none -> ec") - updating, err = t.noneToEC(execCtx, obj, newRed, newECStgs, userAllStorages) - - case *cdssdk.LRCRedundancy: - log.WithField("ObjectID", obj.Object.ObjectID).Debugf("redundancy: none -> lrc") - updating, err = t.noneToLRC(execCtx, obj, newRed, selectedStorages, userAllStorages) - - case *cdssdk.SegmentRedundancy: - log.WithField("ObjectID", obj.Object.ObjectID).Debugf("redundancy: none -> segment") - updating, err = t.noneToSeg(execCtx, obj, newRed, selectedStorages, userAllStorages) - } - - case *cdssdk.RepRedundancy: - switch newRed := newRed.(type) { - case *cdssdk.RepRedundancy: - updating, err = t.repToRep(execCtx, obj, srcRed, rechoosedRepStgs, userAllStorages) - - case *cdssdk.ECRedundancy: - log.WithField("ObjectID", obj.Object.ObjectID).Debugf("redundancy: rep -> ec") - updating, err = t.repToEC(execCtx, obj, newRed, newECStgs, userAllStorages) - } - - case *cdssdk.ECRedundancy: - switch newRed := newRed.(type) { - case *cdssdk.RepRedundancy: - log.WithField("ObjectID", obj.Object.ObjectID).Debugf("redundancy: ec -> rep") - updating, err = t.ecToRep(execCtx, obj, srcRed, newRed, newRepStgs, userAllStorages) - - case *cdssdk.ECRedundancy: - uploadStorages := t.rechooseStoragesForEC(obj, srcRed, userAllStorages) - updating, err = t.ecToEC(execCtx, obj, srcRed, newRed, uploadStorages, userAllStorages) - } - - case *cdssdk.LRCRedundancy: - switch newRed := newRed.(type) { - case *cdssdk.LRCRedundancy: - uploadStorages := t.rechooseStoragesForLRC(obj, srcRed, userAllStorages) - updating, err = t.lrcToLRC(execCtx, obj, srcRed, newRed, uploadStorages, userAllStorages) - } - } - - if updating != nil { - changedObjects = append(changedObjects, *updating) - } - - if err != nil { - log.WithField("ObjectID", obj.Object.ObjectID).Warnf("%s, its redundancy wont be changed", err.Error()) - } - } - - if len(changedObjects) == 0 { - return - } - - _, err = coorCli.UpdateObjectRedundancy(coormq.ReqUpdateObjectRedundancy(changedObjects)) - if err != nil { - log.Warnf("requesting to change object redundancy: %s", err.Error()) - return - } -} - -func (t *CheckPackageRedundancy) chooseRedundancy(obj stgmod.ObjectDetail, userAllStgs map[cdssdk.StorageID]*StorageLoadInfo) (cdssdk.Redundancy, []*StorageLoadInfo) { - switch obj.Object.Redundancy.(type) { - case *cdssdk.NoneRedundancy: - if obj.Object.Size > config.Cfg().ECFileSizeThreshold { - newStgs := t.chooseNewStoragesForEC(&cdssdk.DefaultECRedundancy, userAllStgs) - return &cdssdk.DefaultECRedundancy, newStgs - } - - return &cdssdk.DefaultRepRedundancy, t.chooseNewStoragesForRep(&cdssdk.DefaultRepRedundancy, userAllStgs) - - case *cdssdk.RepRedundancy: - if obj.Object.Size > config.Cfg().ECFileSizeThreshold { - newStgs := t.chooseNewStoragesForEC(&cdssdk.DefaultECRedundancy, userAllStgs) - return &cdssdk.DefaultECRedundancy, newStgs - } - - case *cdssdk.ECRedundancy: - if obj.Object.Size <= config.Cfg().ECFileSizeThreshold { - return &cdssdk.DefaultRepRedundancy, t.chooseNewStoragesForRep(&cdssdk.DefaultRepRedundancy, userAllStgs) - } - - case *cdssdk.LRCRedundancy: - newLRCStgs := t.rechooseStoragesForLRC(obj, &cdssdk.DefaultLRCRedundancy, userAllStgs) - return &cdssdk.DefaultLRCRedundancy, newLRCStgs - - } - return nil, nil -} - -// 统计每个对象块所在的节点,选出块最多的不超过storageCnt个节点 -func (t *CheckPackageRedundancy) summaryRepObjectBlockStorages(objs []stgmod.ObjectDetail, storageCnt int) []cdssdk.StorageID { - type stgBlocks struct { - StorageID cdssdk.StorageID - Count int - } - - stgBlocksMap := make(map[cdssdk.StorageID]*stgBlocks) - for _, obj := range objs { - shouldUseEC := obj.Object.Size > config.Cfg().ECFileSizeThreshold - if _, ok := obj.Object.Redundancy.(*cdssdk.RepRedundancy); ok && !shouldUseEC { - for _, block := range obj.Blocks { - if _, ok := stgBlocksMap[block.StorageID]; !ok { - stgBlocksMap[block.StorageID] = &stgBlocks{ - StorageID: block.StorageID, - Count: 0, - } - } - stgBlocksMap[block.StorageID].Count++ - } - } - } - - storages := lo.Values(stgBlocksMap) - sort2.Sort(storages, func(left *stgBlocks, right *stgBlocks) int { - return right.Count - left.Count - }) - - ids := lo.Map(storages, func(item *stgBlocks, idx int) cdssdk.StorageID { return item.StorageID }) - if len(ids) > storageCnt { - ids = ids[:storageCnt] - } - return ids -} - -func (t *CheckPackageRedundancy) chooseNewStoragesForRep(red *cdssdk.RepRedundancy, allStgs map[cdssdk.StorageID]*StorageLoadInfo) []*StorageLoadInfo { - sortedStorages := sort2.Sort(lo.Values(allStgs), func(left *StorageLoadInfo, right *StorageLoadInfo) int { - return sort2.Cmp(right.AccessAmount, left.AccessAmount) - }) - - return t.chooseSoManyStorages(red.RepCount, sortedStorages) -} - -func (t *CheckPackageRedundancy) chooseNewStoragesForEC(red *cdssdk.ECRedundancy, allStgs map[cdssdk.StorageID]*StorageLoadInfo) []*StorageLoadInfo { - sortedStorages := sort2.Sort(lo.Values(allStgs), func(left *StorageLoadInfo, right *StorageLoadInfo) int { - return sort2.Cmp(right.AccessAmount, left.AccessAmount) - }) - - return t.chooseSoManyStorages(red.N, sortedStorages) -} - -func (t *CheckPackageRedundancy) chooseNewStoragesForLRC(red *cdssdk.LRCRedundancy, allStorages map[cdssdk.HubID]*StorageLoadInfo) []*StorageLoadInfo { - sortedStorages := sort2.Sort(lo.Values(allStorages), func(left *StorageLoadInfo, right *StorageLoadInfo) int { - return sort2.Cmp(right.AccessAmount, left.AccessAmount) - }) - - return t.chooseSoManyStorages(red.N, sortedStorages) -} - -func (t *CheckPackageRedundancy) chooseNewStoragesForSeg(segCount int, allStgs map[cdssdk.StorageID]*StorageLoadInfo) []*StorageLoadInfo { - sortedStorages := sort2.Sort(lo.Values(allStgs), func(left *StorageLoadInfo, right *StorageLoadInfo) int { - return sort2.Cmp(right.AccessAmount, left.AccessAmount) - }) - - return t.chooseSoManyStorages(segCount, sortedStorages) -} - -func (t *CheckPackageRedundancy) rechooseStoragesForRep(mostBlockStgIDs []cdssdk.StorageID, red *cdssdk.RepRedundancy, allStgs map[cdssdk.StorageID]*StorageLoadInfo) []*StorageLoadInfo { - type rechooseStorage struct { - *StorageLoadInfo - HasBlock bool - } - - var rechooseStgs []*rechooseStorage - for _, stg := range allStgs { - hasBlock := false - for _, id := range mostBlockStgIDs { - if id == stg.Storage.Storage.StorageID { - hasBlock = true - break - } - } - - rechooseStgs = append(rechooseStgs, &rechooseStorage{ - StorageLoadInfo: stg, - HasBlock: hasBlock, - }) - } - - sortedStgs := sort2.Sort(rechooseStgs, func(left *rechooseStorage, right *rechooseStorage) int { - // 已经缓存了文件块的节点优先选择 - v := sort2.CmpBool(right.HasBlock, left.HasBlock) - if v != 0 { - return v - } - - return sort2.Cmp(right.AccessAmount, left.AccessAmount) - }) - - return t.chooseSoManyStorages(red.RepCount, lo.Map(sortedStgs, func(storage *rechooseStorage, idx int) *StorageLoadInfo { return storage.StorageLoadInfo })) -} - -func (t *CheckPackageRedundancy) rechooseStoragesForEC(obj stgmod.ObjectDetail, red *cdssdk.ECRedundancy, allStgs map[cdssdk.StorageID]*StorageLoadInfo) []*StorageLoadInfo { - type rechooseStg struct { - *StorageLoadInfo - CachedBlockIndex int - } - - var rechooseStgs []*rechooseStg - for _, stg := range allStgs { - cachedBlockIndex := -1 - for _, block := range obj.Blocks { - if block.StorageID == stg.Storage.Storage.StorageID { - cachedBlockIndex = block.Index - break - } - } - - rechooseStgs = append(rechooseStgs, &rechooseStg{ - StorageLoadInfo: stg, - CachedBlockIndex: cachedBlockIndex, - }) - } - - sortedStgs := sort2.Sort(rechooseStgs, func(left *rechooseStg, right *rechooseStg) int { - // 已经缓存了文件块的节点优先选择 - v := sort2.CmpBool(right.CachedBlockIndex > -1, left.CachedBlockIndex > -1) - if v != 0 { - return v - } - - return sort2.Cmp(right.AccessAmount, left.AccessAmount) - }) - - // TODO 可以考虑选择已有块的节点时,能依然按照Index顺序选择 - return t.chooseSoManyStorages(red.N, lo.Map(sortedStgs, func(storage *rechooseStg, idx int) *StorageLoadInfo { return storage.StorageLoadInfo })) -} - -func (t *CheckPackageRedundancy) rechooseStoragesForLRC(obj stgmod.ObjectDetail, red *cdssdk.LRCRedundancy, allStgs map[cdssdk.StorageID]*StorageLoadInfo) []*StorageLoadInfo { - type rechooseStg struct { - *StorageLoadInfo - CachedBlockIndex int - } - - var rechooseStgs []*rechooseStg - for _, stg := range allStgs { - cachedBlockIndex := -1 - for _, block := range obj.Blocks { - if block.StorageID == stg.Storage.Storage.StorageID { - cachedBlockIndex = block.Index - break - } - } - - rechooseStgs = append(rechooseStgs, &rechooseStg{ - StorageLoadInfo: stg, - CachedBlockIndex: cachedBlockIndex, - }) - } - - sortedStgs := sort2.Sort(rechooseStgs, func(left *rechooseStg, right *rechooseStg) int { - // 已经缓存了文件块的节点优先选择 - v := sort2.CmpBool(right.CachedBlockIndex > -1, left.CachedBlockIndex > -1) - if v != 0 { - return v - } - - return sort2.Cmp(right.AccessAmount, left.AccessAmount) - }) - - // TODO 可以考虑选择已有块的节点时,能依然按照Index顺序选择 - return t.chooseSoManyStorages(red.N, lo.Map(sortedStgs, func(storage *rechooseStg, idx int) *StorageLoadInfo { return storage.StorageLoadInfo })) -} - -func (t *CheckPackageRedundancy) chooseSoManyStorages(count int, stgs []*StorageLoadInfo) []*StorageLoadInfo { - repeateCount := (count + len(stgs) - 1) / len(stgs) - extendStgs := make([]*StorageLoadInfo, repeateCount*len(stgs)) - - // 使用复制的方式将节点数扩充到要求的数量 - // 复制之后的结构:ABCD -> AAABBBCCCDDD - for p := 0; p < repeateCount; p++ { - for i, storage := range stgs { - putIdx := i*repeateCount + p - extendStgs[putIdx] = storage - } - } - extendStgs = extendStgs[:count] - - var chosen []*StorageLoadInfo - for len(chosen) < count { - // 在每一轮内都选不同地区的节点,如果节点数不够,那么就再来一轮 - chosenLocations := make(map[cdssdk.LocationID]bool) - for i, stg := range extendStgs { - if stg == nil { - continue - } - - if chosenLocations[stg.Storage.MasterHub.LocationID] { - continue - } - - chosen = append(chosen, stg) - chosenLocations[stg.Storage.MasterHub.LocationID] = true - extendStgs[i] = nil - } - } - - return chosen -} - -func (t *CheckPackageRedundancy) noneToRep(ctx ExecuteContext, obj stgmod.ObjectDetail, red *cdssdk.RepRedundancy, uploadStgs []*StorageLoadInfo, allStgs map[cdssdk.StorageID]*StorageLoadInfo) (*coormq.UpdatingObjectRedundancy, error) { - if len(obj.Blocks) == 0 { - return nil, fmt.Errorf("object is not cached on any storages, cannot change its redundancy to rep") - } - - srcStg, ok := allStgs[obj.Blocks[0].StorageID] - if !ok { - return nil, fmt.Errorf("storage %v not found", obj.Blocks[0].StorageID) - } - if srcStg.Storage.MasterHub == nil { - return nil, fmt.Errorf("storage %v has no master hub", obj.Blocks[0].StorageID) - } - - // 如果选择的备份节点都是同一个,那么就只要上传一次 - uploadStgs = lo.UniqBy(uploadStgs, func(item *StorageLoadInfo) cdssdk.StorageID { return item.Storage.Storage.StorageID }) - - ft := ioswitch2.NewFromTo() - ft.AddFrom(ioswitch2.NewFromShardstore(obj.Object.FileHash, *srcStg.Storage.MasterHub, srcStg.Storage, ioswitch2.RawStream())) - for i, stg := range uploadStgs { - ft.AddTo(ioswitch2.NewToShardStore(*stg.Storage.MasterHub, stg.Storage, ioswitch2.RawStream(), fmt.Sprintf("%d", i))) - } - - plans := exec.NewPlanBuilder() - err := parser.Parse(ft, plans) - if err != nil { - return nil, fmt.Errorf("parsing plan: %w", err) - } - - // TODO 添加依赖 - execCtx := exec.NewExecContext() - exec.SetValueByType(execCtx, ctx.Args.StgMgr) - ret, err := plans.Execute(execCtx).Wait(context.Background()) - if err != nil { - return nil, fmt.Errorf("executing io plan: %w", err) - } - - var blocks []stgmod.ObjectBlock - var blockChgs []stgmod.BlockChange - for i, stg := range uploadStgs { - r := ret[fmt.Sprintf("%d", i)].(*ops2.ShardInfoValue) - blocks = append(blocks, stgmod.ObjectBlock{ - ObjectID: obj.Object.ObjectID, - Index: 0, - StorageID: stg.Storage.Storage.StorageID, - FileHash: r.Hash, - Size: r.Size, - }) - blockChgs = append(blockChgs, &stgmod.BlockChangeClone{ - BlockType: stgmod.BlockTypeRaw, - SourceStorageID: obj.Blocks[0].StorageID, - TargetStorageID: stg.Storage.Storage.StorageID, - TransferBytes: 1, - }) - } - - // 删除原本的文件块 - blockChgs = append(blockChgs, &stgmod.BlockChangeDeleted{ - Index: 0, - StorageID: obj.Blocks[0].StorageID, - }) - - ctx.Args.EvtPub.Publish(&stgmod.BodyBlockTransfer{ - ObjectID: obj.Object.ObjectID, - PackageID: obj.Object.PackageID, - BlockChanges: blockChgs, - }) - - return &coormq.UpdatingObjectRedundancy{ - ObjectID: obj.Object.ObjectID, - FileHash: obj.Object.FileHash, - Size: obj.Object.Size, - Redundancy: red, - Blocks: blocks, - }, nil -} - -func (t *CheckPackageRedundancy) noneToEC(ctx ExecuteContext, obj stgmod.ObjectDetail, red *cdssdk.ECRedundancy, uploadStgs []*StorageLoadInfo, allStgs map[cdssdk.StorageID]*StorageLoadInfo) (*coormq.UpdatingObjectRedundancy, error) { - if len(obj.Blocks) == 0 { - return nil, fmt.Errorf("object is not cached on any storages, cannot change its redundancy to ec") - } - - srcStg, ok := allStgs[obj.Blocks[0].StorageID] - if !ok { - return nil, fmt.Errorf("storage %v not found", obj.Blocks[0].StorageID) - } - if srcStg.Storage.MasterHub == nil { - return nil, fmt.Errorf("storage %v has no master hub", obj.Blocks[0].StorageID) - } - - ft := ioswitch2.NewFromTo() - ft.ECParam = red - ft.AddFrom(ioswitch2.NewFromShardstore(obj.Object.FileHash, *srcStg.Storage.MasterHub, srcStg.Storage, ioswitch2.RawStream())) - for i := 0; i < red.N; i++ { - ft.AddTo(ioswitch2.NewToShardStore(*uploadStgs[i].Storage.MasterHub, uploadStgs[i].Storage, ioswitch2.ECStream(i), fmt.Sprintf("%d", i))) - } - plans := exec.NewPlanBuilder() - err := parser.Parse(ft, plans) - if err != nil { - return nil, fmt.Errorf("parsing plan: %w", err) - } - - execCtx := exec.NewExecContext() - exec.SetValueByType(execCtx, ctx.Args.StgMgr) - ioRet, err := plans.Execute(execCtx).Wait(context.Background()) - if err != nil { - return nil, fmt.Errorf("executing io plan: %w", err) - } - - var blocks []stgmod.ObjectBlock - var evtTargetBlocks []stgmod.Block - var evtBlockTrans []stgmod.DataTransfer - for i := 0; i < red.N; i++ { - r := ioRet[fmt.Sprintf("%d", i)].(*ops2.ShardInfoValue) - blocks = append(blocks, stgmod.ObjectBlock{ - ObjectID: obj.Object.ObjectID, - Index: i, - StorageID: uploadStgs[i].Storage.Storage.StorageID, - FileHash: r.Hash, - Size: r.Size, - }) - evtTargetBlocks = append(evtTargetBlocks, stgmod.Block{ - BlockType: stgmod.BlockTypeEC, - Index: i, - StorageID: uploadStgs[i].Storage.Storage.StorageID, - }) - evtBlockTrans = append(evtBlockTrans, stgmod.DataTransfer{ - SourceStorageID: obj.Blocks[0].StorageID, - TargetStorageID: uploadStgs[i].Storage.Storage.StorageID, - TransferBytes: 1, - }) - } - - ctx.Args.EvtPub.Publish(&stgmod.BodyBlockTransfer{ - ObjectID: obj.Object.ObjectID, - PackageID: obj.Object.PackageID, - BlockChanges: []stgmod.BlockChange{ - &stgmod.BlockChangeEnDecode{ - SourceBlocks: []stgmod.Block{{ - BlockType: stgmod.BlockTypeRaw, - StorageID: obj.Blocks[0].StorageID, - }}, - TargetBlocks: evtTargetBlocks, - DataTransfers: evtBlockTrans, - }, - - // 删除原本的文件块 - &stgmod.BlockChangeDeleted{ - Index: 0, - StorageID: obj.Blocks[0].StorageID, - }, - }, - }) - - return &coormq.UpdatingObjectRedundancy{ - ObjectID: obj.Object.ObjectID, - FileHash: obj.Object.FileHash, - Size: obj.Object.Size, - Redundancy: red, - Blocks: blocks, - }, nil -} - -func (t *CheckPackageRedundancy) noneToLRC(ctx ExecuteContext, obj stgmod.ObjectDetail, red *cdssdk.LRCRedundancy, uploadStgs []*StorageLoadInfo, allStgs map[cdssdk.StorageID]*StorageLoadInfo) (*coormq.UpdatingObjectRedundancy, error) { - if len(obj.Blocks) == 0 { - return nil, fmt.Errorf("object is not cached on any storages, cannot change its redundancy to ec") - } - - srcStg, ok := allStgs[obj.Blocks[0].StorageID] - if !ok { - return nil, fmt.Errorf("storage %v not found", obj.Blocks[0].StorageID) - } - if srcStg.Storage.MasterHub == nil { - return nil, fmt.Errorf("storage %v has no master hub", obj.Blocks[0].StorageID) - } - - var toes []ioswitchlrc.To - for i := 0; i < red.N; i++ { - toes = append(toes, ioswitchlrc.NewToStorage(*uploadStgs[i].Storage.MasterHub, uploadStgs[i].Storage.Storage, i, fmt.Sprintf("%d", i))) - } - - plans := exec.NewPlanBuilder() - err := lrcparser.Encode(ioswitchlrc.NewFromStorage(obj.Object.FileHash, *srcStg.Storage.MasterHub, srcStg.Storage.Storage, -1), toes, plans) - if err != nil { - return nil, fmt.Errorf("parsing plan: %w", err) - } - - execCtx := exec.NewExecContext() - exec.SetValueByType(execCtx, ctx.Args.StgMgr) - ioRet, err := plans.Execute(execCtx).Wait(context.Background()) - if err != nil { - return nil, fmt.Errorf("executing io plan: %w", err) - } - - var blocks []stgmod.ObjectBlock - var evtTargetBlocks []stgmod.Block - var evtBlockTrans []stgmod.DataTransfer - for i := 0; i < red.N; i++ { - r := ioRet[fmt.Sprintf("%d", i)].(*ops2.ShardInfoValue) - blocks = append(blocks, stgmod.ObjectBlock{ - ObjectID: obj.Object.ObjectID, - Index: i, - StorageID: uploadStgs[i].Storage.Storage.StorageID, - FileHash: r.Hash, - Size: r.Size, - }) - evtTargetBlocks = append(evtTargetBlocks, stgmod.Block{ - BlockType: stgmod.BlockTypeEC, - Index: i, - StorageID: uploadStgs[i].Storage.Storage.StorageID, - }) - evtBlockTrans = append(evtBlockTrans, stgmod.DataTransfer{ - SourceStorageID: obj.Blocks[0].StorageID, - TargetStorageID: uploadStgs[i].Storage.Storage.StorageID, - TransferBytes: 1, - }) - } - - ctx.Args.EvtPub.Publish(&stgmod.BodyBlockTransfer{ - ObjectID: obj.Object.ObjectID, - PackageID: obj.Object.PackageID, - BlockChanges: []stgmod.BlockChange{ - &stgmod.BlockChangeEnDecode{ - SourceBlocks: []stgmod.Block{{ - BlockType: stgmod.BlockTypeRaw, - StorageID: obj.Blocks[0].StorageID, - }}, - TargetBlocks: evtTargetBlocks, - DataTransfers: evtBlockTrans, - }, - - // 删除原本的文件块 - &stgmod.BlockChangeDeleted{ - Index: 0, - StorageID: obj.Blocks[0].StorageID, - }, - }, - }) - - return &coormq.UpdatingObjectRedundancy{ - ObjectID: obj.Object.ObjectID, - FileHash: obj.Object.FileHash, - Size: obj.Object.Size, - Redundancy: red, - Blocks: blocks, - }, nil -} - -func (t *CheckPackageRedundancy) noneToSeg(ctx ExecuteContext, obj stgmod.ObjectDetail, red *cdssdk.SegmentRedundancy, uploadStgs []*StorageLoadInfo, allStgs map[cdssdk.StorageID]*StorageLoadInfo) (*coormq.UpdatingObjectRedundancy, error) { - if len(obj.Blocks) == 0 { - return nil, fmt.Errorf("object is not cached on any storages, cannot change its redundancy to rep") - } - - srcStg, ok := allStgs[obj.Blocks[0].StorageID] - if !ok { - return nil, fmt.Errorf("storage %v not found", obj.Blocks[0].StorageID) - } - if srcStg.Storage.MasterHub == nil { - return nil, fmt.Errorf("storage %v has no master hub", obj.Blocks[0].StorageID) - } - - // 如果选择的备份节点都是同一个,那么就只要上传一次 - uploadStgs = lo.UniqBy(uploadStgs, func(item *StorageLoadInfo) cdssdk.StorageID { return item.Storage.Storage.StorageID }) - - ft := ioswitch2.NewFromTo() - ft.SegmentParam = red - ft.AddFrom(ioswitch2.NewFromShardstore(obj.Object.FileHash, *srcStg.Storage.MasterHub, srcStg.Storage, ioswitch2.RawStream())) - for i, stg := range uploadStgs { - ft.AddTo(ioswitch2.NewToShardStore(*stg.Storage.MasterHub, stg.Storage, ioswitch2.SegmentStream(i), fmt.Sprintf("%d", i))) - } - - plans := exec.NewPlanBuilder() - err := parser.Parse(ft, plans) - if err != nil { - return nil, fmt.Errorf("parsing plan: %w", err) - } - - // TODO 添加依赖 - execCtx := exec.NewExecContext() - exec.SetValueByType(execCtx, ctx.Args.StgMgr) - ret, err := plans.Execute(execCtx).Wait(context.Background()) - if err != nil { - return nil, fmt.Errorf("executing io plan: %w", err) - } - - var blocks []stgmod.ObjectBlock - var evtTargetBlocks []stgmod.Block - var evtBlockTrans []stgmod.DataTransfer - for i, stg := range uploadStgs { - r := ret[fmt.Sprintf("%d", i)].(*ops2.ShardInfoValue) - blocks = append(blocks, stgmod.ObjectBlock{ - ObjectID: obj.Object.ObjectID, - Index: i, - StorageID: stg.Storage.Storage.StorageID, - FileHash: r.Hash, - Size: r.Size, - }) - evtTargetBlocks = append(evtTargetBlocks, stgmod.Block{ - BlockType: stgmod.BlockTypeSegment, - Index: i, - StorageID: uploadStgs[i].Storage.Storage.StorageID, - }) - evtBlockTrans = append(evtBlockTrans, stgmod.DataTransfer{ - SourceStorageID: obj.Blocks[0].StorageID, - TargetStorageID: uploadStgs[i].Storage.Storage.StorageID, - TransferBytes: 1, - }) - } - - ctx.Args.EvtPub.Publish(&stgmod.BodyBlockTransfer{ - ObjectID: obj.Object.ObjectID, - PackageID: obj.Object.PackageID, - BlockChanges: []stgmod.BlockChange{ - &stgmod.BlockChangeEnDecode{ - SourceBlocks: []stgmod.Block{{ - BlockType: stgmod.BlockTypeRaw, - StorageID: obj.Blocks[0].StorageID, - }}, - TargetBlocks: evtTargetBlocks, - DataTransfers: evtBlockTrans, - }, - - // 删除原本的文件块 - &stgmod.BlockChangeDeleted{ - Index: 0, - StorageID: obj.Blocks[0].StorageID, - }, - }, - }) - - return &coormq.UpdatingObjectRedundancy{ - ObjectID: obj.Object.ObjectID, - FileHash: obj.Object.FileHash, - Size: obj.Object.Size, - Redundancy: red, - Blocks: blocks, - }, nil -} - -func (t *CheckPackageRedundancy) repToRep(ctx ExecuteContext, obj stgmod.ObjectDetail, red *cdssdk.RepRedundancy, uploadStgs []*StorageLoadInfo, allStgs map[cdssdk.StorageID]*StorageLoadInfo) (*coormq.UpdatingObjectRedundancy, error) { - if len(obj.Blocks) == 0 { - return nil, fmt.Errorf("object is not cached on any storages, cannot change its redundancy to rep") - } - - srcStg, ok := allStgs[obj.Blocks[0].StorageID] - if !ok { - return nil, fmt.Errorf("storage %v not found", obj.Blocks[0].StorageID) - } - if srcStg.Storage.MasterHub == nil { - return nil, fmt.Errorf("storage %v has no master hub", obj.Blocks[0].StorageID) - } - - // 如果选择的备份节点都是同一个,那么就只要上传一次 - uploadStgs = lo.UniqBy(uploadStgs, func(item *StorageLoadInfo) cdssdk.StorageID { return item.Storage.Storage.StorageID }) - - ft := ioswitch2.NewFromTo() - ft.AddFrom(ioswitch2.NewFromShardstore(obj.Object.FileHash, *srcStg.Storage.MasterHub, srcStg.Storage, ioswitch2.RawStream())) - for i, stg := range uploadStgs { - ft.AddTo(ioswitch2.NewToShardStore(*stg.Storage.MasterHub, stg.Storage, ioswitch2.RawStream(), fmt.Sprintf("%d", i))) - } - - plans := exec.NewPlanBuilder() - err := parser.Parse(ft, plans) - if err != nil { - return nil, fmt.Errorf("parsing plan: %w", err) - } - - // TODO 添加依赖 - execCtx := exec.NewExecContext() - exec.SetValueByType(execCtx, ctx.Args.StgMgr) - ret, err := plans.Execute(execCtx).Wait(context.Background()) - if err != nil { - return nil, fmt.Errorf("executing io plan: %w", err) - } - - var blocks []stgmod.ObjectBlock - var blockChgs []stgmod.BlockChange - for i, stg := range uploadStgs { - r := ret[fmt.Sprintf("%d", i)].(*ops2.ShardInfoValue) - blocks = append(blocks, stgmod.ObjectBlock{ - ObjectID: obj.Object.ObjectID, - Index: 0, - StorageID: stg.Storage.Storage.StorageID, - FileHash: r.Hash, - Size: r.Size, - }) - blockChgs = append(blockChgs, &stgmod.BlockChangeClone{ - BlockType: stgmod.BlockTypeRaw, - SourceStorageID: obj.Blocks[0].StorageID, - TargetStorageID: stg.Storage.Storage.StorageID, - TransferBytes: 1, - }) - } - - // 删除原本的文件块 - blockChgs = append(blockChgs, &stgmod.BlockChangeDeleted{ - Index: 0, - StorageID: obj.Blocks[0].StorageID, - }) - - ctx.Args.EvtPub.Publish(&stgmod.BodyBlockTransfer{ - ObjectID: obj.Object.ObjectID, - PackageID: obj.Object.PackageID, - BlockChanges: blockChgs, - }) - - return &coormq.UpdatingObjectRedundancy{ - ObjectID: obj.Object.ObjectID, - FileHash: obj.Object.FileHash, - Size: obj.Object.Size, - Redundancy: red, - Blocks: blocks, - }, nil -} - -func (t *CheckPackageRedundancy) repToEC(ctx ExecuteContext, obj stgmod.ObjectDetail, red *cdssdk.ECRedundancy, uploadStorages []*StorageLoadInfo, allStgs map[cdssdk.StorageID]*StorageLoadInfo) (*coormq.UpdatingObjectRedundancy, error) { - return t.noneToEC(ctx, obj, red, uploadStorages, allStgs) -} - -func (t *CheckPackageRedundancy) ecToRep(ctx ExecuteContext, obj stgmod.ObjectDetail, srcRed *cdssdk.ECRedundancy, tarRed *cdssdk.RepRedundancy, uploadStgs []*StorageLoadInfo, allStgs map[cdssdk.StorageID]*StorageLoadInfo) (*coormq.UpdatingObjectRedundancy, error) { - var chosenBlocks []stgmod.GrouppedObjectBlock - var chosenBlockIndexes []int - var chosenBlockStg []stgmod.StorageDetail - for _, block := range obj.GroupBlocks() { - if len(block.StorageIDs) > 0 { - // TODO 考虑选择最优的节点 - stg, ok := allStgs[block.StorageIDs[0]] - if !ok { - continue - } - if stg.Storage.MasterHub == nil { - continue - } - - chosenBlocks = append(chosenBlocks, block) - chosenBlockIndexes = append(chosenBlockIndexes, block.Index) - chosenBlockStg = append(chosenBlockStg, stg.Storage) - } - - if len(chosenBlocks) == srcRed.K { - break - } - } - - if len(chosenBlocks) < srcRed.K { - return nil, fmt.Errorf("no enough blocks to reconstruct the original file data") - } - - // 如果选择的备份节点都是同一个,那么就只要上传一次 - uploadStgs = lo.UniqBy(uploadStgs, func(item *StorageLoadInfo) cdssdk.StorageID { return item.Storage.Storage.StorageID }) - - planBlder := exec.NewPlanBuilder() - ft := ioswitch2.NewFromTo() - ft.ECParam = srcRed - - for i, block := range chosenBlocks { - ft.AddFrom(ioswitch2.NewFromShardstore(block.FileHash, *chosenBlockStg[i].MasterHub, chosenBlockStg[i], ioswitch2.ECStream(block.Index))) - } - - for i := range uploadStgs { - ft.AddTo(ioswitch2.NewToShardStoreWithRange(*uploadStgs[i].Storage.MasterHub, uploadStgs[i].Storage, ioswitch2.RawStream(), fmt.Sprintf("%d", i), math2.NewRange(0, obj.Object.Size))) - } - - err := parser.Parse(ft, planBlder) - if err != nil { - return nil, fmt.Errorf("parsing plan: %w", err) - } - - // TODO 添加依赖 - execCtx := exec.NewExecContext() - exec.SetValueByType(execCtx, ctx.Args.StgMgr) - ioRet, err := planBlder.Execute(execCtx).Wait(context.Background()) - if err != nil { - return nil, fmt.Errorf("executing io plan: %w", err) - } - - var blocks []stgmod.ObjectBlock - - for i := range uploadStgs { - r := ioRet[fmt.Sprintf("%d", i)].(*ops2.ShardInfoValue) - blocks = append(blocks, stgmod.ObjectBlock{ - ObjectID: obj.Object.ObjectID, - Index: 0, - StorageID: uploadStgs[i].Storage.Storage.StorageID, - FileHash: r.Hash, - Size: r.Size, - }) - } - - var evtSrcBlocks []stgmod.Block - var evtTargetBlocks []stgmod.Block - for i2, block := range chosenBlocks { - evtSrcBlocks = append(evtSrcBlocks, stgmod.Block{ - BlockType: stgmod.BlockTypeEC, - Index: block.Index, - StorageID: chosenBlockStg[i2].Storage.StorageID, - }) - } - - for _, stg := range uploadStgs { - evtTargetBlocks = append(evtTargetBlocks, stgmod.Block{ - BlockType: stgmod.BlockTypeRaw, - Index: 0, - StorageID: stg.Storage.Storage.StorageID, - }) - } - - var evtBlockTrans []stgmod.DataTransfer - for _, stg := range uploadStgs { - for i2 := range chosenBlocks { - evtBlockTrans = append(evtBlockTrans, stgmod.DataTransfer{ - SourceStorageID: chosenBlockStg[i2].Storage.StorageID, - TargetStorageID: stg.Storage.Storage.StorageID, - TransferBytes: 1, - }) - } - } - - var blockChgs []stgmod.BlockChange - blockChgs = append(blockChgs, &stgmod.BlockChangeEnDecode{ - SourceBlocks: evtSrcBlocks, - TargetBlocks: evtTargetBlocks, - DataTransfers: evtBlockTrans, - }) - - for _, block := range obj.Blocks { - blockChgs = append(blockChgs, &stgmod.BlockChangeDeleted{ - Index: block.Index, - StorageID: block.StorageID, - }) - } - - ctx.Args.EvtPub.Publish(&stgmod.BodyBlockTransfer{ - ObjectID: obj.Object.ObjectID, - PackageID: obj.Object.PackageID, - BlockChanges: blockChgs, - }) - - return &coormq.UpdatingObjectRedundancy{ - ObjectID: obj.Object.ObjectID, - FileHash: obj.Object.FileHash, - Size: obj.Object.Size, - Redundancy: tarRed, - Blocks: blocks, - }, nil -} - -func (t *CheckPackageRedundancy) ecToEC(ctx ExecuteContext, obj stgmod.ObjectDetail, srcRed *cdssdk.ECRedundancy, tarRed *cdssdk.ECRedundancy, uploadStorages []*StorageLoadInfo, allStgs map[cdssdk.StorageID]*StorageLoadInfo) (*coormq.UpdatingObjectRedundancy, error) { - grpBlocks := obj.GroupBlocks() - - var chosenBlocks []stgmod.GrouppedObjectBlock - var chosenBlockStg []stgmod.StorageDetail - for _, block := range grpBlocks { - if len(block.StorageIDs) > 0 { - stg, ok := allStgs[block.StorageIDs[0]] - if !ok { - continue - } - if stg.Storage.MasterHub == nil { - continue - } - - chosenBlocks = append(chosenBlocks, block) - chosenBlockStg = append(chosenBlockStg, stg.Storage) - } - - if len(chosenBlocks) == srcRed.K { - break - } - } - - if len(chosenBlocks) < srcRed.K { - return nil, fmt.Errorf("no enough blocks to reconstruct the original file data") - } - - // 目前EC的参数都相同,所以可以不用重建出完整数据然后再分块,可以直接构建出目的节点需要的块 - planBlder := exec.NewPlanBuilder() - - var evtSrcBlocks []stgmod.Block - var evtTargetBlocks []stgmod.Block - - ft := ioswitch2.NewFromTo() - ft.ECParam = srcRed - - for i, block := range chosenBlocks { - ft.AddFrom(ioswitch2.NewFromShardstore(block.FileHash, *chosenBlockStg[i].MasterHub, chosenBlockStg[i], ioswitch2.ECStream(block.Index))) - - evtSrcBlocks = append(evtSrcBlocks, stgmod.Block{ - BlockType: stgmod.BlockTypeEC, - Index: block.Index, - StorageID: chosenBlockStg[i].Storage.StorageID, - }) - } - - var newBlocks []stgmod.ObjectBlock - shouldUpdateBlocks := false - for i, stg := range uploadStorages { - newBlock := stgmod.ObjectBlock{ - ObjectID: obj.Object.ObjectID, - Index: i, - StorageID: stg.Storage.Storage.StorageID, - } - - grp, ok := lo.Find(grpBlocks, func(grp stgmod.GrouppedObjectBlock) bool { return grp.Index == i }) - - // 如果新选中的节点已经记录在Block表中,那么就不需要任何变更 - if ok && lo.Contains(grp.StorageIDs, stg.Storage.Storage.StorageID) { - newBlock.FileHash = grp.FileHash - newBlock.Size = grp.Size - newBlocks = append(newBlocks, newBlock) - continue - } - - shouldUpdateBlocks = true - - // 否则就要重建出这个节点需要的块 - // 输出只需要自己要保存的那一块 - ft.AddTo(ioswitch2.NewToShardStore(*stg.Storage.MasterHub, stg.Storage, ioswitch2.ECStream(i), fmt.Sprintf("%d", i))) - - evtTargetBlocks = append(evtTargetBlocks, stgmod.Block{ - BlockType: stgmod.BlockTypeEC, - Index: i, - StorageID: stg.Storage.Storage.StorageID, - }) - - newBlocks = append(newBlocks, newBlock) - } - - err := parser.Parse(ft, planBlder) - if err != nil { - return nil, fmt.Errorf("parsing plan: %w", err) - } - - // 如果没有任何Plan,Wait会直接返回成功 - execCtx := exec.NewExecContext() - exec.SetValueByType(execCtx, ctx.Args.StgMgr) - ret, err := planBlder.Execute(execCtx).Wait(context.Background()) - if err != nil { - return nil, fmt.Errorf("executing io plan: %w", err) - } - - if !shouldUpdateBlocks { - return nil, nil - } - - for k, v := range ret { - idx, err := strconv.ParseInt(k, 10, 64) - if err != nil { - return nil, fmt.Errorf("parsing result key %s as index: %w", k, err) - } - - r := v.(*ops2.ShardInfoValue) - newBlocks[idx].FileHash = r.Hash - newBlocks[idx].Size = r.Size - } - - var evtBlockTrans []stgmod.DataTransfer - for _, src := range evtSrcBlocks { - for _, tar := range evtTargetBlocks { - evtBlockTrans = append(evtBlockTrans, stgmod.DataTransfer{ - SourceStorageID: src.StorageID, - TargetStorageID: tar.StorageID, - TransferBytes: 1, - }) - } - } - - var blockChgs []stgmod.BlockChange - for _, block := range obj.Blocks { - keep := lo.ContainsBy(newBlocks, func(newBlock stgmod.ObjectBlock) bool { - return newBlock.Index == block.Index && newBlock.StorageID == block.StorageID - }) - if !keep { - blockChgs = append(blockChgs, &stgmod.BlockChangeDeleted{ - Index: block.Index, - StorageID: block.StorageID, - }) - } - } - blockChgs = append(blockChgs, &stgmod.BlockChangeEnDecode{ - SourceBlocks: evtSrcBlocks, - TargetBlocks: evtTargetBlocks, - DataTransfers: evtBlockTrans, - }) - - ctx.Args.EvtPub.Publish(&stgmod.BodyBlockTransfer{ - ObjectID: obj.Object.ObjectID, - PackageID: obj.Object.PackageID, - BlockChanges: blockChgs, - }) - - return &coormq.UpdatingObjectRedundancy{ - ObjectID: obj.Object.ObjectID, - FileHash: obj.Object.FileHash, - Size: obj.Object.Size, - Redundancy: tarRed, - Blocks: newBlocks, - }, nil -} - -func (t *CheckPackageRedundancy) lrcToLRC(ctx ExecuteContext, obj stgmod.ObjectDetail, srcRed *cdssdk.LRCRedundancy, tarRed *cdssdk.LRCRedundancy, uploadStorages []*StorageLoadInfo, allStgs map[cdssdk.StorageID]*StorageLoadInfo) (*coormq.UpdatingObjectRedundancy, error) { - - blocksGrpByIndex := obj.GroupBlocks() - - var lostBlocks []int - var lostBlockGrps []int - canGroupReconstruct := true - - allBlockFlags := make([]bool, srcRed.N) - for _, block := range blocksGrpByIndex { - allBlockFlags[block.Index] = true - } - - for i, ok := range allBlockFlags { - grpID := srcRed.FindGroup(i) - if !ok { - if grpID == -1 { - canGroupReconstruct = false - break - } - - if len(lostBlocks) > 0 && lostBlockGrps[len(lostBlockGrps)-1] == grpID { - canGroupReconstruct = false - break - } - - lostBlocks = append(lostBlocks, i) - lostBlockGrps = append(lostBlockGrps, grpID) - } - } - - // TODO 产生BlockTransfer事件 - - if canGroupReconstruct { - // return t.groupReconstructLRC(obj, lostBlocks, lostBlockGrps, blocksGrpByIndex, srcRed, uploadStorages) - } - - return t.reconstructLRC(ctx, obj, blocksGrpByIndex, srcRed, uploadStorages, allStgs) -} - -/* -TODO2 修复这一块的代码 - - func (t *CheckPackageRedundancy) groupReconstructLRC(obj stgmod.ObjectDetail, lostBlocks []int, lostBlockGrps []int, grpedBlocks []stgmod.GrouppedObjectBlock, red *cdssdk.LRCRedundancy, uploadStorages []*StorageLoadInfo) (*coormq.UpdatingObjectRedundancy, error) { - grped := make(map[int]stgmod.GrouppedObjectBlock) - for _, b := range grpedBlocks { - grped[b.Index] = b - } - - plans := exec.NewPlanBuilder() - - for i := 0; i < len(lostBlocks); i++ { - var froms []ioswitchlrc.From - grpEles := red.GetGroupElements(lostBlockGrps[i]) - for _, ele := range grpEles { - if ele == lostBlocks[i] { - continue - } - - froms = append(froms, ioswitchlrc.NewFromStorage(grped[ele].FileHash, nil, ele)) - } - - err := lrcparser.ReconstructGroup(froms, []ioswitchlrc.To{ - ioswitchlrc.NewToStorage(uploadStorages[i].Storage, lostBlocks[i], fmt.Sprintf("%d", lostBlocks[i])), - }, plans) - if err != nil { - return nil, fmt.Errorf("parsing plan: %w", err) - } - } - - fmt.Printf("plans: %v\n", plans) - - // 如果没有任何Plan,Wait会直接返回成功 - // TODO 添加依赖 - ret, err := plans.Execute(exec.NewExecContext()).Wait(context.TODO()) - if err != nil { - return nil, fmt.Errorf("executing io plan: %w", err) - } - - var newBlocks []stgmod.ObjectBlock - for _, i := range lostBlocks { - newBlocks = append(newBlocks, stgmod.ObjectBlock{ - ObjectID: obj.Object.ObjectID, - Index: i, - StorageID: uploadStorages[i].Storage.Storage.StorageID, - FileHash: ret[fmt.Sprintf("%d", i)].(*ops2.FileHashValue).Hash, - }) - } - for _, b := range grpedBlocks { - for _, hubID := range b.StorageIDs { - newBlocks = append(newBlocks, stgmod.ObjectBlock{ - ObjectID: obj.Object.ObjectID, - Index: b.Index, - StorageID: hubID, - FileHash: b.FileHash, - }) - } - } - - return &coormq.UpdatingObjectRedundancy{ - ObjectID: obj.Object.ObjectID, - Redundancy: red, - Blocks: newBlocks, - }, nil - } -*/ -func (t *CheckPackageRedundancy) reconstructLRC(ctx ExecuteContext, obj stgmod.ObjectDetail, grpBlocks []stgmod.GrouppedObjectBlock, red *cdssdk.LRCRedundancy, uploadStorages []*StorageLoadInfo, allStgs map[cdssdk.StorageID]*StorageLoadInfo) (*coormq.UpdatingObjectRedundancy, error) { - var chosenBlocks []stgmod.GrouppedObjectBlock - var chosenBlockStg []stgmod.StorageDetail - - for _, block := range grpBlocks { - if len(block.StorageIDs) > 0 && block.Index < red.M() { - stg, ok := allStgs[block.StorageIDs[0]] - if !ok { - continue - } - if stg.Storage.MasterHub == nil { - continue - } - - chosenBlocks = append(chosenBlocks, block) - chosenBlockStg = append(chosenBlockStg, stg.Storage) - } - - if len(chosenBlocks) == red.K { - break - } - } - - if len(chosenBlocks) < red.K { - return nil, fmt.Errorf("no enough blocks to reconstruct the original file data") - } - - // 目前LRC的参数都相同,所以可以不用重建出完整数据然后再分块,可以直接构建出目的节点需要的块 - planBlder := exec.NewPlanBuilder() - - var froms []ioswitchlrc.From - var toes []ioswitchlrc.To - var newBlocks []stgmod.ObjectBlock - shouldUpdateBlocks := false - for i, storage := range uploadStorages { - newBlock := stgmod.ObjectBlock{ - ObjectID: obj.Object.ObjectID, - Index: i, - StorageID: storage.Storage.Storage.StorageID, - } - - grp, ok := lo.Find(grpBlocks, func(grp stgmod.GrouppedObjectBlock) bool { return grp.Index == i }) - - // 如果新选中的节点已经记录在Block表中,那么就不需要任何变更 - if ok && lo.Contains(grp.StorageIDs, storage.Storage.Storage.StorageID) { - newBlock.FileHash = grp.FileHash - newBlock.Size = grp.Size - newBlocks = append(newBlocks, newBlock) - continue - } - - shouldUpdateBlocks = true - - // 否则就要重建出这个节点需要的块 - - for i2, block := range chosenBlocks { - froms = append(froms, ioswitchlrc.NewFromStorage(block.FileHash, *chosenBlockStg[i2].MasterHub, chosenBlockStg[i2].Storage, block.Index)) - } - - // 输出只需要自己要保存的那一块 - toes = append(toes, ioswitchlrc.NewToStorage(*storage.Storage.MasterHub, storage.Storage.Storage, i, fmt.Sprintf("%d", i))) - - newBlocks = append(newBlocks, newBlock) - } - - err := lrcparser.ReconstructAny(froms, toes, planBlder) - if err != nil { - return nil, fmt.Errorf("parsing plan: %w", err) - } - - fmt.Printf("plans: %v\n", planBlder) - - // 如果没有任何Plan,Wait会直接返回成功 - execCtx := exec.NewExecContext() - exec.SetValueByType(execCtx, ctx.Args.StgMgr) - ret, err := planBlder.Execute(execCtx).Wait(context.Background()) - if err != nil { - return nil, fmt.Errorf("executing io plan: %w", err) - } - - if !shouldUpdateBlocks { - return nil, nil - } - - for k, v := range ret { - idx, err := strconv.ParseInt(k, 10, 64) - if err != nil { - return nil, fmt.Errorf("parsing result key %s as index: %w", k, err) - } - - r := v.(*ops2.ShardInfoValue) - newBlocks[idx].FileHash = r.Hash - newBlocks[idx].Size = r.Size - } - - return &coormq.UpdatingObjectRedundancy{ - ObjectID: obj.Object.ObjectID, - FileHash: obj.Object.FileHash, - Size: obj.Object.Size, - Redundancy: red, - Blocks: newBlocks, - }, nil -} - -// func (t *CheckPackageRedundancy) pinObject(hubID cdssdk.HubID, fileHash string) error { -// agtCli, err := stgglb.HubMQPool.Acquire(hubID) -// if err != nil { -// return fmt.Errorf("new hub client: %w", err) -// } -// defer stgglb.HubMQPool.Release(agtCli) - -// _, err = agtCli.PinObject(agtmq.ReqPinObject([]string{fileHash}, false)) -// if err != nil { -// return fmt.Errorf("start pinning object: %w", err) -// } - -// return nil -// } - -func init() { - RegisterMessageConvertor(NewCheckPackageRedundancy) -} diff --git a/scanner/internal/event/clean_pinned.go b/scanner/internal/event/clean_pinned.go deleted file mode 100644 index ad14a2e..0000000 --- a/scanner/internal/event/clean_pinned.go +++ /dev/null @@ -1,1043 +0,0 @@ -package event - -import ( - "context" - "fmt" - "math" - "math/rand" - "sync" - "time" - - "github.com/samber/lo" - "gitlink.org.cn/cloudream/common/pkgs/bitmap" - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" - "gitlink.org.cn/cloudream/common/pkgs/logger" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - "gitlink.org.cn/cloudream/common/utils/lo2" - "gitlink.org.cn/cloudream/common/utils/math2" - "gitlink.org.cn/cloudream/common/utils/sort2" - "gitlink.org.cn/cloudream/jcs-pub/common/consts" - stgglb "gitlink.org.cn/cloudream/jcs-pub/common/globals" - stgmod "gitlink.org.cn/cloudream/jcs-pub/common/models" - "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/distlock/reqbuilder" - "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2" - "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/ops2" - "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/parser" - coormq "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/mq/coordinator" - scevt "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/mq/scanner/event" -) - -type CleanPinned struct { - *scevt.CleanPinned -} - -func NewCleanPinned(evt *scevt.CleanPinned) *CleanPinned { - return &CleanPinned{ - CleanPinned: evt, - } -} - -func (t *CleanPinned) TryMerge(other Event) bool { - event, ok := other.(*CleanPinned) - if !ok { - return false - } - - return t.PackageID == event.PackageID -} - -func (t *CleanPinned) Execute(execCtx ExecuteContext) { - log := logger.WithType[CleanPinned]("Event") - startTime := time.Now() - log.Debugf("begin with %v", logger.FormatStruct(t.CleanPinned)) - defer func() { - log.Debugf("end, time: %v", time.Since(startTime)) - }() - - // TODO 应该与其他event一样,直接访问数据库 - coorCli, err := stgglb.CoordinatorMQPool.Acquire() - if err != nil { - log.Warnf("new coordinator client: %s", err.Error()) - return - } - defer stgglb.CoordinatorMQPool.Release(coorCli) - - getObjs, err := coorCli.GetPackageObjectDetails(coormq.ReqGetPackageObjectDetails(t.PackageID)) - if err != nil { - log.Warnf("getting package objects: %s", err.Error()) - return - } - - stats, err := execCtx.Args.DB.PackageAccessStat().GetByPackageID(execCtx.Args.DB.DefCtx(), t.PackageID) - if err != nil { - log.Warnf("getting package access stat: %s", err.Error()) - return - } - var readerStgIDs []cdssdk.StorageID - for _, item := range stats { - // TODO 可以考虑做成配置 - if item.Amount >= float64(len(getObjs.Objects)/2) { - readerStgIDs = append(readerStgIDs, item.StorageID) - } - } - - // 注意!需要保证allStgID包含所有之后可能用到的节点ID - // TOOD 可以考虑设计Cache机制 - var allStgID []cdssdk.StorageID - for _, obj := range getObjs.Objects { - for _, block := range obj.Blocks { - allStgID = append(allStgID, block.StorageID) - } - allStgID = append(allStgID, obj.PinnedAt...) - } - allStgID = append(allStgID, readerStgIDs...) - - getStgResp, err := coorCli.GetStorageDetails(coormq.ReqGetStorageDetails(lo.Union(allStgID))) - if err != nil { - log.Warnf("getting nodes: %s", err.Error()) - return - } - - allStgInfos := make(map[cdssdk.StorageID]*stgmod.StorageDetail) - for _, stg := range getStgResp.Storages { - allStgInfos[stg.Storage.StorageID] = stg - } - - // 只对ec和rep对象进行处理 - var ecObjects []stgmod.ObjectDetail - var repObjects []stgmod.ObjectDetail - for _, obj := range getObjs.Objects { - if _, ok := obj.Object.Redundancy.(*cdssdk.ECRedundancy); ok { - ecObjects = append(ecObjects, obj) - } else if _, ok := obj.Object.Redundancy.(*cdssdk.RepRedundancy); ok { - repObjects = append(repObjects, obj) - } - } - - planBld := exec.NewPlanBuilder() - planningStgIDs := make(map[cdssdk.StorageID]bool) - - var sysEvents []stgmod.SysEventBody - - // 对于rep对象,统计出所有对象块分布最多的两个节点,用这两个节点代表所有rep对象块的分布,去进行退火算法 - var repObjectsUpdating []coormq.UpdatingObjectRedundancy - repMostHubIDs := t.summaryRepObjectBlockNodes(repObjects) - solu := t.startAnnealing(allStgInfos, readerStgIDs, annealingObject{ - totalBlockCount: 1, - minBlockCnt: 1, - pinnedAt: repMostHubIDs, - blocks: nil, - }) - for _, obj := range repObjects { - repObjectsUpdating = append(repObjectsUpdating, t.makePlansForRepObject(allStgInfos, solu, obj, planBld, planningStgIDs)) - sysEvents = append(sysEvents, t.generateSysEventForRepObject(solu, obj)...) - } - - // 对于ec对象,则每个对象单独进行退火算法 - var ecObjectsUpdating []coormq.UpdatingObjectRedundancy - for _, obj := range ecObjects { - ecRed := obj.Object.Redundancy.(*cdssdk.ECRedundancy) - solu := t.startAnnealing(allStgInfos, readerStgIDs, annealingObject{ - totalBlockCount: ecRed.N, - minBlockCnt: ecRed.K, - pinnedAt: obj.PinnedAt, - blocks: obj.Blocks, - }) - ecObjectsUpdating = append(ecObjectsUpdating, t.makePlansForECObject(allStgInfos, solu, obj, planBld, planningStgIDs)) - sysEvents = append(sysEvents, t.generateSysEventForECObject(solu, obj)...) - } - - ioSwRets, err := t.executePlans(execCtx, planBld, planningStgIDs) - if err != nil { - log.Warn(err.Error()) - return - } - - // 根据按照方案进行调整的结果,填充更新元数据的命令 - for i := range ecObjectsUpdating { - t.populateECObjectEntry(&ecObjectsUpdating[i], ecObjects[i], ioSwRets) - } - - finalEntries := append(repObjectsUpdating, ecObjectsUpdating...) - if len(finalEntries) > 0 { - _, err = coorCli.UpdateObjectRedundancy(coormq.ReqUpdateObjectRedundancy(finalEntries)) - if err != nil { - log.Warnf("changing object redundancy: %s", err.Error()) - return - } - - for _, e := range sysEvents { - execCtx.Args.EvtPub.Publish(e) - } - } -} - -func (t *CleanPinned) summaryRepObjectBlockNodes(objs []stgmod.ObjectDetail) []cdssdk.StorageID { - type stgBlocks struct { - StorageID cdssdk.StorageID - Count int - } - - stgBlocksMap := make(map[cdssdk.StorageID]*stgBlocks) - for _, obj := range objs { - cacheBlockStgs := make(map[cdssdk.StorageID]bool) - for _, block := range obj.Blocks { - if _, ok := stgBlocksMap[block.StorageID]; !ok { - stgBlocksMap[block.StorageID] = &stgBlocks{ - StorageID: block.StorageID, - Count: 0, - } - } - stgBlocksMap[block.StorageID].Count++ - cacheBlockStgs[block.StorageID] = true - } - - for _, hubID := range obj.PinnedAt { - if cacheBlockStgs[hubID] { - continue - } - - if _, ok := stgBlocksMap[hubID]; !ok { - stgBlocksMap[hubID] = &stgBlocks{ - StorageID: hubID, - Count: 0, - } - } - stgBlocksMap[hubID].Count++ - } - } - - stgs := lo.Values(stgBlocksMap) - sort2.Sort(stgs, func(left *stgBlocks, right *stgBlocks) int { - return right.Count - left.Count - }) - - // 只选出块数超过一半的节点,但要保证至少有两个节点 - for i := 2; i < len(stgs); i++ { - if stgs[i].Count < len(objs)/2 { - stgs = stgs[:i] - break - } - } - - return lo.Map(stgs, func(item *stgBlocks, idx int) cdssdk.StorageID { return item.StorageID }) -} - -type annealingState struct { - allStgInfos map[cdssdk.StorageID]*stgmod.StorageDetail // 所有节点的信息 - readerStgIDs []cdssdk.StorageID // 近期可能访问此对象的节点 - stgsSortedByReader map[cdssdk.StorageID][]stgDist // 拥有数据的节点到每个可能访问对象的节点按距离排序 - object annealingObject // 进行退火的对象 - blockList []objectBlock // 排序后的块分布情况 - stgBlockBitmaps map[cdssdk.StorageID]*bitmap.Bitmap64 // 用位图的形式表示每一个节点上有哪些块 - stgCombTree combinatorialTree // 节点组合树,用于加速计算容灾度 - - maxScore float64 // 搜索过程中得到过的最大分数 - maxScoreRmBlocks []bool // 最大分数对应的删除方案 - - rmBlocks []bool // 当前删除方案 - inversedIndex int // 当前删除方案是从上一次的方案改动哪个flag而来的 - lastDisasterTolerance float64 // 上一次方案的容灾度 - lastSpaceCost float64 // 上一次方案的冗余度 - lastMinAccessCost float64 // 上一次方案的最小访问费用 - lastScore float64 // 上一次方案的分数 -} - -type objectBlock struct { - Index int - StorageID cdssdk.StorageID - HasEntity bool // 节点拥有实际的文件数据块 - HasShadow bool // 如果节点拥有完整文件数据,那么认为这个节点拥有所有块,这些块被称为影子块 - FileHash cdssdk.FileHash // 只有在拥有实际文件数据块时,这个字段才有值 - Size int64 // 块大小 -} - -type stgDist struct { - StorageID cdssdk.StorageID - Distance float64 -} - -type combinatorialTree struct { - nodes []combinatorialTreeNode - blocksMaps map[int]bitmap.Bitmap64 - stgIDToLocalStgID map[cdssdk.StorageID]int - localStgIDToStgID []cdssdk.StorageID -} - -type annealingObject struct { - totalBlockCount int - minBlockCnt int - pinnedAt []cdssdk.StorageID - blocks []stgmod.ObjectBlock -} - -const ( - iterActionNone = 0 - iterActionSkip = 1 - iterActionBreak = 2 -) - -func newCombinatorialTree(stgBlocksMaps map[cdssdk.StorageID]*bitmap.Bitmap64) combinatorialTree { - tree := combinatorialTree{ - blocksMaps: make(map[int]bitmap.Bitmap64), - stgIDToLocalStgID: make(map[cdssdk.StorageID]int), - } - - tree.nodes = make([]combinatorialTreeNode, (1 << len(stgBlocksMaps))) - for id, mp := range stgBlocksMaps { - tree.stgIDToLocalStgID[id] = len(tree.localStgIDToStgID) - tree.blocksMaps[len(tree.localStgIDToStgID)] = *mp - tree.localStgIDToStgID = append(tree.localStgIDToStgID, id) - } - - tree.nodes[0].localHubID = -1 - index := 1 - tree.initNode(0, &tree.nodes[0], &index) - - return tree -} - -func (t *combinatorialTree) initNode(minAvaiLocalHubID int, parent *combinatorialTreeNode, index *int) { - for i := minAvaiLocalHubID; i < len(t.stgIDToLocalStgID); i++ { - curIndex := *index - *index++ - bitMp := t.blocksMaps[i] - bitMp.Or(&parent.blocksBitmap) - - t.nodes[curIndex] = combinatorialTreeNode{ - localHubID: i, - parent: parent, - blocksBitmap: bitMp, - } - t.initNode(i+1, &t.nodes[curIndex], index) - } -} - -// 获得索引指定的节点所在的层 -func (t *combinatorialTree) GetDepth(index int) int { - depth := 0 - - // 反复判断节点在哪个子树。从左到右,子树节点的数量呈现8 4 2的变化,由此可以得到每个子树的索引值的范围 - subTreeCount := 1 << len(t.stgIDToLocalStgID) - for index > 0 { - if index < subTreeCount { - // 定位到一个子树后,深度+1,然后进入这个子树,使用同样的方法再进行定位。 - // 进入子树后需要将索引值-1,因为要去掉子树的根节点 - index-- - depth++ - } else { - // 如果索引值不在这个子树范围内,则将值减去子树的节点数量, - // 这样每一次都可以视为使用同样的逻辑对不同大小的树进行判断。 - index -= subTreeCount - } - subTreeCount >>= 1 - } - - return depth -} - -// 更新某一个算力中心节点的块分布位图,同时更新它对应组合树节点的所有子节点。 -// 如果更新到某个节点时,已有K个块,那么就不会再更新它的子节点 -func (t *combinatorialTree) UpdateBitmap(stgID cdssdk.StorageID, mp bitmap.Bitmap64, k int) { - t.blocksMaps[t.stgIDToLocalStgID[stgID]] = mp - // 首先定义两种遍历树节点时的移动方式: - // 1. 竖直移动(深度增加):从一个节点移动到它最左边的子节点。每移动一步,index+1 - // 2. 水平移动:从一个节点移动到它右边的兄弟节点。每移动一步,根据它所在的深度,index+8,+4,+2 - // LocalID从0开始,将其+1后得到移动步数steps。 - // 将移动步数拆成多部分,分配到上述的两种移动方式上,并进行任意组合,且保证第一次为至少进行一次的竖直移动,移动之后的节点都会是同一个计算中心节点。 - steps := t.stgIDToLocalStgID[stgID] + 1 - for d := 1; d <= steps; d++ { - t.iterCombBits(len(t.stgIDToLocalStgID)-1, steps-d, 0, func(i int) { - index := d + i - node := &t.nodes[index] - - newMp := t.blocksMaps[node.localHubID] - newMp.Or(&node.parent.blocksBitmap) - node.blocksBitmap = newMp - if newMp.Weight() >= k { - return - } - - t.iterChildren(index, func(index, parentIndex, depth int) int { - curNode := &t.nodes[index] - parentNode := t.nodes[parentIndex] - - newMp := t.blocksMaps[curNode.localHubID] - newMp.Or(&parentNode.blocksBitmap) - curNode.blocksBitmap = newMp - if newMp.Weight() >= k { - return iterActionSkip - } - - return iterActionNone - }) - }) - } -} - -// 遍历树,找到至少拥有K个块的树节点的最大深度 -func (t *combinatorialTree) FindKBlocksMaxDepth(k int) int { - maxDepth := -1 - t.iterChildren(0, func(index, parentIndex, depth int) int { - if t.nodes[index].blocksBitmap.Weight() >= k { - if maxDepth < depth { - maxDepth = depth - } - return iterActionSkip - } - // 如果到了叶子节点,还没有找到K个块,那就认为要满足K个块,至少需要再多一个节点,即深度+1。 - // 由于遍历时采用的是深度优先的算法,因此遍历到这个叶子节点时,叶子节点再加一个节点的组合已经在前面搜索过, - // 所以用当前叶子节点深度+1来作为当前分支的结果就可以,即使当前情况下增加任意一个节点依然不够K块, - // 可以使用同样的思路去递推到当前叶子节点增加两个块的情况。 - if t.nodes[index].localHubID == len(t.stgIDToLocalStgID)-1 { - if maxDepth < depth+1 { - maxDepth = depth + 1 - } - } - - return iterActionNone - }) - - if maxDepth == -1 || maxDepth > len(t.stgIDToLocalStgID) { - return len(t.stgIDToLocalStgID) - } - - return maxDepth -} - -func (t *combinatorialTree) iterCombBits(width int, count int, offset int, callback func(int)) { - if count == 0 { - callback(offset) - return - } - - for b := width; b >= count; b-- { - t.iterCombBits(b-1, count-1, offset+(1<>= 1 - } -} - -func (t *combinatorialTree) itering(index int, parentIndex int, depth int, do func(index int, parentIndex int, depth int) int) int { - act := do(index, parentIndex, depth) - if act == iterActionBreak { - return act - } - if act == iterActionSkip { - return iterActionNone - } - - curNode := &t.nodes[index] - childIndex := index + 1 - - childCounts := len(t.stgIDToLocalStgID) - 1 - curNode.localHubID - if childCounts == 0 { - return iterActionNone - } - - childTreeNodeCnt := 1 << (childCounts - 1) - for c := 0; c < childCounts; c++ { - act = t.itering(childIndex, index, depth+1, do) - if act == iterActionBreak { - return act - } - - childIndex += childTreeNodeCnt - childTreeNodeCnt >>= 1 - } - - return iterActionNone -} - -type combinatorialTreeNode struct { - localHubID int - parent *combinatorialTreeNode - blocksBitmap bitmap.Bitmap64 // 选择了这个中心之后,所有中心一共包含多少种块 -} - -type annealingSolution struct { - blockList []objectBlock // 所有节点的块分布情况 - rmBlocks []bool // 要删除哪些块 - disasterTolerance float64 // 本方案的容灾度 - spaceCost float64 // 本方案的冗余度 - minAccessCost float64 // 本方案的最小访问费用 -} - -func (t *CleanPinned) startAnnealing(allStgInfos map[cdssdk.StorageID]*stgmod.StorageDetail, readerStgIDs []cdssdk.StorageID, object annealingObject) annealingSolution { - state := &annealingState{ - allStgInfos: allStgInfos, - readerStgIDs: readerStgIDs, - stgsSortedByReader: make(map[cdssdk.StorageID][]stgDist), - object: object, - stgBlockBitmaps: make(map[cdssdk.StorageID]*bitmap.Bitmap64), - } - - t.initBlockList(state) - if state.blockList == nil { - return annealingSolution{} - } - - t.initNodeBlockBitmap(state) - - t.sortNodeByReaderDistance(state) - - state.rmBlocks = make([]bool, len(state.blockList)) - state.inversedIndex = -1 - state.stgCombTree = newCombinatorialTree(state.stgBlockBitmaps) - - state.lastScore = t.calcScore(state) - state.maxScore = state.lastScore - state.maxScoreRmBlocks = lo2.ArrayClone(state.rmBlocks) - - // 模拟退火算法的温度 - curTemp := state.lastScore - // 结束温度 - finalTemp := curTemp * 0.2 - // 冷却率 - coolingRate := 0.95 - - for curTemp > finalTemp { - state.inversedIndex = rand.Intn(len(state.rmBlocks)) - block := state.blockList[state.inversedIndex] - state.rmBlocks[state.inversedIndex] = !state.rmBlocks[state.inversedIndex] - state.stgBlockBitmaps[block.StorageID].Set(block.Index, !state.rmBlocks[state.inversedIndex]) - state.stgCombTree.UpdateBitmap(block.StorageID, *state.stgBlockBitmaps[block.StorageID], state.object.minBlockCnt) - - curScore := t.calcScore(state) - - dScore := curScore - state.lastScore - // 如果新方案比旧方案得分低,且没有要求强制接受新方案,那么就将变化改回去 - if curScore == 0 || (dScore < 0 && !t.alwaysAccept(curTemp, dScore, coolingRate)) { - state.rmBlocks[state.inversedIndex] = !state.rmBlocks[state.inversedIndex] - state.stgBlockBitmaps[block.StorageID].Set(block.Index, !state.rmBlocks[state.inversedIndex]) - state.stgCombTree.UpdateBitmap(block.StorageID, *state.stgBlockBitmaps[block.StorageID], state.object.minBlockCnt) - // fmt.Printf("\n") - } else { - // fmt.Printf(" accept!\n") - state.lastScore = curScore - if state.maxScore < curScore { - state.maxScore = state.lastScore - state.maxScoreRmBlocks = lo2.ArrayClone(state.rmBlocks) - } - } - curTemp *= coolingRate - } - // fmt.Printf("final: %v\n", state.maxScoreRmBlocks) - return annealingSolution{ - blockList: state.blockList, - rmBlocks: state.maxScoreRmBlocks, - disasterTolerance: state.lastDisasterTolerance, - spaceCost: state.lastSpaceCost, - minAccessCost: state.lastMinAccessCost, - } -} - -func (t *CleanPinned) initBlockList(ctx *annealingState) { - blocksMap := make(map[cdssdk.StorageID][]objectBlock) - - // 先生成所有的影子块 - for _, pinned := range ctx.object.pinnedAt { - blocks := make([]objectBlock, 0, ctx.object.totalBlockCount) - for i := 0; i < ctx.object.totalBlockCount; i++ { - blocks = append(blocks, objectBlock{ - Index: i, - StorageID: pinned, - HasShadow: true, - }) - } - blocksMap[pinned] = blocks - } - - // 再填充实际块 - for _, b := range ctx.object.blocks { - blocks := blocksMap[b.StorageID] - - has := false - for i := range blocks { - if blocks[i].Index == b.Index { - blocks[i].HasEntity = true - blocks[i].FileHash = b.FileHash - has = true - break - } - } - - if has { - continue - } - - blocks = append(blocks, objectBlock{ - Index: b.Index, - StorageID: b.StorageID, - HasEntity: true, - FileHash: b.FileHash, - Size: b.Size, - }) - blocksMap[b.StorageID] = blocks - } - - var sortedBlocks []objectBlock - for _, bs := range blocksMap { - sortedBlocks = append(sortedBlocks, bs...) - } - sortedBlocks = sort2.Sort(sortedBlocks, func(left objectBlock, right objectBlock) int { - d := left.StorageID - right.StorageID - if d != 0 { - return int(d) - } - - return left.Index - right.Index - }) - - ctx.blockList = sortedBlocks -} - -func (t *CleanPinned) initNodeBlockBitmap(state *annealingState) { - for _, b := range state.blockList { - mp, ok := state.stgBlockBitmaps[b.StorageID] - if !ok { - nb := bitmap.Bitmap64(0) - mp = &nb - state.stgBlockBitmaps[b.StorageID] = mp - } - mp.Set(b.Index, true) - } -} - -func (t *CleanPinned) sortNodeByReaderDistance(state *annealingState) { - for _, r := range state.readerStgIDs { - var nodeDists []stgDist - - for n := range state.stgBlockBitmaps { - if r == n { - // 同节点时距离视为0.1 - nodeDists = append(nodeDists, stgDist{ - StorageID: n, - Distance: consts.StorageDistanceSameStorage, - }) - } else if state.allStgInfos[r].MasterHub.LocationID == state.allStgInfos[n].MasterHub.LocationID { - // 同地区时距离视为1 - nodeDists = append(nodeDists, stgDist{ - StorageID: n, - Distance: consts.StorageDistanceSameLocation, - }) - } else { - // 不同地区时距离视为5 - nodeDists = append(nodeDists, stgDist{ - StorageID: n, - Distance: consts.StorageDistanceOther, - }) - } - } - - state.stgsSortedByReader[r] = sort2.Sort(nodeDists, func(left, right stgDist) int { return sort2.Cmp(left.Distance, right.Distance) }) - } -} - -func (t *CleanPinned) calcScore(state *annealingState) float64 { - dt := t.calcDisasterTolerance(state) - ac := t.calcMinAccessCost(state) - sc := t.calcSpaceCost(state) - - state.lastDisasterTolerance = dt - state.lastMinAccessCost = ac - state.lastSpaceCost = sc - - dtSc := 1.0 - if dt < 1 { - dtSc = 0 - } else if dt >= 2 { - dtSc = 1.5 - } - - newSc := 0.0 - if dt == 0 || ac == 0 { - newSc = 0 - } else { - newSc = dtSc / (sc * ac) - } - - // fmt.Printf("solu: %v, cur: %v, dt: %v, ac: %v, sc: %v \n", state.rmBlocks, newSc, dt, ac, sc) - return newSc -} - -// 计算容灾度 -func (t *CleanPinned) calcDisasterTolerance(state *annealingState) float64 { - if state.inversedIndex != -1 { - node := state.blockList[state.inversedIndex] - state.stgCombTree.UpdateBitmap(node.StorageID, *state.stgBlockBitmaps[node.StorageID], state.object.minBlockCnt) - } - return float64(len(state.stgBlockBitmaps) - state.stgCombTree.FindKBlocksMaxDepth(state.object.minBlockCnt)) -} - -// 计算最小访问数据的代价 -func (t *CleanPinned) calcMinAccessCost(state *annealingState) float64 { - cost := math.MaxFloat64 - for _, reader := range state.readerStgIDs { - tarNodes := state.stgsSortedByReader[reader] - gotBlocks := bitmap.Bitmap64(0) - thisCost := 0.0 - - for _, tar := range tarNodes { - tarNodeMp := state.stgBlockBitmaps[tar.StorageID] - - // 只需要从目的节点上获得缺少的块 - curWeigth := gotBlocks.Weight() - // 下面的if会在拿到k个块之后跳出循环,所以or多了块也没关系 - gotBlocks.Or(tarNodeMp) - // 但是算读取块的消耗时,不能多算,最多算读了k个块的消耗 - willGetBlocks := math2.Min(gotBlocks.Weight()-curWeigth, state.object.minBlockCnt-curWeigth) - thisCost += float64(willGetBlocks) * float64(tar.Distance) - - if gotBlocks.Weight() >= state.object.minBlockCnt { - break - } - } - if gotBlocks.Weight() >= state.object.minBlockCnt { - cost = math.Min(cost, thisCost) - } - } - - return cost -} - -// 计算冗余度 -func (t *CleanPinned) calcSpaceCost(ctx *annealingState) float64 { - blockCount := 0 - for i, b := range ctx.blockList { - if ctx.rmBlocks[i] { - continue - } - - if b.HasEntity { - blockCount++ - } - if b.HasShadow { - blockCount++ - } - } - // 所有算力中心上拥有的块的总数 / 一个对象被分成了几个块 - return float64(blockCount) / float64(ctx.object.minBlockCnt) -} - -// 如果新方案得分比旧方案小,那么在一定概率内也接受新方案 -func (t *CleanPinned) alwaysAccept(curTemp float64, dScore float64, coolingRate float64) bool { - v := math.Exp(dScore / curTemp / coolingRate) - // fmt.Printf(" -- chance: %v, temp: %v", v, curTemp) - return v > rand.Float64() -} - -func (t *CleanPinned) makePlansForRepObject(allStgInfos map[cdssdk.StorageID]*stgmod.StorageDetail, solu annealingSolution, obj stgmod.ObjectDetail, planBld *exec.PlanBuilder, planningHubIDs map[cdssdk.StorageID]bool) coormq.UpdatingObjectRedundancy { - entry := coormq.UpdatingObjectRedundancy{ - ObjectID: obj.Object.ObjectID, - FileHash: obj.Object.FileHash, - Size: obj.Object.Size, - Redundancy: obj.Object.Redundancy, - } - - ft := ioswitch2.NewFromTo() - - fromStg := allStgInfos[obj.Blocks[0].StorageID] - ft.AddFrom(ioswitch2.NewFromShardstore(obj.Object.FileHash, *fromStg.MasterHub, *fromStg, ioswitch2.RawStream())) - - for i, f := range solu.rmBlocks { - hasCache := lo.ContainsBy(obj.Blocks, func(b stgmod.ObjectBlock) bool { return b.StorageID == solu.blockList[i].StorageID }) || - lo.ContainsBy(obj.PinnedAt, func(n cdssdk.StorageID) bool { return n == solu.blockList[i].StorageID }) - willRm := f - - if !willRm { - // 如果对象在退火后要保留副本的节点没有副本,则需要在这个节点创建副本 - if !hasCache { - toStg := allStgInfos[solu.blockList[i].StorageID] - ft.AddTo(ioswitch2.NewToShardStore(*toStg.MasterHub, *toStg, ioswitch2.RawStream(), fmt.Sprintf("%d.0", obj.Object.ObjectID))) - - planningHubIDs[solu.blockList[i].StorageID] = true - } - entry.Blocks = append(entry.Blocks, stgmod.ObjectBlock{ - ObjectID: obj.Object.ObjectID, - Index: solu.blockList[i].Index, - StorageID: solu.blockList[i].StorageID, - FileHash: obj.Object.FileHash, - Size: solu.blockList[i].Size, - }) - } - } - - err := parser.Parse(ft, planBld) - if err != nil { - // TODO 错误处理 - } - - return entry -} - -func (t *CleanPinned) generateSysEventForRepObject(solu annealingSolution, obj stgmod.ObjectDetail) []stgmod.SysEventBody { - var blockChgs []stgmod.BlockChange - - for i, f := range solu.rmBlocks { - hasCache := lo.ContainsBy(obj.Blocks, func(b stgmod.ObjectBlock) bool { return b.StorageID == solu.blockList[i].StorageID }) || - lo.ContainsBy(obj.PinnedAt, func(n cdssdk.StorageID) bool { return n == solu.blockList[i].StorageID }) - willRm := f - - if !willRm { - // 如果对象在退火后要保留副本的节点没有副本,则需要在这个节点创建副本 - if !hasCache { - blockChgs = append(blockChgs, &stgmod.BlockChangeClone{ - BlockType: stgmod.BlockTypeRaw, - SourceStorageID: obj.Blocks[0].StorageID, - TargetStorageID: solu.blockList[i].StorageID, - }) - } - } else { - blockChgs = append(blockChgs, &stgmod.BlockChangeDeleted{ - Index: 0, - StorageID: solu.blockList[i].StorageID, - }) - } - } - - transEvt := &stgmod.BodyBlockTransfer{ - ObjectID: obj.Object.ObjectID, - PackageID: obj.Object.PackageID, - BlockChanges: blockChgs, - } - - var blockDist []stgmod.BlockDistributionObjectInfo - for i, f := range solu.rmBlocks { - if !f { - blockDist = append(blockDist, stgmod.BlockDistributionObjectInfo{ - BlockType: stgmod.BlockTypeRaw, - Index: 0, - StorageID: solu.blockList[i].StorageID, - }) - } - } - - distEvt := &stgmod.BodyBlockDistribution{ - ObjectID: obj.Object.ObjectID, - PackageID: obj.Object.PackageID, - Path: obj.Object.Path, - Size: obj.Object.Size, - FileHash: obj.Object.FileHash, - FaultTolerance: solu.disasterTolerance, - Redundancy: solu.spaceCost, - AvgAccessCost: 0, // TODO 计算平均访问代价,从日常访问数据中统计 - BlockDistribution: blockDist, - // TODO 不好计算传输量 - } - - return []stgmod.SysEventBody{transEvt, distEvt} -} - -func (t *CleanPinned) makePlansForECObject(allStgInfos map[cdssdk.StorageID]*stgmod.StorageDetail, solu annealingSolution, obj stgmod.ObjectDetail, planBld *exec.PlanBuilder, planningHubIDs map[cdssdk.StorageID]bool) coormq.UpdatingObjectRedundancy { - entry := coormq.UpdatingObjectRedundancy{ - ObjectID: obj.Object.ObjectID, - FileHash: obj.Object.FileHash, - Size: obj.Object.Size, - Redundancy: obj.Object.Redundancy, - } - - reconstrct := make(map[cdssdk.StorageID]*[]int) - for i, f := range solu.rmBlocks { - block := solu.blockList[i] - if !f { - entry.Blocks = append(entry.Blocks, stgmod.ObjectBlock{ - ObjectID: obj.Object.ObjectID, - Index: block.Index, - StorageID: block.StorageID, - FileHash: block.FileHash, - Size: block.Size, - }) - - // 如果这个块是影子块,那么就要从完整对象里重建这个块 - if !block.HasEntity { - re, ok := reconstrct[block.StorageID] - if !ok { - re = &[]int{} - reconstrct[block.StorageID] = re - } - - *re = append(*re, block.Index) - } - } - } - - ecRed := obj.Object.Redundancy.(*cdssdk.ECRedundancy) - - for id, idxs := range reconstrct { - // 依次生成每个节点上的执行计划,因为如果放到一个计划里一起生成,不能保证每个节点上的块用的都是本节点上的副本 - ft := ioswitch2.NewFromTo() - ft.ECParam = ecRed - ft.AddFrom(ioswitch2.NewFromShardstore(obj.Object.FileHash, *allStgInfos[id].MasterHub, *allStgInfos[id], ioswitch2.RawStream())) - - for _, i := range *idxs { - ft.AddTo(ioswitch2.NewToShardStore(*allStgInfos[id].MasterHub, *allStgInfos[id], ioswitch2.ECStream(i), fmt.Sprintf("%d.%d", obj.Object.ObjectID, i))) - } - - err := parser.Parse(ft, planBld) - if err != nil { - // TODO 错误处理 - continue - } - - planningHubIDs[id] = true - } - return entry -} - -func (t *CleanPinned) generateSysEventForECObject(solu annealingSolution, obj stgmod.ObjectDetail) []stgmod.SysEventBody { - var blockChgs []stgmod.BlockChange - - reconstrct := make(map[cdssdk.StorageID]*[]int) - for i, f := range solu.rmBlocks { - block := solu.blockList[i] - if !f { - // 如果这个块是影子块,那么就要从完整对象里重建这个块 - if !block.HasEntity { - re, ok := reconstrct[block.StorageID] - if !ok { - re = &[]int{} - reconstrct[block.StorageID] = re - } - - *re = append(*re, block.Index) - } - } else { - blockChgs = append(blockChgs, &stgmod.BlockChangeDeleted{ - Index: block.Index, - StorageID: block.StorageID, - }) - } - } - - // 由于每一个需要被重建的块都是从同中心的副本里构建出来的,所以对于每一个中心都要产生一个BlockChangeEnDecode - for id, idxs := range reconstrct { - var tarBlocks []stgmod.Block - for _, idx := range *idxs { - tarBlocks = append(tarBlocks, stgmod.Block{ - BlockType: stgmod.BlockTypeEC, - Index: idx, - StorageID: id, - }) - } - blockChgs = append(blockChgs, &stgmod.BlockChangeEnDecode{ - SourceBlocks: []stgmod.Block{{ - BlockType: stgmod.BlockTypeRaw, - Index: 0, - StorageID: id, // 影子块的原始对象就在同一个节点上 - }}, - TargetBlocks: tarBlocks, - // 传输量为0 - }) - } - - transEvt := &stgmod.BodyBlockTransfer{ - ObjectID: obj.Object.ObjectID, - PackageID: obj.Object.PackageID, - BlockChanges: blockChgs, - } - - var blockDist []stgmod.BlockDistributionObjectInfo - for i, f := range solu.rmBlocks { - if !f { - blockDist = append(blockDist, stgmod.BlockDistributionObjectInfo{ - BlockType: stgmod.BlockTypeEC, - Index: solu.blockList[i].Index, - StorageID: solu.blockList[i].StorageID, - }) - } - } - - distEvt := &stgmod.BodyBlockDistribution{ - ObjectID: obj.Object.ObjectID, - PackageID: obj.Object.PackageID, - Path: obj.Object.Path, - Size: obj.Object.Size, - FileHash: obj.Object.FileHash, - FaultTolerance: solu.disasterTolerance, - Redundancy: solu.spaceCost, - AvgAccessCost: 0, // TODO 计算平均访问代价,从日常访问数据中统计 - BlockDistribution: blockDist, - // TODO 不好计算传输量 - } - - return []stgmod.SysEventBody{transEvt, distEvt} -} - -func (t *CleanPinned) executePlans(ctx ExecuteContext, planBld *exec.PlanBuilder, planningStgIDs map[cdssdk.StorageID]bool) (map[string]exec.VarValue, error) { - // 统一加锁,有重复也没关系 - lockBld := reqbuilder.NewBuilder() - for id := range planningStgIDs { - lockBld.Shard().Buzy(id) - } - lock, err := lockBld.MutexLock(ctx.Args.DistLock) - if err != nil { - return nil, fmt.Errorf("acquiring distlock: %w", err) - } - defer lock.Unlock() - - wg := sync.WaitGroup{} - - // 执行IO计划 - var ioSwRets map[string]exec.VarValue - var ioSwErr error - wg.Add(1) - go func() { - defer wg.Done() - - execCtx := exec.NewExecContext() - exec.SetValueByType(execCtx, ctx.Args.StgMgr) - ret, err := planBld.Execute(execCtx).Wait(context.TODO()) - if err != nil { - ioSwErr = fmt.Errorf("executing io switch plan: %w", err) - return - } - ioSwRets = ret - }() - - wg.Wait() - - if ioSwErr != nil { - return nil, ioSwErr - } - - return ioSwRets, nil -} - -func (t *CleanPinned) populateECObjectEntry(entry *coormq.UpdatingObjectRedundancy, obj stgmod.ObjectDetail, ioRets map[string]exec.VarValue) { - for i := range entry.Blocks { - if entry.Blocks[i].FileHash != "" { - continue - } - - key := fmt.Sprintf("%d.%d", obj.Object.ObjectID, entry.Blocks[i].Index) - // 不应该出现key不存在的情况 - r := ioRets[key].(*ops2.ShardInfoValue) - entry.Blocks[i].FileHash = r.Hash - entry.Blocks[i].Size = r.Size - } -} - -func init() { - RegisterMessageConvertor(NewCleanPinned) -} diff --git a/scanner/internal/tickevent/batch_check_package_redudancy.go b/scanner/internal/tickevent/batch_check_package_redudancy.go deleted file mode 100644 index 363173b..0000000 --- a/scanner/internal/tickevent/batch_check_package_redudancy.go +++ /dev/null @@ -1,52 +0,0 @@ -package tickevent - -import ( - "time" - - "gitlink.org.cn/cloudream/common/pkgs/logger" - "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/mq/scanner/event" - evt "gitlink.org.cn/cloudream/jcs-pub/scanner/internal/event" -) - -const ( - CheckPackageBatchSize = 100 -) - -type BatchCheckPackageRedundancy struct { - lastCheckStart int -} - -func NewBatchCheckPackageRedundancy() *BatchCheckPackageRedundancy { - return &BatchCheckPackageRedundancy{} -} - -func (e *BatchCheckPackageRedundancy) Execute(ctx ExecuteContext) { - log := logger.WithType[BatchCheckPackageRedundancy]("TickEvent") - log.Debugf("begin") - defer log.Debugf("end") - - // TODO 更好的策略 - nowHour := time.Now().Hour() - if nowHour > 6 { - return - } - - packageIDs, err := ctx.Args.DB.Package().BatchGetAllPackageIDs(ctx.Args.DB.DefCtx(), e.lastCheckStart, CheckPackageBatchSize) - if err != nil { - log.Warnf("batch get package ids failed, err: %s", err.Error()) - return - } - - for _, id := range packageIDs { - ctx.Args.EventExecutor.Post(evt.NewCheckPackageRedundancy(event.NewCheckPackageRedundancy(id))) - } - - // 如果结果的长度小于预期的长度,则认为已经查询了所有,下次从头再来 - if len(packageIDs) < CheckPackageBatchSize { - e.lastCheckStart = 0 - log.Debugf("all package checked, next time will start check at offset 0") - - } else { - e.lastCheckStart += CheckPackageBatchSize - } -} diff --git a/scanner/internal/tickevent/batch_clean_pinned.go b/scanner/internal/tickevent/batch_clean_pinned.go deleted file mode 100644 index 00360e2..0000000 --- a/scanner/internal/tickevent/batch_clean_pinned.go +++ /dev/null @@ -1,48 +0,0 @@ -package tickevent - -import ( - "time" - - "gitlink.org.cn/cloudream/common/pkgs/logger" - "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/mq/scanner/event" - evt "gitlink.org.cn/cloudream/jcs-pub/scanner/internal/event" -) - -type BatchCleanPinned struct { - lastCheckStart int -} - -func NewBatchCleanPinned() *BatchCleanPinned { - return &BatchCleanPinned{} -} - -func (e *BatchCleanPinned) Execute(ctx ExecuteContext) { - log := logger.WithType[BatchCleanPinned]("TickEvent") - log.Debugf("begin") - defer log.Debugf("end") - - // TODO 更好的策略 - nowHour := time.Now().Hour() - if nowHour > 6 { - return - } - - packageIDs, err := ctx.Args.DB.Package().BatchGetAllPackageIDs(ctx.Args.DB.DefCtx(), e.lastCheckStart, CheckPackageBatchSize) - if err != nil { - log.Warnf("batch get package ids failed, err: %s", err.Error()) - return - } - - for _, id := range packageIDs { - ctx.Args.EventExecutor.Post(evt.NewCleanPinned(event.NewCleanPinned(id))) - } - - // 如果结果的长度小于预期的长度,则认为已经查询了所有,下次从头再来 - if len(packageIDs) < CheckPackageBatchSize { - e.lastCheckStart = 0 - log.Debugf("all package clean pinned, next time will start check at offset 0") - - } else { - e.lastCheckStart += CheckPackageBatchSize - } -}