You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

agent_check_cache.go 3.4 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130
  1. package task
import (
	"database/sql"
	"errors"
	"fmt"

	"github.com/jmoiron/sqlx"
	"github.com/samber/lo"
	"gitlink.org.cn/cloudream/db/model"
	mysql "gitlink.org.cn/cloudream/db/sql"
	"gitlink.org.cn/cloudream/scanner/internal/config"
	log "gitlink.org.cn/cloudream/utils/logger"

	agtcli "gitlink.org.cn/cloudream/rabbitmq/client/agent"
	agtmsg "gitlink.org.cn/cloudream/rabbitmq/message/agent"
	agttsk "gitlink.org.cn/cloudream/rabbitmq/message/agent/task"
)
  15. type AgentCheckCacheTaskEntry struct {
  16. NodeID int
  17. FileHashes []string // 需要检查的FileHash列表,如果为nil(不是为空),则代表进行全量检查
  18. }
  19. func NewAgentCheckCacheTaskEntry(nodeID int, fileHashes []string) AgentCheckCacheTaskEntry {
  20. return AgentCheckCacheTaskEntry{
  21. NodeID: nodeID,
  22. FileHashes: fileHashes,
  23. }
  24. }
  25. type AgentCheckCacheTask struct {
  26. Entries []AgentCheckCacheTaskEntry
  27. }
  28. func NewAgentCheckCacheTask(entries []AgentCheckCacheTaskEntry) *AgentCheckCacheTask {
  29. return &AgentCheckCacheTask{
  30. Entries: entries,
  31. }
  32. }
  33. func (t *AgentCheckCacheTask) TryMerge(other Task) bool {
  34. chkTask, ok := other.(*AgentCheckCacheTask)
  35. if !ok {
  36. return false
  37. }
  38. for _, entry := range chkTask.Entries {
  39. _, index, ok := lo.FindIndexOf(t.Entries, func(e AgentCheckCacheTaskEntry) bool { return e.NodeID == entry.NodeID })
  40. if ok {
  41. myEntry := &t.Entries[index]
  42. // FileHashes为nil时代表全量检查
  43. if entry.FileHashes == nil {
  44. myEntry.FileHashes = nil
  45. } else if myEntry.FileHashes != nil {
  46. myEntry.FileHashes = lo.Union(myEntry.FileHashes, entry.FileHashes)
  47. }
  48. } else {
  49. t.Entries = append(t.Entries, entry)
  50. }
  51. }
  52. return true
  53. }
  54. func (t *AgentCheckCacheTask) Execute(execCtx *ExecuteContext, execOpts ExecuteOption) {
  55. for _, entry := range t.Entries {
  56. err := t.checkOneAgentCache(entry, execCtx, execOpts)
  57. if err != nil {
  58. log.Warnf("let agent check cache failed, err: %s", err.Error())
  59. continue
  60. }
  61. }
  62. }
  63. func (t *AgentCheckCacheTask) checkOneAgentCache(entry AgentCheckCacheTaskEntry, execCtx *ExecuteContext, execOpts ExecuteOption) error {
  64. var isComplete bool
  65. var caches []model.Cache
  66. err := execCtx.DB.DoTx(sql.LevelSerializable, func(tx *sqlx.Tx) error {
  67. // TODO unavailable的节点需不需要发送任务?
  68. if entry.FileHashes == nil {
  69. var err error
  70. caches, err = mysql.Cache.GetNodeCaches(tx, entry.NodeID)
  71. if err != nil {
  72. return fmt.Errorf("get node caches failed, err: %w", err)
  73. }
  74. isComplete = true
  75. } else {
  76. for _, hash := range entry.FileHashes {
  77. ch, err := mysql.Cache.Get(tx, hash, entry.NodeID)
  78. // 记录不存在则跳过
  79. if err == sql.ErrNoRows {
  80. continue
  81. }
  82. if err != nil {
  83. return fmt.Errorf("get cache failed, err: %w", err)
  84. }
  85. caches = append(caches, ch)
  86. }
  87. isComplete = false
  88. }
  89. return nil
  90. })
  91. if err != nil {
  92. return err
  93. }
  94. // 然后向代理端发送移动文件的请求
  95. agentClient, err := agtcli.NewAgentClient(entry.NodeID, &config.Cfg().RabbitMQ)
  96. if err != nil {
  97. return fmt.Errorf("create agent client to %d failed, err: %w", entry.NodeID, err)
  98. }
  99. defer agentClient.Close()
  100. err = agentClient.PostTask(agtmsg.NewPostTaskBody(
  101. agttsk.NewCheckCacheTask(isComplete, caches),
  102. execOpts.IsEmergency, // 继承本任务的执行选项
  103. execOpts.DontMerge))
  104. if err != nil {
  105. return fmt.Errorf("request to agent %d failed, err: %w", entry.NodeID, err)
  106. }
  107. return nil
  108. }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。