You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

download_object_iterator.go 10 kB

1 year ago
2 years ago
2 years ago
1 year ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387
  1. package iterator
  2. import (
  3. "fmt"
  4. "io"
  5. "math"
  6. "reflect"
  7. "github.com/samber/lo"
  8. "gitlink.org.cn/cloudream/common/pkgs/bitmap"
  9. "gitlink.org.cn/cloudream/common/pkgs/logger"
  10. cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
  11. myio "gitlink.org.cn/cloudream/common/utils/io"
  12. "gitlink.org.cn/cloudream/common/utils/sort2"
  13. "gitlink.org.cn/cloudream/storage/common/consts"
  14. stgglb "gitlink.org.cn/cloudream/storage/common/globals"
  15. stgmod "gitlink.org.cn/cloudream/storage/common/models"
  16. stgmodels "gitlink.org.cn/cloudream/storage/common/models"
  17. "gitlink.org.cn/cloudream/storage/common/pkgs/distlock"
  18. "gitlink.org.cn/cloudream/storage/common/pkgs/ec"
  19. coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"
  20. )
  // DownloadingObjectIterator is an alias for an Iterator whose items are
  // *IterDownloadingObject values.
  21. type DownloadingObjectIterator = Iterator[*IterDownloadingObject]
  // IterDownloadingObject is the item produced by DownloadObjectIterator: the
  // object being downloaded together with a stream of its content.
  22. type IterDownloadingObject struct {
  // Object is the object this item refers to.
  23. Object cdssdk.Object
  // File streams the object's data; it is an io.ReadCloser, so the consumer
  // is expected to close it.
  24. File io.ReadCloser
  25. }
  // DownloadNodeInfo describes one candidate node an object can be read from,
  // as assembled by sortDownloadNodes.
  26. type DownloadNodeInfo struct {
  // Node is the node's record, looked up in the iterator's allNodes map.
  27. Node cdssdk.Node
  // ObjectPinned is true when the node ID appears in the object's PinnedAt list.
  28. ObjectPinned bool
  // Blocks lists the object's blocks whose NodeID is this node.
  29. Blocks []stgmod.ObjectBlock
  // Distance is the value returned by getNodeDistance for this node; the
  // candidate list is sorted by it in ascending order.
  30. Distance float64
  31. }
  // DownloadContext carries shared dependencies into the download helpers.
  // The code visible in this file only passes it through; nothing here reads
  // Distlock directly.
  32. type DownloadContext struct {
  // Distlock is the distributed-lock service handle.
  // NOTE(review): presumably used by callers or by code outside this file — confirm.
  33. Distlock *distlock.Service
  34. }
  // DownloadObjectIterator walks a list of objects and opens a download stream
  // for each of them in turn (see MoveNext).
  35. type DownloadObjectIterator struct {
  // OnClosing, when non-nil, is invoked by Close.
  36. OnClosing func()
  // objectDetails are the objects to download, in iteration order.
  37. objectDetails []stgmodels.ObjectDetail
  // currentIndex is the index of the next object to download; MoveNext
  // increments it after every attempt.
  38. currentIndex int
  // inited is set once init has succeeded on the first MoveNext call.
  39. inited bool
  // downloadCtx is handed to the download helpers.
  40. downloadCtx *DownloadContext
  // coorCli is the coordinator client acquired in init.
  41. coorCli *coormq.Client
  // allNodes maps node ID to node record; init fills it for every node that
  // pins an object or holds one of its blocks.
  42. allNodes map[cdssdk.NodeID]cdssdk.Node
  43. }
  44. func NewDownloadObjectIterator(objectDetails []stgmodels.ObjectDetail, downloadCtx *DownloadContext) *DownloadObjectIterator {
  45. return &DownloadObjectIterator{
  46. objectDetails: objectDetails,
  47. downloadCtx: downloadCtx,
  48. }
  49. }
  50. func (i *DownloadObjectIterator) MoveNext() (*IterDownloadingObject, error) {
  51. if !i.inited {
  52. if err := i.init(); err != nil {
  53. return nil, err
  54. }
  55. i.inited = true
  56. }
  57. if i.currentIndex >= len(i.objectDetails) {
  58. return nil, ErrNoMoreItem
  59. }
  60. item, err := i.doMove()
  61. i.currentIndex++
  62. return item, err
  63. }
  64. func (i *DownloadObjectIterator) init() error {
  65. coorCli, err := stgglb.CoordinatorMQPool.Acquire()
  66. if err != nil {
  67. return fmt.Errorf("new coordinator client: %w", err)
  68. }
  69. i.coorCli = coorCli
  70. allNodeIDs := make(map[cdssdk.NodeID]bool)
  71. for _, obj := range i.objectDetails {
  72. for _, p := range obj.PinnedAt {
  73. allNodeIDs[p] = true
  74. }
  75. for _, b := range obj.Blocks {
  76. allNodeIDs[b.NodeID] = true
  77. }
  78. }
  79. getNodes, err := coorCli.GetNodes(coormq.NewGetNodes(lo.Keys(allNodeIDs)))
  80. if err != nil {
  81. return fmt.Errorf("getting nodes: %w", err)
  82. }
  83. i.allNodes = make(map[cdssdk.NodeID]cdssdk.Node)
  84. for _, n := range getNodes.Nodes {
  85. i.allNodes[n.NodeID] = n
  86. }
  87. return nil
  88. }
  89. func (iter *DownloadObjectIterator) doMove() (*IterDownloadingObject, error) {
  90. obj := iter.objectDetails[iter.currentIndex]
  91. switch red := obj.Object.Redundancy.(type) {
  92. case *cdssdk.NoneRedundancy:
  93. reader, err := iter.downloadNoneOrRepObject(obj)
  94. if err != nil {
  95. return nil, fmt.Errorf("downloading object: %w", err)
  96. }
  97. return &IterDownloadingObject{
  98. Object: obj.Object,
  99. File: reader,
  100. }, nil
  101. case *cdssdk.RepRedundancy:
  102. reader, err := iter.downloadNoneOrRepObject(obj)
  103. if err != nil {
  104. return nil, fmt.Errorf("downloading rep object: %w", err)
  105. }
  106. return &IterDownloadingObject{
  107. Object: obj.Object,
  108. File: reader,
  109. }, nil
  110. case *cdssdk.ECRedundancy:
  111. reader, err := iter.downloadECObject(obj, red)
  112. if err != nil {
  113. return nil, fmt.Errorf("downloading ec object: %w", err)
  114. }
  115. return &IterDownloadingObject{
  116. Object: obj.Object,
  117. File: reader,
  118. }, nil
  119. }
  120. return nil, fmt.Errorf("unsupported redundancy type: %v", reflect.TypeOf(obj.Object.Redundancy))
  121. }
  122. func (i *DownloadObjectIterator) Close() {
  123. if i.OnClosing != nil {
  124. i.OnClosing()
  125. }
  126. }
  127. func (iter *DownloadObjectIterator) downloadNoneOrRepObject(obj stgmodels.ObjectDetail) (io.ReadCloser, error) {
  128. allNodes, err := iter.sortDownloadNodes(obj)
  129. if err != nil {
  130. return nil, err
  131. }
  132. bsc, blocks := iter.getMinReadingBlockSolution(allNodes, 1)
  133. osc, node := iter.getMinReadingObjectSolution(allNodes, 1)
  134. if bsc < osc {
  135. return downloadFile(iter.downloadCtx, blocks[0].Node, blocks[0].Block.FileHash)
  136. }
  137. // bsc >= osc,如果osc是MaxFloat64,那么bsc也一定是,也就意味着没有足够块来恢复文件
  138. if osc == math.MaxFloat64 {
  139. return nil, fmt.Errorf("no node has this object")
  140. }
  141. return downloadFile(iter.downloadCtx, *node, obj.Object.FileHash)
  142. }
  143. func (iter *DownloadObjectIterator) downloadECObject(obj stgmodels.ObjectDetail, ecRed *cdssdk.ECRedundancy) (io.ReadCloser, error) {
  144. allNodes, err := iter.sortDownloadNodes(obj)
  145. if err != nil {
  146. return nil, err
  147. }
  148. bsc, blocks := iter.getMinReadingBlockSolution(allNodes, ecRed.K)
  149. osc, node := iter.getMinReadingObjectSolution(allNodes, ecRed.K)
  150. if bsc < osc {
  151. var fileStrs []io.ReadCloser
  152. rs, err := ec.NewRs(ecRed.K, ecRed.N, ecRed.ChunkSize)
  153. if err != nil {
  154. return nil, fmt.Errorf("new rs: %w", err)
  155. }
  156. for i, b := range blocks {
  157. str, err := downloadFile(iter.downloadCtx, b.Node, b.Block.FileHash)
  158. if err != nil {
  159. for i -= 1; i >= 0; i-- {
  160. fileStrs[i].Close()
  161. }
  162. return nil, fmt.Errorf("donwloading file: %w", err)
  163. }
  164. fileStrs = append(fileStrs, str)
  165. }
  166. fileReaders, filesCloser := myio.ToReaders(fileStrs)
  167. var indexes []int
  168. for _, b := range blocks {
  169. indexes = append(indexes, b.Block.Index)
  170. }
  171. outputs, outputsCloser := myio.ToReaders(rs.ReconstructData(fileReaders, indexes))
  172. return myio.AfterReadClosed(myio.Length(myio.ChunkedJoin(outputs, int(ecRed.ChunkSize)), obj.Object.Size), func(c io.ReadCloser) {
  173. filesCloser()
  174. outputsCloser()
  175. }), nil
  176. }
  177. // bsc >= osc,如果osc是MaxFloat64,那么bsc也一定是,也就意味着没有足够块来恢复文件
  178. if osc == math.MaxFloat64 {
  179. return nil, fmt.Errorf("no enough blocks to reconstruct the file, want %d, get only %d", ecRed.K, len(blocks))
  180. }
  181. return downloadFile(iter.downloadCtx, *node, obj.Object.FileHash)
  182. }
  183. func (iter *DownloadObjectIterator) sortDownloadNodes(obj stgmodels.ObjectDetail) ([]*DownloadNodeInfo, error) {
  184. var nodeIDs []cdssdk.NodeID
  185. for _, id := range obj.PinnedAt {
  186. if !lo.Contains(nodeIDs, id) {
  187. nodeIDs = append(nodeIDs, id)
  188. }
  189. }
  190. for _, b := range obj.Blocks {
  191. if !lo.Contains(nodeIDs, b.NodeID) {
  192. nodeIDs = append(nodeIDs, b.NodeID)
  193. }
  194. }
  195. downloadNodeMap := make(map[cdssdk.NodeID]*DownloadNodeInfo)
  196. for _, id := range obj.PinnedAt {
  197. node, ok := downloadNodeMap[id]
  198. if !ok {
  199. mod := iter.allNodes[id]
  200. node = &DownloadNodeInfo{
  201. Node: mod,
  202. ObjectPinned: true,
  203. Distance: iter.getNodeDistance(mod),
  204. }
  205. downloadNodeMap[id] = node
  206. }
  207. node.ObjectPinned = true
  208. }
  209. for _, b := range obj.Blocks {
  210. node, ok := downloadNodeMap[b.NodeID]
  211. if !ok {
  212. mod := iter.allNodes[b.NodeID]
  213. node = &DownloadNodeInfo{
  214. Node: mod,
  215. Distance: iter.getNodeDistance(mod),
  216. }
  217. downloadNodeMap[b.NodeID] = node
  218. }
  219. node.Blocks = append(node.Blocks, b)
  220. }
  221. return sort2.Sort(lo.Values(downloadNodeMap), func(left, right *DownloadNodeInfo) int {
  222. return sort2.Cmp(left.Distance, right.Distance)
  223. }), nil
  224. }
  // downloadBlock pairs a block selected for download with the node it will
  // be read from.
  225. type downloadBlock struct {
  // Node is the node that stores the block.
  226. Node cdssdk.Node
  // Block is the block to download.
  227. Block stgmod.ObjectBlock
  228. }
  229. func (iter *DownloadObjectIterator) getMinReadingBlockSolution(sortedNodes []*DownloadNodeInfo, k int) (float64, []downloadBlock) {
  230. gotBlocksMap := bitmap.Bitmap64(0)
  231. var gotBlocks []downloadBlock
  232. dist := float64(0.0)
  233. for _, n := range sortedNodes {
  234. for _, b := range n.Blocks {
  235. if !gotBlocksMap.Get(b.Index) {
  236. gotBlocks = append(gotBlocks, downloadBlock{
  237. Node: n.Node,
  238. Block: b,
  239. })
  240. gotBlocksMap.Set(b.Index, true)
  241. dist += n.Distance
  242. }
  243. if len(gotBlocks) >= k {
  244. return dist, gotBlocks
  245. }
  246. }
  247. }
  248. return math.MaxFloat64, gotBlocks
  249. }
  250. func (iter *DownloadObjectIterator) getMinReadingObjectSolution(sortedNodes []*DownloadNodeInfo, k int) (float64, *cdssdk.Node) {
  251. dist := math.MaxFloat64
  252. var downloadNode *cdssdk.Node
  253. for _, n := range sortedNodes {
  254. if n.ObjectPinned && float64(k)*n.Distance < dist {
  255. dist = float64(k) * n.Distance
  256. downloadNode = &n.Node
  257. }
  258. }
  259. return dist, downloadNode
  260. }
  261. func (iter *DownloadObjectIterator) getNodeDistance(node cdssdk.Node) float64 {
  262. if stgglb.Local.NodeID != nil {
  263. if node.NodeID == *stgglb.Local.NodeID {
  264. return consts.NodeDistanceSameNode
  265. }
  266. }
  267. if node.LocationID == stgglb.Local.LocationID {
  268. return consts.NodeDistanceSameLocation
  269. }
  270. return consts.NodeDistanceOther
  271. }
  272. func downloadFile(ctx *DownloadContext, node cdssdk.Node, fileHash string) (io.ReadCloser, error) {
  273. // 如果客户端与节点在同一个地域,则使用内网地址连接节点
  274. nodeIP := node.ExternalIP
  275. grpcPort := node.ExternalGRPCPort
  276. if node.LocationID == stgglb.Local.LocationID {
  277. nodeIP = node.LocalIP
  278. grpcPort = node.LocalGRPCPort
  279. logger.Infof("client and node %d are at the same location, use local ip", node.NodeID)
  280. }
  281. if stgglb.IPFSPool != nil {
  282. logger.Infof("try to use local IPFS to download file")
  283. reader, err := downloadFromLocalIPFS(ctx, fileHash)
  284. if err == nil {
  285. return reader, nil
  286. }
  287. logger.Warnf("download from local IPFS failed, so try to download from node %s, err: %s", nodeIP, err.Error())
  288. }
  289. return downloadFromNode(ctx, node.NodeID, nodeIP, grpcPort, fileHash)
  290. }
  291. func downloadFromNode(ctx *DownloadContext, nodeID cdssdk.NodeID, nodeIP string, grpcPort int, fileHash string) (io.ReadCloser, error) {
  292. agtCli, err := stgglb.AgentRPCPool.Acquire(nodeIP, grpcPort)
  293. if err != nil {
  294. return nil, fmt.Errorf("new agent grpc client: %w", err)
  295. }
  296. reader, err := agtCli.GetIPFSFile(fileHash)
  297. if err != nil {
  298. return nil, fmt.Errorf("getting ipfs file: %w", err)
  299. }
  300. reader = myio.AfterReadClosed(reader, func(io.ReadCloser) {
  301. agtCli.Close()
  302. })
  303. return reader, nil
  304. }
  305. func downloadFromLocalIPFS(ctx *DownloadContext, fileHash string) (io.ReadCloser, error) {
  306. ipfsCli, err := stgglb.IPFSPool.Acquire()
  307. if err != nil {
  308. return nil, fmt.Errorf("new ipfs client: %w", err)
  309. }
  310. reader, err := ipfsCli.OpenRead(fileHash)
  311. if err != nil {
  312. return nil, fmt.Errorf("read ipfs file failed, err: %w", err)
  313. }
  314. reader = myio.AfterReadClosed(reader, func(io.ReadCloser) {
  315. ipfsCli.Close()
  316. })
  317. return reader, nil
  318. }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。