Browse Source

用新模式重写剩余的读写操作

gitlink
Sydonian 1 year ago
parent
commit
e8348517c7
11 changed files with 462 additions and 273 deletions
  1. +12
    -5
      common/pkgs/cmd/upload_objects.go
  2. +0
    -148
      common/pkgs/downloader/io.go
  3. +33
    -35
      common/pkgs/downloader/iterator.go
  4. +33
    -41
      common/pkgs/downloader/strip_iterator.go
  5. +69
    -0
      common/pkgs/ioswitch/ops/range.go
  6. +10
    -1
      common/pkgs/ioswitch/plans/executor.go
  7. +110
    -12
      common/pkgs/ioswitch/plans/fromto.go
  8. +16
    -0
      common/pkgs/ioswitch/plans/ops.go
  9. +135
    -10
      common/pkgs/ioswitch/plans/parser.go
  10. +33
    -16
      scanner/internal/event/check_package_redundancy.go
  11. +11
    -5
      scanner/internal/event/clean_pinned.go

+ 12
- 5
common/pkgs/cmd/upload_objects.go View File

@@ -238,12 +238,19 @@ func uploadFile(file io.Reader, uploadNode UploadNodeInfo) (string, error) {
}

func uploadToNode(file io.Reader, node cdssdk.Node) (string, error) {
plan := plans.NewPlanBuilder()
str, v := plan.AtExecutor().WillWrite()
v.To(node).IPFSWrite().ToExecutor().Store("fileHash")
ft := plans.NewFromTo()
fromExec, hd := plans.NewFromExecutor(-1)
ft.AddFrom(fromExec).AddTo(plans.NewToNode(node, -1, "fileHash"))

exec := plan.Execute()
exec.BeginWrite(io.NopCloser(file), str)
parser := plans.NewParser(cdssdk.DefaultECRedundancy)
plans := plans.NewPlanBuilder()
err := parser.Parse(ft, plans)
if err != nil {
return "", fmt.Errorf("parsing plan: %w", err)
}

exec := plans.Execute()
exec.BeginWrite(io.NopCloser(file), hd)
ret, err := exec.Wait(context.TODO())
if err != nil {
return "", err


+ 0
- 148
common/pkgs/downloader/io.go View File

@@ -1,148 +0,0 @@
package downloader

import (
"context"
"fmt"
"io"

"gitlink.org.cn/cloudream/common/pkgs/ipfs"
"gitlink.org.cn/cloudream/common/pkgs/logger"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/common/utils/io2"
stgglb "gitlink.org.cn/cloudream/storage/common/globals"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch/plans"
)

type IPFSReader struct {
node cdssdk.Node
fileHash string
stream io.ReadCloser
offset int64
}

func NewIPFSReader(node cdssdk.Node, fileHash string) *IPFSReader {
return &IPFSReader{
node: node,
fileHash: fileHash,
}
}

func NewIPFSReaderWithRange(node cdssdk.Node, fileHash string, rng ipfs.ReadOption) io.ReadCloser {
str := &IPFSReader{
node: node,
fileHash: fileHash,
}
str.Seek(rng.Offset, io.SeekStart)
if rng.Length > 0 {
return io2.Length(str, rng.Length)
}

return str
}

func (r *IPFSReader) Seek(offset int64, whence int) (int64, error) {
if whence == io.SeekEnd {
return 0, fmt.Errorf("seek end not supported")
}

if whence == io.SeekCurrent {
return 0, fmt.Errorf("seek current not supported")
}

if r.stream == nil {
r.offset = offset
return r.offset, nil
}

// 如果文件流已经打开,而seek的位置和当前位置不同,则需要重新打开文件流
if offset != r.offset {
var err error
r.stream.Close()
r.offset = offset
r.stream, err = r.openStream()
if err != nil {
return 0, fmt.Errorf("reopen stream: %w", err)
}
}

return r.offset, nil
}

func (r *IPFSReader) Read(buf []byte) (int, error) {
if r.stream == nil {
var err error
r.stream, err = r.openStream()
if err != nil {
return 0, err
}
}

n, err := r.stream.Read(buf)
r.offset += int64(n)
return n, err
}

func (r *IPFSReader) Close() error {
if r.stream != nil {
return r.stream.Close()
}

return nil
}

func (r *IPFSReader) openStream() (io.ReadCloser, error) {
if stgglb.IPFSPool != nil {
logger.Debug("try to use local IPFS to download file")

reader, err := r.fromLocalIPFS()
if err == nil {
return reader, nil
}

logger.Warnf("download from local IPFS failed, so try to download from node %v, err: %s", r.node.Name, err.Error())
}

return r.fromNode()
}

func (r *IPFSReader) fromNode() (io.ReadCloser, error) {
planBld := plans.NewPlanBuilder()
toExe, toStr := plans.NewToExecutor(-1)
ft := plans.FromTo{
Froms: []plans.From{
plans.NewFromIPFS(r.node, r.fileHash, -1),
},
Tos: []plans.To{
toExe,
},
}
par := plans.DefaultParser{}
par.Parse(ft, planBld)

exec := planBld.Execute()
go func() {
exec.Wait(context.Background())
}()

return exec.BeginRead(toStr)
}

func (r *IPFSReader) fromLocalIPFS() (io.ReadCloser, error) {
ipfsCli, err := stgglb.IPFSPool.Acquire()
if err != nil {
return nil, fmt.Errorf("new ipfs client: %w", err)
}

reader, err := ipfsCli.OpenRead(r.fileHash, ipfs.ReadOption{
Offset: r.offset,
Length: -1,
})
if err != nil {
return nil, fmt.Errorf("read ipfs file failed, err: %w", err)
}

reader = io2.AfterReadClosed(reader, func(io.ReadCloser) {
ipfsCli.Close()
})
return reader, nil
}

+ 33
- 35
common/pkgs/downloader/iterator.go View File

@@ -11,7 +11,6 @@ import (
"github.com/samber/lo"

"gitlink.org.cn/cloudream/common/pkgs/bitmap"
"gitlink.org.cn/cloudream/common/pkgs/ipfs"
"gitlink.org.cn/cloudream/common/pkgs/logger"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"

@@ -172,45 +171,20 @@ func (iter *DownloadObjectIterator) downloadNoneOrRepObject(obj downloadReqeust2
return nil, err
}

var strHandle *plans.ExecutorReadStream
ft := plans.FromTo{
Object: *obj.Detail,
}

bsc, blocks := iter.getMinReadingBlockSolution(allNodes, 1)
osc, node := iter.getMinReadingObjectSolution(allNodes, 1)
if bsc < osc {
logger.Debugf("downloading object from node %v(%v)", blocks[0].Node.Name, blocks[0].Node.NodeID)
return iter.downloadFromNode(&blocks[0].Node, obj)
}

toExec, handle := plans.NewToExecutor(-1)
ft.AddFrom(plans.NewFromNode(&blocks[0].Node, -1)).AddTo(toExec)
strHandle = handle

// TODO2 处理Offset和Length
} else if osc == math.MaxFloat64 {
if osc == math.MaxFloat64 {
// bsc >= osc,如果osc是MaxFloat64,那么bsc也一定是,也就意味着没有足够块来恢复文件
return nil, fmt.Errorf("no node has this object")
} else {
logger.Debugf("downloading object from node %v(%v)", node.Name, node.NodeID)

toExec, handle := plans.NewToExecutor(-1)
ft.AddFrom(plans.NewFromNode(node, -1)).AddTo(toExec)
strHandle = handle
// TODO2 处理Offset和Length
}

parser := plans.DefaultParser{
EC: cdssdk.DefaultECRedundancy,
}
plans := plans.NewPlanBuilder()
if err := parser.Parse(ft, plans); err != nil {
return nil, fmt.Errorf("parsing plan: %w", err)
}

exec := plans.Execute()
go exec.Wait(context.TODO())

return exec.BeginRead(strHandle)
logger.Debugf("downloading object from node %v(%v)", node.Name, node.NodeID)
return iter.downloadFromNode(node, obj)
}

func (iter *DownloadObjectIterator) downloadECObject(req downloadReqeust2, ecRed *cdssdk.ECRedundancy) (io.ReadCloser, error) {
@@ -280,10 +254,7 @@ func (iter *DownloadObjectIterator) downloadECObject(req downloadReqeust2, ecRed
}

logger.Debugf("downloading ec object from node %v(%v)", node.Name, node.NodeID)
return NewIPFSReaderWithRange(*node, req.Detail.Object.FileHash, ipfs.ReadOption{
Offset: req.Raw.Offset,
Length: req.Raw.Length,
}), nil
return iter.downloadFromNode(node, req)
}

func (iter *DownloadObjectIterator) sortDownloadNodes(req downloadReqeust2) ([]*DownloadNodeInfo, error) {
@@ -389,3 +360,30 @@ func (iter *DownloadObjectIterator) getNodeDistance(node cdssdk.Node) float64 {

return consts.NodeDistanceOther
}

func (iter *DownloadObjectIterator) downloadFromNode(node *cdssdk.Node, req downloadReqeust2) (io.ReadCloser, error) {
var strHandle *plans.ExecutorReadStream
ft := plans.NewFromTo()

toExec, handle := plans.NewToExecutor(-1)
toExec.Range = plans.Range{
Offset: req.Raw.Offset,
}
if req.Raw.Length != -1 {
len := req.Raw.Length
toExec.Range.Length = &len
}
ft.AddFrom(plans.NewFromNode(req.Detail.Object.FileHash, node, -1)).AddTo(toExec)
strHandle = handle

parser := plans.NewParser(cdssdk.DefaultECRedundancy)
plans := plans.NewPlanBuilder()
if err := parser.Parse(ft, plans); err != nil {
return nil, fmt.Errorf("parsing plan: %w", err)
}

exec := plans.Execute()
go exec.Wait(context.TODO())

return exec.BeginRead(strHandle)
}

+ 33
- 41
common/pkgs/downloader/strip_iterator.go View File

@@ -1,15 +1,15 @@
package downloader

import (
"context"
"io"
"sync"

"gitlink.org.cn/cloudream/common/pkgs/iterator"
"gitlink.org.cn/cloudream/common/pkgs/logger"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/common/utils/sync2"
stgmod "gitlink.org.cn/cloudream/storage/common/models"
"gitlink.org.cn/cloudream/storage/common/pkgs/ec"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch/plans"
)

type downloadBlock struct {
@@ -108,15 +108,33 @@ func (s *StripIterator) Close() {
}

func (s *StripIterator) downloading() {
rs, err := ec.NewRs(s.red.K, s.red.N)
ft := plans.NewFromTo()
for _, b := range s.blocks {
ft.AddFrom(plans.NewFromNode(s.object.FileHash, &b.Node, b.Block.Index))
}

toExec, hd := plans.NewToExecutorWithRange(-1, plans.Range{
Offset: s.curStripIndex * int64(s.red.ChunkSize*s.red.K),
})
ft.AddTo(toExec)

parser := plans.NewParser(*s.red)
plans := plans.NewPlanBuilder()
err := parser.Parse(ft, plans)
if err != nil {
s.sendToDataChan(dataChanEntry{Error: err})
return
}
exec := plans.Execute()

var blockStrs []*IPFSReader
for _, b := range s.blocks {
blockStrs = append(blockStrs, NewIPFSReader(b.Node, b.Block.FileHash))
ctx, cancel := context.WithCancel(context.Background())
go exec.Wait(ctx)
defer cancel()

str, err := exec.BeginRead(hd)
if err != nil {
s.sendToDataChan(dataChanEntry{Error: err})
return
}

curStripIndex := s.curStripIndex
@@ -148,40 +166,18 @@ loop:
}
}

for _, str := range blockStrs {
_, err := str.Seek(curStripIndex*int64(s.red.ChunkSize), io.SeekStart)
if err != nil {
s.sendToDataChan(dataChanEntry{Error: err})
break loop
}
}

dataBuf := make([]byte, int64(s.red.K*s.red.ChunkSize))
blockArrs := make([][]byte, s.red.N)
for i := 0; i < s.red.K; i++ {
// 放入的slice长度为0,但容量为ChunkSize,EC库发现长度为0的块后才会认为是待恢复块
blockArrs[i] = dataBuf[i*s.red.ChunkSize : i*s.red.ChunkSize]
}
for _, b := range s.blocks {
// 用于恢复的块则要将其长度变回ChunkSize,用于后续读取块数据
if b.Block.Index < s.red.K {
// 此处扩容不会导致slice指向一个新内存
blockArrs[b.Block.Index] = blockArrs[b.Block.Index][0:s.red.ChunkSize]
} else {
blockArrs[b.Block.Index] = make([]byte, s.red.ChunkSize)
}
}

err := sync2.ParallelDo(s.blocks, func(b downloadBlock, idx int) error {
_, err := io.ReadFull(blockStrs[idx], blockArrs[b.Block.Index])
return err
})
if err != nil {
s.sendToDataChan(dataChanEntry{Error: err})
n, err := io.ReadFull(str, dataBuf)
if err == io.ErrUnexpectedEOF {
s.cache.Add(stripKey, ObjectECStrip{
Data: dataBuf,
ObjectFileHash: s.object.FileHash,
})

s.sendToDataChan(dataChanEntry{Data: dataBuf[:n], Position: stripBytesPos})
s.sendToDataChan(dataChanEntry{Error: io.EOF})
break loop
}

err = rs.ReconstructData(blockArrs)
if err != nil {
s.sendToDataChan(dataChanEntry{Error: err})
break loop
@@ -199,10 +195,6 @@ loop:
curStripIndex++
}

for _, str := range blockStrs {
str.Close()
}

close(s.dataChan)
}



+ 69
- 0
common/pkgs/ioswitch/ops/range.go View File

@@ -0,0 +1,69 @@
package ops

import (
"context"
"io"

"gitlink.org.cn/cloudream/common/pkgs/future"
"gitlink.org.cn/cloudream/common/utils/io2"
"gitlink.org.cn/cloudream/common/utils/math2"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch"
)

type Range struct {
Input *ioswitch.StreamVar `json:"input"`
Output *ioswitch.StreamVar `json:"output"`
Offset int64 `json:"offset"`
Length *int64 `json:"length"`
}

func (o *Range) Execute(ctx context.Context, sw *ioswitch.Switch) error {
err := sw.BindVars(ctx, o.Input)
if err != nil {
return err
}
defer o.Input.Stream.Close()

buf := make([]byte, 1024*16)

// 跳过前Offset个字节
for o.Offset > 0 {
rdCnt := math2.Min(o.Offset, int64(len(buf)))
rd, err := o.Input.Stream.Read(buf[:rdCnt])
if err == io.EOF {
// 输入流不够长度也不报错,只是产生一个空的流
break
}
if err != nil {
return err
}
o.Offset -= int64(rd)
}

fut := future.NewSetVoid()

if o.Length == nil {
o.Output.Stream = io2.AfterEOF(o.Input.Stream, func(closer io.ReadCloser, err error) {
fut.SetVoid()
})

sw.PutVars(o.Output)
return fut.Wait(ctx)
}

o.Output.Stream = io2.AfterEOF(io2.Length(o.Input.Stream, *o.Length), func(closer io.ReadCloser, err error) {
fut.SetVoid()
})

sw.PutVars(o.Output)
err = fut.Wait(ctx)
if err != nil {
return err
}

return io2.DropWithBuf(o.Input.Stream, buf)
}

func init() {
OpUnion.AddT((*Range)(nil))
}

+ 10
- 1
common/pkgs/ioswitch/plans/executor.go View File

@@ -7,6 +7,7 @@ import (
"sync"

"gitlink.org.cn/cloudream/common/pkgs/future"
"gitlink.org.cn/cloudream/common/utils/io2"
stgglb "gitlink.org.cn/cloudream/storage/common/globals"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch"
)
@@ -20,7 +21,14 @@ type Executor struct {
executorSw *ioswitch.Switch
}

// 开始写入一个流。此函数会将输入视为一个完整的流,因此会给流包装一个Range来获取只需要的部分。
func (e *Executor) BeginWrite(str io.ReadCloser, handle *ExecutorWriteStream) {
handle.Var.Stream = io2.NewRange(str, handle.RangeHint.Offset, handle.RangeHint.Length)
e.executorSw.PutVars(handle.Var)
}

// 开始写入一个流。此函数默认输入流已经是Handle的RangeHint所描述的范围,因此不会做任何其他处理
func (e *Executor) BeginWriteRanged(str io.ReadCloser, handle *ExecutorWriteStream) {
handle.Var.Stream = str
e.executorSw.PutVars(handle.Var)
}
@@ -99,7 +107,8 @@ func (e *Executor) stopWith(err error) {
}

type ExecutorWriteStream struct {
Var *ioswitch.StreamVar
Var *ioswitch.StreamVar
RangeHint *Range
}

type ExecutorReadStream struct {


+ 110
- 12
common/pkgs/ioswitch/plans/fromto.go View File

@@ -2,13 +2,16 @@ package plans

import (
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
stgmod "gitlink.org.cn/cloudream/storage/common/models"
"gitlink.org.cn/cloudream/common/utils/math2"
)

type FromTo struct {
Object stgmod.ObjectDetail
Froms []From
Tos []To
Froms []From
Toes []To
}

func NewFromTo() FromTo {
return FromTo{}
}

func (ft *FromTo) AddFrom(from From) *FromTo {
@@ -17,7 +20,7 @@ func (ft *FromTo) AddFrom(from From) *FromTo {
}

func (ft *FromTo) AddTo(to To) *FromTo {
ft.Tos = append(ft.Tos, to)
ft.Toes = append(ft.Toes, to)
return ft
}

@@ -28,13 +31,78 @@ type From interface {
}

type To interface {
// To所需要的文件流的范围。具体含义与DataIndex有关系:
// 如果DataIndex == -1,则表示在整个文件的范围。
// 如果DataIndex >= 0,则表示在文件的某个分片的范围。
GetRange() Range
GetDataIndex() int
}

type Range struct {
Offset int64
Length int64
Length *int64
}

func (r *Range) Extend(other Range) {
newOffset := math2.Min(r.Offset, other.Offset)

if r.Length == nil {
r.Offset = newOffset
return
}

if other.Length == nil {
r.Offset = newOffset
r.Length = nil
return
}

otherEnd := other.Offset + *other.Length
rEnd := r.Offset + *r.Length

newEnd := math2.Max(otherEnd, rEnd)
r.Offset = newOffset
*r.Length = newEnd - newOffset
}

func (r *Range) ExtendStart(start int64) {
r.Offset = math2.Min(r.Offset, start)
}

func (r *Range) ExtendEnd(end int64) {
if r.Length == nil {
return
}

rEnd := r.Offset + *r.Length
newLen := math2.Max(end, rEnd) - r.Offset
r.Length = &newLen
}

func (r *Range) Fix(maxLength int64) {
if r.Length != nil {
return
}

len := maxLength - r.Offset
r.Length = &len
}

func (r *Range) ToStartEnd(maxLen int64) (start int64, end int64) {
if r.Length == nil {
return r.Offset, maxLen
}

end = r.Offset + *r.Length
return r.Offset, end
}

func (r *Range) ClampLength(maxLen int64) {
if r.Length == nil {
return
}

*r.Length = math2.Min(*r.Length, maxLen-r.Offset)
}

type FromExecutor struct {
@@ -42,6 +110,16 @@ type FromExecutor struct {
DataIndex int
}

func NewFromExecutor(dataIndex int) (*FromExecutor, *ExecutorWriteStream) {
handle := &ExecutorWriteStream{
RangeHint: &Range{},
}
return &FromExecutor{
Handle: handle,
DataIndex: dataIndex,
}, handle
}

func (f *FromExecutor) GetDataIndex() int {
return f.DataIndex
}
@@ -58,12 +136,14 @@ func (f *FromExecutor) BuildNode(ft *FromTo) Node {
}

type FromNode struct {
FileHash string
Node *cdssdk.Node
DataIndex int
}

func NewFromNode(node *cdssdk.Node, dataIndex int) *FromNode {
func NewFromNode(fileHash string, node *cdssdk.Node, dataIndex int) *FromNode {
return &FromNode{
FileHash: fileHash,
Node: node,
DataIndex: dataIndex,
}
@@ -87,6 +167,15 @@ func NewToExecutor(dataIndex int) (*ToExecutor, *ExecutorReadStream) {
}, &str
}

func NewToExecutorWithRange(dataIndex int, rng Range) (*ToExecutor, *ExecutorReadStream) {
str := ExecutorReadStream{}
return &ToExecutor{
Handle: &str,
DataIndex: dataIndex,
Range: rng,
}, &str
}

func (t *ToExecutor) GetDataIndex() int {
return t.DataIndex
}
@@ -95,26 +184,35 @@ func (t *ToExecutor) GetRange() Range {
return t.Range
}

type ToAgent struct {
type ToNode struct {
Node cdssdk.Node
DataIndex int
Range Range
FileHashStoreKey string
}

func NewToAgent(node cdssdk.Node, dataIndex int, fileHashStoreKey string) *ToAgent {
return &ToAgent{
func NewToNode(node cdssdk.Node, dataIndex int, fileHashStoreKey string) *ToNode {
return &ToNode{
Node: node,
DataIndex: dataIndex,
FileHashStoreKey: fileHashStoreKey,
}
}

func NewToNodeWithRange(node cdssdk.Node, dataIndex int, fileHashStoreKey string, rng Range) *ToNode {
return &ToNode{
Node: node,
DataIndex: dataIndex,
FileHashStoreKey: fileHashStoreKey,
Range: rng,
}
}

func (t *ToAgent) GetDataIndex() int {
func (t *ToNode) GetDataIndex() int {
return t.DataIndex
}

func (t *ToAgent) GetRange() Range {
func (t *ToNode) GetRange() Range {
return t.Range
}



+ 16
- 0
common/pkgs/ioswitch/plans/ops.go View File

@@ -143,6 +143,7 @@ func (t *IPFSReadType) GenerateOp(node *Node, blder *PlanBuilder) error {

type IPFSWriteType struct {
FileHashStoreKey string
Range Range
}

func (t *IPFSWriteType) GenerateOp(op *Node, blder *PlanBuilder) error {
@@ -274,6 +275,7 @@ func (t *FromExecutorOp) GenerateOp(op *Node, blder *PlanBuilder) error {

type ToExecutorOp struct {
Handle *ExecutorReadStream
Range Range
}

func (t *ToExecutorOp) GenerateOp(op *Node, blder *PlanBuilder) error {
@@ -351,6 +353,20 @@ func (t *GetVarOp) GenerateOp(op *Node, blder *PlanBuilder) error {
return nil
}

type RangeType struct {
Range Range
}

func (t *RangeType) GenerateOp(op *Node, blder *PlanBuilder) error {
addOpByEnv(&ops.Range{
Input: op.InputStreams[0].Var,
Output: op.OutputStreams[0].Var,
Offset: t.Range.Offset,
Length: t.Range.Length,
}, op.Env, blder)
return nil
}

func addOpByEnv(op ioswitch.Op, env OpEnv, blder *PlanBuilder) {
switch env := env.(type) {
case *AgentEnv:


+ 135
- 10
common/pkgs/ioswitch/plans/parser.go View File

@@ -2,9 +2,12 @@ package plans

import (
"fmt"
"math"

"gitlink.org.cn/cloudream/common/pkgs/ipfs"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/common/utils/lo2"
"gitlink.org.cn/cloudream/common/utils/math2"
)

type FromToParser interface {
@@ -15,9 +18,19 @@ type DefaultParser struct {
EC cdssdk.ECRedundancy
}

func NewParser(ec cdssdk.ECRedundancy) *DefaultParser {
return &DefaultParser{
EC: ec,
}
}

type ParseContext struct {
Ft FromTo
Nodes []*Node
Ft FromTo
Nodes []*Node
ToNodes []*Node
// 为了产生所有To所需的数据范围,而需要From打开的范围。
// 这个范围是基于整个文件的,且上下界都取整到条带大小的整数倍,因此上界是有可能超过文件大小的。
StreamRange Range
}

func (p *DefaultParser) Parse(ft FromTo, blder *PlanBuilder) error {
@@ -25,6 +38,10 @@ func (p *DefaultParser) Parse(ft FromTo, blder *PlanBuilder) error {

// 分成两个阶段:
// 1. 基于From和To生成更多指令,初步匹配to的需求

// 计算一下打开流的范围
p.calcStreamRange(&ctx)

err := p.extend(&ctx, ft, blder)
if err != nil {
return err
@@ -80,6 +97,7 @@ func (p *DefaultParser) Parse(ft FromTo, blder *PlanBuilder) error {
p.dropUnused(&ctx)
p.storeIPFSWriteResult(&ctx)
p.generateClone(&ctx)
p.generateRange(&ctx)
p.generateSend(&ctx)

return p.buildPlan(&ctx, blder)
@@ -96,9 +114,44 @@ func (p *DefaultParser) findOutputStream(ctx *ParseContext, dataIndex int) *Stre
return nil
}

// 计算输入流的打开范围。会把流的范围按条带大小取整
func (p *DefaultParser) calcStreamRange(ctx *ParseContext) {
stripSize := int64(p.EC.ChunkSize * p.EC.K)

rng := Range{
Offset: math.MaxInt64,
}

for _, to := range ctx.Ft.Toes {
if to.GetDataIndex() == -1 {
toRng := to.GetRange()
rng.ExtendStart(math2.Floor(toRng.Offset, stripSize))
if toRng.Length != nil {
rng.ExtendEnd(math2.Ceil(toRng.Offset+*toRng.Length, stripSize))
} else {
rng.Length = nil
}

} else {
toRng := to.GetRange()

blkStartIndex := math2.FloorDiv(toRng.Offset, int64(p.EC.ChunkSize))
rng.ExtendStart(blkStartIndex * stripSize)
if toRng.Length != nil {
blkEndIndex := math2.CeilDiv(toRng.Offset+*toRng.Length, int64(p.EC.ChunkSize))
rng.ExtendEnd(blkEndIndex * stripSize)
} else {
rng.Length = nil
}
}
}

ctx.StreamRange = rng
}

func (p *DefaultParser) extend(ctx *ParseContext, ft FromTo, blder *PlanBuilder) error {
for _, f := range ft.Froms {
n, err := p.buildFromNode(&ft, f)
n, err := p.buildFromNode(ctx, &ft, f)
if err != nil {
return err
}
@@ -157,13 +210,14 @@ loop:
}

// 为每一个To找到一个输入流
for _, t := range ft.Tos {
for _, t := range ft.Toes {
n, err := p.buildToNode(&ft, t)
if err != nil {
return err
}

ctx.Nodes = append(ctx.Nodes, n)
ctx.ToNodes = append(ctx.ToNodes, n)

str := p.findOutputStream(ctx, t.GetDataIndex())
if str == nil {
@@ -176,12 +230,43 @@ loop:
return nil
}

func (p *DefaultParser) buildFromNode(ft *FromTo, f From) (*Node, error) {
func (p *DefaultParser) buildFromNode(ctx *ParseContext, ft *FromTo, f From) (*Node, error) {
var repRange Range
var blkRange Range

repRange.Offset = ctx.StreamRange.Offset
blkRange.Offset = ctx.StreamRange.Offset / int64(p.EC.ChunkSize*p.EC.K) * int64(p.EC.ChunkSize)
if ctx.StreamRange.Length != nil {
repRngLen := *ctx.StreamRange.Length
repRange.Length = &repRngLen

blkRngLen := *ctx.StreamRange.Length / int64(p.EC.ChunkSize*p.EC.K) * int64(p.EC.ChunkSize)
blkRange.Length = &blkRngLen
}

switch f := f.(type) {
case *FromNode:
ty := &IPFSReadType{
FileHash: f.FileHash,
Option: ipfs.ReadOption{
Offset: 0,
Length: -1,
},
}
if f.DataIndex == -1 {
ty.Option.Offset = repRange.Offset
if repRange.Length != nil {
ty.Option.Length = *repRange.Length
}
} else {
ty.Option.Offset = blkRange.Offset
if blkRange.Length != nil {
ty.Option.Length = *blkRange.Length
}
}

n := &Node{
// TODO2 需要FromTo的Range来设置Option
Type: &IPFSReadType{FileHash: ft.Object.Object.FileHash},
Type: ty,
}
n.NewOutput(f.DataIndex)

@@ -197,6 +282,15 @@ func (p *DefaultParser) buildFromNode(ft *FromTo, f From) (*Node, error) {
Type: &FromExecutorOp{Handle: f.Handle},
}
n.NewOutput(f.DataIndex)

if f.DataIndex == -1 {
f.Handle.RangeHint.Offset = repRange.Offset
f.Handle.RangeHint.Length = repRange.Length
} else {
f.Handle.RangeHint.Offset = blkRange.Offset
f.Handle.RangeHint.Length = blkRange.Length
}

return n, nil

default:
@@ -206,16 +300,16 @@ func (p *DefaultParser) buildFromNode(ft *FromTo, f From) (*Node, error) {

func (p *DefaultParser) buildToNode(ft *FromTo, t To) (*Node, error) {
switch t := t.(type) {
case *ToAgent:
case *ToNode:
return &Node{
Env: &AgentEnv{t.Node},
Type: &IPFSWriteType{FileHashStoreKey: t.FileHashStoreKey},
Type: &IPFSWriteType{FileHashStoreKey: t.FileHashStoreKey, Range: t.Range},
}, nil

case *ToExecutor:
return &Node{
Env: &ExecutorEnv{},
Type: &ToExecutorOp{Handle: t.Handle},
Type: &ToExecutorOp{Handle: t.Handle, Range: t.Range},
}, nil

default:
@@ -636,6 +730,37 @@ func (p *DefaultParser) storeIPFSWriteResult(ctx *ParseContext) {
}
}

// 生成Range指令。StreamRange可能超过文件总大小,但Range指令会在数据量不够时不报错而是正常返回
func (p *DefaultParser) generateRange(ctx *ParseContext) {
for i, to := range ctx.ToNodes {
toDataIdx := ctx.Ft.Toes[i].GetDataIndex()
toRng := ctx.Ft.Toes[i].GetRange()

if toDataIdx == -1 {
rngType := &RangeType{Range: Range{Offset: toRng.Offset - ctx.StreamRange.Offset, Length: toRng.Length}}
rngNode := &Node{
Env: to.InputStreams[0].From.Env,
Type: rngType,
}

to.ReplaceInput(to.InputStreams[0], rngNode.NewOutput(toDataIdx))
} else {
stripSize := int64(p.EC.ChunkSize * p.EC.K)
blkStartIdx := ctx.StreamRange.Offset / stripSize

blkStart := blkStartIdx * int64(p.EC.ChunkSize)

rngType := &RangeType{Range: Range{Offset: toRng.Offset - blkStart, Length: toRng.Length}}
rngNode := &Node{
Env: to.InputStreams[0].From.Env,
Type: rngType,
}

to.ReplaceInput(to.InputStreams[0], rngNode.NewOutput(toDataIdx))
}
}
}

// 生成Clone指令
func (p *DefaultParser) generateClone(ctx *ParseContext) {
for _, op := range ctx.Nodes {


+ 33
- 16
scanner/internal/event/check_package_redundancy.go View File

@@ -413,14 +413,19 @@ func (t *CheckPackageRedundancy) noneToEC(obj stgmod.ObjectDetail, red *cdssdk.E
return nil, fmt.Errorf("requesting to get nodes: %w", err)
}

planBlder := plans.NewPlanBuilder()
inputStrs := planBlder.AtAgent(getNodes.Nodes[0]).IPFSRead(obj.Object.FileHash).ChunkedSplit(red.ChunkSize, red.K, true)
outputStrs := planBlder.AtAgent(getNodes.Nodes[0]).ECReconstructAny(*red, lo.Range(red.K), lo.Range(red.N), inputStrs)
ft := plans.NewFromTo()
ft.AddFrom(plans.NewFromNode(obj.Object.FileHash, &getNodes.Nodes[0], -1))
for i := 0; i < red.N; i++ {
outputStrs[i].To(uploadNodes[i].Node).IPFSWrite().ToExecutor().Store(fmt.Sprintf("%d", i))
ft.AddTo(plans.NewToNode(uploadNodes[i].Node, i, fmt.Sprintf("%d", i)))
}
parser := plans.NewParser(*red)
plans := plans.NewPlanBuilder()
err = parser.Parse(ft, plans)
if err != nil {
return nil, fmt.Errorf("parsing plan: %w", err)
}

ioRet, err := planBlder.Execute().Wait(context.TODO())
ioRet, err := plans.Execute().Wait(context.TODO())
if err != nil {
return nil, fmt.Errorf("executing io plan: %w", err)
}
@@ -510,17 +515,25 @@ func (t *CheckPackageRedundancy) ecToRep(obj stgmod.ObjectDetail, srcRed *cdssdk
uploadNodes = lo.UniqBy(uploadNodes, func(item *NodeLoadInfo) cdssdk.NodeID { return item.Node.NodeID })

// 每个被选节点都在自己节点上重建原始数据
parser := plans.NewParser(*srcRed)
planBlder := plans.NewPlanBuilder()
for i := range uploadNodes {
tarNode := planBlder.AtAgent(uploadNodes[i].Node)
ft := plans.NewFromTo()

var inputs []*plans.AgentStreamVar
for _, block := range chosenBlocks {
inputs = append(inputs, tarNode.IPFSRead(block.FileHash))
ft.AddFrom(plans.NewFromNode(block.FileHash, &uploadNodes[i].Node, block.Index))
}

outputs := tarNode.ECReconstruct(*srcRed, chosenBlockIndexes, inputs)
tarNode.ChunkedJoin(srcRed.ChunkSize, outputs).Length(obj.Object.Size).IPFSWrite().ToExecutor().Store(fmt.Sprintf("%d", i))
len := obj.Object.Size
ft.AddTo(plans.NewToNodeWithRange(uploadNodes[i].Node, -1, fmt.Sprintf("%d", i), plans.Range{
Offset: 0,
Length: &len,
}))

err := parser.Parse(ft, planBlder)
if err != nil {
return nil, fmt.Errorf("parsing plan: %w", err)
}
}

ioRet, err := planBlder.Execute().Wait(context.TODO())
@@ -555,11 +568,9 @@ func (t *CheckPackageRedundancy) ecToEC(obj stgmod.ObjectDetail, srcRed *cdssdk.
grpBlocks := obj.GroupBlocks()

var chosenBlocks []stgmod.GrouppedObjectBlock
var chosenBlockIndexes []int
for _, block := range grpBlocks {
if len(block.NodeIDs) > 0 {
chosenBlocks = append(chosenBlocks, block)
chosenBlockIndexes = append(chosenBlockIndexes, block.Index)
}

if len(chosenBlocks) == srcRed.K {
@@ -572,6 +583,7 @@ func (t *CheckPackageRedundancy) ecToEC(obj stgmod.ObjectDetail, srcRed *cdssdk.
}

// 目前EC的参数都相同,所以可以不用重建出完整数据然后再分块,可以直接构建出目的节点需要的块
parser := plans.NewParser(*srcRed)
planBlder := plans.NewPlanBuilder()

var newBlocks []stgmod.ObjectBlock
@@ -595,15 +607,20 @@ func (t *CheckPackageRedundancy) ecToEC(obj stgmod.ObjectDetail, srcRed *cdssdk.
shouldUpdateBlocks = true

// 否则就要重建出这个节点需要的块
tarNode := planBlder.AtAgent(node.Node)

var inputs []*plans.AgentStreamVar
ft := plans.NewFromTo()
for _, block := range chosenBlocks {
inputs = append(inputs, tarNode.IPFSRead(block.FileHash))
ft.AddFrom(plans.NewFromNode(block.FileHash, &node.Node, block.Index))
}

// 输出只需要自己要保存的那一块
tarNode.ECReconstructAny(*srcRed, chosenBlockIndexes, []int{i}, inputs)[0].IPFSWrite().ToExecutor().Store(fmt.Sprintf("%d", i))
ft.AddTo(plans.NewToNode(node.Node, i, fmt.Sprintf("%d", i)))

err := parser.Parse(ft, planBlder)
if err != nil {
return nil, fmt.Errorf("parsing plan: %w", err)
}

newBlocks = append(newBlocks, newBlock)
}



+ 11
- 5
scanner/internal/event/clean_pinned.go View File

@@ -781,14 +781,20 @@ func (t *CleanPinned) makePlansForECObject(allNodeInfos map[cdssdk.NodeID]*cdssd
}

ecRed := obj.Object.Redundancy.(*cdssdk.ECRedundancy)
parser := plans.NewParser(*ecRed)

for id, idxs := range reconstrct {
agt := planBld.AtAgent(*allNodeInfos[id])
ft := plans.NewFromTo()
ft.AddFrom(plans.NewFromNode(obj.Object.FileHash, allNodeInfos[id], -1))

strs := agt.IPFSRead(obj.Object.FileHash).ChunkedSplit(ecRed.ChunkSize, ecRed.K, true)
ss := agt.ECReconstructAny(*ecRed, lo.Range(ecRed.K), *idxs, strs)
for i, s := range ss {
s.IPFSWrite().ToExecutor().Store(fmt.Sprintf("%d.%d", obj.Object.ObjectID, (*idxs)[i]))
for _, i := range *idxs {
ft.AddTo(plans.NewToNode(*allNodeInfos[id], i, fmt.Sprintf("%d.%d", obj.Object.ObjectID, i)))
}

err := parser.Parse(ft, planBld)
if err != nil {
// TODO 错误处理
continue
}

planningNodeIDs[id] = true


Loading…
Cancel
Save