
Migrate code

gitlink (Sydonian) committed 7 months ago
parent · commit 2948d1023f
9 changed files with 1326 additions and 0 deletions
  1. +104 -0  client/internal/downloader/cache.go
  2. +8   -0  client/internal/downloader/config.go
  3. +154 -0  client/internal/downloader/downloader.go
  4. +204 -0  client/internal/downloader/iterator.go
  5. +73  -0  client/internal/downloader/lrc.go
  6. +199 -0  client/internal/downloader/lrc_strip_iterator.go
  7. +6   -0  client/internal/downloader/strategy/config.go
  8. +337 -0  client/internal/downloader/strategy/selector.go
  9. +241 -0  client/internal/downloader/strip_iterator.go

client/internal/downloader/cache.go (+104 -0)

@@ -0,0 +1,104 @@
package downloader

import (
"sort"

lru "github.com/hashicorp/golang-lru/v2"
"gitlink.org.cn/cloudream/common/utils/lo2"
)

type LRUID int64

type cacheFile struct {
Segments []*fileSegment
}

type fileSegment struct {
LRUID LRUID
Offset int64
Data []byte
}

type lruEntry struct {
FileHash string
Offset int64
}

type Cache struct {
lru *lru.Cache[LRUID, lruEntry]
nextLRUID LRUID
files map[string]*cacheFile
}

func NewCache(size int) *Cache {
c := &Cache{
files: make(map[string]*cacheFile),
}

lru, _ := lru.NewWithEvict(size, c.onEvict)
c.lru = lru

return c
}

func (c *Cache) Put(fileHash string, offset int64, data []byte) {
file, ok := c.files[fileHash]
if !ok {
file = &cacheFile{}
c.files[fileHash] = file
}

idx := sort.Search(len(file.Segments), upperBound(file.Segments, offset))

// Segments may overlap; data is only replaced when the Offset matches exactly
if idx < len(file.Segments) && file.Segments[idx].Offset == offset {
file.Segments[idx].Data = data
// Touch the entry to refresh its LRU position
c.lru.Get(file.Segments[idx].LRUID)
} else {
file.Segments = lo2.Insert(file.Segments, idx, &fileSegment{
LRUID: c.nextLRUID,
Offset: offset,
Data: data,
})
c.lru.Add(c.nextLRUID, lruEntry{
FileHash: fileHash,
Offset: offset,
})
c.nextLRUID++
}
}

func (c *Cache) Get(fileHash string, offset int64) []byte {
file, ok := c.files[fileHash]
if !ok {
return nil
}

idx := sort.Search(len(file.Segments), upperBound(file.Segments, offset))
// Exact hit: a segment starts at this offset
if idx < len(file.Segments) && file.Segments[idx].Offset == offset {
seg := file.Segments[idx]
// Touch the entry to refresh its LRU position
c.lru.Get(seg.LRUID)
return seg.Data
}
if idx == 0 {
return nil
}
// Otherwise fall back to the preceding segment, which may cover this offset
seg := file.Segments[idx-1]
if offset-seg.Offset >= int64(len(seg.Data)) {
// The preceding segment ends before this offset: treat it as a miss
return nil
}
// Touch the entry to refresh its LRU position
c.lru.Get(seg.LRUID)

return seg.Data[offset-seg.Offset:]
}

func (c *Cache) onEvict(key LRUID, value lruEntry) {
// The file and its segment are expected to always exist here
file := c.files[value.FileHash]
idx := sort.Search(len(file.Segments), upperBound(file.Segments, value.Offset))
file.Segments = lo2.RemoveAt(file.Segments, idx)
if len(file.Segments) == 0 {
delete(c.files, value.FileHash)
}
}

// Returns a predicate for sort.Search that locates the first segment whose Offset is >= target; if there is none, sort.Search returns len(seg)
func upperBound(seg []*fileSegment, target int64) func(int) bool {
return func(i int) bool {
return seg[i].Offset >= target
}
}
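
For orientation, here is a minimal usage sketch of the Cache added above. It sits in the same package; the helper name and the file hashes are made up for illustration, and only functions defined in this file are called.

// exampleCacheUsage is a hypothetical helper showing the intended call order:
// Put stores a segment keyed by (fileHash, offset), Get returns the cached
// bytes covering an offset (nil on a miss), and eviction is left to the LRU.
func exampleCacheUsage() {
	c := NewCache(128) // keep at most 128 segments across all files

	c.Put("file-hash-1", 0, []byte("hello world"))

	if data := c.Get("file-hash-1", 6); data != nil {
		_ = data // bytes of the covering segment starting at offset 6: "world"
	}

	if data := c.Get("file-hash-2", 0); data == nil {
		// nothing cached for this file: the caller falls back to a real download
	}
}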

client/internal/downloader/config.go (+8 -0)

@@ -0,0 +1,8 @@
package downloader

type Config struct {
// Number of cached strips for objects stored in EC mode
MaxStripCacheCount int `json:"maxStripCacheCount"`
// Number of strips to prefetch for each object in EC mode; the minimum is 1
ECStripPrefetchCount int `json:"ecStripPrefetchCount"`
}
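
A brief sketch of how this Config could be populated from JSON, based only on the struct tags above. The helper name is hypothetical, and it assumes encoding/json is imported.

// loadDownloaderConfig is a hypothetical helper. An input such as
// {"maxStripCacheCount": 128, "ecStripPrefetchCount": 2} maps onto Config
// through the json tags declared above.
func loadDownloaderConfig(raw []byte) (Config, error) {
	var cfg Config
	if err := json.Unmarshal(raw, &cfg); err != nil {
		return Config{}, err
	}
	return cfg, nil
}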

client/internal/downloader/downloader.go (+154 -0)

@@ -0,0 +1,154 @@
package downloader

import (
"fmt"
"io"

lru "github.com/hashicorp/golang-lru/v2"
"gitlink.org.cn/cloudream/common/pkgs/iterator"
"gitlink.org.cn/cloudream/storage2/client/internal/db"
"gitlink.org.cn/cloudream/storage2/client/types"
stgglb "gitlink.org.cn/cloudream/storage2/common/globals"
"gitlink.org.cn/cloudream/storage2/common/pkgs/connectivity"
"gitlink.org.cn/cloudream/storage2/common/pkgs/downloader/strategy"
coormq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/coordinator"
"gitlink.org.cn/cloudream/storage2/common/pkgs/storage/agtpool"
)

const (
DefaultMaxStripCacheCount = 128
)

type DownloadIterator = iterator.Iterator[*Downloading]

type DownloadReqeust struct {
ObjectID types.ObjectID
Offset int64
Length int64
}

type downloadReqeust2 struct {
Detail *types.ObjectDetail
Raw DownloadReqeust
}

type Downloading struct {
Object *types.Object
File io.ReadCloser // The file stream; nil if the file does not exist
Request DownloadReqeust
}

type Downloader struct {
strips *StripCache
cfg Config
conn *connectivity.Collector
stgAgts *agtpool.AgentPool
selector *strategy.Selector
db *db.DB
}

func NewDownloader(cfg Config, conn *connectivity.Collector, stgAgts *agtpool.AgentPool, sel *strategy.Selector, db *db.DB) Downloader {
if cfg.MaxStripCacheCount == 0 {
cfg.MaxStripCacheCount = DefaultMaxStripCacheCount
}

ch, _ := lru.New[ECStripKey, ObjectECStrip](cfg.MaxStripCacheCount)
return Downloader{
strips: ch,
cfg: cfg,
conn: conn,
stgAgts: stgAgts,
selector: sel,
db: db,
}
}

func (d *Downloader) DownloadObjects(reqs []DownloadReqeust) DownloadIterator {
coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil {
return iterator.FuseError[*Downloading](fmt.Errorf("new coordinator client: %w", err))
}
defer stgglb.CoordinatorMQPool.Release(coorCli)

objIDs := make([]types.ObjectID, len(reqs))
for i, req := range reqs {
objIDs[i] = req.ObjectID
}

if len(objIDs) == 0 {
return iterator.Empty[*Downloading]()
}

// objDetails, err := coorCli.GetObjectDetails(coormq.ReqGetObjectDetails(objIDs))
// if err != nil {
// return iterator.FuseError[*Downloading](fmt.Errorf("request to coordinator: %w", err))
// }
objDetails, err := d.db.GetObjectDetails(objIDs)
if err != nil {
return iterator.FuseError[*Downloading](fmt.Errorf("request to db: %w", err))
}

req2s := make([]downloadReqeust2, len(reqs))
for i, req := range reqs {
req2s[i] = downloadReqeust2{
Detail: objDetails.Objects[i],
Raw: req,
}
}

return NewDownloadObjectIterator(d, req2s)
}

func (d *Downloader) DownloadObjectByDetail(detail types.ObjectDetail, off int64, length int64) (*Downloading, error) {
req2s := []downloadReqeust2{{
Detail: &detail,
Raw: DownloadReqeust{
ObjectID: detail.Object.ObjectID,
Offset: off,
Length: length,
},
}}

iter := NewDownloadObjectIterator(d, req2s)
return iter.MoveNext()
}

func (d *Downloader) DownloadPackage(pkgID types.PackageID) DownloadIterator {
coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil {
return iterator.FuseError[*Downloading](fmt.Errorf("new coordinator client: %w", err))
}
defer stgglb.CoordinatorMQPool.Release(coorCli)

pkgDetail, err := coorCli.GetPackageObjectDetails(coormq.ReqGetPackageObjectDetails(pkgID))
if err != nil {
return iterator.FuseError[*Downloading](fmt.Errorf("request to coordinator: %w", err))
}

req2s := make([]downloadReqeust2, len(pkgDetail.Objects))
for i, objDetail := range pkgDetail.Objects {
dt := objDetail
req2s[i] = downloadReqeust2{
Detail: &dt,
Raw: DownloadReqeust{
ObjectID: objDetail.Object.ObjectID,
Offset: 0,
Length: objDetail.Object.Size,
},
}
}

return NewDownloadObjectIterator(d, req2s)
}

type ObjectECStrip struct {
Data []byte
ObjectFileHash types.FileHash // The Object's FileHash at the time this cache entry was added
}

type ECStripKey struct {
ObjectID types.ObjectID
StripIndex int64
}

type StripCache = lru.Cache[ECStripKey, ObjectECStrip]
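
A minimal sketch of driving the Downloader defined above. It assumes an already-constructed *Downloader (Config, connectivity collector, agent pool, selector and DB wiring omitted), that io and the common iterator package are imported, and that the shared iterator interface exposes MoveNext the way the concrete iterator in this commit does; the function name is hypothetical.

// downloadTwoObjects is a hypothetical caller: request two ranges, then drain
// the returned iterator. Length -1 means "read to the end of the object".
func downloadTwoObjects(d *Downloader, id1, id2 types.ObjectID) error {
	iter := d.DownloadObjects([]DownloadReqeust{
		{ObjectID: id1, Offset: 0, Length: -1},
		{ObjectID: id2, Offset: 4096, Length: 1024},
	})

	for {
		downloading, err := iter.MoveNext()
		if err == iterator.ErrNoMoreItem {
			return nil
		}
		if err != nil {
			return err
		}
		if downloading.File == nil {
			continue // the object was not found; see the Downloading.File comment
		}

		_, err = io.Copy(io.Discard, downloading.File)
		downloading.File.Close()
		if err != nil {
			return err
		}
	}
}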

client/internal/downloader/iterator.go (+204 -0)

@@ -0,0 +1,204 @@
package downloader

import (
"context"
"fmt"
"io"
"reflect"

"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
"gitlink.org.cn/cloudream/common/pkgs/logger"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"

"gitlink.org.cn/cloudream/common/utils/io2"
"gitlink.org.cn/cloudream/common/utils/math2"
stgglb "gitlink.org.cn/cloudream/storage2/common/globals"
stgmod "gitlink.org.cn/cloudream/storage2/common/models"
"gitlink.org.cn/cloudream/storage2/common/pkgs/distlock"
"gitlink.org.cn/cloudream/storage2/common/pkgs/downloader/strategy"
"gitlink.org.cn/cloudream/storage2/common/pkgs/ioswitch2"
"gitlink.org.cn/cloudream/storage2/common/pkgs/ioswitch2/parser"
"gitlink.org.cn/cloudream/storage2/common/pkgs/iterator"
)

type downloadStorageInfo struct {
Storage stgmod.StorageDetail
ObjectPinned bool
Blocks []stgmod.ObjectBlock
Distance float64
}

type DownloadContext struct {
Distlock *distlock.Service
}
type DownloadObjectIterator struct {
OnClosing func()
downloader *Downloader
reqs []downloadReqeust2
currentIndex int
}

func NewDownloadObjectIterator(downloader *Downloader, downloadObjs []downloadReqeust2) *DownloadObjectIterator {
return &DownloadObjectIterator{
downloader: downloader,
reqs: downloadObjs,
}
}

func (i *DownloadObjectIterator) MoveNext() (*Downloading, error) {
if i.currentIndex >= len(i.reqs) {
return nil, iterator.ErrNoMoreItem
}

req := i.reqs[i.currentIndex]
if req.Detail == nil {
return &Downloading{
Object: nil,
File: nil,
Request: req.Raw,
}, nil
}

destHub := cdssdk.HubID(0)
if stgglb.Local.HubID != nil {
destHub = *stgglb.Local.HubID
}

strg, err := i.downloader.selector.Select(strategy.Request{
Detail: *req.Detail,
Range: math2.NewRange(req.Raw.Offset, req.Raw.Length),
DestHub: destHub,
DestLocation: stgglb.Local.LocationID,
})
if err != nil {
return nil, fmt.Errorf("selecting download strategy: %w", err)
}

var reader io.ReadCloser
switch strg := strg.(type) {
case *strategy.DirectStrategy:
reader, err = i.downloadDirect(req, *strg)
if err != nil {
return nil, fmt.Errorf("downloading object %v: %w", req.Raw.ObjectID, err)
}

case *strategy.ECReconstructStrategy:
reader, err = i.downloadECReconstruct(req, *strg)
if err != nil {
return nil, fmt.Errorf("downloading ec object %v: %w", req.Raw.ObjectID, err)
}

case *strategy.LRCReconstructStrategy:
reader, err = i.downloadLRCReconstruct(req, *strg)
if err != nil {
return nil, fmt.Errorf("downloading lrc object %v: %w", req.Raw.ObjectID, err)
}

default:
return nil, fmt.Errorf("unsupported strategy type: %v", reflect.TypeOf(strg))
}

i.currentIndex++
return &Downloading{
Object: &req.Detail.Object,
File: reader,
Request: req.Raw,
}, nil
}

func (i *DownloadObjectIterator) Close() {
if i.OnClosing != nil {
i.OnClosing()
}
}

func (i *DownloadObjectIterator) downloadDirect(req downloadReqeust2, strg strategy.DirectStrategy) (io.ReadCloser, error) {
logger.Debugf("downloading object %v from storage %v", req.Raw.ObjectID, strg.Storage.Storage.String())

var strHandle *exec.DriverReadStream
ft := ioswitch2.NewFromTo()

toExec, handle := ioswitch2.NewToDriver(ioswitch2.RawStream())
toExec.Range = math2.Range{
Offset: req.Raw.Offset,
}
if req.Raw.Length != -1 {
length := req.Raw.Length
toExec.Range.Length = &length
}

ft.AddFrom(ioswitch2.NewFromShardstore(req.Detail.Object.FileHash, *strg.Storage.MasterHub, strg.Storage, ioswitch2.RawStream())).AddTo(toExec)
strHandle = handle

plans := exec.NewPlanBuilder()
if err := parser.Parse(ft, plans); err != nil {
return nil, fmt.Errorf("parsing plan: %w", err)
}

exeCtx := exec.NewExecContext()
exec.SetValueByType(exeCtx, i.downloader.stgAgts)
exec := plans.Execute(exeCtx)
go exec.Wait(context.TODO())

return exec.BeginRead(strHandle)
}

func (i *DownloadObjectIterator) downloadECReconstruct(req downloadReqeust2, strg strategy.ECReconstructStrategy) (io.ReadCloser, error) {
var logStrs []any = []any{fmt.Sprintf("downloading ec object %v from: ", req.Raw.ObjectID)}
for i, b := range strg.Blocks {
if i > 0 {
logStrs = append(logStrs, ", ")
}

logStrs = append(logStrs, fmt.Sprintf("%v@%v", b.Index, strg.Storages[i].Storage.String()))
}
logger.Debug(logStrs...)

downloadBlks := make([]downloadBlock, len(strg.Blocks))
for i, b := range strg.Blocks {
downloadBlks[i] = downloadBlock{
Block: b,
Storage: strg.Storages[i],
}
}

pr, pw := io.Pipe()
go func() {
readPos := req.Raw.Offset
totalReadLen := req.Detail.Object.Size - req.Raw.Offset
if req.Raw.Length >= 0 {
totalReadLen = math2.Min(req.Raw.Length, totalReadLen)
}

firstStripIndex := readPos / strg.Redundancy.StripSize()
stripIter := NewStripIterator(i.downloader, req.Detail.Object, downloadBlks, strg.Redundancy, firstStripIndex, i.downloader.strips, i.downloader.cfg.ECStripPrefetchCount)
defer stripIter.Close()

for totalReadLen > 0 {
strip, err := stripIter.MoveNext()
if err == iterator.ErrNoMoreItem {
pw.CloseWithError(io.ErrUnexpectedEOF)
return
}
if err != nil {
pw.CloseWithError(err)
return
}

readRelativePos := readPos - strip.Position
curReadLen := math2.Min(totalReadLen, strg.Redundancy.StripSize()-readRelativePos)

err = io2.WriteAll(pw, strip.Data[readRelativePos:readRelativePos+curReadLen])
if err != nil {
pw.CloseWithError(err)
return
}

totalReadLen -= curReadLen
readPos += curReadLen
}
pw.Close()
}()

return pr, nil
}
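
To make the offset bookkeeping in the goroutine above concrete, here is a small self-contained sketch with made-up numbers. It assumes fmt and mirrors only the loop's arithmetic, not the actual strip download.

// Hypothetical numbers: an EC object with k=2 data blocks and ChunkSize 1024
// has a strip size of k*ChunkSize = 2048 bytes. Reading 3000 bytes starting
// at offset 500 touches strips 0 and 1, exactly as the loop above does.
func exampleStripMath() {
	const stripSize = 2 * 1024 // k * ChunkSize
	readPos, totalReadLen := int64(500), int64(3000)

	for totalReadLen > 0 {
		stripIndex := readPos / stripSize
		stripPos := stripIndex * stripSize        // strip.Position
		readRelativePos := readPos - stripPos     // offset inside the strip
		curReadLen := stripSize - readRelativePos // bytes available in this strip
		if curReadLen > totalReadLen {
			curReadLen = totalReadLen
		}
		fmt.Printf("strip %d: copy %d bytes from strip offset %d\n",
			stripIndex, curReadLen, readRelativePos)
		readPos += curReadLen
		totalReadLen -= curReadLen
	}
	// Output:
	// strip 0: copy 1548 bytes from strip offset 500
	// strip 1: copy 1452 bytes from strip offset 0
}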

client/internal/downloader/lrc.go (+73 -0)

@@ -0,0 +1,73 @@
package downloader

import (
"fmt"
"io"

"gitlink.org.cn/cloudream/common/pkgs/iterator"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/utils/io2"
"gitlink.org.cn/cloudream/common/utils/math2"
"gitlink.org.cn/cloudream/storage2/common/pkgs/downloader/strategy"
)

func (iter *DownloadObjectIterator) downloadLRCReconstruct(req downloadReqeust2, strg strategy.LRCReconstructStrategy) (io.ReadCloser, error) {
var logStrs []any = []any{fmt.Sprintf("downloading lrc object %v from: ", req.Raw.ObjectID)}
for i, b := range strg.Blocks {
if i > 0 {
logStrs = append(logStrs, ", ")
}

logStrs = append(logStrs, fmt.Sprintf("%v@%v", b.Index, strg.Storages[i].Storage.String()))
}
logger.Debug(logStrs...)

downloadBlks := make([]downloadBlock, len(strg.Blocks))
for i, b := range strg.Blocks {
downloadBlks[i] = downloadBlock{
Block: b,
Storage: strg.Storages[i],
}
}

pr, pw := io.Pipe()
go func() {
readPos := req.Raw.Offset
totalReadLen := req.Detail.Object.Size - req.Raw.Offset
if req.Raw.Length >= 0 {
totalReadLen = math2.Min(req.Raw.Length, totalReadLen)
}

firstStripIndex := readPos / int64(strg.Redundancy.K) / int64(strg.Redundancy.ChunkSize)
stripIter := NewLRCStripIterator(iter.downloader, req.Detail.Object, downloadBlks, strg.Redundancy, firstStripIndex, iter.downloader.strips, iter.downloader.cfg.ECStripPrefetchCount)
defer stripIter.Close()

for totalReadLen > 0 {
strip, err := stripIter.MoveNext()
if err == iterator.ErrNoMoreItem {
pw.CloseWithError(io.ErrUnexpectedEOF)
return
}
if err != nil {
pw.CloseWithError(err)
return
}

readRelativePos := readPos - strip.Position
nextStripPos := strip.Position + int64(strg.Redundancy.K)*int64(strg.Redundancy.ChunkSize)
curReadLen := math2.Min(totalReadLen, nextStripPos-readPos)

err = io2.WriteAll(pw, strip.Data[readRelativePos:readRelativePos+curReadLen])
if err != nil {
pw.CloseWithError(err)
return
}

totalReadLen -= curReadLen
readPos += curReadLen
}
pw.Close()
}()

return pr, nil
}

client/internal/downloader/lrc_strip_iterator.go (+199 -0)

@@ -0,0 +1,199 @@
package downloader

import (
"context"
"io"
"sync"

"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
"gitlink.org.cn/cloudream/common/pkgs/iterator"
"gitlink.org.cn/cloudream/common/pkgs/logger"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/common/utils/math2"
"gitlink.org.cn/cloudream/storage2/common/pkgs/ioswitchlrc"
"gitlink.org.cn/cloudream/storage2/common/pkgs/ioswitchlrc/parser"
)

type LRCStripIterator struct {
downloder *Downloader
object cdssdk.Object
blocks []downloadBlock
red cdssdk.LRCRedundancy
curStripIndex int64
cache *StripCache
dataChan chan dataChanEntry
downloadingDone chan any
downloadingDoneOnce sync.Once
inited bool
}

func NewLRCStripIterator(downloder *Downloader, object cdssdk.Object, blocks []downloadBlock, red cdssdk.LRCRedundancy, beginStripIndex int64, cache *StripCache, maxPrefetch int) *LRCStripIterator {
if maxPrefetch <= 0 {
maxPrefetch = 1
}

iter := &LRCStripIterator{
downloder: downloder,
object: object,
blocks: blocks,
red: red,
curStripIndex: beginStripIndex,
cache: cache,
dataChan: make(chan dataChanEntry, maxPrefetch-1),
downloadingDone: make(chan any),
}

return iter
}

func (s *LRCStripIterator) MoveNext() (Strip, error) {
if !s.inited {
go s.downloading()
s.inited = true
}

// Try a non-blocking receive first so we can tell whether this call has to wait
select {
case entry, ok := <-s.dataChan:
if !ok || entry.Error == io.EOF {
return Strip{}, iterator.ErrNoMoreItem
}

if entry.Error != nil {
return Strip{}, entry.Error
}

s.curStripIndex++
return Strip{Data: entry.Data, Position: entry.Position}, nil

default:
logger.Debugf("waitting for ec strip %v of object %v", s.curStripIndex, s.object.ObjectID)
}

// The receive had to wait
select {
case entry, ok := <-s.dataChan:
if !ok || entry.Error == io.EOF {
return Strip{}, iterator.ErrNoMoreItem
}

if entry.Error != nil {
return Strip{}, entry.Error
}

s.curStripIndex++
return Strip{Data: entry.Data, Position: entry.Position}, nil

case <-s.downloadingDone:
return Strip{}, iterator.ErrNoMoreItem
}
}

func (s *LRCStripIterator) Close() {
s.downloadingDoneOnce.Do(func() {
close(s.downloadingDone)
})
}

func (s *LRCStripIterator) downloading() {
var froms []ioswitchlrc.From
for _, b := range s.blocks {
stg := b.Storage
froms = append(froms, ioswitchlrc.NewFromStorage(b.Block.FileHash, *stg.MasterHub, stg.Storage, b.Block.Index))
}

toExec, hd := ioswitchlrc.NewToDriverWithRange(-1, math2.Range{
Offset: s.curStripIndex * int64(s.red.ChunkSize*s.red.K),
})

plans := exec.NewPlanBuilder()
err := parser.ReconstructAny(froms, []ioswitchlrc.To{toExec}, plans)
if err != nil {
s.sendToDataChan(dataChanEntry{Error: err})
return
}

exeCtx := exec.NewExecContext()
exec.SetValueByType(exeCtx, s.downloder.stgAgts)

exec := plans.Execute(exeCtx)

ctx, cancel := context.WithCancel(context.Background())
go exec.Wait(ctx)
defer cancel()

str, err := exec.BeginRead(hd)
if err != nil {
s.sendToDataChan(dataChanEntry{Error: err})
return
}

curStripIndex := s.curStripIndex
loop:
for {
stripBytesPos := curStripIndex * int64(s.red.K) * int64(s.red.ChunkSize)
if stripBytesPos >= s.object.Size {
s.sendToDataChan(dataChanEntry{Error: io.EOF})
break
}

stripKey := ECStripKey{
ObjectID: s.object.ObjectID,
StripIndex: curStripIndex,
}

item, ok := s.cache.Get(stripKey)
if ok {
if item.ObjectFileHash == s.object.FileHash {
if !s.sendToDataChan(dataChanEntry{Data: item.Data, Position: stripBytesPos}) {
break loop
}
curStripIndex++
continue

} else {
// If the Object's hash differs from the cached hash, the cache entry is stale and must be downloaded again
s.cache.Remove(stripKey)
}
}

dataBuf := make([]byte, int64(s.red.K*s.red.ChunkSize))
n, err := io.ReadFull(str, dataBuf)
if err == io.ErrUnexpectedEOF {
s.cache.Add(stripKey, ObjectECStrip{
Data: dataBuf,
ObjectFileHash: s.object.FileHash,
})

s.sendToDataChan(dataChanEntry{Data: dataBuf[:n], Position: stripBytesPos})
s.sendToDataChan(dataChanEntry{Error: io.EOF})
break loop
}
if err != nil {
s.sendToDataChan(dataChanEntry{Error: err})
break loop
}

s.cache.Add(stripKey, ObjectECStrip{
Data: dataBuf,
ObjectFileHash: s.object.FileHash,
})

if !s.sendToDataChan(dataChanEntry{Data: dataBuf, Position: stripBytesPos}) {
break loop
}

curStripIndex++
}

close(s.dataChan)
}

func (s *LRCStripIterator) sendToDataChan(entry dataChanEntry) bool {
select {
case s.dataChan <- entry:
return true
case <-s.downloadingDone:
return false
}
}
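
The two-stage select used by MoveNext above (and by StripIterator.MoveNext later in this commit) is easier to see in isolation. A stripped-down sketch with hypothetical names:

// receiveWithWaitLogging tries a non-blocking receive first, so the caller can
// log when a real wait happens, then falls back to a blocking receive that
// also watches the cancellation channel.
func receiveWithWaitLogging(data <-chan int, done <-chan struct{}) (int, bool) {
	select {
	case v, ok := <-data:
		return v, ok // a value was already buffered: no wait
	default:
		// nothing buffered yet; the real MoveNext logs a debug message here
	}
	select {
	case v, ok := <-data:
		return v, ok
	case <-done:
		return 0, false // the iterator was closed while waiting
	}
}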

client/internal/downloader/strategy/config.go (+6 -0)

@@ -0,0 +1,6 @@
package strategy

type Config struct {
// When the latency to a download hub is higher than this value (in ms), that hub receives a heavier score penalty during evaluation
HighLatencyHubMs float64 `json:"highLatencyHubMs"`
}
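
For reference, the selector later in this commit interprets this threshold as a time.Duration cutoff; a tiny sketch (hypothetical helper, assumes the time package is imported):

// latencyCutoff converts HighLatencyHubMs into the time.Duration cutoff used
// when scoring hubs: measured latency above it counts as high-latency.
func latencyCutoff(cfg Config) time.Duration {
	return time.Duration(float64(time.Millisecond) * cfg.HighLatencyHubMs)
}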

client/internal/downloader/strategy/selector.go (+337 -0)

@@ -0,0 +1,337 @@
package strategy

import (
"fmt"
"math"
"reflect"
"time"

"github.com/samber/lo"
"gitlink.org.cn/cloudream/common/pkgs/bitmap"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/common/utils/math2"
"gitlink.org.cn/cloudream/common/utils/sort2"
"gitlink.org.cn/cloudream/storage2/common/consts"
stgmod "gitlink.org.cn/cloudream/storage2/common/models"
"gitlink.org.cn/cloudream/storage2/common/pkgs/metacache"
)

type Request struct {
Detail stgmod.ObjectDetail
Range math2.Range
DestHub cdssdk.HubID // May be 0. When this field is non-zero, the DestLocation field is ignored.
DestLocation cdssdk.LocationID // May be 0
}

type Strategy interface {
GetDetail() stgmod.ObjectDetail
}

// Download the complete object directly
type DirectStrategy struct {
Detail stgmod.ObjectDetail
Storage stgmod.StorageDetail
}

func (s *DirectStrategy) GetDetail() stgmod.ObjectDetail {
return s.Detail
}

// Reconstruct the object from the specified blocks
type ECReconstructStrategy struct {
Detail stgmod.ObjectDetail
Redundancy cdssdk.ECRedundancy
Blocks []stgmod.ObjectBlock
Storages []stgmod.StorageDetail
}

func (s *ECReconstructStrategy) GetDetail() stgmod.ObjectDetail {
return s.Detail
}

type LRCReconstructStrategy struct {
Detail stgmod.ObjectDetail
Redundancy cdssdk.LRCRedundancy
Blocks []stgmod.ObjectBlock
Storages []stgmod.StorageDetail
}

func (s *LRCReconstructStrategy) GetDetail() stgmod.ObjectDetail {
return s.Detail
}

type Selector struct {
cfg Config
storageMeta *metacache.UserSpaceMeta
hubMeta *metacache.HubMeta
connectivity *metacache.Connectivity
}

func NewSelector(cfg Config, storageMeta *metacache.UserSpaceMeta, hubMeta *metacache.HubMeta, connectivity *metacache.Connectivity) *Selector {
return &Selector{
cfg: cfg,
storageMeta: storageMeta,
hubMeta: hubMeta,
connectivity: connectivity,
}
}

func (s *Selector) Select(req Request) (Strategy, error) {
req2 := request2{
Detail: req.Detail,
Range: req.Range,
DestLocation: req.DestLocation,
}

if req.DestHub != 0 {
req2.DestHub = s.hubMeta.Get(req.DestHub)
}

switch red := req.Detail.Object.Redundancy.(type) {
case *cdssdk.NoneRedundancy:
return s.selectForNoneOrRep(req2)

case *cdssdk.RepRedundancy:
return s.selectForNoneOrRep(req2)

case *cdssdk.ECRedundancy:
return s.selectForEC(req2, *red)

case *cdssdk.LRCRedundancy:
return s.selectForLRC(req2, *red)
}

return nil, fmt.Errorf("unsupported redundancy type: %v of object %v", reflect.TypeOf(req.Detail.Object.Redundancy), req.Detail.Object.ObjectID)
}

type downloadStorageInfo struct {
Storage stgmod.StorageDetail
ObjectPinned bool
Blocks []stgmod.ObjectBlock
Distance float64
}

type downloadBlock struct {
Storage stgmod.StorageDetail
Block stgmod.ObjectBlock
}

type request2 struct {
Detail stgmod.ObjectDetail
Range math2.Range
DestHub *cdssdk.Hub
DestLocation cdssdk.LocationID
}

func (s *Selector) selectForNoneOrRep(req request2) (Strategy, error) {
sortedStgs := s.sortDownloadStorages(req)
if len(sortedStgs) == 0 {
return nil, fmt.Errorf("no storage available for download")
}

_, blks := s.getMinReadingBlockSolution(sortedStgs, 1)
if len(blks) == 0 {
return nil, fmt.Errorf("no block available for download")
}

return &DirectStrategy{
Detail: req.Detail,
Storage: sortedStgs[0].Storage,
}, nil
}

func (s *Selector) selectForEC(req request2, red cdssdk.ECRedundancy) (Strategy, error) {
sortedStgs := s.sortDownloadStorages(req)
if len(sortedStgs) == 0 {
return nil, fmt.Errorf("no storage available for download")
}

bsc, blocks := s.getMinReadingBlockSolution(sortedStgs, red.K)
osc, stg := s.getMinReadingObjectSolution(sortedStgs, red.K)

if bsc < osc {
bs := make([]stgmod.ObjectBlock, len(blocks))
ss := make([]stgmod.StorageDetail, len(blocks))
for i, b := range blocks {
bs[i] = b.Block
ss[i] = b.Storage
}

return &ECReconstructStrategy{
Detail: req.Detail,
Redundancy: red,
Blocks: bs,
Storages: ss,
}, nil
}

// bsc >= osc. If osc is MaxFloat64 then bsc must be as well, meaning there are not enough blocks to recover the file.
if osc == math.MaxFloat64 {
return nil, fmt.Errorf("not enough blocks to reconstruct object %v, want %d, got only %d", req.Detail.Object.ObjectID, red.K, len(blocks))
}

return &DirectStrategy{
Detail: req.Detail,
Storage: stg,
}, nil
}

func (s *Selector) selectForLRC(req request2, red cdssdk.LRCRedundancy) (Strategy, error) {
sortedStgs := s.sortDownloadStorages(req)
if len(sortedStgs) == 0 {
return nil, fmt.Errorf("no storage available for download")
}

var blocks []downloadBlock
selectedBlkIdx := make(map[int]bool)
for _, stg := range sortedStgs {
for _, b := range stg.Blocks {
if b.Index >= red.M() || selectedBlkIdx[b.Index] {
continue
}
blocks = append(blocks, downloadBlock{
Storage: stg.Storage,
Block: b,
})
selectedBlkIdx[b.Index] = true
}
}
if len(blocks) < red.K {
return nil, fmt.Errorf("not enough blocks to download lrc object")
}

bs := make([]stgmod.ObjectBlock, len(blocks))
ss := make([]stgmod.StorageDetail, len(blocks))
for i, b := range blocks {
bs[i] = b.Block
ss[i] = b.Storage
}

return &LRCReconstructStrategy{
Detail: req.Detail,
Redundancy: red,
Blocks: bs,
Storages: ss,
}, nil
}

func (s *Selector) sortDownloadStorages(req request2) []*downloadStorageInfo {
var stgIDs []cdssdk.StorageID
for _, id := range req.Detail.PinnedAt {
if !lo.Contains(stgIDs, id) {
stgIDs = append(stgIDs, id)
}
}
for _, b := range req.Detail.Blocks {
if !lo.Contains(stgIDs, b.StorageID) {
stgIDs = append(stgIDs, b.StorageID)
}
}

downloadStorageMap := make(map[cdssdk.StorageID]*downloadStorageInfo)
for _, id := range req.Detail.PinnedAt {
storage, ok := downloadStorageMap[id]
if !ok {
mod := s.storageMeta.Get(id)
if mod == nil || mod.MasterHub == nil {
continue
}

storage = &downloadStorageInfo{
Storage: *mod,
ObjectPinned: true,
Distance: s.getStorageDistance(req, *mod),
}
downloadStorageMap[id] = storage
}

storage.ObjectPinned = true
}

for _, b := range req.Detail.Blocks {
storage, ok := downloadStorageMap[b.StorageID]
if !ok {
mod := s.storageMeta.Get(b.StorageID)
if mod == nil || mod.MasterHub == nil {
continue
}

storage = &downloadStorageInfo{
Storage: *mod,
Distance: s.getStorageDistance(req, *mod),
}
downloadStorageMap[b.StorageID] = storage
}

storage.Blocks = append(storage.Blocks, b)
}

return sort2.Sort(lo.Values(downloadStorageMap), func(left, right *downloadStorageInfo) int {
return sort2.Cmp(left.Distance, right.Distance)
})
}

func (s *Selector) getStorageDistance(req request2, src stgmod.StorageDetail) float64 {
if req.DestHub != nil {
if src.MasterHub.HubID == req.DestHub.HubID {
return consts.StorageDistanceSameStorage
}

if src.MasterHub.LocationID == req.DestHub.LocationID {
return consts.StorageDistanceSameLocation
}

latency := s.connectivity.Get(src.MasterHub.HubID, req.DestHub.HubID)
if latency == nil || *latency > time.Duration(float64(time.Millisecond)*s.cfg.HighLatencyHubMs) {
return consts.HubDistanceHighLatencyHub
}

return consts.StorageDistanceOther
}

if req.DestLocation != 0 {
if src.MasterHub.LocationID == req.DestLocation {
return consts.StorageDistanceSameLocation
}
}

return consts.StorageDistanceOther
}

func (s *Selector) getMinReadingBlockSolution(sortedStgs []*downloadStorageInfo, k int) (float64, []downloadBlock) {
gotBlocksMap := bitmap.Bitmap64(0)
var gotBlocks []downloadBlock
dist := float64(0.0)
for _, n := range sortedStgs {
for _, b := range n.Blocks {
if !gotBlocksMap.Get(b.Index) {
gotBlocks = append(gotBlocks, downloadBlock{
Storage: n.Storage,
Block: b,
})
gotBlocksMap.Set(b.Index, true)
dist += n.Distance
}

if len(gotBlocks) >= k {
return dist, gotBlocks
}
}
}

return math.MaxFloat64, gotBlocks
}

func (s *Selector) getMinReadingObjectSolution(sortedStgs []*downloadStorageInfo, k int) (float64, stgmod.StorageDetail) {
dist := math.MaxFloat64
var downloadStg stgmod.StorageDetail
for _, n := range sortedStgs {
if n.ObjectPinned && float64(k)*n.Distance < dist {
dist = float64(k) * n.Distance
stg := n.Storage
downloadStg = stg
}
}

return dist, downloadStg
}
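
A minimal sketch of calling the Selector above and branching on the result, mirroring the switch in the downloader's iterator. The function name and the literal location ID are made up; fmt, math2 and stgmod are already imported by this file.

// chooseDownloadPath asks the selector how to fetch an object and reports
// which kind of strategy it picked.
func chooseDownloadPath(sel *Selector, detail stgmod.ObjectDetail, offset, length int64) (string, error) {
	strg, err := sel.Select(Request{
		Detail:       detail,
		Range:        math2.NewRange(offset, length),
		DestHub:      0, // no known local hub: fall back to DestLocation
		DestLocation: 1, // assumed location ID, for illustration only
	})
	if err != nil {
		return "", err
	}

	switch strg.(type) {
	case *DirectStrategy:
		return "read the whole object from one storage", nil
	case *ECReconstructStrategy:
		return "reconstruct from EC blocks", nil
	case *LRCReconstructStrategy:
		return "reconstruct from LRC blocks", nil
	default:
		return "", fmt.Errorf("unexpected strategy %T", strg)
	}
}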

client/internal/downloader/strip_iterator.go (+241 -0)

@@ -0,0 +1,241 @@
package downloader

import (
"context"
"io"
"sync"

"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
"gitlink.org.cn/cloudream/common/pkgs/iterator"
"gitlink.org.cn/cloudream/common/pkgs/logger"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/common/utils/math2"
stgmod "gitlink.org.cn/cloudream/storage2/common/models"
"gitlink.org.cn/cloudream/storage2/common/pkgs/ioswitch2"
"gitlink.org.cn/cloudream/storage2/common/pkgs/ioswitch2/parser"
)

type downloadBlock struct {
Storage stgmod.StorageDetail
Block stgmod.ObjectBlock
}

type Strip struct {
Data []byte
Position int64
}

type StripIterator struct {
downloader *Downloader
object cdssdk.Object
blocks []downloadBlock
red cdssdk.ECRedundancy
curStripIndex int64
cache *StripCache
dataChan chan dataChanEntry
downloadingDone chan any
downloadingDoneOnce sync.Once
inited bool
downloadingStream io.ReadCloser
downloadingStripIndex int64
downloadingPlanCtxCancel func()
}

type dataChanEntry struct {
Data []byte
Position int64 // Byte position of this strip within the file
Error error
}

func NewStripIterator(downloader *Downloader, object cdssdk.Object, blocks []downloadBlock, red cdssdk.ECRedundancy, beginStripIndex int64, cache *StripCache, maxPrefetch int) *StripIterator {
if maxPrefetch <= 0 {
maxPrefetch = 1
}

iter := &StripIterator{
downloader: downloader,
object: object,
blocks: blocks,
red: red,
curStripIndex: beginStripIndex,
cache: cache,
dataChan: make(chan dataChanEntry, maxPrefetch-1),
downloadingDone: make(chan any),
}

return iter
}

func (s *StripIterator) MoveNext() (Strip, error) {
if !s.inited {
go s.downloading(s.curStripIndex)
s.inited = true
}

// Try a non-blocking receive first so we can tell whether this call has to wait
select {
case entry, ok := <-s.dataChan:
if !ok || entry.Error == io.EOF {
return Strip{}, iterator.ErrNoMoreItem
}

if entry.Error != nil {
return Strip{}, entry.Error
}

s.curStripIndex++
return Strip{Data: entry.Data, Position: entry.Position}, nil

default:
logger.Debugf("waitting for ec strip %v of object %v", s.curStripIndex, s.object.ObjectID)
}

// The receive had to wait
select {
case entry, ok := <-s.dataChan:
if !ok || entry.Error == io.EOF {
return Strip{}, iterator.ErrNoMoreItem
}

if entry.Error != nil {
return Strip{}, entry.Error
}

s.curStripIndex++
return Strip{Data: entry.Data, Position: entry.Position}, nil

case <-s.downloadingDone:
return Strip{}, iterator.ErrNoMoreItem
}
}

func (s *StripIterator) Close() {
s.downloadingDoneOnce.Do(func() {
close(s.downloadingDone)
})
}

func (s *StripIterator) downloading(startStripIndex int64) {
curStripIndex := startStripIndex
loop:
for {
stripBytesPos := curStripIndex * int64(s.red.K) * int64(s.red.ChunkSize)
if stripBytesPos >= s.object.Size {
s.sendToDataChan(dataChanEntry{Error: io.EOF})
break
}

stripKey := ECStripKey{
ObjectID: s.object.ObjectID,
StripIndex: curStripIndex,
}

item, ok := s.cache.Get(stripKey)
if ok {
if item.ObjectFileHash == s.object.FileHash {
if !s.sendToDataChan(dataChanEntry{Data: item.Data, Position: stripBytesPos}) {
break loop
}
curStripIndex++
continue

} else {
// If the Object's hash differs from the cached hash, the cache entry is stale and must be downloaded again
s.cache.Remove(stripKey)
}
}

dataBuf := make([]byte, int64(s.red.K*s.red.ChunkSize))
n, err := s.readStrip(curStripIndex, dataBuf)
if err == io.ErrUnexpectedEOF {
// dataBuf may hold less than a full strip, but the whole buffer is still cached; callers must work out how much of it to read
s.cache.Add(stripKey, ObjectECStrip{
Data: dataBuf,
ObjectFileHash: s.object.FileHash,
})

s.sendToDataChan(dataChanEntry{Data: dataBuf[:n], Position: stripBytesPos})
s.sendToDataChan(dataChanEntry{Error: io.EOF})
break loop
}
if err != nil {
s.sendToDataChan(dataChanEntry{Error: err})
break loop
}

s.cache.Add(stripKey, ObjectECStrip{
Data: dataBuf,
ObjectFileHash: s.object.FileHash,
})

if !s.sendToDataChan(dataChanEntry{Data: dataBuf, Position: stripBytesPos}) {
break loop
}

curStripIndex++
}

if s.downloadingStream != nil {
s.downloadingStream.Close()
s.downloadingPlanCtxCancel()
}
close(s.dataChan)
}

func (s *StripIterator) sendToDataChan(entry dataChanEntry) bool {
select {
case s.dataChan <- entry:
return true
case <-s.downloadingDone:
return false
}
}

func (s *StripIterator) readStrip(stripIndex int64, buf []byte) (int, error) {
// If the requested strip does not match the position the current download stream is at, the stream must be reopened
if s.downloadingStream == nil || s.downloadingStripIndex != stripIndex {
if s.downloadingStream != nil {
s.downloadingStream.Close()
s.downloadingPlanCtxCancel()
}

ft := ioswitch2.NewFromTo()
ft.ECParam = &s.red
for _, b := range s.blocks {
stg := b.Storage
ft.AddFrom(ioswitch2.NewFromShardstore(b.Block.FileHash, *stg.MasterHub, stg, ioswitch2.ECStream(b.Block.Index)))
}

toExec, hd := ioswitch2.NewToDriverWithRange(ioswitch2.RawStream(), math2.Range{
Offset: stripIndex * s.red.StripSize(),
})
ft.AddTo(toExec)

plans := exec.NewPlanBuilder()
err := parser.Parse(ft, plans)
if err != nil {
return 0, err
}

exeCtx := exec.NewExecContext()
exec.SetValueByType(exeCtx, s.downloader.stgAgts)
exec := plans.Execute(exeCtx)

ctx, cancel := context.WithCancel(context.Background())
go exec.Wait(ctx)

str, err := exec.BeginRead(hd)
if err != nil {
cancel()
return 0, err
}

s.downloadingStream = str
s.downloadingStripIndex = stripIndex
s.downloadingPlanCtxCancel = cancel
}

n, err := io.ReadFull(s.downloadingStream, buf)
s.downloadingStripIndex += 1
return n, err
}
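
Finally, a sketch of how code inside this package drives the StripIterator, mirroring what downloadECReconstruct does but with the range bookkeeping left out. The function name is hypothetical; only APIs from this commit are used.

// drainStrips walks strips from index 0 until the iterator reports that no
// more strips exist. strip.Position is the byte offset of the strip within
// the object; real callers bound their reads by the object size, as
// downloadECReconstruct does.
func drainStrips(d *Downloader, obj cdssdk.Object, blks []downloadBlock, red cdssdk.ECRedundancy) error {
	iter := NewStripIterator(d, obj, blks, red, 0, d.strips, d.cfg.ECStripPrefetchCount)
	defer iter.Close()

	for {
		strip, err := iter.MoveNext()
		if err == iterator.ErrNoMoreItem {
			return nil
		}
		if err != nil {
			return err
		}
		_ = strip.Data // one reconstructed strip, at most red.K*red.ChunkSize bytes
	}
}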
