From ea3a7beff01cd7a30459db433602cc8daecc5772 Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Mon, 27 Nov 2023 16:13:37 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0ioswitch=E5=8A=9F=E8=83=BD?= =?UTF-8?q?=EF=BC=8C=E5=87=86=E5=A4=87=E6=B5=8B=E8=AF=95=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- client/internal/cmdline/package.go | 2 +- common/assets/confs/agent.config.json | 2 +- common/models/models.go | 4 +- common/pkgs/cmd/create_ec_package.go | 2 +- common/pkgs/ec/stream_rs.go | 4 +- common/pkgs/ioswitch/ops/chunked_join.go | 49 +++++++++ common/pkgs/ioswitch/ops/chunked_split.go | 49 +++++++++ common/pkgs/ioswitch/ops/ec.go | 102 ++++++++++++++++++ common/pkgs/ioswitch/ops/file.go | 64 +++++++++++ common/pkgs/ioswitch/ops/grpc.go | 84 +++++++++++++++ common/pkgs/ioswitch/ops/ops.go | 118 +-------------------- common/pkgs/ioswitch/plans/agent_plan.go | 116 ++++++++++++++++++++ common/pkgs/ioswitch/plans/plan_builder.go | 88 +++++++-------- 13 files changed, 513 insertions(+), 171 deletions(-) create mode 100644 common/pkgs/ioswitch/ops/chunked_join.go create mode 100644 common/pkgs/ioswitch/ops/chunked_split.go create mode 100644 common/pkgs/ioswitch/ops/ec.go create mode 100644 common/pkgs/ioswitch/ops/file.go create mode 100644 common/pkgs/ioswitch/ops/grpc.go create mode 100644 common/pkgs/ioswitch/plans/agent_plan.go diff --git a/client/internal/cmdline/package.go b/client/internal/cmdline/package.go index a30d3e3..10b824c 100644 --- a/client/internal/cmdline/package.go +++ b/client/internal/cmdline/package.go @@ -185,7 +185,7 @@ func PackageUpdateRepPackage(ctx CommandContext, packageID int64, rootPath strin } } -func PackageUploadECPackage(ctx CommandContext, rootPath string, bucketID int64, name string, ecName string, chunkSize int64, nodeAffinity []int64) error { +func PackageUploadECPackage(ctx CommandContext, rootPath string, bucketID int64, name string, ecName string, chunkSize int, nodeAffinity []int64) error { rootPath = filepath.Clean(rootPath) var uploadFilePathes []string diff --git a/common/assets/confs/agent.config.json b/common/assets/confs/agent.config.json index b340adf..e706fd4 100644 --- a/common/assets/confs/agent.config.json +++ b/common/assets/confs/agent.config.json @@ -23,7 +23,7 @@ "vhost": "/" }, "ipfs": { - "port": 5001 + "address": "127.0.0.1:5001" }, "distlock": { "etcdAddress": "127.0.0.1:2379", diff --git a/common/models/models.go b/common/models/models.go index 6296953..641ee70 100644 --- a/common/models/models.go +++ b/common/models/models.go @@ -8,10 +8,10 @@ type EC struct { ID int64 `json:"id"` K int `json:"k"` N int `json:"n"` - ChunkSize int64 `json:"chunkSize"` + ChunkSize int `json:"chunkSize"` } -func NewEc(id int64, k int, n int, chunkSize int64) EC { +func NewEc(id int64, k int, n int, chunkSize int) EC { return EC{ ID: id, K: k, diff --git a/common/pkgs/cmd/create_ec_package.go b/common/pkgs/cmd/create_ec_package.go index fd77010..18b9a6a 100644 --- a/common/pkgs/cmd/create_ec_package.go +++ b/common/pkgs/cmd/create_ec_package.go @@ -193,7 +193,7 @@ func uploadECObject(obj *iterator.IterUploadingObject, uploadNodes []UploadNodeI } outputs := myio.ChunkedSplit(obj.File, ecInfo.ChunkSize, ecMod.EcK, myio.ChunkedSplitOption{ - FillZeros: true, + PaddingZeros: true, }) var readers []io.Reader for _, o := range outputs { diff --git a/common/pkgs/ec/stream_rs.go b/common/pkgs/ec/stream_rs.go index c1d68e1..df7e4b4 100644 --- a/common/pkgs/ec/stream_rs.go +++ b/common/pkgs/ec/stream_rs.go @@ -12,10 +12,10 @@ type Rs struct { ecN int ecK int ecP int - chunkSize int64 + chunkSize int } -func NewRs(k int, n int, chunkSize int64) (*Rs, error) { +func NewRs(k int, n int, chunkSize int) (*Rs, error) { enc := Rs{ ecN: n, ecK: k, diff --git a/common/pkgs/ioswitch/ops/chunked_join.go b/common/pkgs/ioswitch/ops/chunked_join.go new file mode 100644 index 0000000..691af52 --- /dev/null +++ b/common/pkgs/ioswitch/ops/chunked_join.go @@ -0,0 +1,49 @@ +package ops + +import ( + "context" + "io" + + "gitlink.org.cn/cloudream/common/pkgs/future" + myio "gitlink.org.cn/cloudream/common/utils/io" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" +) + +type ChunkedJoin struct { + InputIDs []ioswitch.StreamID `json:"inputIDs"` + OutputID ioswitch.StreamID `json:"outputID"` + ChunkSize int `json:"chunkSize"` +} + +func (o *ChunkedJoin) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) error { + strs, err := sw.WaitStreams(planID, o.InputIDs...) + if err != nil { + return err + } + + var strReaders []io.Reader + for _, s := range strs { + strReaders = append(strReaders, s.Stream) + } + defer func() { + for _, str := range strs { + str.Stream.Close() + } + }() + + fut := future.NewSetVoid() + sw.StreamReady(planID, + ioswitch.NewStream(o.OutputID, + myio.AfterReadClosedOnce(myio.ChunkedJoin(strReaders, o.ChunkSize), func(closer io.ReadCloser) { + fut.SetVoid() + }), + ), + ) + + fut.Wait(context.TODO()) + return nil +} + +func init() { + OpUnion.AddT((*ChunkedJoin)(nil)) +} diff --git a/common/pkgs/ioswitch/ops/chunked_split.go b/common/pkgs/ioswitch/ops/chunked_split.go new file mode 100644 index 0000000..0c8eb96 --- /dev/null +++ b/common/pkgs/ioswitch/ops/chunked_split.go @@ -0,0 +1,49 @@ +package ops + +import ( + "io" + "sync" + + myio "gitlink.org.cn/cloudream/common/utils/io" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" +) + +type ChunkedSplit struct { + InputID ioswitch.StreamID `json:"inputID"` + OutputIDs []ioswitch.StreamID `json:"outputIDs"` + ChunkSize int `json:"chunkSize"` + StreamCount int `json:"streamCount"` + PaddingZeros bool `json:"paddingZeros"` +} + +func (o *ChunkedSplit) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) error { + str, err := sw.WaitStreams(planID, o.InputID) + if err != nil { + return err + } + defer str[0].Stream.Close() + + wg := sync.WaitGroup{} + outputs := myio.ChunkedSplit(str[0].Stream, o.ChunkSize, o.StreamCount, myio.ChunkedSplitOption{ + PaddingZeros: o.PaddingZeros, + }) + + for i := range outputs { + wg.Add(1) + + sw.StreamReady(planID, ioswitch.NewStream( + o.OutputIDs[i], + myio.AfterReadClosedOnce(outputs[i], func(closer io.ReadCloser) { + wg.Done() + }), + )) + } + + wg.Wait() + + return nil +} + +func init() { + OpUnion.AddT((*ChunkedSplit)(nil)) +} diff --git a/common/pkgs/ioswitch/ops/ec.go b/common/pkgs/ioswitch/ops/ec.go new file mode 100644 index 0000000..bfeb776 --- /dev/null +++ b/common/pkgs/ioswitch/ops/ec.go @@ -0,0 +1,102 @@ +package ops + +import ( + "fmt" + "io" + "sync" + + myio "gitlink.org.cn/cloudream/common/utils/io" + stgmod "gitlink.org.cn/cloudream/storage/common/models" + "gitlink.org.cn/cloudream/storage/common/pkgs/ec" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" +) + +type ECCompute struct { + EC stgmod.EC `json:"ec"` + InputIDs []ioswitch.StreamID `json:"inputIDs"` + OutputIDs []ioswitch.StreamID `json:"outputIDs"` + InputBlockIndexes []int `json:"inputBlockIndexes"` + OutputBlockIndexes []int `json:"outputBlockIndexes"` +} + +func (o *ECCompute) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) error { + rs, err := ec.NewRs(o.EC.K, o.EC.N, o.EC.ChunkSize) + if err != nil { + return fmt.Errorf("new ec: %w", err) + } + + strs, err := sw.WaitStreams(planID, o.InputIDs...) + if err != nil { + return err + } + defer func() { + for _, s := range strs { + s.Stream.Close() + } + }() + + var inputs []io.Reader + for _, s := range strs { + inputs = append(inputs, s.Stream) + } + + outputs := rs.ReconstructSome(inputs, o.InputBlockIndexes, o.OutputBlockIndexes) + + wg := sync.WaitGroup{} + for i, id := range o.OutputIDs { + wg.Add(1) + sw.StreamReady(planID, ioswitch.NewStream(id, myio.AfterReadClosedOnce(outputs[i], func(closer io.ReadCloser) { + wg.Done() + }))) + } + wg.Wait() + + return nil +} + +type ECReconstruct struct { + EC stgmod.EC `json:"ec"` + InputIDs []ioswitch.StreamID `json:"inputIDs"` + OutputIDs []ioswitch.StreamID `json:"outputIDs"` + InputBlockIndexes []int `json:"inputBlockIndexes"` +} + +func (o *ECReconstruct) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) error { + rs, err := ec.NewRs(o.EC.K, o.EC.N, o.EC.ChunkSize) + if err != nil { + return fmt.Errorf("new ec: %w", err) + } + + strs, err := sw.WaitStreams(planID, o.InputIDs...) + if err != nil { + return err + } + defer func() { + for _, s := range strs { + s.Stream.Close() + } + }() + + var inputs []io.Reader + for _, s := range strs { + inputs = append(inputs, s.Stream) + } + + outputs := rs.ReconstructData(inputs, o.InputBlockIndexes) + + wg := sync.WaitGroup{} + for i, id := range o.OutputIDs { + wg.Add(1) + sw.StreamReady(planID, ioswitch.NewStream(id, myio.AfterReadClosedOnce(outputs[i], func(closer io.ReadCloser) { + wg.Done() + }))) + } + wg.Wait() + + return nil +} + +func init() { + OpUnion.AddT((*ECCompute)(nil)) + OpUnion.AddT((*ECReconstruct)(nil)) +} diff --git a/common/pkgs/ioswitch/ops/file.go b/common/pkgs/ioswitch/ops/file.go new file mode 100644 index 0000000..0219a53 --- /dev/null +++ b/common/pkgs/ioswitch/ops/file.go @@ -0,0 +1,64 @@ +package ops + +import ( + "context" + "fmt" + "io" + "os" + + "gitlink.org.cn/cloudream/common/pkgs/future" + myio "gitlink.org.cn/cloudream/common/utils/io" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" +) + +type FileWrite struct { + InputID ioswitch.StreamID `json:"inputID"` + FilePath string `json:"filePath"` +} + +func (o *FileWrite) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) error { + str, err := sw.WaitStreams(planID, o.InputID) + if err != nil { + return err + } + defer str[0].Stream.Close() + + file, err := os.Create(o.FilePath) + if err != nil { + return fmt.Errorf("opening file: %w", err) + } + defer file.Close() + + _, err = io.Copy(file, str[0].Stream) + if err != nil { + return fmt.Errorf("copying data to file: %w", err) + } + + return nil +} + +type FileRead struct { + OutputID ioswitch.StreamID `json:"outputID"` + FilePath string `json:"filePath"` +} + +func (o *FileRead) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) error { + file, err := os.Open(o.FilePath) + if err != nil { + return fmt.Errorf("opening file: %w", err) + } + + fut := future.NewSetVoid() + sw.StreamReady(planID, ioswitch.NewStream(o.OutputID, myio.AfterReadClosed(file, func(closer io.ReadCloser) { + fut.SetVoid() + }))) + + fut.Wait(context.TODO()) + + return nil +} + +func init() { + OpUnion.AddT((*FileRead)(nil)) + OpUnion.AddT((*FileWrite)(nil)) +} diff --git a/common/pkgs/ioswitch/ops/grpc.go b/common/pkgs/ioswitch/ops/grpc.go new file mode 100644 index 0000000..104b258 --- /dev/null +++ b/common/pkgs/ioswitch/ops/grpc.go @@ -0,0 +1,84 @@ +package ops + +import ( + "context" + "fmt" + "io" + + "gitlink.org.cn/cloudream/common/pkgs/future" + "gitlink.org.cn/cloudream/common/pkgs/logger" + myio "gitlink.org.cn/cloudream/common/utils/io" + stgglb "gitlink.org.cn/cloudream/storage/common/globals" + "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" +) + +type GRPCSend struct { + LocalID ioswitch.StreamID `json:"localID"` + RemoteID ioswitch.StreamID `json:"remoteID"` + Node model.Node `json:"node"` +} + +func (o *GRPCSend) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) error { + logger. + WithField("LocalID", o.LocalID). + WithField("RemoteID", o.RemoteID). + Debugf("grpc send") + + strs, err := sw.WaitStreams(planID, o.LocalID) + if err != nil { + return err + } + defer strs[0].Stream.Close() + + // TODO 根据客户端地址选择IP和端口 + agtCli, err := stgglb.AgentRPCPool.Acquire(o.Node.ExternalIP, o.Node.ExternalGRPCPort) + if err != nil { + return fmt.Errorf("new agent rpc client: %w", err) + } + defer stgglb.AgentRPCPool.Release(agtCli) + + err = agtCli.SendStream(planID, o.RemoteID, strs[0].Stream) + if err != nil { + return fmt.Errorf("sending stream: %w", err) + } + + return nil +} + +type GRPCFetch struct { + RemoteID ioswitch.StreamID `json:"remoteID"` + LocalID ioswitch.StreamID `json:"localID"` + Node model.Node `json:"node"` +} + +func (o *GRPCFetch) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) error { + // TODO 根据客户端地址选择IP和端口 + agtCli, err := stgglb.AgentRPCPool.Acquire(o.Node.ExternalIP, o.Node.ExternalGRPCPort) + if err != nil { + return fmt.Errorf("new agent rpc client: %w", err) + } + defer stgglb.AgentRPCPool.Release(agtCli) + + str, err := agtCli.FetchStream(planID, o.RemoteID) + if err != nil { + return fmt.Errorf("fetching stream: %w", err) + } + + fut := future.NewSetVoid() + str = myio.AfterReadClosedOnce(str, func(closer io.ReadCloser) { + fut.SetVoid() + }) + + sw.StreamReady(planID, ioswitch.NewStream(o.LocalID, str)) + + // TODO + fut.Wait(context.TODO()) + + return err +} + +func init() { + OpUnion.AddT((*GRPCSend)(nil)) + OpUnion.AddT((*GRPCFetch)(nil)) +} diff --git a/common/pkgs/ioswitch/ops/ops.go b/common/pkgs/ioswitch/ops/ops.go index 48d1b6e..e0a5ed8 100644 --- a/common/pkgs/ioswitch/ops/ops.go +++ b/common/pkgs/ioswitch/ops/ops.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "io" - "sync" "gitlink.org.cn/cloudream/common/pkgs/future" "gitlink.org.cn/cloudream/common/pkgs/logger" @@ -12,18 +11,12 @@ import ( myio "gitlink.org.cn/cloudream/common/utils/io" "gitlink.org.cn/cloudream/common/utils/serder" stgglb "gitlink.org.cn/cloudream/storage/common/globals" - stgmod "gitlink.org.cn/cloudream/storage/common/models" - "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" - "gitlink.org.cn/cloudream/storage/common/pkgs/ec" "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" ) -var _ = serder.UseTypeUnionExternallyTagged(types.Ref(types.NewTypeUnion[ioswitch.Op]( +var OpUnion = serder.UseTypeUnionExternallyTagged(types.Ref(types.NewTypeUnion[ioswitch.Op]( (*IPFSRead)(nil), (*IPFSWrite)(nil), - (*GRPCSend)(nil), - (*GRPCFetch)(nil), - (*ECCompute)(nil), (*Join)(nil), ))) @@ -51,7 +44,7 @@ func (o *IPFSRead) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) error { } fut := future.NewSetVoid() - file = myio.AfterReadClosed(file, func(closer io.ReadCloser) { + file = myio.AfterReadClosedOnce(file, func(closer io.ReadCloser) { fut.SetVoid() }) @@ -100,111 +93,6 @@ func (o *IPFSWrite) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) error { return nil } -type GRPCSend struct { - StreamID ioswitch.StreamID `json:"streamID"` - Node model.Node `json:"node"` -} - -func (o *GRPCSend) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) error { - logger. - WithField("ioswitch.StreamID", o.StreamID). - Debugf("grpc send") - - strs, err := sw.WaitStreams(planID, o.StreamID) - if err != nil { - return err - } - defer strs[0].Stream.Close() - - // TODO 根据客户端地址选择IP和端口 - agtCli, err := stgglb.AgentRPCPool.Acquire(o.Node.ExternalIP, o.Node.ExternalGRPCPort) - if err != nil { - return fmt.Errorf("new agent rpc client: %w", err) - } - defer stgglb.AgentRPCPool.Release(agtCli) - - err = agtCli.SendStream(planID, o.StreamID, strs[0].Stream) - if err != nil { - return fmt.Errorf("sending stream: %w", err) - } - - return nil -} - -type GRPCFetch struct { - StreamID ioswitch.StreamID `json:"streamID"` - Node model.Node `json:"node"` -} - -func (o *GRPCFetch) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) error { - // TODO 根据客户端地址选择IP和端口 - agtCli, err := stgglb.AgentRPCPool.Acquire(o.Node.ExternalIP, o.Node.ExternalGRPCPort) - if err != nil { - return fmt.Errorf("new agent rpc client: %w", err) - } - defer stgglb.AgentRPCPool.Release(agtCli) - - str, err := agtCli.FetchStream(planID, o.StreamID) - if err != nil { - return fmt.Errorf("fetching stream: %w", err) - } - - fut := future.NewSetVoid() - str = myio.AfterReadClosed(str, func(closer io.ReadCloser) { - fut.SetVoid() - }) - - sw.StreamReady(planID, ioswitch.NewStream(o.StreamID, str)) - - // TODO - fut.Wait(context.TODO()) - - return err -} - -type ECCompute struct { - EC stgmod.EC `json:"ec"` - InputIDs []ioswitch.StreamID `json:"inputIDs"` - OutputIDs []ioswitch.StreamID `json:"outputIDs"` - InputBlockIndexes []int `json:"inputBlockIndexes"` - OutputBlockIndexes []int `json:"outputBlockIndexes"` -} - -func (o *ECCompute) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) error { - rs, err := ec.NewRs(o.EC.K, o.EC.N, o.EC.ChunkSize) - if err != nil { - return fmt.Errorf("new ec: %w", err) - } - - strs, err := sw.WaitStreams(planID, o.InputIDs...) - if err != nil { - return err - } - defer func() { - for _, s := range strs { - s.Stream.Close() - } - }() - - var inputs []io.Reader - for _, s := range strs { - inputs = append(inputs, s.Stream) - } - - outputs := rs.ReconstructSome(inputs, o.InputBlockIndexes, o.OutputBlockIndexes) - - wg := sync.WaitGroup{} - for i, id := range o.OutputIDs { - wg.Add(1) - sw.StreamReady(planID, ioswitch.NewStream(id, myio.AfterReadClosed(outputs[i], func(closer io.ReadCloser) { - wg.Done() - }))) - } - wg.Wait() - - return nil -} - type Join struct { InputIDs []ioswitch.StreamID `json:"inputIDs"` OutputID ioswitch.StreamID `json:"outputID"` @@ -230,7 +118,7 @@ func (o *Join) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) error { fut := future.NewSetVoid() sw.StreamReady(planID, ioswitch.NewStream(o.OutputID, - myio.AfterReadClosed(myio.Length(myio.Join(strReaders), o.Length), func(closer io.ReadCloser) { + myio.AfterReadClosedOnce(myio.Length(myio.Join(strReaders), o.Length), func(closer io.ReadCloser) { fut.SetVoid() }), ), diff --git a/common/pkgs/ioswitch/plans/agent_plan.go b/common/pkgs/ioswitch/plans/agent_plan.go new file mode 100644 index 0000000..ced7af9 --- /dev/null +++ b/common/pkgs/ioswitch/plans/agent_plan.go @@ -0,0 +1,116 @@ +package plans + +import ( + stgmod "gitlink.org.cn/cloudream/storage/common/models" + "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch/ops" +) + +func (b *AgentPlanBuilder) GRCPFetch(node model.Node, str *AgentStream) *AgentStream { + agtStr := &AgentStream{ + owner: b, + info: b.owner.newStream(), + } + + b.ops = append(b.ops, &ops.GRPCFetch{ + RemoteID: str.info.ID, + LocalID: agtStr.info.ID, + Node: node, + }) + + return agtStr +} + +func (s *AgentStream) GRPCSend(node model.Node) *AgentStream { + agtStr := &AgentStream{ + owner: s.owner.owner.AtAgent(node), + info: s.owner.owner.newStream(), + } + + s.owner.ops = append(s.owner.ops, &ops.GRPCSend{ + LocalID: s.info.ID, + RemoteID: agtStr.info.ID, + Node: node, + }) + + return agtStr +} + +func (b *AgentPlanBuilder) FileRead(filePath string) *AgentStream { + agtStr := &AgentStream{ + owner: b, + info: b.owner.newStream(), + } + + b.ops = append(b.ops, &ops.FileRead{ + OutputID: agtStr.info.ID, + FilePath: filePath, + }) + + return agtStr +} + +func (b *AgentStream) FileWrite(filePath string) { + b.owner.ops = append(b.owner.ops, &ops.FileWrite{ + InputID: b.info.ID, + FilePath: filePath, + }) +} + +func (b *AgentPlanBuilder) ECCompute(ec stgmod.EC, inBlockIndexes []int, outBlockIndexes []int, streams ...*AgentStream) *MultiStream { + mstr := &MultiStream{} + + var inputStrIDs []ioswitch.StreamID + for _, str := range streams { + inputStrIDs = append(inputStrIDs, str.info.ID) + } + + var outputStrIDs []ioswitch.StreamID + for i := 0; i < ec.N-ec.K; i++ { + info := b.owner.newStream() + mstr.Streams = append(mstr.Streams, &AgentStream{ + owner: b, + info: info, + }) + outputStrIDs = append(outputStrIDs, info.ID) + } + + b.ops = append(b.ops, &ops.ECCompute{ + EC: ec, + InputIDs: inputStrIDs, + OutputIDs: outputStrIDs, + InputBlockIndexes: inBlockIndexes, + OutputBlockIndexes: outBlockIndexes, + }) + + return mstr +} + +func (b *AgentPlanBuilder) ECReconstruct(ec stgmod.EC, inBlockIndexes []int, streams ...*AgentStream) *MultiStream { + mstr := &MultiStream{} + + var inputStrIDs []ioswitch.StreamID + for _, str := range streams { + inputStrIDs = append(inputStrIDs, str.info.ID) + } + + var outputStrIDs []ioswitch.StreamID + for i := 0; i < ec.K; i++ { + info := b.owner.newStream() + mstr.Streams = append(mstr.Streams, &AgentStream{ + owner: b, + info: info, + }) + outputStrIDs = append(outputStrIDs, info.ID) + } + + b.ops = append(b.ops, &ops.ECReconstruct{ + EC: ec, + InputIDs: inputStrIDs, + OutputIDs: outputStrIDs, + InputBlockIndexes: inBlockIndexes, + }) + + return mstr +} diff --git a/common/pkgs/ioswitch/plans/plan_builder.go b/common/pkgs/ioswitch/plans/plan_builder.go index 8d0ab13..191131f 100644 --- a/common/pkgs/ioswitch/plans/plan_builder.go +++ b/common/pkgs/ioswitch/plans/plan_builder.go @@ -4,7 +4,6 @@ import ( "fmt" "github.com/google/uuid" - stgmod "gitlink.org.cn/cloudream/storage/common/models" "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch/ops" @@ -105,18 +104,28 @@ func (s *AgentStream) IPFSWrite(resultKey string) { }) } -func (s *AgentStream) GRPCSend(node model.Node) *AgentStream { - agtStr := &AgentStream{ - owner: s.owner.owner.AtAgent(node), - info: s.info, +func (b *AgentStream) ChunkSplit(chunkSize int, streamCount int, paddingZeros bool) *MultiStream { + mstr := &MultiStream{} + + var outputStrIDs []ioswitch.StreamID + for i := 0; i < streamCount; i++ { + info := b.owner.owner.newStream() + mstr.Streams = append(mstr.Streams, &AgentStream{ + owner: b.owner, + info: info, + }) + outputStrIDs = append(outputStrIDs, info.ID) } - s.owner.ops = append(s.owner.ops, &ops.GRPCSend{ - StreamID: s.info.ID, - Node: node, + b.owner.ops = append(b.owner.ops, &ops.ChunkedSplit{ + InputID: b.info.ID, + OutputIDs: outputStrIDs, + ChunkSize: chunkSize, + StreamCount: streamCount, + PaddingZeros: paddingZeros, }) - return agtStr + return mstr } func (s *AgentStream) ToExecutor() *ToExecutorStream { @@ -132,20 +141,6 @@ type AgentPlanBuilder struct { ops []ioswitch.Op } -func (b *AgentPlanBuilder) GRCPFetch(node model.Node) *AgentStream { - agtStr := &AgentStream{ - owner: b, - info: b.owner.newStream(), - } - - b.ops = append(b.ops, &ops.GRPCFetch{ - StreamID: agtStr.info.ID, - Node: node, - }) - - return agtStr -} - func (b *AgentPlanBuilder) IPFSRead(fileHash string) *AgentStream { agtStr := &AgentStream{ owner: b, @@ -160,36 +155,27 @@ func (b *AgentPlanBuilder) IPFSRead(fileHash string) *AgentStream { return agtStr } -func (b *AgentPlanBuilder) ECCompute(ec stgmod.EC, inBlockIndexes []int, outBlockIndexes []int, streams ...*AgentStream) *MultiStream { - mstr := &MultiStream{} +func (b *AgentPlanBuilder) Join(length int64, streams ...*AgentStream) *AgentStream { + agtStr := &AgentStream{ + owner: b, + info: b.owner.newStream(), + } var inputStrIDs []ioswitch.StreamID for _, str := range streams { inputStrIDs = append(inputStrIDs, str.info.ID) } - var outputStrIDs []ioswitch.StreamID - for i := 0; i < ec.N-ec.K; i++ { - info := b.owner.newStream() - mstr.streams[i] = &AgentStream{ - owner: b, - info: info, - } - outputStrIDs = append(outputStrIDs, info.ID) - } - - b.ops = append(b.ops, &ops.ECCompute{ - EC: ec, - InputIDs: inputStrIDs, - OutputIDs: outputStrIDs, - InputBlockIndexes: inBlockIndexes, - OutputBlockIndexes: outBlockIndexes, + b.ops = append(b.ops, &ops.Join{ + InputIDs: inputStrIDs, + OutputID: agtStr.info.ID, + Length: length, }) - return mstr + return agtStr } -func (b *AgentPlanBuilder) Join(length int64, streams ...*AgentStream) *AgentStream { +func (b *AgentPlanBuilder) ChunkJoin(chunkSize int, streams ...*AgentStream) *AgentStream { agtStr := &AgentStream{ owner: b, info: b.owner.newStream(), @@ -200,10 +186,10 @@ func (b *AgentPlanBuilder) Join(length int64, streams ...*AgentStream) *AgentStr inputStrIDs = append(inputStrIDs, str.info.ID) } - b.ops = append(b.ops, &ops.Join{ - InputIDs: inputStrIDs, - OutputID: agtStr.info.ID, - Length: length, + b.ops = append(b.ops, &ops.ChunkedJoin{ + InputIDs: inputStrIDs, + OutputID: agtStr.info.ID, + ChunkSize: chunkSize, }) return agtStr @@ -222,9 +208,13 @@ func (b *AgentPlanBuilder) Build(planID ioswitch.PlanID) (AgentPlan, error) { } type MultiStream struct { - streams []*AgentStream + Streams []*AgentStream +} + +func (m *MultiStream) Count() int { + return len(m.Streams) } func (m *MultiStream) Stream(index int) *AgentStream { - return m.streams[index] + return m.Streams[index] }