You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

mq.go 3.0 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115
  1. package mq
  import (
  	"encoding/json"
  	"fmt"
  	"log"
  	"net/url"

  	"github.com/streadway/amqp"
  	stgmod "gitlink.org.cn/cloudream/storage/common/models"
  	"gitlink.org.cn/cloudream/storage/datamap/internal/config"
  	"gitlink.org.cn/cloudream/storage/datamap/internal/models"
  )
  11. func InitMQ(cfg config.RabbitMQConfig) (*amqp.Connection, error) {
  12. conn, err := amqp.Dial(fmt.Sprintf("amqp://%s:%s@%s:%s/",
  13. cfg.User, cfg.Password, cfg.Host, cfg.Port))
  14. if err != nil {
  15. return nil, err
  16. }
  17. // 启动队列监听
  18. go listenQueues(conn)
  19. return conn, nil
  20. }
  21. func listenQueues(conn *amqp.Connection) {
  22. queues := []string{
  23. "datamap_storageinfo",
  24. "datamap_hubtransfer",
  25. "datamap_blocktransfer",
  26. "datamap_blockdistribution",
  27. }
  28. for _, queue := range queues {
  29. go func(q string) {
  30. ch, err := conn.Channel()
  31. if err != nil {
  32. log.Printf("Failed to open channel for queue %s: %v", q, err)
  33. return
  34. }
  35. defer ch.Close()
  36. msgs, err := ch.Consume(q, "", true, false, false, false, nil)
  37. if err != nil {
  38. log.Printf("Failed to register consumer for queue %s: %v", q, err)
  39. return
  40. }
  41. for msg := range msgs {
  42. processMessage(q, msg.Body)
  43. }
  44. }(queue)
  45. }
  46. }
  47. func processMessage(queue string, body []byte) {
  48. switch queue {
  49. case "datamap_hubinfo":
  50. var data stgmod.HubInfo
  51. if err := json.Unmarshal(body, &data); err != nil {
  52. log.Printf("Failed to unmarshal HubInfo: %v, body: %s", err, body)
  53. return
  54. }
  55. models.ProcessHubInfo(data)
  56. case "datamap_storageinfo":
  57. var data stgmod.StorageInfo
  58. if err := json.Unmarshal(body, &data); err != nil {
  59. log.Printf("Failed to unmarshal StorageInfo: %v, body: %s", err, body)
  60. return
  61. }
  62. //models.ProcessStorageInfo(data)
  63. case "datamap_storagestats":
  64. var data stgmod.StorageStats
  65. if err := json.Unmarshal(body, &data); err != nil {
  66. log.Printf("Failed to unmarshal StorageStats: %v, body: %s", err, body)
  67. return
  68. }
  69. //models.ProcessStorageInfo(data)
  70. case "datamap_hubtransferstats":
  71. var data stgmod.HubTransferStats
  72. err := json.Unmarshal(body, &data)
  73. if err != nil {
  74. log.Printf("Failed to unmarshal HubTransferStats: %v, body: %s", err, body)
  75. return
  76. }
  77. models.ProcessHubTransfer(data)
  78. case "datamap_hubstoragetransferstats":
  79. var data stgmod.HubStorageTransferStats
  80. err := json.Unmarshal(body, &data)
  81. if err != nil {
  82. log.Printf("Failed to unmarshal HubStorageTransferStats: %v, body: %s", err, body)
  83. return
  84. }
  85. //models.ProcessHubTransfer(data)
  86. case "datamap_blocktransfer":
  87. var data stgmod.BlockTransfer
  88. err := json.Unmarshal(body, &data)
  89. if err != nil {
  90. log.Printf("Failed to unmarshal BlockTransfer: %v, body: %s", err, body)
  91. return
  92. }
  93. models.ProcessBlockTransfer(data)
  94. case "datamap_blockdistribution":
  95. var data stgmod.BlockDistribution
  96. err := json.Unmarshal(body, &data)
  97. if err != nil {
  98. log.Printf("Failed to unmarshal BlockDistribution: %v, body: %s", err, body)
  99. return
  100. }
  101. models.ProcessBlockDistribution(data)
  102. default:
  103. log.Printf("Unknown queue: %s", queue)
  104. }
  105. }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。