diff --git a/common/pkgs/downloader/iterator.go b/common/pkgs/downloader/iterator.go index c4f59e3..43b9e21 100644 --- a/common/pkgs/downloader/iterator.go +++ b/common/pkgs/downloader/iterator.go @@ -387,7 +387,7 @@ func (iter *DownloadObjectIterator) downloadFromStorage(stg *stgmod.StorageDetai var strHandle *exec.DriverReadStream ft := ioswitch2.NewFromTo() - toExec, handle := ioswitch2.NewToDriver(-1) + toExec, handle := ioswitch2.NewToDriver(ioswitch2.RawStream()) toExec.Range = exec.Range{ Offset: req.Raw.Offset, } @@ -396,7 +396,7 @@ func (iter *DownloadObjectIterator) downloadFromStorage(stg *stgmod.StorageDetai toExec.Range.Length = &len } - ft.AddFrom(ioswitch2.NewFromShardstore(req.Detail.Object.FileHash, *stg.MasterHub, stg.Storage, -1)).AddTo(toExec) + ft.AddFrom(ioswitch2.NewFromShardstore(req.Detail.Object.FileHash, *stg.MasterHub, stg.Storage, ioswitch2.RawStream())).AddTo(toExec) strHandle = handle plans := exec.NewPlanBuilder() diff --git a/common/pkgs/downloader/strip_iterator.go b/common/pkgs/downloader/strip_iterator.go index da06dcc..a08c1d2 100644 --- a/common/pkgs/downloader/strip_iterator.go +++ b/common/pkgs/downloader/strip_iterator.go @@ -201,10 +201,10 @@ func (s *StripIterator) readStrip(stripIndex int64, buf []byte) (int, error) { ft := ioswitch2.NewFromTo() for _, b := range s.blocks { stg := b.Storage - ft.AddFrom(ioswitch2.NewFromShardstore(b.Block.FileHash, *stg.MasterHub, stg.Storage, b.Block.Index)) + ft.AddFrom(ioswitch2.NewFromShardstore(b.Block.FileHash, *stg.MasterHub, stg.Storage, ioswitch2.ECSrteam(b.Block.Index))) } - toExec, hd := ioswitch2.NewToDriverWithRange(-1, exec.Range{ + toExec, hd := ioswitch2.NewToDriverWithRange(ioswitch2.RawStream(), exec.Range{ Offset: stripIndex * s.red.StripSize(), }) ft.AddTo(toExec) diff --git a/common/pkgs/ioswitch2/fromto.go b/common/pkgs/ioswitch2/fromto.go index 4993640..679164c 100644 --- a/common/pkgs/ioswitch2/fromto.go +++ b/common/pkgs/ioswitch2/fromto.go @@ -6,7 +6,7 @@ import ( ) type From interface { - GetDataIndex() int + GetStreamType() StreamType } type To interface { @@ -14,7 +14,53 @@ type To interface { // 如果DataIndex == -1,则表示在整个文件的范围。 // 如果DataIndex >= 0,则表示在文件的某个分片的范围。 GetRange() exec.Range - GetDataIndex() int + GetStreamType() StreamType +} + +const ( + // 未处理的完整文件流 + StreamTypeRaw = iota + // EC编码的某一块的流 + StreamTypeEC + // 分段编码的某一段的流 + StreamTypeSegment +) + +type StreamType struct { + Type int + Index int +} + +func RawStream() StreamType { + return StreamType{ + Type: StreamTypeRaw, + } +} + +func ECSrteam(index int) StreamType { + return StreamType{ + Type: StreamTypeEC, + Index: index, + } +} + +func SegmentStream(index int) StreamType { + return StreamType{ + Type: StreamTypeSegment, + Index: index, + } +} + +func (s StreamType) IsRaw() bool { + return s.Type == StreamTypeRaw +} + +func (s StreamType) IsEC() bool { + return s.Type == StreamTypeEC +} + +func (s StreamType) IsSegment() bool { + return s.Type == StreamTypeSegment } type FromTos []FromTo @@ -39,69 +85,69 @@ func (ft *FromTo) AddTo(to To) *FromTo { } type FromDriver struct { - Handle *exec.DriverWriteStream - DataIndex int + Handle *exec.DriverWriteStream + StreamType StreamType } -func NewFromDriver(dataIndex int) (*FromDriver, *exec.DriverWriteStream) { +func NewFromDriver(strType StreamType) (*FromDriver, *exec.DriverWriteStream) { handle := &exec.DriverWriteStream{ RangeHint: &exec.Range{}, } return &FromDriver{ - Handle: handle, - DataIndex: dataIndex, + Handle: handle, + StreamType: strType, }, handle } -func (f *FromDriver) GetDataIndex() int { - return f.DataIndex +func (f *FromDriver) GetStreamType() StreamType { + return f.StreamType } type FromShardstore struct { - FileHash cdssdk.FileHash - Hub cdssdk.Hub - Storage cdssdk.Storage - DataIndex int + FileHash cdssdk.FileHash + Hub cdssdk.Hub + Storage cdssdk.Storage + StreamType StreamType } -func NewFromShardstore(fileHash cdssdk.FileHash, hub cdssdk.Hub, storage cdssdk.Storage, dataIndex int) *FromShardstore { +func NewFromShardstore(fileHash cdssdk.FileHash, hub cdssdk.Hub, storage cdssdk.Storage, strType StreamType) *FromShardstore { return &FromShardstore{ - FileHash: fileHash, - Hub: hub, - Storage: storage, - DataIndex: dataIndex, + FileHash: fileHash, + Hub: hub, + Storage: storage, + StreamType: strType, } } -func (f *FromShardstore) GetDataIndex() int { - return f.DataIndex +func (f *FromShardstore) GetStreamType() StreamType { + return f.StreamType } type ToDriver struct { - Handle *exec.DriverReadStream - DataIndex int - Range exec.Range + Handle *exec.DriverReadStream + StreamType StreamType + Range exec.Range } -func NewToDriver(dataIndex int) (*ToDriver, *exec.DriverReadStream) { +func NewToDriver(strType StreamType) (*ToDriver, *exec.DriverReadStream) { str := exec.DriverReadStream{} return &ToDriver{ - Handle: &str, - DataIndex: dataIndex, + Handle: &str, + StreamType: strType, }, &str } -func NewToDriverWithRange(dataIndex int, rng exec.Range) (*ToDriver, *exec.DriverReadStream) { +func NewToDriverWithRange(strType StreamType, rng exec.Range) (*ToDriver, *exec.DriverReadStream) { str := exec.DriverReadStream{} return &ToDriver{ - Handle: &str, - DataIndex: dataIndex, - Range: rng, + Handle: &str, + StreamType: strType, + Range: rng, }, &str } -func (t *ToDriver) GetDataIndex() int { - return t.DataIndex +func (t *ToDriver) GetStreamType() StreamType { + return t.StreamType } func (t *ToDriver) GetRange() exec.Range { @@ -111,32 +157,32 @@ func (t *ToDriver) GetRange() exec.Range { type ToShardStore struct { Hub cdssdk.Hub Storage cdssdk.Storage - DataIndex int + StreamType StreamType Range exec.Range FileHashStoreKey string } -func NewToShardStore(hub cdssdk.Hub, stg cdssdk.Storage, dataIndex int, fileHashStoreKey string) *ToShardStore { +func NewToShardStore(hub cdssdk.Hub, stg cdssdk.Storage, strType StreamType, fileHashStoreKey string) *ToShardStore { return &ToShardStore{ Hub: hub, Storage: stg, - DataIndex: dataIndex, + StreamType: strType, FileHashStoreKey: fileHashStoreKey, } } -func NewToShardStoreWithRange(hub cdssdk.Hub, stg cdssdk.Storage, dataIndex int, fileHashStoreKey string, rng exec.Range) *ToShardStore { +func NewToShardStoreWithRange(hub cdssdk.Hub, stg cdssdk.Storage, streamType StreamType, fileHashStoreKey string, rng exec.Range) *ToShardStore { return &ToShardStore{ Hub: hub, Storage: stg, - DataIndex: dataIndex, + StreamType: streamType, FileHashStoreKey: fileHashStoreKey, Range: rng, } } -func (t *ToShardStore) GetDataIndex() int { - return t.DataIndex +func (t *ToShardStore) GetStreamType() StreamType { + return t.StreamType } func (t *ToShardStore) GetRange() exec.Range { @@ -161,8 +207,10 @@ func NewLoadToShared(hub cdssdk.Hub, storage cdssdk.Storage, userID cdssdk.UserI } } -func (t *LoadToShared) GetDataIndex() int { - return -1 +func (t *LoadToShared) GetStreamType() StreamType { + return StreamType{ + Type: StreamTypeRaw, + } } func (t *LoadToShared) GetRange() exec.Range { diff --git a/common/pkgs/ioswitch2/ops2/segment.go b/common/pkgs/ioswitch2/ops2/segment.go new file mode 100644 index 0000000..b1b3347 --- /dev/null +++ b/common/pkgs/ioswitch2/ops2/segment.go @@ -0,0 +1 @@ +package ops2 diff --git a/common/pkgs/ioswitch2/parser/parser.go b/common/pkgs/ioswitch2/parser/parser.go index 0d312fb..f4d4dfd 100644 --- a/common/pkgs/ioswitch2/parser/parser.go +++ b/common/pkgs/ioswitch2/parser/parser.go @@ -15,9 +15,9 @@ import ( "gitlink.org.cn/cloudream/storage/common/pkgs/storage/types" ) -type IndexedStream struct { - Stream *dag.Var - DataIndex int +type TypedStream struct { + Stream *dag.Var + StreamType ioswitch2.StreamType } type ParseContext struct { @@ -25,10 +25,10 @@ type ParseContext struct { DAG *ops2.GraphNodeBuilder // 为了产生所有To所需的数据范围,而需要From打开的范围。 // 这个范围是基于整个文件的,且上下界都取整到条带大小的整数倍,因此上界是有可能超过文件大小的。 - ToNodes map[ioswitch2.To]ops2.ToNode - IndexedStreams []IndexedStream - StreamRange exec.Range - EC cdssdk.ECRedundancy + ToNodes map[ioswitch2.To]ops2.ToNode + TypedStreams []TypedStream + StreamRange exec.Range + EC cdssdk.ECRedundancy } func Parse(ft ioswitch2.FromTo, blder *exec.PlanBuilder, ec cdssdk.ECRedundancy) error { @@ -86,10 +86,10 @@ func Parse(ft ioswitch2.FromTo, blder *exec.PlanBuilder, ec cdssdk.ECRedundancy) return plan.Generate(ctx.DAG.Graph, blder) } -func findOutputStream(ctx *ParseContext, streamIndex int) *dag.Var { +func findOutputStream(ctx *ParseContext, streamType ioswitch2.StreamType) *dag.Var { var ret *dag.Var - for _, s := range ctx.IndexedStreams { - if s.DataIndex == streamIndex { + for _, s := range ctx.TypedStreams { + if s.StreamType == streamType { ret = s.Stream break } @@ -106,7 +106,7 @@ func calcStreamRange(ctx *ParseContext) { } for _, to := range ctx.Ft.Toes { - if to.GetDataIndex() == -1 { + if to.GetStreamType().IsRaw() { toRng := to.GetRange() rng.ExtendStart(math2.Floor(toRng.Offset, stripSize)) if toRng.Length != nil { @@ -139,19 +139,19 @@ func extend(ctx *ParseContext) error { return err } - ctx.IndexedStreams = append(ctx.IndexedStreams, IndexedStream{ - Stream: frNode.Output().Var, - DataIndex: fr.GetDataIndex(), + ctx.TypedStreams = append(ctx.TypedStreams, TypedStream{ + Stream: frNode.Output().Var, + StreamType: fr.GetStreamType(), }) // 对于完整文件的From,生成Split指令 - if fr.GetDataIndex() == -1 { + if fr.GetStreamType().IsRaw() { splitNode := ctx.DAG.NewChunkedSplit(ctx.EC.ChunkSize) splitNode.Split(frNode.Output().Var, ctx.EC.K) for i := 0; i < ctx.EC.K; i++ { - ctx.IndexedStreams = append(ctx.IndexedStreams, IndexedStream{ - Stream: splitNode.SubStream(i), - DataIndex: i, + ctx.TypedStreams = append(ctx.TypedStreams, TypedStream{ + Stream: splitNode.SubStream(i), + StreamType: ioswitch2.ECSrteam(i), }) } } @@ -159,9 +159,9 @@ func extend(ctx *ParseContext) error { // 如果有K个不同的文件块流,则生成Multiply指令,同时针对其生成的流,生成Join指令 ecInputStrs := make(map[int]*dag.Var) - for _, s := range ctx.IndexedStreams { - if s.DataIndex >= 0 && ecInputStrs[s.DataIndex] == nil { - ecInputStrs[s.DataIndex] = s.Stream + for _, s := range ctx.TypedStreams { + if s.StreamType.IsEC() && ecInputStrs[s.StreamType.Index] == nil { + ecInputStrs[s.StreamType.Index] = s.Stream if len(ecInputStrs) == ctx.EC.K { break } @@ -175,20 +175,20 @@ func extend(ctx *ParseContext) error { mulNode.AddInput(s, i) } for i := 0; i < ctx.EC.N; i++ { - ctx.IndexedStreams = append(ctx.IndexedStreams, IndexedStream{ - Stream: mulNode.NewOutput(i), - DataIndex: i, + ctx.TypedStreams = append(ctx.TypedStreams, TypedStream{ + Stream: mulNode.NewOutput(i), + StreamType: ioswitch2.ECSrteam(i), }) } joinNode := ctx.DAG.NewChunkedJoin(ctx.EC.ChunkSize) for i := 0; i < ctx.EC.K; i++ { // 不可能找不到流 - joinNode.AddInput(findOutputStream(ctx, i)) + joinNode.AddInput(findOutputStream(ctx, ioswitch2.ECSrteam(i))) } - ctx.IndexedStreams = append(ctx.IndexedStreams, IndexedStream{ - Stream: joinNode.Joined(), - DataIndex: -1, + ctx.TypedStreams = append(ctx.TypedStreams, TypedStream{ + Stream: joinNode.Joined(), + StreamType: ioswitch2.RawStream(), }) } @@ -200,9 +200,9 @@ func extend(ctx *ParseContext) error { } ctx.ToNodes[to] = toNode - str := findOutputStream(ctx, to.GetDataIndex()) + str := findOutputStream(ctx, to.GetStreamType()) if str == nil { - return fmt.Errorf("no output stream found for data index %d", to.GetDataIndex()) + return fmt.Errorf("no output stream found for data index %d", to.GetStreamType()) } toNode.SetInput(str) @@ -229,7 +229,7 @@ func buildFromNode(ctx *ParseContext, f ioswitch2.From) (ops2.FromNode, error) { case *ioswitch2.FromShardstore: t := ctx.DAG.NewShardRead(f.Storage.StorageID, types.NewOpen(f.FileHash)) - if f.DataIndex == -1 { + if f.StreamType.IsRaw() { t.Open.WithNullableLength(repRange.Offset, repRange.Length) } else { t.Open.WithNullableLength(blkRange.Offset, blkRange.Length) @@ -255,7 +255,7 @@ func buildFromNode(ctx *ParseContext, f ioswitch2.From) (ops2.FromNode, error) { n.Env().ToEnvDriver() n.Env().Pinned = true - if f.DataIndex == -1 { + if f.StreamType.IsRaw() { f.Handle.RangeHint.Offset = repRange.Offset f.Handle.RangeHint.Length = repRange.Length } else { @@ -539,10 +539,10 @@ func generateRange(ctx *ParseContext) { to := ctx.Ft.Toes[i] toNode := ctx.ToNodes[to] - toDataIdx := to.GetDataIndex() + toStrType := to.GetStreamType() toRng := to.GetRange() - if toDataIdx == -1 { + if toStrType.IsRaw() { n := ctx.DAG.NewRange() toInput := toNode.Input() *n.Env() = *toInput.Var.From().Node.Env() diff --git a/common/pkgs/uploader/create_load.go b/common/pkgs/uploader/create_load.go index 1605a2e..8babde9 100644 --- a/common/pkgs/uploader/create_load.go +++ b/common/pkgs/uploader/create_load.go @@ -40,10 +40,10 @@ func (u *CreateLoadUploader) Upload(path string, size int64, stream io.Reader) e uploadTime := time.Now() ft := ioswitch2.NewFromTo() - fromExec, hd := ioswitch2.NewFromDriver(-1) + fromExec, hd := ioswitch2.NewFromDriver(ioswitch2.RawStream()) ft.AddFrom(fromExec) for _, stg := range u.targetStgs { - ft.AddTo(ioswitch2.NewToShardStore(*stg.MasterHub, stg.Storage, -1, "fileHash")) + ft.AddTo(ioswitch2.NewToShardStore(*stg.MasterHub, stg.Storage, ioswitch2.RawStream(), "fileHash")) ft.AddTo(ioswitch2.NewLoadToShared(*stg.MasterHub, stg.Storage, u.userID, u.pkg.PackageID, path)) } diff --git a/common/pkgs/uploader/update.go b/common/pkgs/uploader/update.go index aed711b..3b2ac0b 100644 --- a/common/pkgs/uploader/update.go +++ b/common/pkgs/uploader/update.go @@ -43,8 +43,8 @@ func (w *UpdateUploader) Upload(path string, size int64, stream io.Reader) error uploadTime := time.Now() ft := ioswitch2.NewFromTo() - fromExec, hd := ioswitch2.NewFromDriver(-1) - ft.AddFrom(fromExec).AddTo(ioswitch2.NewToShardStore(*w.targetStg.MasterHub, w.targetStg.Storage, -1, "fileHash")) + fromExec, hd := ioswitch2.NewFromDriver(ioswitch2.RawStream()) + ft.AddFrom(fromExec).AddTo(ioswitch2.NewToShardStore(*w.targetStg.MasterHub, w.targetStg.Storage, ioswitch2.RawStream(), "fileHash")) plans := exec.NewPlanBuilder() err := parser.Parse(ft, plans, cdssdk.DefaultECRedundancy) diff --git a/scanner/internal/event/check_package_redundancy.go b/scanner/internal/event/check_package_redundancy.go index 337cf9c..006fc42 100644 --- a/scanner/internal/event/check_package_redundancy.go +++ b/scanner/internal/event/check_package_redundancy.go @@ -447,9 +447,9 @@ func (t *CheckPackageRedundancy) noneToRep(ctx ExecuteContext, obj stgmod.Object uploadStgs = lo.UniqBy(uploadStgs, func(item *StorageLoadInfo) cdssdk.StorageID { return item.Storage.Storage.StorageID }) ft := ioswitch2.NewFromTo() - ft.AddFrom(ioswitch2.NewFromShardstore(obj.Object.FileHash, *getStgs.Storages[0].MasterHub, getStgs.Storages[0].Storage, -1)) + ft.AddFrom(ioswitch2.NewFromShardstore(obj.Object.FileHash, *getStgs.Storages[0].MasterHub, getStgs.Storages[0].Storage, ioswitch2.RawStream())) for i, stg := range uploadStgs { - ft.AddTo(ioswitch2.NewToShardStore(*stg.Storage.MasterHub, stg.Storage.Storage, -1, fmt.Sprintf("%d", i))) + ft.AddTo(ioswitch2.NewToShardStore(*stg.Storage.MasterHub, stg.Storage.Storage, ioswitch2.RawStream(), fmt.Sprintf("%d", i))) } plans := exec.NewPlanBuilder() @@ -506,9 +506,9 @@ func (t *CheckPackageRedundancy) noneToEC(ctx ExecuteContext, obj stgmod.ObjectD } ft := ioswitch2.NewFromTo() - ft.AddFrom(ioswitch2.NewFromShardstore(obj.Object.FileHash, *getStgs.Storages[0].MasterHub, getStgs.Storages[0].Storage, -1)) + ft.AddFrom(ioswitch2.NewFromShardstore(obj.Object.FileHash, *getStgs.Storages[0].MasterHub, getStgs.Storages[0].Storage, ioswitch2.RawStream())) for i := 0; i < red.N; i++ { - ft.AddTo(ioswitch2.NewToShardStore(*uploadStgs[i].Storage.MasterHub, uploadStgs[i].Storage.Storage, i, fmt.Sprintf("%d", i))) + ft.AddTo(ioswitch2.NewToShardStore(*uploadStgs[i].Storage.MasterHub, uploadStgs[i].Storage.Storage, ioswitch2.ECSrteam(i), fmt.Sprintf("%d", i))) } plans := exec.NewPlanBuilder() err = parser.Parse(ft, plans, *red) @@ -623,9 +623,9 @@ func (t *CheckPackageRedundancy) repToRep(ctx ExecuteContext, obj stgmod.ObjectD uploadStgs = lo.UniqBy(uploadStgs, func(item *StorageLoadInfo) cdssdk.StorageID { return item.Storage.Storage.StorageID }) ft := ioswitch2.NewFromTo() - ft.AddFrom(ioswitch2.NewFromShardstore(obj.Object.FileHash, *getStgs.Storages[0].MasterHub, getStgs.Storages[0].Storage, -1)) + ft.AddFrom(ioswitch2.NewFromShardstore(obj.Object.FileHash, *getStgs.Storages[0].MasterHub, getStgs.Storages[0].Storage, ioswitch2.RawStream())) for i, stg := range uploadStgs { - ft.AddTo(ioswitch2.NewToShardStore(*stg.Storage.MasterHub, stg.Storage.Storage, -1, fmt.Sprintf("%d", i))) + ft.AddTo(ioswitch2.NewToShardStore(*stg.Storage.MasterHub, stg.Storage.Storage, ioswitch2.RawStream(), fmt.Sprintf("%d", i))) } plans := exec.NewPlanBuilder() @@ -696,11 +696,11 @@ func (t *CheckPackageRedundancy) ecToRep(ctx ExecuteContext, obj stgmod.ObjectDe ft := ioswitch2.NewFromTo() for _, block := range chosenBlocks { - ft.AddFrom(ioswitch2.NewFromShardstore(block.FileHash, *uploadStgs[i].Storage.MasterHub, uploadStgs[i].Storage.Storage, block.Index)) + ft.AddFrom(ioswitch2.NewFromShardstore(block.FileHash, *uploadStgs[i].Storage.MasterHub, uploadStgs[i].Storage.Storage, ioswitch2.ECSrteam(block.Index))) } len := obj.Object.Size - ft.AddTo(ioswitch2.NewToShardStoreWithRange(*uploadStgs[i].Storage.MasterHub, uploadStgs[i].Storage.Storage, -1, fmt.Sprintf("%d", i), exec.Range{ + ft.AddTo(ioswitch2.NewToShardStoreWithRange(*uploadStgs[i].Storage.MasterHub, uploadStgs[i].Storage.Storage, ioswitch2.RawStream(), fmt.Sprintf("%d", i), exec.Range{ Offset: 0, Length: &len, })) @@ -788,11 +788,11 @@ func (t *CheckPackageRedundancy) ecToEC(ctx ExecuteContext, obj stgmod.ObjectDet ft := ioswitch2.NewFromTo() for _, block := range chosenBlocks { stg := stg.Storage - ft.AddFrom(ioswitch2.NewFromShardstore(block.FileHash, *stg.MasterHub, stg.Storage, block.Index)) + ft.AddFrom(ioswitch2.NewFromShardstore(block.FileHash, *stg.MasterHub, stg.Storage, ioswitch2.ECSrteam(block.Index))) } // 输出只需要自己要保存的那一块 - ft.AddTo(ioswitch2.NewToShardStore(*stg.Storage.MasterHub, stg.Storage.Storage, i, fmt.Sprintf("%d", i))) + ft.AddTo(ioswitch2.NewToShardStore(*stg.Storage.MasterHub, stg.Storage.Storage, ioswitch2.ECSrteam(i), fmt.Sprintf("%d", i))) err := parser.Parse(ft, planBlder, *srcRed) if err != nil { diff --git a/scanner/internal/event/clean_pinned.go b/scanner/internal/event/clean_pinned.go index 91ed3b0..6214560 100644 --- a/scanner/internal/event/clean_pinned.go +++ b/scanner/internal/event/clean_pinned.go @@ -741,9 +741,9 @@ func (t *CleanPinned) makePlansForRepObject(allStgInfos map[cdssdk.StorageID]*st ft := ioswitch2.NewFromTo() fromStg := allStgInfos[obj.Blocks[0].StorageID] - ft.AddFrom(ioswitch2.NewFromShardstore(obj.Object.FileHash, *fromStg.MasterHub, fromStg.Storage, -1)) + ft.AddFrom(ioswitch2.NewFromShardstore(obj.Object.FileHash, *fromStg.MasterHub, fromStg.Storage, ioswitch2.RawStream())) toStg := allStgInfos[solu.blockList[i].StorageID] - ft.AddTo(ioswitch2.NewToShardStore(*toStg.MasterHub, toStg.Storage, -1, fmt.Sprintf("%d.0", obj.Object.ObjectID))) + ft.AddTo(ioswitch2.NewToShardStore(*toStg.MasterHub, toStg.Storage, ioswitch2.RawStream(), fmt.Sprintf("%d.0", obj.Object.ObjectID))) err := parser.Parse(ft, planBld, cdssdk.DefaultECRedundancy) if err != nil { @@ -798,10 +798,10 @@ func (t *CleanPinned) makePlansForECObject(allStgInfos map[cdssdk.StorageID]*stg for id, idxs := range reconstrct { ft := ioswitch2.NewFromTo() - ft.AddFrom(ioswitch2.NewFromShardstore(obj.Object.FileHash, *allStgInfos[id].MasterHub, allStgInfos[id].Storage, -1)) + ft.AddFrom(ioswitch2.NewFromShardstore(obj.Object.FileHash, *allStgInfos[id].MasterHub, allStgInfos[id].Storage, ioswitch2.RawStream())) for _, i := range *idxs { - ft.AddTo(ioswitch2.NewToShardStore(*allStgInfos[id].MasterHub, allStgInfos[id].Storage, i, fmt.Sprintf("%d.%d", obj.Object.ObjectID, i))) + ft.AddTo(ioswitch2.NewToShardStore(*allStgInfos[id].MasterHub, allStgInfos[id].Storage, ioswitch2.ECSrteam(i), fmt.Sprintf("%d.%d", obj.Object.ObjectID, i))) } err := parser.Parse(ft, planBld, *ecRed)