From 0ca90cc7096684500097f9d6a5d3b6c1b0bf0ddc Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Mon, 1 Jul 2024 11:35:18 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9D=A1=E5=B8=A6=E9=A2=84=E8=AF=BB=E5=8F=AF?= =?UTF-8?q?=E9=85=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- common/assets/confs/agent.config.json | 3 +- common/assets/confs/client.config.json | 3 +- common/pkgs/downloader/config.go | 2 + common/pkgs/downloader/iterator.go | 145 ++------------- common/pkgs/downloader/strip_iterator.go | 214 +++++++++++++++++++++++ 5 files changed, 235 insertions(+), 132 deletions(-) create mode 100644 common/pkgs/downloader/strip_iterator.go diff --git a/common/assets/confs/agent.config.json b/common/assets/confs/agent.config.json index e347b41..161a7b7 100644 --- a/common/assets/confs/agent.config.json +++ b/common/assets/confs/agent.config.json @@ -38,6 +38,7 @@ }, "downloader": { "maxStripCacheCount": 100, - "highLatencyNode": 35 + "highLatencyNode": 35, + "ecStripPrefetchCount": 1 } } \ No newline at end of file diff --git a/common/assets/confs/client.config.json b/common/assets/confs/client.config.json index a0b6a12..94a0f29 100644 --- a/common/assets/confs/client.config.json +++ b/common/assets/confs/client.config.json @@ -31,6 +31,7 @@ }, "downloader": { "maxStripCacheCount": 100, - "highLatencyNode": 35 + "highLatencyNode": 35, + "ecStripPrefetchCount": 1 } } \ No newline at end of file diff --git a/common/pkgs/downloader/config.go b/common/pkgs/downloader/config.go index 103bb9a..a19a358 100644 --- a/common/pkgs/downloader/config.go +++ b/common/pkgs/downloader/config.go @@ -5,4 +5,6 @@ type Config struct { MaxStripCacheCount int `json:"maxStripCacheCount"` // 当到下载节点的延迟高于这个值时,该节点在评估时会有更高的分数惩罚,单位:ms HighLatencyNodeMs float64 `json:"highLatencyNodeMs"` + // EC模式下,每个Object的条带的预取数量,最少为1 + ECStripPrefetchCount int `json:"ecStripPrefetchCount"` } diff --git a/common/pkgs/downloader/iterator.go b/common/pkgs/downloader/iterator.go index 183b8bd..b894a45 100644 --- a/common/pkgs/downloader/iterator.go +++ b/common/pkgs/downloader/iterator.go @@ -1,7 +1,6 @@ package downloader import ( - "context" "fmt" "io" "math" @@ -11,7 +10,6 @@ import ( "github.com/samber/lo" "gitlink.org.cn/cloudream/common/pkgs/bitmap" - "gitlink.org.cn/cloudream/common/pkgs/future" "gitlink.org.cn/cloudream/common/pkgs/ipfs" "gitlink.org.cn/cloudream/common/pkgs/logger" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" @@ -19,12 +17,10 @@ import ( "gitlink.org.cn/cloudream/common/utils/io2" "gitlink.org.cn/cloudream/common/utils/math2" "gitlink.org.cn/cloudream/common/utils/sort2" - "gitlink.org.cn/cloudream/common/utils/sync2" "gitlink.org.cn/cloudream/storage/common/consts" stgglb "gitlink.org.cn/cloudream/storage/common/globals" stgmod "gitlink.org.cn/cloudream/storage/common/models" "gitlink.org.cn/cloudream/storage/common/pkgs/distlock" - "gitlink.org.cn/cloudream/storage/common/pkgs/ec" "gitlink.org.cn/cloudream/storage/common/pkgs/iterator" coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" ) @@ -216,93 +212,39 @@ func (iter *DownloadObjectIterator) downloadECObject(req downloadReqeust2, ecRed } logger.Debug(logStrs...) - var fileStrs []*IPFSReader - for _, b := range blocks { - str := NewIPFSReader(b.Node, b.Block.FileHash) - - fileStrs = append(fileStrs, str) - } - - rs, err := ec.NewRs(ecRed.K, ecRed.N) - if err != nil { - return nil, fmt.Errorf("new rs: %w", err) - } - pr, pw := io.Pipe() go func() { - defer func() { - for _, str := range fileStrs { - str.Close() - } - }() - readPos := req.Raw.Offset totalReadLen := req.Detail.Object.Size - req.Raw.Offset if req.Raw.Length >= 0 { totalReadLen = math2.Min(req.Raw.Length, totalReadLen) } - var downloadStripCb *future.SetValueFuture[[]byte] + firstStripPos := readPos / int64(ecRed.K) / int64(ecRed.ChunkSize) + stripIter := NewStripIterator(req.Detail.Object, blocks, ecRed, firstStripPos, iter.downloader.strips, iter.downloader.cfg.ECStripPrefetchCount) + defer stripIter.Close() for totalReadLen > 0 { - curStripPos := readPos / int64(ecRed.K) / int64(ecRed.ChunkSize) - curStripPosInBytes := curStripPos * int64(ecRed.K) * int64(ecRed.ChunkSize) - nextStripPosInBytes := (curStripPos + 1) * int64(ecRed.K) * int64(ecRed.ChunkSize) - curReadLen := math2.Min(totalReadLen, nextStripPosInBytes-readPos) - readRelativePos := readPos - curStripPosInBytes - cacheKey := ECStripKey{ - ObjectID: req.Detail.Object.ObjectID, - StripPosition: curStripPos, - } - - var stripData []byte - - cache, ok := iter.downloader.strips.Get(cacheKey) - if ok { - if cache.ObjectFileHash != req.Detail.Object.FileHash { - // 如果Object的Hash和Cache的Hash不一致,说明Cache是无效的,需要重新下载 - iter.downloader.strips.Remove(cacheKey) - ok = false - } else { - stripData = cache.Data - } + strip, err := stripIter.MoveNext() + if err == iterator.ErrNoMoreItem { + pw.CloseWithError(io.ErrUnexpectedEOF) + return } - - if !ok { - // 缓存中没有条带,也没有启动预加载,可能是因为在极短的时间内缓存过期了,需要重新下载 - if downloadStripCb == nil { - downloadStripCb = future.NewSetValue[[]byte]() - go iter.downloadECStrip(curStripPos, &req.Detail.Object, ecRed, rs, fileStrs, blocks, downloadStripCb) - } - - logger.Debugf("cache missed, waitting for ec strip %v of object %v to be downloaded", curStripPos, req.Detail.Object.ObjectID) - stripData, err = downloadStripCb.WaitValue(context.Background()) - if err != nil { - pw.CloseWithError(err) - return - } + if err != nil { + pw.CloseWithError(err) + return } - // 不管有没有WaitValue都要清零 - downloadStripCb = nil + nextStripPos := strip.Position + int64(ecRed.ChunkSize) + readRelativePos := readPos - strip.Position + curReadLen := math2.Min(totalReadLen, nextStripPos-readPos) - // 看一眼缓存中是否有下一个条带,如果没有,则启动下载。 - // 注:现在缓存中有不代表读取的时候还有 - nextStripCacheKey := ECStripKey{ - ObjectID: req.Detail.Object.ObjectID, - StripPosition: curStripPos + 1, - } - _, ok = iter.downloader.strips.Peek(nextStripCacheKey) - if !ok { - downloadStripCb = future.NewSetValue[[]byte]() - go iter.downloadECStrip(curStripPos+1, &req.Detail.Object, ecRed, rs, fileStrs, blocks, downloadStripCb) - } - - err := io2.WriteAll(pw, stripData[readRelativePos:readRelativePos+curReadLen]) + err = io2.WriteAll(pw, strip.Data[readRelativePos:readRelativePos+curReadLen]) if err != nil { pw.CloseWithError(err) return } + totalReadLen -= curReadLen readPos += curReadLen } @@ -324,58 +266,6 @@ func (iter *DownloadObjectIterator) downloadECObject(req downloadReqeust2, ecRed }), nil } -func (i *DownloadObjectIterator) downloadECStrip(stripPos int64, obj *cdssdk.Object, ecRed *cdssdk.ECRedundancy, rs *ec.Rs, blockStrs []*IPFSReader, blocks []downloadBlock, cb *future.SetValueFuture[[]byte]) { - for _, str := range blockStrs { - _, err := str.Seek(stripPos*int64(ecRed.ChunkSize), io.SeekStart) - if err != nil { - cb.SetError(err) - return - } - } - - dataBuf := make([]byte, int64(ecRed.K*ecRed.ChunkSize)) - blockArrs := make([][]byte, ecRed.N) - for i := 0; i < ecRed.K; i++ { - // 放入的slice长度为0,但容量为ChunkSize,EC库发现长度为0的块后才会认为是待恢复块 - blockArrs[i] = dataBuf[i*ecRed.ChunkSize : i*ecRed.ChunkSize] - } - for _, b := range blocks { - // 用于恢复的块则要将其长度变回ChunkSize,用于后续读取块数据 - if b.Block.Index < ecRed.K { - // 此处扩容不会导致slice指向一个新内存 - blockArrs[b.Block.Index] = blockArrs[b.Block.Index][0:ecRed.ChunkSize] - } else { - blockArrs[b.Block.Index] = make([]byte, ecRed.ChunkSize) - } - } - - err := sync2.ParallelDo(blocks, func(b downloadBlock, idx int) error { - _, err := io.ReadFull(blockStrs[idx], blockArrs[b.Block.Index]) - return err - }) - if err != nil { - cb.SetError(err) - return - } - - err = rs.ReconstructData(blockArrs) - if err != nil { - cb.SetError(err) - return - } - - cacheKey := ECStripKey{ - ObjectID: obj.ObjectID, - StripPosition: stripPos, - } - - i.downloader.strips.Add(cacheKey, ObjectECStrip{ - Data: dataBuf, - ObjectFileHash: obj.FileHash, - }) - cb.SetValue(dataBuf) -} - func (iter *DownloadObjectIterator) sortDownloadNodes(req downloadReqeust2) ([]*DownloadNodeInfo, error) { var nodeIDs []cdssdk.NodeID for _, id := range req.Detail.PinnedAt { @@ -424,11 +314,6 @@ func (iter *DownloadObjectIterator) sortDownloadNodes(req downloadReqeust2) ([]* }), nil } -type downloadBlock struct { - Node cdssdk.Node - Block stgmod.ObjectBlock -} - func (iter *DownloadObjectIterator) getMinReadingBlockSolution(sortedNodes []*DownloadNodeInfo, k int) (float64, []downloadBlock) { gotBlocksMap := bitmap.Bitmap64(0) var gotBlocks []downloadBlock diff --git a/common/pkgs/downloader/strip_iterator.go b/common/pkgs/downloader/strip_iterator.go new file mode 100644 index 0000000..c38c545 --- /dev/null +++ b/common/pkgs/downloader/strip_iterator.go @@ -0,0 +1,214 @@ +package downloader + +import ( + "io" + "sync" + + "gitlink.org.cn/cloudream/common/pkgs/iterator" + "gitlink.org.cn/cloudream/common/pkgs/logger" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "gitlink.org.cn/cloudream/common/utils/sync2" + stgmod "gitlink.org.cn/cloudream/storage/common/models" + "gitlink.org.cn/cloudream/storage/common/pkgs/ec" +) + +type downloadBlock struct { + Node cdssdk.Node + Block stgmod.ObjectBlock +} + +type Strip struct { + Data []byte + Position int64 +} + +type StripIterator struct { + object cdssdk.Object + blocks []downloadBlock + red *cdssdk.ECRedundancy + curStripPos int64 + cache *StripCache + dataChan chan dataChanEntry + downloadingDone chan any + downloadingDoneOnce sync.Once + inited bool +} + +type dataChanEntry struct { + Data []byte + Position int64 // 条带在文件中的位置。字节为单位 + Error error +} + +func NewStripIterator(object cdssdk.Object, blocks []downloadBlock, red *cdssdk.ECRedundancy, beginStripPos int64, cache *StripCache, maxPrefetch int) *StripIterator { + if maxPrefetch <= 0 { + maxPrefetch = 1 + } + + iter := &StripIterator{ + object: object, + blocks: blocks, + red: red, + curStripPos: beginStripPos, + cache: cache, + dataChan: make(chan dataChanEntry, maxPrefetch-1), + downloadingDone: make(chan any), + } + + return iter +} + +func (s *StripIterator) MoveNext() (Strip, error) { + if !s.inited { + go s.downloading() + s.inited = true + } + + // 先尝试获取一下,用于判断本次获取是否发生了等待 + select { + case entry, ok := <-s.dataChan: + if !ok || entry.Error == io.EOF { + return Strip{}, iterator.ErrNoMoreItem + } + + if entry.Error != nil { + return Strip{}, entry.Error + } + + s.curStripPos++ + return Strip{Data: entry.Data, Position: entry.Position}, nil + + default: + logger.Debugf("waitting for ec strip %v for object %v", s.curStripPos, s.object.ObjectID) + } + + // 发生了等待 + select { + case entry, ok := <-s.dataChan: + if !ok || entry.Error == io.EOF { + return Strip{}, iterator.ErrNoMoreItem + } + + if entry.Error != nil { + return Strip{}, entry.Error + } + + s.curStripPos++ + return Strip{Data: entry.Data, Position: entry.Position}, nil + + case <-s.downloadingDone: + return Strip{}, iterator.ErrNoMoreItem + } +} + +func (s *StripIterator) Close() { + s.downloadingDoneOnce.Do(func() { + close(s.downloadingDone) + }) +} + +func (s *StripIterator) downloading() { + rs, err := ec.NewRs(s.red.K, s.red.N) + if err != nil { + s.sendToDataChan(dataChanEntry{Error: err}) + return + } + + var blockStrs []*IPFSReader + for _, b := range s.blocks { + blockStrs = append(blockStrs, NewIPFSReader(b.Node, b.Block.FileHash)) + } + + curStripPos := s.curStripPos +loop: + for { + stripBytesPos := curStripPos * int64(s.red.ChunkSize) + if stripBytesPos >= s.object.Size { + s.sendToDataChan(dataChanEntry{Error: io.EOF}) + break + } + + stripKey := ECStripKey{ + ObjectID: s.object.ObjectID, + StripPosition: curStripPos, + } + + item, ok := s.cache.Get(stripKey) + if ok { + if item.ObjectFileHash == s.object.FileHash { + if !s.sendToDataChan(dataChanEntry{Data: item.Data, Position: stripBytesPos}) { + break loop + } + curStripPos++ + continue + + } else { + // 如果Object的Hash和Cache的Hash不一致,说明Cache是无效的,需要重新下载 + s.cache.Remove(stripKey) + } + } + + for _, str := range blockStrs { + _, err := str.Seek(stripBytesPos*int64(s.red.ChunkSize), io.SeekStart) + if err != nil { + s.sendToDataChan(dataChanEntry{Error: err}) + break loop + } + } + + dataBuf := make([]byte, int64(s.red.K*s.red.ChunkSize)) + blockArrs := make([][]byte, s.red.N) + for i := 0; i < s.red.K; i++ { + // 放入的slice长度为0,但容量为ChunkSize,EC库发现长度为0的块后才会认为是待恢复块 + blockArrs[i] = dataBuf[i*s.red.ChunkSize : i*s.red.ChunkSize] + } + for _, b := range s.blocks { + // 用于恢复的块则要将其长度变回ChunkSize,用于后续读取块数据 + if b.Block.Index < s.red.K { + // 此处扩容不会导致slice指向一个新内存 + blockArrs[b.Block.Index] = blockArrs[b.Block.Index][0:s.red.ChunkSize] + } else { + blockArrs[b.Block.Index] = make([]byte, s.red.ChunkSize) + } + } + + err := sync2.ParallelDo(s.blocks, func(b downloadBlock, idx int) error { + _, err := io.ReadFull(blockStrs[idx], blockArrs[b.Block.Index]) + return err + }) + if err != nil { + s.sendToDataChan(dataChanEntry{Error: err}) + break loop + } + + err = rs.ReconstructData(blockArrs) + if err != nil { + s.sendToDataChan(dataChanEntry{Error: err}) + break loop + } + + s.cache.Add(stripKey, ObjectECStrip{ + Data: dataBuf, + ObjectFileHash: s.object.FileHash, + }) + + if !s.sendToDataChan(dataChanEntry{Data: dataBuf, Position: stripBytesPos}) { + break loop + } + } + + for _, str := range blockStrs { + str.Close() + } + + close(s.dataChan) +} + +func (s *StripIterator) sendToDataChan(entry dataChanEntry) bool { + select { + case s.dataChan <- entry: + return true + case <-s.downloadingDone: + return false + } +}