You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

parser.go 31 kB

11 months ago
11 months ago
11 months ago
11 months ago
11 months ago

  1. package parser
  2. import (
  3. "fmt"
  4. "math"
  5. "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag"
  6. "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
  7. "gitlink.org.cn/cloudream/common/pkgs/ioswitch/plan"
  8. cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
  9. "gitlink.org.cn/cloudream/common/utils/lo2"
  10. "gitlink.org.cn/cloudream/common/utils/math2"
  11. "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2"
  12. "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2/ops2"
  13. "gitlink.org.cn/cloudream/storage/common/pkgs/storage/factory"
  14. "gitlink.org.cn/cloudream/storage/common/pkgs/storage/types"
  15. )
  16. type IndexedStream struct {
  17. Stream *dag.StreamVar
  18. StreamIndex ioswitch2.StreamIndex
  19. }
  20. type ParseContext struct {
  21. Ft ioswitch2.FromTo
  22. DAG *ops2.GraphNodeBuilder
  23. // 为了产生所有To所需的数据范围,而需要From打开的范围。
  24. // 这个范围是基于整个文件的,且上下界都取整到条带大小的整数倍,因此上界是有可能超过文件大小的。
  25. ToNodes map[ioswitch2.To]ops2.ToNode
  26. FromNodes map[ioswitch2.From]ops2.FromNode
  27. IndexedStreams []IndexedStream
  28. StreamRange math2.Range
  29. UseEC bool // 是否使用纠删码
  30. UseSegment bool // 是否使用分段
  31. }
  32. func Parse(ft ioswitch2.FromTo, blder *exec.PlanBuilder) error {
  33. ctx := ParseContext{
  34. Ft: ft,
  35. DAG: ops2.NewGraphNodeBuilder(),
  36. ToNodes: make(map[ioswitch2.To]ops2.ToNode),
  37. FromNodes: make(map[ioswitch2.From]ops2.FromNode),
  38. }
  39. // 分成两个阶段:
  40. // 1. 基于From和To生成更多指令,初步匹配to的需求
  41. err := checkEncodingParams(&ctx)
  42. if err != nil {
  43. return err
  44. }
  45. // 计算一下打开流的范围
  46. calcStreamRange(&ctx)
  47. err = extend(&ctx)
  48. if err != nil {
  49. return err
  50. }
  51. // 2. 优化上一步生成的指令
  52. err = fixSegmentJoin(&ctx)
  53. if err != nil {
  54. return err
  55. }
  56. err = fixSegmentSplit(&ctx)
  57. if err != nil {
  58. return err
  59. }
  60. // 对于删除指令的优化,需要反复进行,直到没有变化为止。
  61. // 从目前实现上来说不会死循环
  62. for {
  63. opted := false
  64. if removeUnusedJoin(&ctx) {
  65. opted = true
  66. }
  67. if removeUnusedMultiplyOutput(&ctx) {
  68. opted = true
  69. }
  70. if removeUnusedSplit(&ctx) {
  71. opted = true
  72. }
  73. if omitSplitJoin(&ctx) {
  74. opted = true
  75. }
  76. if removeUnusedSegmentJoin(&ctx) {
  77. opted = true
  78. }
  79. if removeUnusedSegmentSplit(&ctx) {
  80. opted = true
  81. }
  82. if omitSegmentSplitJoin(&ctx) {
  83. opted = true
  84. }
  85. if !opted {
  86. break
  87. }
  88. }
  89. // 确定指令执行位置的过程,也需要反复进行,直到没有变化为止。
  90. for pin(&ctx) {
  91. }
  92. // 下面这些只需要执行一次,但需要按顺序
  93. removeUnusedFromNode(&ctx)
  94. useS2STransfer(&ctx)
  95. useMultipartUploadToShardStore(&ctx)
  96. dropUnused(&ctx)
  97. storeShardWriteResult(&ctx)
  98. generateRange(&ctx)
  99. generateClone(&ctx)
  100. return plan.Generate(ctx.DAG.Graph, blder)
  101. }
  102. func findOutputStream(ctx *ParseContext, streamIndex ioswitch2.StreamIndex) *dag.StreamVar {
  103. var ret *dag.StreamVar
  104. for _, s := range ctx.IndexedStreams {
  105. if s.StreamIndex == streamIndex {
  106. ret = s.Stream
  107. break
  108. }
  109. }
  110. return ret
  111. }
  112. // 检查使用不同编码时参数是否设置到位
  113. func checkEncodingParams(ctx *ParseContext) error {
  114. for _, f := range ctx.Ft.Froms {
  115. if f.GetStreamIndex().IsEC() {
  116. ctx.UseEC = true
  117. if ctx.Ft.ECParam == nil {
  118. return fmt.Errorf("EC encoding parameters not set")
  119. }
  120. }
  121. if f.GetStreamIndex().IsSegment() {
  122. ctx.UseSegment = true
  123. if ctx.Ft.SegmentParam == nil {
  124. return fmt.Errorf("segment parameters not set")
  125. }
  126. }
  127. }
  128. for _, t := range ctx.Ft.Toes {
  129. if t.GetStreamIndex().IsEC() {
  130. ctx.UseEC = true
  131. if ctx.Ft.ECParam == nil {
  132. return fmt.Errorf("EC encoding parameters not set")
  133. }
  134. }
  135. if t.GetStreamIndex().IsSegment() {
  136. ctx.UseSegment = true
  137. if ctx.Ft.SegmentParam == nil {
  138. return fmt.Errorf("segment parameters not set")
  139. }
  140. }
  141. }
  142. return nil
  143. }
  144. // 计算输入流的打开范围。如果From或者To中包含EC的流,则会将打开范围扩大到条带大小的整数倍。
  145. func calcStreamRange(ctx *ParseContext) {
  146. rng := math2.NewRange(math.MaxInt64, 0)
  147. for _, to := range ctx.Ft.Toes {
  148. strIdx := to.GetStreamIndex()
  149. if strIdx.IsRaw() {
  150. toRng := to.GetRange()
  151. rng.ExtendStart(toRng.Offset)
  152. if toRng.Length != nil {
  153. rng.ExtendEnd(toRng.Offset + *toRng.Length)
  154. } else {
  155. rng.Length = nil
  156. }
  157. } else if strIdx.IsEC() {
  158. toRng := to.GetRange()
  159. stripSize := ctx.Ft.ECParam.StripSize()
  160. blkStartIndex := math2.FloorDiv(toRng.Offset, int64(ctx.Ft.ECParam.ChunkSize))
  161. rng.ExtendStart(blkStartIndex * stripSize)
  162. if toRng.Length != nil {
  163. blkEndIndex := math2.CeilDiv(toRng.Offset+*toRng.Length, int64(ctx.Ft.ECParam.ChunkSize))
  164. rng.ExtendEnd(blkEndIndex * stripSize)
  165. } else {
  166. rng.Length = nil
  167. }
  168. } else if strIdx.IsSegment() {
  169. // Segment节点的Range是相对于本段的,需要加上本段的起始位置
  170. toRng := to.GetRange()
  171. segStart := ctx.Ft.SegmentParam.CalcSegmentStart(strIdx.Index)
  172. offset := toRng.Offset + segStart
  173. rng.ExtendStart(offset)
  174. if toRng.Length != nil {
  175. rng.ExtendEnd(offset + *toRng.Length)
  176. } else {
  177. rng.Length = nil
  178. }
  179. }
  180. }
  181. if ctx.UseEC {
  182. stripSize := ctx.Ft.ECParam.StripSize()
  183. rng.ExtendStart(math2.Floor(rng.Offset, stripSize))
  184. if rng.Length != nil {
  185. rng.ExtendEnd(math2.Ceil(rng.Offset+*rng.Length, stripSize))
  186. }
  187. }
  188. ctx.StreamRange = rng
  189. }
  190. func extend(ctx *ParseContext) error {
  191. for _, fr := range ctx.Ft.Froms {
  192. frNode, err := buildFromNode(ctx, fr)
  193. if err != nil {
  194. return err
  195. }
  196. ctx.FromNodes[fr] = frNode
  197. ctx.IndexedStreams = append(ctx.IndexedStreams, IndexedStream{
  198. Stream: frNode.Output().Var(),
  199. StreamIndex: fr.GetStreamIndex(),
  200. })
  201. // 对于完整文件的From,生成Split指令
  202. if fr.GetStreamIndex().IsRaw() {
  203. // 只有输入输出需要EC编码的块时,才生成相关指令
  204. if ctx.UseEC {
  205. splitNode := ctx.DAG.NewChunkedSplit(ctx.Ft.ECParam.ChunkSize, ctx.Ft.ECParam.K)
  206. splitNode.Split(frNode.Output().Var())
  207. for i := 0; i < ctx.Ft.ECParam.K; i++ {
  208. ctx.IndexedStreams = append(ctx.IndexedStreams, IndexedStream{
  209. Stream: splitNode.SubStream(i),
  210. StreamIndex: ioswitch2.ECStream(i),
  211. })
  212. }
  213. }
  214. // 同上
  215. if ctx.UseSegment {
  216. splitNode := ctx.DAG.NewSegmentSplit(ctx.Ft.SegmentParam.Segments)
  217. frNode.Output().Var().ToSlot(splitNode.InputSlot())
  218. for i := 0; i < len(ctx.Ft.SegmentParam.Segments); i++ {
  219. ctx.IndexedStreams = append(ctx.IndexedStreams, IndexedStream{
  220. Stream: splitNode.Segment(i),
  221. StreamIndex: ioswitch2.SegmentStream(i),
  222. })
  223. }
  224. }
  225. }
  226. }
  227. if ctx.UseEC {
  228. // 如果有K个不同的文件块流,则生成Multiply指令,同时针对其生成的流,生成Join指令
  229. ecInputStrs := make(map[int]*dag.StreamVar)
  230. for _, s := range ctx.IndexedStreams {
  231. if s.StreamIndex.IsEC() && ecInputStrs[s.StreamIndex.Index] == nil {
  232. ecInputStrs[s.StreamIndex.Index] = s.Stream
  233. if len(ecInputStrs) == ctx.Ft.ECParam.K {
  234. break
  235. }
  236. }
  237. }
  238. if len(ecInputStrs) == ctx.Ft.ECParam.K {
  239. mulNode := ctx.DAG.NewECMultiply(*ctx.Ft.ECParam)
  240. for i, s := range ecInputStrs {
  241. mulNode.AddInput(s, i)
  242. }
  243. for i := 0; i < ctx.Ft.ECParam.N; i++ {
  244. ctx.IndexedStreams = append(ctx.IndexedStreams, IndexedStream{
  245. Stream: mulNode.NewOutput(i),
  246. StreamIndex: ioswitch2.ECStream(i),
  247. })
  248. }
  249. joinNode := ctx.DAG.NewChunkedJoin(ctx.Ft.ECParam.ChunkSize)
  250. for i := 0; i < ctx.Ft.ECParam.K; i++ {
  251. // 不可能找不到流
  252. joinNode.AddInput(findOutputStream(ctx, ioswitch2.ECStream(i)))
  253. }
  254. ctx.IndexedStreams = append(ctx.IndexedStreams, IndexedStream{
  255. Stream: joinNode.Joined(),
  256. StreamIndex: ioswitch2.RawStream(),
  257. })
  258. }
  259. }
  260. if ctx.UseSegment {
  261. // 先假设有所有的顺序分段,生成Join指令,后续根据Range再实际计算是否缺少流
  262. joinNode := ctx.DAG.NewSegmentJoin(ctx.Ft.SegmentParam.Segments)
  263. for i := 0; i < ctx.Ft.SegmentParam.SegmentCount(); i++ {
  264. str := findOutputStream(ctx, ioswitch2.SegmentStream(i))
  265. if str != nil {
  266. str.ToSlot(joinNode.InputSlot(i))
  267. }
  268. }
  269. ctx.IndexedStreams = append(ctx.IndexedStreams, IndexedStream{
  270. Stream: joinNode.Joined(),
  271. StreamIndex: ioswitch2.RawStream(),
  272. })
  273. // SegmentJoin生成的Join指令可以用来生成EC块
  274. if ctx.UseEC {
  275. splitNode := ctx.DAG.NewChunkedSplit(ctx.Ft.ECParam.ChunkSize, ctx.Ft.ECParam.K)
  276. splitNode.Split(joinNode.Joined())
  277. mulNode := ctx.DAG.NewECMultiply(*ctx.Ft.ECParam)
  278. for i := 0; i < ctx.Ft.ECParam.K; i++ {
  279. mulNode.AddInput(splitNode.SubStream(i), i)
  280. ctx.IndexedStreams = append(ctx.IndexedStreams, IndexedStream{
  281. Stream: splitNode.SubStream(i),
  282. StreamIndex: ioswitch2.ECStream(i),
  283. })
  284. }
  285. for i := 0; i < ctx.Ft.ECParam.N; i++ {
  286. ctx.IndexedStreams = append(ctx.IndexedStreams, IndexedStream{
  287. Stream: mulNode.NewOutput(i),
  288. StreamIndex: ioswitch2.ECStream(i),
  289. })
  290. }
  291. }
  292. }
  293. // 为每一个To找到一个输入流
  294. for _, to := range ctx.Ft.Toes {
  295. toNode, err := buildToNode(ctx, to)
  296. if err != nil {
  297. return err
  298. }
  299. ctx.ToNodes[to] = toNode
  300. str := findOutputStream(ctx, to.GetStreamIndex())
  301. if str == nil {
  302. return fmt.Errorf("no output stream found for data index %d", to.GetStreamIndex())
  303. }
  304. toNode.SetInput(str)
  305. }
  306. return nil
  307. }
  308. func buildFromNode(ctx *ParseContext, f ioswitch2.From) (ops2.FromNode, error) {
  309. var repRange math2.Range
  310. repRange.Offset = ctx.StreamRange.Offset
  311. if ctx.StreamRange.Length != nil {
  312. repRngLen := *ctx.StreamRange.Length
  313. repRange.Length = &repRngLen
  314. }
  315. var blkRange math2.Range
  316. if ctx.UseEC {
  317. blkRange.Offset = ctx.StreamRange.Offset / int64(ctx.Ft.ECParam.ChunkSize*ctx.Ft.ECParam.K) * int64(ctx.Ft.ECParam.ChunkSize)
  318. if ctx.StreamRange.Length != nil {
  319. blkRngLen := *ctx.StreamRange.Length / int64(ctx.Ft.ECParam.ChunkSize*ctx.Ft.ECParam.K) * int64(ctx.Ft.ECParam.ChunkSize)
  320. blkRange.Length = &blkRngLen
  321. }
  322. }
  323. switch f := f.(type) {
  324. case *ioswitch2.FromShardstore:
  325. t := ctx.DAG.NewShardRead(f, f.Storage.Storage.StorageID, types.NewOpen(f.FileHash))
  326. if f.StreamIndex.IsRaw() {
  327. t.Open.WithNullableLength(repRange.Offset, repRange.Length)
  328. } else if f.StreamIndex.IsEC() {
  329. t.Open.WithNullableLength(blkRange.Offset, blkRange.Length)
  330. } else if f.StreamIndex.IsSegment() {
  331. segStart := ctx.Ft.SegmentParam.CalcSegmentStart(f.StreamIndex.Index)
  332. segLen := ctx.Ft.SegmentParam.Segments[f.StreamIndex.Index]
  333. segEnd := segStart + segLen
  334. // 打开的范围不超过本段的范围
  335. openOff := ctx.StreamRange.Offset - segStart
  336. openOff = math2.Clamp(openOff, 0, segLen)
  337. openLen := segLen
  338. if ctx.StreamRange.Length != nil {
  339. strEnd := ctx.StreamRange.Offset + *ctx.StreamRange.Length
  340. openEnd := math2.Min(strEnd, segEnd)
  341. openLen = openEnd - segStart - openOff
  342. }
  343. t.Open.WithNullableLength(openOff, &openLen)
  344. }
  345. switch addr := f.Hub.Address.(type) {
  346. case *cdssdk.HttpAddressInfo:
  347. t.Env().ToEnvWorker(&ioswitch2.HttpHubWorker{Hub: f.Hub})
  348. t.Env().Pinned = true
  349. case *cdssdk.GRPCAddressInfo:
  350. t.Env().ToEnvWorker(&ioswitch2.AgentWorker{Hub: f.Hub, Address: *addr})
  351. t.Env().Pinned = true
  352. default:
  353. return nil, fmt.Errorf("unsupported node address type %T", addr)
  354. }
  355. return t, nil
  356. case *ioswitch2.FromDriver:
  357. n := ctx.DAG.NewFromDriver(f, f.Handle)
  358. n.Env().ToEnvDriver()
  359. n.Env().Pinned = true
  360. if f.StreamIndex.IsRaw() {
  361. f.Handle.RangeHint.Offset = repRange.Offset
  362. f.Handle.RangeHint.Length = repRange.Length
  363. } else if f.StreamIndex.IsEC() {
  364. f.Handle.RangeHint.Offset = blkRange.Offset
  365. f.Handle.RangeHint.Length = blkRange.Length
  366. } else if f.StreamIndex.IsSegment() {
  367. segStart := ctx.Ft.SegmentParam.CalcSegmentStart(f.StreamIndex.Index)
  368. segLen := ctx.Ft.SegmentParam.Segments[f.StreamIndex.Index]
  369. segEnd := segStart + segLen
  370. // 打开的范围不超过本段的范围
  371. openOff := repRange.Offset - segStart
  372. openOff = math2.Clamp(openOff, 0, segLen)
  373. openLen := segLen
  374. if repRange.Length != nil {
  375. repEnd := repRange.Offset + *repRange.Length
  376. openEnd := math2.Min(repEnd, segEnd)
  377. openLen = openEnd - openOff
  378. }
  379. f.Handle.RangeHint.Offset = openOff
  380. f.Handle.RangeHint.Length = &openLen
  381. }
  382. return n, nil
  383. default:
  384. return nil, fmt.Errorf("unsupported from type %T", f)
  385. }
  386. }
  387. func buildToNode(ctx *ParseContext, t ioswitch2.To) (ops2.ToNode, error) {
  388. switch t := t.(type) {
  389. case *ioswitch2.ToShardStore:
  390. n := ctx.DAG.NewShardWrite(t, t.Storage, t.FileHashStoreKey)
  391. if err := setEnvByAddress(n, t.Hub, t.Hub.Address); err != nil {
  392. return nil, err
  393. }
  394. n.Env().Pinned = true
  395. return n, nil
  396. case *ioswitch2.ToDriver:
  397. n := ctx.DAG.NewToDriver(t, t.Handle)
  398. n.Env().ToEnvDriver()
  399. n.Env().Pinned = true
  400. return n, nil
  401. case *ioswitch2.LoadToShared:
  402. n := ctx.DAG.NewSharedLoad(t, t.Storage, t.ObjectPath)
  403. if err := setEnvByAddress(n, t.Hub, t.Hub.Address); err != nil {
  404. return nil, err
  405. }
  406. n.Env().Pinned = true
  407. return n, nil
  408. default:
  409. return nil, fmt.Errorf("unsupported to type %T", t)
  410. }
  411. }
  412. func setEnvByAddress(n dag.Node, hub cdssdk.Hub, addr cdssdk.HubAddressInfo) error {
  413. switch addr := addr.(type) {
  414. case *cdssdk.HttpAddressInfo:
  415. n.Env().ToEnvWorker(&ioswitch2.HttpHubWorker{Hub: hub})
  416. case *cdssdk.GRPCAddressInfo:
  417. n.Env().ToEnvWorker(&ioswitch2.AgentWorker{Hub: hub, Address: *addr})
  418. default:
  419. return fmt.Errorf("unsupported node address type %T", addr)
  420. }
  421. return nil
  422. }
  423. // 根据StreamRange,调整SegmentSplit中分段的个数和每段的大小
  424. func fixSegmentSplit(ctx *ParseContext) error {
  425. var err error
  426. dag.WalkOnlyType[*ops2.SegmentSplitNode](ctx.DAG.Graph, func(node *ops2.SegmentSplitNode) bool {
  427. var strEnd *int64
  428. if ctx.StreamRange.Length != nil {
  429. e := ctx.StreamRange.Offset + *ctx.StreamRange.Length
  430. strEnd = &e
  431. }
  432. startSeg, endSeg := ctx.Ft.SegmentParam.CalcSegmentRange(ctx.StreamRange.Offset, strEnd)
  433. // 关闭超出范围的分段
  434. for i := endSeg; i < len(node.Segments); i++ {
  435. node.OutputStreams().Get(i).ClearAllDst()
  436. }
  437. node.OutputStreams().Slots.RemoveRange(endSeg, ctx.Ft.SegmentParam.SegmentCount()-endSeg)
  438. node.Segments = lo2.RemoveRange(node.Segments, endSeg, ctx.Ft.SegmentParam.SegmentCount()-endSeg)
  439. for i := 0; i < startSeg; i++ {
  440. node.OutputStreams().Get(i).ClearAllDst()
  441. }
  442. node.OutputStreams().Slots.RemoveRange(0, startSeg)
  443. node.Segments = lo2.RemoveRange(node.Segments, 0, startSeg)
  444. // StreamRange开始的位置可能在某个分段的中间,此时这个分段的大小等于流开始位置到分段结束位置的距离
  445. startSegStart := ctx.Ft.SegmentParam.CalcSegmentStart(startSeg)
  446. node.Segments[0] -= ctx.StreamRange.Offset - startSegStart
  447. // StreamRange结束的位置可能在某个分段的中间,此时这个分段的大小就等于流结束位置到分段起始位置的距离
  448. if strEnd != nil {
  449. endSegStart := ctx.Ft.SegmentParam.CalcSegmentStart(endSeg - 1)
  450. node.Segments[len(node.Segments)-1] = *strEnd - endSegStart
  451. }
  452. return true
  453. })
  454. return err
  455. }
  456. // 从SegmentJoin中删除未使用的分段
  457. func fixSegmentJoin(ctx *ParseContext) error {
  458. var err error
  459. dag.WalkOnlyType[*ops2.SegmentJoinNode](ctx.DAG.Graph, func(node *ops2.SegmentJoinNode) bool {
  460. start := ctx.StreamRange.Offset
  461. var end *int64
  462. if ctx.StreamRange.Length != nil {
  463. e := ctx.StreamRange.Offset + *ctx.StreamRange.Length
  464. end = &e
  465. }
  466. startSeg, endSeg := ctx.Ft.SegmentParam.CalcSegmentRange(start, end)
  467. // 关闭超出范围的分段
  468. for i := endSeg; i < len(node.Segments); i++ {
  469. node.InputStreams().Get(i).NotTo(node)
  470. }
  471. node.InputStreams().Slots.RemoveRange(endSeg, ctx.Ft.SegmentParam.SegmentCount()-endSeg)
  472. node.Segments = lo2.RemoveRange(node.Segments, endSeg, ctx.Ft.SegmentParam.SegmentCount()-endSeg)
  473. for i := 0; i < startSeg; i++ {
  474. node.InputStreams().Get(i).NotTo(node)
  475. }
  476. node.InputStreams().Slots.RemoveRange(0, startSeg)
  477. node.Segments = lo2.RemoveRange(node.Segments, 0, startSeg)
  478. // StreamRange开始的位置可能在某个分段的中间,此时这个分段的大小等于流开始位置到分段结束位置的距离
  479. startSegStart := ctx.Ft.SegmentParam.CalcSegmentStart(startSeg)
  480. node.Segments[0] -= ctx.StreamRange.Offset - startSegStart
  481. // 检查一下必须的分段是否都被加入到Join中
  482. for i := 0; i < node.InputStreams().Len(); i++ {
  483. if node.InputStreams().Get(i) == nil {
  484. err = fmt.Errorf("segment %v missed to join an raw stream", i+startSeg)
  485. return false
  486. }
  487. }
  488. return true
  489. })
  490. return err
  491. }
  492. // 删除未使用的SegmentJoin
  493. func removeUnusedSegmentJoin(ctx *ParseContext) bool {
  494. changed := false
  495. dag.WalkOnlyType[*ops2.SegmentJoinNode](ctx.DAG.Graph, func(node *ops2.SegmentJoinNode) bool {
  496. if node.Joined().Dst.Len() > 0 {
  497. return true
  498. }
  499. node.RemoveAllInputs()
  500. ctx.DAG.RemoveNode(node)
  501. return true
  502. })
  503. return changed
  504. }
  505. // 删除未使用的SegmentSplit
  506. func removeUnusedSegmentSplit(ctx *ParseContext) bool {
  507. changed := false
  508. dag.WalkOnlyType[*ops2.SegmentSplitNode](ctx.DAG.Graph, func(typ *ops2.SegmentSplitNode) bool {
  509. // Split出来的每一个流都没有被使用,才能删除这个指令
  510. for _, out := range typ.OutputStreams().Slots.RawArray() {
  511. if out.Dst.Len() > 0 {
  512. return true
  513. }
  514. }
  515. typ.RemoveAllStream()
  516. ctx.DAG.RemoveNode(typ)
  517. changed = true
  518. return true
  519. })
  520. return changed
  521. }
  522. // 如果Split的结果被完全用于Join,则省略Split和Join指令
  523. func omitSegmentSplitJoin(ctx *ParseContext) bool {
  524. changed := false
  525. dag.WalkOnlyType[*ops2.SegmentSplitNode](ctx.DAG.Graph, func(splitNode *ops2.SegmentSplitNode) bool {
  526. // 随便找一个输出流的目的地
  527. splitOut := splitNode.OutputStreams().Get(0)
  528. if splitOut.Dst.Len() != 1 {
  529. return true
  530. }
  531. dstNode := splitOut.Dst.Get(0)
  532. // 这个目的地要是一个Join指令
  533. joinNode, ok := dstNode.(*ops2.SegmentJoinNode)
  534. if !ok {
  535. return true
  536. }
  537. if splitNode.OutputStreams().Len() != joinNode.Joined().Dst.Len() {
  538. return true
  539. }
  540. // Join指令的输入必须全部来自Split指令的输出,且位置要相同
  541. for i := 0; i < splitNode.OutputStreams().Len(); i++ {
  542. splitOut := splitNode.OutputStreams().Get(i)
  543. joinIn := joinNode.InputStreams().Get(i)
  544. if splitOut != joinIn {
  545. return true
  546. }
  547. if splitOut != nil && splitOut.Dst.Len() != 1 {
  548. return true
  549. }
  550. }
  551. // 所有条件都满足,可以开始省略操作,将Join操作的目的地的输入流替换为Split操作的输入流:
  552. // F->Split->Join->T 变换为:F->T
  553. splitInput := splitNode.InputStreams().Get(0)
  554. for _, to := range joinNode.Joined().Dst.RawArray() {
  555. splitInput.To(to, to.InputStreams().IndexOf(joinNode.Joined()))
  556. }
  557. splitInput.NotTo(splitNode)
  558. // 并删除这两个指令
  559. ctx.DAG.RemoveNode(joinNode)
  560. ctx.DAG.RemoveNode(splitNode)
  561. changed = true
  562. return true
  563. })
  564. return changed
  565. }
  566. // 删除输出流未被使用的Join指令
  567. func removeUnusedJoin(ctx *ParseContext) bool {
  568. changed := false
  569. dag.WalkOnlyType[*ops2.ChunkedJoinNode](ctx.DAG.Graph, func(node *ops2.ChunkedJoinNode) bool {
  570. if node.Joined().Dst.Len() > 0 {
  571. return true
  572. }
  573. node.RemoveAllInputs()
  574. ctx.DAG.RemoveNode(node)
  575. return true
  576. })
  577. return changed
  578. }
  579. // 减少未使用的Multiply指令的输出流。如果减少到0,则删除该指令
  580. func removeUnusedMultiplyOutput(ctx *ParseContext) bool {
  581. changed := false
  582. dag.WalkOnlyType[*ops2.ECMultiplyNode](ctx.DAG.Graph, func(node *ops2.ECMultiplyNode) bool {
  583. outArr := node.OutputStreams().Slots.RawArray()
  584. for i2, out := range outArr {
  585. if out.Dst.Len() > 0 {
  586. continue
  587. }
  588. outArr[i2] = nil
  589. node.OutputIndexes[i2] = -2
  590. changed = true
  591. }
  592. node.OutputStreams().Slots.SetRawArray(lo2.RemoveAllDefault(outArr))
  593. node.OutputIndexes = lo2.RemoveAll(node.OutputIndexes, -2)
  594. // 如果所有输出流都被删除,则删除该指令
  595. if node.OutputStreams().Len() == 0 {
  596. node.RemoveAllInputs()
  597. ctx.DAG.RemoveNode(node)
  598. changed = true
  599. }
  600. return true
  601. })
  602. return changed
  603. }
  604. // 删除未使用的Split指令
  605. func removeUnusedSplit(ctx *ParseContext) bool {
  606. changed := false
  607. dag.WalkOnlyType[*ops2.ChunkedSplitNode](ctx.DAG.Graph, func(typ *ops2.ChunkedSplitNode) bool {
  608. // Split出来的每一个流都没有被使用,才能删除这个指令
  609. for _, out := range typ.OutputStreams().Slots.RawArray() {
  610. if out.Dst.Len() > 0 {
  611. return true
  612. }
  613. }
  614. typ.RemoveAllStream()
  615. ctx.DAG.RemoveNode(typ)
  616. changed = true
  617. return true
  618. })
  619. return changed
  620. }
  621. // 如果Split的结果被完全用于Join,则省略Split和Join指令
  622. func omitSplitJoin(ctx *ParseContext) bool {
  623. changed := false
  624. dag.WalkOnlyType[*ops2.ChunkedSplitNode](ctx.DAG.Graph, func(splitNode *ops2.ChunkedSplitNode) bool {
  625. // Split指令的每一个输出都有且只有一个目的地
  626. var dstNode dag.Node
  627. for _, out := range splitNode.OutputStreams().Slots.RawArray() {
  628. if out.Dst.Len() != 1 {
  629. return true
  630. }
  631. if dstNode == nil {
  632. dstNode = out.Dst.Get(0)
  633. } else if dstNode != out.Dst.Get(0) {
  634. return true
  635. }
  636. }
  637. if dstNode == nil {
  638. return true
  639. }
  640. // 且这个目的地要是一个Join指令
  641. joinNode, ok := dstNode.(*ops2.ChunkedJoinNode)
  642. if !ok {
  643. return true
  644. }
  645. // 同时这个Join指令的输入也必须全部来自Split指令的输出。
  646. // 由于上面判断了Split指令的输出目的地都相同,所以这里只要判断Join指令的输入数量是否与Split指令的输出数量相同即可
  647. if joinNode.InputStreams().Len() != splitNode.OutputStreams().Len() {
  648. return true
  649. }
  650. // 所有条件都满足,可以开始省略操作,将Join操作的目的地的输入流替换为Split操作的输入流:
  651. // F->Split->Join->T 变换为:F->T
  652. splitInput := splitNode.InputStreams().Get(0)
  653. for _, to := range joinNode.Joined().Dst.RawArray() {
  654. splitInput.To(to, to.InputStreams().IndexOf(joinNode.Joined()))
  655. }
  656. splitInput.NotTo(splitNode)
  657. // 并删除这两个指令
  658. ctx.DAG.RemoveNode(joinNode)
  659. ctx.DAG.RemoveNode(splitNode)
  660. changed = true
  661. return true
  662. })
  663. return changed
  664. }
  665. // 通过流的输入输出位置来确定指令的执行位置。
  666. // To系列的指令都会有固定的执行位置,这些位置会随着pin操作逐步扩散到整个DAG,
  667. // 所以理论上不会出现有指令的位置始终无法确定的情况。
  668. func pin(ctx *ParseContext) bool {
  669. changed := false
  670. ctx.DAG.Walk(func(node dag.Node) bool {
  671. if node.Env().Pinned {
  672. return true
  673. }
  674. var toEnv *dag.NodeEnv
  675. for _, out := range node.OutputStreams().Slots.RawArray() {
  676. for _, to := range out.Dst.RawArray() {
  677. if to.Env().Type == dag.EnvUnknown {
  678. continue
  679. }
  680. if toEnv == nil {
  681. toEnv = to.Env()
  682. } else if !toEnv.Equals(to.Env()) {
  683. toEnv = nil
  684. break
  685. }
  686. }
  687. }
  688. if toEnv != nil {
  689. if !node.Env().Equals(toEnv) {
  690. changed = true
  691. }
  692. *node.Env() = *toEnv
  693. return true
  694. }
  695. // 否则根据输入流的始发地来固定
  696. var fromEnv *dag.NodeEnv
  697. for _, in := range node.InputStreams().Slots.RawArray() {
  698. if in.Src.Env().Type == dag.EnvUnknown {
  699. continue
  700. }
  701. if fromEnv == nil {
  702. fromEnv = in.Src.Env()
  703. } else if !fromEnv.Equals(in.Src.Env()) {
  704. fromEnv = nil
  705. break
  706. }
  707. }
  708. if fromEnv != nil {
  709. if !node.Env().Equals(fromEnv) {
  710. changed = true
  711. }
  712. *node.Env() = *fromEnv
  713. }
  714. return true
  715. })
  716. return changed
  717. }
  718. // 删除未使用的From流,不会删除FromDriver
  719. func removeUnusedFromNode(ctx *ParseContext) {
  720. dag.WalkOnlyType[ops2.FromNode](ctx.DAG.Graph, func(node ops2.FromNode) bool {
  721. if _, ok := node.(*ops2.FromDriverNode); ok {
  722. return true
  723. }
  724. if node.Output().Var().Dst.Len() == 0 {
  725. ctx.DAG.RemoveNode(node)
  726. }
  727. return true
  728. })
  729. }
  730. // 对于所有未使用的流,增加Drop指令
  731. func dropUnused(ctx *ParseContext) {
  732. ctx.DAG.Walk(func(node dag.Node) bool {
  733. for _, out := range node.OutputStreams().Slots.RawArray() {
  734. if out.Dst.Len() == 0 {
  735. n := ctx.DAG.NewDropStream()
  736. *n.Env() = *node.Env()
  737. n.SetInput(out)
  738. }
  739. }
  740. return true
  741. })
  742. }
  743. // 将SegmentJoin指令替换成分片上传指令
  744. func useMultipartUploadToShardStore(ctx *ParseContext) {
  745. dag.WalkOnlyType[*ops2.SegmentJoinNode](ctx.DAG.Graph, func(joinNode *ops2.SegmentJoinNode) bool {
  746. if joinNode.Joined().Dst.Len() != 1 {
  747. return true
  748. }
  749. joinDst := joinNode.Joined().Dst.Get(0)
  750. shardNode, ok := joinDst.(*ops2.ShardWriteNode)
  751. if !ok {
  752. return true
  753. }
  754. // SegmentJoin的输出流的范围必须与ToShardStore的输入流的范围相同,
  755. // 虽然可以通过调整SegmentJoin的输入流来调整范围,但太复杂,暂不支持
  756. toStrIdx := shardNode.GetTo().GetStreamIndex()
  757. toStrRng := shardNode.GetTo().GetRange()
  758. if toStrIdx.IsRaw() {
  759. if !toStrRng.Equals(ctx.StreamRange) {
  760. return true
  761. }
  762. } else {
  763. return true
  764. }
  765. // Join的目的地必须支持MultipartUpload功能才能替换成分片上传
  766. multiUpload, err := factory.GetBuilder(shardNode.Storage).CreateMultiparter()
  767. if err != nil {
  768. return true
  769. }
  770. // Join的每一个段的大小必须超过最小分片大小。
  771. // 目前只支持拆分超过最大分片的流,不支持合并多个小段流以达到最小分片大小。
  772. for _, size := range joinNode.Segments {
  773. if size < multiUpload.MinPartSize() {
  774. return true
  775. }
  776. }
  777. initNode := ctx.DAG.NewMultipartInitiator(shardNode.Storage)
  778. initNode.Env().CopyFrom(shardNode.Env())
  779. partNumber := 1
  780. for i, size := range joinNode.Segments {
  781. joinInput := joinNode.InputSlot(i)
  782. if size > multiUpload.MaxPartSize() {
  783. // 如果一个分段的大小大于最大分片大小,则需要拆分为多个小段上传
  784. // 拆分以及上传指令直接在流的产生节点执行
  785. splits := math2.SplitLessThan(size, multiUpload.MaxPartSize())
  786. splitNode := ctx.DAG.NewSegmentSplit(splits)
  787. splitNode.Env().CopyFrom(joinInput.Var().Src.Env())
  788. joinInput.Var().ToSlot(splitNode.InputSlot())
  789. for i2 := 0; i2 < len(splits); i2++ {
  790. uploadNode := ctx.DAG.NewMultipartUpload(shardNode.Storage, partNumber, splits[i2])
  791. uploadNode.Env().CopyFrom(joinInput.Var().Src.Env())
  792. initNode.UploadArgsVar().ToSlot(uploadNode.UploadArgsSlot())
  793. splitNode.SegmentVar(i2).ToSlot(uploadNode.PartStreamSlot())
  794. uploadNode.UploadResultVar().ToSlot(initNode.AppendPartInfoSlot())
  795. partNumber++
  796. }
  797. } else {
  798. // 否则直接上传整个分段
  799. uploadNode := ctx.DAG.NewMultipartUpload(shardNode.Storage, partNumber, size)
  800. // 上传指令直接在流的产生节点执行
  801. uploadNode.Env().CopyFrom(joinInput.Var().Src.Env())
  802. initNode.UploadArgsVar().ToSlot(uploadNode.UploadArgsSlot())
  803. joinInput.Var().ToSlot(uploadNode.PartStreamSlot())
  804. uploadNode.UploadResultVar().ToSlot(initNode.AppendPartInfoSlot())
  805. partNumber++
  806. }
  807. joinInput.Var().NotTo(joinNode)
  808. }
  809. bypassNode := ctx.DAG.NewBypassToShardStore(shardNode.Storage.Storage.StorageID, shardNode.FileHashStoreKey)
  810. bypassNode.Env().CopyFrom(shardNode.Env())
  811. // 分片上传Node产生的结果送到bypassNode,bypassNode将处理结果再送回分片上传Node
  812. initNode.BypassFileInfoVar().ToSlot(bypassNode.BypassFileInfoSlot())
  813. bypassNode.BypassCallbackVar().ToSlot(initNode.BypassCallbackSlot())
  814. // 最后删除Join指令和ToShardStore指令
  815. ctx.DAG.RemoveNode(joinNode)
  816. ctx.DAG.RemoveNode(shardNode)
  817. delete(ctx.ToNodes, shardNode.GetTo())
  818. return true
  819. })
  820. }
  821. // 为IPFS写入指令存储结果
  822. func storeShardWriteResult(ctx *ParseContext) {
  823. dag.WalkOnlyType[*ops2.ShardWriteNode](ctx.DAG.Graph, func(n *ops2.ShardWriteNode) bool {
  824. if n.FileHashStoreKey == "" {
  825. return true
  826. }
  827. storeNode := ctx.DAG.NewStore()
  828. storeNode.Env().ToEnvDriver()
  829. storeNode.Store(n.FileHashStoreKey, n.FileHashVar())
  830. return true
  831. })
  832. dag.WalkOnlyType[*ops2.BypassToShardStoreNode](ctx.DAG.Graph, func(n *ops2.BypassToShardStoreNode) bool {
  833. if n.FileHashStoreKey == "" {
  834. return true
  835. }
  836. storeNode := ctx.DAG.NewStore()
  837. storeNode.Env().ToEnvDriver()
  838. storeNode.Store(n.FileHashStoreKey, n.FileHashVar().Var())
  839. return true
  840. })
  841. }
  842. // 生成Range指令。StreamRange可能超过文件总大小,但Range指令会在数据量不够时不报错而是正常返回
  843. func generateRange(ctx *ParseContext) {
  844. for to, toNode := range ctx.ToNodes {
  845. toStrIdx := to.GetStreamIndex()
  846. toRng := to.GetRange()
  847. if toStrIdx.IsRaw() {
  848. n := ctx.DAG.NewRange()
  849. toInput := toNode.Input()
  850. *n.Env() = *toInput.Var().Src.Env()
  851. rnged := n.RangeStream(toInput.Var(), math2.Range{
  852. Offset: toRng.Offset - ctx.StreamRange.Offset,
  853. Length: toRng.Length,
  854. })
  855. toInput.Var().NotTo(toNode)
  856. toNode.SetInput(rnged)
  857. } else if toStrIdx.IsEC() {
  858. stripSize := int64(ctx.Ft.ECParam.ChunkSize * ctx.Ft.ECParam.K)
  859. blkStartIdx := ctx.StreamRange.Offset / stripSize
  860. blkStart := blkStartIdx * int64(ctx.Ft.ECParam.ChunkSize)
  861. n := ctx.DAG.NewRange()
  862. toInput := toNode.Input()
  863. *n.Env() = *toInput.Var().Src.Env()
  864. rnged := n.RangeStream(toInput.Var(), math2.Range{
  865. Offset: toRng.Offset - blkStart,
  866. Length: toRng.Length,
  867. })
  868. toInput.Var().NotTo(toNode)
  869. toNode.SetInput(rnged)
  870. } else if toStrIdx.IsSegment() {
  871. // if frNode, ok := toNode.Input().Var().From().Node.(ops2.FromNode); ok {
  872. // // 目前只有To也是分段时,才可能对接一个提供分段的From,此时不需要再生成Range指令
  873. // if frNode.GetFrom().GetStreamIndex().IsSegment() {
  874. // continue
  875. // }
  876. // }
  877. // segStart := ctx.Ft.SegmentParam.CalcSegmentStart(toStrIdx.Index)
  878. // strStart := segStart + toRng.Offset
  879. // n := ctx.DAG.NewRange()
  880. // toInput := toNode.Input()
  881. // *n.Env() = *toInput.Var().From().Node.Env()
  882. // rnged := n.RangeStream(toInput.Var(), exec.Range{
  883. // Offset: strStart - ctx.StreamRange.Offset,
  884. // Length: toRng.Length,
  885. // })
  886. // toInput.Var().NotTo(toNode, toInput.Index)
  887. // toNode.SetInput(rnged)
  888. }
  889. }
  890. }
  891. // 生成Clone指令
  892. func generateClone(ctx *ParseContext) {
  893. ctx.DAG.Walk(func(node dag.Node) bool {
  894. for _, outVar := range node.OutputStreams().Slots.RawArray() {
  895. if outVar.Dst.Len() <= 1 {
  896. continue
  897. }
  898. c := ctx.DAG.NewCloneStream()
  899. *c.Env() = *node.Env()
  900. for _, dst := range outVar.Dst.RawArray() {
  901. c.NewOutput().To(dst, dst.InputStreams().IndexOf(outVar))
  902. }
  903. outVar.Dst.Resize(0)
  904. c.SetInput(outVar)
  905. }
  906. for _, outVar := range node.OutputValues().Slots.RawArray() {
  907. if outVar.Dst.Len() <= 1 {
  908. continue
  909. }
  910. t := ctx.DAG.NewCloneValue()
  911. *t.Env() = *node.Env()
  912. for _, dst := range outVar.Dst.RawArray() {
  913. t.NewOutput().To(dst, dst.InputValues().IndexOf(outVar))
  914. }
  915. outVar.Dst.Resize(0)
  916. t.SetInput(outVar)
  917. }
  918. return true
  919. })
  920. }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。