diff --git a/client/internal/cmdline/test.go b/client/internal/cmdline/test.go index 0aaa6b9..2c924db 100644 --- a/client/internal/cmdline/test.go +++ b/client/internal/cmdline/test.go @@ -17,8 +17,8 @@ import ( func init() { rootCmd.AddCommand(&cobra.Command{ - Use: "test", - Short: "test", + Use: "test32", + Short: "test32", Run: func(cmd *cobra.Command, args []string) { coorCli, err := stgglb.CoordinatorMQPool.Acquire() if err != nil { @@ -34,8 +34,8 @@ func init() { ft := ioswitch2.NewFromTo() ft.SegmentParam = cdssdk.NewSegmentRedundancy(1293, 3) ft.AddFrom(ioswitch2.NewFromShardstore("4E69A8B8CD9F42EDE371DA94458BADFB2308AFCA736AA393784A3D81F4746377", *stgs.Storages[0].MasterHub, stgs.Storages[0].Storage, ioswitch2.RawStream())) - ft.AddTo(ioswitch2.NewToShardStore(*stgs.Storages[1].MasterHub, stgs.Storages[1].Storage, ioswitch2.SegmentStream(0), "0")) - ft.AddTo(ioswitch2.NewToShardStore(*stgs.Storages[1].MasterHub, stgs.Storages[1].Storage, ioswitch2.SegmentStream(1), "1")) + // ft.AddTo(ioswitch2.NewToShardStore(*stgs.Storages[1].MasterHub, stgs.Storages[1].Storage, ioswitch2.SegmentStream(0), "0")) + ft.AddTo(ioswitch2.NewToShardStoreWithRange(*stgs.Storages[1].MasterHub, stgs.Storages[1].Storage, ioswitch2.SegmentStream(1), "1", exec.Range{Offset: 1})) ft.AddTo(ioswitch2.NewToShardStore(*stgs.Storages[1].MasterHub, stgs.Storages[1].Storage, ioswitch2.SegmentStream(2), "2")) plans := exec.NewPlanBuilder() @@ -63,8 +63,8 @@ func init() { }, }) rootCmd.AddCommand(&cobra.Command{ - Use: "test3", - Short: "test3", + Use: "test", + Short: "test", Run: func(cmd *cobra.Command, args []string) { coorCli, err := stgglb.CoordinatorMQPool.Acquire() if err != nil { @@ -84,7 +84,7 @@ func init() { ft.AddFrom(ioswitch2.NewFromShardstore("5EAC20EB3EBC7B5FA176C5BD1C01041FB2A6D14C35D6A232CA83D7F1E4B01ADE", *stgs.Storages[1].MasterHub, stgs.Storages[1].Storage, ioswitch2.SegmentStream(1))) ft.AddFrom(ioswitch2.NewFromShardstore("A9BC1802F37100C80C72A1D6E8F53C0E0B73F85F99153D8C78FB01CEC9D8D903", *stgs.Storages[1].MasterHub, stgs.Storages[1].Storage, ioswitch2.SegmentStream(2))) - toDrv, drvStr := ioswitch2.NewToDriverWithRange(ioswitch2.RawStream(), exec.NewRange(500, 500)) + toDrv, drvStr := ioswitch2.NewToDriverWithRange(ioswitch2.RawStream(), exec.NewRange(0, 1293)) ft.AddTo(toDrv) ft.AddTo(ioswitch2.NewToShardStore(*stgs.Storages[0].MasterHub, stgs.Storages[0].Storage, ioswitch2.ECSrteam(0), "EC0")) ft.AddTo(ioswitch2.NewToShardStore(*stgs.Storages[0].MasterHub, stgs.Storages[0].Storage, ioswitch2.ECSrteam(1), "EC1")) diff --git a/common/pkgs/ioswitch2/ops2/clone.go b/common/pkgs/ioswitch2/ops2/clone.go index 5cf4158..4bf1cdc 100644 --- a/common/pkgs/ioswitch2/ops2/clone.go +++ b/common/pkgs/ioswitch2/ops2/clone.go @@ -89,13 +89,13 @@ func (t *CloneStreamType) SetInput(raw *dag.StreamVar) { } func (t *CloneStreamType) NewOutput() *dag.StreamVar { - return t.OutputStreams().SetupNew(t).Var + return t.OutputStreams().AppendNew(t).Var } func (t *CloneStreamType) GenerateOp() (exec.Op, error) { return &CloneStream{ Raw: t.InputStreams().Get(0).VarID, - Cloneds: t.OutputValues().GetVarIDs(), + Cloneds: t.OutputStreams().GetVarIDs(), }, nil } diff --git a/common/pkgs/ioswitch2/ops2/ec.go b/common/pkgs/ioswitch2/ops2/ec.go index 2502471..e87666d 100644 --- a/common/pkgs/ioswitch2/ops2/ec.go +++ b/common/pkgs/ioswitch2/ops2/ec.go @@ -142,7 +142,7 @@ func (t *ECMultiplyNode) RemoveAllInputs() { func (t *ECMultiplyNode) NewOutput(dataIndex int) *dag.StreamVar { t.OutputIndexes = append(t.OutputIndexes, dataIndex) - return t.OutputStreams().SetupNew(t).Var + return t.OutputStreams().AppendNew(t).Var } func (t *ECMultiplyNode) GenerateOp() (exec.Op, error) { diff --git a/common/pkgs/ioswitch2/ops2/segment.go b/common/pkgs/ioswitch2/ops2/segment.go index 4cd2e65..c391a47 100644 --- a/common/pkgs/ioswitch2/ops2/segment.go +++ b/common/pkgs/ioswitch2/ops2/segment.go @@ -150,20 +150,10 @@ func (n *SegmentSplitNode) Segment(index int) *dag.StreamVar { } func (n *SegmentSplitNode) GenerateOp() (exec.Op, error) { - var realSegs []int64 - var realSegVarIDs []exec.VarID - - for i := 0; i < len(n.Segments); i++ { - if n.Segments[i] > 0 { - realSegs = append(realSegs, n.Segments[i]) - realSegVarIDs = append(realSegVarIDs, n.Segment(i).VarID) - } - } - return &SegmentSplit{ Input: n.InputStreams().Get(0).VarID, - Segments: realSegs, - Outputs: realSegVarIDs, + Segments: n.Segments, + Outputs: n.OutputStreams().GetVarIDs(), }, nil } diff --git a/common/pkgs/ioswitch2/parser/parser.go b/common/pkgs/ioswitch2/parser/parser.go index 321211d..e618e7b 100644 --- a/common/pkgs/ioswitch2/parser/parser.go +++ b/common/pkgs/ioswitch2/parser/parser.go @@ -512,24 +512,19 @@ func fixSegmentSplit(ctx *ParseContext) error { startSeg, endSeg := ctx.Ft.SegmentParam.CalcSegmentRange(ctx.StreamRange.Offset, strEnd) // 关闭超出范围的分段 - for i := 0; i < startSeg; i++ { - node.Segments[i] = 0 - node.OutputStreams().Slots.Set(i, nil) - } - - for i := endSeg; i < len(node.Segments); i++ { - node.Segments[i] = 0 - node.OutputStreams().Slots.Set(i, nil) - } + node.OutputStreams().Slots.RemoveRange(endSeg, ctx.Ft.SegmentParam.SegmentCount()-endSeg) + node.Segments = lo2.RemoveRange(node.Segments, endSeg, ctx.Ft.SegmentParam.SegmentCount()-endSeg) + node.OutputStreams().Slots.RemoveRange(0, startSeg) + node.Segments = lo2.RemoveRange(node.Segments, 0, startSeg) // StreamRange开始的位置可能在某个分段的中间,此时这个分段的大小等于流开始位置到分段结束位置的距离 startSegStart := ctx.Ft.SegmentParam.CalcSegmentStart(startSeg) - node.Segments[startSeg] -= ctx.StreamRange.Offset - startSegStart + node.Segments[0] -= ctx.StreamRange.Offset - startSegStart // StreamRange结束的位置可能在某个分段的中间,此时这个分段的大小就等于流结束位置到分段起始位置的距离 if strEnd != nil { endSegStart := ctx.Ft.SegmentParam.CalcSegmentStart(endSeg - 1) - node.Segments[endSeg-1] = *strEnd - endSegStart + node.Segments[len(node.Segments)-1] = *strEnd - endSegStart } return true }) @@ -551,18 +546,13 @@ func fixSegmentJoin(ctx *ParseContext) error { startSeg, endSeg := ctx.Ft.SegmentParam.CalcSegmentRange(start, end) // 关闭超出范围的分段 - for i := 0; i < startSeg; i++ { - node.InputStreams().ClearInputAt(node, i) - } - - for i := endSeg; i < node.InputStreams().Len(); i++ { - node.InputStreams().ClearInputAt(node, i) - } + node.InputStreams().Slots.RemoveRange(endSeg, ctx.Ft.SegmentParam.SegmentCount()-endSeg) + node.InputStreams().Slots.RemoveRange(0, startSeg) // 检查一下必须的分段是否都被加入到Join中 - for i := startSeg; i < endSeg; i++ { + for i := 0; i < node.InputStreams().Len(); i++ { if node.InputStreams().Get(i) == nil { - err = fmt.Errorf("segment %v missed to join an raw stream", i) + err = fmt.Errorf("segment %v missed to join an raw stream", i+startSeg) return false } } @@ -615,36 +605,36 @@ func omitSegmentSplitJoin(ctx *ParseContext) bool { changed := false dag.WalkOnlyType[*ops2.SegmentSplitNode](ctx.DAG.Graph, func(splitNode *ops2.SegmentSplitNode) bool { - // Split指令的每一个输出都有且只有一个目的地 - var dstNode dag.Node - for _, out := range splitNode.OutputStreams().Slots.RawArray() { - if out.Dst.Len() != 1 { - return true - } - - if dstNode == nil { - dstNode = out.Dst.Get(0) - } else if dstNode != out.Dst.Get(0) { - return true - } - } - - if dstNode == nil { + // 随便找一个输出流的目的地 + splitOut := splitNode.OutputStreams().Get(0) + if splitOut.Dst.Len() != 1 { return true } + dstNode := splitOut.Dst.Get(0) - // 且这个目的地要是一个Join指令 + // 这个目的地要是一个Join指令 joinNode, ok := dstNode.(*ops2.SegmentJoinNode) if !ok { return true } - // 同时这个Join指令的输入也必须全部来自Split指令的输出。 - // 由于上面判断了Split指令的输出目的地都相同,所以这里只要判断Join指令的输入数量是否与Split指令的输出数量相同即可 - if joinNode.InputStreams().Len() != splitNode.OutputStreams().Len() { + if splitNode.OutputStreams().Len() != joinNode.Joined().Dst.Len() { return true } + // Join指令的输入必须全部来自Split指令的输出,且位置要相同 + for i := 0; i < splitNode.OutputStreams().Len(); i++ { + splitOut := splitNode.OutputStreams().Get(i) + joinIn := joinNode.InputStreams().Get(i) + if splitOut != joinIn { + return true + } + + if splitOut != nil && splitOut.Dst.Len() != 1 { + return true + } + } + // 所有条件都满足,可以开始省略操作,将Join操作的目的地的输入流替换为Split操作的输入流: // F->Split->Join->T 变换为:F->T splitInput := splitNode.InputStreams().Get(0) @@ -960,8 +950,8 @@ func generateClone(ctx *ParseContext) { c := ctx.DAG.NewCloneStream() *c.Env() = *node.Env() - for _, to := range outVar.Dst.RawArray() { - c.NewOutput().To(to, to.InputStreams().IndexOf(outVar)) + for _, dst := range outVar.Dst.RawArray() { + c.NewOutput().To(dst, dst.InputStreams().IndexOf(outVar)) } outVar.Dst.Resize(0) c.SetInput(outVar) @@ -974,8 +964,8 @@ func generateClone(ctx *ParseContext) { t := ctx.DAG.NewCloneValue() *t.Env() = *node.Env() - for _, to := range outVar.Dst.RawArray() { - t.NewOutput().To(to, to.InputValues().IndexOf(outVar)) + for _, dst := range outVar.Dst.RawArray() { + t.NewOutput().To(dst, dst.InputValues().IndexOf(outVar)) } outVar.Dst.Resize(0) t.SetInput(outVar) diff --git a/common/pkgs/ioswitchlrc/ops2/clone.go b/common/pkgs/ioswitchlrc/ops2/clone.go index 5cf4158..5e730be 100644 --- a/common/pkgs/ioswitchlrc/ops2/clone.go +++ b/common/pkgs/ioswitchlrc/ops2/clone.go @@ -89,7 +89,7 @@ func (t *CloneStreamType) SetInput(raw *dag.StreamVar) { } func (t *CloneStreamType) NewOutput() *dag.StreamVar { - return t.OutputStreams().SetupNew(t).Var + return t.OutputStreams().AppendNew(t).Var } func (t *CloneStreamType) GenerateOp() (exec.Op, error) { diff --git a/common/pkgs/ioswitchlrc/ops2/ec.go b/common/pkgs/ioswitchlrc/ops2/ec.go index 2c6eb03..4e180f8 100644 --- a/common/pkgs/ioswitchlrc/ops2/ec.go +++ b/common/pkgs/ioswitchlrc/ops2/ec.go @@ -141,7 +141,7 @@ func (t *LRCConstructAnyNode) RemoveAllInputs() { func (t *LRCConstructAnyNode) NewOutput(dataIndex int) *dag.StreamVar { t.OutputIndexes = append(t.OutputIndexes, dataIndex) - return t.OutputStreams().SetupNew(t).Var + return t.OutputStreams().AppendNew(t).Var } func (t *LRCConstructAnyNode) GenerateOp() (exec.Op, error) { diff --git a/common/pkgs/storage/types/shard_store.go b/common/pkgs/storage/types/shard_store.go index 61badd8..a7d9421 100644 --- a/common/pkgs/storage/types/shard_store.go +++ b/common/pkgs/storage/types/shard_store.go @@ -82,10 +82,10 @@ func (o *OpenOption) WithLength(len int64) OpenOption { return *o } -// [start, end],即包含end +// [start, end),不包含end func (o *OpenOption) WithRange(start int64, end int64) OpenOption { o.Offset = start - o.Length = end - start + 1 + o.Length = end - start return *o } @@ -104,7 +104,7 @@ func (o *OpenOption) String() string { rangeEnd := "" if o.Length >= 0 { - rangeEnd = fmt.Sprintf("%d", o.Offset+o.Length-1) + rangeEnd = fmt.Sprintf("%d", o.Offset+o.Length) } if rangeStart == "" && rangeEnd == "" {