You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

stream_rs.go 6.2 kB

2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272
  1. package ec
  2. import (
  3. "io"
  4. "github.com/klauspost/reedsolomon"
  5. "gitlink.org.cn/cloudream/common/utils/io2"
  6. )
  7. type StreamRs struct {
  8. encoder reedsolomon.Encoder
  9. ecN int
  10. ecK int
  11. ecP int
  12. chunkSize int
  13. }
  14. func NewStreamRs(k int, n int, chunkSize int) (*StreamRs, error) {
  15. enc := StreamRs{
  16. ecN: n,
  17. ecK: k,
  18. ecP: n - k,
  19. chunkSize: chunkSize,
  20. }
  21. encoder, err := reedsolomon.New(k, n-k)
  22. enc.encoder = encoder
  23. return &enc, err
  24. }
  25. // 编码。仅输出校验块
  26. func (r *StreamRs) Encode(input []io.Reader) []io.ReadCloser {
  27. outReaders := make([]io.ReadCloser, r.ecP)
  28. outWriters := make([]*io.PipeWriter, r.ecP)
  29. for i := 0; i < r.ecP; i++ {
  30. outReaders[i], outWriters[i] = io.Pipe()
  31. }
  32. go func() {
  33. chunks := make([][]byte, r.ecN)
  34. for idx := 0; idx < r.ecN; idx++ {
  35. chunks[idx] = make([]byte, r.chunkSize)
  36. }
  37. var closeErr error
  38. loop:
  39. for {
  40. //读块到buff
  41. for i := 0; i < r.ecK; i++ {
  42. _, err := io.ReadFull(input[i], chunks[i])
  43. if err != nil {
  44. closeErr = err
  45. break loop
  46. }
  47. }
  48. err := r.encoder.Encode(chunks)
  49. if err != nil {
  50. return
  51. }
  52. //输出到outWriter
  53. for i := range outWriters {
  54. err := io2.WriteAll(outWriters[i], chunks[i+r.ecK])
  55. if err != nil {
  56. closeErr = err
  57. break loop
  58. }
  59. }
  60. }
  61. for i := range outWriters {
  62. outWriters[i].CloseWithError(closeErr)
  63. }
  64. }()
  65. return outReaders
  66. }
  67. // 编码。输出包含所有的数据块和校验块
  68. func (r *StreamRs) EncodeAll(input []io.Reader) []io.ReadCloser {
  69. outReaders := make([]io.ReadCloser, r.ecN)
  70. outWriters := make([]*io.PipeWriter, r.ecN)
  71. for i := 0; i < r.ecN; i++ {
  72. outReaders[i], outWriters[i] = io.Pipe()
  73. }
  74. go func() {
  75. chunks := make([][]byte, r.ecN)
  76. for idx := 0; idx < r.ecN; idx++ {
  77. chunks[idx] = make([]byte, r.chunkSize)
  78. }
  79. var closeErr error
  80. loop:
  81. for {
  82. //读块到buff
  83. for i := 0; i < r.ecK; i++ {
  84. _, err := io.ReadFull(input[i], chunks[i])
  85. if err != nil {
  86. closeErr = err
  87. break loop
  88. }
  89. }
  90. err := r.encoder.Encode(chunks)
  91. if err != nil {
  92. return
  93. }
  94. //输出到outWriter
  95. for i := range outWriters {
  96. err := io2.WriteAll(outWriters[i], chunks[i])
  97. if err != nil {
  98. closeErr = err
  99. break loop
  100. }
  101. }
  102. }
  103. for i := range outWriters {
  104. outWriters[i].CloseWithError(closeErr)
  105. }
  106. }()
  107. return outReaders
  108. }
  109. // 降级读,任意k个块恢复出所有原始的数据块。
  110. func (r *StreamRs) ReconstructData(input []io.Reader, inBlockIdx []int) []io.ReadCloser {
  111. outIndexes := make([]int, r.ecK)
  112. for i := 0; i < r.ecK; i++ {
  113. outIndexes[i] = i
  114. }
  115. return r.ReconstructSome(input, inBlockIdx, outIndexes)
  116. }
  117. // 修复,任意k个块恢复指定的数据块。
  118. // 调用者应该保证input的每一个流长度相同,且均为chunkSize的整数倍
  119. func (r *StreamRs) ReconstructSome(input []io.Reader, inBlockIdx []int, outBlockIdx []int) []io.ReadCloser {
  120. outReaders := make([]io.ReadCloser, len(outBlockIdx))
  121. outWriters := make([]*io.PipeWriter, len(outBlockIdx))
  122. for i := 0; i < len(outBlockIdx); i++ {
  123. outReaders[i], outWriters[i] = io.Pipe()
  124. }
  125. go func() {
  126. chunks := make([][]byte, r.ecN)
  127. // 只初始化输入的buf,输出的buf在调用重建函数之后,会自动建立出来
  128. for _, idx := range inBlockIdx {
  129. chunks[idx] = make([]byte, r.chunkSize)
  130. }
  131. //outBools:要输出的若干块idx
  132. outBools := make([]bool, r.ecN)
  133. for _, idx := range outBlockIdx {
  134. outBools[idx] = true
  135. }
  136. inBools := make([]bool, r.ecN)
  137. for _, idx := range inBlockIdx {
  138. inBools[idx] = true
  139. }
  140. var closeErr error
  141. loop:
  142. for {
  143. //读块到buff
  144. for i := 0; i < r.ecK; i++ {
  145. _, err := io.ReadFull(input[i], chunks[inBlockIdx[i]])
  146. if err != nil {
  147. closeErr = err
  148. break loop
  149. }
  150. }
  151. err := r.encoder.ReconstructSome(chunks, outBools)
  152. if err != nil {
  153. return
  154. }
  155. //输出到outWriter
  156. for i := range outBlockIdx {
  157. err := io2.WriteAll(outWriters[i], chunks[outBlockIdx[i]])
  158. if err != nil {
  159. closeErr = err
  160. break loop
  161. }
  162. // 设置buf长度为0,cap不会受影响。注:如果一个块既是输入又是输出,那不能清空这个块
  163. if !inBools[outBlockIdx[i]] {
  164. chunks[outBlockIdx[i]] = chunks[outBlockIdx[i]][:0]
  165. }
  166. }
  167. }
  168. for i := range outWriters {
  169. outWriters[i].CloseWithError(closeErr)
  170. }
  171. }()
  172. return outReaders
  173. }
  174. // 重建任意块,包括数据块和校验块。
  175. // 当前的实现会把不需要的块都重建出来,所以应该避免使用这个函数。
  176. func (r *StreamRs) ReconstructAny(input []io.Reader, inBlockIdxes []int, outBlockIdxes []int) []io.ReadCloser {
  177. outReaders := make([]io.ReadCloser, len(outBlockIdxes))
  178. outWriters := make([]*io.PipeWriter, len(outBlockIdxes))
  179. for i := 0; i < len(outBlockIdxes); i++ {
  180. outReaders[i], outWriters[i] = io.Pipe()
  181. }
  182. go func() {
  183. chunks := make([][]byte, r.ecN)
  184. // 只初始化输入的buf,输出的buf在调用重建函数之后,会自动建立出来
  185. for _, idx := range inBlockIdxes {
  186. chunks[idx] = make([]byte, r.chunkSize)
  187. }
  188. //outBools:要输出的若干块idx
  189. outBools := make([]bool, r.ecN)
  190. for _, idx := range outBlockIdxes {
  191. outBools[idx] = true
  192. }
  193. inBools := make([]bool, r.ecN)
  194. for _, idx := range inBlockIdxes {
  195. inBools[idx] = true
  196. }
  197. var closeErr error
  198. loop:
  199. for {
  200. //读块到buff
  201. for i := 0; i < r.ecK; i++ {
  202. _, err := io.ReadFull(input[i], chunks[inBlockIdxes[i]])
  203. if err != nil {
  204. closeErr = err
  205. break loop
  206. }
  207. }
  208. err := r.encoder.Reconstruct(chunks)
  209. if err != nil {
  210. return
  211. }
  212. //输出到outWriter
  213. for i := range outBlockIdxes {
  214. outIndex := outBlockIdxes[i]
  215. err := io2.WriteAll(outWriters[i], chunks[outIndex])
  216. if err != nil {
  217. closeErr = err
  218. break loop
  219. }
  220. // 设置buf长度为0,cap不会受影响。注:如果一个块既是输入又是输出,那不能清空这个块
  221. if !inBools[outIndex] {
  222. chunks[outIndex] = chunks[outIndex][:0]
  223. }
  224. }
  225. }
  226. for i := range outWriters {
  227. outWriters[i].CloseWithError(closeErr)
  228. }
  229. }()
  230. return outReaders
  231. }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。