package ticktock import ( "fmt" "time" "github.com/samber/lo" "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/utils/reflect2" "gitlink.org.cn/cloudream/jcs-pub/client/internal/db" stgtypes "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/types" jcstypes "gitlink.org.cn/cloudream/jcs-pub/common/types" ) // CheckShardStore 代表一个用于处理代理缓存检查事件的结构体 type CheckShardStore struct { } func (j *CheckShardStore) Name() string { return reflect2.TypeNameOf[CheckShardStore]() } // Execute 执行缓存检查操作,对比本地缓存与代理返回的缓存信息,更新数据库中的缓存记录 func (j *CheckShardStore) Execute(t *TickTock) { log := logger.WithType[CheckShardStore]("TickTock") startTime := time.Now() log.Infof("job start") defer func() { log.Infof("job end, time: %v", time.Since(startTime)) }() db2 := t.db spaceIDs, err := db2.UserSpace().GetAllIDs(db2.DefCtx()) if err != nil { log.Warnf("getting all user space ids: %s", err.Error()) return } for _, spaceID := range spaceIDs { detail := t.spaceMeta.Get(spaceID) if detail == nil { continue } err := j.checkOne(t, detail) if err != nil { log.Warnf("checking user space %v: %v", detail.String(), err) continue } } } func (j *CheckShardStore) checkOne(t *TickTock, space *jcstypes.UserSpaceDetail) error { // addr, ok := space.RecommendHub.Address.(*jcstypes.GRPCAddressInfo) // if !ok { // return fmt.Errorf("master of user space %v has no grpc address", space.UserSpace) // } // agtCli := stgglb.HubRPCPool.Get(stgglb.SelectGRPCAddress(&space.RecommendHub, addr)) // defer agtCli.Release() // ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Minute)) // defer cancel() // checkResp, cerr := agtCli.CheckCache(ctx, &hubrpc.CheckCache{ // UserSpace: *space, // }) // if cerr != nil { // return fmt.Errorf("request to check cache: %w", cerr.ToError()) // } store, err := t.stgPool.GetShardStore(space) if err != nil { return fmt.Errorf("getting shard store: %w", err) } infos, err := store.ListAll() if err != nil { return fmt.Errorf("listing all files: %w", err) } fileHashes := lo.Map(infos, func(info stgtypes.FileInfo, _ int) jcstypes.FileHash { return info.Hash }) realFileHashes := lo.SliceToMap(fileHashes, func(hash jcstypes.FileHash) (jcstypes.FileHash, bool) { return hash, true }) // 在事务中执行缓存更新操作 t.db.DoTx(func(tx db.SQLContext) error { j.checkPinnedObject(t, tx, space, realFileHashes) j.checkObjectBlock(t, tx, space, realFileHashes) return nil }) return nil } // checkPinnedObject 对比PinnedObject表,若实际文件不存在,则进行删除操作 func (*CheckShardStore) checkPinnedObject(t *TickTock, tx db.SQLContext, space *jcstypes.UserSpaceDetail, realFileHashes map[jcstypes.FileHash]bool) { log := logger.WithType[CheckShardStore]("TickTock") objs, err := t.db.PinnedObject().GetObjectsByUserSpaceID(tx, space.UserSpace.UserSpaceID) if err != nil { log.Warnf("getting pinned objects by user space id %v: %v", space.UserSpace, err) return } var rms []jcstypes.ObjectID for _, c := range objs { if realFileHashes[c.FileHash] { continue } rms = append(rms, c.ObjectID) } if len(rms) > 0 { err = t.db.PinnedObject().BatchDelete(tx, space.UserSpace.UserSpaceID, rms) if err != nil { log.Warnf("batch delete user space %v pinned objects: %v", space.UserSpace, err) } } } // checkObjectBlock 对比ObjectBlock表,若实际文件不存在,则进行删除操作 func (*CheckShardStore) checkObjectBlock(t *TickTock, tx db.SQLContext, space *jcstypes.UserSpaceDetail, realFileHashes map[jcstypes.FileHash]bool) { log := logger.WithType[CheckShardStore]("TickTock") blocks, err := t.db.ObjectBlock().GetByUserSpaceID(tx, space.UserSpace.UserSpaceID) if err != nil { log.Warnf("getting object blocks by user space id %v: %v", space.UserSpace, err) return } var rms []jcstypes.FileHash for _, b := range blocks { if realFileHashes[b.FileHash] { continue } rms = append(rms, b.FileHash) } if len(rms) > 0 { err = t.db.ObjectBlock().BatchDeleteByFileHash(tx, space.UserSpace.UserSpaceID, rms) if err != nil { log.Warnf("batch delete user space %v object blocks: %v", space.UserSpace, err) } } }