You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

upload_objects.go 6.7 kB

2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239
  1. package cmd
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "math"
  7. "math/rand"
  8. "time"
  9. "github.com/samber/lo"
  10. "gitlink.org.cn/cloudream/common/pkgs/distlock"
  11. "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
  12. cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
  13. "gitlink.org.cn/cloudream/common/utils/sort2"
  14. stgglb "gitlink.org.cn/cloudream/storage/common/globals"
  15. stgmod "gitlink.org.cn/cloudream/storage/common/models"
  16. "gitlink.org.cn/cloudream/storage/common/pkgs/connectivity"
  17. "gitlink.org.cn/cloudream/storage/common/pkgs/distlock/reqbuilder"
  18. "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2"
  19. "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2/ops2"
  20. "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2/parser"
  21. "gitlink.org.cn/cloudream/storage/common/pkgs/iterator"
  22. coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"
  23. )
  24. type UploadObjects struct {
  25. userID cdssdk.UserID
  26. packageID cdssdk.PackageID
  27. objectIter iterator.UploadingObjectIterator
  28. nodeAffinity *cdssdk.NodeID
  29. }
  30. type UploadObjectsResult struct {
  31. Objects []ObjectUploadResult
  32. }
  33. type ObjectUploadResult struct {
  34. Info *iterator.IterUploadingObject
  35. Error error
  36. Object cdssdk.Object
  37. }
  38. type UploadStorageInfo struct {
  39. Storage stgmod.StorageDetail
  40. Delay time.Duration
  41. IsSameLocation bool
  42. }
  43. type UploadObjectsContext struct {
  44. Distlock *distlock.Service
  45. Connectivity *connectivity.Collector
  46. }
  47. func NewUploadObjects(userID cdssdk.UserID, packageID cdssdk.PackageID, objIter iterator.UploadingObjectIterator, nodeAffinity *cdssdk.NodeID) *UploadObjects {
  48. return &UploadObjects{
  49. userID: userID,
  50. packageID: packageID,
  51. objectIter: objIter,
  52. nodeAffinity: nodeAffinity,
  53. }
  54. }
  55. func (t *UploadObjects) Execute(ctx *UploadObjectsContext) (*UploadObjectsResult, error) {
  56. defer t.objectIter.Close()
  57. coorCli, err := stgglb.CoordinatorMQPool.Acquire()
  58. if err != nil {
  59. return nil, fmt.Errorf("new coordinator client: %w", err)
  60. }
  61. getUserStgsResp, err := coorCli.GetUserStorageDetails(coormq.ReqGetUserStorageDetails(t.userID))
  62. if err != nil {
  63. return nil, fmt.Errorf("getting user nodes: %w", err)
  64. }
  65. cons := ctx.Connectivity.GetAll()
  66. var userStgs []UploadStorageInfo
  67. for _, stg := range getUserStgsResp.Storages {
  68. if stg.MasterHub == nil {
  69. continue
  70. }
  71. delay := time.Duration(math.MaxInt64)
  72. con, ok := cons[stg.MasterHub.NodeID]
  73. if ok && con.Delay != nil {
  74. delay = *con.Delay
  75. }
  76. userStgs = append(userStgs, UploadStorageInfo{
  77. Storage: stg,
  78. Delay: delay,
  79. IsSameLocation: stg.MasterHub.LocationID == stgglb.Local.LocationID,
  80. })
  81. }
  82. if len(userStgs) == 0 {
  83. return nil, fmt.Errorf("user no available nodes")
  84. }
  85. // 给上传节点的IPFS加锁
  86. ipfsReqBlder := reqbuilder.NewBuilder()
  87. for _, us := range userStgs {
  88. ipfsReqBlder.Shard().Buzy(us.Storage.Storage.StorageID)
  89. }
  90. // TODO 考虑加Object的Create锁
  91. // 防止上传的副本被清除
  92. ipfsMutex, err := ipfsReqBlder.MutexLock(ctx.Distlock)
  93. if err != nil {
  94. return nil, fmt.Errorf("acquire locks failed, err: %w", err)
  95. }
  96. defer ipfsMutex.Unlock()
  97. rets, err := uploadAndUpdatePackage(t.packageID, t.objectIter, userStgs, t.nodeAffinity)
  98. if err != nil {
  99. return nil, err
  100. }
  101. return &UploadObjectsResult{
  102. Objects: rets,
  103. }, nil
  104. }
  105. // chooseUploadNode 选择一个上传文件的节点
  106. // 1. 选择设置了亲和性的节点
  107. // 2. 从与当前客户端相同地域的节点中随机选一个
  108. // 3. 没有的话从所有节点选择延迟最低的节点
  109. func chooseUploadNode(nodes []UploadStorageInfo, nodeAffinity *cdssdk.NodeID) UploadStorageInfo {
  110. if nodeAffinity != nil {
  111. aff, ok := lo.Find(nodes, func(node UploadStorageInfo) bool { return node.Storage.MasterHub.NodeID == *nodeAffinity })
  112. if ok {
  113. return aff
  114. }
  115. }
  116. sameLocationNodes := lo.Filter(nodes, func(e UploadStorageInfo, i int) bool { return e.IsSameLocation })
  117. if len(sameLocationNodes) > 0 {
  118. return sameLocationNodes[rand.Intn(len(sameLocationNodes))]
  119. }
  120. // 选择延迟最低的节点
  121. nodes = sort2.Sort(nodes, func(e1, e2 UploadStorageInfo) int { return sort2.Cmp(e1.Delay, e2.Delay) })
  122. return nodes[0]
  123. }
  124. func uploadAndUpdatePackage(packageID cdssdk.PackageID, objectIter iterator.UploadingObjectIterator, userNodes []UploadStorageInfo, nodeAffinity *cdssdk.NodeID) ([]ObjectUploadResult, error) {
  125. coorCli, err := stgglb.CoordinatorMQPool.Acquire()
  126. if err != nil {
  127. return nil, fmt.Errorf("new coordinator client: %w", err)
  128. }
  129. defer stgglb.CoordinatorMQPool.Release(coorCli)
  130. // 为所有文件选择相同的上传节点
  131. uploadNode := chooseUploadNode(userNodes, nodeAffinity)
  132. var uploadRets []ObjectUploadResult
  133. //上传文件夹
  134. var adds []coormq.AddObjectEntry
  135. for {
  136. objInfo, err := objectIter.MoveNext()
  137. if err == iterator.ErrNoMoreItem {
  138. break
  139. }
  140. if err != nil {
  141. return nil, fmt.Errorf("reading object: %w", err)
  142. }
  143. err = func() error {
  144. defer objInfo.File.Close()
  145. uploadTime := time.Now()
  146. fileHash, err := uploadFile(objInfo.File, uploadNode)
  147. if err != nil {
  148. return fmt.Errorf("uploading file: %w", err)
  149. }
  150. uploadRets = append(uploadRets, ObjectUploadResult{
  151. Info: objInfo,
  152. Error: err,
  153. })
  154. adds = append(adds, coormq.NewAddObjectEntry(objInfo.Path, objInfo.Size, fileHash, uploadTime, uploadNode.Storage.Storage.StorageID))
  155. return nil
  156. }()
  157. if err != nil {
  158. return nil, err
  159. }
  160. }
  161. updateResp, err := coorCli.UpdatePackage(coormq.NewUpdatePackage(packageID, adds, nil))
  162. if err != nil {
  163. return nil, fmt.Errorf("updating package: %w", err)
  164. }
  165. updatedObjs := make(map[string]*cdssdk.Object)
  166. for _, obj := range updateResp.Added {
  167. o := obj
  168. updatedObjs[obj.Path] = &o
  169. }
  170. for i := range uploadRets {
  171. obj := updatedObjs[uploadRets[i].Info.Path]
  172. if obj == nil {
  173. uploadRets[i].Error = fmt.Errorf("object %s not found in package", uploadRets[i].Info.Path)
  174. continue
  175. }
  176. uploadRets[i].Object = *obj
  177. }
  178. return uploadRets, nil
  179. }
  180. func uploadFile(file io.Reader, uploadStg UploadStorageInfo) (cdssdk.FileHash, error) {
  181. ft := ioswitch2.NewFromTo()
  182. fromExec, hd := ioswitch2.NewFromDriver(-1)
  183. ft.AddFrom(fromExec).AddTo(ioswitch2.NewToShardStore(*uploadStg.Storage.MasterHub, uploadStg.Storage.Storage, -1, "fileHash"))
  184. parser := parser.NewParser(cdssdk.DefaultECRedundancy)
  185. plans := exec.NewPlanBuilder()
  186. err := parser.Parse(ft, plans)
  187. if err != nil {
  188. return "", fmt.Errorf("parsing plan: %w", err)
  189. }
  190. // TODO2 注入依赖
  191. exeCtx := exec.NewExecContext()
  192. exec := plans.Execute(exeCtx)
  193. exec.BeginWrite(io.NopCloser(file), hd)
  194. ret, err := exec.Wait(context.TODO())
  195. if err != nil {
  196. return "", err
  197. }
  198. return ret["fileHash"].(*ops2.FileHashValue).Hash, nil
  199. }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。