Browse Source

优化ioswtich的设计

gitlink
Sydonian 1 year ago
parent
commit
576f8543a0
5 changed files with 123 additions and 27 deletions
  1. +11
    -10
      common/pkgs/ioswitch/plans/executor.go
  2. +96
    -0
      common/pkgs/ioswitch/plans/fromto.go
  3. +15
    -0
      common/pkgs/ioswitch/plans/parser.go
  4. +1
    -1
      common/pkgs/ioswitch/plans/plan_builder.go
  5. +0
    -16
      common/pkgs/ioswitch/plans/plans.go

+ 11
- 10
common/pkgs/ioswitch/plans/executor.go View File

@@ -15,19 +15,19 @@ import (

type Executor struct {
planID ioswitch.PlanID
plan *PlanBuilder
planBlder *PlanBuilder
callback *future.SetVoidFuture
ctx context.Context
cancel context.CancelFunc
executorSw *ioswitch.Switch
}

func (e *Executor) BeginWrite(str io.ReadCloser, target ExecutorWriteStream) {
func (e *Executor) BeginWrite(str io.ReadCloser, target *ExecutorWriteStream) {
target.stream.Stream = str
e.executorSw.PutVars(target.stream)
}

func (e *Executor) BeginRead(target ExecutorReadStream) (io.ReadCloser, error) {
func (e *Executor) BeginRead(target *ExecutorReadStream) (io.ReadCloser, error) {
err := e.executorSw.BindVars(e.ctx, target.stream)
if err != nil {
return nil, fmt.Errorf("bind vars: %w", err)
@@ -36,7 +36,7 @@ func (e *Executor) BeginRead(target ExecutorReadStream) (io.ReadCloser, error) {
return target.stream.Stream, nil
}

func (e *Executor) Signal(signal ExecutorSignalVar) {
func (e *Executor) Signal(signal *ExecutorSignalVar) {
e.executorSw.PutVars(signal.v)
}

@@ -47,7 +47,7 @@ func (e *Executor) Wait(ctx context.Context) (map[string]any, error) {
}

ret := make(map[string]any)
e.plan.storeMap.Range(func(k, v any) bool {
e.planBlder.storeMap.Range(func(k, v any) bool {
ret[k.(string)] = v
return true
})
@@ -58,7 +58,7 @@ func (e *Executor) Wait(ctx context.Context) (map[string]any, error) {
func (e *Executor) execute() {
wg := sync.WaitGroup{}

for _, p := range e.plan.agentPlans {
for _, p := range e.planBlder.agentPlans {
wg.Add(1)

go func(p *AgentPlanBuilder) {
@@ -113,9 +113,10 @@ type ExecutorWriteStream struct {
stream *ioswitch.StreamVar
}

func (b *ExecutorPlanBuilder) WillWrite() (ExecutorWriteStream, *ExecutorStreamVar) {
func (b *ExecutorPlanBuilder) WillWrite(str *ExecutorWriteStream) *ExecutorStreamVar {
stream := b.blder.newStreamVar()
return ExecutorWriteStream{stream}, &ExecutorStreamVar{blder: b.blder, v: stream}
str.stream = stream
return &ExecutorStreamVar{blder: b.blder, v: stream}
}

func (b *ExecutorPlanBuilder) WillSignal() *ExecutorSignalVar {
@@ -127,8 +128,8 @@ type ExecutorReadStream struct {
stream *ioswitch.StreamVar
}

func (v *ExecutorStreamVar) WillRead() ExecutorReadStream {
return ExecutorReadStream{v.v}
func (v *ExecutorStreamVar) WillRead(str *ExecutorReadStream) {
str.stream = v.v
}

func (s *ExecutorStreamVar) To(node cdssdk.Node) *AgentStreamVar {


+ 96
- 0
common/pkgs/ioswitch/plans/fromto.go View File

@@ -0,0 +1,96 @@
package plans

import cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"

type From interface {
GetDataIndex() int
}

type To interface {
GetDataIndex() int
}

type FromTos []FromTo

type FromTo struct {
Froms []From
Tos []To
}

type FromExecutor struct {
Stream ExecutorWriteStream
DataIndex int
}

func (f *FromExecutor) GetDataIndex() int {
return f.DataIndex
}

type FromIPFS struct {
Node cdssdk.Node
FileHash string
DataIndex int
}

func NewFromIPFS(node cdssdk.Node, fileHash string, dataIndex int) *FromIPFS {
return &FromIPFS{
Node: node,
FileHash: fileHash,
DataIndex: dataIndex,
}
}

func (f *FromIPFS) GetDataIndex() int {
return f.DataIndex
}

type ToExecutor struct {
Stream *ExecutorReadStream
DataIndex int
}

func NewToExecutor(dataIndex int) (*ToExecutor, *ExecutorReadStream) {
str := ExecutorReadStream{}
return &ToExecutor{
Stream: &str,
DataIndex: dataIndex,
}, &str
}

func (t *ToExecutor) GetDataIndex() int {
return t.DataIndex
}

type ToIPFS struct {
Node cdssdk.Node
DataIndex int
FileHashKey string
}

func NewToIPFS(node cdssdk.Node, dataIndex int, fileHashKey string) *ToIPFS {
return &ToIPFS{
Node: node,
DataIndex: dataIndex,
FileHashKey: fileHashKey,
}
}

func (t *ToIPFS) GetDataIndex() int {
return t.DataIndex
}

type ToStorage struct {
Storage cdssdk.Storage
DataIndex int
}

func NewToStorage(storage cdssdk.Storage, dataIndex int) *ToStorage {
return &ToStorage{
Storage: storage,
DataIndex: dataIndex,
}
}

func (t *ToStorage) GetDataIndex() int {
return t.DataIndex
}

+ 15
- 0
common/pkgs/ioswitch/plans/parser.go View File

@@ -0,0 +1,15 @@
package plans

import cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"

type FromToParser interface {
Parse(ft FromTo, blder *PlanBuilder) error
}

type DefaultParser struct {
EC *cdssdk.ECRedundancy
}

func (p *DefaultParser) Parse(ft FromTo, blder *PlanBuilder) error {

}

+ 1
- 1
common/pkgs/ioswitch/plans/plan_builder.go View File

@@ -54,7 +54,7 @@ func (b *PlanBuilder) Execute() *Executor {

exec := Executor{
planID: planID,
plan: b,
planBlder: b,
callback: future.NewSetVoid(),
ctx: ctx,
cancel: cancel,


+ 0
- 16
common/pkgs/ioswitch/plans/plans.go View File

@@ -1,16 +0,0 @@
package plans

import (
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch"
)

type AgentPlan struct {
Node cdssdk.Node
Plan ioswitch.Plan
}

type ComposedPlan struct {
ID ioswitch.PlanID
AgentPlans []AgentPlan
}

Loading…
Cancel
Save