You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

agent_check_storage.go 3.2 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111
  1. package event
  2. import (
  3. "database/sql"
  4. "github.com/samber/lo"
  5. "gitlink.org.cn/cloudream/common/consts"
  6. "gitlink.org.cn/cloudream/common/utils/logger"
  7. "gitlink.org.cn/cloudream/db/model"
  8. mysql "gitlink.org.cn/cloudream/db/sql"
  9. agtcli "gitlink.org.cn/cloudream/rabbitmq/client/agent"
  10. agtmsg "gitlink.org.cn/cloudream/rabbitmq/message/agent"
  11. agtevt "gitlink.org.cn/cloudream/rabbitmq/message/agent/event"
  12. scevt "gitlink.org.cn/cloudream/rabbitmq/message/scanner/event"
  13. "gitlink.org.cn/cloudream/scanner/internal/config"
  14. )
  15. type AgentCheckStorage struct {
  16. scevt.AgentCheckStorage
  17. }
  18. func NewAgentCheckStorage(storageID int, objectIDs []int) *AgentCheckStorage {
  19. return &AgentCheckStorage{
  20. AgentCheckStorage: scevt.NewAgentCheckStorage(storageID, objectIDs),
  21. }
  22. }
  23. func (t *AgentCheckStorage) TryMerge(other Event) bool {
  24. event, ok := other.(*AgentCheckStorage)
  25. if !ok {
  26. return false
  27. }
  28. if t.StorageID != event.StorageID {
  29. return false
  30. }
  31. // ObjectIDs为nil时代表全量检查
  32. if event.ObjectIDs == nil {
  33. t.ObjectIDs = nil
  34. } else if t.ObjectIDs != nil {
  35. t.ObjectIDs = lo.Union(t.ObjectIDs, event.ObjectIDs)
  36. }
  37. return true
  38. }
  39. func (t *AgentCheckStorage) Execute(execCtx ExecuteContext) {
  40. stg, err := mysql.Storage.GetByID(execCtx.Args.DB.SQLCtx(), t.StorageID)
  41. if err != nil {
  42. if err != sql.ErrNoRows {
  43. logger.WithField("StorageID", t.StorageID).Warnf("get storage failed, err: %s", err.Error())
  44. }
  45. return
  46. }
  47. node, err := mysql.Node.GetByID(execCtx.Args.DB.SQLCtx(), stg.NodeID)
  48. if err != nil {
  49. if err != sql.ErrNoRows {
  50. logger.WithField("StorageID", t.StorageID).Warnf("get storage node failed, err: %s", err.Error())
  51. }
  52. return
  53. }
  54. // TODO unavailable的节点需不需要发送任务?
  55. if node.State != consts.NODE_STATE_NORMAL {
  56. return
  57. }
  58. // 获取对象信息
  59. var isComplete bool
  60. var objects []model.StorageObject
  61. if t.ObjectIDs == nil {
  62. var err error
  63. objects, err = mysql.StorageObject.GetAllByStorageID(execCtx.Args.DB.SQLCtx(), t.StorageID)
  64. if err != nil {
  65. logger.WithField("StorageID", t.StorageID).Warnf("get storage objects failed, err: %s", err.Error())
  66. return
  67. }
  68. isComplete = true
  69. } else {
  70. for _, objID := range t.ObjectIDs {
  71. objs, err := mysql.StorageObject.GetAllByStorageAndObjectID(execCtx.Args.DB.SQLCtx(), t.StorageID, objID)
  72. if err != nil {
  73. logger.WithField("StorageID", t.StorageID).
  74. WithField("ObjectID", objID).
  75. Warnf("get storage object failed, err: %s", err.Error())
  76. return
  77. }
  78. objects = append(objects, objs...)
  79. }
  80. isComplete = false
  81. }
  82. // 投递任务
  83. agentClient, err := agtcli.NewAgentClient(stg.NodeID, &config.Cfg().RabbitMQ)
  84. if err != nil {
  85. logger.WithField("NodeID", stg.NodeID).Warnf("create agent client failed, err: %s", err.Error())
  86. return
  87. }
  88. defer agentClient.Close()
  89. err = agentClient.PostEvent(agtmsg.NewPostEventBody(
  90. agtevt.NewCheckStorage(stg.Directory, isComplete, objects),
  91. execCtx.Option.IsEmergency, // 继承本任务的执行选项
  92. execCtx.Option.DontMerge))
  93. if err != nil {
  94. logger.WithField("NodeID", stg.NodeID).Warnf("request to agent failed, err: %s", stg.NodeID, err.Error())
  95. }
  96. }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。