You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

agent_check_storage.go 3.2 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115
  1. package event
  2. import (
  3. "database/sql"
  4. "github.com/samber/lo"
  5. "gitlink.org.cn/cloudream/common/consts"
  6. "gitlink.org.cn/cloudream/common/utils/logger"
  7. "gitlink.org.cn/cloudream/db/model"
  8. mysql "gitlink.org.cn/cloudream/db/sql"
  9. agtcli "gitlink.org.cn/cloudream/rabbitmq/client/agent"
  10. agtmsg "gitlink.org.cn/cloudream/rabbitmq/message/agent"
  11. agtevt "gitlink.org.cn/cloudream/rabbitmq/message/agent/event"
  12. "gitlink.org.cn/cloudream/scanner/internal/config"
  13. )
  // AgentCheckStorage is an event that asks the agent responsible for a storage
  // to check the objects recorded for that storage.
  type AgentCheckStorage struct {
  	// StorageID identifies the storage to check.
  	StorageID int

  	// ObjectIDs lists the objects whose files need checking. A nil slice
  	// (as opposed to an empty, non-nil one) means a full check of the storage.
  	ObjectIDs []int
  }
  18. func NewAgentCheckStorage(storageID int, objectIDs []int) *AgentCheckStorage {
  19. return &AgentCheckStorage{
  20. StorageID: storageID,
  21. ObjectIDs: objectIDs,
  22. }
  23. }
  24. func (t *AgentCheckStorage) TryMerge(other Event) bool {
  25. event, ok := other.(*AgentCheckStorage)
  26. if !ok {
  27. return false
  28. }
  29. if t.StorageID != event.StorageID {
  30. return false
  31. }
  32. // ObjectIDs为nil时代表全量检查
  33. if event.ObjectIDs == nil {
  34. t.ObjectIDs = nil
  35. } else if t.ObjectIDs != nil {
  36. t.ObjectIDs = lo.Union(t.ObjectIDs, event.ObjectIDs)
  37. }
  38. return true
  39. }
  40. func (t *AgentCheckStorage) Execute(execCtx ExecuteContext) {
  41. stg, err := mysql.Storage.GetByID(execCtx.Args.DB.SQLCtx(), t.StorageID)
  42. if err != nil {
  43. if err != sql.ErrNoRows {
  44. logger.WithField("StorageID", t.StorageID).Warnf("get storage failed, err: %s", err.Error())
  45. }
  46. return
  47. }
  48. node, err := mysql.Node.GetByID(execCtx.Args.DB.SQLCtx(), stg.NodeID)
  49. if err != nil {
  50. if err != sql.ErrNoRows {
  51. logger.WithField("StorageID", t.StorageID).Warnf("get storage node failed, err: %s", err.Error())
  52. }
  53. return
  54. }
  55. // TODO unavailable的节点需不需要发送任务?
  56. if node.State != consts.NODE_STATE_NORMAL {
  57. return
  58. }
  59. // 获取对象信息
  60. var isComplete bool
  61. var objects []model.StorageObject
  62. if t.ObjectIDs == nil {
  63. var err error
  64. objects, err = mysql.StorageObject.GetAllByStorageID(execCtx.Args.DB.SQLCtx(), t.StorageID)
  65. if err != nil {
  66. logger.WithField("StorageID", t.StorageID).Warnf("get storage objects failed, err: %s", err.Error())
  67. return
  68. }
  69. isComplete = true
  70. } else {
  71. for _, objID := range t.ObjectIDs {
  72. obj, err := mysql.StorageObject.Get(execCtx.Args.DB.SQLCtx(), t.StorageID, objID)
  73. if err == sql.ErrNoRows {
  74. continue
  75. }
  76. if err != nil {
  77. logger.WithField("StorageID", t.StorageID).
  78. WithField("ObjectID", objID).
  79. Warnf("get storage object failed, err: %s", err.Error())
  80. return
  81. }
  82. objects = append(objects, obj)
  83. }
  84. isComplete = false
  85. }
  86. // 投递任务
  87. agentClient, err := agtcli.NewAgentClient(stg.NodeID, &config.Cfg().RabbitMQ)
  88. if err != nil {
  89. logger.WithField("NodeID", stg.NodeID).Warnf("create agent client failed, err: %s", err.Error())
  90. return
  91. }
  92. defer agentClient.Close()
  93. err = agentClient.PostEvent(agtmsg.NewPostEventBody(
  94. agtevt.NewCheckStorage(stg.Directory, isComplete, objects),
  95. execCtx.Option.IsEmergency, // 继承本任务的执行选项
  96. execCtx.Option.DontMerge))
  97. if err != nil {
  98. logger.WithField("NodeID", stg.NodeID).Warnf("request to agent failed, err: %s", stg.NodeID, err.Error())
  99. }
  100. }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。