Browse Source

优化ioswitch的代码结构

gitlink
Sydonian 2 years ago
parent
commit
b1ad113b31
6 changed files with 257 additions and 238 deletions
  1. +1
    -1
      common/pkgs/cmd/update_ec_package.go
  2. +91
    -0
      common/pkgs/ioswitch/ops/ipfs.go
  3. +49
    -0
      common/pkgs/ioswitch/ops/join.go
  4. +1
    -121
      common/pkgs/ioswitch/ops/ops.go
  5. +115
    -0
      common/pkgs/ioswitch/plans/agent_plan.go
  6. +0
    -116
      common/pkgs/ioswitch/plans/plan_builder.go

+ 1
- 1
common/pkgs/cmd/update_ec_package.go View File

@@ -44,7 +44,7 @@ func (t *UpdateECPackage) Execute(ctx *UpdatePackageContext) (*UpdateECPackageRe
Metadata().
// 用于查询可用的上传节点
Node().ReadAny().
// 用于创建包信息
// 用于修改包信息
Package().WriteOne(t.packageID).
// 用于创建包中的文件的信息
Object().CreateAny().


+ 91
- 0
common/pkgs/ioswitch/ops/ipfs.go View File

@@ -0,0 +1,91 @@
package ops

import (
"context"
"fmt"
"io"

"gitlink.org.cn/cloudream/common/pkgs/future"
"gitlink.org.cn/cloudream/common/pkgs/logger"
myio "gitlink.org.cn/cloudream/common/utils/io"
stgglb "gitlink.org.cn/cloudream/storage/common/globals"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch"
)

// IPFSRead reads the file identified by FileHash from IPFS and exposes
// its content as the stream named by Output.
type IPFSRead struct {
	Output   ioswitch.StreamID `json:"output"`
	FileHash string            `json:"fileHash"`
}

// Execute opens the IPFS file, publishes it as a ready stream on the
// switch, and blocks until the consumer has closed that stream.
func (o *IPFSRead) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) error {
	logger.
		WithField("FileHash", o.FileHash).
		WithField("Output", o.Output).
		Debugf("ipfs read op")
	defer logger.Debugf("ipfs read op finished")

	ipfsCli, err := stgglb.IPFSPool.Acquire()
	if err != nil {
		return fmt.Errorf("new ipfs client: %w", err)
	}
	defer stgglb.IPFSPool.Release(ipfsCli)

	rd, err := ipfsCli.OpenRead(o.FileHash)
	if err != nil {
		return fmt.Errorf("reading ipfs: %w", err)
	}

	// Signal completion when the downstream consumer closes the stream,
	// so this op does not return while the data is still being read.
	closed := future.NewSetVoid()
	rd = myio.AfterReadClosedOnce(rd, func(closer io.ReadCloser) {
		closed.SetVoid()
	})

	sw.StreamReady(planID, ioswitch.NewStream(o.Output, rd))

	// TODO: plumb a real context through instead of context.TODO().
	closed.Wait(context.TODO())
	return nil
}

// IPFSWrite consumes the stream named by Input, stores its content as a
// new IPFS file, and, when ResultKey is non-empty, reports the produced
// file hash as a plan result under that key.
type IPFSWrite struct {
	Input     ioswitch.StreamID `json:"input"`
	ResultKey string            `json:"resultKey"`
}

// Execute waits for the input stream, writes it into IPFS, and records
// the resulting file hash. It returns any acquisition, wait, or write
// error wrapped with context.
func (o *IPFSWrite) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) error {
	logger.
		WithField("ResultKey", o.ResultKey).
		WithField("Input", o.Input).
		Debugf("ipfs write op")
	// Mirror IPFSRead: also log completion so read/write ops trace
	// symmetrically.
	defer logger.Debugf("ipfs write op finished")

	ipfsCli, err := stgglb.IPFSPool.Acquire()
	if err != nil {
		return fmt.Errorf("new ipfs client: %w", err)
	}
	defer stgglb.IPFSPool.Release(ipfsCli)

	strs, err := sw.WaitStreams(planID, o.Input)
	if err != nil {
		return err
	}
	// Defensive: WaitStreams is expected to return exactly one stream for
	// the single requested ID; fail loudly rather than panic on strs[0].
	if len(strs) == 0 {
		return fmt.Errorf("stream %v not returned by switch", o.Input)
	}
	defer strs[0].Stream.Close()

	fileHash, err := ipfsCli.CreateFile(strs[0].Stream)
	if err != nil {
		return fmt.Errorf("creating ipfs file: %w", err)
	}

	if o.ResultKey != "" {
		sw.AddResultValue(planID, ioswitch.ResultKV{
			Key:   o.ResultKey,
			Value: fileHash,
		})
	}

	return nil
}

// Register the IPFS op types with the externally-tagged op union so
// they can be serialized into and deserialized from plans.
func init() {
	OpUnion.AddT((*IPFSRead)(nil))
	OpUnion.AddT((*IPFSWrite)(nil))
}

+ 49
- 0
common/pkgs/ioswitch/ops/join.go View File

@@ -0,0 +1,49 @@
package ops

import (
"context"
"io"

"gitlink.org.cn/cloudream/common/pkgs/future"
myio "gitlink.org.cn/cloudream/common/utils/io"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch"
)

// Join concatenates the streams named by InputIDs, in order, into the
// single stream named by OutputID, bounded to Length bytes via
// myio.Length (exact truncate/pad semantics live in myio — confirm).
type Join struct {
	InputIDs []ioswitch.StreamID `json:"inputIDs"`
	OutputID ioswitch.StreamID   `json:"outputID"`
	Length   int64               `json:"length"`
}

// Execute waits for all input streams, publishes their concatenation as
// the output stream, and blocks until the consumer closes that output;
// the input streams are closed when this function returns.
func (o *Join) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) error {
	strs, err := sw.WaitStreams(planID, o.InputIDs...)
	if err != nil {
		return err
	}

	var strReaders []io.Reader
	for _, s := range strs {
		strReaders = append(strReaders, s.Stream)
	}
	// The deferred closes fire only after fut.Wait below returns, i.e.
	// after the consumer has closed the joined output stream.
	defer func() {
		for _, str := range strs {
			str.Stream.Close()
		}
	}()

	fut := future.NewSetVoid()
	sw.StreamReady(planID,
		ioswitch.NewStream(o.OutputID,
			// AfterReadClosedOnce signals fut when the consumer closes
			// the joined, length-bounded reader.
			myio.AfterReadClosedOnce(myio.Length(myio.Join(strReaders), o.Length), func(closer io.ReadCloser) {
				fut.SetVoid()
			}),
		),
	)

	// NOTE(review): context.TODO() means no cancellation path here —
	// same open TODO as IPFSRead.Execute.
	fut.Wait(context.TODO())
	return nil
}

// Register the Join op type with the externally-tagged op union so it
// can be serialized into and deserialized from plans.
func init() {
	OpUnion.AddT((*Join)(nil))
}

+ 1
- 121
common/pkgs/ioswitch/ops/ops.go View File

@@ -1,129 +1,9 @@
package ops

import (
"context"
"fmt"
"io"

"gitlink.org.cn/cloudream/common/pkgs/future"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/pkgs/types"
myio "gitlink.org.cn/cloudream/common/utils/io"
"gitlink.org.cn/cloudream/common/utils/serder"
stgglb "gitlink.org.cn/cloudream/storage/common/globals"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch"
)

var OpUnion = serder.UseTypeUnionExternallyTagged(types.Ref(types.NewTypeUnion[ioswitch.Op](
(*IPFSRead)(nil),
(*IPFSWrite)(nil),
(*Join)(nil),
)))

type IPFSRead struct {
Output ioswitch.StreamID `json:"output"`
FileHash string `json:"fileHash"`
}

func (o *IPFSRead) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) error {
logger.
WithField("FileHash", o.FileHash).
WithField("Output", o.Output).
Debugf("ipfs read op")
defer logger.Debugf("ipfs read op finished")

ipfsCli, err := stgglb.IPFSPool.Acquire()
if err != nil {
return fmt.Errorf("new ipfs client: %w", err)
}
defer stgglb.IPFSPool.Release(ipfsCli)

file, err := ipfsCli.OpenRead(o.FileHash)
if err != nil {
return fmt.Errorf("reading ipfs: %w", err)
}

fut := future.NewSetVoid()
file = myio.AfterReadClosedOnce(file, func(closer io.ReadCloser) {
fut.SetVoid()
})

sw.StreamReady(planID, ioswitch.NewStream(o.Output, file))

// TODO context
fut.Wait(context.TODO())
return nil
}

type IPFSWrite struct {
Input ioswitch.StreamID `json:"input"`
ResultKey string `json:"resultKey"`
}

func (o *IPFSWrite) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) error {
logger.
WithField("ResultKey", o.ResultKey).
WithField("Input", o.Input).
Debugf("ipfs write op")

ipfsCli, err := stgglb.IPFSPool.Acquire()
if err != nil {
return fmt.Errorf("new ipfs client: %w", err)
}
defer stgglb.IPFSPool.Release(ipfsCli)

strs, err := sw.WaitStreams(planID, o.Input)
if err != nil {
return err
}
defer strs[0].Stream.Close()

fileHash, err := ipfsCli.CreateFile(strs[0].Stream)
if err != nil {
return fmt.Errorf("creating ipfs file: %w", err)
}

if o.ResultKey != "" {
sw.AddResultValue(planID, ioswitch.ResultKV{
Key: o.ResultKey,
Value: fileHash,
})
}

return nil
}

type Join struct {
InputIDs []ioswitch.StreamID `json:"inputIDs"`
OutputID ioswitch.StreamID `json:"outputID"`
Length int64 `json:"length"`
}

func (o *Join) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) error {
strs, err := sw.WaitStreams(planID, o.InputIDs...)
if err != nil {
return err
}

var strReaders []io.Reader
for _, s := range strs {
strReaders = append(strReaders, s.Stream)
}
defer func() {
for _, str := range strs {
str.Stream.Close()
}
}()

fut := future.NewSetVoid()
sw.StreamReady(planID,
ioswitch.NewStream(o.OutputID,
myio.AfterReadClosedOnce(myio.Length(myio.Join(strReaders), o.Length), func(closer io.ReadCloser) {
fut.SetVoid()
}),
),
)

fut.Wait(context.TODO())
return nil
}
// OpUnion is the externally-tagged type union used to (de)serialize Op
// implementations; it starts empty and each op file registers its own
// types via init().
var OpUnion = serder.UseTypeUnionExternallyTagged(types.Ref(types.NewTypeUnion[ioswitch.Op]()))

+ 115
- 0
common/pkgs/ioswitch/plans/agent_plan.go View File

@@ -7,6 +7,29 @@ import (
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch/ops"
)

// AgentPlanBuilder accumulates the ops that a single agent node will
// execute as part of a larger plan being assembled by its owner
// PlanBuilder.
type AgentPlanBuilder struct {
	owner *PlanBuilder
	node  model.Node
	ops   []ioswitch.Op
}

// AgentStream is a handle to one stream produced on an agent, used to
// chain further ops onto that stream.
type AgentStream struct {
	owner *AgentPlanBuilder
	info  *StreamInfo
}

// Build finalizes the ops recorded so far into an AgentPlan bound to
// this builder's node under the given plan ID.
func (b *AgentPlanBuilder) Build(planID ioswitch.PlanID) (AgentPlan, error) {
	return AgentPlan{
		Plan: ioswitch.Plan{
			ID:  planID,
			Ops: b.ops,
		},
		Node: b.node,
	}, nil
}

func (b *AgentPlanBuilder) GRCPFetch(node model.Node, str *AgentStream) *AgentStream {
agtStr := &AgentStream{
owner: b,
@@ -37,6 +60,27 @@ func (s *AgentStream) GRPCSend(node model.Node) *AgentStream {
return agtStr
}

// IPFSRead appends an IPFSRead op to this agent's plan and returns the
// stream that will carry the IPFS file's content.
func (b *AgentPlanBuilder) IPFSRead(fileHash string) *AgentStream {
	out := &AgentStream{
		owner: b,
		info:  b.owner.newStream(),
	}

	b.ops = append(b.ops, &ops.IPFSRead{
		Output:   out.info.ID,
		FileHash: fileHash,
	})

	return out
}

// IPFSWrite appends an IPFSWrite op that stores this stream's content
// into IPFS, reporting the file hash under resultKey (if non-empty).
func (s *AgentStream) IPFSWrite(resultKey string) {
	op := &ops.IPFSWrite{
		Input:     s.info.ID,
		ResultKey: resultKey,
	}
	s.owner.ops = append(s.owner.ops, op)
}

func (b *AgentPlanBuilder) FileRead(filePath string) *AgentStream {
agtStr := &AgentStream{
owner: b,
@@ -114,3 +158,74 @@ func (b *AgentPlanBuilder) ECReconstruct(ec stgmod.EC, inBlockIndexes []int, str

return mstr
}

// ChunkedSplit appends a ChunkedSplit op that cuts this stream into
// streamCount output streams of chunkSize-byte chunks (optionally
// zero-padding the tail) and returns the resulting stream group.
//
// Receiver renamed b -> s for consistency with the other AgentStream
// methods (IPFSWrite, ToExecutor).
func (s *AgentStream) ChunkedSplit(chunkSize int, streamCount int, paddingZeros bool) *MultiStream {
	mstr := &MultiStream{}

	// Pre-size: the number of outputs is known up front.
	outputStrIDs := make([]ioswitch.StreamID, 0, streamCount)
	for i := 0; i < streamCount; i++ {
		info := s.owner.owner.newStream()
		mstr.Streams = append(mstr.Streams, &AgentStream{
			owner: s.owner,
			info:  info,
		})
		outputStrIDs = append(outputStrIDs, info.ID)
	}

	s.owner.ops = append(s.owner.ops, &ops.ChunkedSplit{
		InputID:      s.info.ID,
		OutputIDs:    outputStrIDs,
		ChunkSize:    chunkSize,
		StreamCount:  streamCount,
		PaddingZeros: paddingZeros,
	})

	return mstr
}

// ToExecutor marks this stream for delivery to the executor, recording
// the agent node it originates from.
func (s *AgentStream) ToExecutor() *ToExecutorStream {
	return &ToExecutorStream{
		info:     s.info,
		fromNode: &s.owner.node,
	}
}

// Join appends a Join op that concatenates the given streams, in order,
// into a single stream of length bytes, and returns that stream.
func (b *AgentPlanBuilder) Join(length int64, streams ...*AgentStream) *AgentStream {
	agtStr := &AgentStream{
		owner: b,
		info:  b.owner.newStream(),
	}

	// Pre-size: one input ID per joined stream.
	inputStrIDs := make([]ioswitch.StreamID, 0, len(streams))
	for _, str := range streams {
		inputStrIDs = append(inputStrIDs, str.info.ID)
	}

	b.ops = append(b.ops, &ops.Join{
		InputIDs: inputStrIDs,
		OutputID: agtStr.info.ID,
		Length:   length,
	})

	return agtStr
}

// ChunkedJoin appends a ChunkedJoin op that interleaves chunkSize-byte
// chunks from the given streams into a single stream, and returns that
// stream.
func (b *AgentPlanBuilder) ChunkedJoin(chunkSize int, streams ...*AgentStream) *AgentStream {
	agtStr := &AgentStream{
		owner: b,
		info:  b.owner.newStream(),
	}

	// Pre-size: one input ID per joined stream.
	inputStrIDs := make([]ioswitch.StreamID, 0, len(streams))
	for _, str := range streams {
		inputStrIDs = append(inputStrIDs, str.info.ID)
	}

	b.ops = append(b.ops, &ops.ChunkedJoin{
		InputIDs:  inputStrIDs,
		OutputID:  agtStr.info.ID,
		ChunkSize: chunkSize,
	})

	return agtStr
}

+ 0
- 116
common/pkgs/ioswitch/plans/plan_builder.go View File

@@ -6,7 +6,6 @@ import (
"github.com/google/uuid"
"gitlink.org.cn/cloudream/storage/common/pkgs/db/model"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch/ops"
)

type StreamInfo struct {
@@ -92,121 +91,6 @@ type ToExecutorStream struct {
fromNode *model.Node
}

type AgentStream struct {
owner *AgentPlanBuilder
info *StreamInfo
}

func (s *AgentStream) IPFSWrite(resultKey string) {
s.owner.ops = append(s.owner.ops, &ops.IPFSWrite{
Input: s.info.ID,
ResultKey: resultKey,
})
}

func (b *AgentStream) ChunkSplit(chunkSize int, streamCount int, paddingZeros bool) *MultiStream {
mstr := &MultiStream{}

var outputStrIDs []ioswitch.StreamID
for i := 0; i < streamCount; i++ {
info := b.owner.owner.newStream()
mstr.Streams = append(mstr.Streams, &AgentStream{
owner: b.owner,
info: info,
})
outputStrIDs = append(outputStrIDs, info.ID)
}

b.owner.ops = append(b.owner.ops, &ops.ChunkedSplit{
InputID: b.info.ID,
OutputIDs: outputStrIDs,
ChunkSize: chunkSize,
StreamCount: streamCount,
PaddingZeros: paddingZeros,
})

return mstr
}

func (s *AgentStream) ToExecutor() *ToExecutorStream {
return &ToExecutorStream{
info: s.info,
fromNode: &s.owner.node,
}
}

type AgentPlanBuilder struct {
owner *PlanBuilder
node model.Node
ops []ioswitch.Op
}

func (b *AgentPlanBuilder) IPFSRead(fileHash string) *AgentStream {
agtStr := &AgentStream{
owner: b,
info: b.owner.newStream(),
}

b.ops = append(b.ops, &ops.IPFSRead{
Output: agtStr.info.ID,
FileHash: fileHash,
})

return agtStr
}

func (b *AgentPlanBuilder) Join(length int64, streams ...*AgentStream) *AgentStream {
agtStr := &AgentStream{
owner: b,
info: b.owner.newStream(),
}

var inputStrIDs []ioswitch.StreamID
for _, str := range streams {
inputStrIDs = append(inputStrIDs, str.info.ID)
}

b.ops = append(b.ops, &ops.Join{
InputIDs: inputStrIDs,
OutputID: agtStr.info.ID,
Length: length,
})

return agtStr
}

func (b *AgentPlanBuilder) ChunkJoin(chunkSize int, streams ...*AgentStream) *AgentStream {
agtStr := &AgentStream{
owner: b,
info: b.owner.newStream(),
}

var inputStrIDs []ioswitch.StreamID
for _, str := range streams {
inputStrIDs = append(inputStrIDs, str.info.ID)
}

b.ops = append(b.ops, &ops.ChunkedJoin{
InputIDs: inputStrIDs,
OutputID: agtStr.info.ID,
ChunkSize: chunkSize,
})

return agtStr
}

func (b *AgentPlanBuilder) Build(planID ioswitch.PlanID) (AgentPlan, error) {
plan := ioswitch.Plan{
ID: planID,
Ops: b.ops,
}

return AgentPlan{
Plan: plan,
Node: b.node,
}, nil
}

// MultiStream groups the output streams produced by a splitting op
// (e.g. ChunkedSplit) so they can be handled together.
type MultiStream struct {
	Streams []*AgentStream
}


Loading…
Cancel
Save