Browse Source

Merge pull request '增加LRC冗余方式' (#31) from feature_gxh into master

gitlink
baohan 1 year ago
parent
commit
e6c125b076
27 changed files with 2598 additions and 89 deletions
  1. +155
    -26
      client/internal/cmdline/test.go
  2. +2
    -1
      common/models/models.go
  3. +12
    -0
      common/pkgs/downloader/iterator.go
  4. +87
    -0
      common/pkgs/downloader/lrc.go
  5. +191
    -0
      common/pkgs/downloader/lrc_strip_iterator.go
  6. +36
    -0
      common/pkgs/ec/lrc/lrc.go
  7. +6
    -0
      common/pkgs/ioswitch2/agent_worker.go
  8. +23
    -1
      common/pkgs/ioswitch2/ops2/chunked.go
  9. +21
    -12
      common/pkgs/ioswitch2/ops2/clone.go
  10. +19
    -14
      common/pkgs/ioswitch2/ops2/ec.go
  11. +8
    -0
      common/pkgs/ioswitch2/ops2/file.go
  12. +9
    -1
      common/pkgs/ioswitch2/ops2/ipfs.go
  13. +4
    -0
      common/pkgs/ioswitch2/ops2/range.go
  14. +22
    -12
      common/pkgs/ioswitch2/parser/parser.go
  15. +67
    -0
      common/pkgs/ioswitchlrc/agent_worker.go
  16. +134
    -0
      common/pkgs/ioswitchlrc/fromto.go
  17. +23
    -0
      common/pkgs/ioswitchlrc/ioswitch.go
  18. +157
    -0
      common/pkgs/ioswitchlrc/ops2/chunked.go
  19. +121
    -0
      common/pkgs/ioswitchlrc/ops2/clone.go
  20. +201
    -0
      common/pkgs/ioswitchlrc/ops2/ec.go
  21. +137
    -0
      common/pkgs/ioswitchlrc/ops2/ipfs.go
  22. +75
    -0
      common/pkgs/ioswitchlrc/ops2/ops.go
  23. +99
    -0
      common/pkgs/ioswitchlrc/ops2/range.go
  24. +301
    -0
      common/pkgs/ioswitchlrc/parser/generator.go
  25. +320
    -0
      common/pkgs/ioswitchlrc/parser/passes.go
  26. +17
    -0
      common/pkgs/ioswitchlrc/utils.go
  27. +351
    -22
      scanner/internal/event/check_package_redundancy.go

+ 155
- 26
client/internal/cmdline/test.go View File

@@ -10,14 +10,15 @@ import (
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
stgglb "gitlink.org.cn/cloudream/storage/common/globals"
"gitlink.org.cn/cloudream/storage/common/pkgs/downloader"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2/parser"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitchlrc"
lrcparser "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitchlrc/parser"
coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"
)

func init() {
cmd := &cobra.Command{
rootCmd.AddCommand(&cobra.Command{
Use: "test2",
Short: "test2",
// Args: cobra.ExactArgs(1),
@@ -36,22 +37,22 @@ func init() {

ft := ioswitch2.NewFromTo()

// ft.AddFrom(plans.NewFromNode("Qmf412jQZiuVUtdgnB36FXFX7xg5V6KEbSJ4dpQuhkLyfD", &nodes.Nodes[0], -1))
// ft.AddTo(plans.NewToNode(nodes.Nodes[1], -1, "asd"))
// len := int64(3)
// toExec, hd := plans.NewToExecutorWithRange(-1, plans.Range{Offset: 5, Length: &len})
// ft.AddTo(toExec)
// ft.AddTo(plans.NewToNode(nodes.Nodes[1], 0, "0"))
// ft.AddTo(plans.NewToNode(nodes.Nodes[1], 1, "1"))
// ft.AddTo(plans.NewToNode(nodes.Nodes[1], 2, "2"))

ft.AddFrom(ioswitch2.NewFromNode("QmS2s8GRYHEurXL7V1zUtKvf2H1BGcQc5NN1T1hiSnWvbd", &nodes.Nodes[0], 1))
ft.AddFrom(ioswitch2.NewFromNode("QmUgUEUMzdnjPNx6xu9PDGXpSyXTk8wzPWvyYZ9zasE1WW", &nodes.Nodes[1], 2))
ft.AddFrom(ioswitch2.NewFromNode("Qmf412jQZiuVUtdgnB36FXFX7xg5V6KEbSJ4dpQuhkLyfD", &nodes.Nodes[0], -1))
ft.AddTo(ioswitch2.NewToNode(nodes.Nodes[1], -1, "asd"))
le := int64(3)
toExec, hd := ioswitch2.NewToDriverWithRange(-1, exec.Range{Offset: 5, Length: &le})
ft.AddTo(toExec)
ft.AddTo(ioswitch2.NewToNode(nodes.Nodes[1], 0, "0"))
ft.AddTo(ioswitch2.NewToNode(nodes.Nodes[1], 1, "1"))
ft.AddTo(ioswitch2.NewToNode(nodes.Nodes[1], 2, "2"))

// ft.AddFrom(ioswitch2.NewFromNode("QmS2s8GRYHEurXL7V1zUtKvf2H1BGcQc5NN1T1hiSnWvbd", &nodes.Nodes[0], 1))
// ft.AddFrom(ioswitch2.NewFromNode("QmUgUEUMzdnjPNx6xu9PDGXpSyXTk8wzPWvyYZ9zasE1WW", &nodes.Nodes[1], 2))
// le := int64(5)
// toExec, hd := ioswitch2.NewToDriverWithRange(-1, exec.Range{Offset: 3, Length: &le})
// toExec, hd := plans.NewToExecutorWithRange(1, plans.Range{Offset: 0, Length: nil})
// toExec2, hd2 := plans.NewToExecutorWithRange(2, plans.Range{Offset: 0, Length: nil})
ft.AddTo(toExec)
// ft.AddTo(toExec)
// ft.AddTo(toExec2)

// fromExec, hd := plans.NewFromExecutor(-1)
@@ -99,23 +100,151 @@ func init() {

fut.Wait(context.TODO())
},
}
})

// rootCmd.AddCommand(&cobra.Command{
// Use: "test",
// Short: "test",
// // Args: cobra.ExactArgs(1),
// Run: func(cmd *cobra.Command, args []string) {
// cmdCtx := GetCmdCtx(cmd)
// file, _ := cmdCtx.Cmdline.Svc.ObjectSvc().Download(1, downloader.DownloadReqeust{
// ObjectID: 27379,
// Length: -1,
// })
// data, _ := io.ReadAll(file.File)
// fmt.Printf("data: %v(%v)\n", string(data), len(data))
// },
// })

rootCmd.AddCommand(&cobra.Command{
Use: "test3",
Short: "test3",
// Args: cobra.ExactArgs(1),
Run: func(cmd *cobra.Command, args []string) {
// cmdCtx := GetCmdCtx(cmd)

coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil {
panic(err)
}
defer stgglb.CoordinatorMQPool.Release(coorCli)

nodes, err := coorCli.GetNodes(coormq.NewGetNodes([]cdssdk.NodeID{1, 2}))
if err != nil {
panic(err)
}

red := cdssdk.DefaultLRCRedundancy

var toes []ioswitchlrc.To
for i := 0; i < red.N; i++ {
toes = append(toes, ioswitchlrc.NewToNode(nodes.Nodes[i%2], i, fmt.Sprintf("%d", i)))
}

plans := exec.NewPlanBuilder()
err = lrcparser.Encode(ioswitchlrc.NewFromNode("QmNspjDLxQbAsuh37jRXKvLWHE2f7JpqY4HEJ8x7Jgbzqa", &nodes.Nodes[0], -1), toes, plans)
if err != nil {
panic(err)
// return nil, fmt.Errorf("parsing plan: %w", err)
}

ioRet, err := plans.Execute().Wait(context.TODO())
if err != nil {
panic(err)
// return nil, fmt.Errorf("executing io plan: %w", err)
}

fmt.Printf("ioRet: %v\n", ioRet)
},
})

cmd2 := &cobra.Command{
rootCmd.AddCommand(&cobra.Command{
Use: "test",
Short: "test",
// Args: cobra.ExactArgs(1),
Run: func(cmd *cobra.Command, args []string) {
cmdCtx := GetCmdCtx(cmd)
file, _ := cmdCtx.Cmdline.Svc.ObjectSvc().Download(1, downloader.DownloadReqeust{
ObjectID: 27379,
Length: -1,
})
data, _ := io.ReadAll(file.File)
fmt.Printf("data: %v(%v)\n", string(data), len(data))
// cmdCtx := GetCmdCtx(cmd)

coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil {
panic(err)
}
defer stgglb.CoordinatorMQPool.Release(coorCli)

nodes, err := coorCli.GetNodes(coormq.NewGetNodes([]cdssdk.NodeID{1, 2}))
if err != nil {
panic(err)
}

// red := cdssdk.DefaultLRCRedundancy

plans := exec.NewPlanBuilder()
err = lrcparser.ReconstructGroup([]ioswitchlrc.From{
ioswitchlrc.NewFromNode("QmVAZzVQEvnvTvzSz2SvpziAcDSQ8aYCoTyGrZNuV8raEQ", &nodes.Nodes[1], 0),
ioswitchlrc.NewFromNode("QmVAZzVQEvnvTvzSz2SvpziAcDSQ8aYCoTyGrZNuV8raEQ", &nodes.Nodes[1], 1),
}, []ioswitchlrc.To{
ioswitchlrc.NewToNode(nodes.Nodes[1], 3, "3"),
}, plans)
if err != nil {
panic(err)
// return nil, fmt.Errorf("parsing plan: %w", err)
}

ioRet, err := plans.Execute().Wait(context.TODO())
if err != nil {
panic(err)
// return nil, fmt.Errorf("executing io plan: %w", err)
}

fmt.Printf("ioRet: %v\n", ioRet)
},
}
})

rootCmd.AddCommand(&cobra.Command{
Use: "test4",
Short: "test4",
// Args: cobra.ExactArgs(1),
Run: func(cmd *cobra.Command, args []string) {
// cmdCtx := GetCmdCtx(cmd)

rootCmd.AddCommand(cmd)
rootCmd.AddCommand(cmd2)
coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil {
panic(err)
}
defer stgglb.CoordinatorMQPool.Release(coorCli)

nodes, err := coorCli.GetNodes(coormq.NewGetNodes([]cdssdk.NodeID{1, 2}))
if err != nil {
panic(err)
}

// red := cdssdk.DefaultLRCRedundancy

plans := exec.NewPlanBuilder()
le := int64(1293)
err = lrcparser.ReconstructAny([]ioswitchlrc.From{
ioswitchlrc.NewFromNode("QmVAZzVQEvnvTvzSz2SvpziAcDSQ8aYCoTyGrZNuV8raEQ", &nodes.Nodes[0], 0),
ioswitchlrc.NewFromNode("QmQBKncEDqxw3BrGr3th3gS3jUC2fizGz1w29ZxxrrKfNv", &nodes.Nodes[0], 2),
}, []ioswitchlrc.To{
ioswitchlrc.NewToNodeWithRange(nodes.Nodes[1], -1, "-1", exec.Range{0, &le}),
ioswitchlrc.NewToNode(nodes.Nodes[1], 0, "0"),
ioswitchlrc.NewToNode(nodes.Nodes[1], 1, "1"),
ioswitchlrc.NewToNode(nodes.Nodes[1], 2, "2"),
ioswitchlrc.NewToNode(nodes.Nodes[1], 3, "3"),
}, plans)
if err != nil {
panic(err)
// return nil, fmt.Errorf("parsing plan: %w", err)
}

ioRet, err := plans.Execute().Wait(context.TODO())
if err != nil {
panic(err)
// return nil, fmt.Errorf("executing io plan: %w", err)
}

fmt.Printf("ioRet: %v\n", ioRet)
},
})
}

+ 2
- 1
common/models/models.go View File

@@ -3,6 +3,7 @@ package stgmod
import (
"github.com/samber/lo"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/common/utils/sort2"
)

type ObjectBlock struct {
@@ -48,7 +49,7 @@ func (o *ObjectDetail) GroupBlocks() []GrouppedObjectBlock {
grps[block.Index] = grp
}

return lo.Values(grps)
return sort2.Sort(lo.Values(grps), func(l, r GrouppedObjectBlock) int { return l.Index - r.Index })
}

type LocalMachineInfo struct {


+ 12
- 0
common/pkgs/downloader/iterator.go View File

@@ -151,6 +151,18 @@ func (iter *DownloadObjectIterator) doMove() (*Downloading, error) {
return nil, fmt.Errorf("downloading ec object: %w", err)
}

return &Downloading{
Object: &req.Detail.Object,
File: reader,
Request: req.Raw,
}, nil

case *cdssdk.LRCRedundancy:
reader, err := iter.downloadLRCObject(req, red)
if err != nil {
return nil, fmt.Errorf("downloading lrc object: %w", err)
}

return &Downloading{
Object: &req.Detail.Object,
File: reader,


+ 87
- 0
common/pkgs/downloader/lrc.go View File

@@ -0,0 +1,87 @@
package downloader

import (
"fmt"
"io"

"gitlink.org.cn/cloudream/common/pkgs/iterator"
"gitlink.org.cn/cloudream/common/pkgs/logger"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/common/utils/io2"
"gitlink.org.cn/cloudream/common/utils/math2"
)

// downloadLRCObject streams the requested byte range of an LRC-redundant
// object. It selects one source for each needed data block, then decodes
// the object strip by strip in a background goroutine, writing the bytes
// into a pipe whose read end is returned to the caller.
func (iter *DownloadObjectIterator) downloadLRCObject(req downloadReqeust2, red *cdssdk.LRCRedundancy) (io.ReadCloser, error) {
	allNodes, err := iter.sortDownloadNodes(req)
	if err != nil {
		return nil, err
	}

	// Pick at most one copy of every block whose index is below red.M()
	// (indexes >= M are skipped — presumably group/local parity blocks;
	// TODO confirm against LRCRedundancy.M()).
	var blocks []downloadBlock
	selectedBlkIdx := make(map[int]bool)
	for _, node := range allNodes {
		for _, b := range node.Blocks {
			if b.Index >= red.M() || selectedBlkIdx[b.Index] {
				continue
			}
			blocks = append(blocks, downloadBlock{
				Node:  node.Node,
				Block: b,
			})
			selectedBlkIdx[b.Index] = true
		}
	}
	// Reconstruction needs at least K distinct blocks.
	if len(blocks) < red.K {
		return nil, fmt.Errorf("not enough blocks to download lrc object")
	}

	var logStrs []any = []any{"downloading lrc object from blocks: "}
	for i, b := range blocks {
		if i > 0 {
			logStrs = append(logStrs, ", ")
		}
		logStrs = append(logStrs, fmt.Sprintf("%v@%v(%v)", b.Block.Index, b.Node.Name, b.Node.NodeID))
	}
	logger.Debug(logStrs...)

	// Producer goroutine: iterate decoded strips and copy the requested
	// window into the pipe. The consumer reads from pr.
	pr, pw := io.Pipe()
	go func() {
		readPos := req.Raw.Offset
		totalReadLen := req.Detail.Object.Size - req.Raw.Offset
		if req.Raw.Length >= 0 {
			totalReadLen = math2.Min(req.Raw.Length, totalReadLen)
		}

		// Index of the strip containing the first requested byte; each
		// strip holds K chunks of ChunkSize bytes.
		firstStripIndex := readPos / int64(red.K) / int64(red.ChunkSize)
		stripIter := NewLRCStripIterator(req.Detail.Object, blocks, red, firstStripIndex, iter.downloader.strips, iter.downloader.cfg.ECStripPrefetchCount)
		defer stripIter.Close()

		for totalReadLen > 0 {
			strip, err := stripIter.MoveNext()
			if err == iterator.ErrNoMoreItem {
				// Strips ran out before the requested length was served.
				pw.CloseWithError(io.ErrUnexpectedEOF)
				return
			}
			if err != nil {
				pw.CloseWithError(err)
				return
			}

			// Copy only the part of this strip overlapping the remaining
			// requested range.
			readRelativePos := readPos - strip.Position
			nextStripPos := strip.Position + int64(red.K)*int64(red.ChunkSize)
			curReadLen := math2.Min(totalReadLen, nextStripPos-readPos)

			err = io2.WriteAll(pw, strip.Data[readRelativePos:readRelativePos+curReadLen])
			if err != nil {
				pw.CloseWithError(err)
				return
			}

			totalReadLen -= curReadLen
			readPos += curReadLen
		}
		pw.Close()
	}()

	return pr, nil
}

+ 191
- 0
common/pkgs/downloader/lrc_strip_iterator.go View File

@@ -0,0 +1,191 @@
package downloader

import (
"context"
"io"
"sync"

"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
"gitlink.org.cn/cloudream/common/pkgs/iterator"
"gitlink.org.cn/cloudream/common/pkgs/logger"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitchlrc"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitchlrc/parser"
)

// LRCStripIterator yields the decoded strips of an LRC object one by one,
// prefetching them in a background goroutine and consulting a shared
// strip cache.
type LRCStripIterator struct {
	object              cdssdk.Object         // object being read
	blocks              []downloadBlock       // selected source blocks, one per block index
	red                 *cdssdk.LRCRedundancy // redundancy parameters (K, ChunkSize, ...)
	curStripIndex       int64                 // index of the next strip MoveNext will return
	cache               *StripCache           // cross-download strip cache
	dataChan            chan dataChanEntry    // producer -> consumer strip channel
	downloadingDone     chan any              // closed by Close() to stop the producer
	downloadingDoneOnce sync.Once             // guards closing downloadingDone exactly once
	inited              bool                  // whether the producer goroutine has been started
}

// NewLRCStripIterator creates an iterator over the strips of object,
// decoded from the given blocks, starting at beginStripIndex. maxPrefetch
// (clamped to a minimum of 1) bounds how many strips the producer may
// decode ahead of the consumer.
func NewLRCStripIterator(object cdssdk.Object, blocks []downloadBlock, red *cdssdk.LRCRedundancy, beginStripIndex int64, cache *StripCache, maxPrefetch int) *LRCStripIterator {
	prefetch := maxPrefetch
	if prefetch < 1 {
		prefetch = 1
	}

	return &LRCStripIterator{
		object:        object,
		blocks:        blocks,
		red:           red,
		curStripIndex: beginStripIndex,
		cache:         cache,
		// A buffer of prefetch-1 plus the one in-flight send allows at
		// most `prefetch` strips to be decoded ahead.
		dataChan:        make(chan dataChanEntry, prefetch-1),
		downloadingDone: make(chan any),
	}
}

// MoveNext returns the next strip of the object, starting the producer
// goroutine on first use. It returns iterator.ErrNoMoreItem when the
// stream is exhausted or the iterator has been closed.
func (s *LRCStripIterator) MoveNext() (Strip, error) {
	if !s.inited {
		go s.downloading()
		s.inited = true
	}

	// Try a non-blocking receive first, purely so we can log when this
	// call actually has to wait for the producer.
	select {
	case entry, ok := <-s.dataChan:
		return s.takeEntry(entry, ok)
	default:
		logger.Debugf("waiting for ec strip %v of object %v", s.curStripIndex, s.object.ObjectID)
	}

	// Blocking wait: either the next strip arrives or Close() is called.
	select {
	case entry, ok := <-s.dataChan:
		return s.takeEntry(entry, ok)

	case <-s.downloadingDone:
		return Strip{}, iterator.ErrNoMoreItem
	}
}

// takeEntry converts one channel entry into MoveNext's result, advancing
// the strip cursor on success. ok is false when dataChan has been closed.
func (s *LRCStripIterator) takeEntry(entry dataChanEntry, ok bool) (Strip, error) {
	if !ok || entry.Error == io.EOF {
		return Strip{}, iterator.ErrNoMoreItem
	}
	if entry.Error != nil {
		return Strip{}, entry.Error
	}

	s.curStripIndex++
	return Strip{Data: entry.Data, Position: entry.Position}, nil
}

// Close stops the background producer goroutine. It is safe to call
// multiple times and concurrently with MoveNext.
func (s *LRCStripIterator) Close() {
	stop := func() { close(s.downloadingDone) }
	s.downloadingDoneOnce.Do(stop)
}

// downloading is the producer goroutine: it builds a reconstruction plan
// over the selected blocks, then reads the decoded stream strip by strip,
// publishing each strip to dataChan (and the strip cache) until EOF, an
// error, or Close().
func (s *LRCStripIterator) downloading() {
	var froms []ioswitchlrc.From
	for _, b := range s.blocks {
		froms = append(froms, ioswitchlrc.NewFromNode(b.Block.FileHash, &b.Node, b.Block.Index))
	}

	// Start reading at the strip the consumer asked for.
	toExec, hd := ioswitchlrc.NewToDriverWithRange(-1, exec.Range{
		Offset: s.curStripIndex * int64(s.red.ChunkSize*s.red.K),
	})

	plans := exec.NewPlanBuilder()
	err := parser.ReconstructAny(froms, []ioswitchlrc.To{toExec}, plans)
	if err != nil {
		s.sendToDataChan(dataChanEntry{Error: err})
		return
	}

	// Named "execution" rather than "exec" to avoid shadowing the
	// imported exec package used above.
	execution := plans.Execute()

	ctx, cancel := context.WithCancel(context.Background())
	go execution.Wait(ctx)
	defer cancel()

	str, err := execution.BeginRead(hd)
	if err != nil {
		s.sendToDataChan(dataChanEntry{Error: err})
		return
	}

	curStripIndex := s.curStripIndex
loop:
	for {
		stripBytesPos := curStripIndex * int64(s.red.K) * int64(s.red.ChunkSize)
		if stripBytesPos >= s.object.Size {
			// The whole object has been produced.
			s.sendToDataChan(dataChanEntry{Error: io.EOF})
			break
		}

		stripKey := ECStripKey{
			ObjectID:   s.object.ObjectID,
			StripIndex: curStripIndex,
		}

		item, ok := s.cache.Get(stripKey)
		if ok {
			if item.ObjectFileHash == s.object.FileHash {
				if !s.sendToDataChan(dataChanEntry{Data: item.Data, Position: stripBytesPos}) {
					break loop
				}
				curStripIndex++
				continue
			}

			// The cached strip belongs to a different version of the
			// object (hash mismatch), so it is stale: drop it and
			// re-download below.
			s.cache.Remove(stripKey)
		}

		dataBuf := make([]byte, int64(s.red.K*s.red.ChunkSize))
		n, err := io.ReadFull(str, dataBuf)
		if err == io.ErrUnexpectedEOF {
			// Final, partial strip: the full (zero-padded) buffer goes
			// into the cache, but only the bytes actually read are
			// published downstream.
			s.cache.Add(stripKey, ObjectECStrip{
				Data:           dataBuf,
				ObjectFileHash: s.object.FileHash,
			})

			s.sendToDataChan(dataChanEntry{Data: dataBuf[:n], Position: stripBytesPos})
			s.sendToDataChan(dataChanEntry{Error: io.EOF})
			break loop
		}
		if err != nil {
			s.sendToDataChan(dataChanEntry{Error: err})
			break loop
		}

		s.cache.Add(stripKey, ObjectECStrip{
			Data:           dataBuf,
			ObjectFileHash: s.object.FileHash,
		})

		if !s.sendToDataChan(dataChanEntry{Data: dataBuf, Position: stripBytesPos}) {
			break loop
		}

		curStripIndex++
	}

	close(s.dataChan)
}

// sendToDataChan delivers entry to the consumer, aborting (and returning
// false) if the iterator is closed so the producer goroutine can never
// block forever on an abandoned channel.
func (s *LRCStripIterator) sendToDataChan(entry dataChanEntry) bool {
	select {
	case <-s.downloadingDone:
		return false
	case s.dataChan <- entry:
		return true
	}
}

+ 36
- 0
common/pkgs/ec/lrc/lrc.go View File

@@ -0,0 +1,36 @@
package lrc

import "github.com/klauspost/reedsolomon"

// LRC wraps a Reed-Solomon based Local Reconstruction Code codec.
type LRC struct {
	n      int              // total number of blocks, including local (group parity) blocks
	k      int              // number of data blocks
	groups []int            // data blocks used when generating each group parity block
	l      *reedsolomon.LRC // underlying LRC implementation
}

// New creates an LRC codec with n total blocks, k data blocks, and the
// given group layout; n-len(groups)-k is the number of global parity
// blocks.
func New(n int, k int, groups []int) (*LRC, error) {
	inner, err := reedsolomon.NewLRC(k, n-len(groups)-k, groups)
	if err != nil {
		return nil, err
	}

	return &LRC{
		n:      n,
		k:      k,
		groups: groups,
		l:      inner,
	}, nil
}

// GenerateMatrix builds, following the global-repair principle, the matrix
// that reconstructs the blocks in outputIdxs from the blocks in inputIdxs.
// Every element of inputIdxs must be < n-len(groups), and at least k
// inputs must be provided.
func (l *LRC) GenerateMatrix(inputIdxs []int, outputIdxs []int) ([][]byte, error) {
	return l.l.GenerateMatrix(inputIdxs, outputIdxs)
}

// GenerateGroupMatrix builds the matrix for repairing a single block within
// its group. Only the case of exactly one missing block per group is
// supported, and all other blocks of the group are assumed to be present.
func (l *LRC) GenerateGroupMatrix(outputIdx int) ([][]byte, error) {
	return l.l.GenerateGroupMatrix(outputIdx)
}

+ 6
- 0
common/pkgs/ioswitch2/agent_worker.go View File

@@ -5,11 +5,17 @@ import (
"io"

"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
"gitlink.org.cn/cloudream/common/pkgs/types"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/common/utils/serder"
stgglb "gitlink.org.cn/cloudream/storage/common/globals"
agtrpc "gitlink.org.cn/cloudream/storage/common/pkgs/grpc/agent"
)

var _ = serder.UseTypeUnionExternallyTagged(types.Ref(types.NewTypeUnion[exec.WorkerInfo](
(*AgentWorker)(nil),
)))

type AgentWorker struct {
Node cdssdk.Node
}


+ 23
- 1
common/pkgs/ioswitch2/ops2/chunked.go View File

@@ -9,6 +9,7 @@ import (
"gitlink.org.cn/cloudream/common/pkgs/future"
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag"
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/utils"
"gitlink.org.cn/cloudream/common/utils/io2"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2"
"golang.org/x/sync/semaphore"
@@ -50,6 +51,16 @@ func (o *ChunkedSplit) Execute(ctx context.Context, e *exec.Executor) error {
return sem.Acquire(ctx, int64(len(outputs)))
}

func (o *ChunkedSplit) String() string {
return fmt.Sprintf(
"ChunkedSplit(chunkSize=%v, paddingZeros=%v), %v -> (%v)",
o.ChunkSize,
o.PaddingZeros,
o.Input.ID,
utils.FormatVarIDs(o.Outputs),
)
}

type ChunkedJoin struct {
Inputs []*exec.StreamVar `json:"inputs"`
Output *exec.StreamVar `json:"output"`
@@ -81,6 +92,15 @@ func (o *ChunkedJoin) Execute(ctx context.Context, e *exec.Executor) error {
return fut.Wait(ctx)
}

func (o *ChunkedJoin) String() string {
return fmt.Sprintf(
"ChunkedJoin(chunkSize=%v), (%v) -> %v",
o.ChunkSize,
utils.FormatVarIDs(o.Inputs),
o.Output.ID,
)
}

type ChunkedSplitType struct {
OutputCount int
ChunkSize int
@@ -89,7 +109,9 @@ type ChunkedSplitType struct {
func (t *ChunkedSplitType) InitNode(node *dag.Node) {
dag.NodeDeclareInputStream(node, 1)
for i := 0; i < t.OutputCount; i++ {
dag.NodeNewOutputStream(node, &ioswitch2.VarProps{})
dag.NodeNewOutputStream(node, &ioswitch2.VarProps{
StreamIndex: i,
})
}
}



+ 21
- 12
common/pkgs/ioswitch2/ops2/clone.go View File

@@ -8,6 +8,7 @@ import (
"github.com/samber/lo"
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag"
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/utils"
"gitlink.org.cn/cloudream/common/utils/io2"
"golang.org/x/sync/semaphore"
)
@@ -18,30 +19,34 @@ func init() {
}

type CloneStream struct {
Input *exec.StreamVar `json:"input"`
Outputs []*exec.StreamVar `json:"outputs"`
Raw *exec.StreamVar `json:"raw"`
Cloneds []*exec.StreamVar `json:"cloneds"`
}

func (o *CloneStream) Execute(ctx context.Context, e *exec.Executor) error {
err := e.BindVars(ctx, o.Input)
err := e.BindVars(ctx, o.Raw)
if err != nil {
return err
}
defer o.Input.Stream.Close()
defer o.Raw.Stream.Close()

cloned := io2.Clone(o.Input.Stream, len(o.Outputs))
cloned := io2.Clone(o.Raw.Stream, len(o.Cloneds))

sem := semaphore.NewWeighted(int64(len(o.Outputs)))
sem := semaphore.NewWeighted(int64(len(o.Cloneds)))
for i, s := range cloned {
sem.Acquire(ctx, 1)

o.Outputs[i].Stream = io2.AfterReadClosedOnce(s, func(closer io.ReadCloser) {
o.Cloneds[i].Stream = io2.AfterReadClosedOnce(s, func(closer io.ReadCloser) {
sem.Release(1)
})
}
exec.PutArrayVars(e, o.Outputs)
exec.PutArrayVars(e, o.Cloneds)

return sem.Acquire(ctx, int64(len(o.Cloneds)))
}

return sem.Acquire(ctx, int64(len(o.Outputs)))
func (o *CloneStream) String() string {
return fmt.Sprintf("CloneStream %v -> (%v)", o.Raw.ID, utils.FormatVarIDs(o.Cloneds))
}

type CloneVar struct {
@@ -65,6 +70,10 @@ func (o *CloneVar) Execute(ctx context.Context, e *exec.Executor) error {
return nil
}

func (o *CloneVar) String() string {
return fmt.Sprintf("CloneStream %v -> (%v)", o.Raw.GetID(), utils.FormatVarIDs(o.Cloneds))
}

type CloneStreamType struct{}

func (t *CloneStreamType) InitNode(node *dag.Node) {
@@ -73,8 +82,8 @@ func (t *CloneStreamType) InitNode(node *dag.Node) {

func (t *CloneStreamType) GenerateOp(op *dag.Node) (exec.Op, error) {
return &CloneStream{
Input: op.InputStreams[0].Var,
Outputs: lo.Map(op.OutputStreams, func(v *dag.StreamVar, idx int) *exec.StreamVar {
Raw: op.InputStreams[0].Var,
Cloneds: lo.Map(op.OutputStreams, func(v *dag.StreamVar, idx int) *exec.StreamVar {
return v.Var
}),
}, nil
@@ -104,7 +113,7 @@ func (t *CloneVarType) GenerateOp(op *dag.Node) (exec.Op, error) {
}

func (t *CloneVarType) NewOutput(node *dag.Node) *dag.ValueVar {
return dag.NodeNewOutputValue(node, nil)
return dag.NodeNewOutputValue(node, node.InputValues[0].Type, nil)
}

func (t *CloneVarType) String(node *dag.Node) string {


+ 19
- 14
common/pkgs/ioswitch2/ops2/ec.go View File

@@ -9,6 +9,7 @@ import (
"gitlink.org.cn/cloudream/common/pkgs/future"
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag"
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/utils"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/common/utils/io2"
"gitlink.org.cn/cloudream/common/utils/sync2"
@@ -18,8 +19,8 @@ import (
)

func init() {
exec.UseOp[*ECReconstructAny]()
exec.UseOp[*ECReconstruct]()
// exec.UseOp[*ECReconstructAny]()
// exec.UseOp[*ECReconstruct]()
exec.UseOp[*ECMultiply]()
}

@@ -194,27 +195,29 @@ func (o *ECMultiply) Execute(ctx context.Context, e *exec.Executor) error {
return nil
}

func (o *ECMultiply) String() string {
return fmt.Sprintf(
"ECMultiply(coef=%v) (%v) -> (%v)",
o.Coef,
utils.FormatVarIDs(o.Inputs),
utils.FormatVarIDs(o.Outputs),
)
}

type MultiplyType struct {
EC cdssdk.ECRedundancy
EC cdssdk.ECRedundancy
InputIndexes []int
OutputIndexes []int
}

func (t *MultiplyType) InitNode(node *dag.Node) {}

func (t *MultiplyType) GenerateOp(op *dag.Node) (exec.Op, error) {
var inputIdxs []int
var outputIdxs []int
for _, in := range op.InputStreams {
inputIdxs = append(inputIdxs, ioswitch2.SProps(in).StreamIndex)
}
for _, out := range op.OutputStreams {
outputIdxs = append(outputIdxs, ioswitch2.SProps(out).StreamIndex)
}

rs, err := ec.NewRs(t.EC.K, t.EC.N)
if err != nil {
return nil, err
}
coef, err := rs.GenerateMatrix(inputIdxs, outputIdxs)
coef, err := rs.GenerateMatrix(t.InputIndexes, t.OutputIndexes)
if err != nil {
return nil, err
}
@@ -227,12 +230,14 @@ func (t *MultiplyType) GenerateOp(op *dag.Node) (exec.Op, error) {
}, nil
}

func (t *MultiplyType) AddInput(node *dag.Node, str *dag.StreamVar) {
func (t *MultiplyType) AddInput(node *dag.Node, str *dag.StreamVar, dataIndex int) {
t.InputIndexes = append(t.InputIndexes, dataIndex)
node.InputStreams = append(node.InputStreams, str)
str.To(node, len(node.InputStreams)-1)
}

func (t *MultiplyType) NewOutput(node *dag.Node, dataIndex int) *dag.StreamVar {
t.OutputIndexes = append(t.OutputIndexes, dataIndex)
return dag.NodeNewOutputStream(node, &ioswitch2.VarProps{StreamIndex: dataIndex})
}



+ 8
- 0
common/pkgs/ioswitch2/ops2/file.go View File

@@ -51,6 +51,10 @@ func (o *FileWrite) Execute(ctx context.Context, e *exec.Executor) error {
return nil
}

func (o *FileWrite) String() string {
return fmt.Sprintf("FileWrite %v -> %s", o.Input.ID, o.FilePath)
}

type FileRead struct {
Output *exec.StreamVar `json:"output"`
FilePath string `json:"filePath"`
@@ -72,6 +76,10 @@ func (o *FileRead) Execute(ctx context.Context, e *exec.Executor) error {
return nil
}

func (o *FileRead) String() string {
return fmt.Sprintf("FileRead %s -> %v", o.FilePath, o.Output.ID)
}

type FileReadType struct {
FilePath string
}


+ 9
- 1
common/pkgs/ioswitch2/ops2/ipfs.go View File

@@ -53,6 +53,10 @@ func (o *IPFSRead) Execute(ctx context.Context, e *exec.Executor) error {
return fut.Wait(ctx)
}

func (o *IPFSRead) String() string {
return fmt.Sprintf("IPFSRead %v -> %v", o.FileHash, o.Output.ID)
}

type IPFSWrite struct {
Input *exec.StreamVar `json:"input"`
FileHash *exec.StringVar `json:"fileHash"`
@@ -86,6 +90,10 @@ func (o *IPFSWrite) Execute(ctx context.Context, e *exec.Executor) error {
return nil
}

func (o *IPFSWrite) String() string {
return fmt.Sprintf("IPFSWrite %v -> %v", o.Input.ID, o.FileHash.ID)
}

type IPFSReadType struct {
FileHash string
Option ipfs.ReadOption
@@ -114,7 +122,7 @@ type IPFSWriteType struct {

func (t *IPFSWriteType) InitNode(node *dag.Node) {
dag.NodeDeclareInputStream(node, 1)
dag.NodeNewOutputValue(node, &ioswitch2.VarProps{})
dag.NodeNewOutputValue(node, dag.StringValueVar, &ioswitch2.VarProps{})
}

func (t *IPFSWriteType) GenerateOp(op *dag.Node) (exec.Op, error) {


+ 4
- 0
common/pkgs/ioswitch2/ops2/range.go View File

@@ -72,6 +72,10 @@ func (o *Range) Execute(ctx context.Context, e *exec.Executor) error {
return nil
}

func (o *Range) String() string {
return fmt.Sprintf("Range(%v+%v) %v -> %v", o.Offset, o.Length, o.Input.ID, o.Output.ID)
}

type RangeType struct {
Range exec.Range
}


+ 22
- 12
common/pkgs/ioswitch2/parser/parser.go View File

@@ -35,7 +35,7 @@ type ParseContext struct {
}

func (p *DefaultParser) Parse(ft ioswitch2.FromTo, blder *exec.PlanBuilder) error {
ctx := ParseContext{Ft: ft}
ctx := ParseContext{Ft: ft, DAG: dag.NewGraph()}

// 分成两个阶段:
// 1. 基于From和To生成更多指令,初步匹配to的需求
@@ -136,17 +136,15 @@ func (p *DefaultParser) calcStreamRange(ctx *ParseContext) {

func (p *DefaultParser) extend(ctx *ParseContext, ft ioswitch2.FromTo) error {
for _, fr := range ft.Froms {
_, err := p.buildFromNode(ctx, &ft, fr)
frNode, err := p.buildFromNode(ctx, &ft, fr)
if err != nil {
return err
}

// 对于完整文件的From,生成Split指令
if fr.GetDataIndex() == -1 {
n, _ := dag.NewNode(ctx.DAG, &ops2.ChunkedSplitType{ChunkSize: p.EC.ChunkSize, OutputCount: p.EC.K}, &ioswitch2.NodeProps{})
for i := 0; i < p.EC.K; i++ {
ioswitch2.SProps(n.OutputStreams[i]).StreamIndex = i
}
node, _ := dag.NewNode(ctx.DAG, &ops2.ChunkedSplitType{ChunkSize: p.EC.ChunkSize, OutputCount: p.EC.K}, &ioswitch2.NodeProps{})
frNode.OutputStreams[0].To(node, 0)
}
}

@@ -170,7 +168,7 @@ loop:
}, &ioswitch2.NodeProps{})

for _, s := range ecInputStrs {
mulType.AddInput(mulNode, s)
mulType.AddInput(mulNode, s, ioswitch2.SProps(s).StreamIndex)
}
for i := 0; i < p.EC.N; i++ {
mulType.NewOutput(mulNode, i)
@@ -247,6 +245,7 @@ func (p *DefaultParser) buildFromNode(ctx *ParseContext, ft *ioswitch2.FromTo, f

if f.Node != nil {
n.Env.ToEnvWorker(&ioswitch2.AgentWorker{Node: *f.Node})
n.Env.Pinned = true
}

return n, nil
@@ -254,6 +253,7 @@ func (p *DefaultParser) buildFromNode(ctx *ParseContext, ft *ioswitch2.FromTo, f
case *ioswitch2.FromDriver:
n, _ := dag.NewNode(ctx.DAG, &ops.FromDriverType{Handle: f.Handle}, &ioswitch2.NodeProps{From: f})
n.Env.ToEnvDriver()
n.Env.Pinned = true
ioswitch2.SProps(n.OutputStreams[0]).StreamIndex = f.DataIndex

if f.DataIndex == -1 {
@@ -280,12 +280,15 @@ func (p *DefaultParser) buildToNode(ctx *ParseContext, ft *ioswitch2.FromTo, t i
}, &ioswitch2.NodeProps{
To: t,
})
n.Env.ToEnvWorker(&ioswitch2.AgentWorker{Node: t.Node})
n.Env.Pinned = true

return n, nil

case *ioswitch2.ToDriver:
n, _ := dag.NewNode(ctx.DAG, &ops.ToDriverType{Handle: t.Handle, Range: t.Range}, &ioswitch2.NodeProps{To: t})
n.Env.ToEnvDriver()
n.Env.Pinned = true

return n, nil

@@ -324,9 +327,11 @@ func (p *DefaultParser) removeUnusedMultiplyOutput(ctx *ParseContext) bool {
}

node.OutputStreams[i2] = nil
typ.OutputIndexes[i2] = -2
changed = true
}
node.OutputStreams = lo2.RemoveAllDefault(node.OutputStreams)
typ.OutputIndexes = lo2.RemoveAll(typ.OutputIndexes, -2)

// 如果所有输出流都被删除,则删除该指令
if len(node.OutputStreams) == 0 {
@@ -422,6 +427,10 @@ func (p *DefaultParser) omitSplitJoin(ctx *ParseContext) bool {
func (p *DefaultParser) pin(ctx *ParseContext) bool {
changed := false
ctx.DAG.Walk(func(node *dag.Node) bool {
if node.Env.Pinned {
return true
}

var toEnv *dag.NodeEnv
for _, out := range node.OutputStreams {
for _, to := range out.Toes {
@@ -496,12 +505,11 @@ func (p *DefaultParser) storeIPFSWriteResult(ctx *ParseContext) {
return true
}

n := ctx.DAG.NewNode(&ops.StoreType{
n, t := dag.NewNode(ctx.DAG, &ops.StoreType{
StoreKey: typ.FileHashStoreKey,
}, &ioswitch2.NodeProps{})
n.Env.ToEnvDriver()

node.OutputValues[0].To(n, 0)
t.Store(n, node.OutputValues[0])
return true
})
}
@@ -564,7 +572,9 @@ func (p *DefaultParser) generateClone(ctx *ParseContext) {
n, t := dag.NewNode(ctx.DAG, &ops2.CloneStreamType{}, &ioswitch2.NodeProps{})
n.Env = node.Env
for _, to := range out.Toes {
t.NewOutput(node).To(to.Node, to.SlotIndex)
str := t.NewOutput(n)
str.Props = &ioswitch2.VarProps{StreamIndex: ioswitch2.SProps(out).StreamIndex}
str.To(to.Node, to.SlotIndex)
}
out.Toes = nil
out.To(n, 0)
@@ -578,7 +588,7 @@ func (p *DefaultParser) generateClone(ctx *ParseContext) {
n, t := dag.NewNode(ctx.DAG, &ops2.CloneVarType{}, &ioswitch2.NodeProps{})
n.Env = node.Env
for _, to := range out.Toes {
t.NewOutput(node).To(to.Node, to.SlotIndex)
t.NewOutput(n).To(to.Node, to.SlotIndex)
}
out.Toes = nil
out.To(n, 0)


+ 67
- 0
common/pkgs/ioswitchlrc/agent_worker.go View File

@@ -0,0 +1,67 @@
package ioswitchlrc

import (
"context"
"io"

"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
"gitlink.org.cn/cloudream/common/pkgs/types"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/common/utils/serder"
stgglb "gitlink.org.cn/cloudream/storage/common/globals"
agtrpc "gitlink.org.cn/cloudream/storage/common/pkgs/grpc/agent"
)

// Register AgentWorker in the exec.WorkerInfo type union so instances can
// be serialized and deserialized with an external type tag.
var _ = serder.UseTypeUnionExternallyTagged(types.Ref(types.NewTypeUnion[exec.WorkerInfo](
	(*AgentWorker)(nil),
)))

// AgentWorker identifies an agent node that can execute IO plan fragments.
type AgentWorker struct {
	Node cdssdk.Node
}

// NewClient acquires a pooled gRPC client connected to this worker's node.
func (w *AgentWorker) NewClient() (exec.WorkerClient, error) {
	addr := stgglb.SelectGRPCAddress(&w.Node)
	cli, err := stgglb.AgentRPCPool.Acquire(addr)
	if err != nil {
		return nil, err
	}

	return &AgentWorkerClient{cli: cli}, nil
}

// String returns a human-readable description of the worker's node.
func (w *AgentWorker) String() string {
	return w.Node.String()
}

// Equals reports whether worker refers to the same agent node.
func (w *AgentWorker) Equals(worker exec.WorkerInfo) bool {
	if other, ok := worker.(*AgentWorker); ok {
		return w.Node.NodeID == other.Node.NodeID
	}

	return false
}

// AgentWorkerClient adapts a pooled agent RPC client to the
// exec.WorkerClient interface; every method delegates to the pool client.
type AgentWorkerClient struct {
	cli *agtrpc.PoolClient
}

// ExecutePlan submits an IO plan to the remote agent.
func (c *AgentWorkerClient) ExecutePlan(ctx context.Context, plan exec.Plan) error {
	return c.cli.ExecuteIOPlan(ctx, plan)
}

// SendStream pushes the stream variable's data to the remote agent.
func (c *AgentWorkerClient) SendStream(ctx context.Context, planID exec.PlanID, v *exec.StreamVar, str io.ReadCloser) error {
	return c.cli.SendStream(ctx, planID, v.ID, str)
}

// SendVar pushes a non-stream variable to the remote agent.
func (c *AgentWorkerClient) SendVar(ctx context.Context, planID exec.PlanID, v exec.Var) error {
	return c.cli.SendVar(ctx, planID, v)
}

// GetStream fetches a stream variable's data from the remote agent.
func (c *AgentWorkerClient) GetStream(ctx context.Context, planID exec.PlanID, v *exec.StreamVar, signal *exec.SignalVar) (io.ReadCloser, error) {
	return c.cli.GetStream(ctx, planID, v.ID, signal)
}

// GetVar fetches a non-stream variable from the remote agent.
func (c *AgentWorkerClient) GetVar(ctx context.Context, planID exec.PlanID, v exec.Var, signal *exec.SignalVar) error {
	return c.cli.GetVar(ctx, planID, v, signal)
}

// Close returns the client to the RPC pool; the underlying connection is
// pooled rather than actually closed.
func (c *AgentWorkerClient) Close() error {
	stgglb.AgentRPCPool.Release(c.cli)
	return nil
}

+ 134
- 0
common/pkgs/ioswitchlrc/fromto.go View File

@@ -0,0 +1,134 @@
package ioswitchlrc

import (
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
)

// From describes a source of a file stream in an LRC IO plan.
type From interface {
	// GetDataIndex returns which block this source provides;
	// -1 means the complete file.
	GetDataIndex() int
}

// To describes a destination of a file stream in an LRC IO plan.
type To interface {
	// GetRange returns the range of the file stream this To requires.
	// Its meaning depends on DataIndex: if DataIndex == -1 the range is
	// within the whole file; if DataIndex >= 0 it is within one shard
	// of the file.
	GetRange() exec.Range
	GetDataIndex() int
}

// FromDriver is a stream source fed by the local driver through Handle.
type FromDriver struct {
	Handle    *exec.DriverWriteStream
	DataIndex int
}

// NewFromDriver creates a FromDriver for the given block index and returns
// the write handle the driver uses to push data into the plan.
func NewFromDriver(dataIndex int) (*FromDriver, *exec.DriverWriteStream) {
	// RangeHint is filled in later by the plan generator (buildFromNode).
	handle := &exec.DriverWriteStream{
		RangeHint: &exec.Range{},
	}
	return &FromDriver{
		Handle:    handle,
		DataIndex: dataIndex,
	}, handle
}

// GetDataIndex returns the block index provided by this source (-1 = whole file).
func (f *FromDriver) GetDataIndex() int {
	return f.DataIndex
}

// FromNode is a stream source read from IPFS by file hash, optionally
// pinned to a specific node (a nil Node leaves the read location free).
type FromNode struct {
	FileHash  string
	Node      *cdssdk.Node
	DataIndex int
}

// NewFromNode creates a FromNode for the given file hash, node (may be nil)
// and block index.
func NewFromNode(fileHash string, node *cdssdk.Node, dataIndex int) *FromNode {
	return &FromNode{
		FileHash:  fileHash,
		Node:      node,
		DataIndex: dataIndex,
	}
}

// GetDataIndex returns the block index provided by this source (-1 = whole file).
func (f *FromNode) GetDataIndex() int {
	return f.DataIndex
}

// ToDriver is a stream destination consumed by the local driver through
// Handle.
type ToDriver struct {
	Handle    *exec.DriverReadStream
	DataIndex int
	Range     exec.Range
}

// NewToDriver creates a ToDriver for the given block index with the zero
// (whole-stream) range, and returns the read handle the driver uses to pull
// data out of the plan.
func NewToDriver(dataIndex int) (*ToDriver, *exec.DriverReadStream) {
	// Delegate to the range-aware constructor; exec.Range{} is the same
	// zero value the previous duplicated body produced implicitly.
	return NewToDriverWithRange(dataIndex, exec.Range{})
}

// NewToDriverWithRange is like NewToDriver but restricts the destination to
// the given range of the stream.
func NewToDriverWithRange(dataIndex int, rng exec.Range) (*ToDriver, *exec.DriverReadStream) {
	str := exec.DriverReadStream{}
	return &ToDriver{
		Handle:    &str,
		DataIndex: dataIndex,
		Range:     rng,
	}, &str
}

// GetDataIndex returns the block index this destination wants (-1 = whole file).
func (t *ToDriver) GetDataIndex() int {
	return t.DataIndex
}

// GetRange returns the stream range this destination wants.
func (t *ToDriver) GetRange() exec.Range {
	return t.Range
}

// ToNode is a stream destination that writes the stream into IPFS on a
// specific node. The resulting file hash is stored into the plan under
// FileHashStoreKey (an empty key is skipped by the plan generator).
type ToNode struct {
	Node             cdssdk.Node
	DataIndex        int
	Range            exec.Range
	FileHashStoreKey string
}

// NewToNode creates a ToNode with the zero (whole-stream) range.
func NewToNode(node cdssdk.Node, dataIndex int, fileHashStoreKey string) *ToNode {
	// Delegate to the range-aware constructor; exec.Range{} is the same
	// zero value the previous duplicated body produced implicitly.
	return NewToNodeWithRange(node, dataIndex, fileHashStoreKey, exec.Range{})
}

// NewToNodeWithRange is like NewToNode but restricts the destination to the
// given range of the stream.
func NewToNodeWithRange(node cdssdk.Node, dataIndex int, fileHashStoreKey string, rng exec.Range) *ToNode {
	return &ToNode{
		Node:             node,
		DataIndex:        dataIndex,
		FileHashStoreKey: fileHashStoreKey,
		Range:            rng,
	}
}

// GetDataIndex returns the block index this destination wants (-1 = whole file).
func (t *ToNode) GetDataIndex() int {
	return t.DataIndex
}

// GetRange returns the stream range this destination wants.
func (t *ToNode) GetRange() exec.Range {
	return t.Range
}

// type ToStorage struct {
// Storage cdssdk.Storage
// DataIndex int
// }

// func NewToStorage(storage cdssdk.Storage, dataIndex int) *ToStorage {
// return &ToStorage{
// Storage: storage,
// DataIndex: dataIndex,
// }
// }

// func (t *ToStorage) GetDataIndex() int {
// return t.DataIndex
// }

+ 23
- 0
common/pkgs/ioswitchlrc/ioswitch.go View File

@@ -0,0 +1,23 @@
package ioswitchlrc

import (
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
)

// NodeProps is the per-node metadata attached to DAG nodes during plan
// generation, recording which From/To (if any) the node realizes.
type NodeProps struct {
	From From
	To   To
}

// ValueVarType distinguishes the kinds of value variables in a plan.
type ValueVarType int

const (
	// StringValueVar is a string-valued variable (e.g. an IPFS file hash).
	StringValueVar ValueVarType = iota
	// SignalValueVar is a synchronization-signal variable.
	SignalValueVar
)

type VarProps struct {
	StreamIndex int          // stream (block) index; only meaningful on a StreamVar
	ValueType   ValueVarType // value kind; only meaningful on a ValueVar
	Var         exec.Var     // the exec.Var created for this variable when generating the plan
}

+ 157
- 0
common/pkgs/ioswitchlrc/ops2/chunked.go View File

@@ -0,0 +1,157 @@
package ops2

import (
"context"
"fmt"
"io"

"github.com/samber/lo"
"gitlink.org.cn/cloudream/common/pkgs/future"
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag"
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/utils"
"gitlink.org.cn/cloudream/common/utils/io2"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitchlrc"
"golang.org/x/sync/semaphore"
)

// Register the chunked ops with the exec framework so plans containing them
// can be decoded and executed.
func init() {
	exec.UseOp[*ChunkedSplit]()
	exec.UseOp[*ChunkedJoin]()
}

// ChunkedSplit splits one input stream into len(Outputs) streams of
// ChunkSize chunks via io2.ChunkedSplit. PaddingZeros presumably pads the
// tail chunks with zeros (see io2.ChunkedSplitOption — TODO confirm).
type ChunkedSplit struct {
	Input        *exec.StreamVar   `json:"input"`
	Outputs      []*exec.StreamVar `json:"outputs"`
	ChunkSize    int               `json:"chunkSize"`
	PaddingZeros bool              `json:"paddingZeros"`
}

// Execute publishes the split streams and blocks until every output has
// been fully read and closed by its consumer.
func (o *ChunkedSplit) Execute(ctx context.Context, e *exec.Executor) error {
	err := e.BindVars(ctx, o.Input)
	if err != nil {
		return err
	}
	defer o.Input.Stream.Close()

	outputs := io2.ChunkedSplit(o.Input.Stream, o.ChunkSize, len(o.Outputs), io2.ChunkedSplitOption{
		PaddingZeros: o.PaddingZeros,
	})

	// One permit per output: each permit is taken before the stream is
	// published and released when that stream is closed after reading, so
	// the final Acquire of all permits blocks until every consumer is done.
	sem := semaphore.NewWeighted(int64(len(outputs)))
	for i := range outputs {
		// NOTE(review): Acquire's error (ctx cancellation) is ignored here;
		// the final Acquire below still surfaces the cancellation.
		sem.Acquire(ctx, 1)

		o.Outputs[i].Stream = io2.AfterReadClosedOnce(outputs[i], func(closer io.ReadCloser) {
			sem.Release(1)
		})
	}
	exec.PutArrayVars(e, o.Outputs)

	return sem.Acquire(ctx, int64(len(outputs)))
}

// String describes this op and its variable IDs for debugging.
func (o *ChunkedSplit) String() string {
	return fmt.Sprintf(
		"ChunkedSplit(chunkSize=%v, paddingZeros=%v), %v -> (%v)",
		o.ChunkSize,
		o.PaddingZeros,
		o.Input.ID,
		utils.FormatVarIDs(o.Outputs),
	)
}

// ChunkedJoin merges the Inputs into a single Output by interleaving
// ChunkSize chunks (the inverse of ChunkedSplit).
type ChunkedJoin struct {
	Inputs    []*exec.StreamVar `json:"inputs"`
	Output    *exec.StreamVar   `json:"output"`
	ChunkSize int               `json:"chunkSize"`
}

// Execute publishes the joined stream and blocks until the consumer has
// read it to completion and closed it.
func (o *ChunkedJoin) Execute(ctx context.Context, e *exec.Executor) error {
	err := exec.BindArrayVars(e, ctx, o.Inputs)
	if err != nil {
		return err
	}

	var strReaders []io.Reader
	for _, s := range o.Inputs {
		strReaders = append(strReaders, s.Stream)
	}
	defer func() {
		for _, str := range o.Inputs {
			str.Stream.Close()
		}
	}()

	// The future completes when the joined output stream is closed after
	// being read; only then may the deferred input closes run.
	fut := future.NewSetVoid()
	o.Output.Stream = io2.AfterReadClosedOnce(io2.BufferedChunkedJoin(strReaders, o.ChunkSize), func(closer io.ReadCloser) {
		fut.SetVoid()
	})
	e.PutVars(o.Output)

	return fut.Wait(ctx)
}

// String describes this op and its variable IDs for debugging.
func (o *ChunkedJoin) String() string {
	return fmt.Sprintf(
		"ChunkedJoin(chunkSize=%v), (%v) -> %v",
		o.ChunkSize,
		utils.FormatVarIDs(o.Inputs),
		o.Output.ID,
	)
}

// ChunkedSplitType is the DAG node type that plans a ChunkedSplit op:
// one input stream split into OutputCount chunked streams.
type ChunkedSplitType struct {
	OutputCount int
	ChunkSize   int
}

// InitNode declares 1 input and OutputCount outputs; output i carries
// block index i.
func (t *ChunkedSplitType) InitNode(node *dag.Node) {
	dag.NodeDeclareInputStream(node, 1)
	for i := 0; i < t.OutputCount; i++ {
		dag.NodeNewOutputStream(node, &ioswitchlrc.VarProps{
			StreamIndex: i,
		})
	}
}

// GenerateOp materializes the node into an executable ChunkedSplit op.
func (t *ChunkedSplitType) GenerateOp(op *dag.Node) (exec.Op, error) {
	return &ChunkedSplit{
		Input: op.InputStreams[0].Var,
		Outputs: lo.Map(op.OutputStreams, func(v *dag.StreamVar, idx int) *exec.StreamVar {
			return v.Var
		}),
		ChunkSize: t.ChunkSize,
		// Always pad so every block has the same length.
		PaddingZeros: true,
	}, nil
}

// String describes the node for DAG debugging output.
func (t *ChunkedSplitType) String(node *dag.Node) string {
	return fmt.Sprintf("ChunkedSplit[%v]%v%v", t.ChunkSize, formatStreamIO(node), formatValueIO(node))
}

// ChunkedJoinType is the DAG node type that plans a ChunkedJoin op:
// InputCount chunked streams joined into one complete stream.
type ChunkedJoinType struct {
	InputCount int
	ChunkSize  int
}

// InitNode declares InputCount inputs and one output; StreamIndex -1 marks
// the output as the complete file.
func (t *ChunkedJoinType) InitNode(node *dag.Node) {
	dag.NodeDeclareInputStream(node, t.InputCount)
	dag.NodeNewOutputStream(node, &ioswitchlrc.VarProps{
		StreamIndex: -1,
	})
}

// GenerateOp materializes the node into an executable ChunkedJoin op.
func (t *ChunkedJoinType) GenerateOp(op *dag.Node) (exec.Op, error) {
	return &ChunkedJoin{
		Inputs: lo.Map(op.InputStreams, func(v *dag.StreamVar, idx int) *exec.StreamVar {
			return v.Var
		}),
		Output:    op.OutputStreams[0].Var,
		ChunkSize: t.ChunkSize,
	}, nil
}

// String describes the node for DAG debugging output.
func (t *ChunkedJoinType) String(node *dag.Node) string {
	return fmt.Sprintf("ChunkedJoin[%v]%v%v", t.ChunkSize, formatStreamIO(node), formatValueIO(node))
}

+ 121
- 0
common/pkgs/ioswitchlrc/ops2/clone.go View File

@@ -0,0 +1,121 @@
package ops2

import (
"context"
"fmt"
"io"

"github.com/samber/lo"
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag"
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/utils"
"gitlink.org.cn/cloudream/common/utils/io2"
"golang.org/x/sync/semaphore"
)

// Register the clone ops with the exec framework so plans containing them
// can be decoded and executed.
func init() {
	exec.UseOp[*CloneStream]()
	exec.UseOp[*CloneVar]()
}

// CloneStream duplicates one input stream into len(Cloneds) identical
// output streams.
type CloneStream struct {
	Raw     *exec.StreamVar   `json:"raw"`
	Cloneds []*exec.StreamVar `json:"cloneds"`
}

// Execute publishes the cloned streams and blocks until every clone has
// been fully read and closed by its consumer.
func (o *CloneStream) Execute(ctx context.Context, e *exec.Executor) error {
	err := e.BindVars(ctx, o.Raw)
	if err != nil {
		return err
	}
	defer o.Raw.Stream.Close()

	cloned := io2.Clone(o.Raw.Stream, len(o.Cloneds))

	// One permit per clone, released when that clone is closed after
	// reading; the final Acquire blocks until all consumers are done.
	sem := semaphore.NewWeighted(int64(len(o.Cloneds)))
	for i, s := range cloned {
		// NOTE(review): Acquire's error (ctx cancellation) is ignored here;
		// the final Acquire below still surfaces the cancellation.
		sem.Acquire(ctx, 1)

		o.Cloneds[i].Stream = io2.AfterReadClosedOnce(s, func(closer io.ReadCloser) {
			sem.Release(1)
		})
	}
	exec.PutArrayVars(e, o.Cloneds)

	return sem.Acquire(ctx, int64(len(o.Cloneds)))
}

// String describes this op and its variable IDs for debugging.
func (o *CloneStream) String() string {
	return fmt.Sprintf("CloneStream %v -> (%v)", o.Raw.ID, utils.FormatVarIDs(o.Cloneds))
}

// CloneVar duplicates one value variable into len(Cloneds) identical
// variables.
type CloneVar struct {
	Raw     exec.Var   `json:"raw"`
	Cloneds []exec.Var `json:"cloneds"`
}

// Execute waits for the raw variable, assigns its value to every clone and
// publishes them.
func (o *CloneVar) Execute(ctx context.Context, e *exec.Executor) error {
	err := e.BindVars(ctx, o.Raw)
	if err != nil {
		return err
	}

	for _, v := range o.Cloneds {
		if err := exec.AssignVar(o.Raw, v); err != nil {
			return fmt.Errorf("clone var: %w", err)
		}
	}
	e.PutVars(o.Cloneds...)

	return nil
}

// String describes this op and its variable IDs for debugging.
func (o *CloneVar) String() string {
	// Fixed copy-paste defect: this op previously printed "CloneStream",
	// which made debug output indistinguishable from the stream-clone op.
	return fmt.Sprintf("CloneVar %v -> (%v)", o.Raw.GetID(), utils.FormatVarIDs(o.Cloneds))
}

// CloneStreamType is the DAG node type that plans a CloneStream op. Outputs
// are added on demand via NewOutput (one per consumer).
type CloneStreamType struct{}

// InitNode declares the single input; outputs are created later.
func (t *CloneStreamType) InitNode(node *dag.Node) {
	dag.NodeDeclareInputStream(node, 1)
}

// GenerateOp materializes the node into an executable CloneStream op.
func (t *CloneStreamType) GenerateOp(op *dag.Node) (exec.Op, error) {
	return &CloneStream{
		Raw: op.InputStreams[0].Var,
		Cloneds: lo.Map(op.OutputStreams, func(v *dag.StreamVar, idx int) *exec.StreamVar {
			return v.Var
		}),
	}, nil
}

// NewOutput adds one more cloned output stream to the node.
func (t *CloneStreamType) NewOutput(node *dag.Node) *dag.StreamVar {
	return dag.NodeNewOutputStream(node, nil)
}

// String describes the node for DAG debugging output.
func (t *CloneStreamType) String(node *dag.Node) string {
	return fmt.Sprintf("CloneStream[]%v%v", formatStreamIO(node), formatValueIO(node))
}

// CloneVarType is the DAG node type that plans a CloneVar op. Outputs are
// added on demand via NewOutput (one per consumer).
type CloneVarType struct{}

// InitNode declares the single input value; outputs are created later.
func (t *CloneVarType) InitNode(node *dag.Node) {
	dag.NodeDeclareInputValue(node, 1)
}

// GenerateOp materializes the node into an executable CloneVar op.
func (t *CloneVarType) GenerateOp(op *dag.Node) (exec.Op, error) {
	return &CloneVar{
		Raw: op.InputValues[0].Var,
		Cloneds: lo.Map(op.OutputValues, func(v *dag.ValueVar, idx int) exec.Var {
			return v.Var
		}),
	}, nil
}

// NewOutput adds one more cloned output value; it inherits the type of the
// input value.
func (t *CloneVarType) NewOutput(node *dag.Node) *dag.ValueVar {
	return dag.NodeNewOutputValue(node, node.InputValues[0].Type, nil)
}

// String describes the node for DAG debugging output.
func (t *CloneVarType) String(node *dag.Node) string {
	return fmt.Sprintf("CloneVar[]%v%v", formatStreamIO(node), formatValueIO(node))
}

+ 201
- 0
common/pkgs/ioswitchlrc/ops2/ec.go View File

@@ -0,0 +1,201 @@
package ops2

import (
"context"
"fmt"
"io"

"github.com/samber/lo"
"gitlink.org.cn/cloudream/common/pkgs/future"
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag"
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/utils"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/common/utils/io2"
"gitlink.org.cn/cloudream/common/utils/sync2"
"gitlink.org.cn/cloudream/storage/common/pkgs/ec"
"gitlink.org.cn/cloudream/storage/common/pkgs/ec/lrc"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitchlrc"
)

// Register the Galois-multiply op with the exec framework so plans
// containing it can be decoded and executed.
func init() {
	exec.UseOp[*GalMultiply]()
}

// GalMultiply multiplies the input streams by the Galois-field coefficient
// matrix Coef, chunk by chunk, producing one output stream per matrix row.
// It is the generic building block for LRC encode/reconstruct.
type GalMultiply struct {
	Coef      [][]byte          `json:"coef"`
	Inputs    []*exec.StreamVar `json:"inputs"`
	Outputs   []*exec.StreamVar `json:"outputs"`
	ChunkSize int               `json:"chunkSize"`
}

// Execute streams the multiplication: it repeatedly reads one ChunkSize
// chunk from every input in parallel, multiplies, and writes one chunk to
// every output pipe until the inputs are exhausted.
func (o *GalMultiply) Execute(ctx context.Context, e *exec.Executor) error {
	err := exec.BindArrayVars(e, ctx, o.Inputs)
	if err != nil {
		return err
	}
	defer func() {
		for _, s := range o.Inputs {
			s.Stream.Close()
		}
	}()

	outputWrs := make([]*io.PipeWriter, len(o.Outputs))

	for i := range o.Outputs {
		rd, wr := io.Pipe()
		o.Outputs[i].Stream = rd
		outputWrs[i] = wr
	}

	fut := future.NewSetVoid()
	go func() {
		mul := ec.GaloisMultiplier().BuildGalois()

		// Scratch buffers reused for every chunk round.
		inputChunks := make([][]byte, len(o.Inputs))
		for i := range o.Inputs {
			inputChunks[i] = make([]byte, o.ChunkSize)
		}
		outputChunks := make([][]byte, len(o.Outputs))
		for i := range o.Outputs {
			outputChunks[i] = make([]byte, o.ChunkSize)
		}

		for {
			// Read a full chunk from each input concurrently.
			err := sync2.ParallelDo(o.Inputs, func(s *exec.StreamVar, i int) error {
				_, err := io.ReadFull(s.Stream, inputChunks[i])
				return err
			})
			// io.ReadFull returns io.EOF only when nothing was read, so a
			// clean EOF means the inputs ended on a chunk boundary. A ragged
			// tail surfaces as io.ErrUnexpectedEOF and is treated as an
			// error below (inputs are presumably chunk-aligned — TODO confirm).
			if err == io.EOF {
				fut.SetVoid()
				return
			}
			if err != nil {
				fut.SetError(err)
				return
			}

			err = mul.Multiply(o.Coef, inputChunks, outputChunks)
			if err != nil {
				fut.SetError(err)
				return
			}

			for i := range o.Outputs {
				err := io2.WriteAll(outputWrs[i], outputChunks[i])
				if err != nil {
					fut.SetError(err)
					return
				}
			}
		}
	}()

	// Publish the outputs first so downstream ops can start reading, then
	// block until the multiply goroutine finishes.
	exec.PutArrayVars(e, o.Outputs)
	err = fut.Wait(ctx)
	if err != nil {
		// Propagate the failure to every reader of the output pipes.
		for _, wr := range outputWrs {
			wr.CloseWithError(err)
		}
		return err
	}

	for _, wr := range outputWrs {
		wr.Close()
	}
	return nil
}

// String describes this op and its variable IDs for debugging.
func (o *GalMultiply) String() string {
	// Fixed copy-paste defect: this op previously printed "ECMultiply",
	// the name of a different op type, which made debug output misleading.
	return fmt.Sprintf(
		"GalMultiply(coef=%v) (%v) -> (%v)",
		o.Coef,
		utils.FormatVarIDs(o.Inputs),
		utils.FormatVarIDs(o.Outputs),
	)
}

// LRCConstructAnyType plans a GalMultiply that can produce any set of
// blocks from any sufficient set of blocks. Inputs and outputs are not
// declared in InitNode; they are attached incrementally via
// AddInput/NewOutput, which record each slot's block index so GenerateOp
// can build the matching coefficient matrix.
type LRCConstructAnyType struct {
	LRC           cdssdk.LRCRedundancy
	InputIndexes  []int
	OutputIndexes []int
}

// InitNode declares nothing: streams are attached via AddInput/NewOutput.
func (t *LRCConstructAnyType) InitNode(node *dag.Node) {}

// GenerateOp builds the coefficient matrix mapping InputIndexes to
// OutputIndexes and materializes the node into a GalMultiply op.
func (t *LRCConstructAnyType) GenerateOp(op *dag.Node) (exec.Op, error) {
	l, err := lrc.New(t.LRC.N, t.LRC.K, t.LRC.Groups)
	if err != nil {
		return nil, err
	}
	coef, err := l.GenerateMatrix(t.InputIndexes, t.OutputIndexes)
	if err != nil {
		return nil, err
	}

	return &GalMultiply{
		Coef:      coef,
		Inputs:    lo.Map(op.InputStreams, func(v *dag.StreamVar, idx int) *exec.StreamVar { return v.Var }),
		Outputs:   lo.Map(op.OutputStreams, func(v *dag.StreamVar, idx int) *exec.StreamVar { return v.Var }),
		ChunkSize: t.LRC.ChunkSize,
	}, nil
}

// AddInput connects str as the next input slot, remembering its block index.
func (t *LRCConstructAnyType) AddInput(node *dag.Node, str *dag.StreamVar, dataIndex int) {
	t.InputIndexes = append(t.InputIndexes, dataIndex)
	node.InputStreams = append(node.InputStreams, str)
	str.To(node, len(node.InputStreams)-1)
}

// RemoveAllInputs disconnects every input stream and clears the recorded
// indexes (used when the node turns out to be unnecessary).
func (t *LRCConstructAnyType) RemoveAllInputs(n *dag.Node) {
	for _, in := range n.InputStreams {
		in.From.Node.OutputStreams[in.From.SlotIndex].NotTo(n)
	}
	n.InputStreams = nil
	t.InputIndexes = nil
}

// NewOutput adds an output stream producing the block with the given index.
func (t *LRCConstructAnyType) NewOutput(node *dag.Node, dataIndex int) *dag.StreamVar {
	t.OutputIndexes = append(t.OutputIndexes, dataIndex)
	return dag.NodeNewOutputStream(node, &ioswitchlrc.VarProps{StreamIndex: dataIndex})
}

// String describes the node for DAG debugging output.
func (t *LRCConstructAnyType) String(node *dag.Node) string {
	return fmt.Sprintf("LRCAny[]%v%v", formatStreamIO(node), formatValueIO(node))
}

// LRCConstructGroupType plans a GalMultiply that rebuilds one block from
// the other blocks of its local (LRC) group.
type LRCConstructGroupType struct {
	LRC              cdssdk.LRCRedundancy
	TargetBlockIndex int
}

// InitNode declares one output (the target block) and as many inputs as the
// target's group has members.
func (t *LRCConstructGroupType) InitNode(node *dag.Node) {
	dag.NodeNewOutputStream(node, &ioswitchlrc.VarProps{
		StreamIndex: t.TargetBlockIndex,
	})

	grpIdx := t.LRC.FindGroup(t.TargetBlockIndex)
	dag.NodeDeclareInputStream(node, t.LRC.Groups[grpIdx])
}

// GenerateOp builds the group coefficient matrix for the target block and
// materializes the node into a GalMultiply op.
func (t *LRCConstructGroupType) GenerateOp(op *dag.Node) (exec.Op, error) {
	l, err := lrc.New(t.LRC.N, t.LRC.K, t.LRC.Groups)
	if err != nil {
		return nil, err
	}
	coef, err := l.GenerateGroupMatrix(t.TargetBlockIndex)
	if err != nil {
		return nil, err
	}

	return &GalMultiply{
		Coef:      coef,
		Inputs:    lo.Map(op.InputStreams, func(v *dag.StreamVar, idx int) *exec.StreamVar { return v.Var }),
		Outputs:   lo.Map(op.OutputStreams, func(v *dag.StreamVar, idx int) *exec.StreamVar { return v.Var }),
		ChunkSize: t.LRC.ChunkSize,
	}, nil
}

// String describes the node for DAG debugging output.
func (t *LRCConstructGroupType) String(node *dag.Node) string {
	return fmt.Sprintf("LRCGroup[]%v%v", formatStreamIO(node), formatValueIO(node))
}

+ 137
- 0
common/pkgs/ioswitchlrc/ops2/ipfs.go View File

@@ -0,0 +1,137 @@
package ops2

import (
"context"
"fmt"
"io"

"gitlink.org.cn/cloudream/common/pkgs/future"
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag"
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
"gitlink.org.cn/cloudream/common/pkgs/ipfs"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/utils/io2"
stgglb "gitlink.org.cn/cloudream/storage/common/globals"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitchlrc"
)

// Register the IPFS ops with the exec framework so plans containing them
// can be decoded and executed.
func init() {
	exec.UseOp[*IPFSRead]()
	exec.UseOp[*IPFSWrite]()
}

// IPFSRead opens the IPFS file identified by FileHash (optionally a
// sub-range, per Option) and publishes it as the Output stream.
type IPFSRead struct {
	Output   *exec.StreamVar `json:"output"`
	FileHash string          `json:"fileHash"`
	Option   ipfs.ReadOption `json:"option"`
}

// Execute publishes the opened stream and blocks until the consumer has
// read it to completion and closed it.
func (o *IPFSRead) Execute(ctx context.Context, e *exec.Executor) error {
	logger.
		WithField("FileHash", o.FileHash).
		Debugf("ipfs read op")
	defer logger.Debugf("ipfs read op finished")

	ipfsCli, err := stgglb.IPFSPool.Acquire()
	if err != nil {
		return fmt.Errorf("new ipfs client: %w", err)
	}
	defer stgglb.IPFSPool.Release(ipfsCli)

	file, err := ipfsCli.OpenRead(o.FileHash, o.Option)
	if err != nil {
		return fmt.Errorf("reading ipfs: %w", err)
	}
	defer file.Close()

	// Completion is signaled when the published stream is closed after
	// reading; only then may the deferred cleanup run.
	fut := future.NewSetVoid()
	o.Output.Stream = io2.AfterReadClosedOnce(file, func(closer io.ReadCloser) {
		fut.SetVoid()
	})
	e.PutVars(o.Output)

	return fut.Wait(ctx)
}

// String describes this op and its variable IDs for debugging.
func (o *IPFSRead) String() string {
	return fmt.Sprintf("IPFSRead %v -> %v", o.FileHash, o.Output.ID)
}

// IPFSWrite stores the Input stream as a new IPFS file and publishes the
// resulting hash through the FileHash string variable.
type IPFSWrite struct {
	Input    *exec.StreamVar `json:"input"`
	FileHash *exec.StringVar `json:"fileHash"`
}

// Execute consumes the whole input stream, creates the IPFS file and
// publishes its hash.
func (o *IPFSWrite) Execute(ctx context.Context, e *exec.Executor) error {
	logger.
		WithField("Input", o.Input.ID).
		WithField("FileHashVar", o.FileHash.ID).
		Debugf("ipfs write op")

	ipfsCli, err := stgglb.IPFSPool.Acquire()
	if err != nil {
		return fmt.Errorf("new ipfs client: %w", err)
	}
	defer stgglb.IPFSPool.Release(ipfsCli)

	err = e.BindVars(ctx, o.Input)
	if err != nil {
		return err
	}
	defer o.Input.Stream.Close()

	o.FileHash.Value, err = ipfsCli.CreateFile(o.Input.Stream)
	if err != nil {
		return fmt.Errorf("creating ipfs file: %w", err)
	}

	e.PutVars(o.FileHash)

	return nil
}

// String describes this op and its variable IDs for debugging.
func (o *IPFSWrite) String() string {
	return fmt.Sprintf("IPFSWrite %v -> %v", o.Input.ID, o.FileHash.ID)
}

// IPFSReadType is the DAG node type that plans an IPFSRead op.
type IPFSReadType struct {
	FileHash string
	Option   ipfs.ReadOption
}

// InitNode declares the single output stream.
func (t *IPFSReadType) InitNode(node *dag.Node) {
	dag.NodeNewOutputStream(node, &ioswitchlrc.VarProps{})
}

// GenerateOp materializes the node into an executable IPFSRead op.
func (t *IPFSReadType) GenerateOp(n *dag.Node) (exec.Op, error) {
	return &IPFSRead{
		Output:   n.OutputStreams[0].Var,
		FileHash: t.FileHash,
		Option:   t.Option,
	}, nil
}

// String describes the node for DAG debugging output.
func (t *IPFSReadType) String(node *dag.Node) string {
	return fmt.Sprintf("IPFSRead[%s,%v+%v]%v%v", t.FileHash, t.Option.Offset, t.Option.Length, formatStreamIO(node), formatValueIO(node))
}

// IPFSWriteType is the DAG node type that plans an IPFSWrite op. Range is
// carried only for debugging/String; the actual trimming is planned by
// generateRange.
type IPFSWriteType struct {
	FileHashStoreKey string
	Range            exec.Range
}

// InitNode declares one input stream and one string output (the file hash).
func (t *IPFSWriteType) InitNode(node *dag.Node) {
	dag.NodeDeclareInputStream(node, 1)
	dag.NodeNewOutputValue(node, dag.StringValueVar, &ioswitchlrc.VarProps{})
}

// GenerateOp materializes the node into an executable IPFSWrite op.
func (t *IPFSWriteType) GenerateOp(op *dag.Node) (exec.Op, error) {
	return &IPFSWrite{
		Input:    op.InputStreams[0].Var,
		FileHash: op.OutputValues[0].Var.(*exec.StringVar),
	}, nil
}

// String describes the node for DAG debugging output.
func (t *IPFSWriteType) String(node *dag.Node) string {
	return fmt.Sprintf("IPFSWrite[%s,%v+%v]%v%v", t.FileHashStoreKey, t.Range.Offset, t.Range.Length, formatStreamIO(node), formatValueIO(node))
}

+ 75
- 0
common/pkgs/ioswitchlrc/ops2/ops.go View File

@@ -0,0 +1,75 @@
package ops2

import (
"fmt"

"gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag"
)

// formatStreamIO renders a node's input/output stream IDs as "S{in>out}"
// ("." for a nil slot); it returns "" when the node has no streams at all.
func formatStreamIO(node *dag.Node) string {
	var ins, outs string

	for idx, str := range node.InputStreams {
		if idx > 0 {
			ins += ","
		}
		if str == nil {
			ins += "."
		} else {
			ins += fmt.Sprint(str.ID)
		}
	}

	for idx, str := range node.OutputStreams {
		if idx > 0 {
			outs += ","
		}
		if str == nil {
			outs += "."
		} else {
			outs += fmt.Sprint(str.ID)
		}
	}

	if ins == "" && outs == "" {
		return ""
	}
	return fmt.Sprintf("S{%s>%s}", ins, outs)
}

// formatValueIO renders a node's input/output value IDs as "V{in>out}"
// ("." for a nil slot); it returns "" when the node has no values at all.
func formatValueIO(node *dag.Node) string {
	var ins, outs string

	for idx, val := range node.InputValues {
		if idx > 0 {
			ins += ","
		}
		if val == nil {
			ins += "."
		} else {
			ins += fmt.Sprint(val.ID)
		}
	}

	for idx, val := range node.OutputValues {
		if idx > 0 {
			outs += ","
		}
		if val == nil {
			outs += "."
		} else {
			outs += fmt.Sprint(val.ID)
		}
	}

	if ins == "" && outs == "" {
		return ""
	}
	return fmt.Sprintf("V{%s>%s}", ins, outs)
}

+ 99
- 0
common/pkgs/ioswitchlrc/ops2/range.go View File

@@ -0,0 +1,99 @@
package ops2

import (
"context"
"fmt"
"io"

"gitlink.org.cn/cloudream/common/pkgs/future"
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag"
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
"gitlink.org.cn/cloudream/common/utils/io2"
"gitlink.org.cn/cloudream/common/utils/math2"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitchlrc"
)

// Register the Range op with the exec framework so plans containing it can
// be decoded and executed.
func init() {
	exec.UseOp[*Range]()
}

// Range trims an input stream to [Offset, Offset+Length) and emits the
// result as the Output stream. A nil Length means "until EOF". A too-short
// input is not an error; the output is simply shorter (possibly empty).
type Range struct {
	Input  *exec.StreamVar `json:"input"`
	Output *exec.StreamVar `json:"output"`
	Offset int64           `json:"offset"`
	Length *int64          `json:"length"`
}

// Execute skips Offset bytes, publishes the (optionally length-limited)
// remainder and waits until the consumer finishes reading it.
func (o *Range) Execute(ctx context.Context, e *exec.Executor) error {
	err := e.BindVars(ctx, o.Input)
	if err != nil {
		return err
	}
	defer o.Input.Stream.Close()

	buf := make([]byte, 1024*16)

	// Skip the first Offset bytes of the input.
	for o.Offset > 0 {
		rdCnt := math2.Min(o.Offset, int64(len(buf)))
		rd, err := o.Input.Stream.Read(buf[:rdCnt])
		if err == io.EOF {
			// Input shorter than Offset is not an error; just produce an
			// empty stream.
			break
		}
		if err != nil {
			return err
		}
		o.Offset -= int64(rd)
	}

	fut := future.NewSetVoid()

	if o.Length == nil {
		// Unbounded: pass the rest of the input through; complete when the
		// consumer reaches EOF or closes the stream.
		o.Output.Stream = io2.AfterEOF(o.Input.Stream, func(closer io.ReadCloser, err error) {
			fut.SetVoid()
		})

		e.PutVars(o.Output)
		return fut.Wait(ctx)
	}

	o.Output.Stream = io2.AfterEOF(io2.Length(o.Input.Stream, *o.Length), func(closer io.ReadCloser, err error) {
		fut.SetVoid()
	})

	e.PutVars(o.Output)
	err = fut.Wait(ctx)
	if err != nil {
		return err
	}

	// Drain whatever remains past Length (buf reused as scratch) —
	// presumably so the upstream producer is not blocked; TODO confirm.
	io2.DropWithBuf(o.Input.Stream, buf)
	return nil
}

// String describes this op and its variable IDs for debugging.
func (o *Range) String() string {
	return fmt.Sprintf("Range(%v+%v) %v -> %v", o.Offset, o.Length, o.Input.ID, o.Output.ID)
}

// RangeType is the DAG node type that plans a Range op.
type RangeType struct {
	Range exec.Range
}

// InitNode declares one input stream and one trimmed output stream.
func (t *RangeType) InitNode(node *dag.Node) {
	dag.NodeDeclareInputStream(node, 1)
	dag.NodeNewOutputStream(node, &ioswitchlrc.VarProps{})
}

// GenerateOp materializes the node into an executable Range op.
func (t *RangeType) GenerateOp(n *dag.Node) (exec.Op, error) {
	return &Range{
		Input:  n.InputStreams[0].Var,
		Output: n.OutputStreams[0].Var,
		Offset: t.Range.Offset,
		Length: t.Range.Length,
	}, nil
}

// String describes the node for DAG debugging output.
func (t *RangeType) String(node *dag.Node) string {
	return fmt.Sprintf("Range[%v+%v]%v%v", t.Range.Offset, t.Range.Length, formatStreamIO(node), formatValueIO(node))
}

+ 301
- 0
common/pkgs/ioswitchlrc/parser/generator.go View File

@@ -0,0 +1,301 @@
package parser

import (
"fmt"

"gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag"
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/plan"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitchlrc"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitchlrc/ops2"
)

// GenerateContext carries the shared state of one plan-generation run: the
// redundancy parameters, the DAG under construction, the requested
// destinations and the strip-aligned range of the source stream to open.
type GenerateContext struct {
	LRC         cdssdk.LRCRedundancy
	DAG         *dag.Graph
	Toes        []ioswitchlrc.To
	StreamRange exec.Range
}

// Encode plans a job that takes one complete file and produces arbitrary
// blocks from it (it may also re-emit the complete file). fr must provide
// the complete file (DataIndex == -1).
func Encode(fr ioswitchlrc.From, toes []ioswitchlrc.To, blder *exec.PlanBuilder) error {
	if fr.GetDataIndex() != -1 {
		return fmt.Errorf("from data is not a complete file")
	}

	ctx := GenerateContext{
		LRC:  cdssdk.DefaultLRCRedundancy,
		DAG:  dag.NewGraph(),
		Toes: toes,
	}

	calcStreamRange(&ctx)
	err := buildDAGEncode(&ctx, fr, toes)
	if err != nil {
		return err
	}

	// Determining where each instruction executes must be repeated until no
	// node changes anymore.
	for pin(&ctx) {
	}

	// The following passes run only once, but in this order.
	dropUnused(&ctx)
	storeIPFSWriteResult(&ctx)
	generateClone(&ctx)
	generateRange(&ctx)

	return plan.Generate(ctx.DAG, blder)
}

// buildDAGEncode wires the encode DAG: the complete file feeds the To nodes
// that want it directly, a ChunkedSplit produces the data blocks, and an
// LRCConstructAny node derives any requested parity blocks from them.
func buildDAGEncode(ctx *GenerateContext, fr ioswitchlrc.From, toes []ioswitchlrc.To) error {
	frNode, err := buildFromNode(ctx, fr)
	if err != nil {
		return fmt.Errorf("building from node: %w", err)
	}

	var dataToes []ioswitchlrc.To
	var parityToes []ioswitchlrc.To

	// First create the To nodes that want the complete file, while sorting
	// the rest into data-block and parity-block requests.
	for _, to := range toes {
		idx := to.GetDataIndex()
		if idx == -1 {
			toNode, err := buildToNode(ctx, to)
			if err != nil {
				return fmt.Errorf("building to node: %w", err)
			}

			frNode.OutputStreams[0].To(toNode, 0)

		} else if idx < ctx.LRC.K {
			dataToes = append(dataToes, to)
		} else {
			parityToes = append(parityToes, to)
		}
	}

	if len(dataToes) == 0 && len(parityToes) == 0 {
		return nil
	}

	// Some blocks are needed, so generate a Split instruction.
	splitNode := ctx.DAG.NewNode(&ops2.ChunkedSplitType{
		OutputCount: ctx.LRC.K,
		ChunkSize:   ctx.LRC.ChunkSize,
	}, &ioswitchlrc.NodeProps{})
	frNode.OutputStreams[0].To(splitNode, 0)

	for _, to := range dataToes {
		toNode, err := buildToNode(ctx, to)
		if err != nil {
			return fmt.Errorf("building to node: %w", err)
		}

		splitNode.OutputStreams[to.GetDataIndex()].To(toNode, 0)
	}

	if len(parityToes) == 0 {
		return nil
	}

	// Parity blocks are needed, so additionally generate a Construct
	// instruction fed by all K data blocks.

	conNode, conType := dag.NewNode(ctx.DAG, &ops2.LRCConstructAnyType{
		LRC: ctx.LRC,
	}, &ioswitchlrc.NodeProps{})

	for _, out := range splitNode.OutputStreams {
		conType.AddInput(conNode, out, ioswitchlrc.SProps(out).StreamIndex)
	}

	for _, to := range parityToes {
		toNode, err := buildToNode(ctx, to)
		if err != nil {
			return fmt.Errorf("building to node: %w", err)
		}

		conType.NewOutput(conNode, to.GetDataIndex()).To(toNode, 0)
	}
	return nil
}

// ReconstructAny plans a job that, given any k of the data+parity blocks,
// rebuilds arbitrary blocks — including the complete file.
func ReconstructAny(frs []ioswitchlrc.From, toes []ioswitchlrc.To, blder *exec.PlanBuilder) error {
	ctx := GenerateContext{
		LRC:  cdssdk.DefaultLRCRedundancy,
		DAG:  dag.NewGraph(),
		Toes: toes,
	}

	calcStreamRange(&ctx)
	err := buildDAGReconstructAny(&ctx, frs, toes)
	if err != nil {
		return err
	}

	// Determining where each instruction executes must be repeated until no
	// node changes anymore.
	for pin(&ctx) {
	}

	// The following passes run only once, but in this order.
	dropUnused(&ctx)
	storeIPFSWriteResult(&ctx)
	generateClone(&ctx)
	generateRange(&ctx)

	return plan.Generate(ctx.DAG, blder)
}

// buildDAGReconstructAny wires the reconstruct DAG: sources that directly
// satisfy a destination are connected one-to-one; missing blocks are rebuilt
// by an LRCConstructAny node, and a ChunkedJoin reassembles the complete
// file when requested.
func buildDAGReconstructAny(ctx *GenerateContext, frs []ioswitchlrc.From, toes []ioswitchlrc.To) error {
	frNodes := make(map[int]*dag.Node)
	for _, fr := range frs {
		frNode, err := buildFromNode(ctx, fr)
		if err != nil {
			return fmt.Errorf("building from node: %w", err)
		}

		frNodes[fr.GetDataIndex()] = frNode
	}

	var completeToes []ioswitchlrc.To
	var missedToes []ioswitchlrc.To

	// First connect the To nodes that can be fed directly from a source,
	// while collecting which destinations still need the complete file or a
	// missing block.
	for _, to := range toes {
		toIdx := to.GetDataIndex()
		fr := frNodes[toIdx]
		if fr != nil {
			node, err := buildToNode(ctx, to)
			if err != nil {
				return fmt.Errorf("building to node: %w", err)
			}

			fr.OutputStreams[0].To(node, 0)
			continue
		}

		if toIdx == -1 {
			completeToes = append(completeToes, to)
		} else {
			missedToes = append(missedToes, to)
		}
	}

	if len(completeToes) == 0 && len(missedToes) == 0 {
		return nil
	}

	// Generate a Construct instruction to recover the missing blocks.

	conNode, conType := dag.NewNode(ctx.DAG, &ops2.LRCConstructAnyType{
		LRC: ctx.LRC,
	}, &ioswitchlrc.NodeProps{})

	// Map iteration order is random, but AddInput records each slot's block
	// index, so the generated matrix still matches the wiring.
	for _, fr := range frNodes {
		conType.AddInput(conNode, fr.OutputStreams[0], ioswitchlrc.SProps(fr.OutputStreams[0]).StreamIndex)
	}

	for _, to := range missedToes {
		toNode, err := buildToNode(ctx, to)
		if err != nil {
			return fmt.Errorf("building to node: %w", err)
		}

		conType.NewOutput(conNode, to.GetDataIndex()).To(toNode, 0)
	}

	if len(completeToes) == 0 {
		return nil
	}

	// The complete file is needed, so generate a Join instruction fed by
	// the first K blocks (reconstructed ones where no source exists).

	joinNode := ctx.DAG.NewNode(&ops2.ChunkedJoinType{
		InputCount: ctx.LRC.K,
		ChunkSize:  ctx.LRC.ChunkSize,
	}, &ioswitchlrc.NodeProps{})

	for i := 0; i < ctx.LRC.K; i++ {
		n := frNodes[i]
		if n == nil {
			conType.NewOutput(conNode, i).To(joinNode, i)
		} else {
			n.OutputStreams[0].To(joinNode, i)
		}
	}

	for _, to := range completeToes {
		toNode, err := buildToNode(ctx, to)
		if err != nil {
			return fmt.Errorf("building to node: %w", err)
		}

		// Fan-out of one output to multiple Tos is resolved later by
		// generateClone.
		joinNode.OutputStreams[0].To(toNode, 0)
	}

	// If no block needed constructing after all, remove the node.
	if len(conNode.OutputStreams) == 0 {
		conType.RemoveAllInputs(conNode)
		ctx.DAG.RemoveNode(conNode)
	}

	return nil
}

// ReconstructGroup plans a job that, given the surviving blocks of one LRC
// group, recovers the single missing block of that group.
func ReconstructGroup(frs []ioswitchlrc.From, toes []ioswitchlrc.To, blder *exec.PlanBuilder) error {
	ctx := GenerateContext{
		LRC:  cdssdk.DefaultLRCRedundancy,
		DAG:  dag.NewGraph(),
		Toes: toes,
	}

	calcStreamRange(&ctx)
	err := buildDAGReconstructGroup(&ctx, frs, toes)
	if err != nil {
		return err
	}

	// Determining where each instruction executes must be repeated until no
	// node changes anymore.
	for pin(&ctx) {
	}

	// The following passes run only once, but in this order.
	dropUnused(&ctx)
	storeIPFSWriteResult(&ctx)
	generateClone(&ctx)
	generateRange(&ctx)

	return plan.Generate(ctx.DAG, blder)
}

// buildDAGReconstructGroup wires a DAG that feeds the surviving blocks of
// one LRC group (frs, in group order) into a single group-construct node
// and sends its one rebuilt block to every destination in toes. All toes
// are expected to request the same missing block index.
func buildDAGReconstructGroup(ctx *GenerateContext, frs []ioswitchlrc.From, toes []ioswitchlrc.To) error {
	// Guard added: the previous version indexed toes[0] unconditionally and
	// panicked when called with no destinations.
	if len(toes) == 0 {
		return fmt.Errorf("no destination for the reconstructed block")
	}

	missedGrpIdx := toes[0].GetDataIndex()

	conNode := ctx.DAG.NewNode(&ops2.LRCConstructGroupType{
		LRC:              ctx.LRC,
		TargetBlockIndex: missedGrpIdx,
	}, &ioswitchlrc.NodeProps{})

	for i, fr := range frs {
		frNode, err := buildFromNode(ctx, fr)
		if err != nil {
			return fmt.Errorf("building from node: %w", err)
		}

		frNode.OutputStreams[0].To(conNode, i)
	}

	for _, to := range toes {
		toNode, err := buildToNode(ctx, to)
		if err != nil {
			return fmt.Errorf("building to node: %w", err)
		}

		// Fan-out of the single output to multiple Tos is resolved later by
		// generateClone.
		conNode.OutputStreams[0].To(toNode, 0)
	}

	return nil
}

+ 320
- 0
common/pkgs/ioswitchlrc/parser/passes.go View File

@@ -0,0 +1,320 @@
package parser

import (
"fmt"
"math"

"gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag"
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/plan/ops"
"gitlink.org.cn/cloudream/common/pkgs/ipfs"
"gitlink.org.cn/cloudream/common/utils/math2"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitchlrc"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitchlrc/ops2"
)

// calcStreamRange computes the range of the source stream to open, rounded
// outward to whole strips (K data chunks), so every destination's requested
// range — expressed either in whole-file or in per-block coordinates — is
// covered.
func calcStreamRange(ctx *GenerateContext) {
	stripSize := int64(ctx.LRC.ChunkSize * ctx.LRC.K)

	// Start from an empty range at +inf so the first ExtendStart wins.
	rng := exec.Range{
		Offset: math.MaxInt64,
	}

	for _, to := range ctx.Toes {
		if to.GetDataIndex() == -1 {
			// Whole-file coordinates: round directly to strip boundaries.
			toRng := to.GetRange()
			rng.ExtendStart(math2.Floor(toRng.Offset, stripSize))
			if toRng.Length != nil {
				rng.ExtendEnd(math2.Ceil(toRng.Offset+*toRng.Length, stripSize))
			} else {
				rng.Length = nil
			}

		} else {
			// Block coordinates: convert chunk positions within the block
			// to strip positions within the whole file.
			toRng := to.GetRange()

			blkStartIndex := math2.FloorDiv(toRng.Offset, int64(ctx.LRC.ChunkSize))
			rng.ExtendStart(blkStartIndex * stripSize)
			if toRng.Length != nil {
				blkEndIndex := math2.CeilDiv(toRng.Offset+*toRng.Length, int64(ctx.LRC.ChunkSize))
				rng.ExtendEnd(blkEndIndex * stripSize)
			} else {
				rng.Length = nil
			}
		}
	}

	ctx.StreamRange = rng
}

// buildFromNode creates the DAG source node for f. Complete-file sources
// are opened at StreamRange (whole-file coordinates); block sources are
// opened at the equivalent per-block range derived from it.
func buildFromNode(ctx *GenerateContext, f ioswitchlrc.From) (*dag.Node, error) {
	var repRange exec.Range
	var blkRange exec.Range

	// blkRange maps the strip-aligned whole-file range into a single
	// block's coordinates: one ChunkSize per strip.
	repRange.Offset = ctx.StreamRange.Offset
	blkRange.Offset = ctx.StreamRange.Offset / int64(ctx.LRC.ChunkSize*ctx.LRC.K) * int64(ctx.LRC.ChunkSize)
	if ctx.StreamRange.Length != nil {
		repRngLen := *ctx.StreamRange.Length
		repRange.Length = &repRngLen

		blkRngLen := *ctx.StreamRange.Length / int64(ctx.LRC.ChunkSize*ctx.LRC.K) * int64(ctx.LRC.ChunkSize)
		blkRange.Length = &blkRngLen
	}

	switch f := f.(type) {
	case *ioswitchlrc.FromNode:
		n, t := dag.NewNode(ctx.DAG, &ops2.IPFSReadType{
			FileHash: f.FileHash,
			Option: ipfs.ReadOption{
				Offset: 0,
				Length: -1,
			},
		}, &ioswitchlrc.NodeProps{
			From: f,
		})
		ioswitchlrc.SProps(n.OutputStreams[0]).StreamIndex = f.DataIndex

		// Narrow the IPFS read to the relevant range (-1 length = to EOF).
		if f.DataIndex == -1 {
			t.Option.Offset = repRange.Offset
			if repRange.Length != nil {
				t.Option.Length = *repRange.Length
			}
		} else {
			t.Option.Offset = blkRange.Offset
			if blkRange.Length != nil {
				t.Option.Length = *blkRange.Length
			}
		}

		// Pin the read to the requested node, if any.
		if f.Node != nil {
			n.Env.ToEnvWorker(&ioswitchlrc.AgentWorker{Node: *f.Node})
			n.Env.Pinned = true
		}

		return n, nil

	case *ioswitchlrc.FromDriver:
		n, _ := dag.NewNode(ctx.DAG, &ops.FromDriverType{Handle: f.Handle}, &ioswitchlrc.NodeProps{From: f})
		n.Env.ToEnvDriver()
		n.Env.Pinned = true
		ioswitchlrc.SProps(n.OutputStreams[0]).StreamIndex = f.DataIndex

		// Tell the driver which range it is expected to supply.
		if f.DataIndex == -1 {
			f.Handle.RangeHint.Offset = repRange.Offset
			f.Handle.RangeHint.Length = repRange.Length
		} else {
			f.Handle.RangeHint.Offset = blkRange.Offset
			f.Handle.RangeHint.Length = blkRange.Length
		}

		return n, nil

	default:
		return nil, fmt.Errorf("unsupported from type %T", f)
	}
}

// buildToNode creates the DAG destination node for t. Destinations are
// always pinned: an IPFS write runs on its target node, a driver read runs
// in the driver environment.
func buildToNode(ctx *GenerateContext, t ioswitchlrc.To) (*dag.Node, error) {
	switch t := t.(type) {
	case *ioswitchlrc.ToNode:
		n, _ := dag.NewNode(ctx.DAG, &ops2.IPFSWriteType{
			FileHashStoreKey: t.FileHashStoreKey,
			Range:            t.Range,
		}, &ioswitchlrc.NodeProps{
			To: t,
		})
		n.Env.ToEnvWorker(&ioswitchlrc.AgentWorker{Node: t.Node})
		n.Env.Pinned = true

		return n, nil

	case *ioswitchlrc.ToDriver:
		n, _ := dag.NewNode(ctx.DAG, &ops.ToDriverType{Handle: t.Handle, Range: t.Range}, &ioswitchlrc.NodeProps{To: t})
		n.Env.ToEnvDriver()
		n.Env.Pinned = true

		return n, nil

	default:
		return nil, fmt.Errorf("unsupported to type %T", t)
	}
}

// pin determines each instruction's execution location (Env) from the
// locations of the streams it produces and consumes.
//
// To-type instructions have fixed execution locations, and those locations
// spread through the DAG as pin is run repeatedly, so in theory no
// instruction's location stays undetermined forever. It returns true when
// any node's Env changed, meaning the caller should run another pass.
func pin(ctx *GenerateContext) bool {
	changed := false
	ctx.DAG.Walk(func(node *dag.Node) bool {
		if node.Env.Pinned {
			return true
		}

		// Prefer the environment shared by ALL consumers of this node's
		// outputs. Fixed a bug here: the conflict `break` previously only
		// exited the inner Toes loop, so a later output could re-set toEnv
		// and hide an already-detected disagreement; the labeled break now
		// abandons the consumer direction entirely on the first conflict.
		var toEnv *dag.NodeEnv
	consumers:
		for _, out := range node.OutputStreams {
			for _, to := range out.Toes {
				if to.Node.Env.Type == dag.EnvUnknown {
					continue
				}

				if toEnv == nil {
					toEnv = &to.Node.Env
				} else if !toEnv.Equals(to.Node.Env) {
					toEnv = nil
					break consumers
				}
			}
		}

		if toEnv != nil {
			if !node.Env.Equals(*toEnv) {
				changed = true
			}

			node.Env = *toEnv
			return true
		}

		// Otherwise pin to the common origin of the input streams.
		var fromEnv *dag.NodeEnv
		for _, in := range node.InputStreams {
			if in.From.Node.Env.Type == dag.EnvUnknown {
				continue
			}

			if fromEnv == nil {
				fromEnv = &in.From.Node.Env
			} else if !fromEnv.Equals(in.From.Node.Env) {
				fromEnv = nil
				break
			}
		}

		if fromEnv != nil {
			if !node.Env.Equals(*fromEnv) {
				changed = true
			}

			node.Env = *fromEnv
		}
		return true
	})

	return changed
}

// dropUnused attaches a Drop instruction to every output stream that has no
// consumer, so its data is actively discarded instead of blocking the
// producer.
func dropUnused(ctx *GenerateContext) {
	ctx.DAG.Walk(func(node *dag.Node) bool {
		for _, out := range node.OutputStreams {
			if len(out.Toes) > 0 {
				continue
			}
			// The drop runs wherever the producing node runs.
			dropNode := ctx.DAG.NewNode(&ops.DropType{}, &ioswitchlrc.NodeProps{})
			dropNode.Env = node.Env
			out.To(dropNode, 0)
		}
		return true
	})
}

// storeIPFSWriteResult adds, for every IPFS write that has a store key, a
// driver-side Store instruction that saves the produced file hash under
// that key.
func storeIPFSWriteResult(ctx *GenerateContext) {
	dag.WalkOnlyType[*ops2.IPFSWriteType](ctx.DAG, func(node *dag.Node, typ *ops2.IPFSWriteType) bool {
		// An empty key means the caller does not need the hash.
		if typ.FileHashStoreKey == "" {
			return true
		}

		n, t := dag.NewNode(ctx.DAG, &ops.StoreType{
			StoreKey: typ.FileHashStoreKey,
		}, &ioswitchlrc.NodeProps{})
		n.Env.ToEnvDriver()

		// OutputValues[0] is the hash produced by the IPFS write.
		t.Store(n, node.OutputValues[0])
		return true
	})
}

// generateRange inserts Range instructions in front of every To node so that
// each destination only receives the byte range it asked for. StreamRange may
// exceed the total file size, but the Range instruction tolerates short data
// and returns normally instead of erroring.
//
// The two original branches duplicated the whole node-insertion logic and
// differed only in the base offset; they are merged here.
func generateRange(ctx *GenerateContext) {
	ctx.DAG.Walk(func(node *dag.Node) bool {
		props := ioswitchlrc.NProps(node)
		if props.To == nil {
			return true
		}

		toDataIdx := props.To.GetDataIndex()
		toRng := props.To.GetRange()

		// Base offset of the incoming stream in absolute coordinates:
		// the complete-file stream (-1) starts at StreamRange.Offset, while a
		// block stream starts at the block-local position of the first strip
		// covered by StreamRange.
		var baseOffset int64
		if toDataIdx == -1 {
			baseOffset = ctx.StreamRange.Offset
		} else {
			stripSize := int64(ctx.LRC.ChunkSize * ctx.LRC.K)
			blkStartIdx := ctx.StreamRange.Offset / stripSize
			baseOffset = blkStartIdx * int64(ctx.LRC.ChunkSize)
		}

		rngNode := ctx.DAG.NewNode(&ops2.RangeType{
			Range: exec.Range{
				Offset: toRng.Offset - baseOffset,
				Length: toRng.Length,
			},
		}, &ioswitchlrc.NodeProps{})
		rngNode.Env = node.InputStreams[0].From.Node.Env

		// Splice the Range node between the original input stream and the To node.
		node.InputStreams[0].To(rngNode, 0)
		node.InputStreams[0].NotTo(node)
		rngNode.OutputStreams[0].To(node, 0)

		return true
	})
}

// generateClone inserts Clone instructions wherever a stream or value output
// has more than one consumer, so that after this pass every output feeds
// exactly one destination.
func generateClone(ctx *GenerateContext) {
	ctx.DAG.Walk(func(node *dag.Node) bool {
		for _, out := range node.OutputStreams {
			if len(out.Toes) <= 1 {
				continue
			}

			n, t := dag.NewNode(ctx.DAG, &ops2.CloneStreamType{}, &ioswitchlrc.NodeProps{})
			n.Env = node.Env
			for _, to := range out.Toes {
				// Each clone output keeps the original stream index.
				str := t.NewOutput(n)
				str.Props = &ioswitchlrc.VarProps{StreamIndex: ioswitchlrc.SProps(out).StreamIndex}
				str.To(to.Node, to.SlotIndex)
			}
			out.Toes = nil
			out.To(n, 0)
		}

		for _, out := range node.OutputValues {
			if len(out.Toes) <= 1 {
				continue
			}

			n, t := dag.NewNode(ctx.DAG, &ops2.CloneVarType{}, &ioswitchlrc.NodeProps{})
			n.Env = node.Env
			for _, to := range out.Toes {
				// Fix: the new output must hang off the clone node n, not the
				// original node, matching the stream-clone logic above.
				t.NewOutput(n).To(to.Node, to.SlotIndex)
			}
			out.Toes = nil
			out.To(n, 0)
		}

		return true
	})
}

+ 17
- 0
common/pkgs/ioswitchlrc/utils.go View File

@@ -0,0 +1,17 @@
package ioswitchlrc

import (
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag"
)

// NProps returns the LRC-specific NodeProps carried by the given DAG node.
func NProps(n *dag.Node) *NodeProps {
	return dag.NProps[*NodeProps](n)
}

// SProps returns the LRC-specific VarProps carried by the given stream variable.
func SProps(str *dag.StreamVar) *VarProps {
	return dag.SProps[*VarProps](str)
}

// VProps returns the LRC-specific VarProps carried by the given value variable.
func VProps(v *dag.ValueVar) *VarProps {
	return dag.VProps[*VarProps](v)
}

+ 351
- 22
scanner/internal/event/check_package_redundancy.go View File

@@ -16,6 +16,8 @@ import (
"gitlink.org.cn/cloudream/storage/common/pkgs/distlock/reqbuilder"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2/parser"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitchlrc"
lrcparser "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitchlrc/parser"
agtmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/agent"
coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"
scevt "gitlink.org.cn/cloudream/storage/common/pkgs/mq/scanner/event"
@@ -91,15 +93,15 @@ func (t *CheckPackageRedundancy) Execute(execCtx ExecuteContext) {
return
}

allNodes := make(map[cdssdk.NodeID]*NodeLoadInfo)
userAllNodes := make(map[cdssdk.NodeID]*NodeLoadInfo)
for _, node := range getNodes.Nodes {
allNodes[node.NodeID] = &NodeLoadInfo{
userAllNodes[node.NodeID] = &NodeLoadInfo{
Node: node,
}
}

for _, log := range getLogs.Logs {
info, ok := allNodes[log.Storage.NodeID]
info, ok := userAllNodes[log.Storage.NodeID]
if !ok {
continue
}
@@ -118,10 +120,11 @@ func (t *CheckPackageRedundancy) Execute(execCtx ExecuteContext) {
defEC := cdssdk.DefaultECRedundancy

// TODO 目前rep的备份数量固定为2,所以这里直接选出两个节点
// TODO 放到chooseRedundancy函数中
mostBlockNodeIDs := t.summaryRepObjectBlockNodes(getObjs.Objects, 2)
newRepNodes := t.chooseNewNodesForRep(&defRep, allNodes)
rechoosedRepNodes := t.rechooseNodesForRep(mostBlockNodeIDs, &defRep, allNodes)
newECNodes := t.chooseNewNodesForEC(&defEC, allNodes)
newRepNodes := t.chooseNewNodesForRep(&defRep, userAllNodes)
rechoosedRepNodes := t.rechooseNodesForRep(mostBlockNodeIDs, &defRep, userAllNodes)
newECNodes := t.chooseNewNodesForEC(&defEC, userAllNodes)

// 加锁
builder := reqbuilder.NewBuilder()
@@ -142,33 +145,50 @@ func (t *CheckPackageRedundancy) Execute(execCtx ExecuteContext) {
var updating *coormq.UpdatingObjectRedundancy
var err error

shouldUseEC := obj.Object.Size > config.Cfg().ECFileSizeThreshold
newRed, selectedNodes := t.chooseRedundancy(obj, userAllNodes)

switch red := obj.Object.Redundancy.(type) {
switch srcRed := obj.Object.Redundancy.(type) {
case *cdssdk.NoneRedundancy:
if shouldUseEC {
log.WithField("ObjectID", obj.Object.ObjectID).Debugf("redundancy: none -> ec")
updating, err = t.noneToEC(obj, &defEC, newECNodes)
} else {
switch newRed := newRed.(type) {
case *cdssdk.RepRedundancy:
log.WithField("ObjectID", obj.Object.ObjectID).Debugf("redundancy: none -> rep")
updating, err = t.noneToRep(obj, &defRep, newRepNodes)
updating, err = t.noneToRep(obj, newRed, newRepNodes)

case *cdssdk.ECRedundancy:
log.WithField("ObjectID", obj.Object.ObjectID).Debugf("redundancy: none -> ec")
updating, err = t.noneToEC(obj, newRed, newECNodes)

case *cdssdk.LRCRedundancy:
log.WithField("ObjectID", obj.Object.ObjectID).Debugf("redundancy: none -> lrc")
updating, err = t.noneToLRC(obj, newRed, selectedNodes)
}

case *cdssdk.RepRedundancy:
if shouldUseEC {
switch newRed := newRed.(type) {
case *cdssdk.RepRedundancy:
updating, err = t.repToRep(obj, srcRed, rechoosedRepNodes)

case *cdssdk.ECRedundancy:
log.WithField("ObjectID", obj.Object.ObjectID).Debugf("redundancy: rep -> ec")
updating, err = t.repToEC(obj, &defEC, newECNodes)
} else {
updating, err = t.repToRep(obj, &defRep, rechoosedRepNodes)
updating, err = t.repToEC(obj, newRed, newECNodes)
}

case *cdssdk.ECRedundancy:
if shouldUseEC {
uploadNodes := t.rechooseNodesForEC(obj, red, allNodes)
updating, err = t.ecToEC(obj, red, &defEC, uploadNodes)
} else {
switch newRed := newRed.(type) {
case *cdssdk.RepRedundancy:
log.WithField("ObjectID", obj.Object.ObjectID).Debugf("redundancy: ec -> rep")
updating, err = t.ecToRep(obj, red, &defRep, newRepNodes)
updating, err = t.ecToRep(obj, srcRed, newRed, newRepNodes)

case *cdssdk.ECRedundancy:
uploadNodes := t.rechooseNodesForEC(obj, srcRed, userAllNodes)
updating, err = t.ecToEC(obj, srcRed, newRed, uploadNodes)
}

case *cdssdk.LRCRedundancy:
switch newRed := newRed.(type) {
case *cdssdk.LRCRedundancy:
uploadNodes := t.rechooseNodesForLRC(obj, srcRed, userAllNodes)
updating, err = t.lrcToLRC(obj, srcRed, newRed, uploadNodes)
}
}

@@ -192,6 +212,20 @@ func (t *CheckPackageRedundancy) Execute(execCtx ExecuteContext) {
}
}

// chooseRedundancy picks the new redundancy scheme and its target nodes for
// the object. Only the None->LRC and LRC->LRC paths are implemented here.
// NOTE(review): for any other source redundancy (Rep, EC) this returns
// (nil, nil) — confirm that the caller handles a nil redundancy, since its
// type switch on the result would then match no case.
func (t *CheckPackageRedundancy) chooseRedundancy(obj stgmod.ObjectDetail, userAllNodes map[cdssdk.NodeID]*NodeLoadInfo) (cdssdk.Redundancy, []*NodeLoadInfo) {
	switch obj.Object.Redundancy.(type) {
	case *cdssdk.NoneRedundancy:
		// Freshly uploaded objects default to LRC redundancy.
		newLRCNodes := t.chooseNewNodesForLRC(&cdssdk.DefaultLRCRedundancy, userAllNodes)
		return &cdssdk.DefaultLRCRedundancy, newLRCNodes

	case *cdssdk.LRCRedundancy:
		// Already-LRC objects re-pick their nodes (repair/migration).
		newLRCNodes := t.rechooseNodesForLRC(obj, &cdssdk.DefaultLRCRedundancy, userAllNodes)
		return &cdssdk.DefaultLRCRedundancy, newLRCNodes

	}
	return nil, nil
}

// 统计每个对象块所在的节点,选出块最多的不超过nodeCnt个节点
func (t *CheckPackageRedundancy) summaryRepObjectBlockNodes(objs []stgmod.ObjectDetail, nodeCnt int) []cdssdk.NodeID {
type nodeBlocks struct {
@@ -253,6 +287,19 @@ func (t *CheckPackageRedundancy) chooseNewNodesForEC(red *cdssdk.ECRedundancy, a
return t.chooseSoManyNodes(red.N, sortedNodes)
}

// chooseNewNodesForLRC selects red.N nodes for LRC redundancy, ordered by
// last-month load descending, breaking ties by last-year load descending.
func (t *CheckPackageRedundancy) chooseNewNodesForLRC(red *cdssdk.LRCRedundancy, allNodes map[cdssdk.NodeID]*NodeLoadInfo) []*NodeLoadInfo {
	byLoad := sort2.Sort(lo.Values(allNodes), func(a *NodeLoadInfo, b *NodeLoadInfo) int {
		if d := b.LoadsRecentMonth - a.LoadsRecentMonth; d != 0 {
			return d
		}
		return b.LoadsRecentYear - a.LoadsRecentYear
	})

	return t.chooseSoManyNodes(red.N, byLoad)
}

func (t *CheckPackageRedundancy) rechooseNodesForRep(mostBlockNodeIDs []cdssdk.NodeID, red *cdssdk.RepRedundancy, allNodes map[cdssdk.NodeID]*NodeLoadInfo) []*NodeLoadInfo {
type rechooseNode struct {
*NodeLoadInfo
@@ -334,6 +381,47 @@ func (t *CheckPackageRedundancy) rechooseNodesForEC(obj stgmod.ObjectDetail, red
return t.chooseSoManyNodes(red.N, lo.Map(sortedNodes, func(node *rechooseNode, idx int) *NodeLoadInfo { return node.NodeLoadInfo }))
}

// rechooseNodesForLRC re-selects red.N nodes for an object that already uses
// LRC redundancy. Sort priority: last-month load, then whether the node
// already caches one of the object's blocks, then last-year load.
func (t *CheckPackageRedundancy) rechooseNodesForLRC(obj stgmod.ObjectDetail, red *cdssdk.LRCRedundancy, allNodes map[cdssdk.NodeID]*NodeLoadInfo) []*NodeLoadInfo {
	type candidate struct {
		*NodeLoadInfo
		CachedBlockIndex int // index of a block already cached on this node, -1 if none
	}

	candidates := make([]*candidate, 0, len(allNodes))
	for _, node := range allNodes {
		cached := -1
		for _, blk := range obj.Blocks {
			if blk.NodeID == node.Node.NodeID {
				cached = blk.Index
				break
			}
		}

		candidates = append(candidates, &candidate{
			NodeLoadInfo:     node,
			CachedBlockIndex: cached,
		})
	}

	sorted := sort2.Sort(candidates, func(a *candidate, b *candidate) int {
		if d := b.LoadsRecentMonth - a.LoadsRecentMonth; d != 0 {
			return d
		}

		// Nodes that already cache one of the blocks are preferred.
		if v := sort2.CmpBool(b.CachedBlockIndex > -1, a.CachedBlockIndex > -1); v != 0 {
			return v
		}

		return b.LoadsRecentYear - a.LoadsRecentYear
	})

	// TODO when picking nodes that already hold blocks, we could still follow the block Index order.
	return t.chooseSoManyNodes(red.N, lo.Map(sorted, func(c *candidate, _ int) *NodeLoadInfo { return c.NodeLoadInfo }))
}

func (t *CheckPackageRedundancy) chooseSoManyNodes(count int, nodes []*NodeLoadInfo) []*NodeLoadInfo {
repeateCount := (count + len(nodes) - 1) / len(nodes)
extedNodes := make([]*NodeLoadInfo, repeateCount*len(nodes))
@@ -449,6 +537,55 @@ func (t *CheckPackageRedundancy) noneToEC(obj stgmod.ObjectDetail, red *cdssdk.E
}, nil
}

// noneToLRC converts an object without redundancy to LRC redundancy:
// it reads the complete file from a node that caches it, encodes it into
// red.N blocks, and writes block i to uploadNodes[i].
func (t *CheckPackageRedundancy) noneToLRC(obj stgmod.ObjectDetail, red *cdssdk.LRCRedundancy, uploadNodes []*NodeLoadInfo) (*coormq.UpdatingObjectRedundancy, error) {
	coorCli, err := stgglb.CoordinatorMQPool.Acquire()
	if err != nil {
		return nil, fmt.Errorf("new coordinator client: %w", err)
	}
	defer stgglb.CoordinatorMQPool.Release(coorCli)

	if len(obj.Blocks) == 0 {
		// Fix: this path converts to LRC, not EC — the message said "ec".
		return nil, fmt.Errorf("object is not cached on any nodes, cannot change its redundancy to lrc")
	}

	// Use the node holding the first cached block as the source of the full file.
	getNodes, err := coorCli.GetNodes(coormq.NewGetNodes([]cdssdk.NodeID{obj.Blocks[0].NodeID}))
	if err != nil {
		return nil, fmt.Errorf("requesting to get nodes: %w", err)
	}

	// One To per target block; the result is stored under the block index as key.
	var toes []ioswitchlrc.To
	for i := 0; i < red.N; i++ {
		toes = append(toes, ioswitchlrc.NewToNode(uploadNodes[i].Node, i, fmt.Sprintf("%d", i)))
	}

	plans := exec.NewPlanBuilder()
	err = lrcparser.Encode(ioswitchlrc.NewFromNode(obj.Object.FileHash, &getNodes.Nodes[0], -1), toes, plans)
	if err != nil {
		return nil, fmt.Errorf("parsing plan: %w", err)
	}

	ioRet, err := plans.Execute().Wait(context.TODO())
	if err != nil {
		return nil, fmt.Errorf("executing io plan: %w", err)
	}

	// Record the file hash produced for each block.
	var blocks []stgmod.ObjectBlock
	for i := 0; i < red.N; i++ {
		blocks = append(blocks, stgmod.ObjectBlock{
			ObjectID: obj.Object.ObjectID,
			Index:    i,
			NodeID:   uploadNodes[i].Node.NodeID,
			FileHash: ioRet[fmt.Sprintf("%d", i)].(string),
		})
	}

	return &coormq.UpdatingObjectRedundancy{
		ObjectID:   obj.Object.ObjectID,
		Redundancy: red,
		Blocks:     blocks,
	}, nil
}

func (t *CheckPackageRedundancy) repToRep(obj stgmod.ObjectDetail, red *cdssdk.RepRedundancy, uploadNodes []*NodeLoadInfo) (*coormq.UpdatingObjectRedundancy, error) {
if len(obj.Blocks) == 0 {
return nil, fmt.Errorf("object is not cached on any nodes, cannot change its redundancy to rep")
@@ -652,6 +789,198 @@ func (t *CheckPackageRedundancy) ecToEC(obj stgmod.ObjectDetail, srcRed *cdssdk.
}, nil
}

// lrcToLRC repairs/adjusts blocks within LRC redundancy. It first determines
// which block indices are missing; if every group has lost at most one block
// (and no lost block is outside all groups), it takes the cheaper in-group
// reconstruction path, otherwise it falls back to full reconstruction.
func (t *CheckPackageRedundancy) lrcToLRC(obj stgmod.ObjectDetail, srcRed *cdssdk.LRCRedundancy, tarRed *cdssdk.LRCRedundancy, uploadNodes []*NodeLoadInfo) (*coormq.UpdatingObjectRedundancy, error) {
	// NOTE(review): coorCli does not appear to be used in this function — it is
	// acquired and immediately scheduled for release. Confirm whether it can be
	// removed or whether Acquire has a needed side effect.
	coorCli, err := stgglb.CoordinatorMQPool.Acquire()
	if err != nil {
		return nil, fmt.Errorf("new coordinator client: %w", err)
	}
	defer stgglb.CoordinatorMQPool.Release(coorCli)

	blocksGrpByIndex := obj.GroupBlocks()

	var lostBlocks []int    // indices of missing blocks
	var lostBlockGrps []int // group ID of each entry in lostBlocks, same order
	canGroupReconstruct := true

	// allBlockFlags[i] == true means the block with index i still exists somewhere.
	allBlockFlags := make([]bool, srcRed.N)
	for _, block := range blocksGrpByIndex {
		allBlockFlags[block.Index] = true
	}

	for i, ok := range allBlockFlags {
		grpID := srcRed.FindGroup(i)
		if !ok {
			// A lost block that belongs to no group (e.g. a global parity block)
			// cannot be rebuilt from a single group.
			if grpID == -1 {
				canGroupReconstruct = false
				break
			}

			// Two or more lost blocks in the same group also rule out in-group
			// reconstruction.
			// NOTE(review): this only compares with the previously recorded lost
			// block, which assumes same-group indices are encountered
			// consecutively — verify FindGroup's index layout.
			if len(lostBlocks) > 0 && lostBlockGrps[len(lostBlockGrps)-1] == grpID {
				canGroupReconstruct = false
				break
			}

			lostBlocks = append(lostBlocks, i)
			lostBlockGrps = append(lostBlockGrps, grpID)
		}
	}

	if canGroupReconstruct {
		return t.groupReconstructLRC(obj, lostBlocks, lostBlockGrps, blocksGrpByIndex, srcRed, uploadNodes)
	}

	return t.reconstructLRC(obj, blocksGrpByIndex, srcRed, uploadNodes)
}

// groupReconstructLRC rebuilds each lost block from the surviving blocks of
// its own LRC group. lostBlocks[i] is the index of a lost block and
// lostBlockGrps[i] its group; the block rebuilt for lostBlocks[i] is written
// to uploadNodes[i].
func (t *CheckPackageRedundancy) groupReconstructLRC(obj stgmod.ObjectDetail, lostBlocks []int, lostBlockGrps []int, grpedBlocks []stgmod.GrouppedObjectBlock, red *cdssdk.LRCRedundancy, uploadNodes []*NodeLoadInfo) (*coormq.UpdatingObjectRedundancy, error) {
	grped := make(map[int]stgmod.GrouppedObjectBlock)
	for _, b := range grpedBlocks {
		grped[b.Index] = b
	}

	plans := exec.NewPlanBuilder()

	for i := 0; i < len(lostBlocks); i++ {
		// Use all blocks of the same group except the lost one as inputs.
		var froms []ioswitchlrc.From
		grpEles := red.GetGroupElements(lostBlockGrps[i])
		for _, ele := range grpEles {
			if ele == lostBlocks[i] {
				continue
			}

			froms = append(froms, ioswitchlrc.NewFromNode(grped[ele].FileHash, nil, ele))
		}

		err := lrcparser.ReconstructGroup(froms, []ioswitchlrc.To{
			ioswitchlrc.NewToNode(uploadNodes[i].Node, lostBlocks[i], fmt.Sprintf("%d", lostBlocks[i])),
		}, plans)
		if err != nil {
			return nil, fmt.Errorf("parsing plan: %w", err)
		}
	}

	// If there are no plans at all, Wait returns success immediately.
	ret, err := plans.Execute().Wait(context.TODO())
	if err != nil {
		return nil, fmt.Errorf("executing io plan: %w", err)
	}

	var newBlocks []stgmod.ObjectBlock
	for i, blkIdx := range lostBlocks {
		// Fix: the rebuilt block was written to uploadNodes[i] (see the To
		// above), but the original code indexed uploadNodes by the block index,
		// recording the wrong node and potentially indexing out of range.
		newBlocks = append(newBlocks, stgmod.ObjectBlock{
			ObjectID: obj.Object.ObjectID,
			Index:    blkIdx,
			NodeID:   uploadNodes[i].Node.NodeID,
			FileHash: ret[fmt.Sprintf("%d", blkIdx)].(string),
		})
	}
	// Blocks that were never lost keep their existing records.
	for _, b := range grpedBlocks {
		for _, nodeID := range b.NodeIDs {
			newBlocks = append(newBlocks, stgmod.ObjectBlock{
				ObjectID: obj.Object.ObjectID,
				Index:    b.Index,
				NodeID:   nodeID,
				FileHash: b.FileHash,
			})
		}
	}

	return &coormq.UpdatingObjectRedundancy{
		ObjectID:   obj.Object.ObjectID,
		Redundancy: red,
		Blocks:     newBlocks,
	}, nil
}

// reconstructLRC rebuilds the blocks required by the target nodes from any K
// surviving blocks (full-reconstruction path). uploadNodes[i] is responsible
// for holding block index i; nodes that already hold their block are skipped.
// Since all LRC parameter sets are currently identical, the needed blocks can
// be built directly without first reconstructing the complete file.
func (t *CheckPackageRedundancy) reconstructLRC(obj stgmod.ObjectDetail, grpBlocks []stgmod.GrouppedObjectBlock, red *cdssdk.LRCRedundancy, uploadNodes []*NodeLoadInfo) (*coormq.UpdatingObjectRedundancy, error) {
	// Pick K surviving blocks whose index is below M() as reconstruction inputs.
	var chosenBlocks []stgmod.GrouppedObjectBlock
	for _, block := range grpBlocks {
		if len(block.NodeIDs) > 0 && block.Index < red.M() {
			chosenBlocks = append(chosenBlocks, block)
		}

		if len(chosenBlocks) == red.K {
			break
		}
	}

	if len(chosenBlocks) < red.K {
		return nil, fmt.Errorf("no enough blocks to reconstruct the original file data")
	}

	planBlder := exec.NewPlanBuilder()

	var froms []ioswitchlrc.From
	var toes []ioswitchlrc.To
	var newBlocks []stgmod.ObjectBlock
	shouldUpdateBlocks := false
	for i, node := range uploadNodes {
		newBlock := stgmod.ObjectBlock{
			ObjectID: obj.Object.ObjectID,
			Index:    i,
			NodeID:   node.Node.NodeID,
		}

		grp, ok := lo.Find(grpBlocks, func(grp stgmod.GrouppedObjectBlock) bool { return grp.Index == i })

		// If the newly chosen node is already recorded in the Block table,
		// nothing needs to change for this block.
		if ok && lo.Contains(grp.NodeIDs, node.Node.NodeID) {
			newBlock.FileHash = grp.FileHash
			newBlocks = append(newBlocks, newBlock)
			continue
		}

		shouldUpdateBlocks = true

		// Otherwise the block this node needs must be reconstructed.

		// NOTE(review): every target that needs reconstruction appends the whole
		// chosenBlocks set to froms again (duplicate entries), and the source
		// node passed here is the *target* node (&node.Node) — confirm both are
		// what ReconstructAny expects.
		for _, block := range chosenBlocks {
			fmt.Printf("b: %v\n", block.Index) // NOTE(review): debug output, consider removing
			froms = append(froms, ioswitchlrc.NewFromNode(block.FileHash, &node.Node, block.Index))
		}

		// The output only needs the single block this node is to keep.
		toes = append(toes, ioswitchlrc.NewToNode(node.Node, i, fmt.Sprintf("%d", i)))

		newBlocks = append(newBlocks, newBlock)
	}

	err := lrcparser.ReconstructAny(froms, toes, planBlder)
	if err != nil {
		return nil, fmt.Errorf("parsing plan: %w", err)
	}

	fmt.Printf("plans: %v\n", planBlder) // NOTE(review): debug output, consider removing

	// If there are no plans at all, Wait returns success immediately.
	ret, err := planBlder.Execute().Wait(context.TODO())
	if err != nil {
		return nil, fmt.Errorf("executing io plan: %w", err)
	}

	// No block changed: signal "no update needed" with a nil result.
	if !shouldUpdateBlocks {
		return nil, nil
	}

	// Result keys are block indices in string form; write each hash back into
	// the newBlocks entry at the same index.
	for k, v := range ret {
		idx, err := strconv.ParseInt(k, 10, 64)
		if err != nil {
			return nil, fmt.Errorf("parsing result key %s as index: %w", k, err)
		}

		newBlocks[idx].FileHash = v.(string)
	}

	return &coormq.UpdatingObjectRedundancy{
		ObjectID:   obj.Object.ObjectID,
		Redundancy: red,
		Blocks:     newBlocks,
	}, nil
}

func (t *CheckPackageRedundancy) pinObject(nodeID cdssdk.NodeID, fileHash string) error {
agtCli, err := stgglb.AgentMQPool.Acquire(nodeID)
if err != nil {


Loading…
Cancel
Save