package ops2 import ( "fmt" "io" "github.com/samber/lo" "gitlink.org.cn/cloudream/common/pkgs/future" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/utils" "gitlink.org.cn/cloudream/common/utils/io2" "golang.org/x/sync/semaphore" ) func init() { exec.UseOp[*ChunkedSplit]() exec.UseOp[*ChunkedJoin]() } type ChunkedSplit struct { Input *exec.StreamVar `json:"input"` Outputs []*exec.StreamVar `json:"outputs"` ChunkSize int `json:"chunkSize"` PaddingZeros bool `json:"paddingZeros"` } func (o *ChunkedSplit) Execute(ctx *exec.ExecContext, e *exec.Executor) error { err := e.BindVars(ctx.Context, o.Input) if err != nil { return err } defer o.Input.Stream.Close() outputs := io2.ChunkedSplit(o.Input.Stream, o.ChunkSize, len(o.Outputs), io2.ChunkedSplitOption{ PaddingZeros: o.PaddingZeros, }) sem := semaphore.NewWeighted(int64(len(outputs))) for i := range outputs { sem.Acquire(ctx.Context, 1) o.Outputs[i].Stream = io2.AfterReadClosedOnce(outputs[i], func(closer io.ReadCloser) { sem.Release(1) }) } exec.PutArrayVars(e, o.Outputs) return sem.Acquire(ctx.Context, int64(len(outputs))) } func (o *ChunkedSplit) String() string { return fmt.Sprintf( "ChunkedSplit(chunkSize=%v, paddingZeros=%v), %v -> (%v)", o.ChunkSize, o.PaddingZeros, o.Input.ID, utils.FormatVarIDs(o.Outputs), ) } type ChunkedJoin struct { Inputs []*exec.StreamVar `json:"inputs"` Output *exec.StreamVar `json:"output"` ChunkSize int `json:"chunkSize"` } func (o *ChunkedJoin) Execute(ctx *exec.ExecContext, e *exec.Executor) error { err := exec.BindArrayVars(e, ctx.Context, o.Inputs) if err != nil { return err } var strReaders []io.Reader for _, s := range o.Inputs { strReaders = append(strReaders, s.Stream) } defer func() { for _, str := range o.Inputs { str.Stream.Close() } }() fut := future.NewSetVoid() o.Output.Stream = io2.AfterReadClosedOnce(io2.BufferedChunkedJoin(strReaders, o.ChunkSize), func(closer io.ReadCloser) { fut.SetVoid() }) e.PutVars(o.Output) return fut.Wait(ctx.Context) } func (o *ChunkedJoin) String() string { return fmt.Sprintf( "ChunkedJoin(chunkSize=%v), (%v) -> %v", o.ChunkSize, utils.FormatVarIDs(o.Inputs), o.Output.ID, ) } type ChunkedSplitNode struct { dag.NodeBase ChunkSize int } func (b *GraphNodeBuilder) NewChunkedSplit(chunkSize int) *ChunkedSplitNode { node := &ChunkedSplitNode{ ChunkSize: chunkSize, } b.AddNode(node) return node } func (t *ChunkedSplitNode) Split(input *dag.StreamVar, cnt int) { t.InputStreams().EnsureSize(1) input.Connect(t, 0) t.OutputStreams().Resize(cnt) for i := 0; i < cnt; i++ { t.OutputStreams().Setup(t, t.Graph().NewStreamVar(), i) } } func (t *ChunkedSplitNode) SubStream(idx int) *dag.StreamVar { return t.OutputStreams().Get(idx) } func (t *ChunkedSplitNode) SplitCount() int { return t.OutputStreams().Len() } func (t *ChunkedSplitNode) Clear() { if t.InputStreams().Len() == 0 { return } t.InputStreams().Get(0).Disconnect(t, 0) t.InputStreams().Resize(0) for _, out := range t.OutputStreams().RawArray() { out.DisconnectAll() } t.OutputStreams().Resize(0) } func (t *ChunkedSplitNode) GenerateOp() (exec.Op, error) { return &ChunkedSplit{ Input: t.InputStreams().Get(0).Var, Outputs: lo.Map(t.OutputStreams().RawArray(), func(v *dag.StreamVar, idx int) *exec.StreamVar { return v.Var }), ChunkSize: t.ChunkSize, PaddingZeros: true, }, nil } // func (t *ChunkedSplitNode) String() string { // return fmt.Sprintf("ChunkedSplit[%v]%v%v", t.ChunkSize, formatStreamIO(node), formatValueIO(node)) // } type ChunkedJoinNode struct { dag.NodeBase ChunkSize int } func (b *GraphNodeBuilder) NewChunkedJoin(chunkSize int) *ChunkedJoinNode { node := &ChunkedJoinNode{ ChunkSize: chunkSize, } b.AddNode(node) node.OutputStreams().SetupNew(node, b.Graph.NewStreamVar()) return node } func (t *ChunkedJoinNode) AddInput(str *dag.StreamVar) { idx := t.InputStreams().EnlargeOne() str.Connect(t, idx) } func (t *ChunkedJoinNode) Joined() *dag.StreamVar { return t.OutputStreams().Get(0) } func (t *ChunkedJoinNode) RemoveAllInputs() { for i, in := range t.InputStreams().RawArray() { in.Disconnect(t, i) } t.InputStreams().Resize(0) } func (t *ChunkedJoinNode) GenerateOp() (exec.Op, error) { return &ChunkedJoin{ Inputs: lo.Map(t.InputStreams().RawArray(), func(v *dag.StreamVar, idx int) *exec.StreamVar { return v.Var }), Output: t.OutputStreams().Get(0).Var, ChunkSize: t.ChunkSize, }, nil } // func (t *ChunkedJoinType) String() string { // return fmt.Sprintf("ChunkedJoin[%v]%v%v", t.ChunkSize, formatStreamIO(node), formatValueIO(node)) // }