From 0b6d8f78578a491d4a78b5160f54fcd1ef6b5651 Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Tue, 20 Aug 2024 09:41:50 +0800 Subject: [PATCH] =?UTF-8?q?=E9=87=8D=E5=91=BD=E5=90=8D=E6=A8=A1=E5=9D=97?= =?UTF-8?q?=EF=BC=8C=E8=A7=A3=E5=86=B3=E7=BC=96=E8=AF=91=E9=94=99=E8=AF=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- agent/internal/grpc/io.go | 2 +- client/internal/cmdline/test.go | 16 +++-- common/pkgs/cmd/upload_objects.go | 14 ++-- common/pkgs/downloader/iterator.go | 15 ++-- common/pkgs/downloader/strip_iterator.go | 14 ++-- .../{ioswitch => ioswitch2}/agent_worker.go | 2 +- common/pkgs/{ioswitch => ioswitch2}/fromto.go | 4 +- .../pkgs/{ioswitch => ioswitch2}/ioswitch.go | 2 +- .../{ioswitch => ioswitch2}/ops2/chunked.go | 6 +- .../{ioswitch => ioswitch2}/ops2/clone.go | 0 .../pkgs/{ioswitch => ioswitch2}/ops2/ec.go | 8 +-- .../pkgs/{ioswitch => ioswitch2}/ops2/file.go | 4 +- .../pkgs/{ioswitch => ioswitch2}/ops2/ipfs.go | 6 +- .../pkgs/{ioswitch => ioswitch2}/ops2/join.go | 0 .../{ioswitch => ioswitch2}/ops2/length.go | 0 .../pkgs/{ioswitch => ioswitch2}/ops2/ops.go | 0 .../{ioswitch => ioswitch2}/ops2/range.go | 4 +- .../plans => ioswitch2/parser}/parser.go | 72 +++++++++---------- common/pkgs/{ioswitch => ioswitch2}/utils.go | 2 +- .../event/check_package_redundancy.go | 34 ++++----- scanner/internal/event/clean_pinned.go | 18 ++--- 21 files changed, 117 insertions(+), 106 deletions(-) rename common/pkgs/{ioswitch => ioswitch2}/agent_worker.go (98%) rename common/pkgs/{ioswitch => ioswitch2}/fromto.go (96%) rename common/pkgs/{ioswitch => ioswitch2}/ioswitch.go (96%) rename common/pkgs/{ioswitch => ioswitch2}/ops2/chunked.go (95%) rename common/pkgs/{ioswitch => ioswitch2}/ops2/clone.go (100%) rename common/pkgs/{ioswitch => ioswitch2}/ops2/ec.go (95%) rename common/pkgs/{ioswitch => ioswitch2}/ops2/file.go (95%) rename common/pkgs/{ioswitch => ioswitch2}/ops2/ipfs.go (94%) rename common/pkgs/{ioswitch => ioswitch2}/ops2/join.go (100%) rename common/pkgs/{ioswitch => ioswitch2}/ops2/length.go (100%) rename common/pkgs/{ioswitch => ioswitch2}/ops2/ops.go (100%) rename common/pkgs/{ioswitch => ioswitch2}/ops2/range.go (94%) rename common/pkgs/{ioswitch/plans => ioswitch2/parser}/parser.go (89%) rename common/pkgs/{ioswitch => ioswitch2}/utils.go (94%) diff --git a/agent/internal/grpc/io.go b/agent/internal/grpc/io.go index b708751..d311486 100644 --- a/agent/internal/grpc/io.go +++ b/agent/internal/grpc/io.go @@ -27,7 +27,7 @@ func (s *Service) ExecuteIOPlan(ctx context.Context, req *agtrpc.ExecuteIOPlanRe s.swWorker.Add(sw) defer s.swWorker.Remove(sw) - err = sw.Run(ctx) + _, err = sw.Run(ctx) if err != nil { return nil, fmt.Errorf("running io plan: %w", err) } diff --git a/client/internal/cmdline/test.go b/client/internal/cmdline/test.go index 6bb6ac4..0e8d167 100644 --- a/client/internal/cmdline/test.go +++ b/client/internal/cmdline/test.go @@ -7,10 +7,12 @@ import ( "github.com/spf13/cobra" "gitlink.org.cn/cloudream/common/pkgs/future" + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" stgglb "gitlink.org.cn/cloudream/storage/common/globals" "gitlink.org.cn/cloudream/storage/common/pkgs/downloader" - "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch/plans" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2/parser" coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" ) @@ -32,7 +34,7 @@ func init() { panic(err) } - ft := plans.NewFromTo() + ft := ioswitch2.NewFromTo() // ft.AddFrom(plans.NewFromNode("Qmf412jQZiuVUtdgnB36FXFX7xg5V6KEbSJ4dpQuhkLyfD", &nodes.Nodes[0], -1)) // ft.AddTo(plans.NewToNode(nodes.Nodes[1], -1, "asd")) @@ -43,10 +45,10 @@ func init() { // ft.AddTo(plans.NewToNode(nodes.Nodes[1], 1, "1")) // ft.AddTo(plans.NewToNode(nodes.Nodes[1], 2, "2")) - ft.AddFrom(plans.NewFromNode("QmS2s8GRYHEurXL7V1zUtKvf2H1BGcQc5NN1T1hiSnWvbd", &nodes.Nodes[0], 1)) - ft.AddFrom(plans.NewFromNode("QmUgUEUMzdnjPNx6xu9PDGXpSyXTk8wzPWvyYZ9zasE1WW", &nodes.Nodes[1], 2)) + ft.AddFrom(ioswitch2.NewFromNode("QmS2s8GRYHEurXL7V1zUtKvf2H1BGcQc5NN1T1hiSnWvbd", &nodes.Nodes[0], 1)) + ft.AddFrom(ioswitch2.NewFromNode("QmUgUEUMzdnjPNx6xu9PDGXpSyXTk8wzPWvyYZ9zasE1WW", &nodes.Nodes[1], 2)) le := int64(3) - toExec, hd := plans.NewToExecutorWithRange(-1, plans.Range{Offset: 5, Length: &le}) + toExec, hd := ioswitch2.NewToDriverWithRange(-1, exec.Range{Offset: 5, Length: &le}) // toExec, hd := plans.NewToExecutorWithRange(1, plans.Range{Offset: 0, Length: nil}) // toExec2, hd2 := plans.NewToExecutorWithRange(2, plans.Range{Offset: 0, Length: nil}) ft.AddTo(toExec) @@ -56,9 +58,9 @@ func init() { // ft.AddFrom(fromExec) // ft.AddTo(plans.NewToNode(nodes.Nodes[1], -1, "asd")) - parser := plans.NewParser(cdssdk.DefaultECRedundancy) + parser := parser.NewParser(cdssdk.DefaultECRedundancy) - plans := plans.NewPlanBuilder() + plans := exec.NewPlanBuilder() err = parser.Parse(ft, plans) if err != nil { panic(err) diff --git a/common/pkgs/cmd/upload_objects.go b/common/pkgs/cmd/upload_objects.go index b9c1ca1..a1e39e4 100644 --- a/common/pkgs/cmd/upload_objects.go +++ b/common/pkgs/cmd/upload_objects.go @@ -11,6 +11,7 @@ import ( "github.com/samber/lo" "gitlink.org.cn/cloudream/common/pkgs/distlock" + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/common/pkgs/logger" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/common/utils/sort2" @@ -18,7 +19,8 @@ import ( stgglb "gitlink.org.cn/cloudream/storage/common/globals" "gitlink.org.cn/cloudream/storage/common/pkgs/connectivity" "gitlink.org.cn/cloudream/storage/common/pkgs/distlock/reqbuilder" - "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch/plans" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2/parser" "gitlink.org.cn/cloudream/storage/common/pkgs/iterator" agtmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/agent" coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" @@ -238,12 +240,12 @@ func uploadFile(file io.Reader, uploadNode UploadNodeInfo) (string, error) { } func uploadToNode(file io.Reader, node cdssdk.Node) (string, error) { - ft := plans.NewFromTo() - fromExec, hd := plans.NewFromExecutor(-1) - ft.AddFrom(fromExec).AddTo(plans.NewToNode(node, -1, "fileHash")) + ft := ioswitch2.NewFromTo() + fromExec, hd := ioswitch2.NewFromDriver(-1) + ft.AddFrom(fromExec).AddTo(ioswitch2.NewToNode(node, -1, "fileHash")) - parser := plans.NewParser(cdssdk.DefaultECRedundancy) - plans := plans.NewPlanBuilder() + parser := parser.NewParser(cdssdk.DefaultECRedundancy) + plans := exec.NewPlanBuilder() err := parser.Parse(ft, plans) if err != nil { return "", fmt.Errorf("parsing plan: %w", err) diff --git a/common/pkgs/downloader/iterator.go b/common/pkgs/downloader/iterator.go index d43a202..b25c59c 100644 --- a/common/pkgs/downloader/iterator.go +++ b/common/pkgs/downloader/iterator.go @@ -22,7 +22,8 @@ import ( stgglb "gitlink.org.cn/cloudream/storage/common/globals" stgmod "gitlink.org.cn/cloudream/storage/common/models" "gitlink.org.cn/cloudream/storage/common/pkgs/distlock" - "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch/plans" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2/parser" "gitlink.org.cn/cloudream/storage/common/pkgs/iterator" coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" ) @@ -364,21 +365,21 @@ func (iter *DownloadObjectIterator) getNodeDistance(node cdssdk.Node) float64 { func (iter *DownloadObjectIterator) downloadFromNode(node *cdssdk.Node, req downloadReqeust2) (io.ReadCloser, error) { var strHandle *exec.DriverReadStream - ft := plans.NewFromTo() + ft := ioswitch2.NewFromTo() - toExec, handle := plans.NewToExecutor(-1) - toExec.Range = plans.Range{ + toExec, handle := ioswitch2.NewToDriver(-1) + toExec.Range = exec.Range{ Offset: req.Raw.Offset, } if req.Raw.Length != -1 { len := req.Raw.Length toExec.Range.Length = &len } - ft.AddFrom(plans.NewFromNode(req.Detail.Object.FileHash, node, -1)).AddTo(toExec) + ft.AddFrom(ioswitch2.NewFromNode(req.Detail.Object.FileHash, node, -1)).AddTo(toExec) strHandle = handle - parser := plans.NewParser(cdssdk.DefaultECRedundancy) - plans := plans.NewPlanBuilder() + parser := parser.NewParser(cdssdk.DefaultECRedundancy) + plans := exec.NewPlanBuilder() if err := parser.Parse(ft, plans); err != nil { return nil, fmt.Errorf("parsing plan: %w", err) } diff --git a/common/pkgs/downloader/strip_iterator.go b/common/pkgs/downloader/strip_iterator.go index 416cab9..358449a 100644 --- a/common/pkgs/downloader/strip_iterator.go +++ b/common/pkgs/downloader/strip_iterator.go @@ -5,11 +5,13 @@ import ( "io" "sync" + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/common/pkgs/iterator" "gitlink.org.cn/cloudream/common/pkgs/logger" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" stgmod "gitlink.org.cn/cloudream/storage/common/models" - "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch/plans" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2/parser" ) type downloadBlock struct { @@ -108,18 +110,18 @@ func (s *StripIterator) Close() { } func (s *StripIterator) downloading() { - ft := plans.NewFromTo() + ft := ioswitch2.NewFromTo() for _, b := range s.blocks { - ft.AddFrom(plans.NewFromNode(b.Block.FileHash, &b.Node, b.Block.Index)) + ft.AddFrom(ioswitch2.NewFromNode(b.Block.FileHash, &b.Node, b.Block.Index)) } - toExec, hd := plans.NewToExecutorWithRange(-1, plans.Range{ + toExec, hd := ioswitch2.NewToDriverWithRange(-1, exec.Range{ Offset: s.curStripIndex * int64(s.red.ChunkSize*s.red.K), }) ft.AddTo(toExec) - parser := plans.NewParser(*s.red) - plans := plans.NewPlanBuilder() + parser := parser.NewParser(*s.red) + plans := exec.NewPlanBuilder() err := parser.Parse(ft, plans) if err != nil { s.sendToDataChan(dataChanEntry{Error: err}) diff --git a/common/pkgs/ioswitch/agent_worker.go b/common/pkgs/ioswitch2/agent_worker.go similarity index 98% rename from common/pkgs/ioswitch/agent_worker.go rename to common/pkgs/ioswitch2/agent_worker.go index a5f5b0e..61793ae 100644 --- a/common/pkgs/ioswitch/agent_worker.go +++ b/common/pkgs/ioswitch2/agent_worker.go @@ -1,4 +1,4 @@ -package ioswitch +package ioswitch2 import ( "context" diff --git a/common/pkgs/ioswitch/fromto.go b/common/pkgs/ioswitch2/fromto.go similarity index 96% rename from common/pkgs/ioswitch/fromto.go rename to common/pkgs/ioswitch2/fromto.go index 41a8996..dffb742 100644 --- a/common/pkgs/ioswitch/fromto.go +++ b/common/pkgs/ioswitch2/fromto.go @@ -1,4 +1,4 @@ -package ioswitch +package ioswitch2 import ( "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" @@ -89,7 +89,7 @@ func NewToDriver(dataIndex int) (*ToDriver, *exec.DriverReadStream) { }, &str } -func NewToExecutorWithRange(dataIndex int, rng exec.Range) (*ToDriver, *exec.DriverReadStream) { +func NewToDriverWithRange(dataIndex int, rng exec.Range) (*ToDriver, *exec.DriverReadStream) { str := exec.DriverReadStream{} return &ToDriver{ Handle: &str, diff --git a/common/pkgs/ioswitch/ioswitch.go b/common/pkgs/ioswitch2/ioswitch.go similarity index 96% rename from common/pkgs/ioswitch/ioswitch.go rename to common/pkgs/ioswitch2/ioswitch.go index b59d1db..48b43ee 100644 --- a/common/pkgs/ioswitch/ioswitch.go +++ b/common/pkgs/ioswitch2/ioswitch.go @@ -1,4 +1,4 @@ -package ioswitch +package ioswitch2 import ( "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" diff --git a/common/pkgs/ioswitch/ops2/chunked.go b/common/pkgs/ioswitch2/ops2/chunked.go similarity index 95% rename from common/pkgs/ioswitch/ops2/chunked.go rename to common/pkgs/ioswitch2/ops2/chunked.go index 1ac206c..f9d7b0a 100644 --- a/common/pkgs/ioswitch/ops2/chunked.go +++ b/common/pkgs/ioswitch2/ops2/chunked.go @@ -10,7 +10,7 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/common/utils/io2" - "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2" "golang.org/x/sync/semaphore" ) @@ -89,7 +89,7 @@ type ChunkedSplitType struct { func (t *ChunkedSplitType) InitNode(node *dag.Node) { dag.NodeDeclareInputStream(node, 1) for i := 0; i < t.OutputCount; i++ { - dag.NodeNewOutputStream(node, &ioswitch.VarProps{}) + dag.NodeNewOutputStream(node, &ioswitch2.VarProps{}) } } @@ -115,7 +115,7 @@ type ChunkedJoinType struct { func (t *ChunkedJoinType) InitNode(node *dag.Node) { dag.NodeDeclareInputStream(node, t.InputCount) - dag.NodeNewOutputStream(node, &ioswitch.VarProps{}) + dag.NodeNewOutputStream(node, &ioswitch2.VarProps{}) } func (t *ChunkedJoinType) GenerateOp(op *dag.Node) (exec.Op, error) { diff --git a/common/pkgs/ioswitch/ops2/clone.go b/common/pkgs/ioswitch2/ops2/clone.go similarity index 100% rename from common/pkgs/ioswitch/ops2/clone.go rename to common/pkgs/ioswitch2/ops2/clone.go diff --git a/common/pkgs/ioswitch/ops2/ec.go b/common/pkgs/ioswitch2/ops2/ec.go similarity index 95% rename from common/pkgs/ioswitch/ops2/ec.go rename to common/pkgs/ioswitch2/ops2/ec.go index 00cf792..76e6c22 100644 --- a/common/pkgs/ioswitch/ops2/ec.go +++ b/common/pkgs/ioswitch2/ops2/ec.go @@ -13,7 +13,7 @@ import ( "gitlink.org.cn/cloudream/common/utils/io2" "gitlink.org.cn/cloudream/common/utils/sync2" "gitlink.org.cn/cloudream/storage/common/pkgs/ec" - "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2" "golang.org/x/sync/semaphore" ) @@ -204,10 +204,10 @@ func (t *MultiplyType) GenerateOp(op *dag.Node) (exec.Op, error) { var inputIdxs []int var outputIdxs []int for _, in := range op.InputStreams { - inputIdxs = append(inputIdxs, ioswitch.SProps(in).StreamIndex) + inputIdxs = append(inputIdxs, ioswitch2.SProps(in).StreamIndex) } for _, out := range op.OutputStreams { - outputIdxs = append(outputIdxs, ioswitch.SProps(out).StreamIndex) + outputIdxs = append(outputIdxs, ioswitch2.SProps(out).StreamIndex) } rs, err := ec.NewRs(t.EC.K, t.EC.N) @@ -233,7 +233,7 @@ func (t *MultiplyType) AddInput(node *dag.Node, str *dag.StreamVar) { } func (t *MultiplyType) NewOutput(node *dag.Node, dataIndex int) *dag.StreamVar { - return dag.NodeNewOutputStream(node, &ioswitch.VarProps{StreamIndex: dataIndex}) + return dag.NodeNewOutputStream(node, &ioswitch2.VarProps{StreamIndex: dataIndex}) } func (t *MultiplyType) String(node *dag.Node) string { diff --git a/common/pkgs/ioswitch/ops2/file.go b/common/pkgs/ioswitch2/ops2/file.go similarity index 95% rename from common/pkgs/ioswitch/ops2/file.go rename to common/pkgs/ioswitch2/ops2/file.go index ef2d87f..cc6f46c 100644 --- a/common/pkgs/ioswitch/ops2/file.go +++ b/common/pkgs/ioswitch2/ops2/file.go @@ -11,7 +11,7 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/common/utils/io2" - "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2" ) func init() { @@ -77,7 +77,7 @@ type FileReadType struct { } func (t *FileReadType) InitNode(node *dag.Node) { - dag.NodeNewOutputStream(node, &ioswitch.VarProps{}) + dag.NodeNewOutputStream(node, &ioswitch2.VarProps{}) } func (t *FileReadType) GenerateOp(op *dag.Node) (exec.Op, error) { diff --git a/common/pkgs/ioswitch/ops2/ipfs.go b/common/pkgs/ioswitch2/ops2/ipfs.go similarity index 94% rename from common/pkgs/ioswitch/ops2/ipfs.go rename to common/pkgs/ioswitch2/ops2/ipfs.go index c2c05bf..fa02b6d 100644 --- a/common/pkgs/ioswitch/ops2/ipfs.go +++ b/common/pkgs/ioswitch2/ops2/ipfs.go @@ -12,7 +12,7 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/utils/io2" stgglb "gitlink.org.cn/cloudream/storage/common/globals" - "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2" ) func init() { @@ -92,7 +92,7 @@ type IPFSReadType struct { } func (t *IPFSReadType) InitNode(node *dag.Node) { - dag.NodeNewOutputStream(node, &ioswitch.VarProps{}) + dag.NodeNewOutputStream(node, &ioswitch2.VarProps{}) } func (t *IPFSReadType) GenerateOp(n *dag.Node) (exec.Op, error) { @@ -114,7 +114,7 @@ type IPFSWriteType struct { func (t *IPFSWriteType) InitNode(node *dag.Node) { dag.NodeDeclareInputStream(node, 1) - dag.NodeNewOutputValue(node, &ioswitch.VarProps{}) + dag.NodeNewOutputValue(node, &ioswitch2.VarProps{}) } func (t *IPFSWriteType) GenerateOp(op *dag.Node) (exec.Op, error) { diff --git a/common/pkgs/ioswitch/ops2/join.go b/common/pkgs/ioswitch2/ops2/join.go similarity index 100% rename from common/pkgs/ioswitch/ops2/join.go rename to common/pkgs/ioswitch2/ops2/join.go diff --git a/common/pkgs/ioswitch/ops2/length.go b/common/pkgs/ioswitch2/ops2/length.go similarity index 100% rename from common/pkgs/ioswitch/ops2/length.go rename to common/pkgs/ioswitch2/ops2/length.go diff --git a/common/pkgs/ioswitch/ops2/ops.go b/common/pkgs/ioswitch2/ops2/ops.go similarity index 100% rename from common/pkgs/ioswitch/ops2/ops.go rename to common/pkgs/ioswitch2/ops2/ops.go diff --git a/common/pkgs/ioswitch/ops2/range.go b/common/pkgs/ioswitch2/ops2/range.go similarity index 94% rename from common/pkgs/ioswitch/ops2/range.go rename to common/pkgs/ioswitch2/ops2/range.go index a269f5f..383b3c0 100644 --- a/common/pkgs/ioswitch/ops2/range.go +++ b/common/pkgs/ioswitch2/ops2/range.go @@ -10,7 +10,7 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/common/utils/io2" "gitlink.org.cn/cloudream/common/utils/math2" - "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2" ) func init() { @@ -78,7 +78,7 @@ type RangeType struct { func (t *RangeType) InitNode(node *dag.Node) { dag.NodeDeclareInputStream(node, 1) - dag.NodeNewOutputStream(node, &ioswitch.VarProps{}) + dag.NodeNewOutputStream(node, &ioswitch2.VarProps{}) } func (t *RangeType) GenerateOp(n *dag.Node) (exec.Op, error) { diff --git a/common/pkgs/ioswitch/plans/parser.go b/common/pkgs/ioswitch2/parser/parser.go similarity index 89% rename from common/pkgs/ioswitch/plans/parser.go rename to common/pkgs/ioswitch2/parser/parser.go index b6873aa..bd6aed1 100644 --- a/common/pkgs/ioswitch/plans/parser.go +++ b/common/pkgs/ioswitch2/parser/parser.go @@ -1,4 +1,4 @@ -package plans +package parser import ( "fmt" @@ -12,8 +12,8 @@ import ( cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/common/utils/lo2" "gitlink.org.cn/cloudream/common/utils/math2" - "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" - "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch/ops2" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2/ops2" ) type DefaultParser struct { @@ -27,14 +27,14 @@ func NewParser(ec cdssdk.ECRedundancy) *DefaultParser { } type ParseContext struct { - Ft ioswitch.FromTo + Ft ioswitch2.FromTo DAG *dag.Graph // 为了产生所有To所需的数据范围,而需要From打开的范围。 // 这个范围是基于整个文件的,且上下界都取整到条带大小的整数倍,因此上界是有可能超过文件大小的。 StreamRange exec.Range } -func (p *DefaultParser) Parse(ft ioswitch.FromTo, blder *exec.PlanBuilder) error { +func (p *DefaultParser) Parse(ft ioswitch2.FromTo, blder *exec.PlanBuilder) error { ctx := ParseContext{Ft: ft} // 分成两个阶段: @@ -88,7 +88,7 @@ func (p *DefaultParser) findOutputStream(ctx *ParseContext, streamIndex int) *da var ret *dag.StreamVar ctx.DAG.Walk(func(n *dag.Node) bool { for _, o := range n.OutputStreams { - if o != nil && ioswitch.SProps(o).StreamIndex == streamIndex { + if o != nil && ioswitch2.SProps(o).StreamIndex == streamIndex { ret = o return false } @@ -134,7 +134,7 @@ func (p *DefaultParser) calcStreamRange(ctx *ParseContext) { ctx.StreamRange = rng } -func (p *DefaultParser) extend(ctx *ParseContext, ft ioswitch.FromTo) error { +func (p *DefaultParser) extend(ctx *ParseContext, ft ioswitch2.FromTo) error { for _, fr := range ft.Froms { _, err := p.buildFromNode(ctx, &ft, fr) if err != nil { @@ -143,9 +143,9 @@ func (p *DefaultParser) extend(ctx *ParseContext, ft ioswitch.FromTo) error { // 对于完整文件的From,生成Split指令 if fr.GetDataIndex() == -1 { - n, _ := dag.NewNode(ctx.DAG, &ops2.ChunkedSplitType{ChunkSize: p.EC.ChunkSize, OutputCount: p.EC.K}, &ioswitch.NodeProps{}) + n, _ := dag.NewNode(ctx.DAG, &ops2.ChunkedSplitType{ChunkSize: p.EC.ChunkSize, OutputCount: p.EC.K}, &ioswitch2.NodeProps{}) for i := 0; i < p.EC.K; i++ { - ioswitch.SProps(n.OutputStreams[i]).StreamIndex = i + ioswitch2.SProps(n.OutputStreams[i]).StreamIndex = i } } } @@ -155,7 +155,7 @@ func (p *DefaultParser) extend(ctx *ParseContext, ft ioswitch.FromTo) error { loop: for _, o := range ctx.DAG.Nodes { for _, s := range o.OutputStreams { - prop := ioswitch.SProps(s) + prop := ioswitch2.SProps(s) if prop.StreamIndex >= 0 && ecInputStrs[prop.StreamIndex] == nil { ecInputStrs[prop.StreamIndex] = s if len(ecInputStrs) == p.EC.K { @@ -167,7 +167,7 @@ loop: if len(ecInputStrs) == p.EC.K { mulNode, mulType := dag.NewNode(ctx.DAG, &ops2.MultiplyType{ EC: p.EC, - }, &ioswitch.NodeProps{}) + }, &ioswitch2.NodeProps{}) for _, s := range ecInputStrs { mulType.AddInput(mulNode, s) @@ -179,13 +179,13 @@ loop: joinNode, _ := dag.NewNode(ctx.DAG, &ops2.ChunkedJoinType{ InputCount: p.EC.K, ChunkSize: p.EC.ChunkSize, - }, &ioswitch.NodeProps{}) + }, &ioswitch2.NodeProps{}) for i := 0; i < p.EC.K; i++ { // 不可能找不到流 p.findOutputStream(ctx, i).To(joinNode, i) } - ioswitch.SProps(joinNode.OutputStreams[0]).StreamIndex = -1 + ioswitch2.SProps(joinNode.OutputStreams[0]).StreamIndex = -1 } // 为每一个To找到一个输入流 @@ -206,7 +206,7 @@ loop: return nil } -func (p *DefaultParser) buildFromNode(ctx *ParseContext, ft *ioswitch.FromTo, f ioswitch.From) (*dag.Node, error) { +func (p *DefaultParser) buildFromNode(ctx *ParseContext, ft *ioswitch2.FromTo, f ioswitch2.From) (*dag.Node, error) { var repRange exec.Range var blkRange exec.Range @@ -221,17 +221,17 @@ func (p *DefaultParser) buildFromNode(ctx *ParseContext, ft *ioswitch.FromTo, f } switch f := f.(type) { - case *ioswitch.FromNode: + case *ioswitch2.FromNode: n, t := dag.NewNode(ctx.DAG, &ops2.IPFSReadType{ FileHash: f.FileHash, Option: ipfs.ReadOption{ Offset: 0, Length: -1, }, - }, &ioswitch.NodeProps{ + }, &ioswitch2.NodeProps{ From: f, }) - ioswitch.SProps(n.OutputStreams[0]).StreamIndex = f.DataIndex + ioswitch2.SProps(n.OutputStreams[0]).StreamIndex = f.DataIndex if f.DataIndex == -1 { t.Option.Offset = repRange.Offset @@ -246,15 +246,15 @@ func (p *DefaultParser) buildFromNode(ctx *ParseContext, ft *ioswitch.FromTo, f } if f.Node != nil { - n.Env.ToEnvWorker(&ioswitch.AgentWorker{Node: *f.Node}) + n.Env.ToEnvWorker(&ioswitch2.AgentWorker{Node: *f.Node}) } return n, nil - case *ioswitch.FromDriver: - n, _ := dag.NewNode(ctx.DAG, &ops.FromDriverType{Handle: f.Handle}, &ioswitch.NodeProps{From: f}) - n.Env.ToEnvExecutor() - ioswitch.SProps(n.OutputStreams[0]).StreamIndex = f.DataIndex + case *ioswitch2.FromDriver: + n, _ := dag.NewNode(ctx.DAG, &ops.FromDriverType{Handle: f.Handle}, &ioswitch2.NodeProps{From: f}) + n.Env.ToEnvDriver() + ioswitch2.SProps(n.OutputStreams[0]).StreamIndex = f.DataIndex if f.DataIndex == -1 { f.Handle.RangeHint.Offset = repRange.Offset @@ -271,21 +271,21 @@ func (p *DefaultParser) buildFromNode(ctx *ParseContext, ft *ioswitch.FromTo, f } } -func (p *DefaultParser) buildToNode(ctx *ParseContext, ft *ioswitch.FromTo, t ioswitch.To) (*dag.Node, error) { +func (p *DefaultParser) buildToNode(ctx *ParseContext, ft *ioswitch2.FromTo, t ioswitch2.To) (*dag.Node, error) { switch t := t.(type) { - case *ioswitch.ToNode: + case *ioswitch2.ToNode: n, _ := dag.NewNode(ctx.DAG, &ops2.IPFSWriteType{ FileHashStoreKey: t.FileHashStoreKey, Range: t.Range, - }, &ioswitch.NodeProps{ + }, &ioswitch2.NodeProps{ To: t, }) return n, nil - case *ioswitch.ToDriver: - n, _ := dag.NewNode(ctx.DAG, &ops.ToDriverType{Handle: t.Handle, Range: t.Range}, &ioswitch.NodeProps{To: t}) - n.Env.ToEnvExecutor() + case *ioswitch2.ToDriver: + n, _ := dag.NewNode(ctx.DAG, &ops.ToDriverType{Handle: t.Handle, Range: t.Range}, &ioswitch2.NodeProps{To: t}) + n.Env.ToEnvDriver() return n, nil @@ -480,7 +480,7 @@ func (p *DefaultParser) dropUnused(ctx *ParseContext) { ctx.DAG.Walk(func(node *dag.Node) bool { for _, out := range node.OutputStreams { if len(out.Toes) == 0 { - n := ctx.DAG.NewNode(&ops.DropType{}, &ioswitch.NodeProps{}) + n := ctx.DAG.NewNode(&ops.DropType{}, &ioswitch2.NodeProps{}) n.Env = node.Env out.To(n, 0) } @@ -498,8 +498,8 @@ func (p *DefaultParser) storeIPFSWriteResult(ctx *ParseContext) { n := ctx.DAG.NewNode(&ops.StoreType{ StoreKey: typ.FileHashStoreKey, - }, &ioswitch.NodeProps{}) - n.Env.ToEnvExecutor() + }, &ioswitch2.NodeProps{}) + n.Env.ToEnvDriver() node.OutputValues[0].To(n, 0) return true @@ -509,7 +509,7 @@ func (p *DefaultParser) storeIPFSWriteResult(ctx *ParseContext) { // 生成Range指令。StreamRange可能超过文件总大小,但Range指令会在数据量不够时不报错而是正常返回 func (p *DefaultParser) generateRange(ctx *ParseContext) { ctx.DAG.Walk(func(node *dag.Node) bool { - props := ioswitch.NProps(node) + props := ioswitch2.NProps(node) if props.To == nil { return true } @@ -523,7 +523,7 @@ func (p *DefaultParser) generateRange(ctx *ParseContext) { Offset: toRng.Offset - ctx.StreamRange.Offset, Length: toRng.Length, }, - }, &ioswitch.NodeProps{}) + }, &ioswitch2.NodeProps{}) n.Env = node.InputStreams[0].From.Node.Env node.InputStreams[0].To(n, 0) @@ -541,7 +541,7 @@ func (p *DefaultParser) generateRange(ctx *ParseContext) { Offset: toRng.Offset - blkStart, Length: toRng.Length, }, - }, &ioswitch.NodeProps{}) + }, &ioswitch2.NodeProps{}) n.Env = node.InputStreams[0].From.Node.Env node.InputStreams[0].To(n, 0) @@ -561,7 +561,7 @@ func (p *DefaultParser) generateClone(ctx *ParseContext) { continue } - n, t := dag.NewNode(ctx.DAG, &ops2.CloneStreamType{}, &ioswitch.NodeProps{}) + n, t := dag.NewNode(ctx.DAG, &ops2.CloneStreamType{}, &ioswitch2.NodeProps{}) n.Env = node.Env for _, to := range out.Toes { t.NewOutput(node).To(to.Node, to.SlotIndex) @@ -575,7 +575,7 @@ func (p *DefaultParser) generateClone(ctx *ParseContext) { continue } - n, t := dag.NewNode(ctx.DAG, &ops2.CloneVarType{}, &ioswitch.NodeProps{}) + n, t := dag.NewNode(ctx.DAG, &ops2.CloneVarType{}, &ioswitch2.NodeProps{}) n.Env = node.Env for _, to := range out.Toes { t.NewOutput(node).To(to.Node, to.SlotIndex) diff --git a/common/pkgs/ioswitch/utils.go b/common/pkgs/ioswitch2/utils.go similarity index 94% rename from common/pkgs/ioswitch/utils.go rename to common/pkgs/ioswitch2/utils.go index e0cdb1d..ad3cb20 100644 --- a/common/pkgs/ioswitch/utils.go +++ b/common/pkgs/ioswitch2/utils.go @@ -1,4 +1,4 @@ -package ioswitch +package ioswitch2 import ( "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" diff --git a/scanner/internal/event/check_package_redundancy.go b/scanner/internal/event/check_package_redundancy.go index 81b72f7..066495e 100644 --- a/scanner/internal/event/check_package_redundancy.go +++ b/scanner/internal/event/check_package_redundancy.go @@ -7,13 +7,15 @@ import ( "time" "github.com/samber/lo" + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/common/pkgs/logger" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/common/utils/sort2" stgglb "gitlink.org.cn/cloudream/storage/common/globals" stgmod "gitlink.org.cn/cloudream/storage/common/models" "gitlink.org.cn/cloudream/storage/common/pkgs/distlock/reqbuilder" - "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch/plans" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2/parser" agtmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/agent" coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" scevt "gitlink.org.cn/cloudream/storage/common/pkgs/mq/scanner/event" @@ -413,13 +415,13 @@ func (t *CheckPackageRedundancy) noneToEC(obj stgmod.ObjectDetail, red *cdssdk.E return nil, fmt.Errorf("requesting to get nodes: %w", err) } - ft := plans.NewFromTo() - ft.AddFrom(plans.NewFromNode(obj.Object.FileHash, &getNodes.Nodes[0], -1)) + ft := ioswitch2.NewFromTo() + ft.AddFrom(ioswitch2.NewFromNode(obj.Object.FileHash, &getNodes.Nodes[0], -1)) for i := 0; i < red.N; i++ { - ft.AddTo(plans.NewToNode(uploadNodes[i].Node, i, fmt.Sprintf("%d", i))) + ft.AddTo(ioswitch2.NewToNode(uploadNodes[i].Node, i, fmt.Sprintf("%d", i))) } - parser := plans.NewParser(*red) - plans := plans.NewPlanBuilder() + parser := parser.NewParser(*red) + plans := exec.NewPlanBuilder() err = parser.Parse(ft, plans) if err != nil { return nil, fmt.Errorf("parsing plan: %w", err) @@ -515,17 +517,17 @@ func (t *CheckPackageRedundancy) ecToRep(obj stgmod.ObjectDetail, srcRed *cdssdk uploadNodes = lo.UniqBy(uploadNodes, func(item *NodeLoadInfo) cdssdk.NodeID { return item.Node.NodeID }) // 每个被选节点都在自己节点上重建原始数据 - parser := plans.NewParser(*srcRed) - planBlder := plans.NewPlanBuilder() + parser := parser.NewParser(*srcRed) + planBlder := exec.NewPlanBuilder() for i := range uploadNodes { - ft := plans.NewFromTo() + ft := ioswitch2.NewFromTo() for _, block := range chosenBlocks { - ft.AddFrom(plans.NewFromNode(block.FileHash, &uploadNodes[i].Node, block.Index)) + ft.AddFrom(ioswitch2.NewFromNode(block.FileHash, &uploadNodes[i].Node, block.Index)) } len := obj.Object.Size - ft.AddTo(plans.NewToNodeWithRange(uploadNodes[i].Node, -1, fmt.Sprintf("%d", i), plans.Range{ + ft.AddTo(ioswitch2.NewToNodeWithRange(uploadNodes[i].Node, -1, fmt.Sprintf("%d", i), exec.Range{ Offset: 0, Length: &len, })) @@ -583,8 +585,8 @@ func (t *CheckPackageRedundancy) ecToEC(obj stgmod.ObjectDetail, srcRed *cdssdk. } // 目前EC的参数都相同,所以可以不用重建出完整数据然后再分块,可以直接构建出目的节点需要的块 - parser := plans.NewParser(*srcRed) - planBlder := plans.NewPlanBuilder() + parser := parser.NewParser(*srcRed) + planBlder := exec.NewPlanBuilder() var newBlocks []stgmod.ObjectBlock shouldUpdateBlocks := false @@ -608,13 +610,13 @@ func (t *CheckPackageRedundancy) ecToEC(obj stgmod.ObjectDetail, srcRed *cdssdk. // 否则就要重建出这个节点需要的块 - ft := plans.NewFromTo() + ft := ioswitch2.NewFromTo() for _, block := range chosenBlocks { - ft.AddFrom(plans.NewFromNode(block.FileHash, &node.Node, block.Index)) + ft.AddFrom(ioswitch2.NewFromNode(block.FileHash, &node.Node, block.Index)) } // 输出只需要自己要保存的那一块 - ft.AddTo(plans.NewToNode(node.Node, i, fmt.Sprintf("%d", i))) + ft.AddTo(ioswitch2.NewToNode(node.Node, i, fmt.Sprintf("%d", i))) err := parser.Parse(ft, planBlder) if err != nil { diff --git a/scanner/internal/event/clean_pinned.go b/scanner/internal/event/clean_pinned.go index a20ef87..4cb52cb 100644 --- a/scanner/internal/event/clean_pinned.go +++ b/scanner/internal/event/clean_pinned.go @@ -10,6 +10,7 @@ import ( "github.com/samber/lo" "gitlink.org.cn/cloudream/common/pkgs/bitmap" + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/common/pkgs/logger" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/common/utils/lo2" @@ -19,7 +20,8 @@ import ( stgglb "gitlink.org.cn/cloudream/storage/common/globals" stgmod "gitlink.org.cn/cloudream/storage/common/models" "gitlink.org.cn/cloudream/storage/common/pkgs/distlock/reqbuilder" - "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch/plans" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2/parser" agtmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/agent" coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" scevt "gitlink.org.cn/cloudream/storage/common/pkgs/mq/scanner/event" @@ -106,7 +108,7 @@ func (t *CleanPinned) Execute(execCtx ExecuteContext) { } } - planBld := plans.NewPlanBuilder() + planBld := exec.NewPlanBuilder() pinPlans := make(map[cdssdk.NodeID]*[]string) plnningNodeIDs := make(map[cdssdk.NodeID]bool) @@ -750,7 +752,7 @@ func (t *CleanPinned) makePlansForRepObject(solu annealingSolution, obj stgmod.O return entry } -func (t *CleanPinned) makePlansForECObject(allNodeInfos map[cdssdk.NodeID]*cdssdk.Node, solu annealingSolution, obj stgmod.ObjectDetail, planBld *plans.PlanBuilder, planningNodeIDs map[cdssdk.NodeID]bool) coormq.UpdatingObjectRedundancy { +func (t *CleanPinned) makePlansForECObject(allNodeInfos map[cdssdk.NodeID]*cdssdk.Node, solu annealingSolution, obj stgmod.ObjectDetail, planBld *exec.PlanBuilder, planningNodeIDs map[cdssdk.NodeID]bool) coormq.UpdatingObjectRedundancy { entry := coormq.UpdatingObjectRedundancy{ ObjectID: obj.Object.ObjectID, Redundancy: obj.Object.Redundancy, @@ -781,14 +783,14 @@ func (t *CleanPinned) makePlansForECObject(allNodeInfos map[cdssdk.NodeID]*cdssd } ecRed := obj.Object.Redundancy.(*cdssdk.ECRedundancy) - parser := plans.NewParser(*ecRed) + parser := parser.NewParser(*ecRed) for id, idxs := range reconstrct { - ft := plans.NewFromTo() - ft.AddFrom(plans.NewFromNode(obj.Object.FileHash, allNodeInfos[id], -1)) + ft := ioswitch2.NewFromTo() + ft.AddFrom(ioswitch2.NewFromNode(obj.Object.FileHash, allNodeInfos[id], -1)) for _, i := range *idxs { - ft.AddTo(plans.NewToNode(*allNodeInfos[id], i, fmt.Sprintf("%d.%d", obj.Object.ObjectID, i))) + ft.AddTo(ioswitch2.NewToNode(*allNodeInfos[id], i, fmt.Sprintf("%d.%d", obj.Object.ObjectID, i))) } err := parser.Parse(ft, planBld) @@ -802,7 +804,7 @@ func (t *CleanPinned) makePlansForECObject(allNodeInfos map[cdssdk.NodeID]*cdssd return entry } -func (t *CleanPinned) executePlans(execCtx ExecuteContext, pinPlans map[cdssdk.NodeID]*[]string, planBld *plans.PlanBuilder, plnningNodeIDs map[cdssdk.NodeID]bool) (map[string]any, error) { +func (t *CleanPinned) executePlans(execCtx ExecuteContext, pinPlans map[cdssdk.NodeID]*[]string, planBld *exec.PlanBuilder, plnningNodeIDs map[cdssdk.NodeID]bool) (map[string]any, error) { log := logger.WithType[CleanPinned]("Event") // 统一加锁,有重复也没关系