Browse Source

简化CheckCache任务的逻辑

gitlink
Sydonian 2 years ago
parent
commit
acd01c5d0c
1 changed files with 37 additions and 73 deletions
  1. +37
    -73
      internal/task/agent_check_cache.go

+ 37
- 73
internal/task/agent_check_cache.go View File

@@ -2,119 +2,84 @@ package task


import ( import (
"database/sql" "database/sql"
"fmt"


"github.com/jmoiron/sqlx"
"github.com/samber/lo" "github.com/samber/lo"
"gitlink.org.cn/cloudream/db/model" "gitlink.org.cn/cloudream/db/model"
mysql "gitlink.org.cn/cloudream/db/sql" mysql "gitlink.org.cn/cloudream/db/sql"
"gitlink.org.cn/cloudream/scanner/internal/config" "gitlink.org.cn/cloudream/scanner/internal/config"
log "gitlink.org.cn/cloudream/utils/logger"
"gitlink.org.cn/cloudream/utils/logger"


agtcli "gitlink.org.cn/cloudream/rabbitmq/client/agent" agtcli "gitlink.org.cn/cloudream/rabbitmq/client/agent"
agtmsg "gitlink.org.cn/cloudream/rabbitmq/message/agent" agtmsg "gitlink.org.cn/cloudream/rabbitmq/message/agent"
agttsk "gitlink.org.cn/cloudream/rabbitmq/message/agent/task" agttsk "gitlink.org.cn/cloudream/rabbitmq/message/agent/task"
) )


type AgentCheckCacheTaskEntry struct {
type AgentCheckCacheTask struct {
NodeID int NodeID int
FileHashes []string // 需要检查的FileHash列表,如果为nil(不是为空),则代表进行全量检查 FileHashes []string // 需要检查的FileHash列表,如果为nil(不是为空),则代表进行全量检查
} }


func NewAgentCheckCacheTaskEntry(nodeID int, fileHashes []string) AgentCheckCacheTaskEntry {
return AgentCheckCacheTaskEntry{
func NewAgentCheckCacheTask(nodeID int, fileHashes []string) *AgentCheckCacheTask {
return &AgentCheckCacheTask{
NodeID: nodeID, NodeID: nodeID,
FileHashes: fileHashes, FileHashes: fileHashes,
} }
} }


type AgentCheckCacheTask struct {
Entries []AgentCheckCacheTaskEntry
}

func NewAgentCheckCacheTask(entries []AgentCheckCacheTaskEntry) *AgentCheckCacheTask {
return &AgentCheckCacheTask{
Entries: entries,
}
}

func (t *AgentCheckCacheTask) TryMerge(other Task) bool { func (t *AgentCheckCacheTask) TryMerge(other Task) bool {
chkTask, ok := other.(*AgentCheckCacheTask)
task, ok := other.(*AgentCheckCacheTask)
if !ok { if !ok {
return false return false
} }


for _, entry := range chkTask.Entries {
_, index, ok := lo.FindIndexOf(t.Entries, func(e AgentCheckCacheTaskEntry) bool { return e.NodeID == entry.NodeID })
if ok {
myEntry := &t.Entries[index]

// FileHashes为nil时代表全量检查
if entry.FileHashes == nil {
myEntry.FileHashes = nil
} else if myEntry.FileHashes != nil {
myEntry.FileHashes = lo.Union(myEntry.FileHashes, entry.FileHashes)
}

} else {
t.Entries = append(t.Entries, entry)
}
// FileHashes为nil时代表全量检查
if task.FileHashes == nil {
t.FileHashes = nil
} else if t.FileHashes != nil {
t.FileHashes = lo.Union(t.FileHashes, task.FileHashes)
} }


return true return true
} }


func (t *AgentCheckCacheTask) Execute(execCtx *ExecuteContext, execOpts ExecuteOption) { func (t *AgentCheckCacheTask) Execute(execCtx *ExecuteContext, execOpts ExecuteOption) {
for _, entry := range t.Entries {
err := t.checkOneAgentCache(entry, execCtx, execOpts)
if err != nil {
log.Warnf("let agent check cache failed, err: %s", err.Error())
continue
}
}
}

func (t *AgentCheckCacheTask) checkOneAgentCache(entry AgentCheckCacheTaskEntry, execCtx *ExecuteContext, execOpts ExecuteOption) error {
var isComplete bool var isComplete bool
var caches []model.Cache var caches []model.Cache


err := execCtx.DB.DoTx(sql.LevelSerializable, func(tx *sqlx.Tx) error {
// TODO unavailable的节点需不需要发送任务?
// TODO unavailable的节点需不需要发送任务?


if entry.FileHashes == nil {
var err error
caches, err = mysql.Cache.GetNodeCaches(tx, entry.NodeID)
if err != nil {
return fmt.Errorf("get node caches failed, err: %w", err)
if t.FileHashes == nil {
var err error
caches, err = mysql.Cache.GetNodeCaches(execCtx.DB.SQLCtx(), t.NodeID)
if err != nil {
logger.WithField("NodeID", t.NodeID).Warnf("get node caches failed, err: %s", err.Error())
return
}
isComplete = true

} else {
for _, hash := range t.FileHashes {
ch, err := mysql.Cache.Get(execCtx.DB.SQLCtx(), hash, t.NodeID)
// 记录不存在则跳过
if err == sql.ErrNoRows {
continue
} }
isComplete = true


} else {
for _, hash := range entry.FileHashes {
ch, err := mysql.Cache.Get(tx, hash, entry.NodeID)
// 记录不存在则跳过
if err == sql.ErrNoRows {
continue
}

if err != nil {
return fmt.Errorf("get cache failed, err: %w", err)
}

caches = append(caches, ch)
if err != nil {
logger.WithField("FileHash", hash).WithField("NodeID", t.NodeID).Warnf("get cache failed, err: %w", err)
return
} }
isComplete = false

caches = append(caches, ch)
} }
return nil
})
if err != nil {
return err
isComplete = false
} }


// 然后向代理端发送移动文件的请求 // 然后向代理端发送移动文件的请求
agentClient, err := agtcli.NewAgentClient(entry.NodeID, &config.Cfg().RabbitMQ)
agentClient, err := agtcli.NewAgentClient(t.NodeID, &config.Cfg().RabbitMQ)
if err != nil { if err != nil {
return fmt.Errorf("create agent client to %d failed, err: %w", entry.NodeID, err)
logger.WithField("NodeID", t.NodeID).Warnf("create agent client failed, err: %s", err.Error())
return
} }
defer agentClient.Close() defer agentClient.Close()


@@ -123,8 +88,7 @@ func (t *AgentCheckCacheTask) checkOneAgentCache(entry AgentCheckCacheTaskEntry,
execOpts.IsEmergency, // 继承本任务的执行选项 execOpts.IsEmergency, // 继承本任务的执行选项
execOpts.DontMerge)) execOpts.DontMerge))
if err != nil { if err != nil {
return fmt.Errorf("request to agent %d failed, err: %w", entry.NodeID, err)
logger.WithField("NodeID", t.NodeID).Warnf("request to agent failed, err: %s", err.Error())
return
} }

return nil
} }

Loading…
Cancel
Save