You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number; they can include dashes ('-') and can be up to 35 characters long.

parser.go 15 kB

1 year ago
1 year ago
1 year ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597
  1. package parser
  2. import (
  3. "fmt"
  4. "math"
  5. "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag"
  6. "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
  7. "gitlink.org.cn/cloudream/common/pkgs/ioswitch/plan"
  8. cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
  9. "gitlink.org.cn/cloudream/common/utils/lo2"
  10. "gitlink.org.cn/cloudream/common/utils/math2"
  11. "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2"
  12. "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2/ops2"
  13. "gitlink.org.cn/cloudream/storage/common/pkgs/storage/types"
  14. )
// DefaultParser builds execution plans for a single erasure-coding
// configuration.
type DefaultParser struct {
	EC cdssdk.ECRedundancy // erasure-coding parameters (K, N, ChunkSize)
}

// NewParser creates a DefaultParser bound to the given EC configuration.
func NewParser(ec cdssdk.ECRedundancy) *DefaultParser {
	return &DefaultParser{
		EC: ec,
	}
}

// IndexedStream pairs a stream variable with the EC data index it carries.
// A DataIndex of -1 denotes the complete (joined) file stream.
type IndexedStream struct {
	Stream    *dag.Var
	DataIndex int
}

// ParseContext carries the state shared by all parsing passes.
type ParseContext struct {
	Ft  ioswitch2.FromTo
	DAG *ops2.GraphNodeBuilder
	// ToNodes maps each To to the DAG node that serves it.
	ToNodes        map[ioswitch2.To]ops2.ToNode
	IndexedStreams []IndexedStream
	// StreamRange is the range the Froms must open in order to produce all the
	// data the Tos need. It is relative to the whole file, and both bounds are
	// rounded to multiples of the stripe size, so the upper bound may exceed
	// the file size.
	StreamRange exec.Range
}
  36. func (p *DefaultParser) Parse(ft ioswitch2.FromTo, blder *exec.PlanBuilder) error {
  37. ctx := ParseContext{
  38. Ft: ft,
  39. DAG: ops2.NewGraphNodeBuilder(),
  40. ToNodes: make(map[ioswitch2.To]ops2.ToNode),
  41. }
  42. // 分成两个阶段:
  43. // 1. 基于From和To生成更多指令,初步匹配to的需求
  44. // 计算一下打开流的范围
  45. p.calcStreamRange(&ctx)
  46. err := p.extend(&ctx)
  47. if err != nil {
  48. return err
  49. }
  50. // 2. 优化上一步生成的指令
  51. // 对于删除指令的优化,需要反复进行,直到没有变化为止。
  52. // 从目前实现上来说不会死循环
  53. for {
  54. opted := false
  55. if p.removeUnusedJoin(&ctx) {
  56. opted = true
  57. }
  58. if p.removeUnusedMultiplyOutput(&ctx) {
  59. opted = true
  60. }
  61. if p.removeUnusedSplit(&ctx) {
  62. opted = true
  63. }
  64. if p.omitSplitJoin(&ctx) {
  65. opted = true
  66. }
  67. if !opted {
  68. break
  69. }
  70. }
  71. // 确定指令执行位置的过程,也需要反复进行,直到没有变化为止。
  72. for p.pin(&ctx) {
  73. }
  74. // 下面这些只需要执行一次,但需要按顺序
  75. p.dropUnused(&ctx)
  76. p.storeIPFSWriteResult(&ctx)
  77. p.generateClone(&ctx)
  78. p.generateRange(&ctx)
  79. return plan.Generate(ctx.DAG.Graph, blder)
  80. }
  81. func (p *DefaultParser) findOutputStream(ctx *ParseContext, streamIndex int) *dag.Var {
  82. var ret *dag.Var
  83. for _, s := range ctx.IndexedStreams {
  84. if s.DataIndex == streamIndex {
  85. ret = s.Stream
  86. break
  87. }
  88. }
  89. return ret
  90. }
// calcStreamRange computes the open range of the input streams. The range is
// rounded outward to multiples of the stripe size (ChunkSize * K), so the
// upper bound may exceed the actual file size.
func (p *DefaultParser) calcStreamRange(ctx *ParseContext) {
	stripSize := int64(p.EC.ChunkSize * p.EC.K)

	// Start from an "empty" range; ExtendStart pulls the offset down from
	// MaxInt64 as Tos are examined.
	rng := exec.Range{
		Offset: math.MaxInt64,
	}

	for _, to := range ctx.Ft.Toes {
		if to.GetDataIndex() == -1 {
			// This To wants the complete file: align its requested range
			// directly to stripe boundaries.
			toRng := to.GetRange()
			rng.ExtendStart(math2.Floor(toRng.Offset, stripSize))
			if toRng.Length != nil {
				rng.ExtendEnd(math2.Ceil(toRng.Offset+*toRng.Length, stripSize))
			} else {
				// nil length means "until the end of the stream".
				// NOTE(review): once Length is nil, a later ExtendEnd could
				// reintroduce a bound — confirm ExtendEnd treats nil as
				// unbounded.
				rng.Length = nil
			}
		} else {
			// This To wants a single EC block: translate the block-local
			// range into the corresponding whole-file stripe range.
			toRng := to.GetRange()
			blkStartIndex := math2.FloorDiv(toRng.Offset, int64(p.EC.ChunkSize))
			rng.ExtendStart(blkStartIndex * stripSize)
			if toRng.Length != nil {
				blkEndIndex := math2.CeilDiv(toRng.Offset+*toRng.Length, int64(p.EC.ChunkSize))
				rng.ExtendEnd(blkEndIndex * stripSize)
			} else {
				rng.Length = nil
			}
		}
	}

	ctx.StreamRange = rng
}
// extend generates the initial instruction DAG: one source node per From, a
// Split for complete-file sources, an EC-Multiply plus Join when K distinct
// blocks are available, and finally wires one input stream into each To.
func (p *DefaultParser) extend(ctx *ParseContext) error {
	for _, fr := range ctx.Ft.Froms {
		frNode, err := p.buildFromNode(ctx, fr)
		if err != nil {
			return err
		}
		ctx.IndexedStreams = append(ctx.IndexedStreams, IndexedStream{
			Stream:    frNode.Output().Var,
			DataIndex: fr.GetDataIndex(),
		})
		// For a From that supplies the complete file, generate a Split
		// instruction that produces the K data blocks.
		if fr.GetDataIndex() == -1 {
			splitNode := ctx.DAG.NewChunkedSplit(p.EC.ChunkSize)
			splitNode.Split(frNode.Output().Var, p.EC.K)
			for i := 0; i < p.EC.K; i++ {
				ctx.IndexedStreams = append(ctx.IndexedStreams, IndexedStream{
					Stream:    splitNode.SubStream(i),
					DataIndex: i,
				})
			}
		}
	}

	// If there are K streams with distinct data indexes, generate a Multiply
	// instruction, and a Join instruction over the streams it produces.
	ecInputStrs := make(map[int]*dag.Var)
	for _, s := range ctx.IndexedStreams {
		if s.DataIndex >= 0 && ecInputStrs[s.DataIndex] == nil {
			ecInputStrs[s.DataIndex] = s.Stream
			if len(ecInputStrs) == p.EC.K {
				break
			}
		}
	}
	if len(ecInputStrs) == p.EC.K {
		mulNode := ctx.DAG.NewECMultiply(p.EC)
		// NOTE(review): map iteration order is random; this assumes AddInput
		// pairs each stream with its explicit index i so that ordering does
		// not matter — confirm.
		for i, s := range ecInputStrs {
			mulNode.AddInput(s, i)
		}
		for i := 0; i < p.EC.N; i++ {
			ctx.IndexedStreams = append(ctx.IndexedStreams, IndexedStream{
				Stream:    mulNode.NewOutput(i),
				DataIndex: i,
			})
		}
		joinNode := ctx.DAG.NewChunkedJoin(p.EC.ChunkSize)
		for i := 0; i < p.EC.K; i++ {
			// The stream cannot be missing: indexes 0..K-1 were just produced.
			joinNode.AddInput(p.findOutputStream(ctx, i))
		}
		ctx.IndexedStreams = append(ctx.IndexedStreams, IndexedStream{
			Stream:    joinNode.Joined(),
			DataIndex: -1,
		})
	}

	// Find one input stream for every To.
	for _, to := range ctx.Ft.Toes {
		toNode, err := p.buildToNode(ctx, to)
		if err != nil {
			return err
		}
		ctx.ToNodes[to] = toNode
		str := p.findOutputStream(ctx, to.GetDataIndex())
		if str == nil {
			return fmt.Errorf("no output stream found for data index %d", to.GetDataIndex())
		}
		toNode.SetInput(str)
	}
	return nil
}
// buildFromNode creates the DAG source node for a single From and configures
// the byte range it should open. repRange covers the replica (complete-file)
// stream; blkRange maps the same range onto a single EC block.
func (p *DefaultParser) buildFromNode(ctx *ParseContext, f ioswitch2.From) (ops2.FromNode, error) {
	var repRange exec.Range
	var blkRange exec.Range
	repRange.Offset = ctx.StreamRange.Offset
	// StreamRange.Offset is stripe-aligned, so the integer division maps it
	// onto the per-block offset exactly.
	blkRange.Offset = ctx.StreamRange.Offset / int64(p.EC.ChunkSize*p.EC.K) * int64(p.EC.ChunkSize)
	if ctx.StreamRange.Length != nil {
		repRngLen := *ctx.StreamRange.Length
		repRange.Length = &repRngLen
		blkRngLen := *ctx.StreamRange.Length / int64(p.EC.ChunkSize*p.EC.K) * int64(p.EC.ChunkSize)
		blkRange.Length = &blkRngLen
	}

	switch f := f.(type) {
	case *ioswitch2.FromShardstore:
		t := ctx.DAG.NewShardRead(f.Storage.StorageID, types.NewOpen(f.FileHash))
		// Data index -1 reads the complete file; any other index reads one
		// EC block.
		if f.DataIndex == -1 {
			t.Open.WithNullableLength(repRange.Offset, repRange.Length)
		} else {
			t.Open.WithNullableLength(blkRange.Offset, blkRange.Length)
		}
		// Pin the read to the hub that owns the shard, picking the worker
		// type from the hub's address kind.
		switch addr := f.Hub.Address.(type) {
		case *cdssdk.HttpAddressInfo:
			t.Env().ToEnvWorker(&ioswitch2.HttpHubWorker{Hub: f.Hub})
			t.Env().Pinned = true
		case *cdssdk.GRPCAddressInfo:
			t.Env().ToEnvWorker(&ioswitch2.AgentWorker{Hub: f.Hub, Address: *addr})
			t.Env().Pinned = true
		default:
			return nil, fmt.Errorf("unsupported node address type %T", addr)
		}
		return t, nil

	case *ioswitch2.FromDriver:
		n := ctx.DAG.NewFromDriver(f.Handle)
		n.Env().ToEnvDriver()
		n.Env().Pinned = true
		// NOTE(review): assumes f.Handle.RangeHint is always non-nil here —
		// confirm the handle is constructed with a RangeHint.
		if f.DataIndex == -1 {
			f.Handle.RangeHint.Offset = repRange.Offset
			f.Handle.RangeHint.Length = repRange.Length
		} else {
			f.Handle.RangeHint.Offset = blkRange.Offset
			f.Handle.RangeHint.Length = blkRange.Length
		}
		return n, nil

	default:
		return nil, fmt.Errorf("unsupported from type %T", f)
	}
}
  234. func (p *DefaultParser) buildToNode(ctx *ParseContext, t ioswitch2.To) (ops2.ToNode, error) {
  235. switch t := t.(type) {
  236. case *ioswitch2.ToShardStore:
  237. n := ctx.DAG.NewShardWrite(t.Storage.StorageID, t.FileHashStoreKey)
  238. switch addr := t.Hub.Address.(type) {
  239. case *cdssdk.HttpAddressInfo:
  240. n.Env().ToEnvWorker(&ioswitch2.HttpHubWorker{Hub: t.Hub})
  241. case *cdssdk.GRPCAddressInfo:
  242. n.Env().ToEnvWorker(&ioswitch2.AgentWorker{Hub: t.Hub, Address: *addr})
  243. default:
  244. return nil, fmt.Errorf("unsupported node address type %T", addr)
  245. }
  246. n.Env().Pinned = true
  247. return n, nil
  248. case *ioswitch2.ToDriver:
  249. n := ctx.DAG.NewToDriver(t.Handle)
  250. n.Env().ToEnvDriver()
  251. n.Env().Pinned = true
  252. return n, nil
  253. default:
  254. return nil, fmt.Errorf("unsupported to type %T", t)
  255. }
  256. }
  257. // 删除输出流未被使用的Join指令
  258. func (p *DefaultParser) removeUnusedJoin(ctx *ParseContext) bool {
  259. changed := false
  260. dag.WalkOnlyType[*ops2.ChunkedJoinNode](ctx.DAG.Graph, func(node *ops2.ChunkedJoinNode) bool {
  261. if node.InputStreams().Len() > 0 {
  262. return true
  263. }
  264. node.RemoveAllInputs()
  265. ctx.DAG.RemoveNode(node)
  266. return true
  267. })
  268. return changed
  269. }
// removeUnusedMultiplyOutput drops every output stream of a Multiply
// instruction that has no consumer; when an instruction loses all of its
// outputs it is removed entirely. Returns true when anything changed.
func (p *DefaultParser) removeUnusedMultiplyOutput(ctx *ParseContext) bool {
	changed := false
	dag.WalkOnlyType[*ops2.ECMultiplyNode](ctx.DAG.Graph, func(node *ops2.ECMultiplyNode) bool {
		outArr := node.OutputStreams().RawArray()
		for i2, out := range outArr {
			if out.To().Len() > 0 {
				continue
			}
			// Mark the entry for removal: nil in the stream array, and the
			// sentinel -2 (an impossible data index) in the parallel
			// OutputIndexes array.
			outArr[i2] = nil
			node.OutputIndexes[i2] = -2
			changed = true
		}
		// Compact both parallel arrays, discarding the marked entries so they
		// stay aligned.
		node.OutputStreams().SetRawArray(lo2.RemoveAllDefault(outArr))
		node.OutputIndexes = lo2.RemoveAll(node.OutputIndexes, -2)
		// If every output stream was removed, delete the instruction itself.
		if node.OutputStreams().Len() == 0 {
			node.RemoveAllInputs()
			ctx.DAG.RemoveNode(node)
			changed = true
		}
		return true
	})
	return changed
}
  295. // 删除未使用的Split指令
  296. func (p *DefaultParser) removeUnusedSplit(ctx *ParseContext) bool {
  297. changed := false
  298. dag.WalkOnlyType[*ops2.ChunkedSplitNode](ctx.DAG.Graph, func(typ *ops2.ChunkedSplitNode) bool {
  299. // Split出来的每一个流都没有被使用,才能删除这个指令
  300. for _, out := range typ.OutputStreams().RawArray() {
  301. if out.To().Len() > 0 {
  302. return true
  303. }
  304. }
  305. typ.Clear()
  306. ctx.DAG.RemoveNode(typ)
  307. changed = true
  308. return true
  309. })
  310. return changed
  311. }
// omitSplitJoin removes a Split/Join pair when the Split's outputs are
// consumed exclusively and completely by a single Join, rewiring
// F->Split->Join->T into F->T. Returns true when anything changed.
func (p *DefaultParser) omitSplitJoin(ctx *ParseContext) bool {
	changed := false
	dag.WalkOnlyType[*ops2.ChunkedSplitNode](ctx.DAG.Graph, func(splitNode *ops2.ChunkedSplitNode) bool {
		// Each of the Split's outputs must have exactly one destination, and
		// all destinations must be the same node.
		var dstNode dag.Node
		for _, out := range splitNode.OutputStreams().RawArray() {
			if out.To().Len() != 1 {
				return true
			}
			if dstNode == nil {
				dstNode = out.To().Get(0).Node
			} else if dstNode != out.To().Get(0).Node {
				return true
			}
		}
		// A Split with no consumed outputs is left for removeUnusedSplit.
		if dstNode == nil {
			return true
		}
		// That single destination must itself be a Join instruction.
		joinNode, ok := dstNode.(*ops2.ChunkedJoinNode)
		if !ok {
			return true
		}
		// The Join's inputs must all come from this Split. Since each Split
		// output was shown above to feed the same node, comparing the input
		// and output counts is sufficient.
		if joinNode.InputStreams().Len() != splitNode.OutputStreams().Len() {
			return true
		}
		// All conditions hold; perform the elision by replacing the input of
		// every consumer of the Join with the Split's own input stream:
		// F->Split->Join->T becomes F->T.
		splitInput := splitNode.InputStreams().Get(0)
		for _, to := range joinNode.Joined().To().RawArray() {
			splitInput.StreamTo(to.Node, to.SlotIndex)
		}
		splitInput.StreamNotTo(splitNode, 0)
		// Then delete both instructions.
		ctx.DAG.RemoveNode(joinNode)
		ctx.DAG.RemoveNode(splitNode)
		changed = true
		return true
	})
	return changed
}
// pin determines each instruction's execution location from the locations of
// the streams entering and leaving it. To-style instructions have fixed
// locations, which spread step by step through the whole DAG as pin is
// iterated, so in theory no instruction ends up with a location that can
// never be determined. Returns true when any node's environment changed, so
// the caller iterates to a fixed point.
func (p *DefaultParser) pin(ctx *ParseContext) bool {
	changed := false
	ctx.DAG.Walk(func(node dag.Node) bool {
		// Explicitly pinned nodes keep their environment.
		if node.Env().Pinned {
			return true
		}
		// First try to place the node where its consumers run — usable only
		// when every consumer with a known environment agrees.
		var toEnv *dag.NodeEnv
		for _, out := range node.OutputStreams().RawArray() {
			for _, to := range out.To().RawArray() {
				if to.Node.Env().Type == dag.EnvUnknown {
					continue
				}
				if toEnv == nil {
					toEnv = to.Node.Env()
				} else if !toEnv.Equals(to.Node.Env()) {
					// NOTE(review): this break leaves only the inner loop, so
					// a later output stream can re-assign toEnv after a
					// disagreement — confirm whether both loops should abort.
					toEnv = nil
					break
				}
			}
		}
		if toEnv != nil {
			if !node.Env().Equals(toEnv) {
				changed = true
			}
			*node.Env() = *toEnv
			return true
		}
		// Otherwise place the node where its input streams originate.
		var fromEnv *dag.NodeEnv
		for _, in := range node.InputStreams().RawArray() {
			if in.From().Node.Env().Type == dag.EnvUnknown {
				continue
			}
			if fromEnv == nil {
				fromEnv = in.From().Node.Env()
			} else if !fromEnv.Equals(in.From().Node.Env()) {
				fromEnv = nil
				break
			}
		}
		if fromEnv != nil {
			if !node.Env().Equals(fromEnv) {
				changed = true
			}
			*node.Env() = *fromEnv
		}
		return true
	})
	return changed
}
  409. // 对于所有未使用的流,增加Drop指令
  410. func (p *DefaultParser) dropUnused(ctx *ParseContext) {
  411. ctx.DAG.Walk(func(node dag.Node) bool {
  412. for _, out := range node.OutputStreams().RawArray() {
  413. if out.To().Len() == 0 {
  414. n := ctx.DAG.NewDropStream()
  415. *n.Env() = *node.Env()
  416. n.SetInput(out)
  417. }
  418. }
  419. return true
  420. })
  421. }
  422. // 为IPFS写入指令存储结果
  423. func (p *DefaultParser) storeIPFSWriteResult(ctx *ParseContext) {
  424. dag.WalkOnlyType[*ops2.ShardWriteNode](ctx.DAG.Graph, func(n *ops2.ShardWriteNode) bool {
  425. if n.FileHashStoreKey == "" {
  426. return true
  427. }
  428. storeNode := ctx.DAG.NewStore()
  429. storeNode.Env().ToEnvDriver()
  430. storeNode.Store(n.FileHashStoreKey, n.FileHashVar())
  431. return true
  432. })
  433. }
  434. // 生成Range指令。StreamRange可能超过文件总大小,但Range指令会在数据量不够时不报错而是正常返回
  435. func (p *DefaultParser) generateRange(ctx *ParseContext) {
  436. for i := 0; i < len(ctx.Ft.Toes); i++ {
  437. to := ctx.Ft.Toes[i]
  438. toNode := ctx.ToNodes[to]
  439. toDataIdx := to.GetDataIndex()
  440. toRng := to.GetRange()
  441. if toDataIdx == -1 {
  442. n := ctx.DAG.NewRange()
  443. toInput := toNode.Input()
  444. *n.Env() = *toInput.Var.From().Node.Env()
  445. rnged := n.RangeStream(toInput.Var, exec.Range{
  446. Offset: toRng.Offset - ctx.StreamRange.Offset,
  447. Length: toRng.Length,
  448. })
  449. toInput.Var.StreamNotTo(toNode, toInput.Index)
  450. toNode.SetInput(rnged)
  451. } else {
  452. stripSize := int64(p.EC.ChunkSize * p.EC.K)
  453. blkStartIdx := ctx.StreamRange.Offset / stripSize
  454. blkStart := blkStartIdx * int64(p.EC.ChunkSize)
  455. n := ctx.DAG.NewRange()
  456. toInput := toNode.Input()
  457. *n.Env() = *toInput.Var.From().Node.Env()
  458. rnged := n.RangeStream(toInput.Var, exec.Range{
  459. Offset: toRng.Offset - blkStart,
  460. Length: toRng.Length,
  461. })
  462. toInput.Var.StreamNotTo(toNode, toInput.Index)
  463. toNode.SetInput(rnged)
  464. }
  465. }
  466. }
// generateClone inserts Clone instructions wherever an output (stream or
// value) has more than one consumer, so each consumer receives its own copy.
// Afterwards every edge in the DAG has a single consumer.
func (p *DefaultParser) generateClone(ctx *ParseContext) {
	ctx.DAG.Walk(func(node dag.Node) bool {
		for _, out := range node.OutputStreams().RawArray() {
			if out.To().Len() <= 1 {
				continue
			}
			// Fan the stream out through a CloneStream node that runs in the
			// producer's environment, preserving each consumer's slot index.
			c := ctx.DAG.NewCloneStream()
			*c.Env() = *node.Env()
			for _, to := range out.To().RawArray() {
				c.NewOutput().StreamTo(to.Node, to.SlotIndex)
			}
			out.To().Resize(0)
			c.SetInput(out)
		}
		for _, out := range node.OutputValues().RawArray() {
			if out.To().Len() <= 1 {
				continue
			}
			// Same treatment for plain values, via CloneValue.
			t := ctx.DAG.NewCloneValue()
			*t.Env() = *node.Env()
			for _, to := range out.To().RawArray() {
				t.NewOutput().ValueTo(to.Node, to.SlotIndex)
			}
			out.To().Resize(0)
			t.SetInput(out)
		}
		return true
	})
}

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。