You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number; they can include dashes ('-') and can be up to 35 characters long.

ec_object_iterator.go 6.5 kB

2 years ago
2 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232
  1. package iterator
  2. import (
  3. "fmt"
  4. "io"
  5. "math/rand"
  6. "os"
  7. "github.com/samber/lo"
  8. "gitlink.org.cn/cloudream/common/pkgs/logger"
  9. stgsdk "gitlink.org.cn/cloudream/common/sdks/storage"
  10. stgglb "gitlink.org.cn/cloudream/storage/common/globals"
  11. stgmodels "gitlink.org.cn/cloudream/storage/common/models"
  12. "gitlink.org.cn/cloudream/storage/common/pkgs/db/model"
  13. "gitlink.org.cn/cloudream/storage/common/pkgs/ec"
  14. coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"
  15. )
  16. type ECObjectIterator struct {
  17. OnClosing func()
  18. objects []model.Object
  19. objectECData []stgmodels.ObjectECData
  20. currentIndex int
  21. inited bool
  22. ecInfo stgsdk.ECRedundancyInfo
  23. ec model.Ec
  24. downloadCtx *DownloadContext
  25. cliLocation model.Location
  26. }
  27. func NewECObjectIterator(objects []model.Object, objectECData []stgmodels.ObjectECData, ecInfo stgsdk.ECRedundancyInfo, ec model.Ec, downloadCtx *DownloadContext) *ECObjectIterator {
  28. return &ECObjectIterator{
  29. objects: objects,
  30. objectECData: objectECData,
  31. ecInfo: ecInfo,
  32. ec: ec,
  33. downloadCtx: downloadCtx,
  34. }
  35. }
  // MoveNext returns a reader for the next object, or ErrNoMoreItem once every object
// has been handed out.
//
// On the first successful call it resolves the client's location through the
// coordinator. That location is later used to prefer nodes at the same location.
// If the lookup fails, the error is returned and the lookup is retried on the next
// call. The position advances even when building the current object's reader fails,
// so a failing object is skipped by the following call.
//
// MoveNext is not safe for concurrent use.
func (iter *ECObjectIterator) MoveNext() (*IterDownloadingObject, error) {
	// TODO: add locking
	// Check for exhaustion first so a finished iterator does not touch the coordinator.
	if iter.currentIndex >= len(iter.objects) {
		return nil, ErrNoMoreItem
	}

	coorCli, err := stgglb.CoordinatorMQPool.Acquire()
	if err != nil {
		return nil, fmt.Errorf("new coordinator client: %w", err)
	}
	defer stgglb.CoordinatorMQPool.Release(coorCli)

	if !iter.inited {
		findCliLocResp, err := coorCli.FindClientLocation(coormq.NewFindClientLocation(stgglb.Local.ExternalIP))
		if err != nil {
			return nil, fmt.Errorf("finding client location: %w", err)
		}
		iter.cliLocation = findCliLocResp.Location
		// Mark as initialized only after the lookup succeeded. Otherwise a transient
		// failure would leave cliLocation at its zero value for the rest of the iteration.
		iter.inited = true
	}

	item, err := iter.doMove(coorCli)
	iter.currentIndex++
	return item, err
}
  // doMove builds a reader for the object at iter.currentIndex.
//
// It reads the object's first ecK blocks (block ids 0..ecK-1) directly, so decode has
// nothing to repair. For each block it:
//   - asks the coordinator which nodes hold it;
//   - picks one with chooseDownloadNode;
//   - uses that node's local IP and gRPC port when it is at the client's location, and
//     its external ones otherwise.
//
// It returns an error, rather than panicking, when the object has no EC data entry,
// when fewer than ecK blocks are recorded, or when no node holds one of the blocks.
func (iter *ECObjectIterator) doMove(coorCli *coormq.Client) (*IterDownloadingObject, error) {
	if iter.currentIndex >= len(iter.objectECData) {
		return nil, fmt.Errorf("no ec data for object at index %d", iter.currentIndex)
	}
	obj := iter.objects[iter.currentIndex]
	blocks := iter.objectECData[iter.currentIndex].Blocks
	ecK := iter.ec.EcK
	ecN := iter.ec.EcN
	if len(blocks) < ecK {
		return nil, fmt.Errorf("object at index %d has %d ec blocks, need at least %d", iter.currentIndex, len(blocks), ecK)
	}

	// Read the data blocks directly, preferring nodes at the client's location. The
	// per-block slices below are ordered by block id 0..ecK-1.
	hashs := make([]string, ecK)
	blockIDs := make([]int, ecK)
	nodeIDs := make([]int64, ecK)
	nodeIPs := make([]string, ecK)
	grpcPorts := make([]int, ecK)
	for i := 0; i < ecK; i++ {
		hashs[i] = blocks[i].FileHash
		blockIDs[i] = i

		getNodesResp, err := coorCli.GetNodes(coormq.NewGetNodes(blocks[i].NodeIDs))
		if err != nil {
			return nil, fmt.Errorf("getting nodes: %w", err)
		}
		if len(getNodesResp.Nodes) == 0 {
			// chooseDownloadNode would panic (rand.Intn(0)) on an empty candidate list.
			return nil, fmt.Errorf("no node holds ec block %d", i)
		}
		downloadNodes := lo.Map(getNodesResp.Nodes, func(node model.Node, index int) DownloadNodeInfo {
			return DownloadNodeInfo{
				Node:           node,
				IsSameLocation: node.LocationID == iter.cliLocation.LocationID,
			}
		})
		nd := iter.chooseDownloadNode(downloadNodes)

		nodeIDs[i] = nd.Node.NodeID
		if nd.IsSameLocation {
			nodeIPs[i] = nd.Node.LocalIP
			grpcPorts[i] = nd.Node.LocalGRPCPort
			logger.Infof("client and node %d are at the same location, use local ip", nd.Node.NodeID)
		} else {
			nodeIPs[i] = nd.Node.ExternalIP
			grpcPorts[i] = nd.Node.ExternalGRPCPort
		}
	}

	reader, err := iter.downloadEcObject(obj.Size, ecK, ecN, blockIDs, nodeIDs, nodeIPs, grpcPorts, hashs)
	if err != nil {
		return nil, fmt.Errorf("ec read failed, err: %w", err)
	}
	return &IterDownloadingObject{
		File: reader,
	}, nil
}
  109. func (i *ECObjectIterator) Close() {
  110. if i.OnClosing != nil {
  111. i.OnClosing()
  112. }
  113. }
  114. // chooseDownloadNode 选择一个下载节点
  115. // 1. 从与当前客户端相同地域的节点中随机选一个
  116. // 2. 没有用的话从所有节点中随机选一个
  117. func (i *ECObjectIterator) chooseDownloadNode(entries []DownloadNodeInfo) DownloadNodeInfo {
  118. sameLocationEntries := lo.Filter(entries, func(e DownloadNodeInfo, i int) bool { return e.IsSameLocation })
  119. if len(sameLocationEntries) > 0 {
  120. return sameLocationEntries[rand.Intn(len(sameLocationEntries))]
  121. }
  122. return entries[rand.Intn(len(entries))]
  123. }
  // downloadEcObject streams an EC-encoded object of fileSize bytes.
//
// For each position idx in blockIDs, a goroutine downloads the block with hash
// hashs[idx] from node nodeIDs[idx] (at nodeIPs[idx]:grpcPorts[idx]). It feeds the
// block to decode on channel blockIDs[idx], in packets of iter.ecInfo.PacketSize bytes.
// The ecK outputs of decode are written to the returned reader packet by packet. The
// output stops after fileSize bytes, so the padding of the last stripe is not returned.
//
// Invalid parameters are reported immediately. Later failures (download error, short
// block, write error) close the pipe with that error, so they surface from the returned
// reader's Read. Closing the returned reader early makes the remaining downloaders stop
// before their next packet.
func (iter *ECObjectIterator) downloadEcObject(fileSize int64, ecK int, ecN int, blockIDs []int, nodeIDs []int64, nodeIPs []string, grpcPorts []int, hashs []string) (io.ReadCloser, error) {
	packetSize := iter.ecInfo.PacketSize
	if ecK <= 0 || ecN < ecK || packetSize <= 0 {
		return nil, fmt.Errorf("invalid ec parameters: k=%d, n=%d, packet size=%d", ecK, ecN, packetSize)
	}
	if len(blockIDs) < ecK {
		return nil, fmt.Errorf("need at least %d blocks to decode, got %d", ecK, len(blockIDs))
	}
	if len(nodeIDs) < len(blockIDs) || len(nodeIPs) < len(blockIDs) || len(grpcPorts) < len(blockIDs) || len(hashs) < len(blockIDs) {
		return nil, fmt.Errorf("incomplete download info for %d blocks", len(blockIDs))
	}
	seen := make(map[int]bool, len(blockIDs))
	for _, id := range blockIDs {
		// Each block id selects the channel its downloader sends on (and closes), so it
		// must be in range and unique.
		if id < 0 || id >= ecN || seen[id] {
			return nil, fmt.Errorf("invalid or duplicate block id %d", id)
		}
		seen[id] = true
	}

	stripeSize := int64(ecK) * packetSize
	numPacket := (fileSize + stripeSize - 1) / stripeSize

	getBufs := make([]chan []byte, ecN)
	for i := range getBufs {
		getBufs[i] = make(chan []byte)
	}
	decodeBufs := make([]chan []byte, ecK)
	for i := range decodeBufs {
		decodeBufs[i] = make(chan []byte)
	}

	r, w := io.Pipe()

	// stop is closed on the first failure so downloaders quit early. The one-slot
	// failGate lets only the first failure close the pipe and stop; later ones are dropped.
	stop := make(chan struct{})
	failGate := make(chan struct{}, 1)
	fail := func(err error) {
		select {
		case failGate <- struct{}{}:
			w.CloseWithError(err)
			close(stop)
		default:
		}
	}

	for idx := range blockIDs {
		idx := idx
		go func() {
			out := getBufs[blockIDs[idx]]
			// decode receives numPacket packets from this channel regardless. Closing it
			// on exit lets decode receive nil packets instead of blocking forever when we
			// stop early. By then fail has already closed the pipe, so those packets never
			// reach the reader.
			defer close(out)

			file, err := downloadFile(iter.downloadCtx, nodeIDs[idx], nodeIPs[idx], grpcPorts[idx], hashs[idx])
			if err != nil {
				fail(fmt.Errorf("downloading block %d from node %d: %w", blockIDs[idx], nodeIDs[idx], err))
				return
			}
			// Release the download stream when done, if it can be closed.
			if c, ok := any(file).(io.Closer); ok {
				defer c.Close()
			}

			for p := int64(0); p < numPacket; p++ {
				select {
				case <-stop:
					return
				default:
				}

				buf := make([]byte, packetSize)
				if _, err := io.ReadFull(file, buf); err != nil {
					fail(fmt.Errorf("reading packet %d of block %d: %w", p, blockIDs[idx], err))
					return
				}
				select {
				case out <- buf:
				case <-stop:
					return
				}
			}
		}()
	}

	go decode(getBufs, decodeBufs, blockIDs, ecK, numPacket)

	// Persist: write the decoded data blocks to the pipe in packet order, trimmed to
	// fileSize bytes.
	go func() {
		remaining := fileSize
		failed := false
		for p := int64(0); p < numPacket; p++ {
			for _, in := range decodeBufs {
				pkt := <-in
				// Keep receiving after a failure (or once fileSize bytes are out) so decode
				// can run to completion and close its channels.
				if failed || remaining <= 0 {
					continue
				}
				if int64(len(pkt)) > remaining {
					pkt = pkt[:remaining]
				}
				if _, err := w.Write(pkt); err != nil {
					fail(fmt.Errorf("persist file falied, err:%w", err))
					failed = true
					continue
				}
				remaining -= int64(len(pkt))
			}
		}
		// w.Close does not overwrite an error set earlier by CloseWithError.
		w.Close()
	}()

	return r, nil
}
  167. func decode(inBufs []chan []byte, outBufs []chan []byte, blockSeq []int, ecK int, numPacket int64) {
  168. fmt.Println("decode ")
  169. var tmpIn [][]byte
  170. var zeroPkt []byte
  171. tmpIn = make([][]byte, len(inBufs))
  172. hasBlock := map[int]bool{}
  173. for j := 0; j < len(blockSeq); j++ {
  174. hasBlock[blockSeq[j]] = true
  175. }
  176. needRepair := false //检测是否传入了所有数据块
  177. for j := 0; j < len(outBufs); j++ {
  178. if blockSeq[j] != j {
  179. needRepair = true
  180. }
  181. }
  182. enc := ec.NewRsEnc(ecK, len(inBufs))
  183. for i := 0; int64(i) < numPacket; i++ {
  184. print("!!!!!")
  185. for j := 0; j < len(inBufs); j++ {
  186. if hasBlock[j] {
  187. tmpIn[j] = <-inBufs[j]
  188. } else {
  189. tmpIn[j] = zeroPkt
  190. }
  191. }
  192. if needRepair {
  193. err := enc.Repair(tmpIn)
  194. if err != nil {
  195. fmt.Fprintf(os.Stderr, "Decode Repair Error: %s", err.Error())
  196. }
  197. }
  198. for j := 0; j < len(outBufs); j++ {
  199. outBufs[j] <- tmpIn[j]
  200. }
  201. }
  202. for i := 0; i < len(outBufs); i++ {
  203. close(outBufs[i])
  204. }
  205. }

本项目旨在将云际存储公共基础设施化，使个人及企业可低门槛使用高效的云际存储服务（安装开箱即用云际存储客户端即可，无需关注其他组件的部署），同时支持用户灵活便捷定制云际存储的功能细节。