You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

ec.go 8.1 kB

1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351
  1. package ops2
  2. import (
  3. "fmt"
  4. "io"
  5. "gitlink.org.cn/cloudream/common/pkgs/future"
  6. "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag"
  7. "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
  8. "gitlink.org.cn/cloudream/common/pkgs/ioswitch/utils"
  9. "gitlink.org.cn/cloudream/common/utils/io2"
  10. "gitlink.org.cn/cloudream/common/utils/math2"
  11. "gitlink.org.cn/cloudream/common/utils/sync2"
  12. clitypes "gitlink.org.cn/cloudream/storage2/client/types"
  13. "gitlink.org.cn/cloudream/storage2/common/pkgs/ec"
  14. "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/pool"
  15. "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/types"
  16. )
  17. func init() {
  18. exec.UseOp[*ECMultiply]()
  19. exec.UseOp[*CallECMultiplier]()
  20. }
  21. type ECMultiply struct {
  22. Coef [][]byte `json:"coef"`
  23. Inputs []exec.VarID `json:"inputs"`
  24. Outputs []exec.VarID `json:"outputs"`
  25. ChunkSize int `json:"chunkSize"`
  26. }
  27. func (o *ECMultiply) Execute(ctx *exec.ExecContext, e *exec.Executor) error {
  28. inputs, err := exec.BindArray[*exec.StreamValue](e, ctx.Context, o.Inputs)
  29. if err != nil {
  30. return err
  31. }
  32. defer func() {
  33. for _, s := range inputs {
  34. s.Stream.Close()
  35. }
  36. }()
  37. outputWrs := make([]*io.PipeWriter, len(o.Outputs))
  38. outputVars := make([]*exec.StreamValue, len(o.Outputs))
  39. for i := range o.Outputs {
  40. rd, wr := io.Pipe()
  41. outputVars[i] = &exec.StreamValue{Stream: rd}
  42. outputWrs[i] = wr
  43. }
  44. inputChunks := make([][]byte, len(o.Inputs))
  45. for i := range o.Inputs {
  46. inputChunks[i] = make([]byte, math2.Min(o.ChunkSize, 64*1024))
  47. }
  48. // 输出用两个缓冲轮换
  49. outputBufPool := sync2.NewBucketPool[[][]byte]()
  50. for i := 0; i < 2; i++ {
  51. outputChunks := make([][]byte, len(o.Outputs))
  52. for i := range o.Outputs {
  53. outputChunks[i] = make([]byte, math2.Min(o.ChunkSize, 64*1024))
  54. }
  55. outputBufPool.PutEmpty(outputChunks)
  56. }
  57. fut := future.NewSetVoid()
  58. go func() {
  59. mul := ec.GaloisMultiplier().BuildGalois()
  60. defer outputBufPool.Close()
  61. readLens := math2.SplitLessThan(o.ChunkSize, 64*1024)
  62. readLenIdx := 0
  63. for {
  64. curReadLen := readLens[readLenIdx]
  65. for i := range inputChunks {
  66. inputChunks[i] = inputChunks[i][:curReadLen]
  67. }
  68. err := sync2.ParallelDo(inputs, func(s *exec.StreamValue, i int) error {
  69. _, err := io.ReadFull(s.Stream, inputChunks[i])
  70. return err
  71. })
  72. if err == io.EOF {
  73. fut.SetVoid()
  74. return
  75. }
  76. if err != nil {
  77. fut.SetError(err)
  78. return
  79. }
  80. outputBuf, ok := outputBufPool.GetEmpty()
  81. if !ok {
  82. return
  83. }
  84. for i := range outputBuf {
  85. outputBuf[i] = outputBuf[i][:curReadLen]
  86. }
  87. err = mul.Multiply(o.Coef, inputChunks, outputBuf)
  88. if err != nil {
  89. fut.SetError(err)
  90. return
  91. }
  92. outputBufPool.PutFilled(outputBuf)
  93. readLenIdx = (readLenIdx + 1) % len(readLens)
  94. }
  95. }()
  96. go func() {
  97. defer outputBufPool.Close()
  98. for {
  99. outputChunks, ok := outputBufPool.GetFilled()
  100. if !ok {
  101. return
  102. }
  103. for i := range o.Outputs {
  104. err := io2.WriteAll(outputWrs[i], outputChunks[i])
  105. if err != nil {
  106. fut.SetError(err)
  107. return
  108. }
  109. }
  110. outputBufPool.PutEmpty(outputChunks)
  111. }
  112. }()
  113. exec.PutArray(e, o.Outputs, outputVars)
  114. err = fut.Wait(ctx.Context)
  115. if err != nil {
  116. for _, wr := range outputWrs {
  117. wr.CloseWithError(err)
  118. }
  119. return err
  120. }
  121. for _, wr := range outputWrs {
  122. wr.Close()
  123. }
  124. return nil
  125. }
  126. func (o *ECMultiply) String() string {
  127. return fmt.Sprintf(
  128. "ECMultiply(coef=%v) (%v) -> (%v)",
  129. o.Coef,
  130. utils.FormatVarIDs(o.Inputs),
  131. utils.FormatVarIDs(o.Outputs),
  132. )
  133. }
  134. type CallECMultiplier struct {
  135. UserSpace clitypes.UserSpaceDetail
  136. Coef [][]byte
  137. Inputs []exec.VarID
  138. Outputs []exec.VarID
  139. BypassCallbacks []exec.VarID
  140. ChunkSize int
  141. }
  142. func (o *CallECMultiplier) Execute(ctx *exec.ExecContext, e *exec.Executor) error {
  143. stgPool, err := exec.GetValueByType[*pool.Pool](ctx)
  144. if err != nil {
  145. return fmt.Errorf("getting storage pool: %w", err)
  146. }
  147. ecMul, err := stgPool.GetECMultiplier(&o.UserSpace)
  148. if err != nil {
  149. return err
  150. }
  151. inputs, err := exec.BindArray[*HTTPRequestValue](e, ctx.Context, o.Inputs)
  152. if err != nil {
  153. return err
  154. }
  155. reqs := make([]types.HTTPRequest, 0, len(inputs))
  156. for _, input := range inputs {
  157. reqs = append(reqs, input.HTTPRequest)
  158. }
  159. outputs, err := ecMul.Multiply(o.Coef, reqs, o.ChunkSize)
  160. if err != nil {
  161. return err
  162. }
  163. defer ecMul.Abort()
  164. outputVals := make([]*BypassUploadedFileValue, 0, len(outputs))
  165. for _, output := range outputs {
  166. outputVals = append(outputVals, &BypassUploadedFileValue{
  167. BypassUploadedFile: output,
  168. })
  169. }
  170. exec.PutArray(e, o.Outputs, outputVals)
  171. callbacks, err := exec.BindArray[*BypassHandleResultValue](e, ctx.Context, o.BypassCallbacks)
  172. if err != nil {
  173. return err
  174. }
  175. allSuc := true
  176. for _, callback := range callbacks {
  177. if !callback.Commited {
  178. allSuc = false
  179. }
  180. }
  181. if allSuc {
  182. ecMul.Complete()
  183. }
  184. return nil
  185. }
  186. func (o *CallECMultiplier) String() string {
  187. return fmt.Sprintf(
  188. "CallECMultiplier(userSpace=%v, coef=%v) (%v) -> (%v)",
  189. o.Coef,
  190. o.UserSpace,
  191. utils.FormatVarIDs(o.Inputs),
  192. utils.FormatVarIDs(o.Outputs),
  193. )
  194. }
  195. type ECMultiplyNode struct {
  196. dag.NodeBase
  197. EC clitypes.ECRedundancy
  198. InputIndexes []int
  199. OutputIndexes []int
  200. }
  201. func (b *GraphNodeBuilder) NewECMultiply(ec clitypes.ECRedundancy) *ECMultiplyNode {
  202. node := &ECMultiplyNode{
  203. EC: ec,
  204. }
  205. b.AddNode(node)
  206. return node
  207. }
  208. func (t *ECMultiplyNode) AddInput(str *dag.StreamVar, dataIndex int) {
  209. t.InputIndexes = append(t.InputIndexes, dataIndex)
  210. idx := t.InputStreams().EnlargeOne()
  211. str.To(t, idx)
  212. }
  213. func (t *ECMultiplyNode) RemoveAllInputs() {
  214. t.InputStreams().ClearAllInput(t)
  215. t.InputStreams().Slots.Resize(0)
  216. t.InputIndexes = nil
  217. }
  218. func (t *ECMultiplyNode) NewOutput(dataIndex int) *dag.StreamVar {
  219. t.OutputIndexes = append(t.OutputIndexes, dataIndex)
  220. return t.OutputStreams().AppendNew(t).Var()
  221. }
  222. func (t *ECMultiplyNode) GenerateOp() (exec.Op, error) {
  223. rs, err := ec.NewRs(t.EC.K, t.EC.N)
  224. if err != nil {
  225. return nil, err
  226. }
  227. coef, err := rs.GenerateMatrix(t.InputIndexes, t.OutputIndexes)
  228. if err != nil {
  229. return nil, err
  230. }
  231. return &ECMultiply{
  232. Coef: coef,
  233. Inputs: t.InputStreams().GetVarIDs(),
  234. Outputs: t.OutputStreams().GetVarIDs(),
  235. ChunkSize: t.EC.ChunkSize,
  236. }, nil
  237. }
  238. // func (t *MultiplyType) String() string {
  239. // return fmt.Sprintf("Multiply[]%v%v", formatStreamIO(node), formatValueIO(node))
  240. // }
  241. type CallECMultiplierNode struct {
  242. dag.NodeBase
  243. UserSpace clitypes.UserSpaceDetail
  244. EC clitypes.ECRedundancy
  245. InputIndexes []int
  246. OutputIndexes []int
  247. }
  248. func (b *GraphNodeBuilder) NewCallECMultiplier(userSpace clitypes.UserSpaceDetail) *CallECMultiplierNode {
  249. node := &CallECMultiplierNode{
  250. UserSpace: userSpace,
  251. }
  252. b.AddNode(node)
  253. return node
  254. }
  255. func (t *CallECMultiplierNode) InitFrom(node *ECMultiplyNode) {
  256. t.EC = node.EC
  257. t.InputIndexes = node.InputIndexes
  258. t.OutputIndexes = node.OutputIndexes
  259. t.InputValues().Init(len(t.InputIndexes) + len(t.OutputIndexes)) // 流的输出+回调的输入
  260. t.OutputValues().Init(t, len(t.OutputIndexes))
  261. }
  262. func (t *CallECMultiplierNode) InputSlot(idx int) dag.ValueInputSlot {
  263. return dag.ValueInputSlot{
  264. Node: t,
  265. Index: idx,
  266. }
  267. }
  268. func (t *CallECMultiplierNode) OutputVar(idx int) dag.ValueOutputSlot {
  269. return dag.ValueOutputSlot{
  270. Node: t,
  271. Index: idx,
  272. }
  273. }
  274. func (t *CallECMultiplierNode) BypassCallbackSlot(idx int) dag.ValueInputSlot {
  275. return dag.ValueInputSlot{
  276. Node: t,
  277. Index: idx + len(t.InputIndexes),
  278. }
  279. }
  280. func (t *CallECMultiplierNode) GenerateOp() (exec.Op, error) {
  281. rs, err := ec.NewRs(t.EC.K, t.EC.N)
  282. if err != nil {
  283. return nil, err
  284. }
  285. coef, err := rs.GenerateMatrix(t.InputIndexes, t.OutputIndexes)
  286. if err != nil {
  287. return nil, err
  288. }
  289. return &CallECMultiplier{
  290. UserSpace: t.UserSpace,
  291. Coef: coef,
  292. Inputs: t.InputValues().GetVarIDsRanged(0, len(t.InputIndexes)),
  293. Outputs: t.OutputValues().GetVarIDs(),
  294. BypassCallbacks: t.InputValues().GetVarIDsStart(len(t.InputIndexes)),
  295. ChunkSize: t.EC.ChunkSize,
  296. }, nil
  297. }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。