Browse Source

将多副本读取逻辑改成基于Plan来实现

gitlink
Sydonian 1 year ago
parent
commit
b6cb55b9a5
9 changed files with 386 additions and 218 deletions
  1. +32
    -12
      common/pkgs/downloader/iterator.go
  2. +4
    -0
      common/pkgs/ec/rs.go
  3. +34
    -0
      common/pkgs/ioswitch/ops/drop.go
  4. +1
    -1
      common/pkgs/ioswitch/ops/ec.go
  5. +29
    -22
      common/pkgs/ioswitch/ops/grpc.go
  6. +11
    -61
      common/pkgs/ioswitch/plans/executor.go
  7. +50
    -72
      common/pkgs/ioswitch/plans/fromto.go
  8. +129
    -8
      common/pkgs/ioswitch/plans/ops.go
  9. +96
    -42
      common/pkgs/ioswitch/plans/parser.go

+ 32
- 12
common/pkgs/downloader/iterator.go View File

@@ -1,6 +1,7 @@
package downloader

import (
"context"
"fmt"
"io"
"math"
@@ -21,6 +22,7 @@ import (
stgglb "gitlink.org.cn/cloudream/storage/common/globals"
stgmod "gitlink.org.cn/cloudream/storage/common/models"
"gitlink.org.cn/cloudream/storage/common/pkgs/distlock"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch/plans"
"gitlink.org.cn/cloudream/storage/common/pkgs/iterator"
coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"
)
@@ -170,27 +172,45 @@ func (iter *DownloadObjectIterator) downloadNoneOrRepObject(obj downloadReqeust2
return nil, err
}

var strHandle *plans.ExecutorReadStream
ft := plans.FromTo{
Object: *obj.Detail,
}

bsc, blocks := iter.getMinReadingBlockSolution(allNodes, 1)
osc, node := iter.getMinReadingObjectSolution(allNodes, 1)
if bsc < osc {
logger.Debugf("downloading object from node %v(%v)", blocks[0].Node.Name, blocks[0].Node.NodeID)

return NewIPFSReaderWithRange(blocks[0].Node, blocks[0].Block.FileHash, ipfs.ReadOption{
Offset: obj.Raw.Offset,
Length: obj.Raw.Length,
}), nil
}
toExec, handle := plans.NewToExecutor(-1)
ft.AddFrom(plans.NewFromNode(&blocks[0].Node, -1)).AddTo(toExec)
strHandle = handle

// bsc >= osc,如果osc是MaxFloat64,那么bsc也一定是,也就意味着没有足够块来恢复文件
if osc == math.MaxFloat64 {
// TODO2 处理Offset和Length
} else if osc == math.MaxFloat64 {
// bsc >= osc,如果osc是MaxFloat64,那么bsc也一定是,也就意味着没有足够块来恢复文件
return nil, fmt.Errorf("no node has this object")
} else {
logger.Debugf("downloading object from node %v(%v)", node.Name, node.NodeID)

toExec, handle := plans.NewToExecutor(-1)
ft.AddFrom(plans.NewFromNode(node, -1)).AddTo(toExec)
strHandle = handle
// TODO2 处理Offset和Length
}

logger.Debugf("downloading object from node %v(%v)", node.Name, node.NodeID)
return NewIPFSReaderWithRange(*node, obj.Detail.Object.FileHash, ipfs.ReadOption{
Offset: obj.Raw.Offset,
Length: obj.Raw.Length,
}), nil
parser := plans.DefaultParser{
EC: cdssdk.DefaultECRedundancy,
}
plans := plans.NewPlanBuilder()
if err := parser.Parse(ft, plans); err != nil {
return nil, fmt.Errorf("parsing plan: %w", err)
}

exec := plans.Execute()
go exec.Wait(context.TODO())

return exec.BeginRead(strHandle)
}

func (iter *DownloadObjectIterator) downloadECObject(req downloadReqeust2, ecRed *cdssdk.ECRedundancy) (io.ReadCloser, error) {


+ 4
- 0
common/pkgs/ec/rs.go View File

@@ -42,3 +42,7 @@ func (r *Rs) ReconstructAny(blocks [][]byte, outBlockIdxes []int) error {
}
return r.encoder.ReconstructAny(blocks, required)
}

func (r *Rs) GenerateMatrix(inputIdxs []int, outputIdxs []int) ([][]byte, error) {
return r.encoder.(reedsolomon.Extensions).GenerateMatrix(inputIdxs, outputIdxs)
}

+ 34
- 0
common/pkgs/ioswitch/ops/drop.go View File

@@ -0,0 +1,34 @@
package ops

import (
"context"
"io"

"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch"
)

type DropStream struct {
Input *ioswitch.StreamVar `json:"input"`
}

func (o *DropStream) Execute(ctx context.Context, sw *ioswitch.Switch) error {
err := sw.BindVars(ctx, o.Input)
if err != nil {
return err
}

for {
buf := make([]byte, 1024*8)
_, err = o.Input.Stream.Read(buf)
if err == io.EOF {
return nil
}
if err != nil {
return err
}
}
}

func init() {
OpUnion.AddT((*DropStream)(nil))
}

+ 1
- 1
common/pkgs/ioswitch/ops/ec.go View File

@@ -105,7 +105,7 @@ type ECMultiply struct {
Coef [][]byte `json:"coef"`
Inputs []*ioswitch.StreamVar `json:"inputs"`
Outputs []*ioswitch.StreamVar `json:"outputs"`
ChunkSize int64 `json:"chunkSize"`
ChunkSize int `json:"chunkSize"`
}

func (o *ECMultiply) Execute(ctx context.Context, sw *ioswitch.Switch) error {


+ 29
- 22
common/pkgs/ioswitch/ops/grpc.go View File

@@ -14,16 +14,17 @@ import (
)

type SendStream struct {
Stream *ioswitch.StreamVar `json:"stream"`
Node cdssdk.Node `json:"node"`
Input *ioswitch.StreamVar `json:"input"`
Send *ioswitch.StreamVar `json:"send"`
Node cdssdk.Node `json:"node"`
}

func (o *SendStream) Execute(ctx context.Context, sw *ioswitch.Switch) error {
err := sw.BindVars(ctx, o.Stream)
err := sw.BindVars(ctx, o.Input)
if err != nil {
return err
}
defer o.Stream.Stream.Close()
defer o.Input.Stream.Close()

agtCli, err := stgglb.AgentRPCPool.Acquire(stgglb.SelectGRPCAddress(&o.Node))
if err != nil {
@@ -31,9 +32,10 @@ func (o *SendStream) Execute(ctx context.Context, sw *ioswitch.Switch) error {
}
defer stgglb.AgentRPCPool.Release(agtCli)

logger.Debugf("sending stream %v to node %v", o.Stream.ID, o.Node)
logger.Debugf("sending stream %v as %v to node %v", o.Input.ID, o.Send.ID, o.Node)

err = agtCli.SendStream(ctx, sw.Plan().ID, o.Stream.ID, o.Stream.Stream)
// 发送后流的ID不同
err = agtCli.SendStream(ctx, sw.Plan().ID, o.Send.ID, o.Input.Stream)
if err != nil {
return fmt.Errorf("sending stream: %w", err)
}
@@ -42,7 +44,8 @@ func (o *SendStream) Execute(ctx context.Context, sw *ioswitch.Switch) error {
}

type GetStream struct {
Stream *ioswitch.StreamVar `json:"stream"`
Get *ioswitch.StreamVar `json:"get"`
Output *ioswitch.StreamVar `json:"output"`
Node cdssdk.Node `json:"node"`
}

@@ -53,29 +56,31 @@ func (o *GetStream) Execute(ctx context.Context, sw *ioswitch.Switch) error {
}
defer stgglb.AgentRPCPool.Release(agtCli)

logger.Debugf("getting stream %v from node %v", o.Stream.ID, o.Node)
logger.Debugf("getting stream %v as %v from node %v", o.Get.ID, o.Output.ID, o.Node)

str, err := agtCli.GetStream(sw.Plan().ID, o.Stream.ID)
str, err := agtCli.GetStream(sw.Plan().ID, o.Get.ID)
if err != nil {
return fmt.Errorf("getting stream: %w", err)
}

fut := future.NewSetVoid()
o.Stream.Stream = io2.AfterReadClosedOnce(str, func(closer io.ReadCloser) {
// 获取后送到本地的流ID是不同的
o.Output.Stream = io2.AfterReadClosedOnce(str, func(closer io.ReadCloser) {
fut.SetVoid()
})
sw.PutVars(o.Stream)
sw.PutVars(o.Output)

return fut.Wait(ctx)
}

type SendVar struct {
Var ioswitch.Var `json:"var"`
Node cdssdk.Node `json:"node"`
Input ioswitch.Var `json:"input"`
Send ioswitch.Var `json:"send"`
Node cdssdk.Node `json:"node"`
}

func (o *SendVar) Execute(ctx context.Context, sw *ioswitch.Switch) error {
err := sw.BindVars(ctx, o.Var)
err := sw.BindVars(ctx, o.Input)
if err != nil {
return err
}
@@ -86,9 +91,10 @@ func (o *SendVar) Execute(ctx context.Context, sw *ioswitch.Switch) error {
}
defer stgglb.AgentRPCPool.Release(agtCli)

logger.Debugf("sending var %v to node %v", o.Var.GetID(), o.Node)
logger.Debugf("sending var %v as %v to node %v", o.Input.GetID(), o.Send.GetID(), o.Node)

err = agtCli.SendVar(ctx, sw.Plan().ID, o.Var)
ioswitch.AssignVar(o.Input, o.Send)
err = agtCli.SendVar(ctx, sw.Plan().ID, o.Send)
if err != nil {
return fmt.Errorf("sending var: %w", err)
}
@@ -97,8 +103,9 @@ func (o *SendVar) Execute(ctx context.Context, sw *ioswitch.Switch) error {
}

type GetVar struct {
Var ioswitch.Var `json:"var"`
Node cdssdk.Node `json:"node"`
Get ioswitch.Var `json:"get"`
Output ioswitch.Var `json:"output"`
Node cdssdk.Node `json:"node"`
}

func (o *GetVar) Execute(ctx context.Context, sw *ioswitch.Switch) error {
@@ -108,14 +115,14 @@ func (o *GetVar) Execute(ctx context.Context, sw *ioswitch.Switch) error {
}
defer stgglb.AgentRPCPool.Release(agtCli)

logger.Debugf("getting var %v from node %v", o.Var.GetID(), o.Node)
logger.Debugf("getting var %v as %v from node %v", o.Get.GetID(), o.Output.GetID(), o.Node)

v2, err := agtCli.GetVar(ctx, sw.Plan().ID, o.Var)
v2, err := agtCli.GetVar(ctx, sw.Plan().ID, o.Get)
if err != nil {
return fmt.Errorf("getting var: %w", err)
}
o.Var = v2
sw.PutVars(o.Var)
ioswitch.AssignVar(v2, o.Output)
sw.PutVars(o.Output)

return nil
}


+ 11
- 61
common/pkgs/ioswitch/plans/executor.go View File

@@ -20,22 +20,22 @@ type Executor struct {
executorSw *ioswitch.Switch
}

func (e *Executor) BeginWrite(str io.ReadCloser, target *ExecutorWriteStream) {
target.stream.Stream = str
e.executorSw.PutVars(target.stream)
func (e *Executor) BeginWrite(str io.ReadCloser, handle *ExecutorWriteStream) {
handle.Var.Stream = str
e.executorSw.PutVars(handle.Var)
}

func (e *Executor) BeginRead(target *ExecutorReadStream) (io.ReadCloser, error) {
err := e.executorSw.BindVars(e.ctx, target.stream)
func (e *Executor) BeginRead(handle *ExecutorReadStream) (io.ReadCloser, error) {
err := e.executorSw.BindVars(e.ctx, handle.Var)
if err != nil {
return nil, fmt.Errorf("bind vars: %w", err)
}

return target.stream.Stream, nil
return handle.Var.Stream, nil
}

func (e *Executor) Signal(signal *ExecutorSignalVar) {
e.executorSw.PutVars(signal.v)
e.executorSw.PutVars(signal.Var)
}

func (e *Executor) Wait(ctx context.Context) (map[string]any, error) {
@@ -45,7 +45,7 @@ func (e *Executor) Wait(ctx context.Context) (map[string]any, error) {
}

ret := make(map[string]any)
e.planBlder.StoreMap.Range(func(k, v any) bool {
e.planBlder.ExecutorPlan.StoreMap.Range(func(k, v any) bool {
ret[k.(string)] = v
return true
})
@@ -98,64 +98,14 @@ func (e *Executor) stopWith(err error) {
e.cancel()
}

// type ExecutorStreamVar struct {
// blder *PlanBuilder
// v *ioswitch.StreamVar
// }
type ExecutorWriteStream struct {
stream *ioswitch.StreamVar
Var *ioswitch.StreamVar
}

// func (b *ExecutorPlanBuilder) WillWrite(str *ExecutorWriteStream) *ExecutorStreamVar {
// stream := b.blder.NewStreamVar()
// str.stream = stream
// return &ExecutorStreamVar{blder: b.blder, v: stream}
// }

// func (b *ExecutorPlanBuilder) WillSignal() *ExecutorSignalVar {
// s := b.blder.NewSignalVar()
// return &ExecutorSignalVar{blder: b.blder, v: s}
// }

type ExecutorReadStream struct {
stream *ioswitch.StreamVar
}

// func (v *ExecutorStreamVar) WillRead(str *ExecutorReadStream) {
// str.stream = v.v
// }
/*
func (s *ExecutorStreamVar) To(node cdssdk.Node) *AgentStreamVar {
s.blder.ExecutorPlan.ops = append(s.blder.ExecutorPlan.ops, &ops.SendStream{Stream: s.v, Node: node})
return &AgentStreamVar{
owner: s.blder.AtAgent(node),
v: s.v,
}
}

type ExecutorStringVar struct {
blder *PlanBuilder
v *ioswitch.StringVar
}

func (s *ExecutorStringVar) Store(key string) {
s.blder.ExecutorPlan.ops = append(s.blder.ExecutorPlan.ops, &ops.Store{
Var: s.v,
Key: key,
Store: s.blder.StoreMap,
})
Var *ioswitch.StreamVar
}

type ExecutorSignalVar struct {
blder *PlanBuilder
v *ioswitch.SignalVar
}

func (s *ExecutorSignalVar) To(node cdssdk.Node) *AgentSignalVar {
s.blder.ExecutorPlan.ops = append(s.blder.ExecutorPlan.ops, &ops.SendVar{Var: s.v, Node: node})
return &AgentSignalVar{
owner: s.blder.AtAgent(node),
v: s.v,
}
Var *ioswitch.SignalVar
}
*/

+ 50
- 72
common/pkgs/ioswitch/plans/fromto.go View File

@@ -2,27 +2,43 @@ package plans

import (
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
stgmod "gitlink.org.cn/cloudream/storage/common/models"
)

type FromTo struct {
Object stgmod.ObjectDetail
Froms []From
Tos []To
}

func (ft *FromTo) AddFrom(from From) *FromTo {
ft.Froms = append(ft.Froms, from)
return ft
}

func (ft *FromTo) AddTo(to To) *FromTo {
ft.Tos = append(ft.Tos, to)
return ft
}

type FromTos []FromTo

type From interface {
GetDataIndex() int
BuildOp() Node
}

type To interface {
GetRange() Range
GetDataIndex() int
BuildOp() Node
}

type FromTos []FromTo

type FromTo struct {
Froms []From
Tos []To
type Range struct {
Offset int64
Length int64
}

type FromExecutor struct {
Stream *ExecutorWriteStream
Handle *ExecutorWriteStream
DataIndex int
}

@@ -30,64 +46,43 @@ func (f *FromExecutor) GetDataIndex() int {
return f.DataIndex
}

func (f *FromExecutor) BuildOp() Node {
func (f *FromExecutor) BuildNode(ft *FromTo) Node {
op := Node{
Env: &ExecutorEnv{},
Type: FromExecutorOp{
OutputVar: 0,
Handle: f.Stream,
Type: &FromExecutorOp{
Handle: f.Handle,
},
}
op.NewOutput(nil)
op.NewOutput(f.DataIndex)
return op
}

type FromIPFS struct {
type FromNode struct {
Node *cdssdk.Node
FileHash string
DataIndex int
}

func NewFromIPFS(node *cdssdk.Node, fileHash string, dataIndex int) *FromIPFS {
return &FromIPFS{
func NewFromNode(node *cdssdk.Node, dataIndex int) *FromNode {
return &FromNode{
Node: node,
FileHash: fileHash,
DataIndex: dataIndex,
}
}

func (f *FromIPFS) GetDataIndex() int {
func (f *FromNode) GetDataIndex() int {
return f.DataIndex
}

func (f *FromIPFS) BuildOp() Node {
op := Node{
Pinned: true,
Type: &IPFSReadType{
OutputVar: 0,
FileHash: f.FileHash,
},
}

if f.Node == nil {
op.Env = nil
} else {
op.Env = &AgentEnv{*f.Node}
}

op.NewOutput(nil)
return op
}

type ToExecutor struct {
Stream *ExecutorReadStream
Handle *ExecutorReadStream
DataIndex int
Range Range
}

func NewToExecutor(dataIndex int) (*ToExecutor, *ExecutorReadStream) {
str := ExecutorReadStream{}
return &ToExecutor{
Stream: &str,
Handle: &str,
DataIndex: dataIndex,
}, &str
}
@@ -96,48 +91,31 @@ func (t *ToExecutor) GetDataIndex() int {
return t.DataIndex
}

func (t *ToExecutor) BuildOp() Node {
op := Node{
Env: &ExecutorEnv{},
Pinned: true,
Type: ToExecutorOp{
InputVar: 0,
Handle: t.Stream,
},
}
op.NewOutput(nil)
return op
func (t *ToExecutor) GetRange() Range {
return t.Range
}

type ToIPFS struct {
Node cdssdk.Node
DataIndex int
FileHashKey string
type ToAgent struct {
Node cdssdk.Node
DataIndex int
Range Range
FileHashStoreKey string
}

func NewToIPFS(node cdssdk.Node, dataIndex int, fileHashKey string) *ToIPFS {
return &ToIPFS{
Node: node,
DataIndex: dataIndex,
FileHashKey: fileHashKey,
func NewToAgent(node cdssdk.Node, dataIndex int, fileHashStoreKey string) *ToAgent {
return &ToAgent{
Node: node,
DataIndex: dataIndex,
FileHashStoreKey: fileHashStoreKey,
}
}

func (t *ToIPFS) GetDataIndex() int {
func (t *ToAgent) GetDataIndex() int {
return t.DataIndex
}

func (t *ToIPFS) BuildOp() Node {
op := Node{
Env: &AgentEnv{t.Node},
Pinned: true,
Type: &IPFSWriteType{
InputVar: 0,
FileHashVar: 0,
},
}
op.NewInput(nil)
return op
func (t *ToAgent) GetRange() Range {
return t.Range
}

// type ToStorage struct {


+ 129
- 8
common/pkgs/ioswitch/plans/ops.go View File

@@ -5,7 +5,9 @@ import (
"gitlink.org.cn/cloudream/common/pkgs/ipfs"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/common/utils/lo2"
"gitlink.org.cn/cloudream/storage/common/pkgs/ec"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch/ops"
)

type VarIndex int
@@ -130,8 +132,13 @@ type IPFSReadType struct {
Option ipfs.ReadOption
}

func (t *IPFSReadType) GenerateOp(op *Node, blder *PlanBuilder) error {

func (t *IPFSReadType) GenerateOp(node *Node, blder *PlanBuilder) error {
addOpByEnv(&ops.IPFSRead{
Output: node.OutputStreams[0].Var,
FileHash: t.FileHash,
Option: t.Option,
}, node.Env, blder)
return nil
}

type IPFSWriteType struct {
@@ -139,7 +146,11 @@ type IPFSWriteType struct {
}

func (t *IPFSWriteType) GenerateOp(op *Node, blder *PlanBuilder) error {

addOpByEnv(&ops.IPFSWrite{
Input: op.InputStreams[0].Var,
FileHash: op.OutputValues[0].Var.(*ioswitch.StringVar),
}, op.Env, blder)
return nil
}

type ChunkedSplitOp struct {
@@ -148,7 +159,15 @@ type ChunkedSplitOp struct {
}

func (t *ChunkedSplitOp) GenerateOp(op *Node, blder *PlanBuilder) error {

addOpByEnv(&ops.ChunkedSplit{
Input: op.InputStreams[0].Var,
Outputs: lo.Map(op.OutputStreams, func(v *StreamVar, idx int) *ioswitch.StreamVar {
return v.Var
}),
ChunkSize: t.ChunkSize,
PaddingZeros: t.PaddingZeros,
}, op.Env, blder)
return nil
}

type ChunkedJoinOp struct {
@@ -156,27 +175,68 @@ type ChunkedJoinOp struct {
}

func (t *ChunkedJoinOp) GenerateOp(op *Node, blder *PlanBuilder) error {

addOpByEnv(&ops.ChunkedJoin{
Inputs: lo.Map(op.InputStreams, func(v *StreamVar, idx int) *ioswitch.StreamVar {
return v.Var
}),
Output: op.OutputStreams[0].Var,
ChunkSize: t.ChunkSize,
}, op.Env, blder)
return nil
}

type CloneStreamOp struct{}

func (t *CloneStreamOp) GenerateOp(op *Node, blder *PlanBuilder) error {

addOpByEnv(&ops.CloneStream{
Input: op.InputStreams[0].Var,
Outputs: lo.Map(op.OutputStreams, func(v *StreamVar, idx int) *ioswitch.StreamVar {
return v.Var
}),
}, op.Env, blder)
return nil
}

type CloneVarOp struct{}

func (t *CloneVarOp) GenerateOp(op *Node, blder *PlanBuilder) error {

addOpByEnv(&ops.CloneVar{
Raw: op.InputValues[0].Var,
Cloneds: lo.Map(op.OutputValues, func(v *ValueVar, idx int) ioswitch.Var {
return v.Var
}),
}, op.Env, blder)
return nil
}

type MultiplyOp struct {
Coef [][]byte
EC cdssdk.ECRedundancy
ChunkSize int
}

func (t *MultiplyOp) GenerateOp(op *Node, blder *PlanBuilder) error {
var inputIdxs []int
var outputIdxs []int
for _, in := range op.InputStreams {
inputIdxs = append(inputIdxs, in.DataIndex)
}
for _, out := range op.OutputStreams {
outputIdxs = append(outputIdxs, out.DataIndex)
}

rs, _ := ec.NewRs(t.EC.K, t.EC.N)
coef, err := rs.GenerateMatrix(inputIdxs, outputIdxs)
if err != nil {
return err
}

addOpByEnv(&ops.ECMultiply{
Coef: coef,
Inputs: lo.Map(op.InputStreams, func(v *StreamVar, idx int) *ioswitch.StreamVar { return v.Var }),
Outputs: lo.Map(op.OutputStreams, func(v *StreamVar, idx int) *ioswitch.StreamVar { return v.Var }),
ChunkSize: t.ChunkSize,
}, op.Env, blder)
return nil
}

type FileReadOp struct {
@@ -184,6 +244,11 @@ type FileReadOp struct {
}

func (t *FileReadOp) GenerateOp(op *Node, blder *PlanBuilder) error {
addOpByEnv(&ops.FileRead{
Output: op.OutputStreams[0].Var,
FilePath: t.FilePath,
}, op.Env, blder)
return nil
}

type FileWriteOp struct {
@@ -191,6 +256,11 @@ type FileWriteOp struct {
}

func (t *FileWriteOp) GenerateOp(op *Node, blder *PlanBuilder) error {
addOpByEnv(&ops.FileWrite{
Input: op.InputStreams[0].Var,
FilePath: t.FilePath,
}, op.Env, blder)
return nil
}

type FromExecutorOp struct {
@@ -198,6 +268,8 @@ type FromExecutorOp struct {
}

func (t *FromExecutorOp) GenerateOp(op *Node, blder *PlanBuilder) error {
t.Handle.Var = op.OutputStreams[0].Var
return nil
}

type ToExecutorOp struct {
@@ -205,6 +277,8 @@ type ToExecutorOp struct {
}

func (t *ToExecutorOp) GenerateOp(op *Node, blder *PlanBuilder) error {
t.Handle.Var = op.InputStreams[0].Var
return nil
}

type StoreOp struct {
@@ -212,29 +286,76 @@ type StoreOp struct {
}

func (t *StoreOp) GenerateOp(op *Node, blder *PlanBuilder) error {
blder.AtExecutor().AddOp(&ops.Store{
Var: op.InputValues[0].Var,
Key: t.StoreKey,
Store: blder.ExecutorPlan.StoreMap,
})
return nil
}

type DropOp struct{}

func (t *DropOp) GenerateOp(op *Node, blder *PlanBuilder) error {
addOpByEnv(&ops.DropStream{
Input: op.InputStreams[0].Var,
}, op.Env, blder)
return nil
}

type SendStreamOp struct{}

func (t *SendStreamOp) GenerateOp(op *Node, blder *PlanBuilder) error {
toAgt := op.OutputStreams[0].Toes[0].Env.(*AgentEnv)
addOpByEnv(&ops.SendStream{
Input: op.InputStreams[0].Var,
Send: op.OutputStreams[0].Var,
Node: toAgt.Node,
}, op.Env, blder)
return nil
}

type GetStreamOp struct{}

func (t *GetStreamOp) GenerateOp(op *Node, blder *PlanBuilder) error {
fromAgt := op.InputStreams[0].From.Env.(*AgentEnv)
addOpByEnv(&ops.GetStream{
Output: op.OutputStreams[0].Var,
Get: op.InputStreams[0].Var,
Node: fromAgt.Node,
}, op.Env, blder)
return nil
}

type SendVarOp struct{}

func (t *SendVarOp) GenerateOp(op *Node, blder *PlanBuilder) error {
toAgt := op.OutputValues[0].Toes[0].Env.(*AgentEnv)
addOpByEnv(&ops.SendVar{
Input: op.InputValues[0].Var,
Send: op.OutputValues[0].Var,
Node: toAgt.Node,
}, op.Env, blder)
return nil
}

type GetVarOp struct{}

func (t *GetVarOp) GenerateOp(op *Node, blder *PlanBuilder) error {
fromAgt := op.InputValues[0].From.Env.(*AgentEnv)
addOpByEnv(&ops.GetVar{
Output: op.OutputValues[0].Var,
Get: op.InputValues[0].Var,
Node: fromAgt.Node,
}, op.Env, blder)
return nil
}

func addOpByEnv(op ioswitch.Op, env OpEnv, blder *PlanBuilder) {
switch env := env.(type) {
case *AgentEnv:
blder.AtAgent(env.Node).AddOp(op)
case *ExecutorEnv:
blder.AtExecutor().AddOp(op)
}
}

+ 96
- 42
common/pkgs/ioswitch/plans/parser.go View File

@@ -12,12 +12,12 @@ type FromToParser interface {
}

type DefaultParser struct {
EC *cdssdk.ECRedundancy
EC cdssdk.ECRedundancy
}

type ParseContext struct {
Ft FromTo
Ops []*Node
Ft FromTo
Nodes []*Node
}

func (p *DefaultParser) Parse(ft FromTo, blder *PlanBuilder) error {
@@ -85,7 +85,7 @@ func (p *DefaultParser) Parse(ft FromTo, blder *PlanBuilder) error {
return p.buildPlan(&ctx, blder)
}
func (p *DefaultParser) findOutputStream(ctx *ParseContext, dataIndex int) *StreamVar {
for _, op := range ctx.Ops {
for _, op := range ctx.Nodes {
for _, o := range op.OutputStreams {
if o.DataIndex == dataIndex {
return o
@@ -98,8 +98,11 @@ func (p *DefaultParser) findOutputStream(ctx *ParseContext, dataIndex int) *Stre

func (p *DefaultParser) extend(ctx *ParseContext, ft FromTo, blder *PlanBuilder) error {
for _, f := range ft.Froms {
o := f.BuildOp()
ctx.Ops = append(ctx.Ops, &o)
n, err := p.buildFromNode(&ft, f)
if err != nil {
return err
}
ctx.Nodes = append(ctx.Nodes, n)

// 对于完整文件的From,生成Split指令
if f.GetDataIndex() == -1 {
@@ -107,18 +110,18 @@ func (p *DefaultParser) extend(ctx *ParseContext, ft FromTo, blder *PlanBuilder)
Env: nil,
Type: &ChunkedSplitOp{ChunkSize: p.EC.ChunkSize, PaddingZeros: true},
}
splitOp.AddInput(o.OutputStreams[0])
splitOp.AddInput(n.OutputStreams[0])
for i := 0; i < p.EC.K; i++ {
splitOp.NewOutput(i)
}
ctx.Ops = append(ctx.Ops, splitOp)
ctx.Nodes = append(ctx.Nodes, splitOp)
}
}

// 如果有K个不同的文件块流,则生成Multiply指令,同时针对其生成的流,生成Join指令
ecInputStrs := make(map[int]*StreamVar)
loop:
for _, o := range ctx.Ops {
for _, o := range ctx.Nodes {
for _, s := range o.OutputStreams {
if s.DataIndex >= 0 && ecInputStrs[s.DataIndex] == nil {
ecInputStrs[s.DataIndex] = s
@@ -140,7 +143,7 @@ loop:
for i := 0; i < p.EC.N; i++ {
mulOp.NewOutput(i)
}
ctx.Ops = append(ctx.Ops, mulOp)
ctx.Nodes = append(ctx.Nodes, mulOp)

joinOp := &Node{
Env: nil,
@@ -155,24 +158,75 @@ loop:

// 为每一个To找到一个输入流
for _, t := range ft.Tos {
o := t.BuildOp()
ctx.Ops = append(ctx.Ops, &o)
n, err := p.buildToNode(&ft, t)
if err != nil {
return err
}

ctx.Nodes = append(ctx.Nodes, n)

str := p.findOutputStream(ctx, t.GetDataIndex())
if str == nil {
return fmt.Errorf("no output stream found for data index %d", t.GetDataIndex())
}

o.AddInput(str)
n.AddInput(str)
}

return nil
}

func (p *DefaultParser) buildFromNode(ft *FromTo, f From) (*Node, error) {
switch f := f.(type) {
case *FromNode:
n := &Node{
// TODO2 需要FromTo的Range来设置Option
Type: &IPFSReadType{FileHash: ft.Object.Object.FileHash},
}
n.NewOutput(f.DataIndex)

if f.Node != nil {
n.Env = &AgentEnv{Node: *f.Node}
}

return n, nil

case *FromExecutor:
n := &Node{
Env: &ExecutorEnv{},
Type: &FromExecutorOp{Handle: f.Handle},
}
n.NewOutput(f.DataIndex)
return n, nil

default:
return nil, fmt.Errorf("unsupported from type %T", f)
}
}

func (p *DefaultParser) buildToNode(ft *FromTo, t To) (*Node, error) {
switch t := t.(type) {
case *ToAgent:
return &Node{
Env: &AgentEnv{t.Node},
Type: &IPFSWriteType{FileHashStoreKey: t.FileHashStoreKey},
}, nil

case *ToExecutor:
return &Node{
Env: &ExecutorEnv{},
Type: &ToExecutorOp{Handle: t.Handle},
}, nil

default:
return nil, fmt.Errorf("unsupported to type %T", t)
}
}

// 删除输出流未被使用的Join指令
func (p *DefaultParser) removeUnusedJoin(ctx *ParseContext) bool {
opted := false
for i, op := range ctx.Ops {
for i, op := range ctx.Nodes {
_, ok := op.Type.(*ChunkedJoinOp)
if !ok {
continue
@@ -186,18 +240,18 @@ func (p *DefaultParser) removeUnusedJoin(ctx *ParseContext) bool {
in.RemoveTo(op)
}

ctx.Ops[i] = nil
ctx.Nodes[i] = nil
opted = true
}

ctx.Ops = lo2.RemoveAllDefault(ctx.Ops)
ctx.Nodes = lo2.RemoveAllDefault(ctx.Nodes)
return opted
}

// 减少未使用的Multiply指令的输出流。如果减少到0,则删除该指令
func (p *DefaultParser) removeUnusedMultiplyOutput(ctx *ParseContext) bool {
opted := false
for i, op := range ctx.Ops {
for i, op := range ctx.Nodes {
_, ok := op.Type.(*MultiplyOp)
if !ok {
continue
@@ -217,20 +271,20 @@ func (p *DefaultParser) removeUnusedMultiplyOutput(ctx *ParseContext) bool {
in.RemoveTo(op)
}

ctx.Ops[i] = nil
ctx.Nodes[i] = nil
}

opted = true
}

ctx.Ops = lo2.RemoveAllDefault(ctx.Ops)
ctx.Nodes = lo2.RemoveAllDefault(ctx.Nodes)
return opted
}

// 删除未使用的Split指令
func (p *DefaultParser) removeUnusedSplit(ctx *ParseContext) bool {
opted := false
for i, op := range ctx.Ops {
for i, op := range ctx.Nodes {
_, ok := op.Type.(*ChunkedSplitOp)
if !ok {
continue
@@ -247,12 +301,12 @@ func (p *DefaultParser) removeUnusedSplit(ctx *ParseContext) bool {

if isAllUnused {
op.InputStreams[0].RemoveTo(op)
ctx.Ops[i] = nil
ctx.Nodes[i] = nil
opted = true
}
}

ctx.Ops = lo2.RemoveAllDefault(ctx.Ops)
ctx.Nodes = lo2.RemoveAllDefault(ctx.Nodes)
return opted
}

@@ -260,7 +314,7 @@ func (p *DefaultParser) removeUnusedSplit(ctx *ParseContext) bool {
func (p *DefaultParser) omitSplitJoin(ctx *ParseContext) bool {
opted := false
loop:
for iSplit, splitOp := range ctx.Ops {
for iSplit, splitOp := range ctx.Nodes {
// 进行合并操作时会删除多个指令,因此这里存在splitOp == nil的情况
if splitOp == nil {
continue
@@ -309,19 +363,19 @@ loop:
}

// 并删除这两个指令
ctx.Ops[iSplit] = nil
lo2.Clear(ctx.Ops, joinOp)
ctx.Nodes[iSplit] = nil
lo2.Clear(ctx.Nodes, joinOp)
opted = true
}

ctx.Ops = lo2.RemoveAllDefault(ctx.Ops)
ctx.Nodes = lo2.RemoveAllDefault(ctx.Nodes)
return opted
}

// 确定Split命令的执行位置
func (p *DefaultParser) pinSplit(ctx *ParseContext) bool {
opted := false
for _, op := range ctx.Ops {
for _, op := range ctx.Nodes {
_, ok := op.Type.(*ChunkedSplitOp)
if !ok {
continue
@@ -380,7 +434,7 @@ func (p *DefaultParser) pinSplit(ctx *ParseContext) bool {
// 确定Join命令的执行位置,策略与固定Split类似
func (p *DefaultParser) pinJoin(ctx *ParseContext) bool {
opted := false
for _, op := range ctx.Ops {
for _, op := range ctx.Nodes {
_, ok := op.Type.(*ChunkedJoinOp)
if !ok {
continue
@@ -443,7 +497,7 @@ func (p *DefaultParser) pinJoin(ctx *ParseContext) bool {
// 确定Multiply命令的执行位置
func (p *DefaultParser) pinMultiply(ctx *ParseContext) bool {
opted := false
for _, op := range ctx.Ops {
for _, op := range ctx.Nodes {
_, ok := op.Type.(*MultiplyOp)
if !ok {
continue
@@ -507,7 +561,7 @@ func (p *DefaultParser) pinMultiply(ctx *ParseContext) bool {
// 确定IPFS读取指令的执行位置
func (p *DefaultParser) pinIPFSRead(ctx *ParseContext) bool {
opted := false
for _, op := range ctx.Ops {
for _, op := range ctx.Nodes {
_, ok := op.Type.(*IPFSReadType)
if !ok {
continue
@@ -545,7 +599,7 @@ func (p *DefaultParser) pinIPFSRead(ctx *ParseContext) bool {

// 对于所有未使用的流,增加Drop指令
func (p *DefaultParser) dropUnused(ctx *ParseContext) {
for _, op := range ctx.Ops {
for _, op := range ctx.Nodes {
for _, out := range op.OutputStreams {
if len(out.Toes) == 0 {
dropOp := &Node{
@@ -553,7 +607,7 @@ func (p *DefaultParser) dropUnused(ctx *ParseContext) {
Type: &DropOp{},
}
dropOp.AddInput(out)
ctx.Ops = append(ctx.Ops, dropOp)
ctx.Nodes = append(ctx.Nodes, dropOp)
}
}
}
@@ -561,7 +615,7 @@ func (p *DefaultParser) dropUnused(ctx *ParseContext) {

// 为IPFS写入指令存储结果
func (p *DefaultParser) storeIPFSWriteResult(ctx *ParseContext) {
for _, op := range ctx.Ops {
for _, op := range ctx.Nodes {
w, ok := op.Type.(*IPFSWriteType)
if !ok {
continue
@@ -578,13 +632,13 @@ func (p *DefaultParser) storeIPFSWriteResult(ctx *ParseContext) {
},
}
storeOp.AddInputVar(op.OutputValues[0])
ctx.Ops = append(ctx.Ops, storeOp)
ctx.Nodes = append(ctx.Nodes, storeOp)
}
}

// 生成Clone指令
func (p *DefaultParser) generateClone(ctx *ParseContext) {
for _, op := range ctx.Ops {
for _, op := range ctx.Nodes {
for _, out := range op.OutputStreams {
if len(out.Toes) <= 1 {
continue
@@ -599,7 +653,7 @@ func (p *DefaultParser) generateClone(ctx *ParseContext) {
}
out.Toes = nil
cloneOp.AddInput(out)
ctx.Ops = append(ctx.Ops, cloneOp)
ctx.Nodes = append(ctx.Nodes, cloneOp)
}

for _, out := range op.OutputValues {
@@ -622,7 +676,7 @@ func (p *DefaultParser) generateClone(ctx *ParseContext) {

// 生成Send指令
func (p *DefaultParser) generateSend(ctx *ParseContext) {
for _, op := range ctx.Ops {
for _, op := range ctx.Nodes {
for _, out := range op.OutputStreams {
to := out.Toes[0]
if to.Env.Equals(op.Env) {
@@ -639,7 +693,7 @@ func (p *DefaultParser) generateSend(ctx *ParseContext) {
out.Toes = nil
getStrOp.AddInput(out)
to.ReplaceInput(out, getStrOp.NewOutput(out.DataIndex))
ctx.Ops = append(ctx.Ops, getStrOp)
ctx.Nodes = append(ctx.Nodes, getStrOp)

case *AgentEnv:
// 如果是要送到Agent,则可以直接发送
@@ -650,7 +704,7 @@ func (p *DefaultParser) generateSend(ctx *ParseContext) {
out.Toes = nil
sendStrOp.AddInput(out)
to.ReplaceInput(out, sendStrOp.NewOutput(out.DataIndex))
ctx.Ops = append(ctx.Ops, sendStrOp)
ctx.Nodes = append(ctx.Nodes, sendStrOp)
}
}

@@ -670,7 +724,7 @@ func (p *DefaultParser) generateSend(ctx *ParseContext) {
out.Toes = nil
getVarOp.AddInputVar(out)
to.ReplaceInputVar(out, getVarOp.NewOutputVar(out.Type))
ctx.Ops = append(ctx.Ops, getVarOp)
ctx.Nodes = append(ctx.Nodes, getVarOp)

case *AgentEnv:
// 如果是要送到Agent,则可以直接发送
@@ -681,7 +735,7 @@ func (p *DefaultParser) generateSend(ctx *ParseContext) {
out.Toes = nil
sendVarOp.AddInputVar(out)
to.ReplaceInputVar(out, sendVarOp.NewOutputVar(out.Type))
ctx.Ops = append(ctx.Ops, sendVarOp)
ctx.Nodes = append(ctx.Nodes, sendVarOp)
}
}
}
@@ -689,7 +743,7 @@ func (p *DefaultParser) generateSend(ctx *ParseContext) {

// 生成Plan
func (p *DefaultParser) buildPlan(ctx *ParseContext, blder *PlanBuilder) error {
for _, op := range ctx.Ops {
for _, op := range ctx.Nodes {
for _, out := range op.OutputStreams {
if out.Var != nil {
continue


Loading…
Cancel
Save