diff --git a/common/pkgs/ioswitch/dag/graph.go b/common/pkgs/ioswitch/dag/graph.go new file mode 100644 index 0000000..811a6fe --- /dev/null +++ b/common/pkgs/ioswitch/dag/graph.go @@ -0,0 +1,74 @@ +package dag + +import ( + "gitlink.org.cn/cloudream/common/utils/lo2" +) + +type Graph[NP any, VP any] struct { + Nodes []*Node[NP, VP] + isWalking bool + nextVarID int +} + +func NewGraph[NP any, VP any]() *Graph[NP, VP] { + return &Graph[NP, VP]{} +} + +func (g *Graph[NP, VP]) NewNode(typ NodeType[NP, VP], props NP) *Node[NP, VP] { + n := &Node[NP, VP]{ + Type: typ, + Props: props, + Graph: g, + } + typ.InitNode(n) + g.Nodes = append(g.Nodes, n) + return n +} + +func (g *Graph[NP, VP]) RemoveNode(node *Node[NP, VP]) { + for i, n := range g.Nodes { + if n == node { + if g.isWalking { + g.Nodes[i] = nil + } else { + g.Nodes = lo2.RemoveAt(g.Nodes, i) + } + break + } + } +} + +func (g *Graph[NP, VP]) Walk(cb func(node *Node[NP, VP]) bool) { + g.isWalking = true + for i := 0; i < len(g.Nodes); i++ { + if g.Nodes[i] == nil { + continue + } + + if !cb(g.Nodes[i]) { + break + } + } + g.isWalking = false + + g.Nodes = lo2.RemoveAllDefault(g.Nodes) +} + +func (g *Graph[NP, VP]) genVarID() int { + g.nextVarID++ + return g.nextVarID +} + +func NewNode[NP any, VP any, NT NodeType[NP, VP]](graph *Graph[NP, VP], typ NT, props NP) (*Node[NP, VP], NT) { + return graph.NewNode(typ, props), typ +} + +func WalkOnlyType[N NodeType[NP, VP], NP any, VP any](g *Graph[NP, VP], cb func(node *Node[NP, VP], typ N) bool) { + g.Walk(func(node *Node[NP, VP]) bool { + typ, ok := node.Type.(N) + if ok { + return cb(node, typ) + } + return true + }) +} diff --git a/common/pkgs/ioswitch/dag/node.go b/common/pkgs/ioswitch/dag/node.go new file mode 100644 index 0000000..5e5dd1f --- /dev/null +++ b/common/pkgs/ioswitch/dag/node.go @@ -0,0 +1,68 @@ +package dag + +import ( + "fmt" + + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch/exec" +) + +type NodeType[NP any, VP any] interface { + 
InitNode(node *Node[NP, VP]) + String(node *Node[NP, VP]) string + GenerateOp(node *Node[NP, VP], blder *exec.PlanBuilder) error +} + +type NodeEnvType string + +const ( + EnvUnknown NodeEnvType = "" + EnvExecutor NodeEnvType = "Executor" + EnvWorker NodeEnvType = "Worker" +) + +type NodeEnv struct { + Type NodeEnvType + Worker exec.Worker +} + +func (e *NodeEnv) ToEnvUnknown() { + e.Type = EnvUnknown + e.Worker = nil +} + +func (e *NodeEnv) ToEnvExecutor() { + e.Type = EnvExecutor + e.Worker = nil +} + +func (e *NodeEnv) ToEnvWorker(worker exec.Worker) { + e.Type = EnvWorker + e.Worker = worker +} + +func (e *NodeEnv) Equals(other NodeEnv) bool { + if e.Type != other.Type { + return false + } + + if e.Type != EnvWorker { + return true + } + + return e.Worker.Equals(other.Worker) +} + +type Node[NP any, VP any] struct { + Type NodeType[NP, VP] + Env NodeEnv + Props NP + InputStreams []*StreamVar[NP, VP] + OutputStreams []*StreamVar[NP, VP] + InputValues []*ValueVar[NP, VP] + OutputValues []*ValueVar[NP, VP] + Graph *Graph[NP, VP] +} + +func (n *Node[NP, VP]) String() string { + return fmt.Sprintf("%v", n.Type.String(n)) +} diff --git a/common/pkgs/ioswitch/dag/var.go b/common/pkgs/ioswitch/dag/var.go new file mode 100644 index 0000000..e1caa7b --- /dev/null +++ b/common/pkgs/ioswitch/dag/var.go @@ -0,0 +1,112 @@ +package dag + +import "gitlink.org.cn/cloudream/common/utils/lo2" + +type EndPoint[NP any, VP any] struct { + Node *Node[NP, VP] + SlotIndex int // 所连接的Node的Output或Input数组的索引 +} + +type StreamVar[NP any, VP any] struct { + ID int + From EndPoint[NP, VP] + Toes []EndPoint[NP, VP] + Props VP +} + +func (v *StreamVar[NP, VP]) To(to *Node[NP, VP], slotIdx int) int { + v.Toes = append(v.Toes, EndPoint[NP, VP]{Node: to, SlotIndex: slotIdx}) + to.InputStreams[slotIdx] = v + return len(v.Toes) - 1 +} + +// func (v *StreamVar[NP, VP]) NotTo(toIdx int) EndPoint[NP, VP] { +// ed := v.Toes[toIdx] +// lo2.RemoveAt(v.Toes, toIdx) +// ed.Node.InputStreams[ed.SlotIndex] = 
nil +// return ed +// } + +func (v *StreamVar[NP, VP]) NotTo(node *Node[NP, VP]) (EndPoint[NP, VP], bool) { + for i, ed := range v.Toes { + if ed.Node == node { + v.Toes = lo2.RemoveAt(v.Toes, i) + ed.Node.InputStreams[ed.SlotIndex] = nil + return ed, true + } + } + + return EndPoint[NP, VP]{}, false +} + +func (v *StreamVar[NP, VP]) NotToWhere(pred func(to EndPoint[NP, VP]) bool) []EndPoint[NP, VP] { + var newToes []EndPoint[NP, VP] + var rmed []EndPoint[NP, VP] + for _, ed := range v.Toes { + if pred(ed) { + ed.Node.InputStreams[ed.SlotIndex] = nil + rmed = append(rmed, ed) + } else { + newToes = append(newToes, ed) + } + } + v.Toes = newToes + return rmed +} + +func (v *StreamVar[NP, VP]) NotToAll() []EndPoint[NP, VP] { + for _, ed := range v.Toes { + ed.Node.InputStreams[ed.SlotIndex] = nil + } + toes := v.Toes + v.Toes = nil + return toes +} + +func NodeNewOutputStream[NP any, VP any](node *Node[NP, VP], props VP) *StreamVar[NP, VP] { + str := &StreamVar[NP, VP]{ + ID: node.Graph.genVarID(), + From: EndPoint[NP, VP]{Node: node, SlotIndex: len(node.OutputStreams)}, + Props: props, + } + node.OutputStreams = append(node.OutputStreams, str) + return str +} + +func NodeDeclareInputStream[NP any, VP any](node *Node[NP, VP], cnt int) { + node.InputStreams = make([]*StreamVar[NP, VP], cnt) +} + +type ValueVarType int + +const ( + StringValueVar ValueVarType = iota + SignalValueVar +) + +type ValueVar[NP any, VP any] struct { + ID int + From EndPoint[NP, VP] + Toes []EndPoint[NP, VP] + Props VP +} + +func (v *ValueVar[NP, VP]) To(to *Node[NP, VP], slotIdx int) int { + v.Toes = append(v.Toes, EndPoint[NP, VP]{Node: to, SlotIndex: slotIdx}) + to.InputValues[slotIdx] = v + return len(v.Toes) - 1 +} + +func NodeNewOutputValue[NP any, VP any](node *Node[NP, VP], props VP) *ValueVar[NP, VP] { + val := &ValueVar[NP, VP]{ + ID: node.Graph.genVarID(), + From: EndPoint[NP, VP]{Node: node, SlotIndex: len(node.OutputValues)}, + Props: props, + } + node.OutputValues = 
append(node.OutputValues, val) + return val +} + +func NodeDeclareInputValue[NP any, VP any](node *Node[NP, VP], cnt int) { + node.InputValues = make([]*ValueVar[NP, VP], cnt) +} diff --git a/common/pkgs/ioswitch/exec/exec.go b/common/pkgs/ioswitch/exec/exec.go new file mode 100644 index 0000000..9c4000f --- /dev/null +++ b/common/pkgs/ioswitch/exec/exec.go @@ -0,0 +1,8 @@ +package exec + +type Worker interface { + // 获取连接到这个worker的GRPC服务的地址 + GetAddress() string + // 判断两个worker是否相同 + Equals(worker Worker) bool +} diff --git a/common/pkgs/ioswitch/plans/executor.go b/common/pkgs/ioswitch/exec/executor.go similarity index 99% rename from common/pkgs/ioswitch/plans/executor.go rename to common/pkgs/ioswitch/exec/executor.go index 7321a16..4a11160 100644 --- a/common/pkgs/ioswitch/plans/executor.go +++ b/common/pkgs/ioswitch/exec/executor.go @@ -1,4 +1,4 @@ -package plans +package exec import ( "context" diff --git a/common/pkgs/ioswitch/plans/plan_builder.go b/common/pkgs/ioswitch/exec/plan_builder.go similarity index 99% rename from common/pkgs/ioswitch/plans/plan_builder.go rename to common/pkgs/ioswitch/exec/plan_builder.go index c3656ac..ad2ae81 100644 --- a/common/pkgs/ioswitch/plans/plan_builder.go +++ b/common/pkgs/ioswitch/exec/plan_builder.go @@ -1,4 +1,4 @@ -package plans +package exec import ( "context" diff --git a/common/pkgs/ioswitch/plans/utils.go b/common/pkgs/ioswitch/exec/utils.go similarity index 92% rename from common/pkgs/ioswitch/plans/utils.go rename to common/pkgs/ioswitch/exec/utils.go index 8cf7ad7..3124af6 100644 --- a/common/pkgs/ioswitch/plans/utils.go +++ b/common/pkgs/ioswitch/exec/utils.go @@ -1,4 +1,4 @@ -package plans +package exec import ( "github.com/google/uuid" diff --git a/common/pkgs/ioswitch/ops/grpc.go b/common/pkgs/ioswitch/ops/grpc.go index 28d688a..4455486 100644 --- a/common/pkgs/ioswitch/ops/grpc.go +++ b/common/pkgs/ioswitch/ops/grpc.go @@ -45,7 +45,7 @@ func (o *SendStream) Execute(ctx context.Context, sw 
*ioswitch.Switch) error { type GetStream struct { Signal *ioswitch.SignalVar `json:"signal"` - Get *ioswitch.StreamVar `json:"get"` + Target *ioswitch.StreamVar `json:"target"` Output *ioswitch.StreamVar `json:"output"` Node cdssdk.Node `json:"node"` } @@ -57,9 +57,9 @@ func (o *GetStream) Execute(ctx context.Context, sw *ioswitch.Switch) error { } defer stgglb.AgentRPCPool.Release(agtCli) - logger.Debugf("getting stream %v as %v from node %v", o.Get.ID, o.Output.ID, o.Node) + logger.Debugf("getting stream %v as %v from node %v", o.Target.ID, o.Output.ID, o.Node) - str, err := agtCli.GetStream(sw.Plan().ID, o.Get.ID, o.Signal) + str, err := agtCli.GetStream(sw.Plan().ID, o.Target.ID, o.Signal) if err != nil { return fmt.Errorf("getting stream: %w", err) } @@ -105,7 +105,7 @@ func (o *SendVar) Execute(ctx context.Context, sw *ioswitch.Switch) error { type GetVar struct { Signal *ioswitch.SignalVar `json:"signal"` - Get ioswitch.Var `json:"get"` + Target ioswitch.Var `json:"target"` Output ioswitch.Var `json:"output"` Node cdssdk.Node `json:"node"` } @@ -117,9 +117,9 @@ func (o *GetVar) Execute(ctx context.Context, sw *ioswitch.Switch) error { } defer stgglb.AgentRPCPool.Release(agtCli) - logger.Debugf("getting var %v as %v from node %v", o.Get.GetID(), o.Output.GetID(), o.Node) + logger.Debugf("getting var %v as %v from node %v", o.Target.GetID(), o.Output.GetID(), o.Node) - v2, err := agtCli.GetVar(ctx, sw.Plan().ID, o.Get, o.Signal) + v2, err := agtCli.GetVar(ctx, sw.Plan().ID, o.Target, o.Signal) if err != nil { return fmt.Errorf("getting var: %w", err) } diff --git a/common/pkgs/ioswitch/plans/agent.go b/common/pkgs/ioswitch/plans/agent.go deleted file mode 100644 index 3eec7d5..0000000 --- a/common/pkgs/ioswitch/plans/agent.go +++ /dev/null @@ -1,446 +0,0 @@ -package plans - -/* -func (b *AgentPlanBuilder) IPFSRead(fileHash string, opts ...ipfs.ReadOption) *AgentStreamVar { - opt := ipfs.ReadOption{ - Offset: 0, - Length: -1, - } - if len(opts) > 0 { - opt = 
opts[0] - } - - str := &AgentStreamVar{ - owner: b, - v: b.blder.NewStreamVar(), - } - - b.Ops = append(b.Ops, &ops.IPFSRead{ - Output: str.v, - FileHash: fileHash, - Option: opt, - }) - - return str -} -func (b *AgentPlanBuilder) FileRead(filePath string) *AgentStreamVar { - agtStr := &AgentStreamVar{ - owner: b, - v: b.blder.NewStreamVar(), - } - - b.Ops = append(b.Ops, &ops.FileRead{ - Output: agtStr.v, - FilePath: filePath, - }) - - return agtStr -} - -func (b *AgentPlanBuilder) ECReconstructAny(ec cdssdk.ECRedundancy, inBlockIndexes []int, outBlockIndexes []int, streams []*AgentStreamVar) []*AgentStreamVar { - var strs []*AgentStreamVar - - var inputStrVars []*ioswitch.StreamVar - for _, str := range streams { - inputStrVars = append(inputStrVars, str.v) - } - - var outputStrVars []*ioswitch.StreamVar - for i := 0; i < len(outBlockIndexes); i++ { - v := b.blder.NewStreamVar() - strs = append(strs, &AgentStreamVar{ - owner: b, - v: v, - }) - outputStrVars = append(outputStrVars, v) - } - - b.Ops = append(b.Ops, &ops.ECReconstructAny{ - EC: ec, - Inputs: inputStrVars, - Outputs: outputStrVars, - InputBlockIndexes: inBlockIndexes, - OutputBlockIndexes: outBlockIndexes, - }) - - return strs -} - -func (b *AgentPlanBuilder) ECReconstruct(ec cdssdk.ECRedundancy, inBlockIndexes []int, streams []*AgentStreamVar) []*AgentStreamVar { - var strs []*AgentStreamVar - - var inputStrVars []*ioswitch.StreamVar - for _, str := range streams { - inputStrVars = append(inputStrVars, str.v) - } - - var outputStrVars []*ioswitch.StreamVar - for i := 0; i < ec.K; i++ { - v := b.blder.NewStreamVar() - strs = append(strs, &AgentStreamVar{ - owner: b, - v: v, - }) - outputStrVars = append(outputStrVars, v) - } - - b.Ops = append(b.Ops, &ops.ECReconstruct{ - EC: ec, - Inputs: inputStrVars, - Outputs: outputStrVars, - InputBlockIndexes: inBlockIndexes, - }) - - return strs -} - -// 进行galois矩阵乘法运算,ecof * inputs -func (b *AgentPlanBuilder) ECMultiply(coef [][]byte, inputs 
[]*AgentStreamVar, chunkSize int64) []*AgentStreamVar { - outs := make([]*AgentStreamVar, len(coef)) - outVars := make([]*ioswitch.StreamVar, len(coef)) - for i := 0; i < len(outs); i++ { - sv := b.blder.NewStreamVar() - outs[i] = &AgentStreamVar{ - owner: b, - v: sv, - } - outVars[i] = sv - } - - ins := make([]*ioswitch.StreamVar, len(inputs)) - for i := 0; i < len(inputs); i++ { - ins[i] = inputs[i].v - } - - b.Ops = append(b.Ops, &ops.ECMultiply{ - Inputs: ins, - Outputs: outVars, - Coef: coef, - ChunkSize: chunkSize, - }) - - return outs -} - -func (b *AgentPlanBuilder) Join(length int64, streams []*AgentStreamVar) *AgentStreamVar { - agtStr := &AgentStreamVar{ - owner: b, - v: b.blder.NewStreamVar(), - } - - var inputStrVars []*ioswitch.StreamVar - for _, str := range streams { - inputStrVars = append(inputStrVars, str.v) - } - - b.Ops = append(b.Ops, &ops.Join{ - Inputs: inputStrVars, - Output: agtStr.v, - Length: length, - }) - - return agtStr -} - -func (b *AgentPlanBuilder) ChunkedJoin(chunkSize int, streams []*AgentStreamVar) *AgentStreamVar { - agtStr := &AgentStreamVar{ - owner: b, - v: b.blder.NewStreamVar(), - } - - var inputStrVars []*ioswitch.StreamVar - for _, str := range streams { - inputStrVars = append(inputStrVars, str.v) - } - - b.Ops = append(b.Ops, &ops.ChunkedJoin{ - Inputs: inputStrVars, - Output: agtStr.v, - ChunkSize: chunkSize, - }) - - return agtStr -} - -func (b *AgentPlanBuilder) NewString(str string) *AgentStringVar { - v := b.blder.NewStringVar() - v.Value = str - - return &AgentStringVar{ - owner: b, - v: v, - } -} - -func (b *AgentPlanBuilder) NewSignal() *AgentSignalVar { - v := b.blder.NewSignalVar() - - return &AgentSignalVar{ - owner: b, - v: v, - } -} - -// 字节流变量 -type AgentStreamVar struct { - owner *AgentPlanBuilder - v *ioswitch.StreamVar -} - -func (s *AgentStreamVar) IPFSWrite() *AgentStringVar { - v := s.owner.blder.NewStringVar() - - s.owner.Ops = append(s.owner.Ops, &ops.IPFSWrite{ - Input: s.v, - FileHash: v, - }) 
- - return &AgentStringVar{ - owner: s.owner, - v: v, - } -} - -func (b *AgentStreamVar) FileWrite(filePath string) { - b.owner.Ops = append(b.owner.Ops, &ops.FileWrite{ - Input: b.v, - FilePath: filePath, - }) -} - -func (b *AgentStreamVar) ChunkedSplit(chunkSize int, streamCount int, paddingZeros bool) []*AgentStreamVar { - var strs []*AgentStreamVar - - var outputStrVars []*ioswitch.StreamVar - for i := 0; i < streamCount; i++ { - v := b.owner.blder.NewStreamVar() - strs = append(strs, &AgentStreamVar{ - owner: b.owner, - v: v, - }) - outputStrVars = append(outputStrVars, v) - } - - b.owner.Ops = append(b.owner.Ops, &ops.ChunkedSplit{ - Input: b.v, - Outputs: outputStrVars, - ChunkSize: chunkSize, - PaddingZeros: paddingZeros, - }) - - return strs -} - -func (s *AgentStreamVar) Length(length int64) *AgentStreamVar { - agtStr := &AgentStreamVar{ - owner: s.owner, - v: s.owner.blder.NewStreamVar(), - } - - s.owner.Ops = append(s.owner.Ops, &ops.Length{ - Input: s.v, - Output: agtStr.v, - Length: length, - }) - - return agtStr -} - -func (s *AgentStreamVar) To(node cdssdk.Node) *AgentStreamVar { - s.owner.Ops = append(s.owner.Ops, &ops.SendStream{Stream: s.v, Node: node}) - s.owner = s.owner.blder.AtAgent(node) - - return s -} - -func (s *AgentStreamVar) ToExecutor() *ExecutorStreamVar { - s.owner.blder.executorPlan.ops = append(s.owner.blder.executorPlan.ops, &ops.GetStream{ - Stream: s.v, - Node: s.owner.Node, - }) - - return &ExecutorStreamVar{ - blder: s.owner.blder, - v: s.v, - } -} - -func (s *AgentStreamVar) Clone(cnt int) []*AgentStreamVar { - var strs []*AgentStreamVar - - var outputStrVars []*ioswitch.StreamVar - for i := 0; i < cnt; i++ { - v := s.owner.blder.NewStreamVar() - strs = append(strs, &AgentStreamVar{ - owner: s.owner, - v: v, - }) - outputStrVars = append(outputStrVars, v) - } - - s.owner.Ops = append(s.owner.Ops, &ops.CloneStream{ - Input: s.v, - Outputs: outputStrVars, - }) - - return strs -} - -// 当流产生时发送一个信号 -func (v *AgentStreamVar) 
OnBegin() (*AgentStreamVar, *AgentSignalVar) { - ns := v.owner.blder.NewStreamVar() - s := v.owner.blder.NewSignalVar() - - v.owner.Ops = append(v.owner.Ops, &ops.OnStreamBegin{ - Raw: v.v, - New: ns, - Signal: s, - }) - return &AgentStreamVar{owner: v.owner, v: ns}, &AgentSignalVar{owner: v.owner, v: s} -} - -// 当流结束时发送一个信号 -func (v *AgentStreamVar) OnEnd() (*AgentStreamVar, *AgentSignalVar) { - ns := v.owner.blder.NewStreamVar() - s := v.owner.blder.NewSignalVar() - - v.owner.Ops = append(v.owner.Ops, &ops.OnStreamEnd{ - Raw: v.v, - New: ns, - Signal: s, - }) - return &AgentStreamVar{owner: v.owner, v: ns}, &AgentSignalVar{owner: v.owner, v: s} -} - -// 将此流暂存,直到一个信号产生后才释放(一个新流) -func (v *AgentStreamVar) HoldUntil(wait *AgentSignalVar) *AgentStreamVar { - nv := v.owner.blder.NewStreamVar() - v.owner.Ops = append(v.owner.Ops, &ops.HoldUntil{ - Waits: []*ioswitch.SignalVar{wait.v}, - Holds: []ioswitch.Var{v.v}, - Emits: []ioswitch.Var{nv}, - }) - return &AgentStreamVar{owner: v.owner, v: nv} -} - -// 字符串变量 -type AgentStringVar struct { - owner *AgentPlanBuilder - v *ioswitch.StringVar -} - -func (v *AgentStringVar) To(node cdssdk.Node) *AgentStringVar { - v.owner.Ops = append(v.owner.Ops, &ops.SendVar{Var: v.v, Node: node}) - v.owner = v.owner.blder.AtAgent(node) - - return v -} - -func (v *AgentStringVar) ToExecutor() *ExecutorStringVar { - v.owner.blder.executorPlan.ops = append(v.owner.blder.executorPlan.ops, &ops.GetVar{ - Var: v.v, - Node: v.owner.Node, - }) - - return &ExecutorStringVar{ - blder: v.owner.blder, - v: v.v, - } -} - -func (v *AgentStringVar) Clone() (*AgentStringVar, *AgentStringVar) { - c1 := v.owner.blder.NewStringVar() - c2 := v.owner.blder.NewStringVar() - - v.owner.Ops = append(v.owner.Ops, &ops.CloneVar{ - Raw: v.v, - Cloneds: []ioswitch.Var{c1, c2}, - }) - - return &AgentStringVar{owner: v.owner, v: c1}, &AgentStringVar{owner: v.owner, v: c2} -} - -// 返回cnt+1个复制后的变量 -func (v *AgentStringVar) CloneN(cnt int) []*AgentStringVar { - var strs 
[]*AgentStringVar - var cloned []ioswitch.Var - for i := 0; i < cnt+1; i++ { - c := v.owner.blder.NewStringVar() - strs = append(strs, &AgentStringVar{ - owner: v.owner, - v: c, - }) - cloned = append(cloned, c) - } - - v.owner.Ops = append(v.owner.Ops, &ops.CloneVar{ - Raw: v.v, - Cloneds: cloned, - }) - - return strs -} - -// 将此变量暂存,直到一个信号产生后才释放(一个新变量) -func (v *AgentStringVar) HoldUntil(wait *AgentSignalVar) *AgentStringVar { - nv := v.owner.blder.NewStringVar() - v.owner.Ops = append(v.owner.Ops, &ops.HoldUntil{ - Waits: []*ioswitch.SignalVar{wait.v}, - Holds: []ioswitch.Var{v.v}, - Emits: []ioswitch.Var{nv}, - }) - return &AgentStringVar{owner: v.owner, v: nv} -} - -type AgentIntVar struct { - owner *AgentPlanBuilder - v *ioswitch.IntVar -} - -// 信号变量 -type AgentSignalVar struct { - owner *AgentPlanBuilder - v *ioswitch.SignalVar -} - -func (v *AgentSignalVar) To(node cdssdk.Node) *AgentSignalVar { - v.owner.Ops = append(v.owner.Ops, &ops.SendVar{Var: v.v, Node: node}) - v.owner = v.owner.blder.AtAgent(node) - - return v -} - -func (v *AgentSignalVar) ToExecutor() *ExecutorSignalVar { - v.owner.blder.executorPlan.ops = append(v.owner.blder.executorPlan.ops, &ops.GetVar{ - Var: v.v, - Node: v.owner.Node, - }) - - return &ExecutorSignalVar{ - blder: v.owner.blder, - v: v.v, - } -} - -// 当这个信号被产生时,同时产生另外n个信号 -func (v *AgentSignalVar) Broadcast(cnt int) []*AgentSignalVar { - var ss []*AgentSignalVar - var targets []*ioswitch.SignalVar - - for i := 0; i < cnt; i++ { - c := v.owner.blder.NewSignalVar() - ss = append(ss, &AgentSignalVar{ - owner: v.owner, - v: c, - }) - targets = append(targets, c) - } - - v.owner.Ops = append(v.owner.Ops, &ops.Broadcast{ - Source: v.v, - Targets: targets, - }) - - return ss -} -*/ diff --git a/common/pkgs/ioswitch/plans/fromto.go b/common/pkgs/ioswitch/plans/fromto.go index ba48829..a7ff81d 100644 --- a/common/pkgs/ioswitch/plans/fromto.go +++ b/common/pkgs/ioswitch/plans/fromto.go @@ -3,6 +3,7 @@ package plans import ( cdssdk 
"gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/common/utils/math2" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch/exec" ) type FromTo struct { @@ -124,51 +125,40 @@ func (f *FromExecutor) GetDataIndex() int { return f.DataIndex } -func (f *FromExecutor) BuildNode(ft *FromTo) Node { - op := Node{ - Env: &ExecutorEnv{}, - Type: &FromExecutorOp{ - Handle: f.Handle, - }, - } - op.NewOutputStream(f.DataIndex) - return op -} - -type FromNode struct { +type FromWorker struct { FileHash string Node *cdssdk.Node DataIndex int } -func NewFromNode(fileHash string, node *cdssdk.Node, dataIndex int) *FromNode { - return &FromNode{ +func NewFromNode(fileHash string, node *cdssdk.Node, dataIndex int) *FromWorker { + return &FromWorker{ FileHash: fileHash, Node: node, DataIndex: dataIndex, } } -func (f *FromNode) GetDataIndex() int { +func (f *FromWorker) GetDataIndex() int { return f.DataIndex } type ToExecutor struct { - Handle *ExecutorReadStream + Handle *exec.ExecutorReadStream DataIndex int Range Range } -func NewToExecutor(dataIndex int) (*ToExecutor, *ExecutorReadStream) { - str := ExecutorReadStream{} +func NewToExecutor(dataIndex int) (*ToExecutor, *exec.ExecutorReadStream) { + str := exec.ExecutorReadStream{} return &ToExecutor{ Handle: &str, DataIndex: dataIndex, }, &str } -func NewToExecutorWithRange(dataIndex int, rng Range) (*ToExecutor, *ExecutorReadStream) { - str := ExecutorReadStream{} +func NewToExecutorWithRange(dataIndex int, rng Range) (*ToExecutor, *exec.ExecutorReadStream) { + str := exec.ExecutorReadStream{} return &ToExecutor{ Handle: &str, DataIndex: dataIndex, diff --git a/common/pkgs/ioswitch/plans/ops.go b/common/pkgs/ioswitch/plans/ops.go index fec029b..410698c 100644 --- a/common/pkgs/ioswitch/plans/ops.go +++ b/common/pkgs/ioswitch/plans/ops.go @@ -6,223 +6,172 @@ import ( "github.com/samber/lo" "gitlink.org.cn/cloudream/common/pkgs/ipfs" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - 
"gitlink.org.cn/cloudream/common/utils/lo2" "gitlink.org.cn/cloudream/storage/common/pkgs/ec" "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch/dag" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch/ops" ) -type VarIndex int - -type StreamVar struct { - DataIndex int - From *Node - Toes []*Node - Var *ioswitch.StreamVar -} - -func (v *StreamVar) AddTo(to *Node) { - v.Toes = append(v.Toes, to) -} - -func (v *StreamVar) RemoveTo(to *Node) { - v.Toes = lo2.Remove(v.Toes, to) -} - -type ValueVarType int - -const ( - StringValueVar ValueVarType = iota - SignalValueVar -) - -type ValueVar struct { - Type ValueVarType - From *Node - Toes []*Node - Var ioswitch.Var -} - -func (v *ValueVar) AddTo(to *Node) { - v.Toes = append(v.Toes, to) -} - -func (v *ValueVar) RemoveTo(to *Node) { - v.Toes = lo2.Remove(v.Toes, to) -} - -type OpEnv interface { - Equals(env OpEnv) bool -} - -type AgentEnv struct { - Node cdssdk.Node -} - -func (e *AgentEnv) Equals(env OpEnv) bool { - if agentEnv, ok := env.(*AgentEnv); ok { - return e.Node.NodeID == agentEnv.Node.NodeID - } - return false -} - -type ExecutorEnv struct{} - -func (e *ExecutorEnv) Equals(env OpEnv) bool { - _, ok := env.(*ExecutorEnv) - return ok -} - -type OpType interface { - GenerateOp(node *Node, blder *PlanBuilder) error -} - -type Node struct { - Env OpEnv // Op将在哪里执行,Agent或者Executor - Type OpType - InputStreams []*StreamVar - OutputStreams []*StreamVar - InputValues []*ValueVar - OutputValues []*ValueVar -} - -func (o *Node) NewOutputStream(dataIndex int) *StreamVar { - v := &StreamVar{DataIndex: dataIndex, From: o} - o.OutputStreams = append(o.OutputStreams, v) - return v -} - -func (o *Node) AddInputStream(str *StreamVar) { - o.InputStreams = append(o.InputStreams, str) - str.AddTo(o) -} - -func (o *Node) ReplaceInputStream(old *StreamVar, new *StreamVar) { - old.RemoveTo(o) - new.AddTo(o) 
- - idx := lo.IndexOf(o.InputStreams, old) - o.InputStreams[idx] = new -} - -func (o *Node) NewOutputVar(typ ValueVarType) *ValueVar { - v := &ValueVar{Type: typ, From: o} - o.OutputValues = append(o.OutputValues, v) - return v -} - -func (o *Node) AddInputVar(v *ValueVar) { - o.InputValues = append(o.InputValues, v) - v.AddTo(o) -} - -func (o *Node) ReplaceInputVar(old *ValueVar, new *ValueVar) { - old.RemoveTo(o) - new.AddTo(o) - - idx := lo.IndexOf(o.InputValues, old) - o.InputValues[idx] = new -} - -func (o *Node) String() string { - return fmt.Sprintf("Node(%T)", o.Type) -} - type IPFSReadType struct { FileHash string Option ipfs.ReadOption } -func (t *IPFSReadType) GenerateOp(node *Node, blder *PlanBuilder) error { +func (t *IPFSReadType) InitNode(node *Node) { + dag.NodeNewOutputStream(node, VarProps{}) +} + +func (t *IPFSReadType) GenerateOp(node *Node, blder *exec.PlanBuilder) error { addOpByEnv(&ops.IPFSRead{ - Output: node.OutputStreams[0].Var, + Output: node.OutputStreams[0].Props.Var.(*ioswitch.StreamVar), FileHash: t.FileHash, Option: t.Option, }, node.Env, blder) return nil } +func (t *IPFSReadType) String(node *Node) string { + return fmt.Sprintf("IPFSRead[%s,%v+%v]%v%v", t.FileHash, t.Option.Offset, t.Option.Length, formatStreamIO(node), formatValueIO(node)) +} + type IPFSWriteType struct { FileHashStoreKey string Range Range } -func (t *IPFSWriteType) GenerateOp(op *Node, blder *PlanBuilder) error { +func (t *IPFSWriteType) InitNode(node *Node) { + dag.NodeDeclareInputStream(node, 1) + dag.NodeNewOutputValue(node, VarProps{}) +} + +func (t *IPFSWriteType) GenerateOp(op *Node, blder *exec.PlanBuilder) error { addOpByEnv(&ops.IPFSWrite{ - Input: op.InputStreams[0].Var, - FileHash: op.OutputValues[0].Var.(*ioswitch.StringVar), + Input: op.InputStreams[0].Props.Var.(*ioswitch.StreamVar), + FileHash: op.OutputValues[0].Props.Var.(*ioswitch.StringVar), }, op.Env, blder) return nil } +func (t *IPFSWriteType) String(node *Node) string { + return 
fmt.Sprintf("IPFSWrite[%s,%v+%v]%v%v", t.FileHashStoreKey, t.Range.Offset, t.Range.Length, formatStreamIO(node), formatValueIO(node)) +} + type ChunkedSplitType struct { - ChunkSize int - PaddingZeros bool + OutputCount int + ChunkSize int +} + +func (t *ChunkedSplitType) InitNode(node *Node) { + dag.NodeDeclareInputStream(node, 1) + for i := 0; i < t.OutputCount; i++ { + dag.NodeNewOutputStream(node, VarProps{}) + } } -func (t *ChunkedSplitType) GenerateOp(op *Node, blder *PlanBuilder) error { +func (t *ChunkedSplitType) GenerateOp(op *Node, blder *exec.PlanBuilder) error { addOpByEnv(&ops.ChunkedSplit{ - Input: op.InputStreams[0].Var, + Input: op.InputStreams[0].Props.Var.(*ioswitch.StreamVar), Outputs: lo.Map(op.OutputStreams, func(v *StreamVar, idx int) *ioswitch.StreamVar { - return v.Var + return v.Props.Var.(*ioswitch.StreamVar) }), ChunkSize: t.ChunkSize, - PaddingZeros: t.PaddingZeros, + PaddingZeros: true, }, op.Env, blder) return nil } +func (t *ChunkedSplitType) String(node *Node) string { + return fmt.Sprintf("ChunkedSplit[%v]%v%v", t.ChunkSize, formatStreamIO(node), formatValueIO(node)) +} + type ChunkedJoinType struct { - ChunkSize int + InputCount int + ChunkSize int } -func (t *ChunkedJoinType) GenerateOp(op *Node, blder *PlanBuilder) error { +func (t *ChunkedJoinType) InitNode(node *Node) { + dag.NodeDeclareInputStream(node, t.InputCount) + dag.NodeNewOutputStream(node, VarProps{}) +} + +func (t *ChunkedJoinType) GenerateOp(op *Node, blder *exec.PlanBuilder) error { addOpByEnv(&ops.ChunkedJoin{ Inputs: lo.Map(op.InputStreams, func(v *StreamVar, idx int) *ioswitch.StreamVar { - return v.Var + return v.Props.Var.(*ioswitch.StreamVar) }), - Output: op.OutputStreams[0].Var, + Output: op.OutputStreams[0].Props.Var.(*ioswitch.StreamVar), ChunkSize: t.ChunkSize, }, op.Env, blder) return nil } +func (t *ChunkedJoinType) String(node *Node) string { + return fmt.Sprintf("ChunkedJoin[%v]%v%v", t.ChunkSize, formatStreamIO(node), formatValueIO(node)) +} + type 
CloneStreamType struct{} -func (t *CloneStreamType) GenerateOp(op *Node, blder *PlanBuilder) error { +func (t *CloneStreamType) InitNode(node *Node) { + dag.NodeDeclareInputStream(node, 1) +} + +func (t *CloneStreamType) GenerateOp(op *Node, blder *exec.PlanBuilder) error { addOpByEnv(&ops.CloneStream{ - Input: op.InputStreams[0].Var, + Input: op.InputStreams[0].Props.Var.(*ioswitch.StreamVar), Outputs: lo.Map(op.OutputStreams, func(v *StreamVar, idx int) *ioswitch.StreamVar { - return v.Var + return v.Props.Var.(*ioswitch.StreamVar) }), }, op.Env, blder) return nil } +func (t *CloneStreamType) NewOutput(node *Node) *StreamVar { + return dag.NodeNewOutputStream(node, VarProps{}) +} + +func (t *CloneStreamType) String(node *Node) string { + return fmt.Sprintf("CloneStream[]%v%v", formatStreamIO(node), formatValueIO(node)) +} + type CloneVarType struct{} -func (t *CloneVarType) GenerateOp(op *Node, blder *PlanBuilder) error { +func (t *CloneVarType) InitNode(node *Node) { + dag.NodeDeclareInputValue(node, 1) +} + +func (t *CloneVarType) GenerateOp(op *Node, blder *exec.PlanBuilder) error { addOpByEnv(&ops.CloneVar{ - Raw: op.InputValues[0].Var, + Raw: op.InputValues[0].Props.Var, Cloneds: lo.Map(op.OutputValues, func(v *ValueVar, idx int) ioswitch.Var { - return v.Var + return v.Props.Var }), }, op.Env, blder) return nil } -type MultiplyOp struct { +func (t *CloneVarType) NewOutput(node *Node) *ValueVar { + return dag.NodeNewOutputValue(node, VarProps{}) +} + +func (t *CloneVarType) String(node *Node) string { + return fmt.Sprintf("CloneVar[]%v%v", formatStreamIO(node), formatValueIO(node)) +} + +type MultiplyType struct { EC cdssdk.ECRedundancy } -func (t *MultiplyOp) GenerateOp(op *Node, blder *PlanBuilder) error { +func (t *MultiplyType) InitNode(node *Node) {} + +func (t *MultiplyType) GenerateOp(op *Node, blder *exec.PlanBuilder) error { var inputIdxs []int var outputIdxs []int for _, in := range op.InputStreams { - inputIdxs = append(inputIdxs, in.DataIndex) + 
inputIdxs = append(inputIdxs, in.Props.StreamIndex) } for _, out := range op.OutputStreams { - outputIdxs = append(outputIdxs, out.DataIndex) + outputIdxs = append(outputIdxs, out.Props.StreamIndex) } rs, err := ec.NewRs(t.EC.K, t.EC.N) @@ -233,169 +182,353 @@ func (t *MultiplyOp) GenerateOp(op *Node, blder *PlanBuilder) error { addOpByEnv(&ops.ECMultiply{ Coef: coef, - Inputs: lo.Map(op.InputStreams, func(v *StreamVar, idx int) *ioswitch.StreamVar { return v.Var }), - Outputs: lo.Map(op.OutputStreams, func(v *StreamVar, idx int) *ioswitch.StreamVar { return v.Var }), + Inputs: lo.Map(op.InputStreams, func(v *StreamVar, idx int) *ioswitch.StreamVar { return v.Props.Var.(*ioswitch.StreamVar) }), + Outputs: lo.Map(op.OutputStreams, func(v *StreamVar, idx int) *ioswitch.StreamVar { return v.Props.Var.(*ioswitch.StreamVar) }), ChunkSize: t.EC.ChunkSize, }, op.Env, blder) return nil } -type FileReadOp struct { +func (t *MultiplyType) AddInput(node *Node, str *StreamVar) { + node.InputStreams = append(node.InputStreams, str) + str.To(node, len(node.InputStreams)-1) +} + +func (t *MultiplyType) NewOutput(node *Node, dataIndex int) *StreamVar { + return dag.NodeNewOutputStream(node, VarProps{StreamIndex: dataIndex}) +} + +func (t *MultiplyType) String(node *Node) string { + return fmt.Sprintf("Multiply[]%v%v", formatStreamIO(node), formatValueIO(node)) +} + +type FileReadType struct { FilePath string } -func (t *FileReadOp) GenerateOp(op *Node, blder *PlanBuilder) error { +func (t *FileReadType) InitNode(node *Node) { + dag.NodeNewOutputStream(node, VarProps{}) +} + +func (t *FileReadType) GenerateOp(op *Node, blder *exec.PlanBuilder) error { addOpByEnv(&ops.FileRead{ - Output: op.OutputStreams[0].Var, + Output: op.OutputStreams[0].Props.Var.(*ioswitch.StreamVar), FilePath: t.FilePath, }, op.Env, blder) return nil } -type FileWriteOp struct { +func (t *FileReadType) String(node *Node) string { + return fmt.Sprintf("FileRead[%s]%v%v", t.FilePath, formatStreamIO(node), 
formatValueIO(node)) +} + +type FileWriteType struct { FilePath string } -func (t *FileWriteOp) GenerateOp(op *Node, blder *PlanBuilder) error { +func (t *FileWriteType) InitNode(node *Node) { + dag.NodeDeclareInputStream(node, 1) +} + +func (t *FileWriteType) GenerateOp(op *Node, blder *exec.PlanBuilder) error { addOpByEnv(&ops.FileWrite{ - Input: op.InputStreams[0].Var, + Input: op.InputStreams[0].Props.Var.(*ioswitch.StreamVar), FilePath: t.FilePath, }, op.Env, blder) return nil } -type FromExecutorOp struct { - Handle *ExecutorWriteStream +type FromExecutorType struct { + Handle *exec.ExecutorWriteStream +} + +func (t *FromExecutorType) InitNode(node *Node) { + dag.NodeNewOutputStream(node, VarProps{}) } -func (t *FromExecutorOp) GenerateOp(op *Node, blder *PlanBuilder) error { - t.Handle.Var = op.OutputStreams[0].Var +func (t *FromExecutorType) GenerateOp(op *Node, blder *exec.PlanBuilder) error { + t.Handle.Var = op.OutputStreams[0].Props.Var.(*ioswitch.StreamVar) return nil } -type ToExecutorOp struct { - Handle *ExecutorReadStream +func (t *FromExecutorType) String(node *Node) string { + return fmt.Sprintf("FromExecutor[]%v%v", formatStreamIO(node), formatValueIO(node)) +} + +type ToExecutorType struct { + Handle *exec.ExecutorReadStream Range Range } -func (t *ToExecutorOp) GenerateOp(op *Node, blder *PlanBuilder) error { - t.Handle.Var = op.InputStreams[0].Var +func (t *ToExecutorType) InitNode(node *Node) { + dag.NodeDeclareInputStream(node, 1) +} + +func (t *ToExecutorType) GenerateOp(op *Node, blder *exec.PlanBuilder) error { + t.Handle.Var = op.InputStreams[0].Props.Var.(*ioswitch.StreamVar) return nil } -type StoreOp struct { +func (t *ToExecutorType) String(node *Node) string { + return fmt.Sprintf("ToExecutor[%v+%v]%v%v", t.Range.Offset, t.Range.Length, formatStreamIO(node), formatValueIO(node)) +} + +type StoreType struct { StoreKey string } -func (t *StoreOp) GenerateOp(op *Node, blder *PlanBuilder) error { +func (t *StoreType) InitNode(node 
*Node) { + dag.NodeDeclareInputValue(node, 1) +} + +func (t *StoreType) GenerateOp(op *Node, blder *exec.PlanBuilder) error { blder.AtExecutor().AddOp(&ops.Store{ - Var: op.InputValues[0].Var, + Var: op.InputValues[0].Props.Var, Key: t.StoreKey, Store: blder.ExecutorPlan.StoreMap, }) return nil } -type DropOp struct{} +func (t *StoreType) String(node *Node) string { + return fmt.Sprintf("Store[%s]%v%v", t.StoreKey, formatStreamIO(node), formatValueIO(node)) +} + +type DropType struct{} + +func (t *DropType) InitNode(node *Node) { + dag.NodeDeclareInputStream(node, 1) +} -func (t *DropOp) GenerateOp(op *Node, blder *PlanBuilder) error { +func (t *DropType) GenerateOp(op *Node, blder *exec.PlanBuilder) error { addOpByEnv(&ops.DropStream{ - Input: op.InputStreams[0].Var, + Input: op.InputStreams[0].Props.Var.(*ioswitch.StreamVar), }, op.Env, blder) return nil } -type SendStreamOp struct{} +func (t *DropType) String(node *Node) string { + return fmt.Sprintf("Drop[]%v%v", formatStreamIO(node), formatValueIO(node)) +} + +type SendStreamType struct { +} + +func (t *SendStreamType) InitNode(node *Node) { + dag.NodeDeclareInputStream(node, 1) + dag.NodeNewOutputStream(node, VarProps{}) +} -func (t *SendStreamOp) GenerateOp(op *Node, blder *PlanBuilder) error { - toAgt := op.OutputStreams[0].Toes[0].Env.(*AgentEnv) +func (t *SendStreamType) GenerateOp(op *Node, blder *exec.PlanBuilder) error { + toAgt := op.OutputStreams[0].Toes[0].Node.Env.Worker.(*AgentWorker) addOpByEnv(&ops.SendStream{ - Input: op.InputStreams[0].Var, - Send: op.OutputStreams[0].Var, + Input: op.InputStreams[0].Props.Var.(*ioswitch.StreamVar), + Send: op.OutputStreams[0].Props.Var.(*ioswitch.StreamVar), Node: toAgt.Node, }, op.Env, blder) return nil } -type GetStreamOp struct{} +func (t *SendStreamType) String(node *Node) string { + return fmt.Sprintf("SendStream[]%v%v", formatStreamIO(node), formatValueIO(node)) +} -func (t *GetStreamOp) GenerateOp(op *Node, blder *PlanBuilder) error { - fromAgt := 
op.InputStreams[0].From.Env.(*AgentEnv) - addOpByEnv(&ops.GetStream{ - Signal: op.OutputValues[0].Var.(*ioswitch.SignalVar), - Output: op.OutputStreams[0].Var, - Get: op.InputStreams[0].Var, - Node: fromAgt.Node, - }, op.Env, blder) - return nil +type SendVarType struct { } -type SendVarOp struct{} +func (t *SendVarType) InitNode(node *Node) { + dag.NodeDeclareInputValue(node, 1) + dag.NodeNewOutputValue(node, VarProps{}) +} -func (t *SendVarOp) GenerateOp(op *Node, blder *PlanBuilder) error { - toAgt := op.OutputValues[0].Toes[0].Env.(*AgentEnv) +func (t *SendVarType) GenerateOp(op *Node, blder *exec.PlanBuilder) error { + toAgt := op.OutputValues[0].Toes[0].Node.Env.Worker.(*AgentWorker) addOpByEnv(&ops.SendVar{ - Input: op.InputValues[0].Var, - Send: op.OutputValues[0].Var, + Input: op.InputValues[0].Props.Var, + Send: op.OutputValues[0].Props.Var, Node: toAgt.Node, }, op.Env, blder) return nil } -type GetVarOp struct{} +func (t *SendVarType) String(node *Node) string { + return fmt.Sprintf("SendVar[]%v%v", formatStreamIO(node), formatValueIO(node)) +} + +type GetStreamType struct { +} -func (t *GetVarOp) GenerateOp(op *Node, blder *PlanBuilder) error { - fromAgt := op.InputValues[0].From.Env.(*AgentEnv) +func (t *GetStreamType) InitNode(node *Node) { + dag.NodeDeclareInputStream(node, 1) + dag.NodeNewOutputValue(node, VarProps{}) + dag.NodeNewOutputStream(node, VarProps{}) +} + +func (t *GetStreamType) GenerateOp(op *Node, blder *exec.PlanBuilder) error { + fromAgt := op.InputStreams[0].From.Node.Env.Worker.(*AgentWorker) + addOpByEnv(&ops.GetStream{ + Signal: op.OutputValues[0].Props.Var.(*ioswitch.SignalVar), + Output: op.OutputStreams[0].Props.Var.(*ioswitch.StreamVar), + Target: op.InputStreams[0].Props.Var.(*ioswitch.StreamVar), + Node: fromAgt.Node, + }, op.Env, blder) + return nil +} + +func (t *GetStreamType) String(node *Node) string { + return fmt.Sprintf("GetStream[]%v%v", formatStreamIO(node), formatValueIO(node)) +} + +type GetVaType struct { +} + 
+func (t *GetVaType) InitNode(node *Node) { + dag.NodeDeclareInputValue(node, 1) + dag.NodeNewOutputValue(node, VarProps{}) + dag.NodeNewOutputValue(node, VarProps{}) +} + +func (t *GetVaType) GenerateOp(op *Node, blder *exec.PlanBuilder) error { + fromAgt := op.InputValues[0].From.Node.Env.Worker.(*AgentWorker) addOpByEnv(&ops.GetVar{ - Signal: op.OutputValues[0].Var.(*ioswitch.SignalVar), - Output: op.OutputValues[1].Var, - Get: op.InputValues[0].Var, + Signal: op.OutputValues[0].Props.Var.(*ioswitch.SignalVar), + Output: op.OutputValues[1].Props.Var, + Target: op.InputValues[0].Props.Var, Node: fromAgt.Node, }, op.Env, blder) return nil } +func (t *GetVaType) String(node *Node) string { + return fmt.Sprintf("GetVar[]%v%v", formatStreamIO(node), formatValueIO(node)) +} + type RangeType struct { Range Range } -func (t *RangeType) GenerateOp(op *Node, blder *PlanBuilder) error { +func (t *RangeType) InitNode(node *Node) { + dag.NodeDeclareInputStream(node, 1) + dag.NodeNewOutputStream(node, VarProps{}) +} + +func (t *RangeType) GenerateOp(op *Node, blder *exec.PlanBuilder) error { addOpByEnv(&ops.Range{ - Input: op.InputStreams[0].Var, - Output: op.OutputStreams[0].Var, + Input: op.InputStreams[0].Props.Var.(*ioswitch.StreamVar), + Output: op.OutputStreams[0].Props.Var.(*ioswitch.StreamVar), Offset: t.Range.Offset, Length: t.Range.Length, }, op.Env, blder) return nil } -type HoldUntilOp struct { +func (t *RangeType) String(node *Node) string { + return fmt.Sprintf("Range[%v+%v]%v%v", t.Range.Offset, t.Range.Length, formatStreamIO(node), formatValueIO(node)) } -func (t *HoldUntilOp) GenerateOp(op *Node, blder *PlanBuilder) error { +type HoldUntilType struct { +} + +func (t *HoldUntilType) InitNode(node *Node) { + dag.NodeDeclareInputValue(node, 1) +} + +func (t *HoldUntilType) GenerateOp(op *Node, blder *exec.PlanBuilder) error { o := &ops.HoldUntil{ - Waits: []*ioswitch.SignalVar{op.InputValues[0].Var.(*ioswitch.SignalVar)}, + Waits: 
[]*ioswitch.SignalVar{op.InputValues[0].Props.Var.(*ioswitch.SignalVar)}, } for i := 0; i < len(op.OutputValues); i++ { - o.Holds = append(o.Holds, op.InputValues[i+1].Var) - o.Emits = append(o.Emits, op.OutputValues[i].Var) + o.Holds = append(o.Holds, op.InputValues[i+1].Props.Var) + o.Emits = append(o.Emits, op.OutputValues[i].Props.Var) } for i := 0; i < len(op.OutputStreams); i++ { - o.Holds = append(o.Holds, op.InputStreams[i].Var) - o.Emits = append(o.Emits, op.OutputStreams[i].Var) + o.Holds = append(o.Holds, op.InputStreams[i].Props.Var) + o.Emits = append(o.Emits, op.OutputStreams[i].Props.Var) } addOpByEnv(o, op.Env, blder) return nil } -func addOpByEnv(op ioswitch.Op, env OpEnv, blder *PlanBuilder) { - switch env := env.(type) { - case *AgentEnv: - blder.AtAgent(env.Node).AddOp(op) - case *ExecutorEnv: +func (t *HoldUntilType) String(node *Node) string { + return fmt.Sprintf("HoldUntil[]%v%v", formatStreamIO(node), formatValueIO(node)) +} + +func addOpByEnv(op ioswitch.Op, env dag.NodeEnv, blder *exec.PlanBuilder) { + switch env.Type { + case dag.EnvWorker: + blder.AtAgent(env.Worker.(*AgentWorker).Node).AddOp(op) + case dag.EnvExecutor: blder.AtExecutor().AddOp(op) } } + +func formatStreamIO(node *Node) string { + is := "" + for i, in := range node.InputStreams { + if i > 0 { + is += "," + } + + if in == nil { + is += "." + } else { + is += fmt.Sprintf("%v", in.ID) + } + } + + os := "" + for i, out := range node.OutputStreams { + if i > 0 { + os += "," + } + + if out == nil { + os += "." + } else { + os += fmt.Sprintf("%v", out.ID) + } + } + + if is == "" && os == "" { + return "" + } + + return fmt.Sprintf("S{%s>%s}", is, os) +} + +func formatValueIO(node *Node) string { + is := "" + for i, in := range node.InputValues { + if i > 0 { + is += "," + } + + if in == nil { + is += "." + } else { + is += fmt.Sprintf("%v", in.ID) + } + } + + os := "" + for i, out := range node.OutputValues { + if i > 0 { + os += "," + } + + if out == nil { + os += "." 
+ } else { + os += fmt.Sprintf("%v", out.ID) + } + } + + if is == "" && os == "" { + return "" + } + + return fmt.Sprintf("V{%s>%s}", is, os) +} diff --git a/common/pkgs/ioswitch/plans/parser.go b/common/pkgs/ioswitch/plans/parser.go index e6f8b01..e46798b 100644 --- a/common/pkgs/ioswitch/plans/parser.go +++ b/common/pkgs/ioswitch/plans/parser.go @@ -8,10 +8,57 @@ import ( cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/common/utils/lo2" "gitlink.org.cn/cloudream/common/utils/math2" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch/dag" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch/exec" ) +type NodeProps struct { + From From + To To +} + +type ValueVarType int + +const ( + StringValueVar ValueVarType = iota + SignalValueVar +) + +type VarProps struct { + StreamIndex int // 流的编号,只在StreamVar上有意义 + ValueType ValueVarType // 值类型,只在ValueVar上有意义 + Var ioswitch.Var // 生成Plan的时候创建的对应的Var +} + +type Graph = dag.Graph[NodeProps, VarProps] + +type Node = dag.Node[NodeProps, VarProps] + +type StreamVar = dag.StreamVar[NodeProps, VarProps] + +type ValueVar = dag.ValueVar[NodeProps, VarProps] + +type AgentWorker struct { + Node cdssdk.Node +} + +func (w *AgentWorker) GetAddress() string { + // TODO 选择地址 + return fmt.Sprintf("%v:%v", w.Node.ExternalIP, w.Node.ExternalGRPCPort) +} + +func (w *AgentWorker) Equals(worker exec.Worker) bool { + aw, ok := worker.(*AgentWorker) + if !ok { + return false + } + + return w.Node.NodeID == aw.Node.NodeID +} + type FromToParser interface { - Parse(ft FromTo, blder *PlanBuilder) error + Parse(ft FromTo, blder *builder.PlanBuilder) error } type DefaultParser struct { @@ -25,15 +72,14 @@ func NewParser(ec cdssdk.ECRedundancy) *DefaultParser { } type ParseContext struct { - Ft FromTo - Nodes []*Node - ToNodes []*Node + Ft FromTo + DAG *Graph // 为了产生所有To所需的数据范围,而需要From打开的范围。 // 这个范围是基于整个文件的,且上下界都取整到条带大小的整数倍,因此上界是有可能超过文件大小的。 StreamRange 
Range } -func (p *DefaultParser) Parse(ft FromTo, blder *PlanBuilder) error { +func (p *DefaultParser) Parse(ft FromTo, blder *builder.PlanBuilder) error { ctx := ParseContext{Ft: ft} // 分成两个阶段: @@ -42,7 +88,7 @@ func (p *DefaultParser) Parse(ft FromTo, blder *PlanBuilder) error { // 计算一下打开流的范围 p.calcStreamRange(&ctx) - err := p.extend(&ctx, ft, blder) + err := p.extend(&ctx, ft) if err != nil { return err } @@ -72,25 +118,7 @@ func (p *DefaultParser) Parse(ft FromTo, blder *PlanBuilder) error { } // 确定指令执行位置的过程,也需要反复进行,直到没有变化为止。 - // 从目前实现上来说不会死循环 - for { - opted := false - if p.pinIPFSRead(&ctx) { - opted = true - } - if p.pinJoin(&ctx) { - opted = true - } - if p.pinMultiply(&ctx) { - opted = true - } - if p.pinSplit(&ctx) { - opted = true - } - - if !opted { - break - } + for p.pin(&ctx) { } // 下面这些只需要执行一次,但需要按顺序 @@ -102,16 +130,19 @@ func (p *DefaultParser) Parse(ft FromTo, blder *PlanBuilder) error { return p.buildPlan(&ctx, blder) } -func (p *DefaultParser) findOutputStream(ctx *ParseContext, dataIndex int) *StreamVar { - for _, op := range ctx.Nodes { - for _, o := range op.OutputStreams { - if o.DataIndex == dataIndex { - return o +func (p *DefaultParser) findOutputStream(ctx *ParseContext, streamIndex int) *StreamVar { + var ret *StreamVar + ctx.DAG.Walk(func(n *dag.Node[NodeProps, VarProps]) bool { + for _, o := range n.OutputStreams { + if o != nil && o.Props.StreamIndex == streamIndex { + ret = o + return false } } - } + return true + }) - return nil + return ret } // 计算输入流的打开范围。会把流的范围按条带大小取整 @@ -149,35 +180,29 @@ func (p *DefaultParser) calcStreamRange(ctx *ParseContext) { ctx.StreamRange = rng } -func (p *DefaultParser) extend(ctx *ParseContext, ft FromTo, blder *PlanBuilder) error { +func (p *DefaultParser) extend(ctx *ParseContext, ft FromTo) error { for _, f := range ft.Froms { - n, err := p.buildFromNode(ctx, &ft, f) + _, err := p.buildFromNode(ctx, &ft, f) if err != nil { return err } - ctx.Nodes = append(ctx.Nodes, n) // 对于完整文件的From,生成Split指令 
if f.GetDataIndex() == -1 { - splitOp := &Node{ - Env: nil, - Type: &ChunkedSplitType{ChunkSize: p.EC.ChunkSize, PaddingZeros: true}, - } - splitOp.AddInputStream(n.OutputStreams[0]) + n, _ := dag.NewNode(ctx.DAG, &ChunkedSplitType{ChunkSize: p.EC.ChunkSize, OutputCount: p.EC.K}, NodeProps{}) for i := 0; i < p.EC.K; i++ { - splitOp.NewOutputStream(i) + n.OutputStreams[i].Props.StreamIndex = i } - ctx.Nodes = append(ctx.Nodes, splitOp) } } // 如果有K个不同的文件块流,则生成Multiply指令,同时针对其生成的流,生成Join指令 ecInputStrs := make(map[int]*StreamVar) loop: - for _, o := range ctx.Nodes { + for _, o := range ctx.DAG.Nodes { for _, s := range o.OutputStreams { - if s.DataIndex >= 0 && ecInputStrs[s.DataIndex] == nil { - ecInputStrs[s.DataIndex] = s + if s.Props.StreamIndex >= 0 && ecInputStrs[s.Props.StreamIndex] == nil { + ecInputStrs[s.Props.StreamIndex] = s if len(ecInputStrs) == p.EC.K { break loop } @@ -185,47 +210,42 @@ loop: } } if len(ecInputStrs) == p.EC.K { - mulOp := &Node{ - Env: nil, - Type: &MultiplyOp{EC: p.EC}, - } + mulNode, mulType := dag.NewNode(ctx.DAG, &MultiplyType{ + EC: p.EC, + }, NodeProps{}) for _, s := range ecInputStrs { - mulOp.AddInputStream(s) + mulType.AddInput(mulNode, s) } for i := 0; i < p.EC.N; i++ { - mulOp.NewOutputStream(i) + mulType.NewOutput(mulNode, i) } - ctx.Nodes = append(ctx.Nodes, mulOp) - joinOp := &Node{ - Env: nil, - Type: &ChunkedJoinType{p.EC.ChunkSize}, - } + joinNode, _ := dag.NewNode(ctx.DAG, &ChunkedJoinType{ + InputCount: p.EC.K, + ChunkSize: p.EC.ChunkSize, + }, NodeProps{}) + for i := 0; i < p.EC.K; i++ { // 不可能找不到流 - joinOp.AddInputStream(p.findOutputStream(ctx, i)) + p.findOutputStream(ctx, i).To(joinNode, i) } - joinOp.NewOutputStream(-1) - ctx.Nodes = append(ctx.Nodes, joinOp) + joinNode.OutputStreams[0].Props.StreamIndex = -1 } // 为每一个To找到一个输入流 for _, t := range ft.Toes { - n, err := p.buildToNode(&ft, t) + n, err := p.buildToNode(ctx, &ft, t) if err != nil { return err } - ctx.Nodes = append(ctx.Nodes, n) - ctx.ToNodes = 
append(ctx.ToNodes, n) - str := p.findOutputStream(ctx, t.GetDataIndex()) if str == nil { return fmt.Errorf("no output stream found for data index %d", t.GetDataIndex()) } - n.AddInputStream(str) + str.To(n, 0) } return nil @@ -246,43 +266,40 @@ func (p *DefaultParser) buildFromNode(ctx *ParseContext, ft *FromTo, f From) (*N } switch f := f.(type) { - case *FromNode: - ty := &IPFSReadType{ + case *FromWorker: + n, t := dag.NewNode(ctx.DAG, &IPFSReadType{ FileHash: f.FileHash, Option: ipfs.ReadOption{ Offset: 0, Length: -1, }, - } + }, NodeProps{ + From: f, + }) + n.OutputStreams[0].Props.StreamIndex = f.DataIndex + if f.DataIndex == -1 { - ty.Option.Offset = repRange.Offset + t.Option.Offset = repRange.Offset if repRange.Length != nil { - ty.Option.Length = *repRange.Length + t.Option.Length = *repRange.Length } } else { - ty.Option.Offset = blkRange.Offset + t.Option.Offset = blkRange.Offset if blkRange.Length != nil { - ty.Option.Length = *blkRange.Length + t.Option.Length = *blkRange.Length } } - n := &Node{ - Type: ty, - } - n.NewOutputStream(f.DataIndex) - if f.Node != nil { - n.Env = &AgentEnv{Node: *f.Node} + n.Env.ToEnvWorker(&AgentWorker{*f.Node}) } return n, nil case *FromExecutor: - n := &Node{ - Env: &ExecutorEnv{}, - Type: &FromExecutorOp{Handle: f.Handle}, - } - n.NewOutputStream(f.DataIndex) + n, _ := dag.NewNode(ctx.DAG, &FromExecutorType{Handle: f.Handle}, NodeProps{From: f}) + n.Env.ToEnvExecutor() + n.OutputStreams[0].Props.StreamIndex = f.DataIndex if f.DataIndex == -1 { f.Handle.RangeHint.Offset = repRange.Offset @@ -299,22 +316,23 @@ func (p *DefaultParser) buildFromNode(ctx *ParseContext, ft *FromTo, f From) (*N } } -func (p *DefaultParser) buildToNode(ft *FromTo, t To) (*Node, error) { +func (p *DefaultParser) buildToNode(ctx *ParseContext, ft *FromTo, t To) (*Node, error) { switch t := t.(type) { case *ToNode: - n := &Node{ - Env: &AgentEnv{t.Node}, - Type: &IPFSWriteType{FileHashStoreKey: t.FileHashStoreKey, Range: t.Range}, - } - 
n.NewOutputVar(StringValueVar) + n, _ := dag.NewNode(ctx.DAG, &IPFSWriteType{ + FileHashStoreKey: t.FileHashStoreKey, + Range: t.Range, + }, NodeProps{ + To: t, + }) return n, nil case *ToExecutor: - return &Node{ - Env: &ExecutorEnv{}, - Type: &ToExecutorOp{Handle: t.Handle, Range: t.Range}, - }, nil + n, _ := dag.NewNode(ctx.DAG, &ToExecutorType{Handle: t.Handle, Range: t.Range}, NodeProps{To: t}) + n.Env.ToEnvExecutor() + + return n, nil default: return nil, fmt.Errorf("unsupported to type %T", t) @@ -323,294 +341,143 @@ func (p *DefaultParser) buildToNode(ft *FromTo, t To) (*Node, error) { // 删除输出流未被使用的Join指令 func (p *DefaultParser) removeUnusedJoin(ctx *ParseContext) bool { - opted := false - for i, op := range ctx.Nodes { - _, ok := op.Type.(*ChunkedJoinType) - if !ok { - continue - } + changed := false - if len(op.OutputStreams[0].Toes) > 0 { - continue + dag.WalkOnlyType[*ChunkedJoinType](ctx.DAG, func(node *Node, typ *ChunkedJoinType) bool { + if len(node.OutputStreams[0].Toes) > 0 { + return true } - for _, in := range op.InputStreams { - in.RemoveTo(op) + for _, in := range node.InputStreams { + in.NotTo(node) } - ctx.Nodes[i] = nil - opted = true - } + ctx.DAG.RemoveNode(node) + changed = true + return true + }) - ctx.Nodes = lo2.RemoveAllDefault(ctx.Nodes) - return opted + return changed } // 减少未使用的Multiply指令的输出流。如果减少到0,则删除该指令 func (p *DefaultParser) removeUnusedMultiplyOutput(ctx *ParseContext) bool { - opted := false - for i, op := range ctx.Nodes { - _, ok := op.Type.(*MultiplyOp) - if !ok { - continue - } - - for i2, out := range op.OutputStreams { + changed := false + dag.WalkOnlyType[*MultiplyType](ctx.DAG, func(node *Node, typ *MultiplyType) bool { + for i2, out := range node.OutputStreams { if len(out.Toes) > 0 { continue } - op.OutputStreams[i2] = nil - opted = true + node.OutputStreams[i2] = nil + changed = true } - op.OutputStreams = lo2.RemoveAllDefault(op.OutputStreams) + node.OutputStreams = lo2.RemoveAllDefault(node.OutputStreams) - if 
len(op.OutputStreams) == 0 { - for _, in := range op.InputStreams { - in.RemoveTo(op) + // 如果所有输出流都被删除,则删除该指令 + if len(node.OutputStreams) == 0 { + for _, in := range node.InputStreams { + in.NotTo(node) } - ctx.Nodes[i] = nil - opted = true + ctx.DAG.RemoveNode(node) + changed = true } - } - ctx.Nodes = lo2.RemoveAllDefault(ctx.Nodes) - return opted + return true + }) + return changed } // 删除未使用的Split指令 func (p *DefaultParser) removeUnusedSplit(ctx *ParseContext) bool { - opted := false - for i, op := range ctx.Nodes { - _, ok := op.Type.(*ChunkedSplitType) - if !ok { - continue - } - + changed := false + dag.WalkOnlyType[*ChunkedSplitType](ctx.DAG, func(node *Node, typ *ChunkedSplitType) bool { // Split出来的每一个流都没有被使用,才能删除这个指令 - isAllUnused := true - for _, out := range op.OutputStreams { + for _, out := range node.OutputStreams { if len(out.Toes) > 0 { - isAllUnused = false - break + return true } } - if isAllUnused { - op.InputStreams[0].RemoveTo(op) - ctx.Nodes[i] = nil - opted = true - } - } + node.InputStreams[0].NotTo(node) + ctx.DAG.RemoveNode(node) + changed = true + return true + }) - ctx.Nodes = lo2.RemoveAllDefault(ctx.Nodes) - return opted + return changed } // 如果Split的结果被完全用于Join,则省略Split和Join指令 func (p *DefaultParser) omitSplitJoin(ctx *ParseContext) bool { - opted := false -loop: - for iSplit, splitOp := range ctx.Nodes { - // 进行合并操作时会删除多个指令,因此这里存在splitOp == nil的情况 - if splitOp == nil { - continue - } - - _, ok := splitOp.Type.(*ChunkedSplitType) - if !ok { - continue - } + changed := false + dag.WalkOnlyType[*ChunkedSplitType](ctx.DAG, func(splitNode *Node, typ *ChunkedSplitType) bool { // Split指令的每一个输出都有且只有一个目的地 - var joinOp *Node - for _, out := range splitOp.OutputStreams { + var joinNode *Node + for _, out := range splitNode.OutputStreams { if len(out.Toes) != 1 { continue } - if joinOp == nil { - joinOp = out.Toes[0] - } else if joinOp != out.Toes[0] { - continue loop + if joinNode == nil { + joinNode = out.Toes[0].Node + } else if joinNode != 
out.Toes[0].Node { + return true } } - if joinOp == nil { - continue + if joinNode == nil { + return true } // 且这个目的地要是一个Join指令 - _, ok = joinOp.Type.(*ChunkedJoinType) + _, ok := joinNode.Type.(*ChunkedJoinType) if !ok { - continue + return true } // 同时这个Join指令的输入也必须全部来自Split指令的输出。 // 由于上面判断了Split指令的输出目的地都相同,所以这里只要判断Join指令的输入数量是否与Split指令的输出数量相同即可 - if len(joinOp.InputStreams) != len(splitOp.OutputStreams) { - continue + if len(joinNode.InputStreams) != len(splitNode.OutputStreams) { + return true } // 所有条件都满足,可以开始省略操作,将Join操作的目的地的输入流替换为Split操作的输入流: // F->Split->Join->T 变换为:F->T - splitOp.InputStreams[0].RemoveTo(splitOp) - for i := len(joinOp.OutputStreams[0].Toes) - 1; i >= 0; i-- { - joinOp.OutputStreams[0].Toes[i].ReplaceInputStream(joinOp.OutputStreams[0], splitOp.InputStreams[0]) + splitNode.InputStreams[0].NotTo(splitNode) + for _, out := range joinNode.OutputStreams[0].Toes { + splitNode.InputStreams[0].To(out.Node, out.SlotIndex) } // 并删除这两个指令 - ctx.Nodes[iSplit] = nil - lo2.Clear(ctx.Nodes, joinOp) - opted = true - } - - ctx.Nodes = lo2.RemoveAllDefault(ctx.Nodes) - return opted -} - -// 确定Split命令的执行位置 -func (p *DefaultParser) pinSplit(ctx *ParseContext) bool { - opted := false - for _, op := range ctx.Nodes { - _, ok := op.Type.(*ChunkedSplitType) - if !ok { - continue - } - - // 如果Split的每一个流的目的地都是同一个,则将Split固定在这个地方执行 - var toEnv OpEnv - useToEnv := true - for _, out := range op.OutputStreams { - for _, to := range out.Toes { - // 如果某个流的目的地也不确定,则将其视为与其他流的目的地相同 - if to.Env == nil { - continue - } - - if toEnv == nil { - toEnv = to.Env - } else if toEnv.Equals(to.Env) { - useToEnv = false - break - } - } - if !useToEnv { - break - } - } - - // 所有输出流的目的地都不确定,那么就不能根据输出流去固定 - if toEnv == nil { - useToEnv = false - } - - if useToEnv { - if op.Env == nil || !op.Env.Equals(toEnv) { - opted = true - } - - op.Env = toEnv - continue - } - - // 此时查看输入流的始发地是否可以确定,可以的话使用这个位置 - fromEnv := op.InputStreams[0].From.Env - if fromEnv != nil { - if op.Env == nil || 
!op.Env.Equals(fromEnv) { - opted = true - } - - op.Env = fromEnv - } - } - - return opted -} - -// 确定Join命令的执行位置,策略与固定Split类似 -func (p *DefaultParser) pinJoin(ctx *ParseContext) bool { - opted := false - for _, op := range ctx.Nodes { - _, ok := op.Type.(*ChunkedJoinType) - if !ok { - continue - } - - // 先查看输出流的目的地是否可以确定,可以的话使用这个位置 - var toEnv OpEnv - for _, to := range op.OutputStreams[0].Toes { - if to.Env == nil { - continue - } - - if toEnv == nil { - toEnv = to.Env - } else if !toEnv.Equals(to.Env) { - toEnv = nil - break - } - } - - if toEnv != nil { - if op.Env == nil || !op.Env.Equals(toEnv) { - opted = true - } - - op.Env = toEnv - continue - } - - // 否则根据输入流的始发地来固定 - var fromEnv OpEnv - for _, in := range op.InputStreams { - if in.From.Env == nil { - continue - } - - if fromEnv == nil { - fromEnv = in.From.Env - } else if !fromEnv.Equals(in.From.Env) { - // 输入流的始发地不同,那也必须选一个作为固定位置 - break - } - } + ctx.DAG.RemoveNode(joinNode) + ctx.DAG.RemoveNode(splitNode) - // 所有输入流的始发地都不确定,那没办法了 - if fromEnv != nil { - if op.Env == nil || !op.Env.Equals(fromEnv) { - opted = true - } - - op.Env = fromEnv - continue - } - - } + changed = true + return true + }) - return opted + return changed } -// 确定Multiply命令的执行位置 -func (p *DefaultParser) pinMultiply(ctx *ParseContext) bool { - opted := false - for _, op := range ctx.Nodes { - _, ok := op.Type.(*MultiplyOp) - if !ok { - continue - } - - var toEnv OpEnv - for _, out := range op.OutputStreams { +// 通过流的输入输出位置来确定指令的执行位置。 +// To系列的指令都会有固定的执行位置,这些位置会随着pin操作逐步扩散到整个DAG, +// 所以理论上不会出现有指令的位置始终无法确定的情况。 +func (p *DefaultParser) pin(ctx *ParseContext) bool { + changed := false + ctx.DAG.Walk(func(node *Node) bool { + var toEnv *dag.NodeEnv + for _, out := range node.OutputStreams { for _, to := range out.Toes { - if to.Env == nil { + if to.Node.Env.Type == dag.EnvUnknown { continue } if toEnv == nil { - toEnv = to.Env - } else if !toEnv.Equals(to.Env) { + toEnv = &to.Node.Env + } else if !toEnv.Equals(to.Node.Env) { toEnv = nil 
break } @@ -618,333 +484,287 @@ func (p *DefaultParser) pinMultiply(ctx *ParseContext) bool { } if toEnv != nil { - if op.Env == nil || !op.Env.Equals(toEnv) { - opted = true + if !node.Env.Equals(*toEnv) { + changed = true } - op.Env = toEnv - continue + node.Env = *toEnv + return true } // 否则根据输入流的始发地来固定 - var fromEnv OpEnv - for _, in := range op.InputStreams { - if in.From.Env == nil { + var fromEnv *dag.NodeEnv + for _, in := range node.InputStreams { + if in.From.Node.Env.Type == dag.EnvUnknown { continue } if fromEnv == nil { - fromEnv = in.From.Env - } else if !fromEnv.Equals(in.From.Env) { - // 输入流的始发地不同,那也必须选一个作为固定位置 + fromEnv = &in.From.Node.Env + } else if !fromEnv.Equals(in.From.Node.Env) { + fromEnv = nil break } } - // 所有输入流的始发地都不确定,那没办法了 if fromEnv != nil { - if op.Env == nil || !op.Env.Equals(fromEnv) { - opted = true - } - - op.Env = fromEnv - continue - } - - } - - return opted -} - -// 确定IPFS读取指令的执行位置 -func (p *DefaultParser) pinIPFSRead(ctx *ParseContext) bool { - opted := false - for _, op := range ctx.Nodes { - _, ok := op.Type.(*IPFSReadType) - if !ok { - continue - } - - if op.Env != nil { - continue - } - - var toEnv OpEnv - for _, to := range op.OutputStreams[0].Toes { - if to.Env == nil { - continue - } - - if toEnv == nil { - toEnv = to.Env - } else if !toEnv.Equals(to.Env) { - toEnv = nil - break - } - } - - if toEnv != nil { - if op.Env == nil || !op.Env.Equals(toEnv) { - opted = true + if !node.Env.Equals(*fromEnv) { + changed = true } - op.Env = toEnv + node.Env = *fromEnv } - } + return true + }) - return opted + return changed } // 对于所有未使用的流,增加Drop指令 func (p *DefaultParser) dropUnused(ctx *ParseContext) { - for _, op := range ctx.Nodes { - for _, out := range op.OutputStreams { + ctx.DAG.Walk(func(node *Node) bool { + for _, out := range node.OutputStreams { if len(out.Toes) == 0 { - dropOp := &Node{ - Env: op.Env, - Type: &DropOp{}, - } - dropOp.AddInputStream(out) - ctx.Nodes = append(ctx.Nodes, dropOp) + n := 
ctx.DAG.NewNode(&DropType{}, NodeProps{}) + n.Env = node.Env + out.To(n, 0) } } - } + return true + }) } // 为IPFS写入指令存储结果 func (p *DefaultParser) storeIPFSWriteResult(ctx *ParseContext) { - for _, op := range ctx.Nodes { - w, ok := op.Type.(*IPFSWriteType) - if !ok { - continue + dag.WalkOnlyType[*IPFSWriteType](ctx.DAG, func(node *Node, typ *IPFSWriteType) bool { + if typ.FileHashStoreKey == "" { + return true } - if w.FileHashStoreKey == "" { - continue - } + n := ctx.DAG.NewNode(&StoreType{ + StoreKey: typ.FileHashStoreKey, + }, NodeProps{}) + n.Env.ToEnvExecutor() - storeOp := &Node{ - Env: &ExecutorEnv{}, - Type: &StoreOp{ - StoreKey: w.FileHashStoreKey, - }, - } - storeOp.AddInputVar(op.OutputValues[0]) - ctx.Nodes = append(ctx.Nodes, storeOp) - } + node.OutputValues[0].To(n, 0) + return true + }) } // 生成Range指令。StreamRange可能超过文件总大小,但Range指令会在数据量不够时不报错而是正常返回 func (p *DefaultParser) generateRange(ctx *ParseContext) { - for i, to := range ctx.ToNodes { - toDataIdx := ctx.Ft.Toes[i].GetDataIndex() - toRng := ctx.Ft.Toes[i].GetRange() + ctx.DAG.Walk(func(node *dag.Node[NodeProps, VarProps]) bool { + if node.Props.To == nil { + return true + } + + toDataIdx := node.Props.To.GetDataIndex() + toRng := node.Props.To.GetRange() if toDataIdx == -1 { - rngType := &RangeType{Range: Range{Offset: toRng.Offset - ctx.StreamRange.Offset, Length: toRng.Length}} - rngNode := &Node{ - Env: to.InputStreams[0].From.Env, - Type: rngType, - } - rngNode.AddInputStream(to.InputStreams[0]) + n := ctx.DAG.NewNode(&RangeType{ + Range: Range{ + Offset: toRng.Offset - ctx.StreamRange.Offset, + Length: toRng.Length, + }, + }, NodeProps{}) + n.Env = node.InputStreams[0].From.Node.Env + + node.InputStreams[0].To(n, 0) + node.InputStreams[0].NotTo(node) + n.OutputStreams[0].To(node, 0) - to.ReplaceInputStream(to.InputStreams[0], rngNode.NewOutputStream(toDataIdx)) - ctx.Nodes = append(ctx.Nodes, rngNode) } else { stripSize := int64(p.EC.ChunkSize * p.EC.K) blkStartIdx := 
ctx.StreamRange.Offset / stripSize blkStart := blkStartIdx * int64(p.EC.ChunkSize) - rngType := &RangeType{Range: Range{Offset: toRng.Offset - blkStart, Length: toRng.Length}} - rngNode := &Node{ - Env: to.InputStreams[0].From.Env, - Type: rngType, - } - rngNode.AddInputStream(to.InputStreams[0]) + n := ctx.DAG.NewNode(&RangeType{ + Range: Range{ + Offset: toRng.Offset - blkStart, + Length: toRng.Length, + }, + }, NodeProps{}) + n.Env = node.InputStreams[0].From.Node.Env - to.ReplaceInputStream(to.InputStreams[0], rngNode.NewOutputStream(toDataIdx)) - ctx.Nodes = append(ctx.Nodes, rngNode) + node.InputStreams[0].To(n, 0) + node.InputStreams[0].NotTo(node) + n.OutputStreams[0].To(node, 0) } - } + + return true + }) } // 生成Clone指令 func (p *DefaultParser) generateClone(ctx *ParseContext) { - for _, op := range ctx.Nodes { - for _, out := range op.OutputStreams { + ctx.DAG.Walk(func(node *dag.Node[NodeProps, VarProps]) bool { + for _, out := range node.OutputStreams { if len(out.Toes) <= 1 { continue } - cloneOp := &Node{ - Env: op.Env, - Type: &CloneStreamType{}, - } - for i := len(out.Toes) - 1; i >= 0; i-- { - out.Toes[i].ReplaceInputStream(out, cloneOp.NewOutputStream(out.DataIndex)) + n, t := dag.NewNode(ctx.DAG, &CloneStreamType{}, NodeProps{}) + n.Env = node.Env + for _, to := range out.Toes { + t.NewOutput(n).To(to.Node, to.SlotIndex) } out.Toes = nil - cloneOp.AddInputStream(out) - ctx.Nodes = append(ctx.Nodes, cloneOp) + out.To(n, 0) } - for _, out := range op.OutputValues { + for _, out := range node.OutputValues { if len(out.Toes) <= 1 { continue } - cloneOp := &Node{ - Env: op.Env, - Type: &CloneVarType{}, - } - for i := len(out.Toes) - 1; i >= 0; i-- { - out.Toes[i].ReplaceInputVar(out, cloneOp.NewOutputVar(out.Type)) + n, t := dag.NewNode(ctx.DAG, &CloneVarType{}, NodeProps{}) + n.Env = node.Env + for _, to := range out.Toes { + t.NewOutput(n).To(to.Node, to.SlotIndex) } out.Toes = nil - cloneOp.AddInputVar(out) - ctx.Nodes = append(ctx.Nodes, 
cloneOp) + out.To(n, 0) } - } + + return true + }) } // 生成Send指令 func (p *DefaultParser) generateSend(ctx *ParseContext) { - for _, op := range ctx.Nodes { - for _, out := range op.OutputStreams { - toOp := out.Toes[0] - if toOp.Env.Equals(op.Env) { + ctx.DAG.Walk(func(node *dag.Node[NodeProps, VarProps]) bool { + for _, out := range node.OutputStreams { + to := out.Toes[0] + if to.Node.Env.Equals(node.Env) { continue } - switch toOp.Env.(type) { - case *ExecutorEnv: - // 如果是要送到Executor,则只能由Executor主动去拉取 - getStrOp := &Node{ - Env: &ExecutorEnv{}, - Type: &GetStreamOp{}, - } - - // 同时需要对此变量生成HoldUntil指令,避免Plan结束时Get指令还未到达 - holdOp := &Node{ - Env: op.Env, - Type: &HoldUntilOp{}, - } - holdOp.AddInputVar(getStrOp.NewOutputVar(SignalValueVar)) - holdOp.AddInputStream(out) + switch to.Node.Env.Type { + case dag.EnvExecutor: + // // 如果是要送到Executor,则只能由Executor主动去拉取 + getNode := ctx.DAG.NewNode(&GetStreamType{}, NodeProps{}) + getNode.Env.ToEnvExecutor() - getStrOp.AddInputStream(holdOp.NewOutputStream(out.DataIndex)) - toOp.ReplaceInputStream(out, getStrOp.NewOutputStream(out.DataIndex)) + // // 同时需要对此变量生成HoldUntil指令,避免Plan结束时Get指令还未到达 + holdNode := ctx.DAG.NewNode(&HoldUntilType{}, NodeProps{}) + holdNode.Env = node.Env - ctx.Nodes = append(ctx.Nodes, holdOp) - ctx.Nodes = append(ctx.Nodes, getStrOp) + // 将Get指令的信号送到Hold指令 + getNode.OutputValues[0].To(holdNode, 0) + // 将Get指令的输出送到目的地 + getNode.OutputStreams[0].To(to.Node, to.SlotIndex) + out.Toes = nil + // 将源节点的输出送到Hold指令 + out.To(holdNode, 0) + // 将Hold指令的输出送到Get指令 + holdNode.OutputStreams[0].To(getNode, 0) - case *AgentEnv: + case dag.EnvWorker: // 如果是要送到Agent,则可以直接发送 - sendStrOp := &Node{ - Env: op.Env, - Type: &SendStreamOp{}, - } + n := ctx.DAG.NewNode(&SendStreamType{}, NodeProps{}) + n.Env = node.Env + n.OutputStreams[0].To(to.Node, to.SlotIndex) out.Toes = nil - sendStrOp.AddInputStream(out) - toOp.ReplaceInputStream(out, sendStrOp.NewOutputStream(out.DataIndex)) - ctx.Nodes = append(ctx.Nodes, sendStrOp) + 
out.To(n, 0) } } - for _, out := range op.OutputValues { - toOp := out.Toes[0] - if toOp.Env.Equals(op.Env) { + for _, out := range node.OutputValues { + to := out.Toes[0] + if to.Node.Env.Equals(node.Env) { continue } - switch toOp.Env.(type) { - case *ExecutorEnv: - // 如果是要送到Executor,则只能由Executor主动去拉取 - getStrOp := &Node{ - Env: &ExecutorEnv{}, - Type: &GetVarOp{}, - } - - // 同时需要对此变量生成HoldUntil指令,避免Plan结束时Get指令还未到达 - holdOp := &Node{ - Env: op.Env, - Type: &HoldUntilOp{}, - } - holdOp.AddInputVar(getStrOp.NewOutputVar(SignalValueVar)) - holdOp.AddInputVar(out) + switch to.Node.Env.Type { + case dag.EnvExecutor: + // // 如果是要送到Executor,则只能由Executor主动去拉取 + getNode := ctx.DAG.NewNode(&GetVaType{}, NodeProps{}) + getNode.Env.ToEnvExecutor() - getStrOp.AddInputVar(holdOp.NewOutputVar(out.Type)) - toOp.ReplaceInputVar(out, getStrOp.NewOutputVar(out.Type)) + // // 同时需要对此变量生成HoldUntil指令,避免Plan结束时Get指令还未到达 + holdNode := ctx.DAG.NewNode(&HoldUntilType{}, NodeProps{}) + holdNode.Env = node.Env - ctx.Nodes = append(ctx.Nodes, holdOp) - ctx.Nodes = append(ctx.Nodes, getStrOp) + // 将Get指令的信号送到Hold指令 + getNode.OutputValues[0].To(holdNode, 0) + // 将Get指令的输出送到目的地 + getNode.OutputValues[1].To(to.Node, to.SlotIndex) + out.Toes = nil + // 将源节点的输出送到Hold指令 + out.To(holdNode, 0) + // 将Hold指令的输出送到Get指令 + holdNode.OutputValues[0].To(getNode, 0) - case *AgentEnv: + case dag.EnvWorker: // 如果是要送到Agent,则可以直接发送 - sendVarOp := &Node{ - Env: op.Env, - Type: &SendVarOp{}, - } + n := ctx.DAG.NewNode(&SendVarType{}, NodeProps{}) + n.Env = node.Env + n.OutputValues[0].To(to.Node, to.SlotIndex) out.Toes = nil - sendVarOp.AddInputVar(out) - toOp.ReplaceInputVar(out, sendVarOp.NewOutputVar(out.Type)) - ctx.Nodes = append(ctx.Nodes, sendVarOp) + out.To(n, 0) } } - } + + return true + }) } // 生成Plan -func (p *DefaultParser) buildPlan(ctx *ParseContext, blder *PlanBuilder) error { - for _, op := range ctx.Nodes { - for _, out := range op.OutputStreams { - if out.Var != nil { +func (p *DefaultParser) 
buildPlan(ctx *ParseContext, blder *builder.PlanBuilder) error { + var retErr error + ctx.DAG.Walk(func(node *dag.Node[NodeProps, VarProps]) bool { + for _, out := range node.OutputStreams { + if out.Props.Var != nil { continue } - out.Var = blder.NewStreamVar() + out.Props.Var = blder.NewStreamVar() } - for _, in := range op.InputStreams { - if in.Var != nil { + for _, in := range node.InputStreams { + if in.Props.Var != nil { continue } - in.Var = blder.NewStreamVar() + in.Props.Var = blder.NewStreamVar() } - for _, out := range op.OutputValues { - if out.Var != nil { + for _, out := range node.OutputValues { + if out.Props.Var != nil { continue } - switch out.Type { + switch out.Props.ValueType { case StringValueVar: - out.Var = blder.NewStringVar() + out.Props.Var = blder.NewStringVar() case SignalValueVar: - out.Var = blder.NewSignalVar() + out.Props.Var = blder.NewSignalVar() } } - for _, in := range op.InputValues { - if in.Var != nil { + for _, in := range node.InputValues { + if in.Props.Var != nil { continue } - switch in.Type { + switch in.Props.ValueType { case StringValueVar: - in.Var = blder.NewStringVar() + in.Props.Var = blder.NewStringVar() case SignalValueVar: - in.Var = blder.NewSignalVar() + in.Props.Var = blder.NewSignalVar() } } - if err := op.Type.GenerateOp(op, blder); err != nil { - return err + if err := node.Type.GenerateOp(node, blder); err != nil { + retErr = err + return false } - } - return nil + return true + }) + + return retErr }