You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

uploader.go 8.6 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285
  1. package uploader
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "math"
  7. "math/rand"
  8. "time"
  9. "github.com/samber/lo"
  10. "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
  11. cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
  12. "gitlink.org.cn/cloudream/common/utils/sort2"
  13. stgglb "gitlink.org.cn/cloudream/storage/common/globals"
  14. stgmod "gitlink.org.cn/cloudream/storage/common/models"
  15. "gitlink.org.cn/cloudream/storage/common/pkgs/connectivity"
  16. "gitlink.org.cn/cloudream/storage/common/pkgs/distlock"
  17. "gitlink.org.cn/cloudream/storage/common/pkgs/distlock/reqbuilder"
  18. "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2"
  19. "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2/ops2"
  20. "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2/parser"
  21. "gitlink.org.cn/cloudream/storage/common/pkgs/metacache"
  22. coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"
  23. "gitlink.org.cn/cloudream/storage/common/pkgs/storage/agtpool"
  24. "gitlink.org.cn/cloudream/storage/common/pkgs/storage/factory"
  25. )
  26. type Uploader struct {
  27. distlock *distlock.Service
  28. connectivity *connectivity.Collector
  29. stgAgts *agtpool.AgentPool
  30. stgMeta *metacache.StorageMeta
  31. }
  32. func NewUploader(distlock *distlock.Service, connectivity *connectivity.Collector, stgAgts *agtpool.AgentPool, stgMeta *metacache.StorageMeta) *Uploader {
  33. return &Uploader{
  34. distlock: distlock,
  35. connectivity: connectivity,
  36. stgAgts: stgAgts,
  37. stgMeta: stgMeta,
  38. }
  39. }
  40. func (u *Uploader) BeginUpdate(userID cdssdk.UserID, pkgID cdssdk.PackageID, affinity cdssdk.StorageID, loadTo []cdssdk.StorageID, loadToPath []string) (*UpdateUploader, error) {
  41. coorCli, err := stgglb.CoordinatorMQPool.Acquire()
  42. if err != nil {
  43. return nil, fmt.Errorf("new coordinator client: %w", err)
  44. }
  45. defer stgglb.CoordinatorMQPool.Release(coorCli)
  46. getUserStgsResp, err := coorCli.GetUserStorageDetails(coormq.ReqGetUserStorageDetails(userID))
  47. if err != nil {
  48. return nil, fmt.Errorf("getting user storages: %w", err)
  49. }
  50. cons := u.connectivity.GetAll()
  51. var userStgs []UploadStorageInfo
  52. for _, stg := range getUserStgsResp.Storages {
  53. if stg.MasterHub == nil {
  54. continue
  55. }
  56. delay := time.Duration(math.MaxInt64)
  57. con, ok := cons[stg.MasterHub.HubID]
  58. if ok && con.Latency != nil {
  59. delay = *con.Latency
  60. }
  61. userStgs = append(userStgs, UploadStorageInfo{
  62. Storage: stg,
  63. Delay: delay,
  64. IsSameLocation: stg.MasterHub.LocationID == stgglb.Local.LocationID,
  65. })
  66. }
  67. if len(userStgs) == 0 {
  68. return nil, fmt.Errorf("user no available storages")
  69. }
  70. loadToStgs := make([]stgmod.StorageDetail, len(loadTo))
  71. for i, stgID := range loadTo {
  72. stg, ok := lo.Find(getUserStgsResp.Storages, func(stg stgmod.StorageDetail) bool {
  73. return stg.Storage.StorageID == stgID
  74. })
  75. if !ok {
  76. return nil, fmt.Errorf("load to storage %v not found", stgID)
  77. }
  78. if stg.MasterHub == nil {
  79. return nil, fmt.Errorf("load to storage %v has no master hub", stgID)
  80. }
  81. if !factory.GetBuilder(stg).PublicStoreDesc().Enabled() {
  82. return nil, fmt.Errorf("load to storage %v has no public store", stgID)
  83. }
  84. loadToStgs[i] = stg
  85. }
  86. target := u.chooseUploadStorage(userStgs, affinity)
  87. // 给上传节点的IPFS加锁
  88. // TODO 考虑加Object的Create锁
  89. // 防止上传的副本被清除
  90. distMutex, err := reqbuilder.NewBuilder().Shard().Buzy(target.Storage.Storage.StorageID).MutexLock(u.distlock)
  91. if err != nil {
  92. return nil, fmt.Errorf("acquire distlock: %w", err)
  93. }
  94. return &UpdateUploader{
  95. uploader: u,
  96. pkgID: pkgID,
  97. targetStg: target.Storage,
  98. distMutex: distMutex,
  99. loadToStgs: loadToStgs,
  100. loadToPath: loadToPath,
  101. }, nil
  102. }
  103. // chooseUploadStorage 选择一个上传文件的节点
  104. // 1. 选择设置了亲和性的节点
  105. // 2. 从与当前客户端相同地域的节点中随机选一个
  106. // 3. 没有的话从所有节点选择延迟最低的节点
  107. func (w *Uploader) chooseUploadStorage(storages []UploadStorageInfo, stgAffinity cdssdk.StorageID) UploadStorageInfo {
  108. if stgAffinity > 0 {
  109. aff, ok := lo.Find(storages, func(storage UploadStorageInfo) bool { return storage.Storage.Storage.StorageID == stgAffinity })
  110. if ok {
  111. return aff
  112. }
  113. }
  114. sameLocationStorages := lo.Filter(storages, func(e UploadStorageInfo, i int) bool { return e.IsSameLocation })
  115. if len(sameLocationStorages) > 0 {
  116. return sameLocationStorages[rand.Intn(len(sameLocationStorages))]
  117. }
  118. // 选择延迟最低的节点
  119. storages = sort2.Sort(storages, func(e1, e2 UploadStorageInfo) int { return sort2.Cmp(e1.Delay, e2.Delay) })
  120. return storages[0]
  121. }
  122. func (u *Uploader) BeginCreateLoad(userID cdssdk.UserID, bktID cdssdk.BucketID, pkgName string, loadTo []cdssdk.StorageID, loadToPath []string) (*CreateLoadUploader, error) {
  123. coorCli, err := stgglb.CoordinatorMQPool.Acquire()
  124. if err != nil {
  125. return nil, fmt.Errorf("new coordinator client: %w", err)
  126. }
  127. defer stgglb.CoordinatorMQPool.Release(coorCli)
  128. getStgs := u.stgMeta.GetMany(loadTo)
  129. targetStgs := make([]stgmod.StorageDetail, len(loadTo))
  130. for i, stg := range getStgs {
  131. if stg == nil {
  132. return nil, fmt.Errorf("storage %v not found", loadTo[i])
  133. }
  134. targetStgs[i] = *stg
  135. }
  136. createPkg, err := coorCli.CreatePackage(coormq.NewCreatePackage(userID, bktID, pkgName))
  137. if err != nil {
  138. return nil, fmt.Errorf("create package: %w", err)
  139. }
  140. reqBld := reqbuilder.NewBuilder()
  141. for _, stg := range targetStgs {
  142. reqBld.Shard().Buzy(stg.Storage.StorageID)
  143. reqBld.Storage().Buzy(stg.Storage.StorageID)
  144. }
  145. lock, err := reqBld.MutexLock(u.distlock)
  146. if err != nil {
  147. return nil, fmt.Errorf("acquire distlock: %w", err)
  148. }
  149. return &CreateLoadUploader{
  150. pkg: createPkg.Package,
  151. userID: userID,
  152. targetStgs: targetStgs,
  153. loadRoots: loadToPath,
  154. uploader: u,
  155. distlock: lock,
  156. }, nil
  157. }
  158. func (u *Uploader) UploadPart(userID cdssdk.UserID, objID cdssdk.ObjectID, index int, stream io.Reader) error {
  159. coorCli, err := stgglb.CoordinatorMQPool.Acquire()
  160. if err != nil {
  161. return fmt.Errorf("new coordinator client: %w", err)
  162. }
  163. defer stgglb.CoordinatorMQPool.Release(coorCli)
  164. details, err := coorCli.GetObjectDetails(coormq.ReqGetObjectDetails([]cdssdk.ObjectID{objID}))
  165. if err != nil {
  166. return err
  167. }
  168. if details.Objects[0] == nil {
  169. return fmt.Errorf("object %v not found", objID)
  170. }
  171. objDe := details.Objects[0]
  172. _, ok := objDe.Object.Redundancy.(*cdssdk.MultipartUploadRedundancy)
  173. if !ok {
  174. return fmt.Errorf("object %v is not a multipart upload", objID)
  175. }
  176. var stg stgmod.StorageDetail
  177. if len(objDe.Blocks) > 0 {
  178. cstg := u.stgMeta.Get(objDe.Blocks[0].StorageID)
  179. if cstg == nil {
  180. return fmt.Errorf("storage %v not found", objDe.Blocks[0].StorageID)
  181. }
  182. stg = *cstg
  183. } else {
  184. getUserStgsResp, err := coorCli.GetUserStorageDetails(coormq.ReqGetUserStorageDetails(userID))
  185. if err != nil {
  186. return fmt.Errorf("getting user storages: %w", err)
  187. }
  188. cons := u.connectivity.GetAll()
  189. var userStgs []UploadStorageInfo
  190. for _, stg := range getUserStgsResp.Storages {
  191. if stg.MasterHub == nil {
  192. continue
  193. }
  194. delay := time.Duration(math.MaxInt64)
  195. con, ok := cons[stg.MasterHub.HubID]
  196. if ok && con.Latency != nil {
  197. delay = *con.Latency
  198. }
  199. userStgs = append(userStgs, UploadStorageInfo{
  200. Storage: stg,
  201. Delay: delay,
  202. IsSameLocation: stg.MasterHub.LocationID == stgglb.Local.LocationID,
  203. })
  204. }
  205. if len(userStgs) == 0 {
  206. return fmt.Errorf("user no available storages")
  207. }
  208. stg = u.chooseUploadStorage(userStgs, 0).Storage
  209. }
  210. lock, err := reqbuilder.NewBuilder().Shard().Buzy(stg.Storage.StorageID).MutexLock(u.distlock)
  211. if err != nil {
  212. return fmt.Errorf("acquire distlock: %w", err)
  213. }
  214. defer lock.Unlock()
  215. ft := ioswitch2.NewFromTo()
  216. fromDrv, hd := ioswitch2.NewFromDriver(ioswitch2.RawStream())
  217. ft.AddFrom(fromDrv).
  218. AddTo(ioswitch2.NewToShardStore(*stg.MasterHub, stg, ioswitch2.RawStream(), "shard"))
  219. plans := exec.NewPlanBuilder()
  220. err = parser.Parse(ft, plans)
  221. if err != nil {
  222. return fmt.Errorf("parse fromto: %w", err)
  223. }
  224. exeCtx := exec.NewExecContext()
  225. exec.SetValueByType(exeCtx, u.stgAgts)
  226. exec := plans.Execute(exeCtx)
  227. exec.BeginWrite(io.NopCloser(stream), hd)
  228. ret, err := exec.Wait(context.TODO())
  229. if err != nil {
  230. return fmt.Errorf("executing plan: %w", err)
  231. }
  232. shardInfo := ret["shard"].(*ops2.ShardInfoValue)
  233. _, err = coorCli.AddMultipartUploadPart(coormq.ReqAddMultipartUploadPart(userID, objID, stgmod.ObjectBlock{
  234. ObjectID: objID,
  235. Index: index,
  236. StorageID: stg.Storage.StorageID,
  237. FileHash: shardInfo.Hash,
  238. Size: shardInfo.Size,
  239. }))
  240. return err
  241. }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。