You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

storage.go 4.1 kB

2 years ago
11 months ago
2 years ago
2 years ago
11 months ago
2 years ago
2 years ago
2 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141
  1. package mq
  2. import (
  3. "errors"
  4. "fmt"
  5. "time"
  6. "gitlink.org.cn/cloudream/common/consts/errorcode"
  7. "gitlink.org.cn/cloudream/common/pkgs/logger"
  8. cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
  9. "gorm.io/gorm"
  10. "gitlink.org.cn/cloudream/common/pkgs/mq"
  11. stgmod "gitlink.org.cn/cloudream/storage/common/models"
  12. "gitlink.org.cn/cloudream/storage/common/pkgs/db2"
  13. coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"
  14. )
  15. func (svc *Service) GetStorage(msg *coormq.GetStorage) (*coormq.GetStorageResp, *mq.CodeMessage) {
  16. stg, err := svc.db2.Storage().GetUserStorage(svc.db2.DefCtx(), msg.UserID, msg.StorageID)
  17. if err != nil {
  18. logger.Warnf("getting user storage: %s", err.Error())
  19. return nil, mq.Failed(errorcode.OperationFailed, "get user storage failed")
  20. }
  21. return mq.ReplyOK(coormq.RespGetStorage(stg))
  22. }
  23. func (svc *Service) GetStorageDetails(msg *coormq.GetStorageDetails) (*coormq.GetStorageDetailsResp, *mq.CodeMessage) {
  24. stgsMp := make(map[cdssdk.StorageID]*stgmod.StorageDetail)
  25. svc.db2.DoTx(func(tx db2.SQLContext) error {
  26. stgs, err := svc.db2.Storage().BatchGetByID(tx, msg.StorageIDs)
  27. if err != nil && err != gorm.ErrRecordNotFound {
  28. return fmt.Errorf("getting storage: %w", err)
  29. }
  30. details := make([]stgmod.StorageDetail, len(stgs))
  31. for i, stg := range stgs {
  32. details[i] = stgmod.StorageDetail{
  33. Storage: stg,
  34. }
  35. stgsMp[stg.StorageID] = &details[i]
  36. }
  37. err = svc.db2.Storage().FillDetails(tx, details)
  38. if err != nil {
  39. return err
  40. }
  41. return nil
  42. })
  43. ret := make([]*stgmod.StorageDetail, len(msg.StorageIDs))
  44. for i, id := range msg.StorageIDs {
  45. stg, ok := stgsMp[id]
  46. if !ok {
  47. ret[i] = nil
  48. continue
  49. }
  50. ret[i] = stg
  51. }
  52. return mq.ReplyOK(coormq.RespGetStorageDetails(ret))
  53. }
  54. func (svc *Service) GetUserStorageDetails(msg *coormq.GetUserStorageDetails) (*coormq.GetUserStorageDetailsResp, *mq.CodeMessage) {
  55. var ret []stgmod.StorageDetail
  56. svc.db2.DoTx(func(tx db2.SQLContext) error {
  57. stgs, err := svc.db2.Storage().GetUserStorages(tx, msg.UserID)
  58. if err != nil && err != gorm.ErrRecordNotFound {
  59. return fmt.Errorf("getting user storages: %w", err)
  60. }
  61. for _, stg := range stgs {
  62. ret = append(ret, stgmod.StorageDetail{
  63. Storage: stg,
  64. })
  65. }
  66. err = svc.db2.Storage().FillDetails(tx, ret)
  67. if err != nil {
  68. return err
  69. }
  70. return nil
  71. })
  72. return mq.ReplyOK(coormq.RespGetUserStorageDetails(ret))
  73. }
  74. func (svc *Service) GetStorageByName(msg *coormq.GetStorageByName) (*coormq.GetStorageByNameResp, *mq.CodeMessage) {
  75. stg, err := svc.db2.Storage().GetUserStorageByName(svc.db2.DefCtx(), msg.UserID, msg.Name)
  76. if err != nil {
  77. logger.Warnf("getting user storage by name: %s", err.Error())
  78. if errors.Is(err, gorm.ErrRecordNotFound) {
  79. return nil, mq.Failed(errorcode.DataNotFound, "storage not found")
  80. }
  81. return nil, mq.Failed(errorcode.OperationFailed, "get user storage failed")
  82. }
  83. return mq.ReplyOK(coormq.RespGetStorageByNameResp(stg))
  84. }
  85. func (svc *Service) StoragePackageLoaded(msg *coormq.StoragePackageLoaded) (*coormq.StoragePackageLoadedResp, *mq.CodeMessage) {
  86. err := svc.db2.DoTx(func(tx db2.SQLContext) error {
  87. // TODO 权限检查
  88. exists, err := svc.db2.Object().BatchTestObjectID(tx, msg.PinnedObjects)
  89. if err != nil {
  90. return fmt.Errorf("testing object id: %w", err)
  91. }
  92. pinned := make([]cdssdk.PinnedObject, 0, len(msg.PinnedObjects))
  93. for _, obj := range msg.PinnedObjects {
  94. if exists[obj] {
  95. pinned = append(pinned, cdssdk.PinnedObject{
  96. StorageID: msg.StorageID,
  97. ObjectID: obj,
  98. CreateTime: time.Now(),
  99. })
  100. }
  101. }
  102. err = svc.db2.PinnedObject().BatchTryCreate(tx, pinned)
  103. if err != nil {
  104. return fmt.Errorf("batch creating pinned object: %w", err)
  105. }
  106. return nil
  107. })
  108. if err != nil {
  109. logger.WithField("UserID", msg.UserID).
  110. WithField("StorageID", msg.StorageID).
  111. WithField("PackageID", msg.PackageID).
  112. Warn(err.Error())
  113. return nil, mq.Failed(errorcode.OperationFailed, "user load package to storage failed")
  114. }
  115. return mq.ReplyOK(coormq.RespStoragePackageLoaded())
  116. }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。