You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

parser.go 15 kB


  1. package plans
  2. import (
  3. "fmt"
  4. cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
  5. "gitlink.org.cn/cloudream/common/utils/lo2"
  6. )
  7. type FromToParser interface {
  8. Parse(ft FromTo, blder *PlanBuilder) error
  9. }
  10. type DefaultParser struct {
  11. EC *cdssdk.ECRedundancy
  12. }
  13. type ParseContext struct {
  14. Ft FromTo
  15. Ops []*Node
  16. }
  17. func (p *DefaultParser) Parse(ft FromTo, blder *PlanBuilder) error {
  18. ctx := ParseContext{Ft: ft}
  19. // 分成两个阶段:
  20. // 1. 基于From和To生成更多指令,初步匹配to的需求
  21. err := p.extend(&ctx, ft, blder)
  22. if err != nil {
  23. return err
  24. }
  25. // 2. 优化上一步生成的指令
  26. // 对于删除指令的优化,需要反复进行,直到没有变化为止。
  27. // 从目前实现上来说不会死循环
  28. for {
  29. opted := false
  30. if p.removeUnusedJoin(&ctx) {
  31. opted = true
  32. }
  33. if p.removeUnusedMultiplyOutput(&ctx) {
  34. opted = true
  35. }
  36. if p.removeUnusedSplit(&ctx) {
  37. opted = true
  38. }
  39. if p.omitSplitJoin(&ctx) {
  40. opted = true
  41. }
  42. if !opted {
  43. break
  44. }
  45. }
  46. // 确定指令执行位置的过程,也需要反复进行,直到没有变化为止。
  47. // 从目前实现上来说不会死循环
  48. for {
  49. opted := false
  50. if p.pinIPFSRead(&ctx) {
  51. opted = true
  52. }
  53. if p.pinJoin(&ctx) {
  54. opted = true
  55. }
  56. if p.pinMultiply(&ctx) {
  57. opted = true
  58. }
  59. if p.pinSplit(&ctx) {
  60. opted = true
  61. }
  62. if !opted {
  63. break
  64. }
  65. }
  66. // 下面这些只需要执行一次,但需要按顺序
  67. p.dropUnused(&ctx)
  68. p.storeIPFSWriteResult(&ctx)
  69. p.generateClone(&ctx)
  70. p.generateSend(&ctx)
  71. return p.buildPlan(&ctx, blder)
  72. }
  73. func (p *DefaultParser) findOutputStream(ctx *ParseContext, dataIndex int) *StreamVar {
  74. for _, op := range ctx.Ops {
  75. for _, o := range op.OutputStreams {
  76. if o.DataIndex == dataIndex {
  77. return o
  78. }
  79. }
  80. }
  81. return nil
  82. }
  83. func (p *DefaultParser) extend(ctx *ParseContext, ft FromTo, blder *PlanBuilder) error {
  84. for _, f := range ft.Froms {
  85. o := f.BuildOp()
  86. ctx.Ops = append(ctx.Ops, &o)
  87. // 对于完整文件的From,生成Split指令
  88. if f.GetDataIndex() == -1 {
  89. splitOp := &Node{
  90. Env: nil,
  91. Type: &ChunkedSplitOp{ChunkSize: p.EC.ChunkSize, PaddingZeros: true},
  92. }
  93. splitOp.AddInput(o.OutputStreams[0])
  94. for i := 0; i < p.EC.K; i++ {
  95. splitOp.NewOutput(i)
  96. }
  97. ctx.Ops = append(ctx.Ops, splitOp)
  98. }
  99. }
  100. // 如果有K个不同的文件块流,则生成Multiply指令,同时针对其生成的流,生成Join指令
  101. ecInputStrs := make(map[int]*StreamVar)
  102. loop:
  103. for _, o := range ctx.Ops {
  104. for _, s := range o.OutputStreams {
  105. if s.DataIndex >= 0 && ecInputStrs[s.DataIndex] == nil {
  106. ecInputStrs[s.DataIndex] = s
  107. if len(ecInputStrs) == p.EC.K {
  108. break loop
  109. }
  110. }
  111. }
  112. }
  113. if len(ecInputStrs) == p.EC.K {
  114. mulOp := &Node{
  115. Env: nil,
  116. Type: &MultiplyOp{ChunkSize: p.EC.ChunkSize},
  117. }
  118. for _, s := range ecInputStrs {
  119. mulOp.AddInput(s)
  120. }
  121. for i := 0; i < p.EC.N; i++ {
  122. mulOp.NewOutput(i)
  123. }
  124. ctx.Ops = append(ctx.Ops, mulOp)
  125. joinOp := &Node{
  126. Env: nil,
  127. Type: &ChunkedJoinOp{p.EC.ChunkSize},
  128. }
  129. for i := 0; i < p.EC.K; i++ {
  130. // 不可能找不到流
  131. joinOp.AddInput(p.findOutputStream(ctx, i))
  132. }
  133. joinOp.NewOutput(-1)
  134. }
  135. // 为每一个To找到一个输入流
  136. for _, t := range ft.Tos {
  137. o := t.BuildOp()
  138. ctx.Ops = append(ctx.Ops, &o)
  139. str := p.findOutputStream(ctx, t.GetDataIndex())
  140. if str == nil {
  141. return fmt.Errorf("no output stream found for data index %d", t.GetDataIndex())
  142. }
  143. o.AddInput(str)
  144. }
  145. return nil
  146. }
  147. // 删除输出流未被使用的Join指令
  148. func (p *DefaultParser) removeUnusedJoin(ctx *ParseContext) bool {
  149. opted := false
  150. for i, op := range ctx.Ops {
  151. _, ok := op.Type.(*ChunkedJoinOp)
  152. if !ok {
  153. continue
  154. }
  155. if len(op.OutputStreams[0].Toes) > 0 {
  156. continue
  157. }
  158. for _, in := range op.InputStreams {
  159. in.RemoveTo(op)
  160. }
  161. ctx.Ops[i] = nil
  162. opted = true
  163. }
  164. ctx.Ops = lo2.RemoveAllDefault(ctx.Ops)
  165. return opted
  166. }
  167. // 减少未使用的Multiply指令的输出流。如果减少到0,则删除该指令
  168. func (p *DefaultParser) removeUnusedMultiplyOutput(ctx *ParseContext) bool {
  169. opted := false
  170. for i, op := range ctx.Ops {
  171. _, ok := op.Type.(*MultiplyOp)
  172. if !ok {
  173. continue
  174. }
  175. for i2, out := range op.OutputStreams {
  176. if len(out.Toes) > 0 {
  177. continue
  178. }
  179. op.OutputStreams[i2] = nil
  180. }
  181. op.OutputStreams = lo2.RemoveAllDefault(op.OutputStreams)
  182. if len(op.OutputStreams) == 0 {
  183. for _, in := range op.InputStreams {
  184. in.RemoveTo(op)
  185. }
  186. ctx.Ops[i] = nil
  187. }
  188. opted = true
  189. }
  190. ctx.Ops = lo2.RemoveAllDefault(ctx.Ops)
  191. return opted
  192. }
  193. // 删除未使用的Split指令
  194. func (p *DefaultParser) removeUnusedSplit(ctx *ParseContext) bool {
  195. opted := false
  196. for i, op := range ctx.Ops {
  197. _, ok := op.Type.(*ChunkedSplitOp)
  198. if !ok {
  199. continue
  200. }
  201. // Split出来的每一个流都没有被使用,才能删除这个指令
  202. isAllUnused := true
  203. for _, out := range op.OutputStreams {
  204. if len(out.Toes) > 0 {
  205. isAllUnused = false
  206. break
  207. }
  208. }
  209. if isAllUnused {
  210. op.InputStreams[0].RemoveTo(op)
  211. ctx.Ops[i] = nil
  212. opted = true
  213. }
  214. }
  215. ctx.Ops = lo2.RemoveAllDefault(ctx.Ops)
  216. return opted
  217. }
  218. // 如果Split的结果被完全用于Join,则省略Split和Join指令
  219. func (p *DefaultParser) omitSplitJoin(ctx *ParseContext) bool {
  220. opted := false
  221. loop:
  222. for iSplit, splitOp := range ctx.Ops {
  223. // 进行合并操作时会删除多个指令,因此这里存在splitOp == nil的情况
  224. if splitOp == nil {
  225. continue
  226. }
  227. _, ok := splitOp.Type.(*ChunkedSplitOp)
  228. if !ok {
  229. continue
  230. }
  231. // Split指令的每一个输出都有且只有一个目的地
  232. var joinOp *Node
  233. for _, out := range splitOp.OutputStreams {
  234. if len(out.Toes) != 1 {
  235. continue
  236. }
  237. if joinOp == nil {
  238. joinOp = out.Toes[0]
  239. } else if joinOp != out.Toes[0] {
  240. continue loop
  241. }
  242. }
  243. if joinOp == nil {
  244. continue
  245. }
  246. // 且这个目的地要是一个Join指令
  247. _, ok = joinOp.Type.(*ChunkedJoinOp)
  248. if !ok {
  249. continue
  250. }
  251. // 同时这个Join指令的输入也必须全部来自Split指令的输出。
  252. // 由于上面判断了Split指令的输出目的地都相同,所以这里只要判断Join指令的输入数量是否与Split指令的输出数量相同即可
  253. if len(joinOp.InputStreams) != len(splitOp.OutputStreams) {
  254. continue
  255. }
  256. // 所有条件都满足,可以开始省略操作,将Join操作的目的地的输入流替换为Split操作的输入流:
  257. // F->Split->Join->T 变换为:F->T
  258. splitOp.InputStreams[0].RemoveTo(splitOp)
  259. for _, to := range joinOp.OutputStreams[0].Toes {
  260. to.ReplaceInput(joinOp.OutputStreams[0], splitOp.InputStreams[0])
  261. }
  262. // 并删除这两个指令
  263. ctx.Ops[iSplit] = nil
  264. lo2.Clear(ctx.Ops, joinOp)
  265. opted = true
  266. }
  267. ctx.Ops = lo2.RemoveAllDefault(ctx.Ops)
  268. return opted
  269. }
  270. // 确定Split命令的执行位置
  271. func (p *DefaultParser) pinSplit(ctx *ParseContext) bool {
  272. opted := false
  273. for _, op := range ctx.Ops {
  274. _, ok := op.Type.(*ChunkedSplitOp)
  275. if !ok {
  276. continue
  277. }
  278. // 如果Split的每一个流的目的地都是同一个,则将Split固定在这个地方执行
  279. var toEnv OpEnv
  280. useToEnv := true
  281. for _, out := range op.OutputStreams {
  282. for _, to := range out.Toes {
  283. // 如果某个流的目的地也不确定,则将其视为与其他流的目的地相同
  284. if to.Env == nil {
  285. continue
  286. }
  287. if toEnv == nil {
  288. toEnv = to.Env
  289. } else if toEnv.Equals(to.Env) {
  290. useToEnv = false
  291. break
  292. }
  293. }
  294. if !useToEnv {
  295. break
  296. }
  297. }
  298. // 所有输出流的目的地都不确定,那么就不能根据输出流去固定
  299. if toEnv == nil {
  300. useToEnv = false
  301. }
  302. if useToEnv {
  303. if op.Env == nil || !op.Env.Equals(toEnv) {
  304. opted = true
  305. }
  306. op.Env = toEnv
  307. continue
  308. }
  309. // 此时查看输入流的始发地是否可以确定,可以的话使用这个位置
  310. fromEnv := op.InputStreams[0].From.Env
  311. if fromEnv != nil {
  312. if op.Env == nil || !op.Env.Equals(fromEnv) {
  313. opted = true
  314. }
  315. op.Env = fromEnv
  316. }
  317. }
  318. return opted
  319. }
  320. // 确定Join命令的执行位置,策略与固定Split类似
  321. func (p *DefaultParser) pinJoin(ctx *ParseContext) bool {
  322. opted := false
  323. for _, op := range ctx.Ops {
  324. _, ok := op.Type.(*ChunkedJoinOp)
  325. if !ok {
  326. continue
  327. }
  328. // 先查看输出流的目的地是否可以确定,可以的话使用这个位置
  329. var toEnv OpEnv
  330. for _, to := range op.OutputStreams[0].Toes {
  331. if to.Env == nil {
  332. continue
  333. }
  334. if toEnv == nil {
  335. toEnv = to.Env
  336. } else if !toEnv.Equals(to.Env) {
  337. toEnv = nil
  338. break
  339. }
  340. }
  341. if toEnv != nil {
  342. if op.Env == nil || !op.Env.Equals(toEnv) {
  343. opted = true
  344. }
  345. op.Env = toEnv
  346. continue
  347. }
  348. // 否则根据输入流的始发地来固定
  349. var fromEnv OpEnv
  350. for _, in := range op.InputStreams {
  351. if in.From.Env == nil {
  352. continue
  353. }
  354. if fromEnv == nil {
  355. fromEnv = in.From.Env
  356. } else if !fromEnv.Equals(in.From.Env) {
  357. // 输入流的始发地不同,那也必须选一个作为固定位置
  358. break
  359. }
  360. }
  361. // 所有输入流的始发地都不确定,那没办法了
  362. if fromEnv != nil {
  363. if op.Env == nil || !op.Env.Equals(fromEnv) {
  364. opted = true
  365. }
  366. op.Env = fromEnv
  367. continue
  368. }
  369. }
  370. return opted
  371. }
  372. // 确定Multiply命令的执行位置
  373. func (p *DefaultParser) pinMultiply(ctx *ParseContext) bool {
  374. opted := false
  375. for _, op := range ctx.Ops {
  376. _, ok := op.Type.(*MultiplyOp)
  377. if !ok {
  378. continue
  379. }
  380. var toEnv OpEnv
  381. for _, out := range op.OutputStreams {
  382. for _, to := range out.Toes {
  383. if to.Env == nil {
  384. continue
  385. }
  386. if toEnv == nil {
  387. toEnv = to.Env
  388. } else if !toEnv.Equals(to.Env) {
  389. toEnv = nil
  390. break
  391. }
  392. }
  393. }
  394. if toEnv != nil {
  395. if op.Env == nil || !op.Env.Equals(toEnv) {
  396. opted = true
  397. }
  398. op.Env = toEnv
  399. continue
  400. }
  401. // 否则根据输入流的始发地来固定
  402. var fromEnv OpEnv
  403. for _, in := range op.InputStreams {
  404. if in.From.Env == nil {
  405. continue
  406. }
  407. if fromEnv == nil {
  408. fromEnv = in.From.Env
  409. } else if !fromEnv.Equals(in.From.Env) {
  410. // 输入流的始发地不同,那也必须选一个作为固定位置
  411. break
  412. }
  413. }
  414. // 所有输入流的始发地都不确定,那没办法了
  415. if fromEnv != nil {
  416. if op.Env == nil || !op.Env.Equals(fromEnv) {
  417. opted = true
  418. }
  419. op.Env = fromEnv
  420. continue
  421. }
  422. }
  423. return opted
  424. }
  425. // 确定IPFS读取指令的执行位置
  426. func (p *DefaultParser) pinIPFSRead(ctx *ParseContext) bool {
  427. opted := false
  428. for _, op := range ctx.Ops {
  429. _, ok := op.Type.(*IPFSReadType)
  430. if !ok {
  431. continue
  432. }
  433. if op.Env != nil {
  434. continue
  435. }
  436. var toEnv OpEnv
  437. for _, to := range op.OutputStreams[0].Toes {
  438. if to.Env == nil {
  439. continue
  440. }
  441. if toEnv == nil {
  442. toEnv = to.Env
  443. } else if !toEnv.Equals(to.Env) {
  444. toEnv = nil
  445. break
  446. }
  447. }
  448. if toEnv != nil {
  449. if op.Env == nil || !op.Env.Equals(toEnv) {
  450. opted = true
  451. }
  452. op.Env = toEnv
  453. }
  454. }
  455. return opted
  456. }
  457. // 对于所有未使用的流,增加Drop指令
  458. func (p *DefaultParser) dropUnused(ctx *ParseContext) {
  459. for _, op := range ctx.Ops {
  460. for _, out := range op.OutputStreams {
  461. if len(out.Toes) == 0 {
  462. dropOp := &Node{
  463. Env: nil,
  464. Type: &DropOp{},
  465. }
  466. dropOp.AddInput(out)
  467. ctx.Ops = append(ctx.Ops, dropOp)
  468. }
  469. }
  470. }
  471. }
  472. // 为IPFS写入指令存储结果
  473. func (p *DefaultParser) storeIPFSWriteResult(ctx *ParseContext) {
  474. for _, op := range ctx.Ops {
  475. w, ok := op.Type.(*IPFSWriteType)
  476. if !ok {
  477. continue
  478. }
  479. if w.FileHashStoreKey == "" {
  480. continue
  481. }
  482. storeOp := &Node{
  483. Env: &ExecutorEnv{},
  484. Type: &StoreOp{
  485. StoreKey: w.FileHashStoreKey,
  486. },
  487. }
  488. storeOp.AddInputVar(op.OutputValues[0])
  489. ctx.Ops = append(ctx.Ops, storeOp)
  490. }
  491. }
  492. // 生成Clone指令
  493. func (p *DefaultParser) generateClone(ctx *ParseContext) {
  494. for _, op := range ctx.Ops {
  495. for _, out := range op.OutputStreams {
  496. if len(out.Toes) <= 1 {
  497. continue
  498. }
  499. cloneOp := &Node{
  500. Env: op.Env,
  501. Type: &CloneStreamOp{},
  502. }
  503. for _, to := range out.Toes {
  504. to.ReplaceInput(out, cloneOp.NewOutput(out.DataIndex))
  505. }
  506. out.Toes = nil
  507. cloneOp.AddInput(out)
  508. ctx.Ops = append(ctx.Ops, cloneOp)
  509. }
  510. for _, out := range op.OutputValues {
  511. if len(out.Toes) <= 1 {
  512. continue
  513. }
  514. cloneOp := &Node{
  515. Env: op.Env,
  516. Type: &CloneVarOp{},
  517. }
  518. for _, to := range out.Toes {
  519. to.ReplaceInputVar(out, cloneOp.NewOutputVar(out.Type))
  520. }
  521. out.Toes = nil
  522. cloneOp.AddInputVar(out)
  523. }
  524. }
  525. }
  526. // 生成Send指令
  527. func (p *DefaultParser) generateSend(ctx *ParseContext) {
  528. for _, op := range ctx.Ops {
  529. for _, out := range op.OutputStreams {
  530. to := out.Toes[0]
  531. if to.Env.Equals(op.Env) {
  532. continue
  533. }
  534. switch to.Env.(type) {
  535. case *ExecutorEnv:
  536. // 如果是要送到Executor,则只能由Executor主动去拉取
  537. getStrOp := &Node{
  538. Env: &ExecutorEnv{},
  539. Type: &GetStreamOp{},
  540. }
  541. out.Toes = nil
  542. getStrOp.AddInput(out)
  543. to.ReplaceInput(out, getStrOp.NewOutput(out.DataIndex))
  544. ctx.Ops = append(ctx.Ops, getStrOp)
  545. case *AgentEnv:
  546. // 如果是要送到Agent,则可以直接发送
  547. sendStrOp := &Node{
  548. Env: op.Env,
  549. Type: &SendStreamOp{},
  550. }
  551. out.Toes = nil
  552. sendStrOp.AddInput(out)
  553. to.ReplaceInput(out, sendStrOp.NewOutput(out.DataIndex))
  554. ctx.Ops = append(ctx.Ops, sendStrOp)
  555. }
  556. }
  557. for _, out := range op.OutputValues {
  558. to := out.Toes[0]
  559. if to.Env.Equals(op.Env) {
  560. continue
  561. }
  562. switch to.Env.(type) {
  563. case *ExecutorEnv:
  564. // 如果是要送到Executor,则只能由Executor主动去拉取
  565. getVarOp := &Node{
  566. Env: &ExecutorEnv{},
  567. Type: &GetVarOp{},
  568. }
  569. out.Toes = nil
  570. getVarOp.AddInputVar(out)
  571. to.ReplaceInputVar(out, getVarOp.NewOutputVar(out.Type))
  572. ctx.Ops = append(ctx.Ops, getVarOp)
  573. case *AgentEnv:
  574. // 如果是要送到Agent,则可以直接发送
  575. sendVarOp := &Node{
  576. Env: op.Env,
  577. Type: &SendVarOp{},
  578. }
  579. out.Toes = nil
  580. sendVarOp.AddInputVar(out)
  581. to.ReplaceInputVar(out, sendVarOp.NewOutputVar(out.Type))
  582. ctx.Ops = append(ctx.Ops, sendVarOp)
  583. }
  584. }
  585. }
  586. }
  587. // 生成Plan
  588. func (p *DefaultParser) buildPlan(ctx *ParseContext, blder *PlanBuilder) error {
  589. for _, op := range ctx.Ops {
  590. for _, out := range op.OutputStreams {
  591. if out.Var != nil {
  592. continue
  593. }
  594. out.Var = blder.NewStreamVar()
  595. }
  596. for _, in := range op.InputStreams {
  597. if in.Var != nil {
  598. continue
  599. }
  600. in.Var = blder.NewStreamVar()
  601. }
  602. for _, out := range op.OutputValues {
  603. if out.Var != nil {
  604. continue
  605. }
  606. switch out.Type {
  607. case StringValueVar:
  608. out.Var = blder.NewStringVar()
  609. }
  610. }
  611. for _, in := range op.InputValues {
  612. if in.Var != nil {
  613. continue
  614. }
  615. switch in.Type {
  616. case StringValueVar:
  617. in.Var = blder.NewStringVar()
  618. }
  619. }
  620. if err := op.Type.GenerateOp(op, blder); err != nil {
  621. return err
  622. }
  623. }
  624. return nil
  625. }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。