diff --git a/client/internal/downloader/cache.go b/client/internal/downloader/cache.go new file mode 100644 index 0000000..f8043f1 --- /dev/null +++ b/client/internal/downloader/cache.go @@ -0,0 +1,104 @@ +package downloader + +import ( + "sort" + + lru "github.com/hashicorp/golang-lru/v2" + "gitlink.org.cn/cloudream/common/utils/lo2" +) + +type LRUID int64 + +type cacheFile struct { + Segments []*fileSegment +} + +type fileSegment struct { + LRUID LRUID + Offset int64 + Data []byte +} + +type lruEntry struct { + FileHash string + Offset int64 +} + +type Cache struct { + lru *lru.Cache[LRUID, lruEntry] + nextLRUID LRUID + files map[string]*cacheFile +} + +func NewCache(size int) *Cache { + c := &Cache{ + files: make(map[string]*cacheFile), + } + + lru, _ := lru.NewWithEvict(size, c.onEvict) + c.lru = lru + + return c +} + +func (c *Cache) Put(fileHash string, offset int64, data []byte) { + file, ok := c.files[fileHash] + if !ok { + file = &cacheFile{} + c.files[fileHash] = file + } + + idx := sort.Search(len(file.Segments), upperBound(file.Segments, offset)) + + // 允许不同Segment之间有重叠,只在Offset相等时替换数据 + if idx < len(file.Segments) && file.Segments[idx].Offset == offset { + file.Segments[idx].Data = data + // Get一下更新LRU + c.lru.Get(file.Segments[idx].LRUID) + } else { + file.Segments = lo2.Insert(file.Segments, idx, &fileSegment{ + LRUID: c.nextLRUID, + Offset: offset, + Data: data, + }) + c.lru.Add(c.nextLRUID, lruEntry{ + FileHash: fileHash, + Offset: offset, + }) + c.nextLRUID++ + } +} + +func (c *Cache) Get(fileHash string, offset int64) []byte { + file, ok := c.files[fileHash] + if !ok { + return nil + } + + idx := sort.Search(len(file.Segments), upperBound(file.Segments, offset)) + if idx == 0 { + return nil + } + seg := file.Segments[idx-1] + // Get一下更新LRU + c.lru.Get(seg.LRUID) + + return seg.Data[offset-seg.Offset:] +} + +func (c *Cache) onEvict(key LRUID, value lruEntry) { + // 不应该找不到文件或者分片 + file := c.files[value.FileHash] + idx := 
// upperBound builds the predicate handed to sort.Search over seg.
//
// Despite its name the predicate is a lower bound: used with sort.Search it
// yields the index of the first segment whose Offset is greater than or equal
// to target, or len(seg) when there is no such segment. sort.Search requires
// the predicate to be monotonic, so seg has to be ordered by ascending Offset.
func upperBound(seg []*fileSegment, target int64) func(int) bool {
	return func(i int) bool {
		return seg[i].Offset >= target
	}
}
DownloadReqeust +} + +type Downloader struct { + strips *StripCache + cfg Config + conn *connectivity.Collector + stgAgts *agtpool.AgentPool + selector *strategy.Selector + db *db.DB +} + +func NewDownloader(cfg Config, conn *connectivity.Collector, stgAgts *agtpool.AgentPool, sel *strategy.Selector, db *db.DB) Downloader { + if cfg.MaxStripCacheCount == 0 { + cfg.MaxStripCacheCount = DefaultMaxStripCacheCount + } + + ch, _ := lru.New[ECStripKey, ObjectECStrip](cfg.MaxStripCacheCount) + return Downloader{ + strips: ch, + cfg: cfg, + conn: conn, + stgAgts: stgAgts, + selector: sel, + db: db, + } +} + +func (d *Downloader) DownloadObjects(reqs []DownloadReqeust) DownloadIterator { + coorCli, err := stgglb.CoordinatorMQPool.Acquire() + if err != nil { + return iterator.FuseError[*Downloading](fmt.Errorf("new coordinator client: %w", err)) + } + defer stgglb.CoordinatorMQPool.Release(coorCli) + + objIDs := make([]types.ObjectID, len(reqs)) + for i, req := range reqs { + objIDs[i] = req.ObjectID + } + + if len(objIDs) == 0 { + return iterator.Empty[*Downloading]() + } + + // objDetails, err := coorCli.GetObjectDetails(coormq.ReqGetObjectDetails(objIDs)) + // if err != nil { + // return iterator.FuseError[*Downloading](fmt.Errorf("request to coordinator: %w", err)) + // } + objDetails, err := d.db.GetObjectDetails(objIDs) + if err != nil { + return iterator.FuseError[*Downloading](fmt.Errorf("request to db: %w", err)) + } + + req2s := make([]downloadReqeust2, len(reqs)) + for i, req := range reqs { + req2s[i] = downloadReqeust2{ + Detail: objDetails.Objects[i], + Raw: req, + } + } + + return NewDownloadObjectIterator(d, req2s) +} + +func (d *Downloader) DownloadObjectByDetail(detail types.ObjectDetail, off int64, length int64) (*Downloading, error) { + req2s := []downloadReqeust2{{ + Detail: &detail, + Raw: DownloadReqeust{ + ObjectID: detail.Object.ObjectID, + Offset: off, + Length: length, + }, + }} + + iter := NewDownloadObjectIterator(d, req2s) + return 
iter.MoveNext() +} + +func (d *Downloader) DownloadPackage(pkgID types.PackageID) DownloadIterator { + coorCli, err := stgglb.CoordinatorMQPool.Acquire() + if err != nil { + return iterator.FuseError[*Downloading](fmt.Errorf("new coordinator client: %w", err)) + } + defer stgglb.CoordinatorMQPool.Release(coorCli) + + pkgDetail, err := coorCli.GetPackageObjectDetails(coormq.ReqGetPackageObjectDetails(pkgID)) + if err != nil { + return iterator.FuseError[*Downloading](fmt.Errorf("request to coordinator: %w", err)) + } + + req2s := make([]downloadReqeust2, len(pkgDetail.Objects)) + for i, objDetail := range pkgDetail.Objects { + dt := objDetail + req2s[i] = downloadReqeust2{ + Detail: &dt, + Raw: DownloadReqeust{ + ObjectID: objDetail.Object.ObjectID, + Offset: 0, + Length: objDetail.Object.Size, + }, + } + } + + return NewDownloadObjectIterator(d, req2s) +} + +type ObjectECStrip struct { + Data []byte + ObjectFileHash types.FileHash // 添加这条缓存时,Object的FileHash +} + +type ECStripKey struct { + ObjectID types.ObjectID + StripIndex int64 +} + +type StripCache = lru.Cache[ECStripKey, ObjectECStrip] diff --git a/client/internal/downloader/iterator.go b/client/internal/downloader/iterator.go new file mode 100644 index 0000000..2046bd6 --- /dev/null +++ b/client/internal/downloader/iterator.go @@ -0,0 +1,204 @@ +package downloader + +import ( + "context" + "fmt" + "io" + "reflect" + + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" + "gitlink.org.cn/cloudream/common/pkgs/logger" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + + "gitlink.org.cn/cloudream/common/utils/io2" + "gitlink.org.cn/cloudream/common/utils/math2" + stgglb "gitlink.org.cn/cloudream/storage2/common/globals" + stgmod "gitlink.org.cn/cloudream/storage2/common/models" + "gitlink.org.cn/cloudream/storage2/common/pkgs/distlock" + "gitlink.org.cn/cloudream/storage2/common/pkgs/downloader/strategy" + "gitlink.org.cn/cloudream/storage2/common/pkgs/ioswitch2" + 
"gitlink.org.cn/cloudream/storage2/common/pkgs/ioswitch2/parser" + "gitlink.org.cn/cloudream/storage2/common/pkgs/iterator" +) + +type downloadStorageInfo struct { + Storage stgmod.StorageDetail + ObjectPinned bool + Blocks []stgmod.ObjectBlock + Distance float64 +} + +type DownloadContext struct { + Distlock *distlock.Service +} +type DownloadObjectIterator struct { + OnClosing func() + downloader *Downloader + reqs []downloadReqeust2 + currentIndex int +} + +func NewDownloadObjectIterator(downloader *Downloader, downloadObjs []downloadReqeust2) *DownloadObjectIterator { + return &DownloadObjectIterator{ + downloader: downloader, + reqs: downloadObjs, + } +} + +func (i *DownloadObjectIterator) MoveNext() (*Downloading, error) { + if i.currentIndex >= len(i.reqs) { + return nil, iterator.ErrNoMoreItem + } + + req := i.reqs[i.currentIndex] + if req.Detail == nil { + return &Downloading{ + Object: nil, + File: nil, + Request: req.Raw, + }, nil + } + + destHub := cdssdk.HubID(0) + if stgglb.Local.HubID != nil { + destHub = *stgglb.Local.HubID + } + + strg, err := i.downloader.selector.Select(strategy.Request{ + Detail: *req.Detail, + Range: math2.NewRange(req.Raw.Offset, req.Raw.Length), + DestHub: destHub, + DestLocation: stgglb.Local.LocationID, + }) + if err != nil { + return nil, fmt.Errorf("selecting download strategy: %w", err) + } + + var reader io.ReadCloser + switch strg := strg.(type) { + case *strategy.DirectStrategy: + reader, err = i.downloadDirect(req, *strg) + if err != nil { + return nil, fmt.Errorf("downloading object %v: %w", req.Raw.ObjectID, err) + } + + case *strategy.ECReconstructStrategy: + reader, err = i.downloadECReconstruct(req, *strg) + if err != nil { + return nil, fmt.Errorf("downloading ec object %v: %w", req.Raw.ObjectID, err) + } + + case *strategy.LRCReconstructStrategy: + reader, err = i.downloadLRCReconstruct(req, *strg) + if err != nil { + return nil, fmt.Errorf("downloading lrc object %v: %w", req.Raw.ObjectID, err) + } + + 
default: + return nil, fmt.Errorf("unsupported strategy type: %v", reflect.TypeOf(strg)) + } + + i.currentIndex++ + return &Downloading{ + Object: &req.Detail.Object, + File: reader, + Request: req.Raw, + }, nil +} + +func (i *DownloadObjectIterator) Close() { + if i.OnClosing != nil { + i.OnClosing() + } +} + +func (i *DownloadObjectIterator) downloadDirect(req downloadReqeust2, strg strategy.DirectStrategy) (io.ReadCloser, error) { + logger.Debugf("downloading object %v from storage %v", req.Raw.ObjectID, strg.Storage.Storage.String()) + + var strHandle *exec.DriverReadStream + ft := ioswitch2.NewFromTo() + + toExec, handle := ioswitch2.NewToDriver(ioswitch2.RawStream()) + toExec.Range = math2.Range{ + Offset: req.Raw.Offset, + } + if req.Raw.Length != -1 { + len := req.Raw.Length + toExec.Range.Length = &len + } + + ft.AddFrom(ioswitch2.NewFromShardstore(req.Detail.Object.FileHash, *strg.Storage.MasterHub, strg.Storage, ioswitch2.RawStream())).AddTo(toExec) + strHandle = handle + + plans := exec.NewPlanBuilder() + if err := parser.Parse(ft, plans); err != nil { + return nil, fmt.Errorf("parsing plan: %w", err) + } + + exeCtx := exec.NewExecContext() + exec.SetValueByType(exeCtx, i.downloader.stgAgts) + exec := plans.Execute(exeCtx) + go exec.Wait(context.TODO()) + + return exec.BeginRead(strHandle) +} + +func (i *DownloadObjectIterator) downloadECReconstruct(req downloadReqeust2, strg strategy.ECReconstructStrategy) (io.ReadCloser, error) { + var logStrs []any = []any{fmt.Sprintf("downloading ec object %v from: ", req.Raw.ObjectID)} + for i, b := range strg.Blocks { + if i > 0 { + logStrs = append(logStrs, ", ") + } + + logStrs = append(logStrs, fmt.Sprintf("%v@%v", b.Index, strg.Storages[i].Storage.String())) + } + logger.Debug(logStrs...) 
+ + downloadBlks := make([]downloadBlock, len(strg.Blocks)) + for i, b := range strg.Blocks { + downloadBlks[i] = downloadBlock{ + Block: b, + Storage: strg.Storages[i], + } + } + + pr, pw := io.Pipe() + go func() { + readPos := req.Raw.Offset + totalReadLen := req.Detail.Object.Size - req.Raw.Offset + if req.Raw.Length >= 0 { + totalReadLen = math2.Min(req.Raw.Length, totalReadLen) + } + + firstStripIndex := readPos / strg.Redundancy.StripSize() + stripIter := NewStripIterator(i.downloader, req.Detail.Object, downloadBlks, strg.Redundancy, firstStripIndex, i.downloader.strips, i.downloader.cfg.ECStripPrefetchCount) + defer stripIter.Close() + + for totalReadLen > 0 { + strip, err := stripIter.MoveNext() + if err == iterator.ErrNoMoreItem { + pw.CloseWithError(io.ErrUnexpectedEOF) + return + } + if err != nil { + pw.CloseWithError(err) + return + } + + readRelativePos := readPos - strip.Position + curReadLen := math2.Min(totalReadLen, strg.Redundancy.StripSize()-readRelativePos) + + err = io2.WriteAll(pw, strip.Data[readRelativePos:readRelativePos+curReadLen]) + if err != nil { + pw.CloseWithError(err) + return + } + + totalReadLen -= curReadLen + readPos += curReadLen + } + pw.Close() + }() + + return pr, nil +} diff --git a/client/internal/downloader/lrc.go b/client/internal/downloader/lrc.go new file mode 100644 index 0000000..4a1c007 --- /dev/null +++ b/client/internal/downloader/lrc.go @@ -0,0 +1,73 @@ +package downloader + +import ( + "fmt" + "io" + + "gitlink.org.cn/cloudream/common/pkgs/iterator" + "gitlink.org.cn/cloudream/common/pkgs/logger" + "gitlink.org.cn/cloudream/common/utils/io2" + "gitlink.org.cn/cloudream/common/utils/math2" + "gitlink.org.cn/cloudream/storage2/common/pkgs/downloader/strategy" +) + +func (iter *DownloadObjectIterator) downloadLRCReconstruct(req downloadReqeust2, strg strategy.LRCReconstructStrategy) (io.ReadCloser, error) { + var logStrs []any = []any{fmt.Sprintf("downloading lrc object %v from: ", req.Raw.ObjectID)} + for i, b 
:= range strg.Blocks { + if i > 0 { + logStrs = append(logStrs, ", ") + } + + logStrs = append(logStrs, fmt.Sprintf("%v@%v", b.Index, strg.Storages[i].Storage.String())) + } + logger.Debug(logStrs...) + + downloadBlks := make([]downloadBlock, len(strg.Blocks)) + for i, b := range strg.Blocks { + downloadBlks[i] = downloadBlock{ + Block: b, + Storage: strg.Storages[i], + } + } + + pr, pw := io.Pipe() + go func() { + readPos := req.Raw.Offset + totalReadLen := req.Detail.Object.Size - req.Raw.Offset + if req.Raw.Length >= 0 { + totalReadLen = math2.Min(req.Raw.Length, totalReadLen) + } + + firstStripIndex := readPos / int64(strg.Redundancy.K) / int64(strg.Redundancy.ChunkSize) + stripIter := NewLRCStripIterator(iter.downloader, req.Detail.Object, downloadBlks, strg.Redundancy, firstStripIndex, iter.downloader.strips, iter.downloader.cfg.ECStripPrefetchCount) + defer stripIter.Close() + + for totalReadLen > 0 { + strip, err := stripIter.MoveNext() + if err == iterator.ErrNoMoreItem { + pw.CloseWithError(io.ErrUnexpectedEOF) + return + } + if err != nil { + pw.CloseWithError(err) + return + } + + readRelativePos := readPos - strip.Position + nextStripPos := strip.Position + int64(strg.Redundancy.K)*int64(strg.Redundancy.ChunkSize) + curReadLen := math2.Min(totalReadLen, nextStripPos-readPos) + + err = io2.WriteAll(pw, strip.Data[readRelativePos:readRelativePos+curReadLen]) + if err != nil { + pw.CloseWithError(err) + return + } + + totalReadLen -= curReadLen + readPos += curReadLen + } + pw.Close() + }() + + return pr, nil +} diff --git a/client/internal/downloader/lrc_strip_iterator.go b/client/internal/downloader/lrc_strip_iterator.go new file mode 100644 index 0000000..df40aca --- /dev/null +++ b/client/internal/downloader/lrc_strip_iterator.go @@ -0,0 +1,199 @@ +package downloader + +import ( + "context" + "io" + "sync" + + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" + "gitlink.org.cn/cloudream/common/pkgs/iterator" + 
"gitlink.org.cn/cloudream/common/pkgs/logger" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "gitlink.org.cn/cloudream/common/utils/math2" + "gitlink.org.cn/cloudream/storage2/common/pkgs/ioswitchlrc" + "gitlink.org.cn/cloudream/storage2/common/pkgs/ioswitchlrc/parser" +) + +type LRCStripIterator struct { + downloder *Downloader + object cdssdk.Object + blocks []downloadBlock + red cdssdk.LRCRedundancy + curStripIndex int64 + cache *StripCache + dataChan chan dataChanEntry + downloadingDone chan any + downloadingDoneOnce sync.Once + inited bool +} + +func NewLRCStripIterator(downloder *Downloader, object cdssdk.Object, blocks []downloadBlock, red cdssdk.LRCRedundancy, beginStripIndex int64, cache *StripCache, maxPrefetch int) *LRCStripIterator { + if maxPrefetch <= 0 { + maxPrefetch = 1 + } + + iter := &LRCStripIterator{ + downloder: downloder, + object: object, + blocks: blocks, + red: red, + curStripIndex: beginStripIndex, + cache: cache, + dataChan: make(chan dataChanEntry, maxPrefetch-1), + downloadingDone: make(chan any), + } + + return iter +} + +func (s *LRCStripIterator) MoveNext() (Strip, error) { + if !s.inited { + go s.downloading() + s.inited = true + } + + // 先尝试获取一下,用于判断本次获取是否发生了等待 + select { + case entry, ok := <-s.dataChan: + if !ok || entry.Error == io.EOF { + return Strip{}, iterator.ErrNoMoreItem + } + + if entry.Error != nil { + return Strip{}, entry.Error + } + + s.curStripIndex++ + return Strip{Data: entry.Data, Position: entry.Position}, nil + + default: + logger.Debugf("waitting for ec strip %v of object %v", s.curStripIndex, s.object.ObjectID) + } + + // 发生了等待 + select { + case entry, ok := <-s.dataChan: + if !ok || entry.Error == io.EOF { + return Strip{}, iterator.ErrNoMoreItem + } + + if entry.Error != nil { + return Strip{}, entry.Error + } + + s.curStripIndex++ + return Strip{Data: entry.Data, Position: entry.Position}, nil + + case <-s.downloadingDone: + return Strip{}, iterator.ErrNoMoreItem + } +} + +func (s 
*LRCStripIterator) Close() { + s.downloadingDoneOnce.Do(func() { + close(s.downloadingDone) + }) +} + +func (s *LRCStripIterator) downloading() { + var froms []ioswitchlrc.From + for _, b := range s.blocks { + stg := b.Storage + froms = append(froms, ioswitchlrc.NewFromStorage(b.Block.FileHash, *stg.MasterHub, stg.Storage, b.Block.Index)) + } + + toExec, hd := ioswitchlrc.NewToDriverWithRange(-1, math2.Range{ + Offset: s.curStripIndex * int64(s.red.ChunkSize*s.red.K), + }) + + plans := exec.NewPlanBuilder() + err := parser.ReconstructAny(froms, []ioswitchlrc.To{toExec}, plans) + if err != nil { + s.sendToDataChan(dataChanEntry{Error: err}) + return + } + + exeCtx := exec.NewExecContext() + exec.SetValueByType(exeCtx, s.downloder.stgAgts) + + exec := plans.Execute(exeCtx) + + ctx, cancel := context.WithCancel(context.Background()) + go exec.Wait(ctx) + defer cancel() + + str, err := exec.BeginRead(hd) + if err != nil { + s.sendToDataChan(dataChanEntry{Error: err}) + return + } + + curStripIndex := s.curStripIndex +loop: + for { + stripBytesPos := curStripIndex * int64(s.red.K) * int64(s.red.ChunkSize) + if stripBytesPos >= s.object.Size { + s.sendToDataChan(dataChanEntry{Error: io.EOF}) + break + } + + stripKey := ECStripKey{ + ObjectID: s.object.ObjectID, + StripIndex: curStripIndex, + } + + item, ok := s.cache.Get(stripKey) + if ok { + if item.ObjectFileHash == s.object.FileHash { + if !s.sendToDataChan(dataChanEntry{Data: item.Data, Position: stripBytesPos}) { + break loop + } + curStripIndex++ + continue + + } else { + // 如果Object的Hash和Cache的Hash不一致,说明Cache是无效的,需要重新下载 + s.cache.Remove(stripKey) + } + } + + dataBuf := make([]byte, int64(s.red.K*s.red.ChunkSize)) + n, err := io.ReadFull(str, dataBuf) + if err == io.ErrUnexpectedEOF { + s.cache.Add(stripKey, ObjectECStrip{ + Data: dataBuf, + ObjectFileHash: s.object.FileHash, + }) + + s.sendToDataChan(dataChanEntry{Data: dataBuf[:n], Position: stripBytesPos}) + s.sendToDataChan(dataChanEntry{Error: io.EOF}) + break 
loop + } + if err != nil { + s.sendToDataChan(dataChanEntry{Error: err}) + break loop + } + + s.cache.Add(stripKey, ObjectECStrip{ + Data: dataBuf, + ObjectFileHash: s.object.FileHash, + }) + + if !s.sendToDataChan(dataChanEntry{Data: dataBuf, Position: stripBytesPos}) { + break loop + } + + curStripIndex++ + } + + close(s.dataChan) +} + +func (s *LRCStripIterator) sendToDataChan(entry dataChanEntry) bool { + select { + case s.dataChan <- entry: + return true + case <-s.downloadingDone: + return false + } +} diff --git a/client/internal/downloader/strategy/config.go b/client/internal/downloader/strategy/config.go new file mode 100644 index 0000000..29c4bb2 --- /dev/null +++ b/client/internal/downloader/strategy/config.go @@ -0,0 +1,6 @@ +package strategy + +type Config struct { + // 当到下载节点的延迟高于这个值时,该节点在评估时会有更高的分数惩罚,单位:ms + HighLatencyHubMs float64 `json:"highLatencyHubMs"` +} diff --git a/client/internal/downloader/strategy/selector.go b/client/internal/downloader/strategy/selector.go new file mode 100644 index 0000000..92570cb --- /dev/null +++ b/client/internal/downloader/strategy/selector.go @@ -0,0 +1,337 @@ +package strategy + +import ( + "fmt" + "math" + "reflect" + "time" + + "github.com/samber/lo" + "gitlink.org.cn/cloudream/common/pkgs/bitmap" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "gitlink.org.cn/cloudream/common/utils/math2" + "gitlink.org.cn/cloudream/common/utils/sort2" + "gitlink.org.cn/cloudream/storage2/common/consts" + stgmod "gitlink.org.cn/cloudream/storage2/common/models" + "gitlink.org.cn/cloudream/storage2/common/pkgs/metacache" +) + +type Request struct { + Detail stgmod.ObjectDetail + Range math2.Range + DestHub cdssdk.HubID // 可以为0。此字段不为0时,DestLocation字段无意义。 + DestLocation cdssdk.LocationID // 可以为0 +} + +type Strategy interface { + GetDetail() stgmod.ObjectDetail +} + +// 直接下载完整对象 +type DirectStrategy struct { + Detail stgmod.ObjectDetail + Storage stgmod.StorageDetail +} + +func (s *DirectStrategy) GetDetail() 
stgmod.ObjectDetail { + return s.Detail +} + +// 从指定对象重建对象 +type ECReconstructStrategy struct { + Detail stgmod.ObjectDetail + Redundancy cdssdk.ECRedundancy + Blocks []stgmod.ObjectBlock + Storages []stgmod.StorageDetail +} + +func (s *ECReconstructStrategy) GetDetail() stgmod.ObjectDetail { + return s.Detail +} + +type LRCReconstructStrategy struct { + Detail stgmod.ObjectDetail + Redundancy cdssdk.LRCRedundancy + Blocks []stgmod.ObjectBlock + Storages []stgmod.StorageDetail +} + +func (s *LRCReconstructStrategy) GetDetail() stgmod.ObjectDetail { + return s.Detail +} + +type Selector struct { + cfg Config + storageMeta *metacache.UserSpaceMeta + hubMeta *metacache.HubMeta + connectivity *metacache.Connectivity +} + +func NewSelector(cfg Config, storageMeta *metacache.UserSpaceMeta, hubMeta *metacache.HubMeta, connectivity *metacache.Connectivity) *Selector { + return &Selector{ + cfg: cfg, + storageMeta: storageMeta, + hubMeta: hubMeta, + connectivity: connectivity, + } +} + +func (s *Selector) Select(req Request) (Strategy, error) { + req2 := request2{ + Detail: req.Detail, + Range: req.Range, + DestLocation: req.DestLocation, + } + + if req.DestHub != 0 { + req2.DestHub = s.hubMeta.Get(req.DestHub) + } + + switch red := req.Detail.Object.Redundancy.(type) { + case *cdssdk.NoneRedundancy: + return s.selectForNoneOrRep(req2) + + case *cdssdk.RepRedundancy: + return s.selectForNoneOrRep(req2) + + case *cdssdk.ECRedundancy: + return s.selectForEC(req2, *red) + + case *cdssdk.LRCRedundancy: + return s.selectForLRC(req2, *red) + } + + return nil, fmt.Errorf("unsupported redundancy type: %v of object %v", reflect.TypeOf(req.Detail.Object.Redundancy), req.Detail.Object.ObjectID) +} + +type downloadStorageInfo struct { + Storage stgmod.StorageDetail + ObjectPinned bool + Blocks []stgmod.ObjectBlock + Distance float64 +} + +type downloadBlock struct { + Storage stgmod.StorageDetail + Block stgmod.ObjectBlock +} + +type request2 struct { + Detail stgmod.ObjectDetail + 
Range math2.Range + DestHub *cdssdk.Hub + DestLocation cdssdk.LocationID +} + +func (s *Selector) selectForNoneOrRep(req request2) (Strategy, error) { + sortedStgs := s.sortDownloadStorages(req) + if len(sortedStgs) == 0 { + return nil, fmt.Errorf("no storage available for download") + } + + _, blks := s.getMinReadingBlockSolution(sortedStgs, 1) + if len(blks) == 0 { + return nil, fmt.Errorf("no block available for download") + } + + return &DirectStrategy{ + Detail: req.Detail, + Storage: sortedStgs[0].Storage, + }, nil +} + +func (s *Selector) selectForEC(req request2, red cdssdk.ECRedundancy) (Strategy, error) { + sortedStgs := s.sortDownloadStorages(req) + if len(sortedStgs) == 0 { + return nil, fmt.Errorf("no storage available for download") + } + + bsc, blocks := s.getMinReadingBlockSolution(sortedStgs, red.K) + osc, stg := s.getMinReadingObjectSolution(sortedStgs, red.K) + + if bsc < osc { + bs := make([]stgmod.ObjectBlock, len(blocks)) + ss := make([]stgmod.StorageDetail, len(blocks)) + for i, b := range blocks { + bs[i] = b.Block + ss[i] = b.Storage + } + + return &ECReconstructStrategy{ + Detail: req.Detail, + Redundancy: red, + Blocks: bs, + Storages: ss, + }, nil + } + + // bsc >= osc,如果osc是MaxFloat64,那么bsc也一定是,也就意味着没有足够块来恢复文件 + if osc == math.MaxFloat64 { + return nil, fmt.Errorf("no enough blocks to reconstruct the object %v , want %d, get only %d", req.Detail.Object.ObjectID, red.K, len(blocks)) + } + + return &DirectStrategy{ + Detail: req.Detail, + Storage: stg, + }, nil +} + +func (s *Selector) selectForLRC(req request2, red cdssdk.LRCRedundancy) (Strategy, error) { + sortedStgs := s.sortDownloadStorages(req) + if len(sortedStgs) == 0 { + return nil, fmt.Errorf("no storage available for download") + } + + var blocks []downloadBlock + selectedBlkIdx := make(map[int]bool) + for _, stg := range sortedStgs { + for _, b := range stg.Blocks { + if b.Index >= red.M() || selectedBlkIdx[b.Index] { + continue + } + blocks = append(blocks, downloadBlock{ + 
Storage: stg.Storage, + Block: b, + }) + selectedBlkIdx[b.Index] = true + } + } + if len(blocks) < red.K { + return nil, fmt.Errorf("not enough blocks to download lrc object") + } + + bs := make([]stgmod.ObjectBlock, len(blocks)) + ss := make([]stgmod.StorageDetail, len(blocks)) + for i, b := range blocks { + bs[i] = b.Block + ss[i] = b.Storage + } + + return &LRCReconstructStrategy{ + Detail: req.Detail, + Redundancy: red, + Blocks: bs, + Storages: ss, + }, nil +} + +func (s *Selector) sortDownloadStorages(req request2) []*downloadStorageInfo { + var stgIDs []cdssdk.StorageID + for _, id := range req.Detail.PinnedAt { + if !lo.Contains(stgIDs, id) { + stgIDs = append(stgIDs, id) + } + } + for _, b := range req.Detail.Blocks { + if !lo.Contains(stgIDs, b.StorageID) { + stgIDs = append(stgIDs, b.StorageID) + } + } + + downloadStorageMap := make(map[cdssdk.StorageID]*downloadStorageInfo) + for _, id := range req.Detail.PinnedAt { + storage, ok := downloadStorageMap[id] + if !ok { + mod := s.storageMeta.Get(id) + if mod == nil || mod.MasterHub == nil { + continue + } + + storage = &downloadStorageInfo{ + Storage: *mod, + ObjectPinned: true, + Distance: s.getStorageDistance(req, *mod), + } + downloadStorageMap[id] = storage + } + + storage.ObjectPinned = true + } + + for _, b := range req.Detail.Blocks { + storage, ok := downloadStorageMap[b.StorageID] + if !ok { + mod := s.storageMeta.Get(b.StorageID) + if mod == nil || mod.MasterHub == nil { + continue + } + + storage = &downloadStorageInfo{ + Storage: *mod, + Distance: s.getStorageDistance(req, *mod), + } + downloadStorageMap[b.StorageID] = storage + } + + storage.Blocks = append(storage.Blocks, b) + } + + return sort2.Sort(lo.Values(downloadStorageMap), func(left, right *downloadStorageInfo) int { + return sort2.Cmp(left.Distance, right.Distance) + }) +} + +func (s *Selector) getStorageDistance(req request2, src stgmod.StorageDetail) float64 { + if req.DestHub != nil { + if src.MasterHub.HubID == req.DestHub.HubID 
{ + return consts.StorageDistanceSameStorage + } + + if src.MasterHub.LocationID == req.DestHub.LocationID { + return consts.StorageDistanceSameLocation + } + + latency := s.connectivity.Get(src.MasterHub.HubID, req.DestHub.HubID) + if latency == nil || *latency > time.Duration(float64(time.Millisecond)*s.cfg.HighLatencyHubMs) { + return consts.HubDistanceHighLatencyHub + } + + return consts.StorageDistanceOther + } + + if req.DestLocation != 0 { + if src.MasterHub.LocationID == req.DestLocation { + return consts.StorageDistanceSameLocation + } + } + + return consts.StorageDistanceOther +} + +func (s *Selector) getMinReadingBlockSolution(sortedStgs []*downloadStorageInfo, k int) (float64, []downloadBlock) { + gotBlocksMap := bitmap.Bitmap64(0) + var gotBlocks []downloadBlock + dist := float64(0.0) + for _, n := range sortedStgs { + for _, b := range n.Blocks { + if !gotBlocksMap.Get(b.Index) { + gotBlocks = append(gotBlocks, downloadBlock{ + Storage: n.Storage, + Block: b, + }) + gotBlocksMap.Set(b.Index, true) + dist += n.Distance + } + + if len(gotBlocks) >= k { + return dist, gotBlocks + } + } + } + + return math.MaxFloat64, gotBlocks +} + +func (s *Selector) getMinReadingObjectSolution(sortedStgs []*downloadStorageInfo, k int) (float64, stgmod.StorageDetail) { + dist := math.MaxFloat64 + var downloadStg stgmod.StorageDetail + for _, n := range sortedStgs { + if n.ObjectPinned && float64(k)*n.Distance < dist { + dist = float64(k) * n.Distance + stg := n.Storage + downloadStg = stg + } + } + + return dist, downloadStg +} diff --git a/client/internal/downloader/strip_iterator.go b/client/internal/downloader/strip_iterator.go new file mode 100644 index 0000000..0ae904f --- /dev/null +++ b/client/internal/downloader/strip_iterator.go @@ -0,0 +1,241 @@ +package downloader + +import ( + "context" + "io" + "sync" + + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" + "gitlink.org.cn/cloudream/common/pkgs/iterator" + "gitlink.org.cn/cloudream/common/pkgs/logger" + 
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "gitlink.org.cn/cloudream/common/utils/math2" + stgmod "gitlink.org.cn/cloudream/storage2/common/models" + "gitlink.org.cn/cloudream/storage2/common/pkgs/ioswitch2" + "gitlink.org.cn/cloudream/storage2/common/pkgs/ioswitch2/parser" +) + +type downloadBlock struct { + Storage stgmod.StorageDetail + Block stgmod.ObjectBlock +} + +type Strip struct { + Data []byte + Position int64 +} + +type StripIterator struct { + downloader *Downloader + object cdssdk.Object + blocks []downloadBlock + red cdssdk.ECRedundancy + curStripIndex int64 + cache *StripCache + dataChan chan dataChanEntry + downloadingDone chan any + downloadingDoneOnce sync.Once + inited bool + downloadingStream io.ReadCloser + downloadingStripIndex int64 + downloadingPlanCtxCancel func() +} + +type dataChanEntry struct { + Data []byte + Position int64 // 条带在文件中的位置。字节为单位 + Error error +} + +func NewStripIterator(downloader *Downloader, object cdssdk.Object, blocks []downloadBlock, red cdssdk.ECRedundancy, beginStripIndex int64, cache *StripCache, maxPrefetch int) *StripIterator { + if maxPrefetch <= 0 { + maxPrefetch = 1 + } + + iter := &StripIterator{ + downloader: downloader, + object: object, + blocks: blocks, + red: red, + curStripIndex: beginStripIndex, + cache: cache, + dataChan: make(chan dataChanEntry, maxPrefetch-1), + downloadingDone: make(chan any), + } + + return iter +} + +func (s *StripIterator) MoveNext() (Strip, error) { + if !s.inited { + go s.downloading(s.curStripIndex) + s.inited = true + } + + // 先尝试获取一下,用于判断本次获取是否发生了等待 + select { + case entry, ok := <-s.dataChan: + if !ok || entry.Error == io.EOF { + return Strip{}, iterator.ErrNoMoreItem + } + + if entry.Error != nil { + return Strip{}, entry.Error + } + + s.curStripIndex++ + return Strip{Data: entry.Data, Position: entry.Position}, nil + + default: + logger.Debugf("waitting for ec strip %v of object %v", s.curStripIndex, s.object.ObjectID) + } + + // 发生了等待 + select { + case 
// Close stops the iterator by closing the downloadingDone channel, which
// sendToDataChan and MoveNext both select on. The sync.Once guard makes
// repeated calls safe.
func (s *StripIterator) Close() {
	s.downloadingDoneOnce.Do(func() {
		close(s.downloadingDone)
	})
}
+ } + close(s.dataChan) +} + +func (s *StripIterator) sendToDataChan(entry dataChanEntry) bool { + select { + case s.dataChan <- entry: + return true + case <-s.downloadingDone: + return false + } +} + +func (s *StripIterator) readStrip(stripIndex int64, buf []byte) (int, error) { + // 如果需求的条带不当前正在下载的条带的位置不符合,则需要重新打开下载流 + if s.downloadingStream == nil || s.downloadingStripIndex != stripIndex { + if s.downloadingStream != nil { + s.downloadingStream.Close() + s.downloadingPlanCtxCancel() + } + + ft := ioswitch2.NewFromTo() + ft.ECParam = &s.red + for _, b := range s.blocks { + stg := b.Storage + ft.AddFrom(ioswitch2.NewFromShardstore(b.Block.FileHash, *stg.MasterHub, stg, ioswitch2.ECStream(b.Block.Index))) + } + + toExec, hd := ioswitch2.NewToDriverWithRange(ioswitch2.RawStream(), math2.Range{ + Offset: stripIndex * s.red.StripSize(), + }) + ft.AddTo(toExec) + + plans := exec.NewPlanBuilder() + err := parser.Parse(ft, plans) + if err != nil { + return 0, err + } + + exeCtx := exec.NewExecContext() + exec.SetValueByType(exeCtx, s.downloader.stgAgts) + exec := plans.Execute(exeCtx) + + ctx, cancel := context.WithCancel(context.Background()) + go exec.Wait(ctx) + + str, err := exec.BeginRead(hd) + if err != nil { + cancel() + return 0, err + } + + s.downloadingStream = str + s.downloadingStripIndex = stripIndex + s.downloadingPlanCtxCancel = cancel + } + + n, err := io.ReadFull(s.downloadingStream, buf) + s.downloadingStripIndex += 1 + return n, err +}