diff --git a/common/pkgs/ec/lrc/lrc.go b/common/pkgs/ec/lrc/lrc.go new file mode 100644 index 0000000..6ef6c5a --- /dev/null +++ b/common/pkgs/ec/lrc/lrc.go @@ -0,0 +1,36 @@ +package lrc + +import "github.com/klauspost/reedsolomon" + +type LRC struct { + n int // 总块数,包括局部块 + k int // 数据块数量 + groups []int // 分组校验块生成时使用的数据块 + l *reedsolomon.LRC +} + +func New(n int, k int, groups []int) (*LRC, error) { + lrc := &LRC{ + n: n, + k: k, + groups: groups, + } + + l, err := reedsolomon.NewLRC(k, n-k, groups) + if err != nil { + return nil, err + } + + lrc.l = l + return lrc, nil +} + +// 根据全局修复的原理,生成根据输入修复指定块的矩阵。要求input内元素的值= 0,则表示在文件的某个分片的范围。 + GetRange() exec.Range + GetDataIndex() int +} + +type FromDriver struct { + Handle *exec.DriverWriteStream + DataIndex int +} + +func NewFromDriver(dataIndex int) (*FromDriver, *exec.DriverWriteStream) { + handle := &exec.DriverWriteStream{ + RangeHint: &exec.Range{}, + } + return &FromDriver{ + Handle: handle, + DataIndex: dataIndex, + }, handle +} + +func (f *FromDriver) GetDataIndex() int { + return f.DataIndex +} + +type FromNode struct { + FileHash string + Node *cdssdk.Node + DataIndex int +} + +func NewFromNode(fileHash string, node *cdssdk.Node, dataIndex int) *FromNode { + return &FromNode{ + FileHash: fileHash, + Node: node, + DataIndex: dataIndex, + } +} + +func (f *FromNode) GetDataIndex() int { + return f.DataIndex +} + +type ToDriver struct { + Handle *exec.DriverReadStream + DataIndex int + Range exec.Range +} + +func NewToDriver(dataIndex int) (*ToDriver, *exec.DriverReadStream) { + str := exec.DriverReadStream{} + return &ToDriver{ + Handle: &str, + DataIndex: dataIndex, + }, &str +} + +func NewToDriverWithRange(dataIndex int, rng exec.Range) (*ToDriver, *exec.DriverReadStream) { + str := exec.DriverReadStream{} + return &ToDriver{ + Handle: &str, + DataIndex: dataIndex, + Range: rng, + }, &str +} + +func (t *ToDriver) GetDataIndex() int { + return t.DataIndex +} + +func (t *ToDriver) GetRange() exec.Range { + return t.Range +} + +type ToNode struct { + Node cdssdk.Node + DataIndex int + Range exec.Range + FileHashStoreKey string +} + +func NewToNode(node cdssdk.Node, dataIndex int, fileHashStoreKey string) *ToNode { + return &ToNode{ + Node: node, + DataIndex: dataIndex, + FileHashStoreKey: fileHashStoreKey, + } +} + +func NewToNodeWithRange(node cdssdk.Node, dataIndex int, fileHashStoreKey string, rng exec.Range) *ToNode { + return &ToNode{ + Node: node, + DataIndex: dataIndex, + FileHashStoreKey: fileHashStoreKey, + Range: rng, + } +} + +func (t *ToNode) GetDataIndex() int { + return t.DataIndex +} + +func (t *ToNode) GetRange() exec.Range { + return t.Range +} + +// type ToStorage struct { +// Storage cdssdk.Storage +// DataIndex int +// } + +// func NewToStorage(storage cdssdk.Storage, dataIndex int) *ToStorage { +// return &ToStorage{ +// Storage: storage, +// DataIndex: dataIndex, +// } +// } + +// func (t *ToStorage) GetDataIndex() int { +// return t.DataIndex +// } diff --git a/common/pkgs/ioswitchlrc/ioswitch.go b/common/pkgs/ioswitchlrc/ioswitch.go new file mode 100644 index 0000000..b6198a8 --- /dev/null +++ b/common/pkgs/ioswitchlrc/ioswitch.go @@ -0,0 +1,23 @@ +package ioswitchlrc + +import ( + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" +) + +type NodeProps struct { + From From + To To +} + +type ValueVarType int + +const ( + StringValueVar ValueVarType = iota + SignalValueVar +) + +type VarProps struct { + StreamIndex int // 流的编号,只在StreamVar上有意义 + ValueType ValueVarType // 值类型,只在ValueVar上有意义 + Var exec.Var // 生成Plan的时候创建的对应的Var +} diff --git a/common/pkgs/ioswitchlrc/ops2/chunked.go b/common/pkgs/ioswitchlrc/ops2/chunked.go new file mode 100644 index 0000000..18090a4 --- /dev/null +++ b/common/pkgs/ioswitchlrc/ops2/chunked.go @@ -0,0 +1,137 @@ +package ops2 + +import ( + "context" + "fmt" + "io" + + "github.com/samber/lo" + "gitlink.org.cn/cloudream/common/pkgs/future" + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" + "gitlink.org.cn/cloudream/common/utils/io2" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2" + "golang.org/x/sync/semaphore" +) + +func init() { + exec.UseOp[*ChunkedSplit]() + exec.UseOp[*ChunkedJoin]() +} + +type ChunkedSplit struct { + Input *exec.StreamVar `json:"input"` + Outputs []*exec.StreamVar `json:"outputs"` + ChunkSize int `json:"chunkSize"` + PaddingZeros bool `json:"paddingZeros"` +} + +func (o *ChunkedSplit) Execute(ctx context.Context, e *exec.Executor) error { + err := e.BindVars(ctx, o.Input) + if err != nil { + return err + } + defer o.Input.Stream.Close() + + outputs := io2.ChunkedSplit(o.Input.Stream, o.ChunkSize, len(o.Outputs), io2.ChunkedSplitOption{ + PaddingZeros: o.PaddingZeros, + }) + + sem := semaphore.NewWeighted(int64(len(outputs))) + for i := range outputs { + sem.Acquire(ctx, 1) + + o.Outputs[i].Stream = io2.AfterReadClosedOnce(outputs[i], func(closer io.ReadCloser) { + sem.Release(1) + }) + } + exec.PutArrayVars(e, o.Outputs) + + return sem.Acquire(ctx, int64(len(outputs))) +} + +type ChunkedJoin struct { + Inputs []*exec.StreamVar `json:"inputs"` + Output *exec.StreamVar `json:"output"` + ChunkSize int `json:"chunkSize"` +} + +func (o *ChunkedJoin) Execute(ctx context.Context, e *exec.Executor) error { + err := exec.BindArrayVars(e, ctx, o.Inputs) + if err != nil { + return err + } + + var strReaders []io.Reader + for _, s := range o.Inputs { + strReaders = append(strReaders, s.Stream) + } + defer func() { + for _, str := range o.Inputs { + str.Stream.Close() + } + }() + + fut := future.NewSetVoid() + o.Output.Stream = io2.AfterReadClosedOnce(io2.BufferedChunkedJoin(strReaders, o.ChunkSize), func(closer io.ReadCloser) { + fut.SetVoid() + }) + e.PutVars(o.Output) + + return fut.Wait(ctx) +} + +type ChunkedSplitType struct { + OutputCount int + ChunkSize int +} + +func (t *ChunkedSplitType) InitNode(node *dag.Node) { + dag.NodeDeclareInputStream(node, 1) + for i := 0; i < t.OutputCount; i++ { + dag.NodeNewOutputStream(node, &ioswitch2.VarProps{ + StreamIndex: i, + }) + } +} + +func (t *ChunkedSplitType) GenerateOp(op *dag.Node) (exec.Op, error) { + return &ChunkedSplit{ + Input: op.InputStreams[0].Var, + Outputs: lo.Map(op.OutputStreams, func(v *dag.StreamVar, idx int) *exec.StreamVar { + return v.Var + }), + ChunkSize: t.ChunkSize, + PaddingZeros: true, + }, nil +} + +func (t *ChunkedSplitType) String(node *dag.Node) string { + return fmt.Sprintf("ChunkedSplit[%v]%v%v", t.ChunkSize, formatStreamIO(node), formatValueIO(node)) +} + +type ChunkedJoinType struct { + InputCount int + ChunkSize int +} + +func (t *ChunkedJoinType) InitNode(node *dag.Node) { + dag.NodeDeclareInputStream(node, t.InputCount) + dag.NodeNewOutputStream(node, &ioswitch2.VarProps{ + StreamIndex: -1, + }) +} + +func (t *ChunkedJoinType) GenerateOp(op *dag.Node) (exec.Op, error) { + return &ChunkedJoin{ + Inputs: lo.Map(op.InputStreams, func(v *dag.StreamVar, idx int) *exec.StreamVar { + return v.Var + }), + Output: op.OutputStreams[0].Var, + ChunkSize: t.ChunkSize, + }, nil +} + +func (t *ChunkedJoinType) String(node *dag.Node) string { + return fmt.Sprintf("ChunkedJoin[%v]%v%v", t.ChunkSize, formatStreamIO(node), formatValueIO(node)) +} diff --git a/common/pkgs/ioswitchlrc/ops2/clone.go b/common/pkgs/ioswitchlrc/ops2/clone.go new file mode 100644 index 0000000..d53e29b --- /dev/null +++ b/common/pkgs/ioswitchlrc/ops2/clone.go @@ -0,0 +1,112 @@ +package ops2 + +import ( + "context" + "fmt" + "io" + + "github.com/samber/lo" + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" + "gitlink.org.cn/cloudream/common/utils/io2" + "golang.org/x/sync/semaphore" +) + +func init() { + exec.UseOp[*CloneStream]() + exec.UseOp[*CloneVar]() +} + +type CloneStream struct { + Input *exec.StreamVar `json:"input"` + Outputs []*exec.StreamVar `json:"outputs"` +} + +func (o *CloneStream) Execute(ctx context.Context, e *exec.Executor) error { + err := e.BindVars(ctx, o.Input) + if err != nil { + return err + } + defer o.Input.Stream.Close() + + cloned := io2.Clone(o.Input.Stream, len(o.Outputs)) + + sem := semaphore.NewWeighted(int64(len(o.Outputs))) + for i, s := range cloned { + sem.Acquire(ctx, 1) + + o.Outputs[i].Stream = io2.AfterReadClosedOnce(s, func(closer io.ReadCloser) { + sem.Release(1) + }) + } + exec.PutArrayVars(e, o.Outputs) + + return sem.Acquire(ctx, int64(len(o.Outputs))) +} + +type CloneVar struct { + Raw exec.Var `json:"raw"` + Cloneds []exec.Var `json:"cloneds"` +} + +func (o *CloneVar) Execute(ctx context.Context, e *exec.Executor) error { + err := e.BindVars(ctx, o.Raw) + if err != nil { + return err + } + + for _, v := range o.Cloneds { + if err := exec.AssignVar(o.Raw, v); err != nil { + return fmt.Errorf("clone var: %w", err) + } + } + e.PutVars(o.Cloneds...) + + return nil +} + +type CloneStreamType struct{} + +func (t *CloneStreamType) InitNode(node *dag.Node) { + dag.NodeDeclareInputStream(node, 1) +} + +func (t *CloneStreamType) GenerateOp(op *dag.Node) (exec.Op, error) { + return &CloneStream{ + Input: op.InputStreams[0].Var, + Outputs: lo.Map(op.OutputStreams, func(v *dag.StreamVar, idx int) *exec.StreamVar { + return v.Var + }), + }, nil +} + +func (t *CloneStreamType) NewOutput(node *dag.Node) *dag.StreamVar { + return dag.NodeNewOutputStream(node, nil) +} + +func (t *CloneStreamType) String(node *dag.Node) string { + return fmt.Sprintf("CloneStream[]%v%v", formatStreamIO(node), formatValueIO(node)) +} + +type CloneVarType struct{} + +func (t *CloneVarType) InitNode(node *dag.Node) { + dag.NodeDeclareInputValue(node, 1) +} + +func (t *CloneVarType) GenerateOp(op *dag.Node) (exec.Op, error) { + return &CloneVar{ + Raw: op.InputValues[0].Var, + Cloneds: lo.Map(op.OutputValues, func(v *dag.ValueVar, idx int) exec.Var { + return v.Var + }), + }, nil +} + +func (t *CloneVarType) NewOutput(node *dag.Node) *dag.ValueVar { + return dag.NodeNewOutputValue(node, nil) +} + +func (t *CloneVarType) String(node *dag.Node) string { + return fmt.Sprintf("CloneVar[]%v%v", formatStreamIO(node), formatValueIO(node)) +} diff --git a/common/pkgs/ioswitchlrc/ops2/ec.go b/common/pkgs/ioswitchlrc/ops2/ec.go new file mode 100644 index 0000000..7d644d4 --- /dev/null +++ b/common/pkgs/ioswitchlrc/ops2/ec.go @@ -0,0 +1,189 @@ +package ops2 + +import ( + "context" + "fmt" + "io" + + "github.com/samber/lo" + "gitlink.org.cn/cloudream/common/pkgs/future" + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "gitlink.org.cn/cloudream/common/utils/io2" + "gitlink.org.cn/cloudream/common/utils/sync2" + "gitlink.org.cn/cloudream/storage/common/pkgs/ec" + "gitlink.org.cn/cloudream/storage/common/pkgs/ec/lrc" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitchlrc" +) + +func init() { + exec.UseOp[*GalMultiply]() +} + +type GalMultiply struct { + Coef [][]byte `json:"coef"` + Inputs []*exec.StreamVar `json:"inputs"` + Outputs []*exec.StreamVar `json:"outputs"` + ChunkSize int `json:"chunkSize"` +} + +func (o *GalMultiply) Execute(ctx context.Context, e *exec.Executor) error { + err := exec.BindArrayVars(e, ctx, o.Inputs) + if err != nil { + return err + } + defer func() { + for _, s := range o.Inputs { + s.Stream.Close() + } + }() + + outputWrs := make([]*io.PipeWriter, len(o.Outputs)) + + for i := range o.Outputs { + rd, wr := io.Pipe() + o.Outputs[i].Stream = rd + outputWrs[i] = wr + } + + fut := future.NewSetVoid() + go func() { + mul := ec.GaloisMultiplier().BuildGalois() + + inputChunks := make([][]byte, len(o.Inputs)) + for i := range o.Inputs { + inputChunks[i] = make([]byte, o.ChunkSize) + } + outputChunks := make([][]byte, len(o.Outputs)) + for i := range o.Outputs { + outputChunks[i] = make([]byte, o.ChunkSize) + } + + for { + err := sync2.ParallelDo(o.Inputs, func(s *exec.StreamVar, i int) error { + _, err := io.ReadFull(s.Stream, inputChunks[i]) + return err + }) + if err == io.EOF { + fut.SetVoid() + return + } + if err != nil { + fut.SetError(err) + return + } + + err = mul.Multiply(o.Coef, inputChunks, outputChunks) + if err != nil { + fut.SetError(err) + return + } + + for i := range o.Outputs { + err := io2.WriteAll(outputWrs[i], outputChunks[i]) + if err != nil { + fut.SetError(err) + return + } + } + } + }() + + exec.PutArrayVars(e, o.Outputs) + err = fut.Wait(ctx) + if err != nil { + for _, wr := range outputWrs { + wr.CloseWithError(err) + } + return err + } + + for _, wr := range outputWrs { + wr.Close() + } + return nil +} + +type LRCConstructAnyType struct { + LRC cdssdk.LRCRedundancy +} + +func (t *LRCConstructAnyType) InitNode(node *dag.Node) {} + +func (t *LRCConstructAnyType) GenerateOp(op *dag.Node) (exec.Op, error) { + var inputIdxs []int + var outputIdxs []int + for _, in := range op.InputStreams { + inputIdxs = append(inputIdxs, ioswitch2.SProps(in).StreamIndex) + } + for _, out := range op.OutputStreams { + outputIdxs = append(outputIdxs, ioswitch2.SProps(out).StreamIndex) + } + + l, err := lrc.New(t.LRC.N, t.LRC.K, t.LRC.Groups) + if err != nil { + return nil, err + } + coef, err := l.GenerateMatrix(inputIdxs, outputIdxs) + if err != nil { + return nil, err + } + + return &GalMultiply{ + Coef: coef, + Inputs: lo.Map(op.InputStreams, func(v *dag.StreamVar, idx int) *exec.StreamVar { return v.Var }), + Outputs: lo.Map(op.OutputStreams, func(v *dag.StreamVar, idx int) *exec.StreamVar { return v.Var }), + ChunkSize: t.LRC.ChunkSize, + }, nil +} + +func (t *LRCConstructAnyType) AddInput(node *dag.Node, str *dag.StreamVar) { + node.InputStreams = append(node.InputStreams, str) + str.To(node, len(node.InputStreams)-1) +} + +func (t *LRCConstructAnyType) NewOutput(node *dag.Node, dataIndex int) *dag.StreamVar { + return dag.NodeNewOutputStream(node, &ioswitch2.VarProps{StreamIndex: dataIndex}) +} + +func (t *LRCConstructAnyType) String(node *dag.Node) string { + return fmt.Sprintf("LRCAny[]%v%v", formatStreamIO(node), formatValueIO(node)) +} + +type LRCConstructGroupType struct { + LRC cdssdk.LRCRedundancy + TargetBlockIndex int +} + +func (t *LRCConstructGroupType) InitNode(node *dag.Node) { + dag.NodeNewOutputStream(node, &ioswitchlrc.VarProps{ + StreamIndex: t.TargetBlockIndex, + }) + + grpIdx := t.LRC.FindGroup(t.TargetBlockIndex) + dag.NodeDeclareInputStream(node, t.LRC.Groups[grpIdx]) +} + +func (t *LRCConstructGroupType) GenerateOp(op *dag.Node) (exec.Op, error) { + l, err := lrc.New(t.LRC.N, t.LRC.K, t.LRC.Groups) + if err != nil { + return nil, err + } + coef, err := l.GenerateGroupMatrix(t.TargetBlockIndex) + if err != nil { + return nil, err + } + + return &GalMultiply{ + Coef: coef, + Inputs: lo.Map(op.InputStreams, func(v *dag.StreamVar, idx int) *exec.StreamVar { return v.Var }), + Outputs: lo.Map(op.OutputStreams, func(v *dag.StreamVar, idx int) *exec.StreamVar { return v.Var }), + ChunkSize: t.LRC.ChunkSize, + }, nil +} + +func (t *LRCConstructGroupType) String(node *dag.Node) string { + return fmt.Sprintf("LRCGroup[]%v%v", formatStreamIO(node), formatValueIO(node)) +} diff --git a/common/pkgs/ioswitchlrc/ops2/ipfs.go b/common/pkgs/ioswitchlrc/ops2/ipfs.go new file mode 100644 index 0000000..fa02b6d --- /dev/null +++ b/common/pkgs/ioswitchlrc/ops2/ipfs.go @@ -0,0 +1,129 @@ +package ops2 + +import ( + "context" + "fmt" + "io" + + "gitlink.org.cn/cloudream/common/pkgs/future" + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" + "gitlink.org.cn/cloudream/common/pkgs/ipfs" + "gitlink.org.cn/cloudream/common/pkgs/logger" + "gitlink.org.cn/cloudream/common/utils/io2" + stgglb "gitlink.org.cn/cloudream/storage/common/globals" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2" +) + +func init() { + exec.UseOp[*IPFSRead]() + exec.UseOp[*IPFSWrite]() +} + +type IPFSRead struct { + Output *exec.StreamVar `json:"output"` + FileHash string `json:"fileHash"` + Option ipfs.ReadOption `json:"option"` +} + +func (o *IPFSRead) Execute(ctx context.Context, e *exec.Executor) error { + logger. + WithField("FileHash", o.FileHash). + Debugf("ipfs read op") + defer logger.Debugf("ipfs read op finished") + + ipfsCli, err := stgglb.IPFSPool.Acquire() + if err != nil { + return fmt.Errorf("new ipfs client: %w", err) + } + defer stgglb.IPFSPool.Release(ipfsCli) + + file, err := ipfsCli.OpenRead(o.FileHash, o.Option) + if err != nil { + return fmt.Errorf("reading ipfs: %w", err) + } + defer file.Close() + + fut := future.NewSetVoid() + o.Output.Stream = io2.AfterReadClosedOnce(file, func(closer io.ReadCloser) { + fut.SetVoid() + }) + e.PutVars(o.Output) + + return fut.Wait(ctx) +} + +type IPFSWrite struct { + Input *exec.StreamVar `json:"input"` + FileHash *exec.StringVar `json:"fileHash"` +} + +func (o *IPFSWrite) Execute(ctx context.Context, e *exec.Executor) error { + logger. + WithField("Input", o.Input.ID). + WithField("FileHashVar", o.FileHash.ID). + Debugf("ipfs write op") + + ipfsCli, err := stgglb.IPFSPool.Acquire() + if err != nil { + return fmt.Errorf("new ipfs client: %w", err) + } + defer stgglb.IPFSPool.Release(ipfsCli) + + err = e.BindVars(ctx, o.Input) + if err != nil { + return err + } + defer o.Input.Stream.Close() + + o.FileHash.Value, err = ipfsCli.CreateFile(o.Input.Stream) + if err != nil { + return fmt.Errorf("creating ipfs file: %w", err) + } + + e.PutVars(o.FileHash) + + return nil +} + +type IPFSReadType struct { + FileHash string + Option ipfs.ReadOption +} + +func (t *IPFSReadType) InitNode(node *dag.Node) { + dag.NodeNewOutputStream(node, &ioswitch2.VarProps{}) +} + +func (t *IPFSReadType) GenerateOp(n *dag.Node) (exec.Op, error) { + return &IPFSRead{ + Output: n.OutputStreams[0].Var, + FileHash: t.FileHash, + Option: t.Option, + }, nil +} + +func (t *IPFSReadType) String(node *dag.Node) string { + return fmt.Sprintf("IPFSRead[%s,%v+%v]%v%v", t.FileHash, t.Option.Offset, t.Option.Length, formatStreamIO(node), formatValueIO(node)) +} + +type IPFSWriteType struct { + FileHashStoreKey string + Range exec.Range +} + +func (t *IPFSWriteType) InitNode(node *dag.Node) { + dag.NodeDeclareInputStream(node, 1) + dag.NodeNewOutputValue(node, &ioswitch2.VarProps{}) +} + +func (t *IPFSWriteType) GenerateOp(op *dag.Node) (exec.Op, error) { + return &IPFSWrite{ + Input: op.InputStreams[0].Var, + FileHash: op.OutputValues[0].Var.(*exec.StringVar), + }, nil +} + +func (t *IPFSWriteType) String(node *dag.Node) string { + return fmt.Sprintf("IPFSWrite[%s,%v+%v]%v%v", t.FileHashStoreKey, t.Range.Offset, t.Range.Length, formatStreamIO(node), formatValueIO(node)) +} diff --git a/common/pkgs/ioswitchlrc/ops2/ops.go b/common/pkgs/ioswitchlrc/ops2/ops.go new file mode 100644 index 0000000..c7309b6 --- /dev/null +++ b/common/pkgs/ioswitchlrc/ops2/ops.go @@ -0,0 +1,75 @@ +package ops2 + +import ( + "fmt" + + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" +) + +func formatStreamIO(node *dag.Node) string { + is := "" + for i, in := range node.InputStreams { + if i > 0 { + is += "," + } + + if in == nil { + is += "." + } else { + is += fmt.Sprintf("%v", in.ID) + } + } + + os := "" + for i, out := range node.OutputStreams { + if i > 0 { + os += "," + } + + if out == nil { + os += "." + } else { + os += fmt.Sprintf("%v", out.ID) + } + } + + if is == "" && os == "" { + return "" + } + + return fmt.Sprintf("S{%s>%s}", is, os) +} + +func formatValueIO(node *dag.Node) string { + is := "" + for i, in := range node.InputValues { + if i > 0 { + is += "," + } + + if in == nil { + is += "." + } else { + is += fmt.Sprintf("%v", in.ID) + } + } + + os := "" + for i, out := range node.OutputValues { + if i > 0 { + os += "," + } + + if out == nil { + os += "." + } else { + os += fmt.Sprintf("%v", out.ID) + } + } + + if is == "" && os == "" { + return "" + } + + return fmt.Sprintf("V{%s>%s}", is, os) +} diff --git a/common/pkgs/ioswitchlrc/ops2/range.go b/common/pkgs/ioswitchlrc/ops2/range.go new file mode 100644 index 0000000..383b3c0 --- /dev/null +++ b/common/pkgs/ioswitchlrc/ops2/range.go @@ -0,0 +1,95 @@ +package ops2 + +import ( + "context" + "fmt" + "io" + + "gitlink.org.cn/cloudream/common/pkgs/future" + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" + "gitlink.org.cn/cloudream/common/utils/io2" + "gitlink.org.cn/cloudream/common/utils/math2" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2" +) + +func init() { + exec.UseOp[*Range]() +} + +type Range struct { + Input *exec.StreamVar `json:"input"` + Output *exec.StreamVar `json:"output"` + Offset int64 `json:"offset"` + Length *int64 `json:"length"` +} + +func (o *Range) Execute(ctx context.Context, e *exec.Executor) error { + err := e.BindVars(ctx, o.Input) + if err != nil { + return err + } + defer o.Input.Stream.Close() + + buf := make([]byte, 1024*16) + + // 跳过前Offset个字节 + for o.Offset > 0 { + rdCnt := math2.Min(o.Offset, int64(len(buf))) + rd, err := o.Input.Stream.Read(buf[:rdCnt]) + if err == io.EOF { + // 输入流不够长度也不报错,只是产生一个空的流 + break + } + if err != nil { + return err + } + o.Offset -= int64(rd) + } + + fut := future.NewSetVoid() + + if o.Length == nil { + o.Output.Stream = io2.AfterEOF(o.Input.Stream, func(closer io.ReadCloser, err error) { + fut.SetVoid() + }) + + e.PutVars(o.Output) + return fut.Wait(ctx) + } + + o.Output.Stream = io2.AfterEOF(io2.Length(o.Input.Stream, *o.Length), func(closer io.ReadCloser, err error) { + fut.SetVoid() + }) + + e.PutVars(o.Output) + err = fut.Wait(ctx) + if err != nil { + return err + } + + io2.DropWithBuf(o.Input.Stream, buf) + return nil +} + +type RangeType struct { + Range exec.Range +} + +func (t *RangeType) InitNode(node *dag.Node) { + dag.NodeDeclareInputStream(node, 1) + dag.NodeNewOutputStream(node, &ioswitch2.VarProps{}) +} + +func (t *RangeType) GenerateOp(n *dag.Node) (exec.Op, error) { + return &Range{ + Input: n.InputStreams[0].Var, + Output: n.OutputStreams[0].Var, + Offset: t.Range.Offset, + Length: t.Range.Length, + }, nil +} + +func (t *RangeType) String(node *dag.Node) string { + return fmt.Sprintf("Range[%v+%v]%v%v", t.Range.Offset, t.Range.Length, formatStreamIO(node), formatValueIO(node)) +} diff --git a/common/pkgs/ioswitchlrc/parser/generator.go b/common/pkgs/ioswitchlrc/parser/generator.go new file mode 100644 index 0000000..97b4b8c --- /dev/null +++ b/common/pkgs/ioswitchlrc/parser/generator.go @@ -0,0 +1,297 @@ +package parser + +import ( + "fmt" + + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/plan" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitchlrc" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitchlrc/ops2" +) + +type GenerateContext struct { + LRC cdssdk.LRCRedundancy + DAG *dag.Graph + Toes []ioswitchlrc.To + StreamRange exec.Range +} + +// 输入一个完整文件,从这个完整文件产生任意文件块(也可再产生完整文件)。 +func Encode(fr ioswitchlrc.From, toes []ioswitchlrc.To, blder *exec.PlanBuilder) error { + if fr.GetDataIndex() != -1 { + return fmt.Errorf("from data is not a complete file") + } + + ctx := GenerateContext{ + LRC: cdssdk.DefaultLRCRedundancy, + DAG: dag.NewGraph(), + Toes: toes, + } + + calcStreamRange(&ctx) + err := buildDAGEncode(&ctx, fr, toes) + if err != nil { + return err + } + + // 确定指令执行位置的过程,也需要反复进行,直到没有变化为止。 + for pin(&ctx) { + } + + // 下面这些只需要执行一次,但需要按顺序 + dropUnused(&ctx) + storeIPFSWriteResult(&ctx) + generateClone(&ctx) + generateRange(&ctx) + + return plan.Generate(ctx.DAG, blder) +} + +func buildDAGEncode(ctx *GenerateContext, fr ioswitchlrc.From, toes []ioswitchlrc.To) error { + frNode, err := buildFromNode(ctx, fr) + if err != nil { + return fmt.Errorf("building from node: %w", err) + } + + var dataToes []ioswitchlrc.To + var parityToes []ioswitchlrc.To + + // 先创建需要完整文件的To节点,同时统计一下需要哪些文件块 + for _, to := range toes { + if to.GetDataIndex() != -1 { + continue + } + + toNode, err := buildToNode(ctx, to) + if err != nil { + return fmt.Errorf("building to node: %w", err) + } + + idx := to.GetDataIndex() + if idx == -1 { + frNode.OutputStreams[0].To(toNode, 0) + } else if idx < ctx.LRC.K { + dataToes = append(dataToes, to) + } else { + parityToes = append(parityToes, to) + } + } + + if len(dataToes) == 0 && len(parityToes) == 0 { + return nil + } + + // 需要文件块,则生成Split指令 + splitNode := ctx.DAG.NewNode(&ops2.ChunkedSplitType{ + OutputCount: ctx.LRC.K, + ChunkSize: ctx.LRC.ChunkSize, + }, nil) + + for _, to := range dataToes { + toNode, err := buildToNode(ctx, to) + if err != nil { + return fmt.Errorf("building to node: %w", err) + } + + splitNode.OutputStreams[to.GetDataIndex()].To(toNode, 0) + } + + if len(parityToes) == 0 { + return nil + } + + // 需要校验块,则进一步生成Construct指令 + + conNode, conType := dag.NewNode(ctx.DAG, &ops2.LRCConstructAnyType{ + LRC: ctx.LRC, + }, nil) + + for _, out := range splitNode.OutputStreams { + conType.AddInput(conNode, out) + } + + for _, to := range parityToes { + toNode, err := buildToNode(ctx, to) + if err != nil { + return fmt.Errorf("building to node: %w", err) + } + + conType.NewOutput(conNode, to.GetDataIndex()).To(toNode, 0) + } + return nil +} + +// 提供数据块+编码块中的k个块,重建任意块,包括完整文件。 +func ReconstructAny(frs []ioswitchlrc.From, toes []ioswitchlrc.To, blder *exec.PlanBuilder) error { + ctx := GenerateContext{ + LRC: cdssdk.DefaultLRCRedundancy, + DAG: dag.NewGraph(), + Toes: toes, + } + + calcStreamRange(&ctx) + err := buildDAGReconstructAny(&ctx, frs, toes) + if err != nil { + return err + } + + // 确定指令执行位置的过程,也需要反复进行,直到没有变化为止。 + for pin(&ctx) { + } + + // 下面这些只需要执行一次,但需要按顺序 + dropUnused(&ctx) + storeIPFSWriteResult(&ctx) + generateClone(&ctx) + generateRange(&ctx) + + return plan.Generate(ctx.DAG, blder) +} + +func buildDAGReconstructAny(ctx *GenerateContext, frs []ioswitchlrc.From, toes []ioswitchlrc.To) error { + frNodes := make(map[int]*dag.Node) + for _, fr := range frs { + frNode, err := buildFromNode(ctx, fr) + if err != nil { + return fmt.Errorf("building from node: %w", err) + } + + frNodes[fr.GetDataIndex()] = frNode + } + + var completeToes []ioswitchlrc.To + var missedToes []ioswitchlrc.To + + // 先创建需要完整文件的To节点,同时统计一下需要哪些文件块 + for _, to := range toes { + toIdx := to.GetDataIndex() + fr := frNodes[toIdx] + if fr != nil { + node, err := buildToNode(ctx, to) + if err != nil { + return fmt.Errorf("building to node: %w", err) + } + + fr.OutputStreams[0].To(node, 0) + continue + } + + if toIdx == -1 { + completeToes = append(completeToes, to) + } else { + missedToes = append(missedToes, to) + } + } + + if len(completeToes) == 0 && len(missedToes) == 0 { + return nil + } + + // 生成Construct指令来恢复缺少的块 + + conNode, conType := dag.NewNode(ctx.DAG, &ops2.LRCConstructAnyType{ + LRC: ctx.LRC, + }, nil) + + for _, fr := range frNodes { + conType.AddInput(conNode, fr.OutputStreams[0]) + } + + for _, to := range missedToes { + toNode, err := buildToNode(ctx, to) + if err != nil { + return fmt.Errorf("building to node: %w", err) + } + + conType.NewOutput(conNode, to.GetDataIndex()).To(toNode, 0) + } + + if len(completeToes) == 0 { + return nil + } + + // 需要完整文件,则生成Join指令 + + joinNode := ctx.DAG.NewNode(&ops2.ChunkedJoinType{ + InputCount: ctx.LRC.K, + ChunkSize: ctx.LRC.ChunkSize, + }, nil) + + for i := 0; i < ctx.LRC.K; i++ { + n := frNodes[i] + if n == nil { + conType.NewOutput(conNode, i).To(joinNode, i) + } else { + n.OutputStreams[0].To(joinNode, i) + } + } + + for _, to := range completeToes { + toNode, err := buildToNode(ctx, to) + if err != nil { + return fmt.Errorf("building to node: %w", err) + } + + joinNode.OutputStreams[0].To(toNode, 0) + } + + return nil +} + +// 输入同一组的多个块,恢复出剩下缺少的一个块。 +func ReconstructGroup(frs []ioswitchlrc.From, toes []ioswitchlrc.To, blder *exec.PlanBuilder) error { + ctx := GenerateContext{ + LRC: cdssdk.DefaultLRCRedundancy, + DAG: dag.NewGraph(), + Toes: toes, + } + + calcStreamRange(&ctx) + err := buildDAGReconstructGroup(&ctx, frs, toes) + if err != nil { + return err + } + + // 确定指令执行位置的过程,也需要反复进行,直到没有变化为止。 + for pin(&ctx) { + } + + // 下面这些只需要执行一次,但需要按顺序 + dropUnused(&ctx) + storeIPFSWriteResult(&ctx) + generateClone(&ctx) + generateRange(&ctx) + + return plan.Generate(ctx.DAG, blder) +} + +func buildDAGReconstructGroup(ctx *GenerateContext, frs []ioswitchlrc.From, toes []ioswitchlrc.To) error { + missedGrpIdx := toes[0].GetDataIndex() + + conNode := ctx.DAG.NewNode(&ops2.LRCConstructGroupType{ + LRC: ctx.LRC, + TargetBlockIndex: missedGrpIdx, + }, nil) + + for i, fr := range frs { + frNode, err := buildFromNode(ctx, fr) + if err != nil { + return fmt.Errorf("building from node: %w", err) + } + + frNode.OutputStreams[0].To(conNode, i) + } + + for _, to := range toes { + toNode, err := buildToNode(ctx, to) + if err != nil { + return fmt.Errorf("building to node: %w", err) + } + + conNode.OutputStreams[0].To(toNode, 0) + } + + return nil +} diff --git a/common/pkgs/ioswitchlrc/parser/passes.go b/common/pkgs/ioswitchlrc/parser/passes.go new file mode 100644 index 0000000..ed4b1c9 --- /dev/null +++ b/common/pkgs/ioswitchlrc/parser/passes.go @@ -0,0 +1,309 @@ +package parser + +import ( + "fmt" + "math" + + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/plan/ops" + "gitlink.org.cn/cloudream/common/pkgs/ipfs" + "gitlink.org.cn/cloudream/common/utils/math2" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitchlrc" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitchlrc/ops2" +) + +// 计算输入流的打开范围。会把流的范围按条带大小取整 +func calcStreamRange(ctx *GenerateContext) { + stripSize := int64(ctx.LRC.ChunkSize * ctx.LRC.K) + + rng := exec.Range{ + Offset: math.MaxInt64, + } + + for _, to := range ctx.Toes { + if to.GetDataIndex() == -1 { + toRng := to.GetRange() + rng.ExtendStart(math2.Floor(toRng.Offset, stripSize)) + if toRng.Length != nil { + rng.ExtendEnd(math2.Ceil(toRng.Offset+*toRng.Length, stripSize)) + } else { + rng.Length = nil + } + + } else { + toRng := to.GetRange() + + blkStartIndex := math2.FloorDiv(toRng.Offset, int64(ctx.LRC.ChunkSize)) + rng.ExtendStart(blkStartIndex * stripSize) + if toRng.Length != nil { + blkEndIndex := math2.CeilDiv(toRng.Offset+*toRng.Length, int64(ctx.LRC.ChunkSize)) + rng.ExtendEnd(blkEndIndex * stripSize) + } else { + rng.Length = nil + } + } + } + + ctx.StreamRange = rng +} + +func buildFromNode(ctx *GenerateContext, f ioswitchlrc.From) (*dag.Node, error) { + var repRange exec.Range + var blkRange exec.Range + + repRange.Offset = ctx.StreamRange.Offset + blkRange.Offset = ctx.StreamRange.Offset / int64(ctx.LRC.ChunkSize*ctx.LRC.K) * int64(ctx.LRC.ChunkSize) + if ctx.StreamRange.Length != nil { + repRngLen := *ctx.StreamRange.Length + repRange.Length = &repRngLen + + blkRngLen := *ctx.StreamRange.Length / int64(ctx.LRC.ChunkSize*ctx.LRC.K) * int64(ctx.LRC.ChunkSize) + blkRange.Length = &blkRngLen + } + + switch f := f.(type) { + case *ioswitchlrc.FromNode: + n, t := dag.NewNode(ctx.DAG, &ops2.IPFSReadType{ + FileHash: f.FileHash, + Option: ipfs.ReadOption{ + Offset: 0, + Length: -1, + }, + }, &ioswitchlrc.NodeProps{ + From: f, + }) + ioswitchlrc.SProps(n.OutputStreams[0]).StreamIndex = f.DataIndex + + if f.DataIndex == -1 { + t.Option.Offset = repRange.Offset + if repRange.Length != nil { + t.Option.Length = *repRange.Length + } + } else { + t.Option.Offset = blkRange.Offset + if blkRange.Length != nil { + t.Option.Length = *blkRange.Length + } + } + + if f.Node != nil { + n.Env.ToEnvWorker(&ioswitchlrc.AgentWorker{Node: *f.Node}) + } + + return n, nil + + case *ioswitchlrc.FromDriver: + n, _ := dag.NewNode(ctx.DAG, &ops.FromDriverType{Handle: f.Handle}, &ioswitchlrc.NodeProps{From: f}) + n.Env.ToEnvDriver() + ioswitchlrc.SProps(n.OutputStreams[0]).StreamIndex = f.DataIndex + + if f.DataIndex == -1 { + f.Handle.RangeHint.Offset = repRange.Offset + f.Handle.RangeHint.Length = repRange.Length + } else { + f.Handle.RangeHint.Offset = blkRange.Offset + f.Handle.RangeHint.Length = blkRange.Length + } + + return n, nil + + default: + return nil, fmt.Errorf("unsupported from type %T", f) + } +} + +func buildToNode(ctx *GenerateContext, t ioswitchlrc.To) (*dag.Node, error) { + switch t := t.(type) { + case *ioswitchlrc.ToNode: + n, _ := dag.NewNode(ctx.DAG, &ops2.IPFSWriteType{ + FileHashStoreKey: t.FileHashStoreKey, + Range: t.Range, + }, &ioswitchlrc.NodeProps{ + To: t, + }) + + return n, nil + + case *ioswitchlrc.ToDriver: + n, _ := dag.NewNode(ctx.DAG, &ops.ToDriverType{Handle: t.Handle, Range: t.Range}, &ioswitchlrc.NodeProps{To: t}) + n.Env.ToEnvDriver() + + return n, nil + + default: + return nil, fmt.Errorf("unsupported to type %T", t) + } +} + +// 通过流的输入输出位置来确定指令的执行位置。 +// To系列的指令都会有固定的执行位置,这些位置会随着pin操作逐步扩散到整个DAG, +// 所以理论上不会出现有指令的位置始终无法确定的情况。 +func pin(ctx *GenerateContext) bool { + changed := false + ctx.DAG.Walk(func(node *dag.Node) bool { + var toEnv *dag.NodeEnv + for _, out := range node.OutputStreams { + for _, to := range out.Toes { + if to.Node.Env.Type == dag.EnvUnknown { + continue + } + + if toEnv == nil { + toEnv = &to.Node.Env + } else if !toEnv.Equals(to.Node.Env) { + toEnv = nil + break + } + } + } + + if toEnv != nil { + if !node.Env.Equals(*toEnv) { + changed = true + } + + node.Env = *toEnv + return true + } + + // 否则根据输入流的始发地来固定 + var fromEnv *dag.NodeEnv + for _, in := range node.InputStreams { + if in.From.Node.Env.Type == dag.EnvUnknown { + continue + } + + if fromEnv == nil { + fromEnv = &in.From.Node.Env + } else if !fromEnv.Equals(in.From.Node.Env) { + fromEnv = nil + break + } + } + + if fromEnv != nil { + if !node.Env.Equals(*fromEnv) { + changed = true + } + + node.Env = *fromEnv + } + return true + }) + + return changed +} + +// 对于所有未使用的流,增加Drop指令 +func dropUnused(ctx *GenerateContext) { + ctx.DAG.Walk(func(node *dag.Node) bool { + for _, out := range node.OutputStreams { + if len(out.Toes) == 0 { + n := ctx.DAG.NewNode(&ops.DropType{}, &ioswitchlrc.NodeProps{}) + n.Env = node.Env + out.To(n, 0) + } + } + return true + }) +} + +// 为IPFS写入指令存储结果 +func storeIPFSWriteResult(ctx *GenerateContext) { + dag.WalkOnlyType[*ops2.IPFSWriteType](ctx.DAG, func(node *dag.Node, typ *ops2.IPFSWriteType) bool { + if typ.FileHashStoreKey == "" { + return true + } + + n := ctx.DAG.NewNode(&ops.StoreType{ + StoreKey: typ.FileHashStoreKey, + }, &ioswitchlrc.NodeProps{}) + n.Env.ToEnvDriver() + + node.OutputValues[0].To(n, 0) + return true + }) +} + +// 生成Range指令。StreamRange可能超过文件总大小,但Range指令会在数据量不够时不报错而是正常返回 +func generateRange(ctx *GenerateContext) { + ctx.DAG.Walk(func(node *dag.Node) bool { + props := ioswitchlrc.NProps(node) + if props.To == nil { + return true + } + + toDataIdx := props.To.GetDataIndex() + toRng := props.To.GetRange() + + if toDataIdx == -1 { + n := ctx.DAG.NewNode(&ops2.RangeType{ + Range: exec.Range{ + Offset: toRng.Offset - ctx.StreamRange.Offset, + Length: toRng.Length, + }, + }, &ioswitchlrc.NodeProps{}) + n.Env = node.InputStreams[0].From.Node.Env + + node.InputStreams[0].To(n, 0) + node.InputStreams[0].NotTo(node) + n.OutputStreams[0].To(node, 0) + + } else { + stripSize := int64(ctx.LRC.ChunkSize * ctx.LRC.K) + blkStartIdx := ctx.StreamRange.Offset / stripSize + + blkStart := blkStartIdx * int64(ctx.LRC.ChunkSize) + + n := ctx.DAG.NewNode(&ops2.RangeType{ + Range: exec.Range{ + Offset: toRng.Offset - blkStart, + Length: toRng.Length, + }, + }, &ioswitchlrc.NodeProps{}) + n.Env = node.InputStreams[0].From.Node.Env + + node.InputStreams[0].To(n, 0) + node.InputStreams[0].NotTo(node) + n.OutputStreams[0].To(node, 0) + } + + return true + }) +} + +// 生成Clone指令 +func generateClone(ctx *GenerateContext) { + ctx.DAG.Walk(func(node *dag.Node) bool { + for _, out := range node.OutputStreams { + if len(out.Toes) <= 1 { + continue + } + + n, t := dag.NewNode(ctx.DAG, &ops2.CloneStreamType{}, &ioswitchlrc.NodeProps{}) + n.Env = node.Env + for _, to := range out.Toes { + t.NewOutput(node).To(to.Node, to.SlotIndex) + } + out.Toes = nil + out.To(n, 0) + } + + for _, out := range node.OutputValues { + if len(out.Toes) <= 1 { + continue + } + + n, t := dag.NewNode(ctx.DAG, &ops2.CloneVarType{}, &ioswitchlrc.NodeProps{}) + n.Env = node.Env + for _, to := range out.Toes { + t.NewOutput(node).To(to.Node, to.SlotIndex) + } + out.Toes = nil + out.To(n, 0) + } + + return true + }) +} diff --git a/common/pkgs/ioswitchlrc/utils.go b/common/pkgs/ioswitchlrc/utils.go new file mode 100644 index 0000000..aa162ee --- /dev/null +++ b/common/pkgs/ioswitchlrc/utils.go @@ -0,0 +1,17 @@ +package ioswitchlrc + +import ( + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" +) + +func NProps(n *dag.Node) *NodeProps { + return dag.NProps[*NodeProps](n) +} + +func SProps(str *dag.StreamVar) *VarProps { + return dag.SProps[*VarProps](str) +} + +func VProps(v *dag.ValueVar) *VarProps { + return dag.VProps[*VarProps](v) +}