You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

download_object_iterator.go 8.5 kB

2 years ago
2 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298
  1. package iterator
  2. import (
  3. "fmt"
  4. "io"
  5. "math/rand"
  6. "reflect"
  7. "github.com/samber/lo"
  8. "gitlink.org.cn/cloudream/common/pkgs/logger"
  9. cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
  10. myio "gitlink.org.cn/cloudream/common/utils/io"
  11. stgglb "gitlink.org.cn/cloudream/storage/common/globals"
  12. stgmodels "gitlink.org.cn/cloudream/storage/common/models"
  13. "gitlink.org.cn/cloudream/storage/common/pkgs/db/model"
  14. "gitlink.org.cn/cloudream/storage/common/pkgs/distlock"
  15. "gitlink.org.cn/cloudream/storage/common/pkgs/ec"
  16. coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"
  17. )
  18. type DownloadingObjectIterator = Iterator[*IterDownloadingObject]
  19. type IterDownloadingObject struct {
  20. Object model.Object
  21. File io.ReadCloser
  22. }
  23. type DownloadNodeInfo struct {
  24. Node model.Node
  25. IsSameLocation bool
  26. }
  27. type DownloadContext struct {
  28. Distlock *distlock.Service
  29. }
  30. type DownloadObjectIterator struct {
  31. OnClosing func()
  32. objectDetails []stgmodels.ObjectDetail
  33. currentIndex int
  34. downloadCtx *DownloadContext
  35. }
  36. func NewDownloadObjectIterator(objectDetails []stgmodels.ObjectDetail, downloadCtx *DownloadContext) *DownloadObjectIterator {
  37. return &DownloadObjectIterator{
  38. objectDetails: objectDetails,
  39. downloadCtx: downloadCtx,
  40. }
  41. }
  42. func (i *DownloadObjectIterator) MoveNext() (*IterDownloadingObject, error) {
  43. coorCli, err := stgglb.CoordinatorMQPool.Acquire()
  44. if err != nil {
  45. return nil, fmt.Errorf("new coordinator client: %w", err)
  46. }
  47. defer stgglb.CoordinatorMQPool.Release(coorCli)
  48. if i.currentIndex >= len(i.objectDetails) {
  49. return nil, ErrNoMoreItem
  50. }
  51. item, err := i.doMove(coorCli)
  52. i.currentIndex++
  53. return item, err
  54. }
  55. func (iter *DownloadObjectIterator) doMove(coorCli *coormq.Client) (*IterDownloadingObject, error) {
  56. obj := iter.objectDetails[iter.currentIndex]
  57. switch red := obj.Object.Redundancy.(type) {
  58. case *cdssdk.NoneRedundancy:
  59. reader, err := iter.downloadNoneOrRepObject(coorCli, iter.downloadCtx, obj)
  60. if err != nil {
  61. return nil, fmt.Errorf("downloading object: %w", err)
  62. }
  63. return &IterDownloadingObject{
  64. Object: obj.Object,
  65. File: reader,
  66. }, nil
  67. case *cdssdk.RepRedundancy:
  68. reader, err := iter.downloadNoneOrRepObject(coorCli, iter.downloadCtx, obj)
  69. if err != nil {
  70. return nil, fmt.Errorf("downloading rep object: %w", err)
  71. }
  72. return &IterDownloadingObject{
  73. Object: obj.Object,
  74. File: reader,
  75. }, nil
  76. case *cdssdk.ECRedundancy:
  77. reader, err := iter.downloadECObject(coorCli, iter.downloadCtx, obj, red)
  78. if err != nil {
  79. return nil, fmt.Errorf("downloading ec object: %w", err)
  80. }
  81. return &IterDownloadingObject{
  82. Object: obj.Object,
  83. File: reader,
  84. }, nil
  85. }
  86. return nil, fmt.Errorf("unsupported redundancy type: %v", reflect.TypeOf(obj.Object.Redundancy))
  87. }
  88. func (i *DownloadObjectIterator) Close() {
  89. if i.OnClosing != nil {
  90. i.OnClosing()
  91. }
  92. }
  93. // chooseDownloadNode 选择一个下载节点
  94. // 1. 从与当前客户端相同地域的节点中随机选一个
  95. // 2. 没有用的话从所有节点中随机选一个
  96. func (i *DownloadObjectIterator) chooseDownloadNode(entries []DownloadNodeInfo) DownloadNodeInfo {
  97. sameLocationEntries := lo.Filter(entries, func(e DownloadNodeInfo, i int) bool { return e.IsSameLocation })
  98. if len(sameLocationEntries) > 0 {
  99. return sameLocationEntries[rand.Intn(len(sameLocationEntries))]
  100. }
  101. return entries[rand.Intn(len(entries))]
  102. }
  103. func (iter *DownloadObjectIterator) downloadNoneOrRepObject(coorCli *coormq.Client, ctx *DownloadContext, obj stgmodels.ObjectDetail) (io.ReadCloser, error) {
  104. //采取直接读,优先选内网节点
  105. var chosenNodes []DownloadNodeInfo
  106. for i := range obj.Blocks {
  107. if len(obj.Blocks[i].CachedNodeIDs) == 0 {
  108. return nil, fmt.Errorf("no node has block %d", obj.Blocks[i].Index)
  109. }
  110. getNodesResp, err := coorCli.GetNodes(coormq.NewGetNodes(obj.Blocks[i].CachedNodeIDs))
  111. if err != nil {
  112. continue
  113. }
  114. downloadNodes := lo.Map(getNodesResp.Nodes, func(node model.Node, index int) DownloadNodeInfo {
  115. return DownloadNodeInfo{
  116. Node: node,
  117. IsSameLocation: node.LocationID == stgglb.Local.LocationID,
  118. }
  119. })
  120. chosenNodes = append(chosenNodes, iter.chooseDownloadNode(downloadNodes))
  121. }
  122. var fileStrs []io.ReadCloser
  123. for i := range obj.Blocks {
  124. str, err := downloadFile(ctx, chosenNodes[i], obj.Blocks[i].FileHash)
  125. if err != nil {
  126. for i -= 1; i >= 0; i-- {
  127. fileStrs[i].Close()
  128. }
  129. return nil, fmt.Errorf("donwloading file: %w", err)
  130. }
  131. fileStrs = append(fileStrs, str)
  132. }
  133. fileReaders, filesCloser := myio.ToReaders(fileStrs)
  134. return myio.AfterReadClosed(myio.Length(myio.Join(fileReaders), obj.Object.Size), func(c io.ReadCloser) {
  135. filesCloser()
  136. }), nil
  137. }
  138. func (iter *DownloadObjectIterator) downloadECObject(coorCli *coormq.Client, ctx *DownloadContext, obj stgmodels.ObjectDetail, ecRed *cdssdk.ECRedundancy) (io.ReadCloser, error) {
  139. //采取直接读,优先选内网节点
  140. var chosenNodes []DownloadNodeInfo
  141. var chosenBlocks []stgmodels.ObjectBlockDetail
  142. for i := range obj.Blocks {
  143. if len(chosenBlocks) == ecRed.K {
  144. break
  145. }
  146. // 块没有被任何节点缓存或者获取失败都没关系,只要能获取到k个块的信息就行
  147. if len(obj.Blocks[i].CachedNodeIDs) == 0 {
  148. continue
  149. }
  150. getNodesResp, err := coorCli.GetNodes(coormq.NewGetNodes(obj.Blocks[i].CachedNodeIDs))
  151. if err != nil {
  152. continue
  153. }
  154. downloadNodes := lo.Map(getNodesResp.Nodes, func(node model.Node, index int) DownloadNodeInfo {
  155. return DownloadNodeInfo{
  156. Node: node,
  157. IsSameLocation: node.LocationID == stgglb.Local.LocationID,
  158. }
  159. })
  160. chosenBlocks = append(chosenBlocks, obj.Blocks[i])
  161. chosenNodes = append(chosenNodes, iter.chooseDownloadNode(downloadNodes))
  162. }
  163. if len(chosenBlocks) < ecRed.K {
  164. return nil, fmt.Errorf("no enough blocks to reconstruct the file, want %d, get only %d", ecRed.K, len(chosenBlocks))
  165. }
  166. var fileStrs []io.ReadCloser
  167. rs, err := ec.NewRs(ecRed.K, ecRed.N, ecRed.ChunkSize)
  168. if err != nil {
  169. return nil, fmt.Errorf("new rs: %w", err)
  170. }
  171. for i := range chosenBlocks {
  172. str, err := downloadFile(ctx, chosenNodes[i], chosenBlocks[i].FileHash)
  173. if err != nil {
  174. for i -= 1; i >= 0; i-- {
  175. fileStrs[i].Close()
  176. }
  177. return nil, fmt.Errorf("donwloading file: %w", err)
  178. }
  179. fileStrs = append(fileStrs, str)
  180. }
  181. fileReaders, filesCloser := myio.ToReaders(fileStrs)
  182. var indexes []int
  183. for _, b := range chosenBlocks {
  184. indexes = append(indexes, b.Index)
  185. }
  186. outputs, outputsCloser := myio.ToReaders(rs.ReconstructData(fileReaders, indexes))
  187. return myio.AfterReadClosed(myio.Length(myio.ChunkedJoin(outputs, int(ecRed.ChunkSize)), obj.Object.Size), func(c io.ReadCloser) {
  188. filesCloser()
  189. outputsCloser()
  190. }), nil
  191. }
  192. func downloadFile(ctx *DownloadContext, node DownloadNodeInfo, fileHash string) (io.ReadCloser, error) {
  193. // 如果客户端与节点在同一个地域,则使用内网地址连接节点
  194. nodeIP := node.Node.ExternalIP
  195. grpcPort := node.Node.ExternalGRPCPort
  196. if node.IsSameLocation {
  197. nodeIP = node.Node.LocalIP
  198. grpcPort = node.Node.LocalGRPCPort
  199. logger.Infof("client and node %d are at the same location, use local ip", node.Node.NodeID)
  200. }
  201. if stgglb.IPFSPool != nil {
  202. logger.Infof("try to use local IPFS to download file")
  203. reader, err := downloadFromLocalIPFS(ctx, fileHash)
  204. if err == nil {
  205. return reader, nil
  206. }
  207. logger.Warnf("download from local IPFS failed, so try to download from node %s, err: %s", nodeIP, err.Error())
  208. }
  209. return downloadFromNode(ctx, node.Node.NodeID, nodeIP, grpcPort, fileHash)
  210. }
  211. func downloadFromNode(ctx *DownloadContext, nodeID cdssdk.NodeID, nodeIP string, grpcPort int, fileHash string) (io.ReadCloser, error) {
  212. agtCli, err := stgglb.AgentRPCPool.Acquire(nodeIP, grpcPort)
  213. if err != nil {
  214. return nil, fmt.Errorf("new agent grpc client: %w", err)
  215. }
  216. reader, err := agtCli.GetIPFSFile(fileHash)
  217. if err != nil {
  218. return nil, fmt.Errorf("getting ipfs file: %w", err)
  219. }
  220. reader = myio.AfterReadClosed(reader, func(io.ReadCloser) {
  221. agtCli.Close()
  222. })
  223. return reader, nil
  224. }
  225. func downloadFromLocalIPFS(ctx *DownloadContext, fileHash string) (io.ReadCloser, error) {
  226. ipfsCli, err := stgglb.IPFSPool.Acquire()
  227. if err != nil {
  228. return nil, fmt.Errorf("new ipfs client: %w", err)
  229. }
  230. reader, err := ipfsCli.OpenRead(fileHash)
  231. if err != nil {
  232. return nil, fmt.Errorf("read ipfs file failed, err: %w", err)
  233. }
  234. reader = myio.AfterReadClosed(reader, func(io.ReadCloser) {
  235. ipfsCli.Close()
  236. })
  237. return reader, nil
  238. }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。