Browse Source

增加ioswitch功能,准备测试代码

gitlink
Sydonian 2 years ago
parent
commit
ea3a7beff0
13 changed files with 513 additions and 171 deletions
  1. +1
    -1
      client/internal/cmdline/package.go
  2. +1
    -1
      common/assets/confs/agent.config.json
  3. +2
    -2
      common/models/models.go
  4. +1
    -1
      common/pkgs/cmd/create_ec_package.go
  5. +2
    -2
      common/pkgs/ec/stream_rs.go
  6. +49
    -0
      common/pkgs/ioswitch/ops/chunked_join.go
  7. +49
    -0
      common/pkgs/ioswitch/ops/chunked_split.go
  8. +102
    -0
      common/pkgs/ioswitch/ops/ec.go
  9. +64
    -0
      common/pkgs/ioswitch/ops/file.go
  10. +84
    -0
      common/pkgs/ioswitch/ops/grpc.go
  11. +3
    -115
      common/pkgs/ioswitch/ops/ops.go
  12. +116
    -0
      common/pkgs/ioswitch/plans/agent_plan.go
  13. +39
    -49
      common/pkgs/ioswitch/plans/plan_builder.go

+ 1
- 1
client/internal/cmdline/package.go View File

@@ -185,7 +185,7 @@ func PackageUpdateRepPackage(ctx CommandContext, packageID int64, rootPath strin
}
}

func PackageUploadECPackage(ctx CommandContext, rootPath string, bucketID int64, name string, ecName string, chunkSize int64, nodeAffinity []int64) error {
func PackageUploadECPackage(ctx CommandContext, rootPath string, bucketID int64, name string, ecName string, chunkSize int, nodeAffinity []int64) error {
rootPath = filepath.Clean(rootPath)

var uploadFilePathes []string


+ 1
- 1
common/assets/confs/agent.config.json View File

@@ -23,7 +23,7 @@
"vhost": "/"
},
"ipfs": {
"port": 5001
"address": "127.0.0.1:5001"
},
"distlock": {
"etcdAddress": "127.0.0.1:2379",


+ 2
- 2
common/models/models.go View File

@@ -8,10 +8,10 @@ type EC struct {
ID int64 `json:"id"`
K int `json:"k"`
N int `json:"n"`
ChunkSize int64 `json:"chunkSize"`
ChunkSize int `json:"chunkSize"`
}

func NewEc(id int64, k int, n int, chunkSize int64) EC {
func NewEc(id int64, k int, n int, chunkSize int) EC {
return EC{
ID: id,
K: k,


+ 1
- 1
common/pkgs/cmd/create_ec_package.go View File

@@ -193,7 +193,7 @@ func uploadECObject(obj *iterator.IterUploadingObject, uploadNodes []UploadNodeI
}

outputs := myio.ChunkedSplit(obj.File, ecInfo.ChunkSize, ecMod.EcK, myio.ChunkedSplitOption{
FillZeros: true,
PaddingZeros: true,
})
var readers []io.Reader
for _, o := range outputs {


+ 2
- 2
common/pkgs/ec/stream_rs.go View File

@@ -12,10 +12,10 @@ type Rs struct {
ecN int
ecK int
ecP int
chunkSize int64
chunkSize int
}

func NewRs(k int, n int, chunkSize int64) (*Rs, error) {
func NewRs(k int, n int, chunkSize int) (*Rs, error) {
enc := Rs{
ecN: n,
ecK: k,


+ 49
- 0
common/pkgs/ioswitch/ops/chunked_join.go View File

@@ -0,0 +1,49 @@
package ops

import (
"context"
"io"

"gitlink.org.cn/cloudream/common/pkgs/future"
myio "gitlink.org.cn/cloudream/common/utils/io"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch"
)

type ChunkedJoin struct {
InputIDs []ioswitch.StreamID `json:"inputIDs"`
OutputID ioswitch.StreamID `json:"outputID"`
ChunkSize int `json:"chunkSize"`
}

func (o *ChunkedJoin) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) error {
strs, err := sw.WaitStreams(planID, o.InputIDs...)
if err != nil {
return err
}

var strReaders []io.Reader
for _, s := range strs {
strReaders = append(strReaders, s.Stream)
}
defer func() {
for _, str := range strs {
str.Stream.Close()
}
}()

fut := future.NewSetVoid()
sw.StreamReady(planID,
ioswitch.NewStream(o.OutputID,
myio.AfterReadClosedOnce(myio.ChunkedJoin(strReaders, o.ChunkSize), func(closer io.ReadCloser) {
fut.SetVoid()
}),
),
)

fut.Wait(context.TODO())
return nil
}

func init() {
OpUnion.AddT((*ChunkedJoin)(nil))
}

+ 49
- 0
common/pkgs/ioswitch/ops/chunked_split.go View File

@@ -0,0 +1,49 @@
package ops

import (
"io"
"sync"

myio "gitlink.org.cn/cloudream/common/utils/io"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch"
)

type ChunkedSplit struct {
InputID ioswitch.StreamID `json:"inputID"`
OutputIDs []ioswitch.StreamID `json:"outputIDs"`
ChunkSize int `json:"chunkSize"`
StreamCount int `json:"streamCount"`
PaddingZeros bool `json:"paddingZeros"`
}

func (o *ChunkedSplit) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) error {
str, err := sw.WaitStreams(planID, o.InputID)
if err != nil {
return err
}
defer str[0].Stream.Close()

wg := sync.WaitGroup{}
outputs := myio.ChunkedSplit(str[0].Stream, o.ChunkSize, o.StreamCount, myio.ChunkedSplitOption{
PaddingZeros: o.PaddingZeros,
})

for i := range outputs {
wg.Add(1)

sw.StreamReady(planID, ioswitch.NewStream(
o.OutputIDs[i],
myio.AfterReadClosedOnce(outputs[i], func(closer io.ReadCloser) {
wg.Done()
}),
))
}

wg.Wait()

return nil
}

func init() {
OpUnion.AddT((*ChunkedSplit)(nil))
}

+ 102
- 0
common/pkgs/ioswitch/ops/ec.go View File

@@ -0,0 +1,102 @@
package ops

import (
"fmt"
"io"
"sync"

myio "gitlink.org.cn/cloudream/common/utils/io"
stgmod "gitlink.org.cn/cloudream/storage/common/models"
"gitlink.org.cn/cloudream/storage/common/pkgs/ec"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch"
)

type ECCompute struct {
EC stgmod.EC `json:"ec"`
InputIDs []ioswitch.StreamID `json:"inputIDs"`
OutputIDs []ioswitch.StreamID `json:"outputIDs"`
InputBlockIndexes []int `json:"inputBlockIndexes"`
OutputBlockIndexes []int `json:"outputBlockIndexes"`
}

func (o *ECCompute) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) error {
rs, err := ec.NewRs(o.EC.K, o.EC.N, o.EC.ChunkSize)
if err != nil {
return fmt.Errorf("new ec: %w", err)
}

strs, err := sw.WaitStreams(planID, o.InputIDs...)
if err != nil {
return err
}
defer func() {
for _, s := range strs {
s.Stream.Close()
}
}()

var inputs []io.Reader
for _, s := range strs {
inputs = append(inputs, s.Stream)
}

outputs := rs.ReconstructSome(inputs, o.InputBlockIndexes, o.OutputBlockIndexes)

wg := sync.WaitGroup{}
for i, id := range o.OutputIDs {
wg.Add(1)
sw.StreamReady(planID, ioswitch.NewStream(id, myio.AfterReadClosedOnce(outputs[i], func(closer io.ReadCloser) {
wg.Done()
})))
}
wg.Wait()

return nil
}

type ECReconstruct struct {
EC stgmod.EC `json:"ec"`
InputIDs []ioswitch.StreamID `json:"inputIDs"`
OutputIDs []ioswitch.StreamID `json:"outputIDs"`
InputBlockIndexes []int `json:"inputBlockIndexes"`
}

func (o *ECReconstruct) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) error {
rs, err := ec.NewRs(o.EC.K, o.EC.N, o.EC.ChunkSize)
if err != nil {
return fmt.Errorf("new ec: %w", err)
}

strs, err := sw.WaitStreams(planID, o.InputIDs...)
if err != nil {
return err
}
defer func() {
for _, s := range strs {
s.Stream.Close()
}
}()

var inputs []io.Reader
for _, s := range strs {
inputs = append(inputs, s.Stream)
}

outputs := rs.ReconstructData(inputs, o.InputBlockIndexes)

wg := sync.WaitGroup{}
for i, id := range o.OutputIDs {
wg.Add(1)
sw.StreamReady(planID, ioswitch.NewStream(id, myio.AfterReadClosedOnce(outputs[i], func(closer io.ReadCloser) {
wg.Done()
})))
}
wg.Wait()

return nil
}

func init() {
OpUnion.AddT((*ECCompute)(nil))
OpUnion.AddT((*ECReconstruct)(nil))
}

+ 64
- 0
common/pkgs/ioswitch/ops/file.go View File

@@ -0,0 +1,64 @@
package ops

import (
"context"
"fmt"
"io"
"os"

"gitlink.org.cn/cloudream/common/pkgs/future"
myio "gitlink.org.cn/cloudream/common/utils/io"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch"
)

type FileWrite struct {
InputID ioswitch.StreamID `json:"inputID"`
FilePath string `json:"filePath"`
}

func (o *FileWrite) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) error {
str, err := sw.WaitStreams(planID, o.InputID)
if err != nil {
return err
}
defer str[0].Stream.Close()

file, err := os.Create(o.FilePath)
if err != nil {
return fmt.Errorf("opening file: %w", err)
}
defer file.Close()

_, err = io.Copy(file, str[0].Stream)
if err != nil {
return fmt.Errorf("copying data to file: %w", err)
}

return nil
}

type FileRead struct {
OutputID ioswitch.StreamID `json:"outputID"`
FilePath string `json:"filePath"`
}

func (o *FileRead) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) error {
file, err := os.Open(o.FilePath)
if err != nil {
return fmt.Errorf("opening file: %w", err)
}

fut := future.NewSetVoid()
sw.StreamReady(planID, ioswitch.NewStream(o.OutputID, myio.AfterReadClosed(file, func(closer io.ReadCloser) {
fut.SetVoid()
})))

fut.Wait(context.TODO())

return nil
}

func init() {
OpUnion.AddT((*FileRead)(nil))
OpUnion.AddT((*FileWrite)(nil))
}

+ 84
- 0
common/pkgs/ioswitch/ops/grpc.go View File

@@ -0,0 +1,84 @@
package ops

import (
"context"
"fmt"
"io"

"gitlink.org.cn/cloudream/common/pkgs/future"
"gitlink.org.cn/cloudream/common/pkgs/logger"
myio "gitlink.org.cn/cloudream/common/utils/io"
stgglb "gitlink.org.cn/cloudream/storage/common/globals"
"gitlink.org.cn/cloudream/storage/common/pkgs/db/model"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch"
)

type GRPCSend struct {
LocalID ioswitch.StreamID `json:"localID"`
RemoteID ioswitch.StreamID `json:"remoteID"`
Node model.Node `json:"node"`
}

func (o *GRPCSend) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) error {
logger.
WithField("LocalID", o.LocalID).
WithField("RemoteID", o.RemoteID).
Debugf("grpc send")

strs, err := sw.WaitStreams(planID, o.LocalID)
if err != nil {
return err
}
defer strs[0].Stream.Close()

// TODO 根据客户端地址选择IP和端口
agtCli, err := stgglb.AgentRPCPool.Acquire(o.Node.ExternalIP, o.Node.ExternalGRPCPort)
if err != nil {
return fmt.Errorf("new agent rpc client: %w", err)
}
defer stgglb.AgentRPCPool.Release(agtCli)

err = agtCli.SendStream(planID, o.RemoteID, strs[0].Stream)
if err != nil {
return fmt.Errorf("sending stream: %w", err)
}

return nil
}

type GRPCFetch struct {
RemoteID ioswitch.StreamID `json:"remoteID"`
LocalID ioswitch.StreamID `json:"localID"`
Node model.Node `json:"node"`
}

func (o *GRPCFetch) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) error {
// TODO 根据客户端地址选择IP和端口
agtCli, err := stgglb.AgentRPCPool.Acquire(o.Node.ExternalIP, o.Node.ExternalGRPCPort)
if err != nil {
return fmt.Errorf("new agent rpc client: %w", err)
}
defer stgglb.AgentRPCPool.Release(agtCli)

str, err := agtCli.FetchStream(planID, o.RemoteID)
if err != nil {
return fmt.Errorf("fetching stream: %w", err)
}

fut := future.NewSetVoid()
str = myio.AfterReadClosedOnce(str, func(closer io.ReadCloser) {
fut.SetVoid()
})

sw.StreamReady(planID, ioswitch.NewStream(o.LocalID, str))

// TODO
fut.Wait(context.TODO())

return err
}

func init() {
OpUnion.AddT((*GRPCSend)(nil))
OpUnion.AddT((*GRPCFetch)(nil))
}

+ 3
- 115
common/pkgs/ioswitch/ops/ops.go View File

@@ -4,7 +4,6 @@ import (
"context"
"fmt"
"io"
"sync"

"gitlink.org.cn/cloudream/common/pkgs/future"
"gitlink.org.cn/cloudream/common/pkgs/logger"
@@ -12,18 +11,12 @@ import (
myio "gitlink.org.cn/cloudream/common/utils/io"
"gitlink.org.cn/cloudream/common/utils/serder"
stgglb "gitlink.org.cn/cloudream/storage/common/globals"
stgmod "gitlink.org.cn/cloudream/storage/common/models"
"gitlink.org.cn/cloudream/storage/common/pkgs/db/model"
"gitlink.org.cn/cloudream/storage/common/pkgs/ec"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch"
)

var _ = serder.UseTypeUnionExternallyTagged(types.Ref(types.NewTypeUnion[ioswitch.Op](
var OpUnion = serder.UseTypeUnionExternallyTagged(types.Ref(types.NewTypeUnion[ioswitch.Op](
(*IPFSRead)(nil),
(*IPFSWrite)(nil),
(*GRPCSend)(nil),
(*GRPCFetch)(nil),
(*ECCompute)(nil),
(*Join)(nil),
)))

@@ -51,7 +44,7 @@ func (o *IPFSRead) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) error {
}

fut := future.NewSetVoid()
file = myio.AfterReadClosed(file, func(closer io.ReadCloser) {
file = myio.AfterReadClosedOnce(file, func(closer io.ReadCloser) {
fut.SetVoid()
})

@@ -100,111 +93,6 @@ func (o *IPFSWrite) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) error {
return nil
}

type GRPCSend struct {
StreamID ioswitch.StreamID `json:"streamID"`
Node model.Node `json:"node"`
}

func (o *GRPCSend) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) error {
logger.
WithField("ioswitch.StreamID", o.StreamID).
Debugf("grpc send")

strs, err := sw.WaitStreams(planID, o.StreamID)
if err != nil {
return err
}
defer strs[0].Stream.Close()

// TODO 根据客户端地址选择IP和端口
agtCli, err := stgglb.AgentRPCPool.Acquire(o.Node.ExternalIP, o.Node.ExternalGRPCPort)
if err != nil {
return fmt.Errorf("new agent rpc client: %w", err)
}
defer stgglb.AgentRPCPool.Release(agtCli)

err = agtCli.SendStream(planID, o.StreamID, strs[0].Stream)
if err != nil {
return fmt.Errorf("sending stream: %w", err)
}

return nil
}

type GRPCFetch struct {
StreamID ioswitch.StreamID `json:"streamID"`
Node model.Node `json:"node"`
}

func (o *GRPCFetch) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) error {
// TODO 根据客户端地址选择IP和端口
agtCli, err := stgglb.AgentRPCPool.Acquire(o.Node.ExternalIP, o.Node.ExternalGRPCPort)
if err != nil {
return fmt.Errorf("new agent rpc client: %w", err)
}
defer stgglb.AgentRPCPool.Release(agtCli)

str, err := agtCli.FetchStream(planID, o.StreamID)
if err != nil {
return fmt.Errorf("fetching stream: %w", err)
}

fut := future.NewSetVoid()
str = myio.AfterReadClosed(str, func(closer io.ReadCloser) {
fut.SetVoid()
})

sw.StreamReady(planID, ioswitch.NewStream(o.StreamID, str))

// TODO
fut.Wait(context.TODO())

return err
}

type ECCompute struct {
EC stgmod.EC `json:"ec"`
InputIDs []ioswitch.StreamID `json:"inputIDs"`
OutputIDs []ioswitch.StreamID `json:"outputIDs"`
InputBlockIndexes []int `json:"inputBlockIndexes"`
OutputBlockIndexes []int `json:"outputBlockIndexes"`
}

func (o *ECCompute) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) error {
rs, err := ec.NewRs(o.EC.K, o.EC.N, o.EC.ChunkSize)
if err != nil {
return fmt.Errorf("new ec: %w", err)
}

strs, err := sw.WaitStreams(planID, o.InputIDs...)
if err != nil {
return err
}
defer func() {
for _, s := range strs {
s.Stream.Close()
}
}()

var inputs []io.Reader
for _, s := range strs {
inputs = append(inputs, s.Stream)
}

outputs := rs.ReconstructSome(inputs, o.InputBlockIndexes, o.OutputBlockIndexes)

wg := sync.WaitGroup{}
for i, id := range o.OutputIDs {
wg.Add(1)
sw.StreamReady(planID, ioswitch.NewStream(id, myio.AfterReadClosed(outputs[i], func(closer io.ReadCloser) {
wg.Done()
})))
}
wg.Wait()

return nil
}

type Join struct {
InputIDs []ioswitch.StreamID `json:"inputIDs"`
OutputID ioswitch.StreamID `json:"outputID"`
@@ -230,7 +118,7 @@ func (o *Join) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) error {
fut := future.NewSetVoid()
sw.StreamReady(planID,
ioswitch.NewStream(o.OutputID,
myio.AfterReadClosed(myio.Length(myio.Join(strReaders), o.Length), func(closer io.ReadCloser) {
myio.AfterReadClosedOnce(myio.Length(myio.Join(strReaders), o.Length), func(closer io.ReadCloser) {
fut.SetVoid()
}),
),


+ 116
- 0
common/pkgs/ioswitch/plans/agent_plan.go View File

@@ -0,0 +1,116 @@
package plans

import (
stgmod "gitlink.org.cn/cloudream/storage/common/models"
"gitlink.org.cn/cloudream/storage/common/pkgs/db/model"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch/ops"
)

func (b *AgentPlanBuilder) GRCPFetch(node model.Node, str *AgentStream) *AgentStream {
agtStr := &AgentStream{
owner: b,
info: b.owner.newStream(),
}

b.ops = append(b.ops, &ops.GRPCFetch{
RemoteID: str.info.ID,
LocalID: agtStr.info.ID,
Node: node,
})

return agtStr
}

func (s *AgentStream) GRPCSend(node model.Node) *AgentStream {
agtStr := &AgentStream{
owner: s.owner.owner.AtAgent(node),
info: s.owner.owner.newStream(),
}

s.owner.ops = append(s.owner.ops, &ops.GRPCSend{
LocalID: s.info.ID,
RemoteID: agtStr.info.ID,
Node: node,
})

return agtStr
}

func (b *AgentPlanBuilder) FileRead(filePath string) *AgentStream {
agtStr := &AgentStream{
owner: b,
info: b.owner.newStream(),
}

b.ops = append(b.ops, &ops.FileRead{
OutputID: agtStr.info.ID,
FilePath: filePath,
})

return agtStr
}

func (b *AgentStream) FileWrite(filePath string) {
b.owner.ops = append(b.owner.ops, &ops.FileWrite{
InputID: b.info.ID,
FilePath: filePath,
})
}

func (b *AgentPlanBuilder) ECCompute(ec stgmod.EC, inBlockIndexes []int, outBlockIndexes []int, streams ...*AgentStream) *MultiStream {
mstr := &MultiStream{}

var inputStrIDs []ioswitch.StreamID
for _, str := range streams {
inputStrIDs = append(inputStrIDs, str.info.ID)
}

var outputStrIDs []ioswitch.StreamID
for i := 0; i < ec.N-ec.K; i++ {
info := b.owner.newStream()
mstr.Streams = append(mstr.Streams, &AgentStream{
owner: b,
info: info,
})
outputStrIDs = append(outputStrIDs, info.ID)
}

b.ops = append(b.ops, &ops.ECCompute{
EC: ec,
InputIDs: inputStrIDs,
OutputIDs: outputStrIDs,
InputBlockIndexes: inBlockIndexes,
OutputBlockIndexes: outBlockIndexes,
})

return mstr
}

func (b *AgentPlanBuilder) ECReconstruct(ec stgmod.EC, inBlockIndexes []int, streams ...*AgentStream) *MultiStream {
mstr := &MultiStream{}

var inputStrIDs []ioswitch.StreamID
for _, str := range streams {
inputStrIDs = append(inputStrIDs, str.info.ID)
}

var outputStrIDs []ioswitch.StreamID
for i := 0; i < ec.K; i++ {
info := b.owner.newStream()
mstr.Streams = append(mstr.Streams, &AgentStream{
owner: b,
info: info,
})
outputStrIDs = append(outputStrIDs, info.ID)
}

b.ops = append(b.ops, &ops.ECReconstruct{
EC: ec,
InputIDs: inputStrIDs,
OutputIDs: outputStrIDs,
InputBlockIndexes: inBlockIndexes,
})

return mstr
}

+ 39
- 49
common/pkgs/ioswitch/plans/plan_builder.go View File

@@ -4,7 +4,6 @@ import (
"fmt"

"github.com/google/uuid"
stgmod "gitlink.org.cn/cloudream/storage/common/models"
"gitlink.org.cn/cloudream/storage/common/pkgs/db/model"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch/ops"
@@ -105,18 +104,28 @@ func (s *AgentStream) IPFSWrite(resultKey string) {
})
}

func (s *AgentStream) GRPCSend(node model.Node) *AgentStream {
agtStr := &AgentStream{
owner: s.owner.owner.AtAgent(node),
info: s.info,
func (b *AgentStream) ChunkSplit(chunkSize int, streamCount int, paddingZeros bool) *MultiStream {
mstr := &MultiStream{}

var outputStrIDs []ioswitch.StreamID
for i := 0; i < streamCount; i++ {
info := b.owner.owner.newStream()
mstr.Streams = append(mstr.Streams, &AgentStream{
owner: b.owner,
info: info,
})
outputStrIDs = append(outputStrIDs, info.ID)
}

s.owner.ops = append(s.owner.ops, &ops.GRPCSend{
StreamID: s.info.ID,
Node: node,
b.owner.ops = append(b.owner.ops, &ops.ChunkedSplit{
InputID: b.info.ID,
OutputIDs: outputStrIDs,
ChunkSize: chunkSize,
StreamCount: streamCount,
PaddingZeros: paddingZeros,
})

return agtStr
return mstr
}

func (s *AgentStream) ToExecutor() *ToExecutorStream {
@@ -132,20 +141,6 @@ type AgentPlanBuilder struct {
ops []ioswitch.Op
}

func (b *AgentPlanBuilder) GRCPFetch(node model.Node) *AgentStream {
agtStr := &AgentStream{
owner: b,
info: b.owner.newStream(),
}

b.ops = append(b.ops, &ops.GRPCFetch{
StreamID: agtStr.info.ID,
Node: node,
})

return agtStr
}

func (b *AgentPlanBuilder) IPFSRead(fileHash string) *AgentStream {
agtStr := &AgentStream{
owner: b,
@@ -160,36 +155,27 @@ func (b *AgentPlanBuilder) IPFSRead(fileHash string) *AgentStream {
return agtStr
}

func (b *AgentPlanBuilder) ECCompute(ec stgmod.EC, inBlockIndexes []int, outBlockIndexes []int, streams ...*AgentStream) *MultiStream {
mstr := &MultiStream{}
func (b *AgentPlanBuilder) Join(length int64, streams ...*AgentStream) *AgentStream {
agtStr := &AgentStream{
owner: b,
info: b.owner.newStream(),
}

var inputStrIDs []ioswitch.StreamID
for _, str := range streams {
inputStrIDs = append(inputStrIDs, str.info.ID)
}

var outputStrIDs []ioswitch.StreamID
for i := 0; i < ec.N-ec.K; i++ {
info := b.owner.newStream()
mstr.streams[i] = &AgentStream{
owner: b,
info: info,
}
outputStrIDs = append(outputStrIDs, info.ID)
}

b.ops = append(b.ops, &ops.ECCompute{
EC: ec,
InputIDs: inputStrIDs,
OutputIDs: outputStrIDs,
InputBlockIndexes: inBlockIndexes,
OutputBlockIndexes: outBlockIndexes,
b.ops = append(b.ops, &ops.Join{
InputIDs: inputStrIDs,
OutputID: agtStr.info.ID,
Length: length,
})

return mstr
return agtStr
}

func (b *AgentPlanBuilder) Join(length int64, streams ...*AgentStream) *AgentStream {
func (b *AgentPlanBuilder) ChunkJoin(chunkSize int, streams ...*AgentStream) *AgentStream {
agtStr := &AgentStream{
owner: b,
info: b.owner.newStream(),
@@ -200,10 +186,10 @@ func (b *AgentPlanBuilder) Join(length int64, streams ...*AgentStream) *AgentStr
inputStrIDs = append(inputStrIDs, str.info.ID)
}

b.ops = append(b.ops, &ops.Join{
InputIDs: inputStrIDs,
OutputID: agtStr.info.ID,
Length: length,
b.ops = append(b.ops, &ops.ChunkedJoin{
InputIDs: inputStrIDs,
OutputID: agtStr.info.ID,
ChunkSize: chunkSize,
})

return agtStr
@@ -222,9 +208,13 @@ func (b *AgentPlanBuilder) Build(planID ioswitch.PlanID) (AgentPlan, error) {
}

type MultiStream struct {
streams []*AgentStream
Streams []*AgentStream
}

func (m *MultiStream) Count() int {
return len(m.Streams)
}

func (m *MultiStream) Stream(index int) *AgentStream {
return m.streams[index]
return m.Streams[index]
}

Loading…
Cancel
Save