package event import ( "database/sql" "time" "github.com/samber/lo" "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/mq" stgglb "gitlink.org.cn/cloudream/storage/common/globals" "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" "gitlink.org.cn/cloudream/storage/common/pkgs/distlock/reqbuilder" agtmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/agent" scevt "gitlink.org.cn/cloudream/storage/common/pkgs/mq/scanner/event" ) type AgentCheckCache struct { *scevt.AgentCheckCache } func NewAgentCheckCache(nodeID int64, fileHashes []string) *AgentCheckCache { return &AgentCheckCache{ AgentCheckCache: scevt.NewAgentCheckCache(nodeID, fileHashes), } } func (t *AgentCheckCache) TryMerge(other Event) bool { event, ok := other.(*AgentCheckCache) if !ok { return false } if event.NodeID != t.NodeID { return false } // FileHashes为nil时代表全量检查 if event.FileHashes == nil { t.FileHashes = nil } else if t.FileHashes != nil { t.FileHashes = lo.Union(t.FileHashes, event.FileHashes) } return true } func (t *AgentCheckCache) Execute(execCtx ExecuteContext) { log := logger.WithType[AgentCheckCache]("Event") log.Debugf("begin with %v", logger.FormatStruct(t.AgentCheckCache)) defer log.Debugf("end") // TODO unavailable的节点需不需要发送任务? if t.FileHashes == nil { t.checkComplete(execCtx) } else { t.checkIncrement(execCtx) } } func (t *AgentCheckCache) checkComplete(execCtx ExecuteContext) { log := logger.WithType[AgentCheckCache]("Event") mutex, err := reqbuilder.NewBuilder(). Metadata(). // 全量模式下修改某个节点所有的Cache记录 Cache().WriteAny(). IPFS(). // 全量模式下修改某个节点所有的副本数据 WriteAnyRep(t.NodeID). MutexLock(execCtx.Args.DistLock) if err != nil { log.Warnf("acquire locks failed, err: %s", err.Error()) return } defer mutex.Unlock() caches, err := execCtx.Args.DB.Cache().GetNodeCaches(execCtx.Args.DB.SQLCtx(), t.NodeID) if err != nil { log.WithField("NodeID", t.NodeID).Warnf("get node caches failed, err: %s", err.Error()) return } t.startCheck(execCtx, true, caches) } func (t *AgentCheckCache) checkIncrement(execCtx ExecuteContext) { log := logger.WithType[AgentCheckCache]("Event") builder := reqbuilder.NewBuilder() for _, hash := range t.FileHashes { builder. // 增量模式下,不会有改动到Cache记录的操作 Metadata().Cache().ReadOne(t.NodeID, hash). // 由于副本Write锁的特点,Pin文件(创建文件)不需要Create锁 IPFS().WriteOneRep(t.NodeID, hash) } mutex, err := builder.MutexLock(execCtx.Args.DistLock) if err != nil { log.Warnf("acquire locks failed, err: %s", err.Error()) return } defer mutex.Unlock() var caches []model.Cache for _, hash := range t.FileHashes { ch, err := execCtx.Args.DB.Cache().Get(execCtx.Args.DB.SQLCtx(), hash, t.NodeID) // 记录不存在则跳过 if err == sql.ErrNoRows { continue } if err != nil { log.WithField("FileHash", hash).WithField("NodeID", t.NodeID).Warnf("get cache failed, err: %s", err.Error()) return } caches = append(caches, ch) } t.startCheck(execCtx, false, caches) } func (t *AgentCheckCache) startCheck(execCtx ExecuteContext, isComplete bool, caches []model.Cache) { log := logger.WithType[AgentCheckCache]("Event") // 然后向代理端发送移动文件的请求 agtCli, err := stgglb.AgentMQPool.Acquire(t.NodeID) if err != nil { log.WithField("NodeID", t.NodeID).Warnf("create agent client failed, err: %s", err.Error()) return } defer stgglb.AgentMQPool.Release(agtCli) checkResp, err := agtCli.CheckCache(agtmq.NewCheckCache(isComplete, caches), mq.RequestOption{Timeout: time.Minute}) if err != nil { log.WithField("NodeID", t.NodeID).Warnf("checking ipfs: %s", err.Error()) return } // 根据返回结果修改数据库 for _, entry := range checkResp.Entries { switch entry.Operation { case agtmq.CHECK_IPFS_RESP_OP_DELETE_TEMP: err := execCtx.Args.DB.Cache().DeleteTemp(execCtx.Args.DB.SQLCtx(), entry.FileHash, t.NodeID) if err != nil { log.WithField("FileHash", entry.FileHash). WithField("NodeID", t.NodeID). Warnf("delete temp cache failed, err: %s", err.Error()) } log.WithField("FileHash", entry.FileHash). WithField("NodeID", t.NodeID). Debugf("delete temp cache") case agtmq.CHECK_IPFS_RESP_OP_CREATE_TEMP: err := execCtx.Args.DB.Cache().CreateTemp(execCtx.Args.DB.SQLCtx(), entry.FileHash, t.NodeID) if err != nil { log.WithField("FileHash", entry.FileHash). WithField("NodeID", t.NodeID). Warnf("create temp cache failed, err: %s", err.Error()) } log.WithField("FileHash", entry.FileHash). WithField("NodeID", t.NodeID). Debugf("create temp cache") } } } func init() { RegisterMessageConvertor(func(msg *scevt.AgentCheckCache) Event { return NewAgentCheckCache(msg.NodeID, msg.FileHashes) }) }