From acd01c5d0ce11c3190d6264de175afc62cc993d8 Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Fri, 19 May 2023 16:04:28 +0800 Subject: [PATCH] =?UTF-8?q?=E7=AE=80=E5=8C=96CheckCache=E4=BB=BB=E5=8A=A1?= =?UTF-8?q?=E7=9A=84=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/task/agent_check_cache.go | 110 ++++++++++------------------- 1 file changed, 37 insertions(+), 73 deletions(-) diff --git a/internal/task/agent_check_cache.go b/internal/task/agent_check_cache.go index a80e096..7574511 100644 --- a/internal/task/agent_check_cache.go +++ b/internal/task/agent_check_cache.go @@ -2,119 +2,84 @@ package task import ( "database/sql" - "fmt" - "github.com/jmoiron/sqlx" "github.com/samber/lo" "gitlink.org.cn/cloudream/db/model" mysql "gitlink.org.cn/cloudream/db/sql" "gitlink.org.cn/cloudream/scanner/internal/config" - log "gitlink.org.cn/cloudream/utils/logger" + "gitlink.org.cn/cloudream/utils/logger" agtcli "gitlink.org.cn/cloudream/rabbitmq/client/agent" agtmsg "gitlink.org.cn/cloudream/rabbitmq/message/agent" agttsk "gitlink.org.cn/cloudream/rabbitmq/message/agent/task" ) -type AgentCheckCacheTaskEntry struct { +type AgentCheckCacheTask struct { NodeID int FileHashes []string // 需要检查的FileHash列表,如果为nil(不是为空),则代表进行全量检查 } -func NewAgentCheckCacheTaskEntry(nodeID int, fileHashes []string) AgentCheckCacheTaskEntry { - return AgentCheckCacheTaskEntry{ +func NewAgentCheckCacheTask(nodeID int, fileHashes []string) *AgentCheckCacheTask { + return &AgentCheckCacheTask{ NodeID: nodeID, FileHashes: fileHashes, } } -type AgentCheckCacheTask struct { - Entries []AgentCheckCacheTaskEntry -} - -func NewAgentCheckCacheTask(entries []AgentCheckCacheTaskEntry) *AgentCheckCacheTask { - return &AgentCheckCacheTask{ - Entries: entries, - } -} - func (t *AgentCheckCacheTask) TryMerge(other Task) bool { - chkTask, ok := other.(*AgentCheckCacheTask) + task, ok := other.(*AgentCheckCacheTask) if !ok { return false } - for _, entry := range chkTask.Entries { - _, index, ok := lo.FindIndexOf(t.Entries, func(e AgentCheckCacheTaskEntry) bool { return e.NodeID == entry.NodeID }) - if ok { - myEntry := &t.Entries[index] - - // FileHashes为nil时代表全量检查 - if entry.FileHashes == nil { - myEntry.FileHashes = nil - } else if myEntry.FileHashes != nil { - myEntry.FileHashes = lo.Union(myEntry.FileHashes, entry.FileHashes) - } - - } else { - t.Entries = append(t.Entries, entry) - } + // FileHashes为nil时代表全量检查 + if task.FileHashes == nil { + t.FileHashes = nil + } else if t.FileHashes != nil { + t.FileHashes = lo.Union(t.FileHashes, task.FileHashes) } return true } func (t *AgentCheckCacheTask) Execute(execCtx *ExecuteContext, execOpts ExecuteOption) { - for _, entry := range t.Entries { - err := t.checkOneAgentCache(entry, execCtx, execOpts) - if err != nil { - log.Warnf("let agent check cache failed, err: %s", err.Error()) - continue - } - } -} - -func (t *AgentCheckCacheTask) checkOneAgentCache(entry AgentCheckCacheTaskEntry, execCtx *ExecuteContext, execOpts ExecuteOption) error { var isComplete bool var caches []model.Cache - err := execCtx.DB.DoTx(sql.LevelSerializable, func(tx *sqlx.Tx) error { - // TODO unavailable的节点需不需要发送任务? + // TODO unavailable的节点需不需要发送任务? - if entry.FileHashes == nil { - var err error - caches, err = mysql.Cache.GetNodeCaches(tx, entry.NodeID) - if err != nil { - return fmt.Errorf("get node caches failed, err: %w", err) + if t.FileHashes == nil { + var err error + caches, err = mysql.Cache.GetNodeCaches(execCtx.DB.SQLCtx(), t.NodeID) + if err != nil { + logger.WithField("NodeID", t.NodeID).Warnf("get node caches failed, err: %s", err.Error()) + return + } + isComplete = true + + } else { + for _, hash := range t.FileHashes { + ch, err := mysql.Cache.Get(execCtx.DB.SQLCtx(), hash, t.NodeID) + // 记录不存在则跳过 + if err == sql.ErrNoRows { + continue } - isComplete = true - } else { - for _, hash := range entry.FileHashes { - ch, err := mysql.Cache.Get(tx, hash, entry.NodeID) - // 记录不存在则跳过 - if err == sql.ErrNoRows { - continue - } - - if err != nil { - return fmt.Errorf("get cache failed, err: %w", err) - } - - caches = append(caches, ch) + if err != nil { + logger.WithField("FileHash", hash).WithField("NodeID", t.NodeID).Warnf("get cache failed, err: %w", err) + return } - isComplete = false + + caches = append(caches, ch) } - return nil - }) - if err != nil { - return err + isComplete = false } // 然后向代理端发送移动文件的请求 - agentClient, err := agtcli.NewAgentClient(entry.NodeID, &config.Cfg().RabbitMQ) + agentClient, err := agtcli.NewAgentClient(t.NodeID, &config.Cfg().RabbitMQ) if err != nil { - return fmt.Errorf("create agent client to %d failed, err: %w", entry.NodeID, err) + logger.WithField("NodeID", t.NodeID).Warnf("create agent client failed, err: %s", err.Error()) + return } defer agentClient.Close() @@ -123,8 +88,7 @@ func (t *AgentCheckCacheTask) checkOneAgentCache(entry AgentCheckCacheTaskEntry, execOpts.IsEmergency, // 继承本任务的执行选项 execOpts.DontMerge)) if err != nil { - return fmt.Errorf("request to agent %d failed, err: %w", entry.NodeID, err) + logger.WithField("NodeID", t.NodeID).Warnf("request to agent failed, err: %s", err.Error()) + return } - - return nil }