You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

parser.go 15 kB

1 year ago
1 year ago
1 year ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608
  1. package parser
  2. import (
  3. "fmt"
  4. "math"
  5. "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag"
  6. "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
  7. "gitlink.org.cn/cloudream/common/pkgs/ioswitch/plan"
  8. cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
  9. "gitlink.org.cn/cloudream/common/utils/lo2"
  10. "gitlink.org.cn/cloudream/common/utils/math2"
  11. "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2"
  12. "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2/ops2"
  13. "gitlink.org.cn/cloudream/storage/common/pkgs/storage/types"
  14. )
// IndexedStream pairs a stream variable with the data index it carries.
// In this file a DataIndex of -1 marks a stream holding the complete file,
// while values >= 0 identify an individual block stream.
type IndexedStream struct {
	Stream    *dag.Var
	DataIndex int
}
// ParseContext carries the state shared by all steps of Parse.
type ParseContext struct {
	// Ft is the From/To description that is being turned into a plan.
	Ft ioswitch2.FromTo
	// DAG is the graph builder that every generated node is added to.
	DAG *ops2.GraphNodeBuilder
	// ToNodes maps each To to the node that extend built for it.
	ToNodes map[ioswitch2.To]ops2.ToNode
	// IndexedStreams lists the streams produced so far together with their
	// data index. findOutputStream returns the first match, so order matters.
	IndexedStreams []IndexedStream
	// StreamRange is the range the Froms have to open in order to produce the
	// data ranges required by all Tos. It is expressed relative to the whole
	// file, and both bounds are rounded to a multiple of the stripe size, so
	// the upper bound may exceed the file size.
	StreamRange exec.Range
	// EC holds the erasure-coding parameters; this file reads ChunkSize, K and N.
	EC cdssdk.ECRedundancy
}
  29. func Parse(ft ioswitch2.FromTo, blder *exec.PlanBuilder, ec cdssdk.ECRedundancy) error {
  30. ctx := ParseContext{
  31. Ft: ft,
  32. DAG: ops2.NewGraphNodeBuilder(),
  33. ToNodes: make(map[ioswitch2.To]ops2.ToNode),
  34. EC: ec,
  35. }
  36. // 分成两个阶段:
  37. // 1. 基于From和To生成更多指令,初步匹配to的需求
  38. // 计算一下打开流的范围
  39. calcStreamRange(&ctx)
  40. err := extend(&ctx)
  41. if err != nil {
  42. return err
  43. }
  44. // 2. 优化上一步生成的指令
  45. // 对于删除指令的优化,需要反复进行,直到没有变化为止。
  46. // 从目前实现上来说不会死循环
  47. for {
  48. opted := false
  49. if removeUnusedJoin(&ctx) {
  50. opted = true
  51. }
  52. if removeUnusedMultiplyOutput(&ctx) {
  53. opted = true
  54. }
  55. if removeUnusedSplit(&ctx) {
  56. opted = true
  57. }
  58. if omitSplitJoin(&ctx) {
  59. opted = true
  60. }
  61. if !opted {
  62. break
  63. }
  64. }
  65. // 确定指令执行位置的过程,也需要反复进行,直到没有变化为止。
  66. for pin(&ctx) {
  67. }
  68. // 下面这些只需要执行一次,但需要按顺序
  69. dropUnused(&ctx)
  70. storeIPFSWriteResult(&ctx)
  71. generateClone(&ctx)
  72. generateRange(&ctx)
  73. return plan.Generate(ctx.DAG.Graph, blder)
  74. }
  75. func findOutputStream(ctx *ParseContext, streamIndex int) *dag.Var {
  76. var ret *dag.Var
  77. for _, s := range ctx.IndexedStreams {
  78. if s.DataIndex == streamIndex {
  79. ret = s.Stream
  80. break
  81. }
  82. }
  83. return ret
  84. }
  85. // 计算输入流的打开范围。会把流的范围按条带大小取整
  86. func calcStreamRange(ctx *ParseContext) {
  87. stripSize := int64(ctx.EC.ChunkSize * ctx.EC.K)
  88. rng := exec.Range{
  89. Offset: math.MaxInt64,
  90. }
  91. for _, to := range ctx.Ft.Toes {
  92. if to.GetDataIndex() == -1 {
  93. toRng := to.GetRange()
  94. rng.ExtendStart(math2.Floor(toRng.Offset, stripSize))
  95. if toRng.Length != nil {
  96. rng.ExtendEnd(math2.Ceil(toRng.Offset+*toRng.Length, stripSize))
  97. } else {
  98. rng.Length = nil
  99. }
  100. } else {
  101. toRng := to.GetRange()
  102. blkStartIndex := math2.FloorDiv(toRng.Offset, int64(ctx.EC.ChunkSize))
  103. rng.ExtendStart(blkStartIndex * stripSize)
  104. if toRng.Length != nil {
  105. blkEndIndex := math2.CeilDiv(toRng.Offset+*toRng.Length, int64(ctx.EC.ChunkSize))
  106. rng.ExtendEnd(blkEndIndex * stripSize)
  107. } else {
  108. rng.Length = nil
  109. }
  110. }
  111. }
  112. ctx.StreamRange = rng
  113. }
// extend builds the initial graph: it creates a node for every From,
// derives additional streams through Split / ECMultiply / Join nodes, and
// finally connects each To to a stream with the matching data index.
//
// It returns an error when a From or To node cannot be built, or when no
// stream with the data index requested by a To exists.
func extend(ctx *ParseContext) error {
	for _, fr := range ctx.Ft.Froms {
		frNode, err := buildFromNode(ctx, fr)
		if err != nil {
			return err
		}

		// Register the From's own output under its data index.
		ctx.IndexedStreams = append(ctx.IndexedStreams, IndexedStream{
			Stream:    frNode.Output().Var,
			DataIndex: fr.GetDataIndex(),
		})

		// A From that provides the complete file also gets a Split node,
		// whose K sub-streams are registered under the indexes 0..K-1.
		if fr.GetDataIndex() == -1 {
			splitNode := ctx.DAG.NewChunkedSplit(ctx.EC.ChunkSize)
			splitNode.Split(frNode.Output().Var, ctx.EC.K)
			for i := 0; i < ctx.EC.K; i++ {
				ctx.IndexedStreams = append(ctx.IndexedStreams, IndexedStream{
					Stream:    splitNode.SubStream(i),
					DataIndex: i,
				})
			}
		}
	}

	// If K block streams with distinct indexes are available, generate a
	// Multiply node, and a Join node for the streams it produces.
	// Only the first stream seen for each index is collected.
	ecInputStrs := make(map[int]*dag.Var)
	for _, s := range ctx.IndexedStreams {
		if s.DataIndex >= 0 && ecInputStrs[s.DataIndex] == nil {
			ecInputStrs[s.DataIndex] = s.Stream
			if len(ecInputStrs) == ctx.EC.K {
				break
			}
		}
	}

	if len(ecInputStrs) == ctx.EC.K {
		mulNode := ctx.DAG.NewECMultiply(ctx.EC)

		// Map iteration order is unspecified, so the inputs are added in
		// arbitrary order; each one carries its index explicitly.
		for i, s := range ecInputStrs {
			mulNode.AddInput(s, i)
		}
		// One output per block index 0..N-1.
		for i := 0; i < ctx.EC.N; i++ {
			ctx.IndexedStreams = append(ctx.IndexedStreams, IndexedStream{
				Stream:    mulNode.NewOutput(i),
				DataIndex: i,
			})
		}

		joinNode := ctx.DAG.NewChunkedJoin(ctx.EC.ChunkSize)
		for i := 0; i < ctx.EC.K; i++ {
			// A stream for every index below K must exist at this point,
			// because the Multiply outputs above cover them.
			joinNode.AddInput(findOutputStream(ctx, i))
		}
		// The joined stream is registered as a complete-file stream.
		ctx.IndexedStreams = append(ctx.IndexedStreams, IndexedStream{
			Stream:    joinNode.Joined(),
			DataIndex: -1,
		})
	}

	// Find an input stream for every To.
	for _, to := range ctx.Ft.Toes {
		toNode, err := buildToNode(ctx, to)
		if err != nil {
			return err
		}
		ctx.ToNodes[to] = toNode

		str := findOutputStream(ctx, to.GetDataIndex())
		if str == nil {
			return fmt.Errorf("no output stream found for data index %d", to.GetDataIndex())
		}

		toNode.SetInput(str)
	}

	return nil
}
  182. func buildFromNode(ctx *ParseContext, f ioswitch2.From) (ops2.FromNode, error) {
  183. var repRange exec.Range
  184. var blkRange exec.Range
  185. repRange.Offset = ctx.StreamRange.Offset
  186. blkRange.Offset = ctx.StreamRange.Offset / int64(ctx.EC.ChunkSize*ctx.EC.K) * int64(ctx.EC.ChunkSize)
  187. if ctx.StreamRange.Length != nil {
  188. repRngLen := *ctx.StreamRange.Length
  189. repRange.Length = &repRngLen
  190. blkRngLen := *ctx.StreamRange.Length / int64(ctx.EC.ChunkSize*ctx.EC.K) * int64(ctx.EC.ChunkSize)
  191. blkRange.Length = &blkRngLen
  192. }
  193. switch f := f.(type) {
  194. case *ioswitch2.FromShardstore:
  195. t := ctx.DAG.NewShardRead(f.Storage.StorageID, types.NewOpen(f.FileHash))
  196. if f.DataIndex == -1 {
  197. t.Open.WithNullableLength(repRange.Offset, repRange.Length)
  198. } else {
  199. t.Open.WithNullableLength(blkRange.Offset, blkRange.Length)
  200. }
  201. switch addr := f.Hub.Address.(type) {
  202. case *cdssdk.HttpAddressInfo:
  203. t.Env().ToEnvWorker(&ioswitch2.HttpHubWorker{Hub: f.Hub})
  204. t.Env().Pinned = true
  205. case *cdssdk.GRPCAddressInfo:
  206. t.Env().ToEnvWorker(&ioswitch2.AgentWorker{Hub: f.Hub, Address: *addr})
  207. t.Env().Pinned = true
  208. default:
  209. return nil, fmt.Errorf("unsupported node address type %T", addr)
  210. }
  211. return t, nil
  212. case *ioswitch2.FromDriver:
  213. n := ctx.DAG.NewFromDriver(f.Handle)
  214. n.Env().ToEnvDriver()
  215. n.Env().Pinned = true
  216. if f.DataIndex == -1 {
  217. f.Handle.RangeHint.Offset = repRange.Offset
  218. f.Handle.RangeHint.Length = repRange.Length
  219. } else {
  220. f.Handle.RangeHint.Offset = blkRange.Offset
  221. f.Handle.RangeHint.Length = blkRange.Length
  222. }
  223. return n, nil
  224. default:
  225. return nil, fmt.Errorf("unsupported from type %T", f)
  226. }
  227. }
  228. func buildToNode(ctx *ParseContext, t ioswitch2.To) (ops2.ToNode, error) {
  229. switch t := t.(type) {
  230. case *ioswitch2.ToShardStore:
  231. n := ctx.DAG.NewShardWrite(t.Storage.StorageID, t.FileHashStoreKey)
  232. if err := setEnvByAddress(n, t.Hub, t.Hub.Address); err != nil {
  233. return nil, err
  234. }
  235. n.Env().Pinned = true
  236. return n, nil
  237. case *ioswitch2.ToDriver:
  238. n := ctx.DAG.NewToDriver(t.Handle)
  239. n.Env().ToEnvDriver()
  240. n.Env().Pinned = true
  241. return n, nil
  242. case *ioswitch2.LoadToShared:
  243. n := ctx.DAG.NewSharedLoad(t.Storage.StorageID, t.UserID, t.PackageID, t.Path)
  244. if err := setEnvByAddress(n, t.Hub, t.Hub.Address); err != nil {
  245. return nil, err
  246. }
  247. n.Env().Pinned = true
  248. return n, nil
  249. default:
  250. return nil, fmt.Errorf("unsupported to type %T", t)
  251. }
  252. }
  253. func setEnvByAddress(n dag.Node, hub cdssdk.Hub, addr cdssdk.HubAddressInfo) error {
  254. switch addr := addr.(type) {
  255. case *cdssdk.HttpAddressInfo:
  256. n.Env().ToEnvWorker(&ioswitch2.HttpHubWorker{Hub: hub})
  257. case *cdssdk.GRPCAddressInfo:
  258. n.Env().ToEnvWorker(&ioswitch2.AgentWorker{Hub: hub, Address: *addr})
  259. default:
  260. return fmt.Errorf("unsupported node address type %T", addr)
  261. }
  262. return nil
  263. }
  264. // 删除输出流未被使用的Join指令
  265. func removeUnusedJoin(ctx *ParseContext) bool {
  266. changed := false
  267. dag.WalkOnlyType[*ops2.ChunkedJoinNode](ctx.DAG.Graph, func(node *ops2.ChunkedJoinNode) bool {
  268. if node.InputStreams().Len() > 0 {
  269. return true
  270. }
  271. node.RemoveAllInputs()
  272. ctx.DAG.RemoveNode(node)
  273. return true
  274. })
  275. return changed
  276. }
  277. // 减少未使用的Multiply指令的输出流。如果减少到0,则删除该指令
  278. func removeUnusedMultiplyOutput(ctx *ParseContext) bool {
  279. changed := false
  280. dag.WalkOnlyType[*ops2.ECMultiplyNode](ctx.DAG.Graph, func(node *ops2.ECMultiplyNode) bool {
  281. outArr := node.OutputStreams().RawArray()
  282. for i2, out := range outArr {
  283. if out.To().Len() > 0 {
  284. continue
  285. }
  286. outArr[i2] = nil
  287. node.OutputIndexes[i2] = -2
  288. changed = true
  289. }
  290. node.OutputStreams().SetRawArray(lo2.RemoveAllDefault(outArr))
  291. node.OutputIndexes = lo2.RemoveAll(node.OutputIndexes, -2)
  292. // 如果所有输出流都被删除,则删除该指令
  293. if node.OutputStreams().Len() == 0 {
  294. node.RemoveAllInputs()
  295. ctx.DAG.RemoveNode(node)
  296. changed = true
  297. }
  298. return true
  299. })
  300. return changed
  301. }
  302. // 删除未使用的Split指令
  303. func removeUnusedSplit(ctx *ParseContext) bool {
  304. changed := false
  305. dag.WalkOnlyType[*ops2.ChunkedSplitNode](ctx.DAG.Graph, func(typ *ops2.ChunkedSplitNode) bool {
  306. // Split出来的每一个流都没有被使用,才能删除这个指令
  307. for _, out := range typ.OutputStreams().RawArray() {
  308. if out.To().Len() > 0 {
  309. return true
  310. }
  311. }
  312. typ.Clear()
  313. ctx.DAG.RemoveNode(typ)
  314. changed = true
  315. return true
  316. })
  317. return changed
  318. }
// omitSplitJoin removes a Split node together with the Join node that
// consumes it when the Split's result is used for nothing but that Join.
// The consumers of the Join are connected directly to the Split's input.
// It returns true when at least one such pair was removed.
func omitSplitJoin(ctx *ParseContext) bool {
	changed := false

	dag.WalkOnlyType[*ops2.ChunkedSplitNode](ctx.DAG.Graph, func(splitNode *ops2.ChunkedSplitNode) bool {
		// Every output of the Split must have exactly one destination, and
		// that destination must be the same node for all outputs.
		var dstNode dag.Node
		for _, out := range splitNode.OutputStreams().RawArray() {
			if out.To().Len() != 1 {
				return true
			}

			if dstNode == nil {
				dstNode = out.To().Get(0).Node
			} else if dstNode != out.To().Get(0).Node {
				return true
			}
		}

		// No destination was found at all (the Split has no outputs).
		if dstNode == nil {
			return true
		}

		// The common destination has to be a Join node.
		joinNode, ok := dstNode.(*ops2.ChunkedJoinNode)
		if !ok {
			return true
		}

		// The Join's inputs must also all come from the Split's outputs.
		// Since all Split outputs were shown to go to the same destination,
		// comparing the number of Join inputs with the number of Split
		// outputs is sufficient.
		if joinNode.InputStreams().Len() != splitNode.OutputStreams().Len() {
			return true
		}

		// All conditions hold. Replace the input of each Join consumer with
		// the Split's input stream:
		// F->Split->Join->T becomes F->T
		splitInput := splitNode.InputStreams().Get(0)
		for _, to := range joinNode.Joined().To().RawArray() {
			splitInput.StreamTo(to.Node, to.SlotIndex)
		}
		splitInput.StreamNotTo(splitNode, 0)

		// Then delete both nodes.
		ctx.DAG.RemoveNode(joinNode)
		ctx.DAG.RemoveNode(splitNode)

		changed = true
		return true
	})

	return changed
}
  363. // 通过流的输入输出位置来确定指令的执行位置。
  364. // To系列的指令都会有固定的执行位置,这些位置会随着pin操作逐步扩散到整个DAG,
  365. // 所以理论上不会出现有指令的位置始终无法确定的情况。
  366. func pin(ctx *ParseContext) bool {
  367. changed := false
  368. ctx.DAG.Walk(func(node dag.Node) bool {
  369. if node.Env().Pinned {
  370. return true
  371. }
  372. var toEnv *dag.NodeEnv
  373. for _, out := range node.OutputStreams().RawArray() {
  374. for _, to := range out.To().RawArray() {
  375. if to.Node.Env().Type == dag.EnvUnknown {
  376. continue
  377. }
  378. if toEnv == nil {
  379. toEnv = to.Node.Env()
  380. } else if !toEnv.Equals(to.Node.Env()) {
  381. toEnv = nil
  382. break
  383. }
  384. }
  385. }
  386. if toEnv != nil {
  387. if !node.Env().Equals(toEnv) {
  388. changed = true
  389. }
  390. *node.Env() = *toEnv
  391. return true
  392. }
  393. // 否则根据输入流的始发地来固定
  394. var fromEnv *dag.NodeEnv
  395. for _, in := range node.InputStreams().RawArray() {
  396. if in.From().Node.Env().Type == dag.EnvUnknown {
  397. continue
  398. }
  399. if fromEnv == nil {
  400. fromEnv = in.From().Node.Env()
  401. } else if !fromEnv.Equals(in.From().Node.Env()) {
  402. fromEnv = nil
  403. break
  404. }
  405. }
  406. if fromEnv != nil {
  407. if !node.Env().Equals(fromEnv) {
  408. changed = true
  409. }
  410. *node.Env() = *fromEnv
  411. }
  412. return true
  413. })
  414. return changed
  415. }
  416. // 对于所有未使用的流,增加Drop指令
  417. func dropUnused(ctx *ParseContext) {
  418. ctx.DAG.Walk(func(node dag.Node) bool {
  419. for _, out := range node.OutputStreams().RawArray() {
  420. if out.To().Len() == 0 {
  421. n := ctx.DAG.NewDropStream()
  422. *n.Env() = *node.Env()
  423. n.SetInput(out)
  424. }
  425. }
  426. return true
  427. })
  428. }
  429. // 为IPFS写入指令存储结果
  430. func storeIPFSWriteResult(ctx *ParseContext) {
  431. dag.WalkOnlyType[*ops2.ShardWriteNode](ctx.DAG.Graph, func(n *ops2.ShardWriteNode) bool {
  432. if n.FileHashStoreKey == "" {
  433. return true
  434. }
  435. storeNode := ctx.DAG.NewStore()
  436. storeNode.Env().ToEnvDriver()
  437. storeNode.Store(n.FileHashStoreKey, n.FileHashVar())
  438. return true
  439. })
  440. }
  441. // 生成Range指令。StreamRange可能超过文件总大小,但Range指令会在数据量不够时不报错而是正常返回
  442. func generateRange(ctx *ParseContext) {
  443. for i := 0; i < len(ctx.Ft.Toes); i++ {
  444. to := ctx.Ft.Toes[i]
  445. toNode := ctx.ToNodes[to]
  446. toDataIdx := to.GetDataIndex()
  447. toRng := to.GetRange()
  448. if toDataIdx == -1 {
  449. n := ctx.DAG.NewRange()
  450. toInput := toNode.Input()
  451. *n.Env() = *toInput.Var.From().Node.Env()
  452. rnged := n.RangeStream(toInput.Var, exec.Range{
  453. Offset: toRng.Offset - ctx.StreamRange.Offset,
  454. Length: toRng.Length,
  455. })
  456. toInput.Var.StreamNotTo(toNode, toInput.Index)
  457. toNode.SetInput(rnged)
  458. } else {
  459. stripSize := int64(ctx.EC.ChunkSize * ctx.EC.K)
  460. blkStartIdx := ctx.StreamRange.Offset / stripSize
  461. blkStart := blkStartIdx * int64(ctx.EC.ChunkSize)
  462. n := ctx.DAG.NewRange()
  463. toInput := toNode.Input()
  464. *n.Env() = *toInput.Var.From().Node.Env()
  465. rnged := n.RangeStream(toInput.Var, exec.Range{
  466. Offset: toRng.Offset - blkStart,
  467. Length: toRng.Length,
  468. })
  469. toInput.Var.StreamNotTo(toNode, toInput.Index)
  470. toNode.SetInput(rnged)
  471. }
  472. }
  473. }
  474. // 生成Clone指令
  475. func generateClone(ctx *ParseContext) {
  476. ctx.DAG.Walk(func(node dag.Node) bool {
  477. for _, out := range node.OutputStreams().RawArray() {
  478. if out.To().Len() <= 1 {
  479. continue
  480. }
  481. c := ctx.DAG.NewCloneStream()
  482. *c.Env() = *node.Env()
  483. for _, to := range out.To().RawArray() {
  484. c.NewOutput().StreamTo(to.Node, to.SlotIndex)
  485. }
  486. out.To().Resize(0)
  487. c.SetInput(out)
  488. }
  489. for _, out := range node.OutputValues().RawArray() {
  490. if out.To().Len() <= 1 {
  491. continue
  492. }
  493. t := ctx.DAG.NewCloneValue()
  494. *t.Env() = *node.Env()
  495. for _, to := range out.To().RawArray() {
  496. t.NewOutput().ValueTo(to.Node, to.SlotIndex)
  497. }
  498. out.To().Resize(0)
  499. t.SetInput(out)
  500. }
  501. return true
  502. })
  503. }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。