You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

uploader.go 5.6 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179
  1. package uploader
  2. import (
  3. "fmt"
  4. "math"
  5. "math/rand"
  6. "time"
  7. "github.com/samber/lo"
  8. cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
  9. "gitlink.org.cn/cloudream/common/utils/sort2"
  10. stgglb "gitlink.org.cn/cloudream/storage/common/globals"
  11. stgmod "gitlink.org.cn/cloudream/storage/common/models"
  12. "gitlink.org.cn/cloudream/storage/common/pkgs/connectivity"
  13. "gitlink.org.cn/cloudream/storage/common/pkgs/distlock"
  14. "gitlink.org.cn/cloudream/storage/common/pkgs/distlock/reqbuilder"
  15. "gitlink.org.cn/cloudream/storage/common/pkgs/metacache"
  16. coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"
  17. "gitlink.org.cn/cloudream/storage/common/pkgs/storage/agtpool"
  18. "gitlink.org.cn/cloudream/storage/common/pkgs/storage/factory"
  19. )
  20. type Uploader struct {
  21. distlock *distlock.Service
  22. connectivity *connectivity.Collector
  23. stgAgts *agtpool.AgentPool
  24. stgMeta *metacache.StorageMeta
  25. loadTo []cdssdk.StorageID
  26. loadToPath []string
  27. }
  28. func NewUploader(distlock *distlock.Service, connectivity *connectivity.Collector, stgAgts *agtpool.AgentPool, stgMeta *metacache.StorageMeta) *Uploader {
  29. return &Uploader{
  30. distlock: distlock,
  31. connectivity: connectivity,
  32. stgAgts: stgAgts,
  33. stgMeta: stgMeta,
  34. }
  35. }
  36. func (u *Uploader) BeginUpdate(userID cdssdk.UserID, pkgID cdssdk.PackageID, affinity cdssdk.StorageID, loadTo []cdssdk.StorageID, loadToPath []string) (*UpdateUploader, error) {
  37. coorCli, err := stgglb.CoordinatorMQPool.Acquire()
  38. if err != nil {
  39. return nil, fmt.Errorf("new coordinator client: %w", err)
  40. }
  41. defer stgglb.CoordinatorMQPool.Release(coorCli)
  42. getUserStgsResp, err := coorCli.GetUserStorageDetails(coormq.ReqGetUserStorageDetails(userID))
  43. if err != nil {
  44. return nil, fmt.Errorf("getting user storages: %w", err)
  45. }
  46. cons := u.connectivity.GetAll()
  47. var userStgs []UploadStorageInfo
  48. for _, stg := range getUserStgsResp.Storages {
  49. if stg.MasterHub == nil {
  50. continue
  51. }
  52. delay := time.Duration(math.MaxInt64)
  53. con, ok := cons[stg.MasterHub.HubID]
  54. if ok && con.Latency != nil {
  55. delay = *con.Latency
  56. }
  57. userStgs = append(userStgs, UploadStorageInfo{
  58. Storage: stg,
  59. Delay: delay,
  60. IsSameLocation: stg.MasterHub.LocationID == stgglb.Local.LocationID,
  61. })
  62. }
  63. if len(userStgs) == 0 {
  64. return nil, fmt.Errorf("user no available storages")
  65. }
  66. loadToStgs := make([]stgmod.StorageDetail, len(loadTo))
  67. for i, stgID := range loadTo {
  68. stg, ok := lo.Find(getUserStgsResp.Storages, func(stg stgmod.StorageDetail) bool {
  69. return stg.Storage.StorageID == stgID
  70. })
  71. if !ok {
  72. return nil, fmt.Errorf("load to storage %v not found", stgID)
  73. }
  74. if stg.MasterHub == nil {
  75. return nil, fmt.Errorf("load to storage %v has no master hub", stgID)
  76. }
  77. if !factory.GetBuilder(stg).PublicStoreDesc().Enabled() {
  78. return nil, fmt.Errorf("load to storage %v has no public store", stgID)
  79. }
  80. loadToStgs[i] = stg
  81. }
  82. target := u.chooseUploadStorage(userStgs, affinity)
  83. // 给上传节点的IPFS加锁
  84. // TODO 考虑加Object的Create锁
  85. // 防止上传的副本被清除
  86. distMutex, err := reqbuilder.NewBuilder().Shard().Buzy(target.Storage.Storage.StorageID).MutexLock(u.distlock)
  87. if err != nil {
  88. return nil, fmt.Errorf("acquire distlock: %w", err)
  89. }
  90. return &UpdateUploader{
  91. uploader: u,
  92. pkgID: pkgID,
  93. targetStg: target.Storage,
  94. distMutex: distMutex,
  95. loadToStgs: loadToStgs,
  96. loadToPath: loadToPath,
  97. }, nil
  98. }
  99. // chooseUploadStorage 选择一个上传文件的节点
  100. // 1. 选择设置了亲和性的节点
  101. // 2. 从与当前客户端相同地域的节点中随机选一个
  102. // 3. 没有的话从所有节点选择延迟最低的节点
  103. func (w *Uploader) chooseUploadStorage(storages []UploadStorageInfo, stgAffinity cdssdk.StorageID) UploadStorageInfo {
  104. if stgAffinity > 0 {
  105. aff, ok := lo.Find(storages, func(storage UploadStorageInfo) bool { return storage.Storage.Storage.StorageID == stgAffinity })
  106. if ok {
  107. return aff
  108. }
  109. }
  110. sameLocationStorages := lo.Filter(storages, func(e UploadStorageInfo, i int) bool { return e.IsSameLocation })
  111. if len(sameLocationStorages) > 0 {
  112. return sameLocationStorages[rand.Intn(len(sameLocationStorages))]
  113. }
  114. // 选择延迟最低的节点
  115. storages = sort2.Sort(storages, func(e1, e2 UploadStorageInfo) int { return sort2.Cmp(e1.Delay, e2.Delay) })
  116. return storages[0]
  117. }
  118. func (u *Uploader) BeginCreateLoad(userID cdssdk.UserID, bktID cdssdk.BucketID, pkgName string, loadTo []cdssdk.StorageID, loadToPath []string) (*CreateLoadUploader, error) {
  119. coorCli, err := stgglb.CoordinatorMQPool.Acquire()
  120. if err != nil {
  121. return nil, fmt.Errorf("new coordinator client: %w", err)
  122. }
  123. defer stgglb.CoordinatorMQPool.Release(coorCli)
  124. getStgs := u.stgMeta.GetMany(loadTo)
  125. targetStgs := make([]stgmod.StorageDetail, len(loadTo))
  126. for i, stg := range getStgs {
  127. if stg == nil {
  128. return nil, fmt.Errorf("storage %v not found", loadTo[i])
  129. }
  130. targetStgs[i] = *stg
  131. }
  132. createPkg, err := coorCli.CreatePackage(coormq.NewCreatePackage(userID, bktID, pkgName))
  133. if err != nil {
  134. return nil, fmt.Errorf("create package: %w", err)
  135. }
  136. reqBld := reqbuilder.NewBuilder()
  137. for _, stg := range targetStgs {
  138. reqBld.Shard().Buzy(stg.Storage.StorageID)
  139. reqBld.Storage().Buzy(stg.Storage.StorageID)
  140. }
  141. lock, err := reqBld.MutexLock(u.distlock)
  142. if err != nil {
  143. return nil, fmt.Errorf("acquire distlock: %w", err)
  144. }
  145. return &CreateLoadUploader{
  146. pkg: createPkg.Package,
  147. userID: userID,
  148. targetStgs: targetStgs,
  149. loadRoots: loadToPath,
  150. uploader: u,
  151. distlock: lock,
  152. }, nil
  153. }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。