Browse Source

重构ioswitch模块

gitlink
Sydonian 1 year ago
parent
commit
09e428c696
23 changed files with 417 additions and 1064 deletions
  1. +0
    -3
      agent/main.go
  2. +14
    -8
      common/pkgs/grpc/agent/client.go
  3. +61
    -0
      common/pkgs/ioswitch/agent_worker.go
  4. +37
    -16
      common/pkgs/ioswitch/fromto.go
  5. +23
    -0
      common/pkgs/ioswitch/ioswitch.go
  6. +0
    -43
      common/pkgs/ioswitch/ops/driver.go
  7. +0
    -53
      common/pkgs/ioswitch/ops/drop.go
  8. +0
    -230
      common/pkgs/ioswitch/ops/grpc.go
  9. +0
    -134
      common/pkgs/ioswitch/ops/ops.go
  10. +0
    -53
      common/pkgs/ioswitch/ops/store.go
  11. +0
    -173
      common/pkgs/ioswitch/ops/sync.go
  12. +0
    -20
      common/pkgs/ioswitch/ops/var.go
  13. +24
    -25
      common/pkgs/ioswitch/ops2/chunked.go
  14. +23
    -25
      common/pkgs/ioswitch/ops2/clone.go
  15. +21
    -18
      common/pkgs/ioswitch/ops2/ec.go
  16. +16
    -17
      common/pkgs/ioswitch/ops2/file.go
  17. +20
    -21
      common/pkgs/ioswitch/ops2/ipfs.go
  18. +5
    -5
      common/pkgs/ioswitch/ops2/join.go
  19. +5
    -5
      common/pkgs/ioswitch/ops2/length.go
  20. +75
    -0
      common/pkgs/ioswitch/ops2/ops.go
  21. +11
    -11
      common/pkgs/ioswitch/ops2/range.go
  22. +65
    -204
      common/pkgs/ioswitch/plans/parser.go
  23. +17
    -0
      common/pkgs/ioswitch/utils.go

+ 0
- 3
agent/main.go View File

@@ -16,9 +16,6 @@ import (
"gitlink.org.cn/cloudream/storage/common/pkgs/downloader"
agtrpc "gitlink.org.cn/cloudream/storage/common/pkgs/grpc/agent"

// TODO 注册OpUnion,但在mq包中注册会造成循环依赖,所以只能放到这里
_ "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch/ops"

"google.golang.org/grpc"

agtmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/agent"


+ 14
- 8
common/pkgs/grpc/agent/client.go View File

@@ -132,11 +132,12 @@ func (c *Client) SendStream(ctx context.Context, planID exec.PlanID, varID exec.
}
}

func (c *Client) GetStream(planID exec.PlanID, varID exec.VarID, signal *exec.SignalVar) (io.ReadCloser, error) {
ctx, cancel := context.WithCancel(context.Background())
func (c *Client) GetStream(ctx context.Context, planID exec.PlanID, varID exec.VarID, signal *exec.SignalVar) (io.ReadCloser, error) {
ctx, cancel := context.WithCancel(ctx)

sdata, err := serder.ObjectToJSONEx(signal)
if err != nil {
cancel()
return nil, err
}

@@ -169,15 +170,15 @@ func (c *Client) SendVar(ctx context.Context, planID exec.PlanID, v exec.Var) er
return err
}

func (c *Client) GetVar(ctx context.Context, planID exec.PlanID, v exec.Var, signal *exec.SignalVar) (exec.Var, error) {
func (c *Client) GetVar(ctx context.Context, planID exec.PlanID, v exec.Var, signal *exec.SignalVar) error {
vdata, err := serder.ObjectToJSONEx(v)
if err != nil {
return nil, err
return err
}

sdata, err := serder.ObjectToJSONEx(signal)
if err != nil {
return nil, err
return err
}

resp, err := c.cli.GetVar(ctx, &GetVarReq{
@@ -186,15 +187,20 @@ func (c *Client) GetVar(ctx context.Context, planID exec.PlanID, v exec.Var, sig
Signal: string(sdata),
})
if err != nil {
return nil, err
return err
}

v2, err := serder.JSONToObjectEx[exec.Var]([]byte(resp.Var))
if err != nil {
return nil, err
return err
}

return v2, nil
err = exec.AssignVar(v2, v)
if err != nil {
return err
}

return nil
}

func (c *Client) Ping() error {


+ 61
- 0
common/pkgs/ioswitch/agent_worker.go View File

@@ -0,0 +1,61 @@
package ioswitch

import (
"context"
"io"

"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
stgglb "gitlink.org.cn/cloudream/storage/common/globals"
agtrpc "gitlink.org.cn/cloudream/storage/common/pkgs/grpc/agent"
)

type AgentWorker struct {
Node cdssdk.Node
}

func (w *AgentWorker) NewClient() (exec.WorkerClient, error) {
cli, err := stgglb.AgentRPCPool.Acquire(stgglb.SelectGRPCAddress(&w.Node))
if err != nil {
return nil, err
}

return &AgentWorkerClient{cli: cli}, nil
}

func (w *AgentWorker) String() string {
return w.Node.String()
}

func (w *AgentWorker) Equals(worker exec.WorkerInfo) bool {
aw, ok := worker.(*AgentWorker)
if !ok {
return false
}

return w.Node.NodeID == aw.Node.NodeID
}

type AgentWorkerClient struct {
cli *agtrpc.PoolClient
}

func (c *AgentWorkerClient) ExecutePlan(ctx context.Context, plan exec.Plan) error {
return c.cli.ExecuteIOPlan(ctx, plan)
}
func (c *AgentWorkerClient) SendStream(ctx context.Context, planID exec.PlanID, v *exec.StreamVar, str io.ReadCloser) error {
return c.cli.SendStream(ctx, planID, v.ID, str)
}
func (c *AgentWorkerClient) SendVar(ctx context.Context, planID exec.PlanID, v exec.Var) error {
return c.cli.SendVar(ctx, planID, v)
}
func (c *AgentWorkerClient) GetStream(ctx context.Context, planID exec.PlanID, v *exec.StreamVar, signal *exec.SignalVar) (io.ReadCloser, error) {
return c.cli.GetStream(ctx, planID, v.ID, signal)
}
func (c *AgentWorkerClient) GetVar(ctx context.Context, planID exec.PlanID, v exec.Var, signal *exec.SignalVar) error {
return c.cli.GetVar(ctx, planID, v, signal)
}
func (c *AgentWorkerClient) Close() error {
stgglb.AgentRPCPool.Release(c.cli)
return nil
}

common/pkgs/ioswitch/ops/fromto.go → common/pkgs/ioswitch/fromto.go View File

@@ -1,4 +1,4 @@
package ops
package ioswitch

import (
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
@@ -17,71 +17,92 @@ type To interface {
GetDataIndex() int
}

type FromExecutor struct {
type FromTos []FromTo

type FromTo struct {
Froms []From
Toes []To
}

func NewFromTo() FromTo {
return FromTo{}
}

func (ft *FromTo) AddFrom(from From) *FromTo {
ft.Froms = append(ft.Froms, from)
return ft
}

func (ft *FromTo) AddTo(to To) *FromTo {
ft.Toes = append(ft.Toes, to)
return ft
}

type FromDriver struct {
Handle *exec.DriverWriteStream
DataIndex int
}

func NewFromExecutor(dataIndex int) (*FromExecutor, *exec.DriverWriteStream) {
func NewFromDriver(dataIndex int) (*FromDriver, *exec.DriverWriteStream) {
handle := &exec.DriverWriteStream{
RangeHint: &exec.Range{},
}
return &FromExecutor{
return &FromDriver{
Handle: handle,
DataIndex: dataIndex,
}, handle
}

func (f *FromExecutor) GetDataIndex() int {
func (f *FromDriver) GetDataIndex() int {
return f.DataIndex
}

type FromWorker struct {
type FromNode struct {
FileHash string
Node *cdssdk.Node
DataIndex int
}

func NewFromNode(fileHash string, node *cdssdk.Node, dataIndex int) *FromWorker {
return &FromWorker{
func NewFromNode(fileHash string, node *cdssdk.Node, dataIndex int) *FromNode {
return &FromNode{
FileHash: fileHash,
Node: node,
DataIndex: dataIndex,
}
}

func (f *FromWorker) GetDataIndex() int {
func (f *FromNode) GetDataIndex() int {
return f.DataIndex
}

type ToExecutor struct {
type ToDriver struct {
Handle *exec.DriverReadStream
DataIndex int
Range exec.Range
}

func NewToExecutor(dataIndex int) (*ToExecutor, *exec.DriverReadStream) {
func NewToDriver(dataIndex int) (*ToDriver, *exec.DriverReadStream) {
str := exec.DriverReadStream{}
return &ToExecutor{
return &ToDriver{
Handle: &str,
DataIndex: dataIndex,
}, &str
}

func NewToExecutorWithRange(dataIndex int, rng exec.Range) (*ToExecutor, *exec.DriverReadStream) {
func NewToExecutorWithRange(dataIndex int, rng exec.Range) (*ToDriver, *exec.DriverReadStream) {
str := exec.DriverReadStream{}
return &ToExecutor{
return &ToDriver{
Handle: &str,
DataIndex: dataIndex,
Range: rng,
}, &str
}

func (t *ToExecutor) GetDataIndex() int {
func (t *ToDriver) GetDataIndex() int {
return t.DataIndex
}

func (t *ToExecutor) GetRange() exec.Range {
func (t *ToDriver) GetRange() exec.Range {
return t.Range
}


+ 23
- 0
common/pkgs/ioswitch/ioswitch.go View File

@@ -0,0 +1,23 @@
package ioswitch

import (
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
)

type NodeProps struct {
From From
To To
}

type ValueVarType int

const (
StringValueVar ValueVarType = iota
SignalValueVar
)

type VarProps struct {
StreamIndex int // 流的编号,只在StreamVar上有意义
ValueType ValueVarType // 值类型,只在ValueVar上有意义
Var exec.Var // 生成Plan的时候创建的对应的Var
}

+ 0
- 43
common/pkgs/ioswitch/ops/driver.go View File

@@ -1,43 +0,0 @@
package ops

import (
"fmt"

"gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag"
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
)

type FromDriverType struct {
Handle *exec.DriverWriteStream
}

func (t *FromDriverType) InitNode(node *Node) {
dag.NodeNewOutputStream(node, VarProps{})
}

func (t *FromDriverType) GenerateOp(op *Node, blder *exec.PlanBuilder) error {
t.Handle.Var = op.OutputStreams[0].Props.Var.(*exec.StreamVar)
return nil
}

func (t *FromDriverType) String(node *Node) string {
return fmt.Sprintf("FromDriver[]%v%v", formatStreamIO(node), formatValueIO(node))
}

type ToDriverType struct {
Handle *exec.DriverReadStream
Range exec.Range
}

func (t *ToDriverType) InitNode(node *Node) {
dag.NodeDeclareInputStream(node, 1)
}

func (t *ToDriverType) GenerateOp(op *Node, blder *exec.PlanBuilder) error {
t.Handle.Var = op.InputStreams[0].Props.Var.(*exec.StreamVar)
return nil
}

func (t *ToDriverType) String(node *Node) string {
return fmt.Sprintf("ToDriver[%v+%v]%v%v", t.Range.Offset, t.Range.Length, formatStreamIO(node), formatValueIO(node))
}

+ 0
- 53
common/pkgs/ioswitch/ops/drop.go View File

@@ -1,53 +0,0 @@
package ops

import (
"context"
"fmt"
"io"

"gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag"
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
)

func init() {
OpUnion.AddT((*DropStream)(nil))
}

type DropStream struct {
Input *exec.StreamVar `json:"input"`
}

func (o *DropStream) Execute(ctx context.Context, e *exec.Executor) error {
err := e.BindVars(ctx, o.Input)
if err != nil {
return err
}

for {
buf := make([]byte, 1024*8)
_, err = o.Input.Stream.Read(buf)
if err == io.EOF {
return nil
}
if err != nil {
return err
}
}
}

type DropType struct{}

func (t *DropType) InitNode(node *Node) {
dag.NodeDeclareInputStream(node, 1)
}

func (t *DropType) GenerateOp(op *Node, blder *exec.PlanBuilder) error {
addOpByEnv(&DropStream{
Input: op.InputStreams[0].Props.Var.(*exec.StreamVar),
}, op.Env, blder)
return nil
}

func (t *DropType) String(node *Node) string {
return fmt.Sprintf("Drop[]%v%v", formatStreamIO(node), formatValueIO(node))
}

+ 0
- 230
common/pkgs/ioswitch/ops/grpc.go View File

@@ -1,230 +0,0 @@
package ops

import (
"context"
"fmt"
"io"

"gitlink.org.cn/cloudream/common/pkgs/future"
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag"
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
"gitlink.org.cn/cloudream/common/pkgs/logger"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/common/utils/io2"
stgglb "gitlink.org.cn/cloudream/storage/common/globals"
)

func init() {
OpUnion.AddT((*SendStream)(nil))
OpUnion.AddT((*GetStream)(nil))
OpUnion.AddT((*SendVar)(nil))
OpUnion.AddT((*GetVar)(nil))
}

type SendStream struct {
Input *exec.StreamVar `json:"input"`
Send *exec.StreamVar `json:"send"`
Node cdssdk.Node `json:"node"`
}

func (o *SendStream) Execute(ctx context.Context, e *exec.Executor) error {
err := e.BindVars(ctx, o.Input)
if err != nil {
return err
}
defer o.Input.Stream.Close()

agtCli, err := stgglb.AgentRPCPool.Acquire(stgglb.SelectGRPCAddress(&o.Node))
if err != nil {
return fmt.Errorf("new agent rpc client: %w", err)
}
defer stgglb.AgentRPCPool.Release(agtCli)

logger.Debugf("sending stream %v as %v to node %v", o.Input.ID, o.Send.ID, o.Node)

// 发送后流的ID不同
err = agtCli.SendStream(ctx, e.Plan().ID, o.Send.ID, o.Input.Stream)
if err != nil {
return fmt.Errorf("sending stream: %w", err)
}

return nil
}

type GetStream struct {
Signal *exec.SignalVar `json:"signal"`
Target *exec.StreamVar `json:"target"`
Output *exec.StreamVar `json:"output"`
Node cdssdk.Node `json:"node"`
}

func (o *GetStream) Execute(ctx context.Context, e *exec.Executor) error {
agtCli, err := stgglb.AgentRPCPool.Acquire(stgglb.SelectGRPCAddress(&o.Node))
if err != nil {
return fmt.Errorf("new agent rpc client: %w", err)
}
defer stgglb.AgentRPCPool.Release(agtCli)

logger.Debugf("getting stream %v as %v from node %v", o.Target.ID, o.Output.ID, o.Node)

str, err := agtCli.GetStream(e.Plan().ID, o.Target.ID, o.Signal)
if err != nil {
return fmt.Errorf("getting stream: %w", err)
}

fut := future.NewSetVoid()
// 获取后送到本地的流ID是不同的
o.Output.Stream = io2.AfterReadClosedOnce(str, func(closer io.ReadCloser) {
fut.SetVoid()
})
e.PutVars(o.Output)

return fut.Wait(ctx)
}

type SendVar struct {
Input exec.Var `json:"input"`
Send exec.Var `json:"send"`
Node cdssdk.Node `json:"node"`
}

func (o *SendVar) Execute(ctx context.Context, e *exec.Executor) error {
err := e.BindVars(ctx, o.Input)
if err != nil {
return err
}

agtCli, err := stgglb.AgentRPCPool.Acquire(stgglb.SelectGRPCAddress(&o.Node))
if err != nil {
return fmt.Errorf("new agent rpc client: %w", err)
}
defer stgglb.AgentRPCPool.Release(agtCli)

logger.Debugf("sending var %v as %v to node %v", o.Input.GetID(), o.Send.GetID(), o.Node)

exec.AssignVar(o.Input, o.Send)
err = agtCli.SendVar(ctx, e.Plan().ID, o.Send)
if err != nil {
return fmt.Errorf("sending var: %w", err)
}

return nil
}

type GetVar struct {
Signal *exec.SignalVar `json:"signal"`
Target exec.Var `json:"target"`
Output exec.Var `json:"output"`
Node cdssdk.Node `json:"node"`
}

func (o *GetVar) Execute(ctx context.Context, e *exec.Executor) error {
agtCli, err := stgglb.AgentRPCPool.Acquire(stgglb.SelectGRPCAddress(&o.Node))
if err != nil {
return fmt.Errorf("new agent rpc client: %w", err)
}
defer stgglb.AgentRPCPool.Release(agtCli)

logger.Debugf("getting var %v as %v from node %v", o.Target.GetID(), o.Output.GetID(), o.Node)

v2, err := agtCli.GetVar(ctx, e.Plan().ID, o.Target, o.Signal)
if err != nil {
return fmt.Errorf("getting var: %w", err)
}
exec.AssignVar(v2, o.Output)
e.PutVars(o.Output)

return nil
}

type SendStreamType struct {
}

func (t *SendStreamType) InitNode(node *Node) {
dag.NodeDeclareInputStream(node, 1)
dag.NodeNewOutputStream(node, VarProps{})
}

func (t *SendStreamType) GenerateOp(op *Node, blder *exec.PlanBuilder) error {
toAgt := op.OutputStreams[0].Toes[0].Node.Env.Worker.(*AgentWorker)
addOpByEnv(&SendStream{
Input: op.InputStreams[0].Props.Var.(*exec.StreamVar),
Send: op.OutputStreams[0].Props.Var.(*exec.StreamVar),
Node: toAgt.Node,
}, op.Env, blder)
return nil
}

func (t *SendStreamType) String(node *Node) string {
return fmt.Sprintf("SendStream[]%v%v", formatStreamIO(node), formatValueIO(node))
}

type SendVarType struct {
}

func (t *SendVarType) InitNode(node *Node) {
dag.NodeDeclareInputValue(node, 1)
dag.NodeNewOutputValue(node, VarProps{})
}

func (t *SendVarType) GenerateOp(op *Node, blder *exec.PlanBuilder) error {
toAgt := op.OutputValues[0].Toes[0].Node.Env.Worker.(*AgentWorker)
addOpByEnv(&SendVar{
Input: op.InputValues[0].Props.Var,
Send: op.OutputValues[0].Props.Var,
Node: toAgt.Node,
}, op.Env, blder)
return nil
}

func (t *SendVarType) String(node *Node) string {
return fmt.Sprintf("SendVar[]%v%v", formatStreamIO(node), formatValueIO(node))
}

type GetStreamType struct {
}

func (t *GetStreamType) InitNode(node *Node) {
dag.NodeDeclareInputStream(node, 1)
dag.NodeNewOutputValue(node, VarProps{})
dag.NodeNewOutputStream(node, VarProps{})
}

func (t *GetStreamType) GenerateOp(op *Node, blder *exec.PlanBuilder) error {
fromAgt := op.InputStreams[0].From.Node.Env.Worker.(*AgentWorker)
addOpByEnv(&GetStream{
Signal: op.OutputValues[0].Props.Var.(*exec.SignalVar),
Output: op.OutputStreams[0].Props.Var.(*exec.StreamVar),
Target: op.InputStreams[0].Props.Var.(*exec.StreamVar),
Node: fromAgt.Node,
}, op.Env, blder)
return nil
}

func (t *GetStreamType) String(node *Node) string {
return fmt.Sprintf("GetStream[]%v%v", formatStreamIO(node), formatValueIO(node))
}

type GetVaType struct {
}

func (t *GetVaType) InitNode(node *Node) {
dag.NodeDeclareInputValue(node, 1)
dag.NodeNewOutputValue(node, VarProps{})
dag.NodeNewOutputValue(node, VarProps{})
}

func (t *GetVaType) GenerateOp(op *Node, blder *exec.PlanBuilder) error {
fromAgt := op.InputValues[0].From.Node.Env.Worker.(*AgentWorker)
addOpByEnv(&GetVar{
Signal: op.OutputValues[0].Props.Var.(*exec.SignalVar),
Output: op.OutputValues[1].Props.Var,
Target: op.InputValues[0].Props.Var,
Node: fromAgt.Node,
}, op.Env, blder)
return nil
}

func (t *GetVaType) String(node *Node) string {
return fmt.Sprintf("GetVar[]%v%v", formatStreamIO(node), formatValueIO(node))
}

+ 0
- 134
common/pkgs/ioswitch/ops/ops.go View File

@@ -1,134 +0,0 @@
package ops

import (
"fmt"

"gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag"
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
"gitlink.org.cn/cloudream/common/pkgs/types"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/common/utils/serder"
)

var OpUnion = serder.UseTypeUnionExternallyTagged(types.Ref(types.NewTypeUnion[exec.Op]()))

type AgentWorker struct {
Node cdssdk.Node
}

func (w *AgentWorker) GetAddress() string {
// TODO 选择地址
return fmt.Sprintf("%v:%v", w.Node.ExternalIP, w.Node.ExternalGRPCPort)
}

func (w *AgentWorker) Equals(worker dag.WorkerInfo) bool {
aw, ok := worker.(*AgentWorker)
if !ok {
return false
}

return w.Node.NodeID == aw.Node.NodeID
}

type NodeProps struct {
From From
To To
}

type ValueVarType int

const (
StringValueVar ValueVarType = iota
SignalValueVar
)

type VarProps struct {
StreamIndex int // 流的编号,只在StreamVar上有意义
ValueType ValueVarType // 值类型,只在ValueVar上有意义
Var exec.Var // 生成Plan的时候创建的对应的Var
}

type Graph = dag.Graph[NodeProps, VarProps]

type Node = dag.Node[NodeProps, VarProps]

type StreamVar = dag.StreamVar[NodeProps, VarProps]

type ValueVar = dag.ValueVar[NodeProps, VarProps]

func addOpByEnv(op exec.Op, env dag.NodeEnv, blder *exec.PlanBuilder) {
switch env.Type {
case dag.EnvWorker:
blder.AtAgent(env.Worker.(*AgentWorker).Node).AddOp(op)
case dag.EnvExecutor:
blder.AtExecutor().AddOp(op)
}
}

func formatStreamIO(node *Node) string {
is := ""
for i, in := range node.InputStreams {
if i > 0 {
is += ","
}

if in == nil {
is += "."
} else {
is += fmt.Sprintf("%v", in.ID)
}
}

os := ""
for i, out := range node.OutputStreams {
if i > 0 {
os += ","
}

if out == nil {
os += "."
} else {
os += fmt.Sprintf("%v", out.ID)
}
}

if is == "" && os == "" {
return ""
}

return fmt.Sprintf("S{%s>%s}", is, os)
}

func formatValueIO(node *Node) string {
is := ""
for i, in := range node.InputValues {
if i > 0 {
is += ","
}

if in == nil {
is += "."
} else {
is += fmt.Sprintf("%v", in.ID)
}
}

os := ""
for i, out := range node.OutputValues {
if i > 0 {
os += ","
}

if out == nil {
os += "."
} else {
os += fmt.Sprintf("%v", out.ID)
}
}

if is == "" && os == "" {
return ""
}

return fmt.Sprintf("V{%s>%s}", is, os)
}

+ 0
- 53
common/pkgs/ioswitch/ops/store.go View File

@@ -1,53 +0,0 @@
package ops

import (
"context"
"fmt"
"sync"

"gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag"
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
)

type Store struct {
Var exec.Var
Key string
Store *sync.Map
}

func (o *Store) Execute(ctx context.Context, e *exec.Executor) error {
err := e.BindVars(ctx, o.Var)
if err != nil {
return err
}

switch v := o.Var.(type) {
case *exec.IntVar:
o.Store.Store(o.Key, v.Value)
case *exec.StringVar:
o.Store.Store(o.Key, v.Value)
}

return nil
}

type StoreType struct {
StoreKey string
}

func (t *StoreType) InitNode(node *Node) {
dag.NodeDeclareInputValue(node, 1)
}

func (t *StoreType) GenerateOp(op *Node, blder *exec.PlanBuilder) error {
blder.AtExecutor().AddOp(&Store{
Var: op.InputValues[0].Props.Var,
Key: t.StoreKey,
Store: blder.DriverPlan.StoreMap,
})
return nil
}

func (t *StoreType) String(node *Node) string {
return fmt.Sprintf("Store[%s]%v%v", t.StoreKey, formatStreamIO(node), formatValueIO(node))
}

+ 0
- 173
common/pkgs/ioswitch/ops/sync.go View File

@@ -1,173 +0,0 @@
package ops

import (
"context"
"fmt"
"io"

"gitlink.org.cn/cloudream/common/pkgs/future"
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag"
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
)

func init() {
OpUnion.AddT((*OnStreamBegin)(nil))
OpUnion.AddT((*OnStreamEnd)(nil))
OpUnion.AddT((*HoldUntil)(nil))
OpUnion.AddT((*HangUntil)(nil))
OpUnion.AddT((*Broadcast)(nil))
}

type OnStreamBegin struct {
Raw *exec.StreamVar `json:"raw"`
New *exec.StreamVar `json:"new"`
Signal *exec.SignalVar `json:"signal"`
}

func (o *OnStreamBegin) Execute(ctx context.Context, e *exec.Executor) error {
err := e.BindVars(ctx, o.Raw)
if err != nil {
return err
}

o.New.Stream = o.Raw.Stream

e.PutVars(o.New, o.Signal)
return nil
}

type OnStreamEnd struct {
Raw *exec.StreamVar `json:"raw"`
New *exec.StreamVar `json:"new"`
Signal *exec.SignalVar `json:"signal"`
}

type onStreamEnd struct {
inner io.ReadCloser
callback *future.SetVoidFuture
}

func (o *onStreamEnd) Read(p []byte) (n int, err error) {
n, err = o.inner.Read(p)
if err == io.EOF {
o.callback.SetVoid()
} else if err != nil {
o.callback.SetError(err)
}
return n, err
}

func (o *onStreamEnd) Close() error {
o.callback.SetError(fmt.Errorf("stream closed early"))
return o.inner.Close()
}

func (o *OnStreamEnd) Execute(ctx context.Context, e *exec.Executor) error {
err := e.BindVars(ctx, o.Raw)
if err != nil {
return err
}

cb := future.NewSetVoid()

o.New.Stream = &onStreamEnd{
inner: o.Raw.Stream,
callback: cb,
}
e.PutVars(o.New)

err = cb.Wait(ctx)
if err != nil {
return err
}

e.PutVars(o.Signal)
return nil
}

type HoldUntil struct {
Waits []*exec.SignalVar `json:"waits"`
Holds []exec.Var `json:"holds"`
Emits []exec.Var `json:"emits"`
}

func (w *HoldUntil) Execute(ctx context.Context, e *exec.Executor) error {
err := e.BindVars(ctx, w.Holds...)
if err != nil {
return err
}

err = exec.BindArrayVars(e, ctx, w.Waits)
if err != nil {
return err
}

for i := 0; i < len(w.Holds); i++ {
err := exec.AssignVar(w.Holds[i], w.Emits[i])
if err != nil {
return err
}
}

e.PutVars(w.Emits...)
return nil
}

type HangUntil struct {
Waits []*exec.SignalVar `json:"waits"`
Op exec.Op `json:"op"`
}

func (h *HangUntil) Execute(ctx context.Context, e *exec.Executor) error {
err := exec.BindArrayVars(e, ctx, h.Waits)
if err != nil {
return err
}

return h.Op.Execute(ctx, e)
}

type Broadcast struct {
Source *exec.SignalVar `json:"source"`
Targets []*exec.SignalVar `json:"targets"`
}

func (b *Broadcast) Execute(ctx context.Context, e *exec.Executor) error {
err := e.BindVars(ctx, b.Source)
if err != nil {
return err
}

exec.PutArrayVars(e, b.Targets)
return nil
}

type HoldUntilType struct {
}

func (t *HoldUntilType) InitNode(node *Node) {
dag.NodeDeclareInputValue(node, 1)
}

func (t *HoldUntilType) GenerateOp(op *Node, blder *exec.PlanBuilder) error {
o := &HoldUntil{
Waits: []*exec.SignalVar{op.InputValues[0].Props.Var.(*exec.SignalVar)},
}

for i := 0; i < len(op.OutputValues); i++ {
o.Holds = append(o.Holds, op.InputValues[i+1].Props.Var)
o.Emits = append(o.Emits, op.OutputValues[i].Props.Var)
}

for i := 0; i < len(op.OutputStreams); i++ {
o.Holds = append(o.Holds, op.InputStreams[i].Props.Var)
o.Emits = append(o.Emits, op.OutputStreams[i].Props.Var)
}

addOpByEnv(o, op.Env, blder)
return nil
}

func (t *HoldUntilType) String(node *Node) string {
return fmt.Sprintf("HoldUntil[]%v%v", formatStreamIO(node), formatValueIO(node))
}

+ 0
- 20
common/pkgs/ioswitch/ops/var.go View File

@@ -1,20 +0,0 @@
package ops

import (
"context"

"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
)

type ConstVar struct {
Var *exec.StringVar `json:"var"`
}

func (o *ConstVar) Execute(ctx context.Context, e *exec.Executor) error {
e.PutVars(o.Var)
return nil
}

func init() {
OpUnion.AddT((*ConstVar)(nil))
}

common/pkgs/ioswitch/ops/chunked.go → common/pkgs/ioswitch/ops2/chunked.go View File

@@ -1,4 +1,4 @@
package ops
package ops2

import (
"context"
@@ -10,12 +10,13 @@ import (
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag"
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
"gitlink.org.cn/cloudream/common/utils/io2"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch"
"golang.org/x/sync/semaphore"
)

func init() {
OpUnion.AddT((*ChunkedSplit)(nil))
OpUnion.AddT((*ChunkedJoin)(nil))
exec.UseOp[*ChunkedSplit]()
exec.UseOp[*ChunkedJoin]()
}

type ChunkedSplit struct {
@@ -85,27 +86,26 @@ type ChunkedSplitType struct {
ChunkSize int
}

func (t *ChunkedSplitType) InitNode(node *Node) {
func (t *ChunkedSplitType) InitNode(node *dag.Node) {
dag.NodeDeclareInputStream(node, 1)
for i := 0; i < t.OutputCount; i++ {
dag.NodeNewOutputStream(node, VarProps{})
dag.NodeNewOutputStream(node, &ioswitch.VarProps{})
}
}

func (t *ChunkedSplitType) GenerateOp(op *Node, blder *exec.PlanBuilder) error {
addOpByEnv(&ChunkedSplit{
Input: op.InputStreams[0].Props.Var.(*exec.StreamVar),
Outputs: lo.Map(op.OutputStreams, func(v *StreamVar, idx int) *exec.StreamVar {
return v.Props.Var.(*exec.StreamVar)
func (t *ChunkedSplitType) GenerateOp(op *dag.Node) (exec.Op, error) {
return &ChunkedSplit{
Input: op.InputStreams[0].Var,
Outputs: lo.Map(op.OutputStreams, func(v *dag.StreamVar, idx int) *exec.StreamVar {
return v.Var
}),
ChunkSize: t.ChunkSize,
PaddingZeros: true,
}, op.Env, blder)
return nil
}, nil
}

func (t *ChunkedSplitType) String(node *Node) string {
return fmt.Sprintf("ChunkedSplit[%v]", t.ChunkSize, formatStreamIO(node), formatValueIO(node))
func (t *ChunkedSplitType) String(node *dag.Node) string {
return fmt.Sprintf("ChunkedSplit[%v]%v%v", t.ChunkSize, formatStreamIO(node), formatValueIO(node))
}

type ChunkedJoinType struct {
@@ -113,22 +113,21 @@ type ChunkedJoinType struct {
ChunkSize int
}

func (t *ChunkedJoinType) InitNode(node *Node) {
func (t *ChunkedJoinType) InitNode(node *dag.Node) {
dag.NodeDeclareInputStream(node, t.InputCount)
dag.NodeNewOutputStream(node, VarProps{})
dag.NodeNewOutputStream(node, &ioswitch.VarProps{})
}

func (t *ChunkedJoinType) GenerateOp(op *Node, blder *exec.PlanBuilder) error {
addOpByEnv(&ChunkedJoin{
Inputs: lo.Map(op.InputStreams, func(v *StreamVar, idx int) *exec.StreamVar {
return v.Props.Var.(*exec.StreamVar)
func (t *ChunkedJoinType) GenerateOp(op *dag.Node) (exec.Op, error) {
return &ChunkedJoin{
Inputs: lo.Map(op.InputStreams, func(v *dag.StreamVar, idx int) *exec.StreamVar {
return v.Var
}),
Output: op.OutputStreams[0].Props.Var.(*exec.StreamVar),
Output: op.OutputStreams[0].Var,
ChunkSize: t.ChunkSize,
}, op.Env, blder)
return nil
}, nil
}

func (t *ChunkedJoinType) String(node *Node) string {
return fmt.Sprintf("ChunkedJoin[%v]", t.ChunkSize, formatStreamIO(node), formatValueIO(node))
func (t *ChunkedJoinType) String(node *dag.Node) string {
return fmt.Sprintf("ChunkedJoin[%v]%v%v", t.ChunkSize, formatStreamIO(node), formatValueIO(node))
}

common/pkgs/ioswitch/ops/clone.go → common/pkgs/ioswitch/ops2/clone.go View File

@@ -1,4 +1,4 @@
package ops
package ops2

import (
"context"
@@ -13,8 +13,8 @@ import (
)

func init() {
OpUnion.AddT((*CloneStream)(nil))
OpUnion.AddT((*CloneVar)(nil))
exec.UseOp[*CloneStream]()
exec.UseOp[*CloneVar]()
}

type CloneStream struct {
@@ -67,48 +67,46 @@ func (o *CloneVar) Execute(ctx context.Context, e *exec.Executor) error {

type CloneStreamType struct{}

func (t *CloneStreamType) InitNode(node *Node) {
func (t *CloneStreamType) InitNode(node *dag.Node) {
dag.NodeDeclareInputStream(node, 1)
}

func (t *CloneStreamType) GenerateOp(op *Node, blder *exec.PlanBuilder) error {
addOpByEnv(&CloneStream{
Input: op.InputStreams[0].Props.Var.(*exec.StreamVar),
Outputs: lo.Map(op.OutputStreams, func(v *StreamVar, idx int) *exec.StreamVar {
return v.Props.Var.(*exec.StreamVar)
func (t *CloneStreamType) GenerateOp(op *dag.Node) (exec.Op, error) {
return &CloneStream{
Input: op.InputStreams[0].Var,
Outputs: lo.Map(op.OutputStreams, func(v *dag.StreamVar, idx int) *exec.StreamVar {
return v.Var
}),
}, op.Env, blder)
return nil
}, nil
}

func (t *CloneStreamType) NewOutput(node *Node) *StreamVar {
return dag.NodeNewOutputStream(node, VarProps{})
func (t *CloneStreamType) NewOutput(node *dag.Node) *dag.StreamVar {
return dag.NodeNewOutputStream(node, nil)
}

func (t *CloneStreamType) String(node *Node) string {
func (t *CloneStreamType) String(node *dag.Node) string {
return fmt.Sprintf("CloneStream[]%v%v", formatStreamIO(node), formatValueIO(node))
}

type CloneVarType struct{}

func (t *CloneVarType) InitNode(node *Node) {
func (t *CloneVarType) InitNode(node *dag.Node) {
dag.NodeDeclareInputValue(node, 1)
}

func (t *CloneVarType) GenerateOp(op *Node, blder *exec.PlanBuilder) error {
addOpByEnv(&CloneVar{
Raw: op.InputValues[0].Props.Var,
Cloneds: lo.Map(op.OutputValues, func(v *ValueVar, idx int) exec.Var {
return v.Props.Var
func (t *CloneVarType) GenerateOp(op *dag.Node) (exec.Op, error) {
return &CloneVar{
Raw: op.InputValues[0].Var,
Cloneds: lo.Map(op.OutputValues, func(v *dag.ValueVar, idx int) exec.Var {
return v.Var
}),
}, op.Env, blder)
return nil
}, nil
}

func (t *CloneVarType) NewOutput(node *Node) *ValueVar {
return dag.NodeNewOutputValue(node, VarProps{})
func (t *CloneVarType) NewOutput(node *dag.Node) *dag.ValueVar {
return dag.NodeNewOutputValue(node, nil)
}

func (t *CloneVarType) String(node *Node) string {
func (t *CloneVarType) String(node *dag.Node) string {
return fmt.Sprintf("CloneVar[]%v%v", formatStreamIO(node), formatValueIO(node))
}

common/pkgs/ioswitch/ops/ec.go → common/pkgs/ioswitch/ops2/ec.go View File

@@ -1,4 +1,4 @@
package ops
package ops2

import (
"context"
@@ -13,13 +13,14 @@ import (
"gitlink.org.cn/cloudream/common/utils/io2"
"gitlink.org.cn/cloudream/common/utils/sync2"
"gitlink.org.cn/cloudream/storage/common/pkgs/ec"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch"
"golang.org/x/sync/semaphore"
)

func init() {
OpUnion.AddT((*ECReconstructAny)(nil))
OpUnion.AddT((*ECReconstruct)(nil))
OpUnion.AddT((*ECMultiply)(nil))
exec.UseOp[*ECReconstructAny]()
exec.UseOp[*ECReconstruct]()
exec.UseOp[*ECMultiply]()
}

type ECReconstructAny struct {
@@ -197,42 +198,44 @@ type MultiplyType struct {
EC cdssdk.ECRedundancy
}

func (t *MultiplyType) InitNode(node *Node) {}
func (t *MultiplyType) InitNode(node *dag.Node) {}

func (t *MultiplyType) GenerateOp(op *Node, blder *exec.PlanBuilder) error {
func (t *MultiplyType) GenerateOp(op *dag.Node) (exec.Op, error) {
var inputIdxs []int
var outputIdxs []int
for _, in := range op.InputStreams {
inputIdxs = append(inputIdxs, in.Props.StreamIndex)
inputIdxs = append(inputIdxs, ioswitch.SProps(in).StreamIndex)
}
for _, out := range op.OutputStreams {
outputIdxs = append(outputIdxs, out.Props.StreamIndex)
outputIdxs = append(outputIdxs, ioswitch.SProps(out).StreamIndex)
}

rs, err := ec.NewRs(t.EC.K, t.EC.N)
if err != nil {
return nil, err
}
coef, err := rs.GenerateMatrix(inputIdxs, outputIdxs)
if err != nil {
return err
return nil, err
}

addOpByEnv(&ECMultiply{
return &ECMultiply{
Coef: coef,
Inputs: lo.Map(op.InputStreams, func(v *StreamVar, idx int) *exec.StreamVar { return v.Props.Var.(*exec.StreamVar) }),
Outputs: lo.Map(op.OutputStreams, func(v *StreamVar, idx int) *exec.StreamVar { return v.Props.Var.(*exec.StreamVar) }),
Inputs: lo.Map(op.InputStreams, func(v *dag.StreamVar, idx int) *exec.StreamVar { return v.Var }),
Outputs: lo.Map(op.OutputStreams, func(v *dag.StreamVar, idx int) *exec.StreamVar { return v.Var }),
ChunkSize: t.EC.ChunkSize,
}, op.Env, blder)
return nil
}, nil
}

func (t *MultiplyType) AddInput(node *Node, str *StreamVar) {
func (t *MultiplyType) AddInput(node *dag.Node, str *dag.StreamVar) {
node.InputStreams = append(node.InputStreams, str)
str.To(node, len(node.InputStreams)-1)
}

func (t *MultiplyType) NewOutput(node *Node, dataIndex int) *StreamVar {
return dag.NodeNewOutputStream(node, VarProps{StreamIndex: dataIndex})
func (t *MultiplyType) NewOutput(node *dag.Node, dataIndex int) *dag.StreamVar {
return dag.NodeNewOutputStream(node, &ioswitch.VarProps{StreamIndex: dataIndex})
}

func (t *MultiplyType) String(node *Node) string {
func (t *MultiplyType) String(node *dag.Node) string {
return fmt.Sprintf("Multiply[]%v%v", formatStreamIO(node), formatValueIO(node))
}

common/pkgs/ioswitch/ops/file.go → common/pkgs/ioswitch/ops2/file.go View File

@@ -1,4 +1,4 @@
package ops
package ops2

import (
"context"
@@ -11,11 +11,12 @@ import (
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag"
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
"gitlink.org.cn/cloudream/common/utils/io2"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch"
)

func init() {
OpUnion.AddT((*FileRead)(nil))
OpUnion.AddT((*FileWrite)(nil))
exec.UseOp[*FileRead]()
exec.UseOp[*FileWrite]()
}

type FileWrite struct {
@@ -75,19 +76,18 @@ type FileReadType struct {
FilePath string
}

func (t *FileReadType) InitNode(node *Node) {
dag.NodeNewOutputStream(node, VarProps{})
func (t *FileReadType) InitNode(node *dag.Node) {
dag.NodeNewOutputStream(node, &ioswitch.VarProps{})
}

func (t *FileReadType) GenerateOp(op *Node, blder *exec.PlanBuilder) error {
addOpByEnv(&FileRead{
Output: op.OutputStreams[0].Props.Var.(*exec.StreamVar),
func (t *FileReadType) GenerateOp(op *dag.Node) (exec.Op, error) {
return &FileRead{
Output: op.OutputStreams[0].Var,
FilePath: t.FilePath,
}, op.Env, blder)
return nil
}, nil
}

func (t *FileReadType) String(node *Node) string {
func (t *FileReadType) String(node *dag.Node) string {
return fmt.Sprintf("FileRead[%s]%v%v", t.FilePath, formatStreamIO(node), formatValueIO(node))
}

@@ -95,14 +95,13 @@ type FileWriteType struct {
FilePath string
}

func (t *FileWriteType) InitNode(node *Node) {
func (t *FileWriteType) InitNode(node *dag.Node) {
dag.NodeDeclareInputStream(node, 1)
}

func (t *FileWriteType) GenerateOp(op *Node, blder *exec.PlanBuilder) error {
addOpByEnv(&FileWrite{
Input: op.InputStreams[0].Props.Var.(*exec.StreamVar),
func (t *FileWriteType) GenerateOp(op *dag.Node) (exec.Op, error) {
return &FileWrite{
Input: op.InputStreams[0].Var,
FilePath: t.FilePath,
}, op.Env, blder)
return nil
}, nil
}

common/pkgs/ioswitch/ops/ipfs.go → common/pkgs/ioswitch/ops2/ipfs.go View File

@@ -1,4 +1,4 @@
package ops
package ops2

import (
"context"
@@ -12,11 +12,12 @@ import (
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/utils/io2"
stgglb "gitlink.org.cn/cloudream/storage/common/globals"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch"
)

func init() {
OpUnion.AddT((*IPFSRead)(nil))
OpUnion.AddT((*IPFSWrite)(nil))
exec.UseOp[*IPFSRead]()
exec.UseOp[*IPFSWrite]()
}

type IPFSRead struct {
@@ -90,20 +91,19 @@ type IPFSReadType struct {
Option ipfs.ReadOption
}

func (t *IPFSReadType) InitNode(node *Node) {
dag.NodeNewOutputStream(node, VarProps{})
func (t *IPFSReadType) InitNode(node *dag.Node) {
dag.NodeNewOutputStream(node, &ioswitch.VarProps{})
}

func (t *IPFSReadType) GenerateOp(node *Node, blder *exec.PlanBuilder) error {
addOpByEnv(&IPFSRead{
Output: node.OutputStreams[0].Props.Var.(*exec.StreamVar),
func (t *IPFSReadType) GenerateOp(n *dag.Node) (exec.Op, error) {
return &IPFSRead{
Output: n.OutputStreams[0].Var,
FileHash: t.FileHash,
Option: t.Option,
}, node.Env, blder)
return nil
}, nil
}

func (t *IPFSReadType) String(node *Node) string {
func (t *IPFSReadType) String(node *dag.Node) string {
return fmt.Sprintf("IPFSRead[%s,%v+%v]%v%v", t.FileHash, t.Option.Offset, t.Option.Length, formatStreamIO(node), formatValueIO(node))
}

@@ -112,19 +112,18 @@ type IPFSWriteType struct {
Range exec.Range
}

func (t *IPFSWriteType) InitNode(node *Node) {
func (t *IPFSWriteType) InitNode(node *dag.Node) {
dag.NodeDeclareInputStream(node, 1)
dag.NodeNewOutputValue(node, VarProps{})
dag.NodeNewOutputValue(node, &ioswitch.VarProps{})
}

func (t *IPFSWriteType) GenerateOp(op *Node, blder *exec.PlanBuilder) error {
addOpByEnv(&IPFSWrite{
Input: op.InputStreams[0].Props.Var.(*exec.StreamVar),
FileHash: op.OutputValues[0].Props.Var.(*exec.StringVar),
}, op.Env, blder)
return nil
func (t *IPFSWriteType) GenerateOp(op *dag.Node) (exec.Op, error) {
return &IPFSWrite{
Input: op.InputStreams[0].Var,
FileHash: op.OutputValues[0].Var.(*exec.StringVar),
}, nil
}

func (t *IPFSWriteType) String(node *Node) string {
return fmt.Sprintf("IPFSWrite[%s,%v+%v](%v>)", t.FileHashStoreKey, t.Range.Offset, t.Range.Length, formatStreamIO(node), formatValueIO(node))
func (t *IPFSWriteType) String(node *dag.Node) string {
return fmt.Sprintf("IPFSWrite[%s,%v+%v]%v%v", t.FileHashStoreKey, t.Range.Offset, t.Range.Length, formatStreamIO(node), formatValueIO(node))
}

common/pkgs/ioswitch/ops/join.go → common/pkgs/ioswitch/ops2/join.go View File

@@ -1,4 +1,4 @@
package ops
package ops2

import (
"context"
@@ -9,6 +9,10 @@ import (
"gitlink.org.cn/cloudream/common/utils/io2"
)

func init() {
// OpUnion.AddT((*Join)(nil))
}

type Join struct {
Inputs []*exec.StreamVar `json:"inputs"`
Output *exec.StreamVar `json:"output"`
@@ -39,7 +43,3 @@ func (o *Join) Execute(ctx context.Context, e *exec.Executor) error {

return fut.Wait(ctx)
}

func init() {
OpUnion.AddT((*Join)(nil))
}

common/pkgs/ioswitch/ops/length.go → common/pkgs/ioswitch/ops2/length.go View File

@@ -1,4 +1,4 @@
package ops
package ops2

import (
"context"
@@ -9,6 +9,10 @@ import (
"gitlink.org.cn/cloudream/common/utils/io2"
)

func init() {
// OpUnion.AddT((*Length)(nil))
}

type Length struct {
Input *exec.StreamVar `json:"input"`
Output *exec.StreamVar `json:"output"`
@@ -30,7 +34,3 @@ func (o *Length) Execute(ctx context.Context, e *exec.Executor) error {

return fut.Wait(ctx)
}

func init() {
OpUnion.AddT((*Length)(nil))
}

+ 75
- 0
common/pkgs/ioswitch/ops2/ops.go View File

@@ -0,0 +1,75 @@
package ops2

import (
"fmt"

"gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag"
)

func formatStreamIO(node *dag.Node) string {
is := ""
for i, in := range node.InputStreams {
if i > 0 {
is += ","
}

if in == nil {
is += "."
} else {
is += fmt.Sprintf("%v", in.ID)
}
}

os := ""
for i, out := range node.OutputStreams {
if i > 0 {
os += ","
}

if out == nil {
os += "."
} else {
os += fmt.Sprintf("%v", out.ID)
}
}

if is == "" && os == "" {
return ""
}

return fmt.Sprintf("S{%s>%s}", is, os)
}

func formatValueIO(node *dag.Node) string {
is := ""
for i, in := range node.InputValues {
if i > 0 {
is += ","
}

if in == nil {
is += "."
} else {
is += fmt.Sprintf("%v", in.ID)
}
}

os := ""
for i, out := range node.OutputValues {
if i > 0 {
os += ","
}

if out == nil {
os += "."
} else {
os += fmt.Sprintf("%v", out.ID)
}
}

if is == "" && os == "" {
return ""
}

return fmt.Sprintf("V{%s>%s}", is, os)
}

common/pkgs/ioswitch/ops/range.go → common/pkgs/ioswitch/ops2/range.go View File

@@ -1,4 +1,4 @@
package ops
package ops2

import (
"context"
@@ -10,10 +10,11 @@ import (
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
"gitlink.org.cn/cloudream/common/utils/io2"
"gitlink.org.cn/cloudream/common/utils/math2"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch"
)

func init() {
OpUnion.AddT((*Range)(nil))
exec.UseOp[*Range]()
}

type Range struct {
@@ -75,21 +76,20 @@ type RangeType struct {
Range exec.Range
}

func (t *RangeType) InitNode(node *Node) {
func (t *RangeType) InitNode(node *dag.Node) {
dag.NodeDeclareInputStream(node, 1)
dag.NodeNewOutputStream(node, VarProps{})
dag.NodeNewOutputStream(node, &ioswitch.VarProps{})
}

func (t *RangeType) GenerateOp(op *Node, blder *exec.PlanBuilder) error {
addOpByEnv(&Range{
Input: op.InputStreams[0].Props.Var.(*exec.StreamVar),
Output: op.OutputStreams[0].Props.Var.(*exec.StreamVar),
func (t *RangeType) GenerateOp(n *dag.Node) (exec.Op, error) {
return &Range{
Input: n.InputStreams[0].Var,
Output: n.OutputStreams[0].Var,
Offset: t.Range.Offset,
Length: t.Range.Length,
}, op.Env, blder)
return nil
}, nil
}

func (t *RangeType) String(node *Node) string {
func (t *RangeType) String(node *dag.Node) string {
return fmt.Sprintf("Range[%v+%v]%v%v", t.Range.Offset, t.Range.Length, formatStreamIO(node), formatValueIO(node))
}

+ 65
- 204
common/pkgs/ioswitch/plans/parser.go View File

@@ -6,12 +6,14 @@ import (

"gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag"
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/parser"
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/plan"
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/plan/ops"
"gitlink.org.cn/cloudream/common/pkgs/ipfs"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/common/utils/lo2"
"gitlink.org.cn/cloudream/common/utils/math2"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch/ops"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch/ops2"
)

type DefaultParser struct {
@@ -25,14 +27,14 @@ func NewParser(ec cdssdk.ECRedundancy) *DefaultParser {
}

type ParseContext struct {
Ft parser.FromTo
DAG *ops.Graph
Ft ioswitch.FromTo
DAG *dag.Graph
// 为了产生所有To所需的数据范围,而需要From打开的范围。
// 这个范围是基于整个文件的,且上下界都取整到条带大小的整数倍,因此上界是有可能超过文件大小的。
StreamRange exec.Range
}

func (p *DefaultParser) Parse(ft parser.FromTo, blder *exec.PlanBuilder) error {
func (p *DefaultParser) Parse(ft ioswitch.FromTo, blder *exec.PlanBuilder) error {
ctx := ParseContext{Ft: ft}

// 分成两个阶段:
@@ -79,15 +81,14 @@ func (p *DefaultParser) Parse(ft parser.FromTo, blder *exec.PlanBuilder) error {
p.storeIPFSWriteResult(&ctx)
p.generateClone(&ctx)
p.generateRange(&ctx)
p.generateSend(&ctx)

return p.buildPlan(&ctx, blder)
return plan.Generate(ctx.DAG, blder)
}
func (p *DefaultParser) findOutputStream(ctx *ParseContext, streamIndex int) *ops.StreamVar {
var ret *ops.StreamVar
ctx.DAG.Walk(func(n *dag.Node[ops.NodeProps, ops.VarProps]) bool {
func (p *DefaultParser) findOutputStream(ctx *ParseContext, streamIndex int) *dag.StreamVar {
var ret *dag.StreamVar
ctx.DAG.Walk(func(n *dag.Node) bool {
for _, o := range n.OutputStreams {
if o != nil && o.Props.StreamIndex == streamIndex {
if o != nil && ioswitch.SProps(o).StreamIndex == streamIndex {
ret = o
return false
}
@@ -106,8 +107,7 @@ func (p *DefaultParser) calcStreamRange(ctx *ParseContext) {
Offset: math.MaxInt64,
}

for _, t := range ctx.Ft.Toes {
to := t.(ops.To)
for _, to := range ctx.Ft.Toes {
if to.GetDataIndex() == -1 {
toRng := to.GetRange()
rng.ExtendStart(math2.Floor(toRng.Offset, stripSize))
@@ -134,10 +134,8 @@ func (p *DefaultParser) calcStreamRange(ctx *ParseContext) {
ctx.StreamRange = rng
}

func (p *DefaultParser) extend(ctx *ParseContext, ft parser.FromTo) error {
for _, f := range ft.Froms {
fr := f.(ops.From)

func (p *DefaultParser) extend(ctx *ParseContext, ft ioswitch.FromTo) error {
for _, fr := range ft.Froms {
_, err := p.buildFromNode(ctx, &ft, fr)
if err != nil {
return err
@@ -145,20 +143,21 @@ func (p *DefaultParser) extend(ctx *ParseContext, ft parser.FromTo) error {

// 对于完整文件的From,生成Split指令
if fr.GetDataIndex() == -1 {
n, _ := dag.NewNode(ctx.DAG, &ops.ChunkedSplitType{ChunkSize: p.EC.ChunkSize, OutputCount: p.EC.K}, ops.NodeProps{})
n, _ := dag.NewNode(ctx.DAG, &ops2.ChunkedSplitType{ChunkSize: p.EC.ChunkSize, OutputCount: p.EC.K}, &ioswitch.NodeProps{})
for i := 0; i < p.EC.K; i++ {
n.OutputStreams[i].Props.StreamIndex = i
ioswitch.SProps(n.OutputStreams[i]).StreamIndex = i
}
}
}

// 如果有K个不同的文件块流,则生成Multiply指令,同时针对其生成的流,生成Join指令
ecInputStrs := make(map[int]*ops.StreamVar)
ecInputStrs := make(map[int]*dag.StreamVar)
loop:
for _, o := range ctx.DAG.Nodes {
for _, s := range o.OutputStreams {
if s.Props.StreamIndex >= 0 && ecInputStrs[s.Props.StreamIndex] == nil {
ecInputStrs[s.Props.StreamIndex] = s
prop := ioswitch.SProps(s)
if prop.StreamIndex >= 0 && ecInputStrs[prop.StreamIndex] == nil {
ecInputStrs[prop.StreamIndex] = s
if len(ecInputStrs) == p.EC.K {
break loop
}
@@ -166,9 +165,9 @@ loop:
}
}
if len(ecInputStrs) == p.EC.K {
mulNode, mulType := dag.NewNode(ctx.DAG, &ops.MultiplyType{
mulNode, mulType := dag.NewNode(ctx.DAG, &ops2.MultiplyType{
EC: p.EC,
}, ops.NodeProps{})
}, &ioswitch.NodeProps{})

for _, s := range ecInputStrs {
mulType.AddInput(mulNode, s)
@@ -177,22 +176,20 @@ loop:
mulType.NewOutput(mulNode, i)
}

joinNode, _ := dag.NewNode(ctx.DAG, &ops.ChunkedJoinType{
joinNode, _ := dag.NewNode(ctx.DAG, &ops2.ChunkedJoinType{
InputCount: p.EC.K,
ChunkSize: p.EC.ChunkSize,
}, ops.NodeProps{})
}, &ioswitch.NodeProps{})

for i := 0; i < p.EC.K; i++ {
// 不可能找不到流
p.findOutputStream(ctx, i).To(joinNode, i)
}
joinNode.OutputStreams[0].Props.StreamIndex = -1
ioswitch.SProps(joinNode.OutputStreams[0]).StreamIndex = -1
}

// 为每一个To找到一个输入流
for _, t := range ft.Toes {
to := t.(ops.To)

for _, to := range ft.Toes {
n, err := p.buildToNode(ctx, &ft, to)
if err != nil {
return err
@@ -209,7 +206,7 @@ loop:
return nil
}

func (p *DefaultParser) buildFromNode(ctx *ParseContext, ft *parser.FromTo, f ops.From) (*ops.Node, error) {
func (p *DefaultParser) buildFromNode(ctx *ParseContext, ft *ioswitch.FromTo, f ioswitch.From) (*dag.Node, error) {
var repRange exec.Range
var blkRange exec.Range

@@ -224,17 +221,17 @@ func (p *DefaultParser) buildFromNode(ctx *ParseContext, ft *parser.FromTo, f op
}

switch f := f.(type) {
case *ops.FromWorker:
n, t := dag.NewNode(ctx.DAG, &ops.IPFSReadType{
case *ioswitch.FromNode:
n, t := dag.NewNode(ctx.DAG, &ops2.IPFSReadType{
FileHash: f.FileHash,
Option: ipfs.ReadOption{
Offset: 0,
Length: -1,
},
}, ops.NodeProps{
}, &ioswitch.NodeProps{
From: f,
})
n.OutputStreams[0].Props.StreamIndex = f.DataIndex
ioswitch.SProps(n.OutputStreams[0]).StreamIndex = f.DataIndex

if f.DataIndex == -1 {
t.Option.Offset = repRange.Offset
@@ -249,15 +246,15 @@ func (p *DefaultParser) buildFromNode(ctx *ParseContext, ft *parser.FromTo, f op
}

if f.Node != nil {
n.Env.ToEnvWorker(&ops.AgentWorker{*f.Node})
n.Env.ToEnvWorker(&ioswitch.AgentWorker{Node: *f.Node})
}

return n, nil

case *ops.FromExecutor:
n, _ := dag.NewNode(ctx.DAG, &ops.FromDriverType{Handle: f.Handle}, ops.NodeProps{From: f})
case *ioswitch.FromDriver:
n, _ := dag.NewNode(ctx.DAG, &ops.FromDriverType{Handle: f.Handle}, &ioswitch.NodeProps{From: f})
n.Env.ToEnvExecutor()
n.OutputStreams[0].Props.StreamIndex = f.DataIndex
ioswitch.SProps(n.OutputStreams[0]).StreamIndex = f.DataIndex

if f.DataIndex == -1 {
f.Handle.RangeHint.Offset = repRange.Offset
@@ -274,20 +271,20 @@ func (p *DefaultParser) buildFromNode(ctx *ParseContext, ft *parser.FromTo, f op
}
}

func (p *DefaultParser) buildToNode(ctx *ParseContext, ft *parser.FromTo, t ops.To) (*ops.Node, error) {
func (p *DefaultParser) buildToNode(ctx *ParseContext, ft *ioswitch.FromTo, t ioswitch.To) (*dag.Node, error) {
switch t := t.(type) {
case *ops.ToNode:
n, _ := dag.NewNode(ctx.DAG, &ops.IPFSWriteType{
case *ioswitch.ToNode:
n, _ := dag.NewNode(ctx.DAG, &ops2.IPFSWriteType{
FileHashStoreKey: t.FileHashStoreKey,
Range: t.Range,
}, ops.NodeProps{
}, &ioswitch.NodeProps{
To: t,
})

return n, nil

case *ops.ToExecutor:
n, _ := dag.NewNode(ctx.DAG, &ops.ToDriverType{Handle: t.Handle, Range: t.Range}, ops.NodeProps{To: t})
case *ioswitch.ToDriver:
n, _ := dag.NewNode(ctx.DAG, &ops.ToDriverType{Handle: t.Handle, Range: t.Range}, &ioswitch.NodeProps{To: t})
n.Env.ToEnvExecutor()

return n, nil
@@ -301,7 +298,7 @@ func (p *DefaultParser) buildToNode(ctx *ParseContext, ft *parser.FromTo, t ops.
func (p *DefaultParser) removeUnusedJoin(ctx *ParseContext) bool {
changed := false

dag.WalkOnlyType[*ops.ChunkedJoinType](ctx.DAG, func(node *ops.Node, typ *ops.ChunkedJoinType) bool {
dag.WalkOnlyType[*ops2.ChunkedJoinType](ctx.DAG, func(node *dag.Node, typ *ops2.ChunkedJoinType) bool {
if len(node.OutputStreams[0].Toes) > 0 {
return true
}
@@ -320,7 +317,7 @@ func (p *DefaultParser) removeUnusedJoin(ctx *ParseContext) bool {
// 减少未使用的Multiply指令的输出流。如果减少到0,则删除该指令
func (p *DefaultParser) removeUnusedMultiplyOutput(ctx *ParseContext) bool {
changed := false
dag.WalkOnlyType[*ops.MultiplyType](ctx.DAG, func(node *ops.Node, typ *ops.MultiplyType) bool {
dag.WalkOnlyType[*ops2.MultiplyType](ctx.DAG, func(node *dag.Node, typ *ops2.MultiplyType) bool {
for i2, out := range node.OutputStreams {
if len(out.Toes) > 0 {
continue
@@ -349,7 +346,7 @@ func (p *DefaultParser) removeUnusedMultiplyOutput(ctx *ParseContext) bool {
// 删除未使用的Split指令
func (p *DefaultParser) removeUnusedSplit(ctx *ParseContext) bool {
changed := false
dag.WalkOnlyType[*ops.ChunkedSplitType](ctx.DAG, func(node *ops.Node, typ *ops.ChunkedSplitType) bool {
dag.WalkOnlyType[*ops2.ChunkedSplitType](ctx.DAG, func(node *dag.Node, typ *ops2.ChunkedSplitType) bool {
// Split出来的每一个流都没有被使用,才能删除这个指令
for _, out := range node.OutputStreams {
if len(out.Toes) > 0 {
@@ -370,9 +367,9 @@ func (p *DefaultParser) removeUnusedSplit(ctx *ParseContext) bool {
func (p *DefaultParser) omitSplitJoin(ctx *ParseContext) bool {
changed := false

dag.WalkOnlyType[*ops.ChunkedSplitType](ctx.DAG, func(splitNode *ops.Node, typ *ops.ChunkedSplitType) bool {
dag.WalkOnlyType[*ops2.ChunkedSplitType](ctx.DAG, func(splitNode *dag.Node, typ *ops2.ChunkedSplitType) bool {
// Split指令的每一个输出都有且只有一个目的地
var joinNode *ops.Node
var joinNode *dag.Node
for _, out := range splitNode.OutputStreams {
if len(out.Toes) != 1 {
continue
@@ -390,7 +387,7 @@ func (p *DefaultParser) omitSplitJoin(ctx *ParseContext) bool {
}

// 且这个目的地要是一个Join指令
_, ok := joinNode.Type.(*ops.ChunkedJoinType)
_, ok := joinNode.Type.(*ops2.ChunkedJoinType)
if !ok {
return true
}
@@ -424,7 +421,7 @@ func (p *DefaultParser) omitSplitJoin(ctx *ParseContext) bool {
// 所以理论上不会出现有指令的位置始终无法确定的情况。
func (p *DefaultParser) pin(ctx *ParseContext) bool {
changed := false
ctx.DAG.Walk(func(node *ops.Node) bool {
ctx.DAG.Walk(func(node *dag.Node) bool {
var toEnv *dag.NodeEnv
for _, out := range node.OutputStreams {
for _, to := range out.Toes {
@@ -480,10 +477,10 @@ func (p *DefaultParser) pin(ctx *ParseContext) bool {

// 对于所有未使用的流,增加Drop指令
func (p *DefaultParser) dropUnused(ctx *ParseContext) {
ctx.DAG.Walk(func(node *ops.Node) bool {
ctx.DAG.Walk(func(node *dag.Node) bool {
for _, out := range node.OutputStreams {
if len(out.Toes) == 0 {
n := ctx.DAG.NewNode(&ops.DropType{}, ops.NodeProps{})
n := ctx.DAG.NewNode(&ops.DropType{}, &ioswitch.NodeProps{})
n.Env = node.Env
out.To(n, 0)
}
@@ -494,14 +491,14 @@ func (p *DefaultParser) dropUnused(ctx *ParseContext) {

// 为IPFS写入指令存储结果
func (p *DefaultParser) storeIPFSWriteResult(ctx *ParseContext) {
dag.WalkOnlyType[*ops.IPFSWriteType](ctx.DAG, func(node *ops.Node, typ *ops.IPFSWriteType) bool {
dag.WalkOnlyType[*ops2.IPFSWriteType](ctx.DAG, func(node *dag.Node, typ *ops2.IPFSWriteType) bool {
if typ.FileHashStoreKey == "" {
return true
}

n := ctx.DAG.NewNode(&ops.StoreType{
StoreKey: typ.FileHashStoreKey,
}, ops.NodeProps{})
}, &ioswitch.NodeProps{})
n.Env.ToEnvExecutor()

node.OutputValues[0].To(n, 0)
@@ -511,21 +508,22 @@ func (p *DefaultParser) storeIPFSWriteResult(ctx *ParseContext) {

// 生成Range指令。StreamRange可能超过文件总大小,但Range指令会在数据量不够时不报错而是正常返回
func (p *DefaultParser) generateRange(ctx *ParseContext) {
ctx.DAG.Walk(func(node *ops.Node) bool {
if node.Props.To == nil {
ctx.DAG.Walk(func(node *dag.Node) bool {
props := ioswitch.NProps(node)
if props.To == nil {
return true
}

toDataIdx := node.Props.To.GetDataIndex()
toRng := node.Props.To.GetRange()
toDataIdx := props.To.GetDataIndex()
toRng := props.To.GetRange()

if toDataIdx == -1 {
n := ctx.DAG.NewNode(&ops.RangeType{
n := ctx.DAG.NewNode(&ops2.RangeType{
Range: exec.Range{
Offset: toRng.Offset - ctx.StreamRange.Offset,
Length: toRng.Length,
},
}, ops.NodeProps{})
}, &ioswitch.NodeProps{})
n.Env = node.InputStreams[0].From.Node.Env

node.InputStreams[0].To(n, 0)
@@ -538,12 +536,12 @@ func (p *DefaultParser) generateRange(ctx *ParseContext) {

blkStart := blkStartIdx * int64(p.EC.ChunkSize)

n := ctx.DAG.NewNode(&ops.RangeType{
n := ctx.DAG.NewNode(&ops2.RangeType{
Range: exec.Range{
Offset: toRng.Offset - blkStart,
Length: toRng.Length,
},
}, ops.NodeProps{})
}, &ioswitch.NodeProps{})
n.Env = node.InputStreams[0].From.Node.Env

node.InputStreams[0].To(n, 0)
@@ -557,13 +555,13 @@ func (p *DefaultParser) generateRange(ctx *ParseContext) {

// 生成Clone指令
func (p *DefaultParser) generateClone(ctx *ParseContext) {
ctx.DAG.Walk(func(node *ops.Node) bool {
ctx.DAG.Walk(func(node *dag.Node) bool {
for _, out := range node.OutputStreams {
if len(out.Toes) <= 1 {
continue
}

n, t := dag.NewNode(ctx.DAG, &ops.CloneStreamType{}, ops.NodeProps{})
n, t := dag.NewNode(ctx.DAG, &ops2.CloneStreamType{}, &ioswitch.NodeProps{})
n.Env = node.Env
for _, to := range out.Toes {
t.NewOutput(node).To(to.Node, to.SlotIndex)
@@ -577,7 +575,7 @@ func (p *DefaultParser) generateClone(ctx *ParseContext) {
continue
}

n, t := dag.NewNode(ctx.DAG, &ops.CloneVarType{}, ops.NodeProps{})
n, t := dag.NewNode(ctx.DAG, &ops2.CloneVarType{}, &ioswitch.NodeProps{})
n.Env = node.Env
for _, to := range out.Toes {
t.NewOutput(node).To(to.Node, to.SlotIndex)
@@ -589,140 +587,3 @@ func (p *DefaultParser) generateClone(ctx *ParseContext) {
return true
})
}

// 生成Send指令
func (p *DefaultParser) generateSend(ctx *ParseContext) {
ctx.DAG.Walk(func(node *ops.Node) bool {
for _, out := range node.OutputStreams {
to := out.Toes[0]
if to.Node.Env.Equals(node.Env) {
continue
}

switch to.Node.Env.Type {
case dag.EnvExecutor:
// // 如果是要送到Executor,则只能由Executor主动去拉取
getNode := ctx.DAG.NewNode(&ops.GetStreamType{}, ops.NodeProps{})
getNode.Env.ToEnvExecutor()

// // 同时需要对此变量生成HoldUntil指令,避免Plan结束时Get指令还未到达
holdNode := ctx.DAG.NewNode(&ops.HoldUntilType{}, ops.NodeProps{})
holdNode.Env = node.Env

// 将Get指令的信号送到Hold指令
getNode.OutputValues[0].To(holdNode, 0)
// 将Get指令的输出送到目的地
getNode.OutputStreams[0].To(to.Node, to.SlotIndex)
out.Toes = nil
// 将源节点的输出送到Hold指令
out.To(holdNode, 0)
// 将Hold指令的输出送到Get指令
holdNode.OutputStreams[0].To(getNode, 0)

case dag.EnvWorker:
// 如果是要送到Agent,则可以直接发送
n := ctx.DAG.NewNode(&ops.SendStreamType{}, ops.NodeProps{})
n.Env = node.Env
n.OutputStreams[0].To(to.Node, to.SlotIndex)
out.Toes = nil
out.To(n, 0)
}
}

for _, out := range node.OutputValues {
to := out.Toes[0]
if to.Node.Env.Equals(node.Env) {
continue
}

switch to.Node.Env.Type {
case dag.EnvExecutor:
// // 如果是要送到Executor,则只能由Executor主动去拉取
getNode := ctx.DAG.NewNode(&ops.GetVaType{}, ops.NodeProps{})
getNode.Env.ToEnvExecutor()

// // 同时需要对此变量生成HoldUntil指令,避免Plan结束时Get指令还未到达
holdNode := ctx.DAG.NewNode(&ops.HoldUntilType{}, ops.NodeProps{})
holdNode.Env = node.Env

// 将Get指令的信号送到Hold指令
getNode.OutputValues[0].To(holdNode, 0)
// 将Get指令的输出送到目的地
getNode.OutputValues[1].To(to.Node, to.SlotIndex)
out.Toes = nil
// 将源节点的输出送到Hold指令
out.To(holdNode, 0)
// 将Hold指令的输出送到Get指令
holdNode.OutputValues[0].To(getNode, 0)

case dag.EnvWorker:
// 如果是要送到Agent,则可以直接发送
n := ctx.DAG.NewNode(&ops.SendVarType{}, ops.NodeProps{})
n.Env = node.Env
n.OutputValues[0].To(to.Node, to.SlotIndex)
out.Toes = nil
out.To(n, 0)
}
}

return true
})
}

// 生成Plan
func (p *DefaultParser) buildPlan(ctx *ParseContext, blder *exec.PlanBuilder) error {
var retErr error
ctx.DAG.Walk(func(node *dag.Node[ops.NodeProps, ops.VarProps]) bool {
for _, out := range node.OutputStreams {
if out.Props.Var != nil {
continue
}

out.Props.Var = blder.NewStreamVar()
}

for _, in := range node.InputStreams {
if in.Props.Var != nil {
continue
}

in.Props.Var = blder.NewStreamVar()
}

for _, out := range node.OutputValues {
if out.Props.Var != nil {
continue
}

switch out.Props.ValueType {
case ops.StringValueVar:
out.Props.Var = blder.NewStringVar()
case ops.SignalValueVar:
out.Props.Var = blder.NewSignalVar()
}

}

for _, in := range node.InputValues {
if in.Props.Var != nil {
continue
}

switch in.Props.ValueType {
case ops.StringValueVar:
in.Props.Var = blder.NewStringVar()
case ops.SignalValueVar:
in.Props.Var = blder.NewSignalVar()
}
}

if err := node.Type.GenerateOp(node, blder); err != nil {
retErr = err
return false
}

return true
})

return retErr
}

+ 17
- 0
common/pkgs/ioswitch/utils.go View File

@@ -0,0 +1,17 @@
package ioswitch

import (
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag"
)

func NProps(n *dag.Node) *NodeProps {
return dag.NProps[*NodeProps](n)
}

func SProps(str *dag.StreamVar) *VarProps {
return dag.SProps[*VarProps](str)
}

func VProps(v *dag.ValueVar) *VarProps {
return dag.VProps[*VarProps](v)
}

Loading…
Cancel
Save