Browse Source

增加替换SegmentJoin为分片上传的pass

gitlink
Sydonian 1 year ago
parent
commit
bd7283c044
10 changed files with 149 additions and 40 deletions
  1. +8
    -8
      client/internal/cmdline/test.go
  2. +4
    -3
      common/pkgs/ioswitch2/fromto.go
  3. +2
    -2
      common/pkgs/ioswitch2/ops2/multipart.go
  4. +20
    -7
      common/pkgs/ioswitch2/ops2/segment.go
  5. +5
    -4
      common/pkgs/ioswitch2/ops2/shard_store.go
  6. +100
    -6
      common/pkgs/ioswitch2/parser/parser.go
  7. +1
    -1
      common/pkgs/uploader/create_load.go
  8. +1
    -1
      common/pkgs/uploader/update.go
  9. +6
    -6
      scanner/internal/event/check_package_redundancy.go
  10. +2
    -2
      scanner/internal/event/clean_pinned.go

+ 8
- 8
client/internal/cmdline/test.go View File

@@ -35,8 +35,8 @@ func init() {
ft.SegmentParam = cdssdk.NewSegmentRedundancy(1293, 3)
ft.AddFrom(ioswitch2.NewFromShardstore("4E69A8B8CD9F42EDE371DA94458BADFB2308AFCA736AA393784A3D81F4746377", *stgs.Storages[0].MasterHub, stgs.Storages[0].Storage, ioswitch2.RawStream()))
// ft.AddTo(ioswitch2.NewToShardStore(*stgs.Storages[1].MasterHub, stgs.Storages[1].Storage, ioswitch2.SegmentStream(0), "0"))
ft.AddTo(ioswitch2.NewToShardStoreWithRange(*stgs.Storages[1].MasterHub, stgs.Storages[1].Storage, ioswitch2.SegmentStream(1), "1", exec.Range{Offset: 1}))
ft.AddTo(ioswitch2.NewToShardStore(*stgs.Storages[1].MasterHub, stgs.Storages[1].Storage, ioswitch2.SegmentStream(2), "2"))
ft.AddTo(ioswitch2.NewToShardStoreWithRange(*stgs.Storages[1].MasterHub, *stgs.Storages[1], ioswitch2.SegmentStream(1), "1", exec.Range{Offset: 1}))
ft.AddTo(ioswitch2.NewToShardStore(*stgs.Storages[1].MasterHub, *stgs.Storages[1], ioswitch2.SegmentStream(2), "2"))

plans := exec.NewPlanBuilder()
err = parser.Parse(ft, plans)
@@ -86,9 +86,9 @@ func init() {

toDrv, drvStr := ioswitch2.NewToDriverWithRange(ioswitch2.RawStream(), exec.NewRange(0, 1293))
ft.AddTo(toDrv)
ft.AddTo(ioswitch2.NewToShardStore(*stgs.Storages[0].MasterHub, stgs.Storages[0].Storage, ioswitch2.ECSrteam(0), "EC0"))
ft.AddTo(ioswitch2.NewToShardStore(*stgs.Storages[0].MasterHub, stgs.Storages[0].Storage, ioswitch2.ECSrteam(1), "EC1"))
ft.AddTo(ioswitch2.NewToShardStore(*stgs.Storages[0].MasterHub, stgs.Storages[0].Storage, ioswitch2.ECSrteam(2), "EC2"))
ft.AddTo(ioswitch2.NewToShardStore(*stgs.Storages[0].MasterHub, *stgs.Storages[0], ioswitch2.ECSrteam(0), "EC0"))
ft.AddTo(ioswitch2.NewToShardStore(*stgs.Storages[0].MasterHub, *stgs.Storages[0], ioswitch2.ECSrteam(1), "EC1"))
ft.AddTo(ioswitch2.NewToShardStore(*stgs.Storages[0].MasterHub, *stgs.Storages[0], ioswitch2.ECSrteam(2), "EC2"))

plans := exec.NewPlanBuilder()
err = parser.Parse(ft, plans)
@@ -150,9 +150,9 @@ func init() {
ft := ioswitch2.NewFromTo()
ft.ECParam = &cdssdk.DefaultECRedundancy
ft.AddFrom(ioswitch2.NewFromShardstore("4E69A8B8CD9F42EDE371DA94458BADFB2308AFCA736AA393784A3D81F4746377", *stgs.Storages[1].MasterHub, stgs.Storages[1].Storage, ioswitch2.RawStream()))
ft.AddTo(ioswitch2.NewToShardStore(*stgs.Storages[0].MasterHub, stgs.Storages[0].Storage, ioswitch2.ECSrteam(0), "EC0"))
ft.AddTo(ioswitch2.NewToShardStore(*stgs.Storages[0].MasterHub, stgs.Storages[0].Storage, ioswitch2.ECSrteam(1), "EC1"))
ft.AddTo(ioswitch2.NewToShardStore(*stgs.Storages[0].MasterHub, stgs.Storages[0].Storage, ioswitch2.ECSrteam(2), "EC2"))
ft.AddTo(ioswitch2.NewToShardStore(*stgs.Storages[0].MasterHub, *stgs.Storages[0], ioswitch2.ECSrteam(0), "EC0"))
ft.AddTo(ioswitch2.NewToShardStore(*stgs.Storages[0].MasterHub, *stgs.Storages[0], ioswitch2.ECSrteam(1), "EC1"))
ft.AddTo(ioswitch2.NewToShardStore(*stgs.Storages[0].MasterHub, *stgs.Storages[0], ioswitch2.ECSrteam(2), "EC2"))

plans := exec.NewPlanBuilder()
err = parser.Parse(ft, plans)


+ 4
- 3
common/pkgs/ioswitch2/fromto.go View File

@@ -3,6 +3,7 @@ package ioswitch2
import (
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
stgmod "gitlink.org.cn/cloudream/storage/common/models"
)

type From interface {
@@ -160,13 +161,13 @@ func (t *ToDriver) GetRange() exec.Range {

type ToShardStore struct {
Hub cdssdk.Hub
Storage cdssdk.Storage
Storage stgmod.StorageDetail
StreamIndex StreamIndex
Range exec.Range
FileHashStoreKey string
}

func NewToShardStore(hub cdssdk.Hub, stg cdssdk.Storage, strIdx StreamIndex, fileHashStoreKey string) *ToShardStore {
func NewToShardStore(hub cdssdk.Hub, stg stgmod.StorageDetail, strIdx StreamIndex, fileHashStoreKey string) *ToShardStore {
return &ToShardStore{
Hub: hub,
Storage: stg,
@@ -175,7 +176,7 @@ func NewToShardStore(hub cdssdk.Hub, stg cdssdk.Storage, strIdx StreamIndex, fil
}
}

func NewToShardStoreWithRange(hub cdssdk.Hub, stg cdssdk.Storage, streamIndex StreamIndex, fileHashStoreKey string, rng exec.Range) *ToShardStore {
func NewToShardStoreWithRange(hub cdssdk.Hub, stg stgmod.StorageDetail, streamIndex StreamIndex, fileHashStoreKey string, rng exec.Range) *ToShardStore {
return &ToShardStore{
Hub: hub,
Storage: stg,


+ 2
- 2
common/pkgs/ioswitch2/ops2/multipart.go View File

@@ -246,8 +246,8 @@ func (n *MultipartUploadNode) UploadResultVar() *dag.ValueVar {
return n.OutputValues().Get(0)
}

func (n *MultipartUploadNode) PartStreamSlot() dag.ValueInputSlot {
return dag.ValueInputSlot{
func (n *MultipartUploadNode) PartStreamSlot() dag.StreamInputSlot {
return dag.StreamInputSlot{
Node: n,
Index: 0,
}


+ 20
- 7
common/pkgs/ioswitch2/ops2/segment.go View File

@@ -136,8 +136,15 @@ func (b *GraphNodeBuilder) NewSegmentSplit(segments []int64) *SegmentSplitNode {
return node
}

func (n *SegmentSplitNode) SetInput(input *dag.StreamVar) {
input.To(n, 0)
// InputSlot returns the single stream input slot (index 0) that receives the
// stream to be split into segments.
func (n *SegmentSplitNode) InputSlot() dag.StreamInputSlot {
	return dag.StreamInputSlot{
		Node:  n,
		Index: 0,
	}
}

// SegmentVar returns the output stream variable carrying the index-th segment.
func (n *SegmentSplitNode) SegmentVar(index int) *dag.StreamVar {
	return n.OutputStreams().Get(index)
}

func (t *SegmentSplitNode) RemoveAllStream() {
@@ -159,18 +166,24 @@ func (n *SegmentSplitNode) GenerateOp() (exec.Op, error) {

// SegmentJoinNode concatenates several segment streams back into one stream.
type SegmentJoinNode struct {
	dag.NodeBase
	// Segments holds the byte length of each input segment, in input-slot order.
	Segments []int64
}

func (b *GraphNodeBuilder) NewSegmentJoin(segmentSizes []int64) *SegmentJoinNode {
node := &SegmentJoinNode{}
func (b *GraphNodeBuilder) NewSegmentJoin(segments []int64) *SegmentJoinNode {
node := &SegmentJoinNode{
Segments: segments,
}
b.AddNode(node)
node.InputStreams().Init(len(segmentSizes))
node.InputStreams().Init(len(segments))
node.OutputStreams().Init(node, 1)
return node
}

func (n *SegmentJoinNode) SetInput(index int, input *dag.StreamVar) {
input.To(n, index)
// InputSlot returns the stream input slot that receives the index-th segment.
func (n *SegmentJoinNode) InputSlot(index int) dag.StreamInputSlot {
	return dag.StreamInputSlot{
		Node:  n,
		Index: index,
	}
}

func (n *SegmentJoinNode) RemoveAllInputs() {


+ 5
- 4
common/pkgs/ioswitch2/ops2/shard_store.go View File

@@ -10,6 +10,7 @@ import (
"gitlink.org.cn/cloudream/common/pkgs/logger"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/common/utils/io2"
stgmod "gitlink.org.cn/cloudream/storage/common/models"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/svcmgr"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/types"
@@ -159,14 +160,14 @@ func (t *ShardReadNode) GenerateOp() (exec.Op, error) {
type ShardWriteNode struct {
dag.NodeBase
To ioswitch2.To
StorageID cdssdk.StorageID
Storage stgmod.StorageDetail
FileHashStoreKey string
}

func (b *GraphNodeBuilder) NewShardWrite(to ioswitch2.To, stgID cdssdk.StorageID, fileHashStoreKey string) *ShardWriteNode {
func (b *GraphNodeBuilder) NewShardWrite(to ioswitch2.To, stg stgmod.StorageDetail, fileHashStoreKey string) *ShardWriteNode {
node := &ShardWriteNode{
To: to,
StorageID: stgID,
Storage: stg,
FileHashStoreKey: fileHashStoreKey,
}
b.AddNode(node)
@@ -199,7 +200,7 @@ func (t *ShardWriteNode) GenerateOp() (exec.Op, error) {
return &ShardWrite{
Input: t.InputStreams().Get(0).VarID,
FileHash: t.OutputValues().Get(0).VarID,
StorageID: t.StorageID,
StorageID: t.Storage.Storage.StorageID,
}, nil
}



+ 100
- 6
common/pkgs/ioswitch2/parser/parser.go View File

@@ -13,6 +13,7 @@ import (
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2/ops2"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/types"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/utils"
)

type IndexedStream struct {
@@ -104,6 +105,7 @@ func Parse(ft ioswitch2.FromTo, blder *exec.PlanBuilder) error {

// 下面这些只需要执行一次,但需要按顺序
removeUnusedFromNode(&ctx)
useMultipartUploadToShardStore(&ctx)
dropUnused(&ctx)
storeIPFSWriteResult(&ctx)
generateRange(&ctx)
@@ -242,7 +244,7 @@ func extend(ctx *ParseContext) error {
// 同上
if ctx.UseSegment {
splitNode := ctx.DAG.NewSegmentSplit(ctx.Ft.SegmentParam.Segments)
splitNode.SetInput(frNode.Output().Var())
frNode.Output().Var().ToSlot(splitNode.InputSlot())
for i := 0; i < len(ctx.Ft.SegmentParam.Segments); i++ {
ctx.IndexedStreams = append(ctx.IndexedStreams, IndexedStream{
Stream: splitNode.Segment(i),
@@ -296,7 +298,7 @@ func extend(ctx *ParseContext) error {
for i := 0; i < ctx.Ft.SegmentParam.SegmentCount(); i++ {
str := findOutputStream(ctx, ioswitch2.SegmentStream(i))
if str != nil {
joinNode.SetInput(i, str)
str.ToSlot(joinNode.InputSlot(i))
}
}
ctx.IndexedStreams = append(ctx.IndexedStreams, IndexedStream{
@@ -451,7 +453,7 @@ func buildFromNode(ctx *ParseContext, f ioswitch2.From) (ops2.FromNode, error) {
func buildToNode(ctx *ParseContext, t ioswitch2.To) (ops2.ToNode, error) {
switch t := t.(type) {
case *ioswitch2.ToShardStore:
n := ctx.DAG.NewShardWrite(t, t.Storage.StorageID, t.FileHashStoreKey)
n := ctx.DAG.NewShardWrite(t, t.Storage, t.FileHashStoreKey)

if err := setEnvByAddress(n, t.Hub, t.Hub.Address); err != nil {
return nil, err
@@ -546,8 +548,14 @@ func fixSegmentJoin(ctx *ParseContext) error {
startSeg, endSeg := ctx.Ft.SegmentParam.CalcSegmentRange(start, end)

// 关闭超出范围的分段
node.InputStreams().Slots.RemoveRange(endSeg, ctx.Ft.SegmentParam.SegmentCount()-endSeg)
node.InputStreams().Slots.RemoveRange(0, startSeg)
node.OutputStreams().Slots.RemoveRange(endSeg, ctx.Ft.SegmentParam.SegmentCount()-endSeg)
node.Segments = lo2.RemoveRange(node.Segments, endSeg, ctx.Ft.SegmentParam.SegmentCount()-endSeg)
node.OutputStreams().Slots.RemoveRange(0, startSeg)
node.Segments = lo2.RemoveRange(node.Segments, 0, startSeg)

// StreamRange开始的位置可能在某个分段的中间,此时这个分段的大小等于流开始位置到分段结束位置的距离
startSegStart := ctx.Ft.SegmentParam.CalcSegmentStart(startSeg)
node.Segments[0] -= ctx.StreamRange.Offset - startSegStart

// 检查一下必须的分段是否都被加入到Join中
for i := 0; i < node.InputStreams().Len(); i++ {
@@ -845,7 +853,7 @@ func removeUnusedFromNode(ctx *ParseContext) {
return true
}

if node.Output().Var == nil {
if node.Output().Var() == nil {
ctx.DAG.RemoveNode(node)
}
return true
@@ -866,6 +874,92 @@ func dropUnused(ctx *ParseContext) {
})
}

// useMultipartUploadToShardStore rewrites the plan DAG so that a SegmentJoin
// node whose only consumer is a ShardWrite node is replaced by a multipart
// upload: each joined segment is uploaded as one (or more) parts directly from
// the node that produces the segment stream, so the stream never has to be
// re-joined before being stored.
func useMultipartUploadToShardStore(ctx *ParseContext) {
	dag.WalkOnlyType[*ops2.SegmentJoinNode](ctx.DAG.Graph, func(joinNode *ops2.SegmentJoinNode) bool {
		// Only rewrite when the joined stream has exactly one destination.
		if joinNode.Joined().Dst.Len() != 1 {
			return true
		}

		joinDst := joinNode.Joined().Dst.Get(0)
		shardNode, ok := joinDst.(*ops2.ShardWriteNode)
		if !ok {
			return true
		}

		// The join destination must support both the TempStore and
		// MultipartUpload features before the join can be replaced by a
		// multipart upload.
		if utils.FindFeature[*cdssdk.TempStore](shardNode.Storage) == nil {
			return true
		}

		multiUpload := utils.FindFeature[*cdssdk.MultipartUploadFeature](shardNode.Storage)
		if multiUpload == nil {
			return true
		}

		// Every joined segment must be at least the minimum part size.
		// Currently only splitting a stream that exceeds the maximum part size
		// is supported; merging several small segments to reach the minimum
		// part size is not.
		for _, size := range joinNode.Segments {
			if size < multiUpload.MinPartSize {
				return true
			}
		}

		initNode := ctx.DAG.NewMultipartInitiator(shardNode.Storage.Storage.StorageID)
		initNode.Env().CopyFrom(shardNode.Env())

		partNumber := 1
		for i, size := range joinNode.Segments {
			joinInput := joinNode.InputSlot(i)
			// Detach this segment stream from the join node; it will feed an
			// upload node instead.
			joinInput.Var().NotTo(joinNode)

			if size > multiUpload.MaxPartSize {
				// A segment larger than the maximum part size must be split
				// into several smaller parts before uploading.
				// Both the split and the uploads execute on the node that
				// produces the stream.
				splits := math2.SplitLessThan(size, multiUpload.MaxPartSize)
				splitNode := ctx.DAG.NewSegmentSplit(splits)
				splitNode.Env().CopyFrom(joinInput.Var().Src.Env())

				joinInput.Var().ToSlot(splitNode.InputSlot())

				for i2 := 0; i2 < len(splits); i2++ {
					uploadNode := ctx.DAG.NewMultipartUpload(shardNode.Storage, partNumber, splits[i2])
					uploadNode.Env().CopyFrom(joinInput.Var().Src.Env())

					initNode.UploadArgsVar().ToSlot(uploadNode.UploadArgsSlot())
					splitNode.SegmentVar(i2).ToSlot(uploadNode.PartStreamSlot())
					uploadNode.UploadResultVar().ToSlot(initNode.AppendPartInfoSlot())

					partNumber++
				}
			} else {
				// Otherwise upload the whole segment as a single part.
				uploadNode := ctx.DAG.NewMultipartUpload(shardNode.Storage, partNumber, size)
				// The upload executes directly on the node that produces the stream.
				uploadNode.Env().CopyFrom(joinInput.Var().Src.Env())

				initNode.UploadArgsVar().ToSlot(uploadNode.UploadArgsSlot())
				joinInput.Var().ToSlot(uploadNode.PartStreamSlot())
				uploadNode.UploadResultVar().ToSlot(initNode.AppendPartInfoSlot())

				partNumber++
			}
		}

		bypassNode := ctx.DAG.NewBypassToShardStore(shardNode.Storage.Storage.StorageID)
		bypassNode.Env().CopyFrom(shardNode.Env())

		// The multipart-upload result flows into the bypass node, and the
		// bypass node's callback result flows back to the multipart initiator.
		initNode.BypassFileInfoVar().ToSlot(bypassNode.BypassFileInfoSlot())
		bypassNode.BypassCallbackVar().ToSlot(initNode.BypassCallbackSlot())

		// Finally, remove the now-replaced Join and ToShardStore nodes.
		ctx.DAG.RemoveNode(joinNode)
		ctx.DAG.RemoveNode(shardNode)
		return true
	})
}

// 为IPFS写入指令存储结果
func storeIPFSWriteResult(ctx *ParseContext) {
dag.WalkOnlyType[*ops2.ShardWriteNode](ctx.DAG.Graph, func(n *ops2.ShardWriteNode) bool {


+ 1
- 1
common/pkgs/uploader/create_load.go View File

@@ -44,7 +44,7 @@ func (u *CreateLoadUploader) Upload(path string, size int64, stream io.Reader) e
fromExec, hd := ioswitch2.NewFromDriver(ioswitch2.RawStream())
ft.AddFrom(fromExec)
for _, stg := range u.targetStgs {
ft.AddTo(ioswitch2.NewToShardStore(*stg.MasterHub, stg.Storage, ioswitch2.RawStream(), "fileHash"))
ft.AddTo(ioswitch2.NewToShardStore(*stg.MasterHub, stg, ioswitch2.RawStream(), "fileHash"))
ft.AddTo(ioswitch2.NewLoadToShared(*stg.MasterHub, stg.Storage, u.userID, u.pkg.PackageID, path))
stgIDs = append(stgIDs, stg.Storage.StorageID)
}


+ 1
- 1
common/pkgs/uploader/update.go View File

@@ -44,7 +44,7 @@ func (w *UpdateUploader) Upload(path string, size int64, stream io.Reader) error

ft := ioswitch2.NewFromTo()
fromExec, hd := ioswitch2.NewFromDriver(ioswitch2.RawStream())
ft.AddFrom(fromExec).AddTo(ioswitch2.NewToShardStore(*w.targetStg.MasterHub, w.targetStg.Storage, ioswitch2.RawStream(), "fileHash"))
ft.AddFrom(fromExec).AddTo(ioswitch2.NewToShardStore(*w.targetStg.MasterHub, w.targetStg, ioswitch2.RawStream(), "fileHash"))

plans := exec.NewPlanBuilder()
err := parser.Parse(ft, plans)


+ 6
- 6
scanner/internal/event/check_package_redundancy.go View File

@@ -464,7 +464,7 @@ func (t *CheckPackageRedundancy) noneToRep(ctx ExecuteContext, obj stgmod.Object
ft := ioswitch2.NewFromTo()
ft.AddFrom(ioswitch2.NewFromShardstore(obj.Object.FileHash, *srcStg.Storage.MasterHub, srcStg.Storage.Storage, ioswitch2.RawStream()))
for i, stg := range uploadStgs {
ft.AddTo(ioswitch2.NewToShardStore(*stg.Storage.MasterHub, stg.Storage.Storage, ioswitch2.RawStream(), fmt.Sprintf("%d", i)))
ft.AddTo(ioswitch2.NewToShardStore(*stg.Storage.MasterHub, stg.Storage, ioswitch2.RawStream(), fmt.Sprintf("%d", i)))
}

plans := exec.NewPlanBuilder()
@@ -515,7 +515,7 @@ func (t *CheckPackageRedundancy) noneToEC(ctx ExecuteContext, obj stgmod.ObjectD
ft.ECParam = red
ft.AddFrom(ioswitch2.NewFromShardstore(obj.Object.FileHash, *srcStg.Storage.MasterHub, srcStg.Storage.Storage, ioswitch2.RawStream()))
for i := 0; i < red.N; i++ {
ft.AddTo(ioswitch2.NewToShardStore(*uploadStgs[i].Storage.MasterHub, uploadStgs[i].Storage.Storage, ioswitch2.ECSrteam(i), fmt.Sprintf("%d", i)))
ft.AddTo(ioswitch2.NewToShardStore(*uploadStgs[i].Storage.MasterHub, uploadStgs[i].Storage, ioswitch2.ECSrteam(i), fmt.Sprintf("%d", i)))
}
plans := exec.NewPlanBuilder()
err := parser.Parse(ft, plans)
@@ -615,7 +615,7 @@ func (t *CheckPackageRedundancy) noneToSeg(ctx ExecuteContext, obj stgmod.Object
ft.SegmentParam = red
ft.AddFrom(ioswitch2.NewFromShardstore(obj.Object.FileHash, *srcStg.Storage.MasterHub, srcStg.Storage.Storage, ioswitch2.RawStream()))
for i, stg := range uploadStgs {
ft.AddTo(ioswitch2.NewToShardStore(*stg.Storage.MasterHub, stg.Storage.Storage, ioswitch2.SegmentStream(i), fmt.Sprintf("%d", i)))
ft.AddTo(ioswitch2.NewToShardStore(*stg.Storage.MasterHub, stg.Storage, ioswitch2.SegmentStream(i), fmt.Sprintf("%d", i)))
}

plans := exec.NewPlanBuilder()
@@ -668,7 +668,7 @@ func (t *CheckPackageRedundancy) repToRep(ctx ExecuteContext, obj stgmod.ObjectD
ft := ioswitch2.NewFromTo()
ft.AddFrom(ioswitch2.NewFromShardstore(obj.Object.FileHash, *srcStg.Storage.MasterHub, srcStg.Storage.Storage, ioswitch2.RawStream()))
for i, stg := range uploadStgs {
ft.AddTo(ioswitch2.NewToShardStore(*stg.Storage.MasterHub, stg.Storage.Storage, ioswitch2.RawStream(), fmt.Sprintf("%d", i)))
ft.AddTo(ioswitch2.NewToShardStore(*stg.Storage.MasterHub, stg.Storage, ioswitch2.RawStream(), fmt.Sprintf("%d", i)))
}

plans := exec.NewPlanBuilder()
@@ -749,7 +749,7 @@ func (t *CheckPackageRedundancy) ecToRep(ctx ExecuteContext, obj stgmod.ObjectDe
}

len := obj.Object.Size
ft.AddTo(ioswitch2.NewToShardStoreWithRange(*uploadStgs[i].Storage.MasterHub, uploadStgs[i].Storage.Storage, ioswitch2.RawStream(), fmt.Sprintf("%d", i), exec.Range{
ft.AddTo(ioswitch2.NewToShardStoreWithRange(*uploadStgs[i].Storage.MasterHub, uploadStgs[i].Storage, ioswitch2.RawStream(), fmt.Sprintf("%d", i), exec.Range{
Offset: 0,
Length: &len,
}))
@@ -845,7 +845,7 @@ func (t *CheckPackageRedundancy) ecToEC(ctx ExecuteContext, obj stgmod.ObjectDet
}

// 输出只需要自己要保存的那一块
ft.AddTo(ioswitch2.NewToShardStore(*stg.Storage.MasterHub, stg.Storage.Storage, ioswitch2.ECSrteam(i), fmt.Sprintf("%d", i)))
ft.AddTo(ioswitch2.NewToShardStore(*stg.Storage.MasterHub, stg.Storage, ioswitch2.ECSrteam(i), fmt.Sprintf("%d", i)))

err := parser.Parse(ft, planBlder)
if err != nil {


+ 2
- 2
scanner/internal/event/clean_pinned.go View File

@@ -743,7 +743,7 @@ func (t *CleanPinned) makePlansForRepObject(allStgInfos map[cdssdk.StorageID]*st
fromStg := allStgInfos[obj.Blocks[0].StorageID]
ft.AddFrom(ioswitch2.NewFromShardstore(obj.Object.FileHash, *fromStg.MasterHub, fromStg.Storage, ioswitch2.RawStream()))
toStg := allStgInfos[solu.blockList[i].StorageID]
ft.AddTo(ioswitch2.NewToShardStore(*toStg.MasterHub, toStg.Storage, ioswitch2.RawStream(), fmt.Sprintf("%d.0", obj.Object.ObjectID)))
ft.AddTo(ioswitch2.NewToShardStore(*toStg.MasterHub, *toStg, ioswitch2.RawStream(), fmt.Sprintf("%d.0", obj.Object.ObjectID)))

err := parser.Parse(ft, planBld)
if err != nil {
@@ -802,7 +802,7 @@ func (t *CleanPinned) makePlansForECObject(allStgInfos map[cdssdk.StorageID]*stg
ft.AddFrom(ioswitch2.NewFromShardstore(obj.Object.FileHash, *allStgInfos[id].MasterHub, allStgInfos[id].Storage, ioswitch2.RawStream()))

for _, i := range *idxs {
ft.AddTo(ioswitch2.NewToShardStore(*allStgInfos[id].MasterHub, allStgInfos[id].Storage, ioswitch2.ECSrteam(i), fmt.Sprintf("%d.%d", obj.Object.ObjectID, i)))
ft.AddTo(ioswitch2.NewToShardStore(*allStgInfos[id].MasterHub, *allStgInfos[id], ioswitch2.ECSrteam(i), fmt.Sprintf("%d.%d", obj.Object.ObjectID, i)))
}

err := parser.Parse(ft, planBld)


Loading…
Cancel
Save