You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

user_space.go 8.4 kB

6 months ago
8 months ago
8 months ago
8 months ago
8 months ago
8 months ago
8 months ago
8 months ago
8 months ago
8 months ago
6 months ago
8 months ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269
  1. package services
  2. import (
  3. "context"
  4. "fmt"
  5. "path"
  6. "strings"
  7. "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
  8. "gitlink.org.cn/cloudream/common/pkgs/logger"
  9. "gitlink.org.cn/cloudream/common/pkgs/trie"
  10. cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
  11. clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types"
  12. cortypes "gitlink.org.cn/cloudream/jcs-pub/coordinator/types"
  13. "gitlink.org.cn/cloudream/jcs-pub/client/internal/db"
  14. "gitlink.org.cn/cloudream/jcs-pub/client/internal/downloader/strategy"
  15. stgglb "gitlink.org.cn/cloudream/jcs-pub/common/globals"
  16. "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/distlock/reqbuilder"
  17. "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2"
  18. "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/parser"
  19. hubrpc "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc/hub"
  20. "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/types"
  21. )
  22. type UserSpaceService struct {
  23. *Service
  24. }
  25. func (svc *Service) UserSpaceSvc() *UserSpaceService {
  26. return &UserSpaceService{Service: svc}
  27. }
  28. func (svc *UserSpaceService) Get(userspaceID clitypes.UserSpaceID) (clitypes.UserSpace, error) {
  29. return svc.DB.UserSpace().GetByID(svc.DB.DefCtx(), userspaceID)
  30. }
  31. func (svc *UserSpaceService) GetByName(name string) (clitypes.UserSpace, error) {
  32. return svc.DB.UserSpace().GetByName(svc.DB.DefCtx(), name)
  33. }
  34. func (svc *UserSpaceService) LoadPackage(packageID clitypes.PackageID, userspaceID clitypes.UserSpaceID, rootPath string) error {
  35. coorCli := stgglb.CoordinatorRPCPool.Get()
  36. defer coorCli.Release()
  37. destStg := svc.UserSpaceMeta.Get(userspaceID)
  38. if destStg == nil {
  39. return fmt.Errorf("userspace not found: %d", userspaceID)
  40. }
  41. if destStg.MasterHub == nil {
  42. return fmt.Errorf("userspace %v has no master hub", userspaceID)
  43. }
  44. details, err := db.DoTx11(svc.DB, svc.DB.Object().GetPackageObjectDetails, packageID)
  45. if err != nil {
  46. return err
  47. }
  48. var pinned []clitypes.ObjectID
  49. plans := exec.NewPlanBuilder()
  50. for _, obj := range details {
  51. strg, err := svc.StrategySelector.Select(strategy.Request{
  52. Detail: obj,
  53. DestHub: destStg.MasterHub.HubID,
  54. })
  55. if err != nil {
  56. return fmt.Errorf("select download strategy: %w", err)
  57. }
  58. ft := ioswitch2.NewFromTo()
  59. switch strg := strg.(type) {
  60. case *strategy.DirectStrategy:
  61. ft.AddFrom(ioswitch2.NewFromShardstore(strg.Detail.Object.FileHash, *strg.UserSpace.MasterHub, strg.UserSpace, ioswitch2.RawStream()))
  62. case *strategy.ECReconstructStrategy:
  63. for i, b := range strg.Blocks {
  64. ft.AddFrom(ioswitch2.NewFromShardstore(b.FileHash, *strg.UserSpaces[i].MasterHub, strg.UserSpaces[i], ioswitch2.ECStream(b.Index)))
  65. ft.ECParam = &strg.Redundancy
  66. }
  67. default:
  68. return fmt.Errorf("unsupported download strategy: %T", strg)
  69. }
  70. ft.AddTo(ioswitch2.NewToPublicStore(*destStg.MasterHub, *destStg, path.Join(rootPath, obj.Object.Path)))
  71. // 顺便保存到同存储服务的分片存储中
  72. if destStg.UserSpace.ShardStore != nil {
  73. ft.AddTo(ioswitch2.NewToShardStore(*destStg.MasterHub, *destStg, ioswitch2.RawStream(), ""))
  74. pinned = append(pinned, obj.Object.ObjectID)
  75. }
  76. err = parser.Parse(ft, plans)
  77. if err != nil {
  78. return fmt.Errorf("parse plan: %w", err)
  79. }
  80. }
  81. mutex, err := reqbuilder.NewBuilder().
  82. Shard().Buzy(userspaceID).
  83. MutexLock(svc.PubLock)
  84. if err != nil {
  85. return fmt.Errorf("acquire locks failed, err: %w", err)
  86. }
  87. defer mutex.Unlock()
  88. // 记录访问统计
  89. for _, obj := range details {
  90. svc.AccessStat.AddAccessCounter(obj.Object.ObjectID, packageID, userspaceID, 1)
  91. }
  92. drv := plans.Execute(exec.NewExecContext())
  93. _, err = drv.Wait(context.Background())
  94. if err != nil {
  95. return err
  96. }
  97. return nil
  98. }
  99. func (svc *UserSpaceService) SpaceToSpace(srcSpaceID clitypes.UserSpaceID, srcPath string, dstSpaceID clitypes.UserSpaceID, dstPath string) (clitypes.SpaceToSpaceResult, error) {
  100. srcSpace := svc.UserSpaceMeta.Get(srcSpaceID)
  101. if srcSpace == nil {
  102. return clitypes.SpaceToSpaceResult{}, fmt.Errorf("source userspace not found: %d", srcSpaceID)
  103. }
  104. if srcSpace.MasterHub == nil {
  105. return clitypes.SpaceToSpaceResult{}, fmt.Errorf("source userspace %v has no master hub", srcSpaceID)
  106. }
  107. srcAddr, ok := srcSpace.MasterHub.Address.(*cortypes.GRPCAddressInfo)
  108. if !ok {
  109. return clitypes.SpaceToSpaceResult{}, fmt.Errorf("source userspace %v has no grpc address", srcSpaceID)
  110. }
  111. srcSpaceCli := stgglb.HubRPCPool.Get(stgglb.SelectGRPCAddress(*srcSpace.MasterHub, *srcAddr))
  112. defer srcSpaceCli.Release()
  113. dstSpace := svc.UserSpaceMeta.Get(dstSpaceID)
  114. if dstSpace == nil {
  115. return clitypes.SpaceToSpaceResult{}, fmt.Errorf("destination userspace not found: %d", dstSpaceID)
  116. }
  117. if dstSpace.MasterHub == nil {
  118. return clitypes.SpaceToSpaceResult{}, fmt.Errorf("destination userspace %v has no master hub", dstSpaceID)
  119. }
  120. dstAddr, ok := dstSpace.MasterHub.Address.(*cortypes.GRPCAddressInfo)
  121. if !ok {
  122. return clitypes.SpaceToSpaceResult{}, fmt.Errorf("destination userspace %v has no grpc address", srcSpaceID)
  123. }
  124. dstSpaceCli := stgglb.HubRPCPool.Get(stgglb.SelectGRPCAddress(*dstSpace.MasterHub, *dstAddr))
  125. defer dstSpaceCli.Release()
  126. srcPath = strings.Trim(srcPath, cdssdk.ObjectPathSeparator)
  127. dstPath = strings.Trim(dstPath, cdssdk.ObjectPathSeparator)
  128. if srcPath == "" {
  129. return clitypes.SpaceToSpaceResult{}, fmt.Errorf("source path is empty")
  130. }
  131. if dstPath == "" {
  132. return clitypes.SpaceToSpaceResult{}, fmt.Errorf("destination path is empty")
  133. }
  134. listAllResp, cerr := srcSpaceCli.PublicStoreListAll(context.Background(), &hubrpc.PublicStoreListAll{
  135. UserSpace: *srcSpace,
  136. Path: srcPath,
  137. })
  138. if cerr != nil {
  139. return clitypes.SpaceToSpaceResult{}, fmt.Errorf("list all from source userspace: %w", cerr.ToError())
  140. }
  141. srcPathComps := clitypes.SplitObjectPath(srcPath)
  142. srcDirCompLen := len(srcPathComps) - 1
  143. entryTree := trie.NewTrie[*types.PublicStoreEntry]()
  144. for _, e := range listAllResp.Entries {
  145. pa, ok := strings.CutSuffix(e.Path, clitypes.ObjectPathSeparator)
  146. comps := clitypes.SplitObjectPath(pa)
  147. e.Path = pa
  148. e2 := e
  149. entryTree.CreateWords(comps[srcDirCompLen:]).Value = &e2
  150. e2.IsDir = e2.IsDir || ok
  151. }
  152. entryTree.Iterate(func(path []string, node *trie.Node[*types.PublicStoreEntry], isWordNode bool) trie.VisitCtrl {
  153. if node.Value == nil {
  154. return trie.VisitContinue
  155. }
  156. if node.Value.IsDir && len(node.WordNexts) > 0 {
  157. node.Value = nil
  158. return trie.VisitContinue
  159. }
  160. if !node.Value.IsDir && len(node.WordNexts) == 0 {
  161. node.WordNexts = nil
  162. }
  163. return trie.VisitContinue
  164. })
  165. var filePathes []string
  166. var dirPathes []string
  167. entryTree.Iterate(func(path []string, node *trie.Node[*types.PublicStoreEntry], isWordNode bool) trie.VisitCtrl {
  168. if node.Value == nil {
  169. return trie.VisitContinue
  170. }
  171. if node.Value.IsDir {
  172. dirPathes = append(dirPathes, node.Value.Path)
  173. } else {
  174. filePathes = append(filePathes, node.Value.Path)
  175. }
  176. return trie.VisitContinue
  177. })
  178. var success []string
  179. var failed []string
  180. for _, f := range filePathes {
  181. newPath := strings.Replace(f, srcPath, dstPath, 1)
  182. ft := ioswitch2.NewFromTo()
  183. ft.AddFrom(ioswitch2.NewFromPublicStore(*srcSpace.MasterHub, *srcSpace, f))
  184. ft.AddTo(ioswitch2.NewToPublicStore(*dstSpace.MasterHub, *dstSpace, newPath))
  185. plans := exec.NewPlanBuilder()
  186. err := parser.Parse(ft, plans)
  187. if err != nil {
  188. failed = append(failed, f)
  189. logger.Warnf("s2s: parse plan of file %v: %v", f, err)
  190. continue
  191. }
  192. _, cerr := plans.Execute(exec.NewExecContext()).Wait(context.Background())
  193. if cerr != nil {
  194. failed = append(failed, f)
  195. logger.Warnf("s2s: execute plan of file %v: %v", f, cerr)
  196. continue
  197. }
  198. success = append(success, f)
  199. }
  200. newDirPathes := make([]string, 0, len(dirPathes))
  201. for i := range dirPathes {
  202. newDirPathes = append(newDirPathes, strings.Replace(dirPathes[i], srcPath, dstPath, 1))
  203. }
  204. mkdirResp, err := dstSpaceCli.PublicStoreMkdirs(context.Background(), &hubrpc.PublicStoreMkdirs{
  205. UserSpace: *dstSpace,
  206. Pathes: newDirPathes,
  207. })
  208. if err != nil {
  209. failed = append(failed, dirPathes...)
  210. logger.Warnf("s2s: mkdirs to destination userspace: %v", err)
  211. } else {
  212. for i := range dirPathes {
  213. if mkdirResp.Successes[i] {
  214. success = append(success, dirPathes[i])
  215. } else {
  216. failed = append(failed, dirPathes[i])
  217. }
  218. }
  219. }
  220. return clitypes.SpaceToSpaceResult{
  221. Success: success,
  222. Failed: failed,
  223. }, nil
  224. }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。