You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

download_object_iterator.go 9.9 kB

2 years ago
2 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357
  1. package iterator
  2. import (
  3. "fmt"
  4. "io"
  5. "math"
  6. "reflect"
  7. "github.com/samber/lo"
  8. "gitlink.org.cn/cloudream/common/pkgs/bitmap"
  9. "gitlink.org.cn/cloudream/common/pkgs/logger"
  10. cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
  11. myio "gitlink.org.cn/cloudream/common/utils/io"
  12. mysort "gitlink.org.cn/cloudream/common/utils/sort"
  13. "gitlink.org.cn/cloudream/storage/common/consts"
  14. stgglb "gitlink.org.cn/cloudream/storage/common/globals"
  15. stgmod "gitlink.org.cn/cloudream/storage/common/models"
  16. stgmodels "gitlink.org.cn/cloudream/storage/common/models"
  17. "gitlink.org.cn/cloudream/storage/common/pkgs/db/model"
  18. "gitlink.org.cn/cloudream/storage/common/pkgs/distlock"
  19. "gitlink.org.cn/cloudream/storage/common/pkgs/ec"
  20. coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"
  21. )
  22. type DownloadingObjectIterator = Iterator[*IterDownloadingObject]
  23. type IterDownloadingObject struct {
  24. Object model.Object
  25. File io.ReadCloser
  26. }
  27. type DownloadNodeInfo struct {
  28. Node cdssdk.Node
  29. ObjectPinned bool
  30. Blocks []stgmod.ObjectBlock
  31. Distance float64
  32. }
  33. type DownloadContext struct {
  34. Distlock *distlock.Service
  35. }
  36. type DownloadObjectIterator struct {
  37. OnClosing func()
  38. objectDetails []stgmodels.ObjectDetail
  39. currentIndex int
  40. downloadCtx *DownloadContext
  41. }
  42. func NewDownloadObjectIterator(objectDetails []stgmodels.ObjectDetail, downloadCtx *DownloadContext) *DownloadObjectIterator {
  43. return &DownloadObjectIterator{
  44. objectDetails: objectDetails,
  45. downloadCtx: downloadCtx,
  46. }
  47. }
  48. func (i *DownloadObjectIterator) MoveNext() (*IterDownloadingObject, error) {
  49. coorCli, err := stgglb.CoordinatorMQPool.Acquire()
  50. if err != nil {
  51. return nil, fmt.Errorf("new coordinator client: %w", err)
  52. }
  53. defer stgglb.CoordinatorMQPool.Release(coorCli)
  54. if i.currentIndex >= len(i.objectDetails) {
  55. return nil, ErrNoMoreItem
  56. }
  57. item, err := i.doMove(coorCli)
  58. i.currentIndex++
  59. return item, err
  60. }
  61. func (iter *DownloadObjectIterator) doMove(coorCli *coormq.Client) (*IterDownloadingObject, error) {
  62. obj := iter.objectDetails[iter.currentIndex]
  63. switch red := obj.Object.Redundancy.(type) {
  64. case *cdssdk.NoneRedundancy:
  65. reader, err := iter.downloadNoneOrRepObject(coorCli, iter.downloadCtx, obj)
  66. if err != nil {
  67. return nil, fmt.Errorf("downloading object: %w", err)
  68. }
  69. return &IterDownloadingObject{
  70. Object: obj.Object,
  71. File: reader,
  72. }, nil
  73. case *cdssdk.RepRedundancy:
  74. reader, err := iter.downloadNoneOrRepObject(coorCli, iter.downloadCtx, obj)
  75. if err != nil {
  76. return nil, fmt.Errorf("downloading rep object: %w", err)
  77. }
  78. return &IterDownloadingObject{
  79. Object: obj.Object,
  80. File: reader,
  81. }, nil
  82. case *cdssdk.ECRedundancy:
  83. reader, err := iter.downloadECObject(coorCli, iter.downloadCtx, obj, red)
  84. if err != nil {
  85. return nil, fmt.Errorf("downloading ec object: %w", err)
  86. }
  87. return &IterDownloadingObject{
  88. Object: obj.Object,
  89. File: reader,
  90. }, nil
  91. }
  92. return nil, fmt.Errorf("unsupported redundancy type: %v", reflect.TypeOf(obj.Object.Redundancy))
  93. }
  94. func (i *DownloadObjectIterator) Close() {
  95. if i.OnClosing != nil {
  96. i.OnClosing()
  97. }
  98. }
  99. func (iter *DownloadObjectIterator) downloadNoneOrRepObject(coorCli *coormq.Client, ctx *DownloadContext, obj stgmodels.ObjectDetail) (io.ReadCloser, error) {
  100. allNodes, err := iter.sortDownloadNodes(coorCli, ctx, obj)
  101. if err != nil {
  102. return nil, err
  103. }
  104. bsc, blocks := iter.getMinReadingBlockSolution(allNodes, 1)
  105. osc, node := iter.getMinReadingObjectSolution(allNodes, 1)
  106. if bsc < osc {
  107. return downloadFile(ctx, blocks[0].Node, blocks[0].Block.FileHash)
  108. }
  109. // bsc >= osc,如果osc是MaxFloat64,那么bsc也一定是,也就意味着没有足够块来恢复文件
  110. if osc == math.MaxFloat64 {
  111. return nil, fmt.Errorf("no node has this object")
  112. }
  113. return downloadFile(ctx, *node, obj.Object.FileHash)
  114. }
  115. func (iter *DownloadObjectIterator) downloadECObject(coorCli *coormq.Client, ctx *DownloadContext, obj stgmodels.ObjectDetail, ecRed *cdssdk.ECRedundancy) (io.ReadCloser, error) {
  116. allNodes, err := iter.sortDownloadNodes(coorCli, ctx, obj)
  117. if err != nil {
  118. return nil, err
  119. }
  120. bsc, blocks := iter.getMinReadingBlockSolution(allNodes, ecRed.K)
  121. osc, node := iter.getMinReadingObjectSolution(allNodes, ecRed.K)
  122. if bsc < osc {
  123. var fileStrs []io.ReadCloser
  124. rs, err := ec.NewRs(ecRed.K, ecRed.N, ecRed.ChunkSize)
  125. if err != nil {
  126. return nil, fmt.Errorf("new rs: %w", err)
  127. }
  128. for i, b := range blocks {
  129. str, err := downloadFile(ctx, b.Node, b.Block.FileHash)
  130. if err != nil {
  131. for i -= 1; i >= 0; i-- {
  132. fileStrs[i].Close()
  133. }
  134. return nil, fmt.Errorf("donwloading file: %w", err)
  135. }
  136. fileStrs = append(fileStrs, str)
  137. }
  138. fileReaders, filesCloser := myio.ToReaders(fileStrs)
  139. var indexes []int
  140. for _, b := range blocks {
  141. indexes = append(indexes, b.Block.Index)
  142. }
  143. outputs, outputsCloser := myio.ToReaders(rs.ReconstructData(fileReaders, indexes))
  144. return myio.AfterReadClosed(myio.Length(myio.ChunkedJoin(outputs, int(ecRed.ChunkSize)), obj.Object.Size), func(c io.ReadCloser) {
  145. filesCloser()
  146. outputsCloser()
  147. }), nil
  148. }
  149. // bsc >= osc,如果osc是MaxFloat64,那么bsc也一定是,也就意味着没有足够块来恢复文件
  150. if osc == math.MaxFloat64 {
  151. return nil, fmt.Errorf("no enough blocks to reconstruct the file, want %d, get only %d", ecRed.K, len(blocks))
  152. }
  153. return downloadFile(ctx, *node, obj.Object.FileHash)
  154. }
  155. func (iter *DownloadObjectIterator) sortDownloadNodes(coorCli *coormq.Client, ctx *DownloadContext, obj stgmodels.ObjectDetail) ([]*DownloadNodeInfo, error) {
  156. var nodeIDs []cdssdk.NodeID
  157. for _, id := range obj.PinnedAt {
  158. if !lo.Contains(nodeIDs, id) {
  159. nodeIDs = append(nodeIDs, id)
  160. }
  161. }
  162. for _, b := range obj.Blocks {
  163. if !lo.Contains(nodeIDs, b.NodeID) {
  164. nodeIDs = append(nodeIDs, b.NodeID)
  165. }
  166. }
  167. getNodes, err := coorCli.GetNodes(coormq.NewGetNodes(nodeIDs))
  168. if err != nil {
  169. return nil, fmt.Errorf("getting nodes: %w", err)
  170. }
  171. downloadNodeMap := make(map[cdssdk.NodeID]*DownloadNodeInfo)
  172. for _, id := range obj.PinnedAt {
  173. node, ok := downloadNodeMap[id]
  174. if !ok {
  175. mod := *getNodes.GetNode(id)
  176. node = &DownloadNodeInfo{
  177. Node: mod,
  178. ObjectPinned: true,
  179. Distance: iter.getNodeDistance(mod),
  180. }
  181. downloadNodeMap[id] = node
  182. }
  183. node.ObjectPinned = true
  184. }
  185. for _, b := range obj.Blocks {
  186. node, ok := downloadNodeMap[b.NodeID]
  187. if !ok {
  188. mod := *getNodes.GetNode(b.NodeID)
  189. node = &DownloadNodeInfo{
  190. Node: mod,
  191. Distance: iter.getNodeDistance(mod),
  192. }
  193. downloadNodeMap[b.NodeID] = node
  194. }
  195. node.Blocks = append(node.Blocks, b)
  196. }
  197. return mysort.Sort(lo.Values(downloadNodeMap), func(left, right *DownloadNodeInfo) int {
  198. return mysort.Cmp(left.Distance, right.Distance)
  199. }), nil
  200. }
  201. type downloadBlock struct {
  202. Node cdssdk.Node
  203. Block stgmod.ObjectBlock
  204. }
  205. func (iter *DownloadObjectIterator) getMinReadingBlockSolution(sortedNodes []*DownloadNodeInfo, k int) (float64, []downloadBlock) {
  206. gotBlocksMap := bitmap.Bitmap64(0)
  207. var gotBlocks []downloadBlock
  208. dist := float64(0.0)
  209. for _, n := range sortedNodes {
  210. for _, b := range n.Blocks {
  211. if !gotBlocksMap.Get(b.Index) {
  212. gotBlocks = append(gotBlocks, downloadBlock{
  213. Node: n.Node,
  214. Block: b,
  215. })
  216. gotBlocksMap.Set(b.Index, true)
  217. dist += n.Distance
  218. }
  219. if len(gotBlocks) >= k {
  220. return dist, gotBlocks
  221. }
  222. }
  223. }
  224. return math.MaxFloat64, gotBlocks
  225. }
  226. func (iter *DownloadObjectIterator) getMinReadingObjectSolution(sortedNodes []*DownloadNodeInfo, k int) (float64, *cdssdk.Node) {
  227. dist := math.MaxFloat64
  228. var downloadNode *cdssdk.Node
  229. for _, n := range sortedNodes {
  230. if n.ObjectPinned && float64(k)*n.Distance < dist {
  231. dist = float64(k) * n.Distance
  232. downloadNode = &n.Node
  233. }
  234. }
  235. return dist, downloadNode
  236. }
  237. func (iter *DownloadObjectIterator) getNodeDistance(node cdssdk.Node) float64 {
  238. if stgglb.Local.NodeID != nil {
  239. if node.NodeID == *stgglb.Local.NodeID {
  240. return consts.NodeDistanceSameNode
  241. }
  242. }
  243. if node.LocationID == stgglb.Local.LocationID {
  244. return consts.NodeDistanceSameLocation
  245. }
  246. return consts.NodeDistanceOther
  247. }
  248. func downloadFile(ctx *DownloadContext, node cdssdk.Node, fileHash string) (io.ReadCloser, error) {
  249. // 如果客户端与节点在同一个地域,则使用内网地址连接节点
  250. nodeIP := node.ExternalIP
  251. grpcPort := node.ExternalGRPCPort
  252. if node.LocationID == stgglb.Local.LocationID {
  253. nodeIP = node.LocalIP
  254. grpcPort = node.LocalGRPCPort
  255. logger.Infof("client and node %d are at the same location, use local ip", node.NodeID)
  256. }
  257. if stgglb.IPFSPool != nil {
  258. logger.Infof("try to use local IPFS to download file")
  259. reader, err := downloadFromLocalIPFS(ctx, fileHash)
  260. if err == nil {
  261. return reader, nil
  262. }
  263. logger.Warnf("download from local IPFS failed, so try to download from node %s, err: %s", nodeIP, err.Error())
  264. }
  265. return downloadFromNode(ctx, node.NodeID, nodeIP, grpcPort, fileHash)
  266. }
  267. func downloadFromNode(ctx *DownloadContext, nodeID cdssdk.NodeID, nodeIP string, grpcPort int, fileHash string) (io.ReadCloser, error) {
  268. agtCli, err := stgglb.AgentRPCPool.Acquire(nodeIP, grpcPort)
  269. if err != nil {
  270. return nil, fmt.Errorf("new agent grpc client: %w", err)
  271. }
  272. reader, err := agtCli.GetIPFSFile(fileHash)
  273. if err != nil {
  274. return nil, fmt.Errorf("getting ipfs file: %w", err)
  275. }
  276. reader = myio.AfterReadClosed(reader, func(io.ReadCloser) {
  277. agtCli.Close()
  278. })
  279. return reader, nil
  280. }
  281. func downloadFromLocalIPFS(ctx *DownloadContext, fileHash string) (io.ReadCloser, error) {
  282. ipfsCli, err := stgglb.IPFSPool.Acquire()
  283. if err != nil {
  284. return nil, fmt.Errorf("new ipfs client: %w", err)
  285. }
  286. reader, err := ipfsCli.OpenRead(fileHash)
  287. if err != nil {
  288. return nil, fmt.Errorf("read ipfs file failed, err: %w", err)
  289. }
  290. reader = myio.AfterReadClosed(reader, func(io.ReadCloser) {
  291. ipfsCli.Close()
  292. })
  293. return reader, nil
  294. }

本项目旨在将云际存储公共基础设施化，使个人及企业可低门槛使用高效的云际存储服务（安装开箱即用的云际存储客户端即可，无需关注其他组件的部署），同时支持用户灵活便捷地定制云际存储的功能细节。