From b6cb55b9a51c0341efd513dbb4e388a6601a2b6f Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Tue, 30 Jul 2024 15:00:01 +0800 Subject: [PATCH] =?UTF-8?q?=E5=B0=86=E5=A4=9A=E5=89=AF=E6=9C=AC=E8=AF=BB?= =?UTF-8?q?=E5=8F=96=E9=80=BB=E8=BE=91=E6=94=B9=E6=88=90=E5=9F=BA=E4=BA=8E?= =?UTF-8?q?Plan=E6=9D=A5=E5=AE=9E=E7=8E=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- common/pkgs/downloader/iterator.go | 44 +++++--- common/pkgs/ec/rs.go | 4 + common/pkgs/ioswitch/ops/drop.go | 34 ++++++ common/pkgs/ioswitch/ops/ec.go | 2 +- common/pkgs/ioswitch/ops/grpc.go | 51 +++++---- common/pkgs/ioswitch/plans/executor.go | 72 ++----------- common/pkgs/ioswitch/plans/fromto.go | 122 +++++++++------------- common/pkgs/ioswitch/plans/ops.go | 137 ++++++++++++++++++++++-- common/pkgs/ioswitch/plans/parser.go | 138 +++++++++++++++++-------- 9 files changed, 386 insertions(+), 218 deletions(-) create mode 100644 common/pkgs/ioswitch/ops/drop.go diff --git a/common/pkgs/downloader/iterator.go b/common/pkgs/downloader/iterator.go index 928f6cc..6fa7078 100644 --- a/common/pkgs/downloader/iterator.go +++ b/common/pkgs/downloader/iterator.go @@ -1,6 +1,7 @@ package downloader import ( + "context" "fmt" "io" "math" @@ -21,6 +22,7 @@ import ( stgglb "gitlink.org.cn/cloudream/storage/common/globals" stgmod "gitlink.org.cn/cloudream/storage/common/models" "gitlink.org.cn/cloudream/storage/common/pkgs/distlock" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch/plans" "gitlink.org.cn/cloudream/storage/common/pkgs/iterator" coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" ) @@ -170,27 +172,45 @@ func (iter *DownloadObjectIterator) downloadNoneOrRepObject(obj downloadReqeust2 return nil, err } + var strHandle *plans.ExecutorReadStream + ft := plans.FromTo{ + Object: *obj.Detail, + } + bsc, blocks := iter.getMinReadingBlockSolution(allNodes, 1) osc, node := iter.getMinReadingObjectSolution(allNodes, 1) if bsc < osc { logger.Debugf("downloading object from node %v(%v)", blocks[0].Node.Name, blocks[0].Node.NodeID) - return NewIPFSReaderWithRange(blocks[0].Node, blocks[0].Block.FileHash, ipfs.ReadOption{ - Offset: obj.Raw.Offset, - Length: obj.Raw.Length, - }), nil - } + toExec, handle := plans.NewToExecutor(-1) + ft.AddFrom(plans.NewFromNode(&blocks[0].Node, -1)).AddTo(toExec) + strHandle = handle - // bsc >= osc,如果osc是MaxFloat64,那么bsc也一定是,也就意味着没有足够块来恢复文件 - if osc == math.MaxFloat64 { + // TODO2 处理Offset和Length + } else if osc == math.MaxFloat64 { + // bsc >= osc,如果osc是MaxFloat64,那么bsc也一定是,也就意味着没有足够块来恢复文件 return nil, fmt.Errorf("no node has this object") + } else { + logger.Debugf("downloading object from node %v(%v)", node.Name, node.NodeID) + + toExec, handle := plans.NewToExecutor(-1) + ft.AddFrom(plans.NewFromNode(node, -1)).AddTo(toExec) + strHandle = handle + // TODO2 处理Offset和Length } - logger.Debugf("downloading object from node %v(%v)", node.Name, node.NodeID) - return NewIPFSReaderWithRange(*node, obj.Detail.Object.FileHash, ipfs.ReadOption{ - Offset: obj.Raw.Offset, - Length: obj.Raw.Length, - }), nil + parser := plans.DefaultParser{ + EC: cdssdk.DefaultECRedundancy, + } + plans := plans.NewPlanBuilder() + if err := parser.Parse(ft, plans); err != nil { + return nil, fmt.Errorf("parsing plan: %w", err) + } + + exec := plans.Execute() + go exec.Wait(context.TODO()) + + return exec.BeginRead(strHandle) } func (iter *DownloadObjectIterator) downloadECObject(req downloadReqeust2, ecRed *cdssdk.ECRedundancy) (io.ReadCloser, error) { diff --git a/common/pkgs/ec/rs.go b/common/pkgs/ec/rs.go index aa72a07..0502aa0 100644 --- a/common/pkgs/ec/rs.go +++ b/common/pkgs/ec/rs.go @@ -42,3 +42,7 @@ func (r *Rs) ReconstructAny(blocks [][]byte, outBlockIdxes []int) error { } return r.encoder.ReconstructAny(blocks, required) } + +func (r *Rs) GenerateMatrix(inputIdxs []int, outputIdxs []int) ([][]byte, error) { + return r.encoder.(reedsolomon.Extensions).GenerateMatrix(inputIdxs, outputIdxs) +} diff --git a/common/pkgs/ioswitch/ops/drop.go b/common/pkgs/ioswitch/ops/drop.go new file mode 100644 index 0000000..4a78596 --- /dev/null +++ b/common/pkgs/ioswitch/ops/drop.go @@ -0,0 +1,34 @@ +package ops + +import ( + "context" + "io" + + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" +) + +type DropStream struct { + Input *ioswitch.StreamVar `json:"input"` +} + +func (o *DropStream) Execute(ctx context.Context, sw *ioswitch.Switch) error { + err := sw.BindVars(ctx, o.Input) + if err != nil { + return err + } + + for { + buf := make([]byte, 1024*8) + _, err = o.Input.Stream.Read(buf) + if err == io.EOF { + return nil + } + if err != nil { + return err + } + } +} + +func init() { + OpUnion.AddT((*DropStream)(nil)) +} diff --git a/common/pkgs/ioswitch/ops/ec.go b/common/pkgs/ioswitch/ops/ec.go index 41a4415..60843ed 100644 --- a/common/pkgs/ioswitch/ops/ec.go +++ b/common/pkgs/ioswitch/ops/ec.go @@ -105,7 +105,7 @@ type ECMultiply struct { Coef [][]byte `json:"coef"` Inputs []*ioswitch.StreamVar `json:"inputs"` Outputs []*ioswitch.StreamVar `json:"outputs"` - ChunkSize int64 `json:"chunkSize"` + ChunkSize int `json:"chunkSize"` } func (o *ECMultiply) Execute(ctx context.Context, sw *ioswitch.Switch) error { diff --git a/common/pkgs/ioswitch/ops/grpc.go b/common/pkgs/ioswitch/ops/grpc.go index d1a6c97..e14cbee 100644 --- a/common/pkgs/ioswitch/ops/grpc.go +++ b/common/pkgs/ioswitch/ops/grpc.go @@ -14,16 +14,17 @@ import ( ) type SendStream struct { - Stream *ioswitch.StreamVar `json:"stream"` - Node cdssdk.Node `json:"node"` + Input *ioswitch.StreamVar `json:"input"` + Send *ioswitch.StreamVar `json:"send"` + Node cdssdk.Node `json:"node"` } func (o *SendStream) Execute(ctx context.Context, sw *ioswitch.Switch) error { - err := sw.BindVars(ctx, o.Stream) + err := sw.BindVars(ctx, o.Input) if err != nil { return err } - defer o.Stream.Stream.Close() + defer o.Input.Stream.Close() agtCli, err := stgglb.AgentRPCPool.Acquire(stgglb.SelectGRPCAddress(&o.Node)) if err != nil { @@ -31,9 +32,10 @@ func (o *SendStream) Execute(ctx context.Context, sw *ioswitch.Switch) error { } defer stgglb.AgentRPCPool.Release(agtCli) - logger.Debugf("sending stream %v to node %v", o.Stream.ID, o.Node) + logger.Debugf("sending stream %v as %v to node %v", o.Input.ID, o.Send.ID, o.Node) - err = agtCli.SendStream(ctx, sw.Plan().ID, o.Stream.ID, o.Stream.Stream) + // 发送后流的ID不同 + err = agtCli.SendStream(ctx, sw.Plan().ID, o.Send.ID, o.Input.Stream) if err != nil { return fmt.Errorf("sending stream: %w", err) } @@ -42,7 +44,8 @@ func (o *SendStream) Execute(ctx context.Context, sw *ioswitch.Switch) error { } type GetStream struct { - Stream *ioswitch.StreamVar `json:"stream"` + Get *ioswitch.StreamVar `json:"get"` + Output *ioswitch.StreamVar `json:"output"` Node cdssdk.Node `json:"node"` } @@ -53,29 +56,31 @@ func (o *GetStream) Execute(ctx context.Context, sw *ioswitch.Switch) error { } defer stgglb.AgentRPCPool.Release(agtCli) - logger.Debugf("getting stream %v from node %v", o.Stream.ID, o.Node) + logger.Debugf("getting stream %v as %v from node %v", o.Get.ID, o.Output.ID, o.Node) - str, err := agtCli.GetStream(sw.Plan().ID, o.Stream.ID) + str, err := agtCli.GetStream(sw.Plan().ID, o.Get.ID) if err != nil { return fmt.Errorf("getting stream: %w", err) } fut := future.NewSetVoid() - o.Stream.Stream = io2.AfterReadClosedOnce(str, func(closer io.ReadCloser) { + // 获取后送到本地的流ID是不同的 + o.Output.Stream = io2.AfterReadClosedOnce(str, func(closer io.ReadCloser) { fut.SetVoid() }) - sw.PutVars(o.Stream) + sw.PutVars(o.Output) return fut.Wait(ctx) } type SendVar struct { - Var ioswitch.Var `json:"var"` - Node cdssdk.Node `json:"node"` + Input ioswitch.Var `json:"input"` + Send ioswitch.Var `json:"send"` + Node cdssdk.Node `json:"node"` } func (o *SendVar) Execute(ctx context.Context, sw *ioswitch.Switch) error { - err := sw.BindVars(ctx, o.Var) + err := sw.BindVars(ctx, o.Input) if err != nil { return err } @@ -86,9 +91,10 @@ func (o *SendVar) Execute(ctx context.Context, sw *ioswitch.Switch) error { } defer stgglb.AgentRPCPool.Release(agtCli) - logger.Debugf("sending var %v to node %v", o.Var.GetID(), o.Node) + logger.Debugf("sending var %v as %v to node %v", o.Input.GetID(), o.Send.GetID(), o.Node) - err = agtCli.SendVar(ctx, sw.Plan().ID, o.Var) + ioswitch.AssignVar(o.Input, o.Send) + err = agtCli.SendVar(ctx, sw.Plan().ID, o.Send) if err != nil { return fmt.Errorf("sending var: %w", err) } @@ -97,8 +103,9 @@ func (o *SendVar) Execute(ctx context.Context, sw *ioswitch.Switch) error { } type GetVar struct { - Var ioswitch.Var `json:"var"` - Node cdssdk.Node `json:"node"` + Get ioswitch.Var `json:"get"` + Output ioswitch.Var `json:"output"` + Node cdssdk.Node `json:"node"` } func (o *GetVar) Execute(ctx context.Context, sw *ioswitch.Switch) error { @@ -108,14 +115,14 @@ func (o *GetVar) Execute(ctx context.Context, sw *ioswitch.Switch) error { } defer stgglb.AgentRPCPool.Release(agtCli) - logger.Debugf("getting var %v from node %v", o.Var.GetID(), o.Node) + logger.Debugf("getting var %v as %v from node %v", o.Get.GetID(), o.Output.GetID(), o.Node) - v2, err := agtCli.GetVar(ctx, sw.Plan().ID, o.Var) + v2, err := agtCli.GetVar(ctx, sw.Plan().ID, o.Get) if err != nil { return fmt.Errorf("getting var: %w", err) } - o.Var = v2 - sw.PutVars(o.Var) + ioswitch.AssignVar(v2, o.Output) + sw.PutVars(o.Output) return nil } diff --git a/common/pkgs/ioswitch/plans/executor.go b/common/pkgs/ioswitch/plans/executor.go index 0618fe3..42c342c 100644 --- a/common/pkgs/ioswitch/plans/executor.go +++ b/common/pkgs/ioswitch/plans/executor.go @@ -20,22 +20,22 @@ type Executor struct { executorSw *ioswitch.Switch } -func (e *Executor) BeginWrite(str io.ReadCloser, target *ExecutorWriteStream) { - target.stream.Stream = str - e.executorSw.PutVars(target.stream) +func (e *Executor) BeginWrite(str io.ReadCloser, handle *ExecutorWriteStream) { + handle.Var.Stream = str + e.executorSw.PutVars(handle.Var) } -func (e *Executor) BeginRead(target *ExecutorReadStream) (io.ReadCloser, error) { - err := e.executorSw.BindVars(e.ctx, target.stream) +func (e *Executor) BeginRead(handle *ExecutorReadStream) (io.ReadCloser, error) { + err := e.executorSw.BindVars(e.ctx, handle.Var) if err != nil { return nil, fmt.Errorf("bind vars: %w", err) } - return target.stream.Stream, nil + return handle.Var.Stream, nil } func (e *Executor) Signal(signal *ExecutorSignalVar) { - e.executorSw.PutVars(signal.v) + e.executorSw.PutVars(signal.Var) } func (e *Executor) Wait(ctx context.Context) (map[string]any, error) { @@ -45,7 +45,7 @@ func (e *Executor) Wait(ctx context.Context) (map[string]any, error) { } ret := make(map[string]any) - e.planBlder.StoreMap.Range(func(k, v any) bool { + e.planBlder.ExecutorPlan.StoreMap.Range(func(k, v any) bool { ret[k.(string)] = v return true }) @@ -98,64 +98,14 @@ func (e *Executor) stopWith(err error) { e.cancel() } -// type ExecutorStreamVar struct { -// blder *PlanBuilder -// v *ioswitch.StreamVar -// } type ExecutorWriteStream struct { - stream *ioswitch.StreamVar + Var *ioswitch.StreamVar } -// func (b *ExecutorPlanBuilder) WillWrite(str *ExecutorWriteStream) *ExecutorStreamVar { -// stream := b.blder.NewStreamVar() -// str.stream = stream -// return &ExecutorStreamVar{blder: b.blder, v: stream} -// } - -// func (b *ExecutorPlanBuilder) WillSignal() *ExecutorSignalVar { -// s := b.blder.NewSignalVar() -// return &ExecutorSignalVar{blder: b.blder, v: s} -// } - type ExecutorReadStream struct { - stream *ioswitch.StreamVar -} - -// func (v *ExecutorStreamVar) WillRead(str *ExecutorReadStream) { -// str.stream = v.v -// } -/* -func (s *ExecutorStreamVar) To(node cdssdk.Node) *AgentStreamVar { - s.blder.ExecutorPlan.ops = append(s.blder.ExecutorPlan.ops, &ops.SendStream{Stream: s.v, Node: node}) - return &AgentStreamVar{ - owner: s.blder.AtAgent(node), - v: s.v, - } -} - -type ExecutorStringVar struct { - blder *PlanBuilder - v *ioswitch.StringVar -} - -func (s *ExecutorStringVar) Store(key string) { - s.blder.ExecutorPlan.ops = append(s.blder.ExecutorPlan.ops, &ops.Store{ - Var: s.v, - Key: key, - Store: s.blder.StoreMap, - }) + Var *ioswitch.StreamVar } type ExecutorSignalVar struct { - blder *PlanBuilder - v *ioswitch.SignalVar -} - -func (s *ExecutorSignalVar) To(node cdssdk.Node) *AgentSignalVar { - s.blder.ExecutorPlan.ops = append(s.blder.ExecutorPlan.ops, &ops.SendVar{Var: s.v, Node: node}) - return &AgentSignalVar{ - owner: s.blder.AtAgent(node), - v: s.v, - } + Var *ioswitch.SignalVar } -*/ diff --git a/common/pkgs/ioswitch/plans/fromto.go b/common/pkgs/ioswitch/plans/fromto.go index 3f353b3..2cbb2e7 100644 --- a/common/pkgs/ioswitch/plans/fromto.go +++ b/common/pkgs/ioswitch/plans/fromto.go @@ -2,27 +2,43 @@ package plans import ( cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + stgmod "gitlink.org.cn/cloudream/storage/common/models" ) +type FromTo struct { + Object stgmod.ObjectDetail + Froms []From + Tos []To +} + +func (ft *FromTo) AddFrom(from From) *FromTo { + ft.Froms = append(ft.Froms, from) + return ft +} + +func (ft *FromTo) AddTo(to To) *FromTo { + ft.Tos = append(ft.Tos, to) + return ft +} + +type FromTos []FromTo + type From interface { GetDataIndex() int - BuildOp() Node } type To interface { + GetRange() Range GetDataIndex() int - BuildOp() Node } -type FromTos []FromTo - -type FromTo struct { - Froms []From - Tos []To +type Range struct { + Offset int64 + Length int64 } type FromExecutor struct { - Stream *ExecutorWriteStream + Handle *ExecutorWriteStream DataIndex int } @@ -30,64 +46,43 @@ func (f *FromExecutor) GetDataIndex() int { return f.DataIndex } -func (f *FromExecutor) BuildOp() Node { +func (f *FromExecutor) BuildNode(ft *FromTo) Node { op := Node{ Env: &ExecutorEnv{}, - Type: FromExecutorOp{ - OutputVar: 0, - Handle: f.Stream, + Type: &FromExecutorOp{ + Handle: f.Handle, }, } - op.NewOutput(nil) + op.NewOutput(f.DataIndex) return op } -type FromIPFS struct { +type FromNode struct { Node *cdssdk.Node - FileHash string DataIndex int } -func NewFromIPFS(node *cdssdk.Node, fileHash string, dataIndex int) *FromIPFS { - return &FromIPFS{ +func NewFromNode(node *cdssdk.Node, dataIndex int) *FromNode { + return &FromNode{ Node: node, - FileHash: fileHash, DataIndex: dataIndex, } } -func (f *FromIPFS) GetDataIndex() int { +func (f *FromNode) GetDataIndex() int { return f.DataIndex } -func (f *FromIPFS) BuildOp() Node { - op := Node{ - Pinned: true, - Type: &IPFSReadType{ - OutputVar: 0, - FileHash: f.FileHash, - }, - } - - if f.Node == nil { - op.Env = nil - } else { - op.Env = &AgentEnv{*f.Node} - } - - op.NewOutput(nil) - return op -} - type ToExecutor struct { - Stream *ExecutorReadStream + Handle *ExecutorReadStream DataIndex int + Range Range } func NewToExecutor(dataIndex int) (*ToExecutor, *ExecutorReadStream) { str := ExecutorReadStream{} return &ToExecutor{ - Stream: &str, + Handle: &str, DataIndex: dataIndex, }, &str } @@ -96,48 +91,31 @@ func (t *ToExecutor) GetDataIndex() int { return t.DataIndex } -func (t *ToExecutor) BuildOp() Node { - op := Node{ - Env: &ExecutorEnv{}, - Pinned: true, - Type: ToExecutorOp{ - InputVar: 0, - Handle: t.Stream, - }, - } - op.NewOutput(nil) - return op +func (t *ToExecutor) GetRange() Range { + return t.Range } -type ToIPFS struct { - Node cdssdk.Node - DataIndex int - FileHashKey string +type ToAgent struct { + Node cdssdk.Node + DataIndex int + Range Range + FileHashStoreKey string } -func NewToIPFS(node cdssdk.Node, dataIndex int, fileHashKey string) *ToIPFS { - return &ToIPFS{ - Node: node, - DataIndex: dataIndex, - FileHashKey: fileHashKey, +func NewToAgent(node cdssdk.Node, dataIndex int, fileHashStoreKey string) *ToAgent { + return &ToAgent{ + Node: node, + DataIndex: dataIndex, + FileHashStoreKey: fileHashStoreKey, } } -func (t *ToIPFS) GetDataIndex() int { +func (t *ToAgent) GetDataIndex() int { return t.DataIndex } -func (t *ToIPFS) BuildOp() Node { - op := Node{ - Env: &AgentEnv{t.Node}, - Pinned: true, - Type: &IPFSWriteType{ - InputVar: 0, - FileHashVar: 0, - }, - } - op.NewInput(nil) - return op +func (t *ToAgent) GetRange() Range { + return t.Range } // type ToStorage struct { diff --git a/common/pkgs/ioswitch/plans/ops.go b/common/pkgs/ioswitch/plans/ops.go index ebb2fc5..73babfd 100644 --- a/common/pkgs/ioswitch/plans/ops.go +++ b/common/pkgs/ioswitch/plans/ops.go @@ -5,7 +5,9 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/ipfs" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/common/utils/lo2" + "gitlink.org.cn/cloudream/storage/common/pkgs/ec" "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch/ops" ) type VarIndex int @@ -130,8 +132,13 @@ type IPFSReadType struct { Option ipfs.ReadOption } -func (t *IPFSReadType) GenerateOp(op *Node, blder *PlanBuilder) error { - +func (t *IPFSReadType) GenerateOp(node *Node, blder *PlanBuilder) error { + addOpByEnv(&ops.IPFSRead{ + Output: node.OutputStreams[0].Var, + FileHash: t.FileHash, + Option: t.Option, + }, node.Env, blder) + return nil } type IPFSWriteType struct { @@ -139,7 +146,11 @@ type IPFSWriteType struct { } func (t *IPFSWriteType) GenerateOp(op *Node, blder *PlanBuilder) error { - + addOpByEnv(&ops.IPFSWrite{ + Input: op.InputStreams[0].Var, + FileHash: op.OutputValues[0].Var.(*ioswitch.StringVar), + }, op.Env, blder) + return nil } type ChunkedSplitOp struct { @@ -148,7 +159,15 @@ type ChunkedSplitOp struct { } func (t *ChunkedSplitOp) GenerateOp(op *Node, blder *PlanBuilder) error { - + addOpByEnv(&ops.ChunkedSplit{ + Input: op.InputStreams[0].Var, + Outputs: lo.Map(op.OutputStreams, func(v *StreamVar, idx int) *ioswitch.StreamVar { + return v.Var + }), + ChunkSize: t.ChunkSize, + PaddingZeros: t.PaddingZeros, + }, op.Env, blder) + return nil } type ChunkedJoinOp struct { @@ -156,27 +175,68 @@ type ChunkedJoinOp struct { } func (t *ChunkedJoinOp) GenerateOp(op *Node, blder *PlanBuilder) error { - + addOpByEnv(&ops.ChunkedJoin{ + Inputs: lo.Map(op.InputStreams, func(v *StreamVar, idx int) *ioswitch.StreamVar { + return v.Var + }), + Output: op.OutputStreams[0].Var, + ChunkSize: t.ChunkSize, + }, op.Env, blder) + return nil } type CloneStreamOp struct{} func (t *CloneStreamOp) GenerateOp(op *Node, blder *PlanBuilder) error { - + addOpByEnv(&ops.CloneStream{ + Input: op.InputStreams[0].Var, + Outputs: lo.Map(op.OutputStreams, func(v *StreamVar, idx int) *ioswitch.StreamVar { + return v.Var + }), + }, op.Env, blder) + return nil } type CloneVarOp struct{} func (t *CloneVarOp) GenerateOp(op *Node, blder *PlanBuilder) error { - + addOpByEnv(&ops.CloneVar{ + Raw: op.InputValues[0].Var, + Cloneds: lo.Map(op.OutputValues, func(v *ValueVar, idx int) ioswitch.Var { + return v.Var + }), + }, op.Env, blder) + return nil } type MultiplyOp struct { - Coef [][]byte + EC cdssdk.ECRedundancy ChunkSize int } func (t *MultiplyOp) GenerateOp(op *Node, blder *PlanBuilder) error { + var inputIdxs []int + var outputIdxs []int + for _, in := range op.InputStreams { + inputIdxs = append(inputIdxs, in.DataIndex) + } + for _, out := range op.OutputStreams { + outputIdxs = append(outputIdxs, out.DataIndex) + } + + rs, _ := ec.NewRs(t.EC.K, t.EC.N) + coef, err := rs.GenerateMatrix(inputIdxs, outputIdxs) + if err != nil { + return err + } + + addOpByEnv(&ops.ECMultiply{ + Coef: coef, + Inputs: lo.Map(op.InputStreams, func(v *StreamVar, idx int) *ioswitch.StreamVar { return v.Var }), + Outputs: lo.Map(op.OutputStreams, func(v *StreamVar, idx int) *ioswitch.StreamVar { return v.Var }), + ChunkSize: t.ChunkSize, + }, op.Env, blder) + return nil } type FileReadOp struct { @@ -184,6 +244,11 @@ type FileReadOp struct { } func (t *FileReadOp) GenerateOp(op *Node, blder *PlanBuilder) error { + addOpByEnv(&ops.FileRead{ + Output: op.OutputStreams[0].Var, + FilePath: t.FilePath, + }, op.Env, blder) + return nil } type FileWriteOp struct { @@ -191,6 +256,11 @@ type FileWriteOp struct { } func (t *FileWriteOp) GenerateOp(op *Node, blder *PlanBuilder) error { + addOpByEnv(&ops.FileWrite{ + Input: op.InputStreams[0].Var, + FilePath: t.FilePath, + }, op.Env, blder) + return nil } type FromExecutorOp struct { @@ -198,6 +268,8 @@ type FromExecutorOp struct { } func (t *FromExecutorOp) GenerateOp(op *Node, blder *PlanBuilder) error { + t.Handle.Var = op.OutputStreams[0].Var + return nil } type ToExecutorOp struct { @@ -205,6 +277,8 @@ type ToExecutorOp struct { } func (t *ToExecutorOp) GenerateOp(op *Node, blder *PlanBuilder) error { + t.Handle.Var = op.InputStreams[0].Var + return nil } type StoreOp struct { @@ -212,29 +286,76 @@ type StoreOp struct { } func (t *StoreOp) GenerateOp(op *Node, blder *PlanBuilder) error { + blder.AtExecutor().AddOp(&ops.Store{ + Var: op.InputValues[0].Var, + Key: t.StoreKey, + Store: blder.ExecutorPlan.StoreMap, + }) + return nil } type DropOp struct{} func (t *DropOp) GenerateOp(op *Node, blder *PlanBuilder) error { + addOpByEnv(&ops.DropStream{ + Input: op.InputStreams[0].Var, + }, op.Env, blder) + return nil } type SendStreamOp struct{} func (t *SendStreamOp) GenerateOp(op *Node, blder *PlanBuilder) error { + toAgt := op.OutputStreams[0].Toes[0].Env.(*AgentEnv) + addOpByEnv(&ops.SendStream{ + Input: op.InputStreams[0].Var, + Send: op.OutputStreams[0].Var, + Node: toAgt.Node, + }, op.Env, blder) + return nil } type GetStreamOp struct{} func (t *GetStreamOp) GenerateOp(op *Node, blder *PlanBuilder) error { + fromAgt := op.InputStreams[0].From.Env.(*AgentEnv) + addOpByEnv(&ops.GetStream{ + Output: op.OutputStreams[0].Var, + Get: op.InputStreams[0].Var, + Node: fromAgt.Node, + }, op.Env, blder) + return nil } type SendVarOp struct{} func (t *SendVarOp) GenerateOp(op *Node, blder *PlanBuilder) error { + toAgt := op.OutputValues[0].Toes[0].Env.(*AgentEnv) + addOpByEnv(&ops.SendVar{ + Input: op.InputValues[0].Var, + Send: op.OutputValues[0].Var, + Node: toAgt.Node, + }, op.Env, blder) + return nil } type GetVarOp struct{} func (t *GetVarOp) GenerateOp(op *Node, blder *PlanBuilder) error { + fromAgt := op.InputValues[0].From.Env.(*AgentEnv) + addOpByEnv(&ops.GetVar{ + Output: op.OutputValues[0].Var, + Get: op.InputValues[0].Var, + Node: fromAgt.Node, + }, op.Env, blder) + return nil +} + +func addOpByEnv(op ioswitch.Op, env OpEnv, blder *PlanBuilder) { + switch env := env.(type) { + case *AgentEnv: + blder.AtAgent(env.Node).AddOp(op) + case *ExecutorEnv: + blder.AtExecutor().AddOp(op) + } } diff --git a/common/pkgs/ioswitch/plans/parser.go b/common/pkgs/ioswitch/plans/parser.go index 5e6b765..2c0d881 100644 --- a/common/pkgs/ioswitch/plans/parser.go +++ b/common/pkgs/ioswitch/plans/parser.go @@ -12,12 +12,12 @@ type FromToParser interface { } type DefaultParser struct { - EC *cdssdk.ECRedundancy + EC cdssdk.ECRedundancy } type ParseContext struct { - Ft FromTo - Ops []*Node + Ft FromTo + Nodes []*Node } func (p *DefaultParser) Parse(ft FromTo, blder *PlanBuilder) error { @@ -85,7 +85,7 @@ func (p *DefaultParser) Parse(ft FromTo, blder *PlanBuilder) error { return p.buildPlan(&ctx, blder) } func (p *DefaultParser) findOutputStream(ctx *ParseContext, dataIndex int) *StreamVar { - for _, op := range ctx.Ops { + for _, op := range ctx.Nodes { for _, o := range op.OutputStreams { if o.DataIndex == dataIndex { return o @@ -98,8 +98,11 @@ func (p *DefaultParser) findOutputStream(ctx *ParseContext, dataIndex int) *Stre func (p *DefaultParser) extend(ctx *ParseContext, ft FromTo, blder *PlanBuilder) error { for _, f := range ft.Froms { - o := f.BuildOp() - ctx.Ops = append(ctx.Ops, &o) + n, err := p.buildFromNode(&ft, f) + if err != nil { + return err + } + ctx.Nodes = append(ctx.Nodes, n) // 对于完整文件的From,生成Split指令 if f.GetDataIndex() == -1 { @@ -107,18 +110,18 @@ func (p *DefaultParser) extend(ctx *ParseContext, ft FromTo, blder *PlanBuilder) Env: nil, Type: &ChunkedSplitOp{ChunkSize: p.EC.ChunkSize, PaddingZeros: true}, } - splitOp.AddInput(o.OutputStreams[0]) + splitOp.AddInput(n.OutputStreams[0]) for i := 0; i < p.EC.K; i++ { splitOp.NewOutput(i) } - ctx.Ops = append(ctx.Ops, splitOp) + ctx.Nodes = append(ctx.Nodes, splitOp) } } // 如果有K个不同的文件块流,则生成Multiply指令,同时针对其生成的流,生成Join指令 ecInputStrs := make(map[int]*StreamVar) loop: - for _, o := range ctx.Ops { + for _, o := range ctx.Nodes { for _, s := range o.OutputStreams { if s.DataIndex >= 0 && ecInputStrs[s.DataIndex] == nil { ecInputStrs[s.DataIndex] = s @@ -140,7 +143,7 @@ loop: for i := 0; i < p.EC.N; i++ { mulOp.NewOutput(i) } - ctx.Ops = append(ctx.Ops, mulOp) + ctx.Nodes = append(ctx.Nodes, mulOp) joinOp := &Node{ Env: nil, @@ -155,24 +158,75 @@ loop: // 为每一个To找到一个输入流 for _, t := range ft.Tos { - o := t.BuildOp() - ctx.Ops = append(ctx.Ops, &o) + n, err := p.buildToNode(&ft, t) + if err != nil { + return err + } + + ctx.Nodes = append(ctx.Nodes, n) str := p.findOutputStream(ctx, t.GetDataIndex()) if str == nil { return fmt.Errorf("no output stream found for data index %d", t.GetDataIndex()) } - o.AddInput(str) + n.AddInput(str) } return nil } +func (p *DefaultParser) buildFromNode(ft *FromTo, f From) (*Node, error) { + switch f := f.(type) { + case *FromNode: + n := &Node{ + // TODO2 需要FromTo的Range来设置Option + Type: &IPFSReadType{FileHash: ft.Object.Object.FileHash}, + } + n.NewOutput(f.DataIndex) + + if f.Node != nil { + n.Env = &AgentEnv{Node: *f.Node} + } + + return n, nil + + case *FromExecutor: + n := &Node{ + Env: &ExecutorEnv{}, + Type: &FromExecutorOp{Handle: f.Handle}, + } + n.NewOutput(f.DataIndex) + return n, nil + + default: + return nil, fmt.Errorf("unsupported from type %T", f) + } +} + +func (p *DefaultParser) buildToNode(ft *FromTo, t To) (*Node, error) { + switch t := t.(type) { + case *ToAgent: + return &Node{ + Env: &AgentEnv{t.Node}, + Type: &IPFSWriteType{FileHashStoreKey: t.FileHashStoreKey}, + }, nil + + case *ToExecutor: + return &Node{ + Env: &ExecutorEnv{}, + Type: &ToExecutorOp{Handle: t.Handle}, + }, nil + + default: + return nil, fmt.Errorf("unsupported to type %T", t) + } +} + // 删除输出流未被使用的Join指令 func (p *DefaultParser) removeUnusedJoin(ctx *ParseContext) bool { opted := false - for i, op := range ctx.Ops { + for i, op := range ctx.Nodes { _, ok := op.Type.(*ChunkedJoinOp) if !ok { continue @@ -186,18 +240,18 @@ func (p *DefaultParser) removeUnusedJoin(ctx *ParseContext) bool { in.RemoveTo(op) } - ctx.Ops[i] = nil + ctx.Nodes[i] = nil opted = true } - ctx.Ops = lo2.RemoveAllDefault(ctx.Ops) + ctx.Nodes = lo2.RemoveAllDefault(ctx.Nodes) return opted } // 减少未使用的Multiply指令的输出流。如果减少到0,则删除该指令 func (p *DefaultParser) removeUnusedMultiplyOutput(ctx *ParseContext) bool { opted := false - for i, op := range ctx.Ops { + for i, op := range ctx.Nodes { _, ok := op.Type.(*MultiplyOp) if !ok { continue @@ -217,20 +271,20 @@ func (p *DefaultParser) removeUnusedMultiplyOutput(ctx *ParseContext) bool { in.RemoveTo(op) } - ctx.Ops[i] = nil + ctx.Nodes[i] = nil } opted = true } - ctx.Ops = lo2.RemoveAllDefault(ctx.Ops) + ctx.Nodes = lo2.RemoveAllDefault(ctx.Nodes) return opted } // 删除未使用的Split指令 func (p *DefaultParser) removeUnusedSplit(ctx *ParseContext) bool { opted := false - for i, op := range ctx.Ops { + for i, op := range ctx.Nodes { _, ok := op.Type.(*ChunkedSplitOp) if !ok { continue @@ -247,12 +301,12 @@ func (p *DefaultParser) removeUnusedSplit(ctx *ParseContext) bool { if isAllUnused { op.InputStreams[0].RemoveTo(op) - ctx.Ops[i] = nil + ctx.Nodes[i] = nil opted = true } } - ctx.Ops = lo2.RemoveAllDefault(ctx.Ops) + ctx.Nodes = lo2.RemoveAllDefault(ctx.Nodes) return opted } @@ -260,7 +314,7 @@ func (p *DefaultParser) removeUnusedSplit(ctx *ParseContext) bool { func (p *DefaultParser) omitSplitJoin(ctx *ParseContext) bool { opted := false loop: - for iSplit, splitOp := range ctx.Ops { + for iSplit, splitOp := range ctx.Nodes { // 进行合并操作时会删除多个指令,因此这里存在splitOp == nil的情况 if splitOp == nil { continue @@ -309,19 +363,19 @@ loop: } // 并删除这两个指令 - ctx.Ops[iSplit] = nil - lo2.Clear(ctx.Ops, joinOp) + ctx.Nodes[iSplit] = nil + lo2.Clear(ctx.Nodes, joinOp) opted = true } - ctx.Ops = lo2.RemoveAllDefault(ctx.Ops) + ctx.Nodes = lo2.RemoveAllDefault(ctx.Nodes) return opted } // 确定Split命令的执行位置 func (p *DefaultParser) pinSplit(ctx *ParseContext) bool { opted := false - for _, op := range ctx.Ops { + for _, op := range ctx.Nodes { _, ok := op.Type.(*ChunkedSplitOp) if !ok { continue @@ -380,7 +434,7 @@ func (p *DefaultParser) pinSplit(ctx *ParseContext) bool { // 确定Join命令的执行位置,策略与固定Split类似 func (p *DefaultParser) pinJoin(ctx *ParseContext) bool { opted := false - for _, op := range ctx.Ops { + for _, op := range ctx.Nodes { _, ok := op.Type.(*ChunkedJoinOp) if !ok { continue @@ -443,7 +497,7 @@ func (p *DefaultParser) pinJoin(ctx *ParseContext) bool { // 确定Multiply命令的执行位置 func (p *DefaultParser) pinMultiply(ctx *ParseContext) bool { opted := false - for _, op := range ctx.Ops { + for _, op := range ctx.Nodes { _, ok := op.Type.(*MultiplyOp) if !ok { continue @@ -507,7 +561,7 @@ func (p *DefaultParser) pinMultiply(ctx *ParseContext) bool { // 确定IPFS读取指令的执行位置 func (p *DefaultParser) pinIPFSRead(ctx *ParseContext) bool { opted := false - for _, op := range ctx.Ops { + for _, op := range ctx.Nodes { _, ok := op.Type.(*IPFSReadType) if !ok { continue @@ -545,7 +599,7 @@ func (p *DefaultParser) pinIPFSRead(ctx *ParseContext) bool { // 对于所有未使用的流,增加Drop指令 func (p *DefaultParser) dropUnused(ctx *ParseContext) { - for _, op := range ctx.Ops { + for _, op := range ctx.Nodes { for _, out := range op.OutputStreams { if len(out.Toes) == 0 { dropOp := &Node{ @@ -553,7 +607,7 @@ func (p *DefaultParser) dropUnused(ctx *ParseContext) { Type: &DropOp{}, } dropOp.AddInput(out) - ctx.Ops = append(ctx.Ops, dropOp) + ctx.Nodes = append(ctx.Nodes, dropOp) } } } @@ -561,7 +615,7 @@ func (p *DefaultParser) dropUnused(ctx *ParseContext) { // 为IPFS写入指令存储结果 func (p *DefaultParser) storeIPFSWriteResult(ctx *ParseContext) { - for _, op := range ctx.Ops { + for _, op := range ctx.Nodes { w, ok := op.Type.(*IPFSWriteType) if !ok { continue @@ -578,13 +632,13 @@ func (p *DefaultParser) storeIPFSWriteResult(ctx *ParseContext) { }, } storeOp.AddInputVar(op.OutputValues[0]) - ctx.Ops = append(ctx.Ops, storeOp) + ctx.Nodes = append(ctx.Nodes, storeOp) } } // 生成Clone指令 func (p *DefaultParser) generateClone(ctx *ParseContext) { - for _, op := range ctx.Ops { + for _, op := range ctx.Nodes { for _, out := range op.OutputStreams { if len(out.Toes) <= 1 { continue @@ -599,7 +653,7 @@ func (p *DefaultParser) generateClone(ctx *ParseContext) { } out.Toes = nil cloneOp.AddInput(out) - ctx.Ops = append(ctx.Ops, cloneOp) + ctx.Nodes = append(ctx.Nodes, cloneOp) } for _, out := range op.OutputValues { @@ -622,7 +676,7 @@ func (p *DefaultParser) generateClone(ctx *ParseContext) { // 生成Send指令 func (p *DefaultParser) generateSend(ctx *ParseContext) { - for _, op := range ctx.Ops { + for _, op := range ctx.Nodes { for _, out := range op.OutputStreams { to := out.Toes[0] if to.Env.Equals(op.Env) { @@ -639,7 +693,7 @@ func (p *DefaultParser) generateSend(ctx *ParseContext) { out.Toes = nil getStrOp.AddInput(out) to.ReplaceInput(out, getStrOp.NewOutput(out.DataIndex)) - ctx.Ops = append(ctx.Ops, getStrOp) + ctx.Nodes = append(ctx.Nodes, getStrOp) case *AgentEnv: // 如果是要送到Agent,则可以直接发送 @@ -650,7 +704,7 @@ func (p *DefaultParser) generateSend(ctx *ParseContext) { out.Toes = nil sendStrOp.AddInput(out) to.ReplaceInput(out, sendStrOp.NewOutput(out.DataIndex)) - ctx.Ops = append(ctx.Ops, sendStrOp) + ctx.Nodes = append(ctx.Nodes, sendStrOp) } } @@ -670,7 +724,7 @@ func (p *DefaultParser) generateSend(ctx *ParseContext) { out.Toes = nil getVarOp.AddInputVar(out) to.ReplaceInputVar(out, getVarOp.NewOutputVar(out.Type)) - ctx.Ops = append(ctx.Ops, getVarOp) + ctx.Nodes = append(ctx.Nodes, getVarOp) case *AgentEnv: // 如果是要送到Agent,则可以直接发送 @@ -681,7 +735,7 @@ func (p *DefaultParser) generateSend(ctx *ParseContext) { out.Toes = nil sendVarOp.AddInputVar(out) to.ReplaceInputVar(out, sendVarOp.NewOutputVar(out.Type)) - ctx.Ops = append(ctx.Ops, sendVarOp) + ctx.Nodes = append(ctx.Nodes, sendVarOp) } } } @@ -689,7 +743,7 @@ func (p *DefaultParser) generateSend(ctx *ParseContext) { // 生成Plan func (p *DefaultParser) buildPlan(ctx *ParseContext, blder *PlanBuilder) error { - for _, op := range ctx.Ops { + for _, op := range ctx.Nodes { for _, out := range op.OutputStreams { if out.Var != nil { continue