Browse Source

修复调试错误

gitlink
Sydonian 1 year ago
parent
commit
eb68056b92
3 changed files with 19 additions and 17 deletions
  1. +2
    -2
      common/pkgs/downloader/downloader.go
  2. +3
    -3
      common/pkgs/downloader/iterator.go
  3. +14
    -12
      common/pkgs/downloader/strip_iterator.go

+ 2
- 2
common/pkgs/downloader/downloader.go View File

@@ -129,8 +129,8 @@ type ObjectECStrip struct {
}

type ECStripKey struct {
ObjectID cdssdk.ObjectID
StripPosition int64
ObjectID cdssdk.ObjectID
StripIndex int64
}

type StripCache = lru.Cache[ECStripKey, ObjectECStrip]

+ 3
- 3
common/pkgs/downloader/iterator.go View File

@@ -220,8 +220,8 @@ func (iter *DownloadObjectIterator) downloadECObject(req downloadReqeust2, ecRed
totalReadLen = math2.Min(req.Raw.Length, totalReadLen)
}

firstStripPos := readPos / int64(ecRed.K) / int64(ecRed.ChunkSize)
stripIter := NewStripIterator(req.Detail.Object, blocks, ecRed, firstStripPos, iter.downloader.strips, iter.downloader.cfg.ECStripPrefetchCount)
firstStripIndex := readPos / int64(ecRed.K) / int64(ecRed.ChunkSize)
stripIter := NewStripIterator(req.Detail.Object, blocks, ecRed, firstStripIndex, iter.downloader.strips, iter.downloader.cfg.ECStripPrefetchCount)
defer stripIter.Close()

for totalReadLen > 0 {
@@ -235,8 +235,8 @@ func (iter *DownloadObjectIterator) downloadECObject(req downloadReqeust2, ecRed
return
}

nextStripPos := strip.Position + int64(ecRed.ChunkSize)
readRelativePos := readPos - strip.Position
nextStripPos := strip.Position + int64(ecRed.K)*int64(ecRed.ChunkSize)
curReadLen := math2.Min(totalReadLen, nextStripPos-readPos)

err = io2.WriteAll(pw, strip.Data[readRelativePos:readRelativePos+curReadLen])


+ 14
- 12
common/pkgs/downloader/strip_iterator.go View File

@@ -26,7 +26,7 @@ type StripIterator struct {
object cdssdk.Object
blocks []downloadBlock
red *cdssdk.ECRedundancy
curStripPos int64
curStripIndex int64
cache *StripCache
dataChan chan dataChanEntry
downloadingDone chan any
@@ -40,7 +40,7 @@ type dataChanEntry struct {
Error error
}

func NewStripIterator(object cdssdk.Object, blocks []downloadBlock, red *cdssdk.ECRedundancy, beginStripPos int64, cache *StripCache, maxPrefetch int) *StripIterator {
func NewStripIterator(object cdssdk.Object, blocks []downloadBlock, red *cdssdk.ECRedundancy, beginStripIndex int64, cache *StripCache, maxPrefetch int) *StripIterator {
if maxPrefetch <= 0 {
maxPrefetch = 1
}
@@ -49,7 +49,7 @@ func NewStripIterator(object cdssdk.Object, blocks []downloadBlock, red *cdssdk.
object: object,
blocks: blocks,
red: red,
curStripPos: beginStripPos,
curStripIndex: beginStripIndex,
cache: cache,
dataChan: make(chan dataChanEntry, maxPrefetch-1),
downloadingDone: make(chan any),
@@ -75,11 +75,11 @@ func (s *StripIterator) MoveNext() (Strip, error) {
return Strip{}, entry.Error
}

s.curStripPos++
s.curStripIndex++
return Strip{Data: entry.Data, Position: entry.Position}, nil

default:
logger.Debugf("waitting for ec strip %v for object %v", s.curStripPos, s.object.ObjectID)
logger.Debugf("waitting for ec strip %v of object %v", s.curStripIndex, s.object.ObjectID)
}

// 发生了等待
@@ -93,7 +93,7 @@ func (s *StripIterator) MoveNext() (Strip, error) {
return Strip{}, entry.Error
}

s.curStripPos++
s.curStripIndex++
return Strip{Data: entry.Data, Position: entry.Position}, nil

case <-s.downloadingDone:
@@ -119,18 +119,18 @@ func (s *StripIterator) downloading() {
blockStrs = append(blockStrs, NewIPFSReader(b.Node, b.Block.FileHash))
}

curStripPos := s.curStripPos
curStripIndex := s.curStripIndex
loop:
for {
stripBytesPos := curStripPos * int64(s.red.ChunkSize)
stripBytesPos := curStripIndex * int64(s.red.K) * int64(s.red.ChunkSize)
if stripBytesPos >= s.object.Size {
s.sendToDataChan(dataChanEntry{Error: io.EOF})
break
}

stripKey := ECStripKey{
ObjectID: s.object.ObjectID,
StripPosition: curStripPos,
ObjectID: s.object.ObjectID,
StripIndex: curStripIndex,
}

item, ok := s.cache.Get(stripKey)
@@ -139,7 +139,7 @@ loop:
if !s.sendToDataChan(dataChanEntry{Data: item.Data, Position: stripBytesPos}) {
break loop
}
curStripPos++
curStripIndex++
continue

} else {
@@ -149,7 +149,7 @@ loop:
}

for _, str := range blockStrs {
_, err := str.Seek(stripBytesPos*int64(s.red.ChunkSize), io.SeekStart)
_, err := str.Seek(curStripIndex*int64(s.red.ChunkSize), io.SeekStart)
if err != nil {
s.sendToDataChan(dataChanEntry{Error: err})
break loop
@@ -195,6 +195,8 @@ loop:
if !s.sendToDataChan(dataChanEntry{Data: dataBuf, Position: stripBytesPos}) {
break loop
}

curStripIndex++
}

for _, str := range blockStrs {


Loading…
Cancel
Save