| @@ -17,8 +17,8 @@ import ( | |||||
| func init() { | func init() { | ||||
| rootCmd.AddCommand(&cobra.Command{ | rootCmd.AddCommand(&cobra.Command{ | ||||
| Use: "test", | |||||
| Short: "test", | |||||
| Use: "test32", | |||||
| Short: "test32", | |||||
| Run: func(cmd *cobra.Command, args []string) { | Run: func(cmd *cobra.Command, args []string) { | ||||
| coorCli, err := stgglb.CoordinatorMQPool.Acquire() | coorCli, err := stgglb.CoordinatorMQPool.Acquire() | ||||
| if err != nil { | if err != nil { | ||||
| @@ -34,8 +34,8 @@ func init() { | |||||
| ft := ioswitch2.NewFromTo() | ft := ioswitch2.NewFromTo() | ||||
| ft.SegmentParam = cdssdk.NewSegmentRedundancy(1293, 3) | ft.SegmentParam = cdssdk.NewSegmentRedundancy(1293, 3) | ||||
| ft.AddFrom(ioswitch2.NewFromShardstore("4E69A8B8CD9F42EDE371DA94458BADFB2308AFCA736AA393784A3D81F4746377", *stgs.Storages[0].MasterHub, stgs.Storages[0].Storage, ioswitch2.RawStream())) | ft.AddFrom(ioswitch2.NewFromShardstore("4E69A8B8CD9F42EDE371DA94458BADFB2308AFCA736AA393784A3D81F4746377", *stgs.Storages[0].MasterHub, stgs.Storages[0].Storage, ioswitch2.RawStream())) | ||||
| ft.AddTo(ioswitch2.NewToShardStore(*stgs.Storages[1].MasterHub, stgs.Storages[1].Storage, ioswitch2.SegmentStream(0), "0")) | |||||
| ft.AddTo(ioswitch2.NewToShardStore(*stgs.Storages[1].MasterHub, stgs.Storages[1].Storage, ioswitch2.SegmentStream(1), "1")) | |||||
| // ft.AddTo(ioswitch2.NewToShardStore(*stgs.Storages[1].MasterHub, stgs.Storages[1].Storage, ioswitch2.SegmentStream(0), "0")) | |||||
| ft.AddTo(ioswitch2.NewToShardStoreWithRange(*stgs.Storages[1].MasterHub, stgs.Storages[1].Storage, ioswitch2.SegmentStream(1), "1", exec.Range{Offset: 1})) | |||||
| ft.AddTo(ioswitch2.NewToShardStore(*stgs.Storages[1].MasterHub, stgs.Storages[1].Storage, ioswitch2.SegmentStream(2), "2")) | ft.AddTo(ioswitch2.NewToShardStore(*stgs.Storages[1].MasterHub, stgs.Storages[1].Storage, ioswitch2.SegmentStream(2), "2")) | ||||
| plans := exec.NewPlanBuilder() | plans := exec.NewPlanBuilder() | ||||
| @@ -63,8 +63,8 @@ func init() { | |||||
| }, | }, | ||||
| }) | }) | ||||
| rootCmd.AddCommand(&cobra.Command{ | rootCmd.AddCommand(&cobra.Command{ | ||||
| Use: "test3", | |||||
| Short: "test3", | |||||
| Use: "test", | |||||
| Short: "test", | |||||
| Run: func(cmd *cobra.Command, args []string) { | Run: func(cmd *cobra.Command, args []string) { | ||||
| coorCli, err := stgglb.CoordinatorMQPool.Acquire() | coorCli, err := stgglb.CoordinatorMQPool.Acquire() | ||||
| if err != nil { | if err != nil { | ||||
| @@ -84,7 +84,7 @@ func init() { | |||||
| ft.AddFrom(ioswitch2.NewFromShardstore("5EAC20EB3EBC7B5FA176C5BD1C01041FB2A6D14C35D6A232CA83D7F1E4B01ADE", *stgs.Storages[1].MasterHub, stgs.Storages[1].Storage, ioswitch2.SegmentStream(1))) | ft.AddFrom(ioswitch2.NewFromShardstore("5EAC20EB3EBC7B5FA176C5BD1C01041FB2A6D14C35D6A232CA83D7F1E4B01ADE", *stgs.Storages[1].MasterHub, stgs.Storages[1].Storage, ioswitch2.SegmentStream(1))) | ||||
| ft.AddFrom(ioswitch2.NewFromShardstore("A9BC1802F37100C80C72A1D6E8F53C0E0B73F85F99153D8C78FB01CEC9D8D903", *stgs.Storages[1].MasterHub, stgs.Storages[1].Storage, ioswitch2.SegmentStream(2))) | ft.AddFrom(ioswitch2.NewFromShardstore("A9BC1802F37100C80C72A1D6E8F53C0E0B73F85F99153D8C78FB01CEC9D8D903", *stgs.Storages[1].MasterHub, stgs.Storages[1].Storage, ioswitch2.SegmentStream(2))) | ||||
| toDrv, drvStr := ioswitch2.NewToDriverWithRange(ioswitch2.RawStream(), exec.NewRange(500, 500)) | |||||
| toDrv, drvStr := ioswitch2.NewToDriverWithRange(ioswitch2.RawStream(), exec.NewRange(0, 1293)) | |||||
| ft.AddTo(toDrv) | ft.AddTo(toDrv) | ||||
| ft.AddTo(ioswitch2.NewToShardStore(*stgs.Storages[0].MasterHub, stgs.Storages[0].Storage, ioswitch2.ECSrteam(0), "EC0")) | ft.AddTo(ioswitch2.NewToShardStore(*stgs.Storages[0].MasterHub, stgs.Storages[0].Storage, ioswitch2.ECSrteam(0), "EC0")) | ||||
| ft.AddTo(ioswitch2.NewToShardStore(*stgs.Storages[0].MasterHub, stgs.Storages[0].Storage, ioswitch2.ECSrteam(1), "EC1")) | ft.AddTo(ioswitch2.NewToShardStore(*stgs.Storages[0].MasterHub, stgs.Storages[0].Storage, ioswitch2.ECSrteam(1), "EC1")) | ||||
| @@ -89,13 +89,13 @@ func (t *CloneStreamType) SetInput(raw *dag.StreamVar) { | |||||
| } | } | ||||
| func (t *CloneStreamType) NewOutput() *dag.StreamVar { | func (t *CloneStreamType) NewOutput() *dag.StreamVar { | ||||
| return t.OutputStreams().SetupNew(t).Var | |||||
| return t.OutputStreams().AppendNew(t).Var | |||||
| } | } | ||||
| func (t *CloneStreamType) GenerateOp() (exec.Op, error) { | func (t *CloneStreamType) GenerateOp() (exec.Op, error) { | ||||
| return &CloneStream{ | return &CloneStream{ | ||||
| Raw: t.InputStreams().Get(0).VarID, | Raw: t.InputStreams().Get(0).VarID, | ||||
| Cloneds: t.OutputValues().GetVarIDs(), | |||||
| Cloneds: t.OutputStreams().GetVarIDs(), | |||||
| }, nil | }, nil | ||||
| } | } | ||||
| @@ -142,7 +142,7 @@ func (t *ECMultiplyNode) RemoveAllInputs() { | |||||
| func (t *ECMultiplyNode) NewOutput(dataIndex int) *dag.StreamVar { | func (t *ECMultiplyNode) NewOutput(dataIndex int) *dag.StreamVar { | ||||
| t.OutputIndexes = append(t.OutputIndexes, dataIndex) | t.OutputIndexes = append(t.OutputIndexes, dataIndex) | ||||
| return t.OutputStreams().SetupNew(t).Var | |||||
| return t.OutputStreams().AppendNew(t).Var | |||||
| } | } | ||||
| func (t *ECMultiplyNode) GenerateOp() (exec.Op, error) { | func (t *ECMultiplyNode) GenerateOp() (exec.Op, error) { | ||||
| @@ -150,20 +150,10 @@ func (n *SegmentSplitNode) Segment(index int) *dag.StreamVar { | |||||
| } | } | ||||
| func (n *SegmentSplitNode) GenerateOp() (exec.Op, error) { | func (n *SegmentSplitNode) GenerateOp() (exec.Op, error) { | ||||
| var realSegs []int64 | |||||
| var realSegVarIDs []exec.VarID | |||||
| for i := 0; i < len(n.Segments); i++ { | |||||
| if n.Segments[i] > 0 { | |||||
| realSegs = append(realSegs, n.Segments[i]) | |||||
| realSegVarIDs = append(realSegVarIDs, n.Segment(i).VarID) | |||||
| } | |||||
| } | |||||
| return &SegmentSplit{ | return &SegmentSplit{ | ||||
| Input: n.InputStreams().Get(0).VarID, | Input: n.InputStreams().Get(0).VarID, | ||||
| Segments: realSegs, | |||||
| Outputs: realSegVarIDs, | |||||
| Segments: n.Segments, | |||||
| Outputs: n.OutputStreams().GetVarIDs(), | |||||
| }, nil | }, nil | ||||
| } | } | ||||
| @@ -512,24 +512,19 @@ func fixSegmentSplit(ctx *ParseContext) error { | |||||
| startSeg, endSeg := ctx.Ft.SegmentParam.CalcSegmentRange(ctx.StreamRange.Offset, strEnd) | startSeg, endSeg := ctx.Ft.SegmentParam.CalcSegmentRange(ctx.StreamRange.Offset, strEnd) | ||||
| // 关闭超出范围的分段 | // 关闭超出范围的分段 | ||||
| for i := 0; i < startSeg; i++ { | |||||
| node.Segments[i] = 0 | |||||
| node.OutputStreams().Slots.Set(i, nil) | |||||
| } | |||||
| for i := endSeg; i < len(node.Segments); i++ { | |||||
| node.Segments[i] = 0 | |||||
| node.OutputStreams().Slots.Set(i, nil) | |||||
| } | |||||
| node.OutputStreams().Slots.RemoveRange(endSeg, ctx.Ft.SegmentParam.SegmentCount()-endSeg) | |||||
| node.Segments = lo2.RemoveRange(node.Segments, endSeg, ctx.Ft.SegmentParam.SegmentCount()-endSeg) | |||||
| node.OutputStreams().Slots.RemoveRange(0, startSeg) | |||||
| node.Segments = lo2.RemoveRange(node.Segments, 0, startSeg) | |||||
| // StreamRange开始的位置可能在某个分段的中间,此时这个分段的大小等于流开始位置到分段结束位置的距离 | // StreamRange开始的位置可能在某个分段的中间,此时这个分段的大小等于流开始位置到分段结束位置的距离 | ||||
| startSegStart := ctx.Ft.SegmentParam.CalcSegmentStart(startSeg) | startSegStart := ctx.Ft.SegmentParam.CalcSegmentStart(startSeg) | ||||
| node.Segments[startSeg] -= ctx.StreamRange.Offset - startSegStart | |||||
| node.Segments[0] -= ctx.StreamRange.Offset - startSegStart | |||||
| // StreamRange结束的位置可能在某个分段的中间,此时这个分段的大小就等于流结束位置到分段起始位置的距离 | // StreamRange结束的位置可能在某个分段的中间,此时这个分段的大小就等于流结束位置到分段起始位置的距离 | ||||
| if strEnd != nil { | if strEnd != nil { | ||||
| endSegStart := ctx.Ft.SegmentParam.CalcSegmentStart(endSeg - 1) | endSegStart := ctx.Ft.SegmentParam.CalcSegmentStart(endSeg - 1) | ||||
| node.Segments[endSeg-1] = *strEnd - endSegStart | |||||
| node.Segments[len(node.Segments)-1] = *strEnd - endSegStart | |||||
| } | } | ||||
| return true | return true | ||||
| }) | }) | ||||
| @@ -551,18 +546,13 @@ func fixSegmentJoin(ctx *ParseContext) error { | |||||
| startSeg, endSeg := ctx.Ft.SegmentParam.CalcSegmentRange(start, end) | startSeg, endSeg := ctx.Ft.SegmentParam.CalcSegmentRange(start, end) | ||||
| // 关闭超出范围的分段 | // 关闭超出范围的分段 | ||||
| for i := 0; i < startSeg; i++ { | |||||
| node.InputStreams().ClearInputAt(node, i) | |||||
| } | |||||
| for i := endSeg; i < node.InputStreams().Len(); i++ { | |||||
| node.InputStreams().ClearInputAt(node, i) | |||||
| } | |||||
| node.InputStreams().Slots.RemoveRange(endSeg, ctx.Ft.SegmentParam.SegmentCount()-endSeg) | |||||
| node.InputStreams().Slots.RemoveRange(0, startSeg) | |||||
| // 检查一下必须的分段是否都被加入到Join中 | // 检查一下必须的分段是否都被加入到Join中 | ||||
| for i := startSeg; i < endSeg; i++ { | |||||
| for i := 0; i < node.InputStreams().Len(); i++ { | |||||
| if node.InputStreams().Get(i) == nil { | if node.InputStreams().Get(i) == nil { | ||||
| err = fmt.Errorf("segment %v missed to join an raw stream", i) | |||||
| err = fmt.Errorf("segment %v missed to join an raw stream", i+startSeg) | |||||
| return false | return false | ||||
| } | } | ||||
| } | } | ||||
| @@ -615,36 +605,36 @@ func omitSegmentSplitJoin(ctx *ParseContext) bool { | |||||
| changed := false | changed := false | ||||
| dag.WalkOnlyType[*ops2.SegmentSplitNode](ctx.DAG.Graph, func(splitNode *ops2.SegmentSplitNode) bool { | dag.WalkOnlyType[*ops2.SegmentSplitNode](ctx.DAG.Graph, func(splitNode *ops2.SegmentSplitNode) bool { | ||||
| // Split指令的每一个输出都有且只有一个目的地 | |||||
| var dstNode dag.Node | |||||
| for _, out := range splitNode.OutputStreams().Slots.RawArray() { | |||||
| if out.Dst.Len() != 1 { | |||||
| return true | |||||
| } | |||||
| if dstNode == nil { | |||||
| dstNode = out.Dst.Get(0) | |||||
| } else if dstNode != out.Dst.Get(0) { | |||||
| return true | |||||
| } | |||||
| } | |||||
| if dstNode == nil { | |||||
| // 随便找一个输出流的目的地 | |||||
| splitOut := splitNode.OutputStreams().Get(0) | |||||
| if splitOut.Dst.Len() != 1 { | |||||
| return true | return true | ||||
| } | } | ||||
| dstNode := splitOut.Dst.Get(0) | |||||
| // 且这个目的地要是一个Join指令 | |||||
| // 这个目的地要是一个Join指令 | |||||
| joinNode, ok := dstNode.(*ops2.SegmentJoinNode) | joinNode, ok := dstNode.(*ops2.SegmentJoinNode) | ||||
| if !ok { | if !ok { | ||||
| return true | return true | ||||
| } | } | ||||
| // 同时这个Join指令的输入也必须全部来自Split指令的输出。 | |||||
| // 由于上面判断了Split指令的输出目的地都相同,所以这里只要判断Join指令的输入数量是否与Split指令的输出数量相同即可 | |||||
| if joinNode.InputStreams().Len() != splitNode.OutputStreams().Len() { | |||||
| if splitNode.OutputStreams().Len() != joinNode.Joined().Dst.Len() { | |||||
| return true | return true | ||||
| } | } | ||||
| // Join指令的输入必须全部来自Split指令的输出,且位置要相同 | |||||
| for i := 0; i < splitNode.OutputStreams().Len(); i++ { | |||||
| splitOut := splitNode.OutputStreams().Get(i) | |||||
| joinIn := joinNode.InputStreams().Get(i) | |||||
| if splitOut != joinIn { | |||||
| return true | |||||
| } | |||||
| if splitOut != nil && splitOut.Dst.Len() != 1 { | |||||
| return true | |||||
| } | |||||
| } | |||||
| // 所有条件都满足,可以开始省略操作,将Join操作的目的地的输入流替换为Split操作的输入流: | // 所有条件都满足,可以开始省略操作,将Join操作的目的地的输入流替换为Split操作的输入流: | ||||
| // F->Split->Join->T 变换为:F->T | // F->Split->Join->T 变换为:F->T | ||||
| splitInput := splitNode.InputStreams().Get(0) | splitInput := splitNode.InputStreams().Get(0) | ||||
| @@ -960,8 +950,8 @@ func generateClone(ctx *ParseContext) { | |||||
| c := ctx.DAG.NewCloneStream() | c := ctx.DAG.NewCloneStream() | ||||
| *c.Env() = *node.Env() | *c.Env() = *node.Env() | ||||
| for _, to := range outVar.Dst.RawArray() { | |||||
| c.NewOutput().To(to, to.InputStreams().IndexOf(outVar)) | |||||
| for _, dst := range outVar.Dst.RawArray() { | |||||
| c.NewOutput().To(dst, dst.InputStreams().IndexOf(outVar)) | |||||
| } | } | ||||
| outVar.Dst.Resize(0) | outVar.Dst.Resize(0) | ||||
| c.SetInput(outVar) | c.SetInput(outVar) | ||||
| @@ -974,8 +964,8 @@ func generateClone(ctx *ParseContext) { | |||||
| t := ctx.DAG.NewCloneValue() | t := ctx.DAG.NewCloneValue() | ||||
| *t.Env() = *node.Env() | *t.Env() = *node.Env() | ||||
| for _, to := range outVar.Dst.RawArray() { | |||||
| t.NewOutput().To(to, to.InputValues().IndexOf(outVar)) | |||||
| for _, dst := range outVar.Dst.RawArray() { | |||||
| t.NewOutput().To(dst, dst.InputValues().IndexOf(outVar)) | |||||
| } | } | ||||
| outVar.Dst.Resize(0) | outVar.Dst.Resize(0) | ||||
| t.SetInput(outVar) | t.SetInput(outVar) | ||||
| @@ -89,7 +89,7 @@ func (t *CloneStreamType) SetInput(raw *dag.StreamVar) { | |||||
| } | } | ||||
| func (t *CloneStreamType) NewOutput() *dag.StreamVar { | func (t *CloneStreamType) NewOutput() *dag.StreamVar { | ||||
| return t.OutputStreams().SetupNew(t).Var | |||||
| return t.OutputStreams().AppendNew(t).Var | |||||
| } | } | ||||
| func (t *CloneStreamType) GenerateOp() (exec.Op, error) { | func (t *CloneStreamType) GenerateOp() (exec.Op, error) { | ||||
| @@ -141,7 +141,7 @@ func (t *LRCConstructAnyNode) RemoveAllInputs() { | |||||
| func (t *LRCConstructAnyNode) NewOutput(dataIndex int) *dag.StreamVar { | func (t *LRCConstructAnyNode) NewOutput(dataIndex int) *dag.StreamVar { | ||||
| t.OutputIndexes = append(t.OutputIndexes, dataIndex) | t.OutputIndexes = append(t.OutputIndexes, dataIndex) | ||||
| return t.OutputStreams().SetupNew(t).Var | |||||
| return t.OutputStreams().AppendNew(t).Var | |||||
| } | } | ||||
| func (t *LRCConstructAnyNode) GenerateOp() (exec.Op, error) { | func (t *LRCConstructAnyNode) GenerateOp() (exec.Op, error) { | ||||
| @@ -82,10 +82,10 @@ func (o *OpenOption) WithLength(len int64) OpenOption { | |||||
| return *o | return *o | ||||
| } | } | ||||
| // [start, end],即包含end | |||||
| // [start, end),不包含end | |||||
| func (o *OpenOption) WithRange(start int64, end int64) OpenOption { | func (o *OpenOption) WithRange(start int64, end int64) OpenOption { | ||||
| o.Offset = start | o.Offset = start | ||||
| o.Length = end - start + 1 | |||||
| o.Length = end - start | |||||
| return *o | return *o | ||||
| } | } | ||||
| @@ -104,7 +104,7 @@ func (o *OpenOption) String() string { | |||||
| rangeEnd := "" | rangeEnd := "" | ||||
| if o.Length >= 0 { | if o.Length >= 0 { | ||||
| rangeEnd = fmt.Sprintf("%d", o.Offset+o.Length-1) | |||||
| rangeEnd = fmt.Sprintf("%d", o.Offset+o.Length) | |||||
| } | } | ||||
| if rangeStart == "" && rangeEnd == "" { | if rangeStart == "" && rangeEnd == "" { | ||||