From 35f5bfa4ea869377b0feef28a966df4274a4d0dd Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Thu, 15 Aug 2024 16:48:17 +0800 Subject: [PATCH] =?UTF-8?q?=E8=B0=83=E6=95=B4=E4=BB=A3=E7=A0=81=E7=BB=93?= =?UTF-8?q?=E6=9E=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- agent/internal/grpc/io.go | 34 +- agent/internal/grpc/service.go | 8 +- agent/internal/mq/service.go | 5 +- agent/internal/task/task.go | 5 +- agent/main.go | 8 +- client/internal/cmdline/test.go | 119 +++++ common/pkgs/downloader/iterator.go | 3 +- common/pkgs/grpc/agent/client.go | 14 +- common/pkgs/ioswitch/dag/graph.go | 74 --- common/pkgs/ioswitch/dag/node.go | 68 --- common/pkgs/ioswitch/dag/var.go | 112 ----- common/pkgs/ioswitch/exec/exec.go | 8 - common/pkgs/ioswitch/exec/executor.go | 120 ----- common/pkgs/ioswitch/exec/plan_builder.go | 127 ----- common/pkgs/ioswitch/exec/utils.go | 10 - common/pkgs/ioswitch/ioswitch.go | 69 --- common/pkgs/ioswitch/manager.go | 85 ---- common/pkgs/ioswitch/ops/chunked.go | 90 +++- common/pkgs/ioswitch/ops/clone.go | 80 +++- common/pkgs/ioswitch/ops/driver.go | 43 ++ common/pkgs/ioswitch/ops/drop.go | 31 +- common/pkgs/ioswitch/ops/ec.go | 102 +++-- common/pkgs/ioswitch/ops/file.go | 61 ++- common/pkgs/ioswitch/ops/fromto.go | 134 ++++++ common/pkgs/ioswitch/ops/grpc.go | 161 +++++-- common/pkgs/ioswitch/ops/ipfs.go | 73 ++- common/pkgs/ioswitch/ops/join.go | 14 +- common/pkgs/ioswitch/ops/length.go | 14 +- common/pkgs/ioswitch/ops/ops.go | 129 +++++- common/pkgs/ioswitch/ops/range.go | 47 +- common/pkgs/ioswitch/ops/store.go | 35 +- common/pkgs/ioswitch/ops/sync.go | 107 +++-- common/pkgs/ioswitch/ops/var.go | 8 +- common/pkgs/ioswitch/plans/fromto.go | 223 --------- common/pkgs/ioswitch/plans/ops.go | 534 ---------------------- common/pkgs/ioswitch/plans/parser.go | 202 ++++---- common/pkgs/ioswitch/switch.go | 151 ------ common/pkgs/ioswitch/utils.go | 24 - 38 files changed, 1163 insertions(+), 1969 deletions(-) create mode 100644 client/internal/cmdline/test.go delete mode 100644 common/pkgs/ioswitch/dag/graph.go delete mode 100644 common/pkgs/ioswitch/dag/node.go delete mode 100644 common/pkgs/ioswitch/dag/var.go delete mode 100644 common/pkgs/ioswitch/exec/exec.go delete mode 100644 common/pkgs/ioswitch/exec/executor.go delete mode 100644 common/pkgs/ioswitch/exec/plan_builder.go delete mode 100644 common/pkgs/ioswitch/exec/utils.go delete mode 100644 common/pkgs/ioswitch/ioswitch.go delete mode 100644 common/pkgs/ioswitch/manager.go create mode 100644 common/pkgs/ioswitch/ops/driver.go create mode 100644 common/pkgs/ioswitch/ops/fromto.go delete mode 100644 common/pkgs/ioswitch/plans/fromto.go delete mode 100644 common/pkgs/ioswitch/plans/ops.go delete mode 100644 common/pkgs/ioswitch/switch.go delete mode 100644 common/pkgs/ioswitch/utils.go diff --git a/agent/internal/grpc/io.go b/agent/internal/grpc/io.go index 4ae7be5..b708751 100644 --- a/agent/internal/grpc/io.go +++ b/agent/internal/grpc/io.go @@ -6,15 +6,15 @@ import ( "io" "time" + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/utils/io2" "gitlink.org.cn/cloudream/common/utils/serder" agtrpc "gitlink.org.cn/cloudream/storage/common/pkgs/grpc/agent" - "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" ) func (s *Service) ExecuteIOPlan(ctx context.Context, req *agtrpc.ExecuteIOPlanReq) (*agtrpc.ExecuteIOPlanResp, error) { - plan, err := 
serder.JSONToObjectEx[ioswitch.Plan]([]byte(req.Plan)) + plan, err := serder.JSONToObjectEx[exec.Plan]([]byte(req.Plan)) if err != nil { return nil, fmt.Errorf("deserializing plan: %w", err) } @@ -22,10 +22,10 @@ func (s *Service) ExecuteIOPlan(ctx context.Context, req *agtrpc.ExecuteIOPlanRe logger.WithField("PlanID", plan.ID).Infof("begin execute io plan") defer logger.WithField("PlanID", plan.ID).Infof("plan finished") - sw := ioswitch.NewSwitch(plan) + sw := exec.NewExecutor(plan) - s.swMgr.Add(sw) - defer s.swMgr.Remove(sw) + s.swWorker.Add(sw) + defer s.swWorker.Remove(sw) err = sw.Run(ctx) if err != nil { @@ -53,15 +53,15 @@ func (s *Service) SendStream(server agtrpc.Agent_SendStreamServer) error { ctx, cancel := context.WithTimeout(server.Context(), time.Second*30) defer cancel() - sw := s.swMgr.FindByIDContexted(ctx, ioswitch.PlanID(msg.PlanID)) + sw := s.swWorker.FindByIDContexted(ctx, exec.PlanID(msg.PlanID)) if sw == nil { return fmt.Errorf("plan not found") } pr, pw := io.Pipe() - varID := ioswitch.VarID(msg.VarID) - sw.PutVars(&ioswitch.StreamVar{ + varID := exec.VarID(msg.VarID) + sw.PutVars(&exec.StreamVar{ ID: varID, Stream: pr, }) @@ -122,20 +122,20 @@ func (s *Service) GetStream(req *agtrpc.GetStreamReq, server agtrpc.Agent_GetStr ctx, cancel := context.WithTimeout(server.Context(), time.Second*30) defer cancel() - sw := s.swMgr.FindByIDContexted(ctx, ioswitch.PlanID(req.PlanID)) + sw := s.swWorker.FindByIDContexted(ctx, exec.PlanID(req.PlanID)) if sw == nil { return fmt.Errorf("plan not found") } - signal, err := serder.JSONToObjectEx[*ioswitch.SignalVar]([]byte(req.Signal)) + signal, err := serder.JSONToObjectEx[*exec.SignalVar]([]byte(req.Signal)) if err != nil { return fmt.Errorf("deserializing var: %w", err) } sw.PutVars(signal) - strVar := &ioswitch.StreamVar{ - ID: ioswitch.VarID(req.VarID), + strVar := &exec.StreamVar{ + ID: exec.VarID(req.VarID), } err = sw.BindVars(server.Context(), strVar) if err != nil { @@ -193,12 +193,12 @@ func (s *Service) SendVar(ctx context.Context, req *agtrpc.SendVarReq) (*agtrpc. 
ctx, cancel := context.WithTimeout(ctx, time.Second*30) defer cancel() - sw := s.swMgr.FindByIDContexted(ctx, ioswitch.PlanID(req.PlanID)) + sw := s.swWorker.FindByIDContexted(ctx, exec.PlanID(req.PlanID)) if sw == nil { return nil, fmt.Errorf("plan not found") } - v, err := serder.JSONToObjectEx[ioswitch.Var]([]byte(req.Var)) + v, err := serder.JSONToObjectEx[exec.Var]([]byte(req.Var)) if err != nil { return nil, fmt.Errorf("deserializing var: %w", err) } @@ -211,17 +211,17 @@ func (s *Service) GetVar(ctx context.Context, req *agtrpc.GetVarReq) (*agtrpc.Ge ctx2, cancel := context.WithTimeout(ctx, time.Second*30) defer cancel() - sw := s.swMgr.FindByIDContexted(ctx2, ioswitch.PlanID(req.PlanID)) + sw := s.swWorker.FindByIDContexted(ctx2, exec.PlanID(req.PlanID)) if sw == nil { return nil, fmt.Errorf("plan not found") } - v, err := serder.JSONToObjectEx[ioswitch.Var]([]byte(req.Var)) + v, err := serder.JSONToObjectEx[exec.Var]([]byte(req.Var)) if err != nil { return nil, fmt.Errorf("deserializing var: %w", err) } - signal, err := serder.JSONToObjectEx[*ioswitch.SignalVar]([]byte(req.Signal)) + signal, err := serder.JSONToObjectEx[*exec.SignalVar]([]byte(req.Signal)) if err != nil { return nil, fmt.Errorf("deserializing var: %w", err) } diff --git a/agent/internal/grpc/service.go b/agent/internal/grpc/service.go index c80b118..4936bae 100644 --- a/agent/internal/grpc/service.go +++ b/agent/internal/grpc/service.go @@ -1,17 +1,17 @@ package grpc import ( + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" agentserver "gitlink.org.cn/cloudream/storage/common/pkgs/grpc/agent" - "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" ) type Service struct { agentserver.AgentServer - swMgr *ioswitch.Manager + swWorker *exec.Worker } -func NewService(swMgr *ioswitch.Manager) *Service { +func NewService(swWorker *exec.Worker) *Service { return &Service{ - swMgr: swMgr, + swWorker: swWorker, } } diff --git a/agent/internal/mq/service.go b/agent/internal/mq/service.go index 96472bb..18c3e56 100644 --- a/agent/internal/mq/service.go +++ b/agent/internal/mq/service.go @@ -2,17 +2,14 @@ package mq import ( "gitlink.org.cn/cloudream/storage/agent/internal/task" - "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" ) type Service struct { taskManager *task.Manager - swMgr *ioswitch.Manager } -func NewService(taskMgr *task.Manager, swMgr *ioswitch.Manager) *Service { +func NewService(taskMgr *task.Manager) *Service { return &Service{ taskManager: taskMgr, - swMgr: swMgr, } } diff --git a/agent/internal/task/task.go b/agent/internal/task/task.go index 6f26794..397b59e 100644 --- a/agent/internal/task/task.go +++ b/agent/internal/task/task.go @@ -5,12 +5,10 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/task" "gitlink.org.cn/cloudream/storage/common/pkgs/connectivity" "gitlink.org.cn/cloudream/storage/common/pkgs/downloader" - "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" ) type TaskContext struct { distlock *distlock.Service - swMgr *ioswitch.Manager connectivity *connectivity.Collector downloader *downloader.Downloader } @@ -27,10 +25,9 @@ type Task = task.Task[TaskContext] type CompleteOption = task.CompleteOption -func NewManager(distlock *distlock.Service, swMgr *ioswitch.Manager, connectivity *connectivity.Collector, downloader *downloader.Downloader) Manager { +func NewManager(distlock *distlock.Service, connectivity *connectivity.Collector, downloader *downloader.Downloader) Manager { return task.NewManager(TaskContext{ distlock: distlock, - swMgr: swMgr, connectivity: 
connectivity, downloader: downloader, }) diff --git a/agent/main.go b/agent/main.go index 5faf024..42e3735 100644 --- a/agent/main.go +++ b/agent/main.go @@ -5,6 +5,7 @@ import ( "net" "os" + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" log "gitlink.org.cn/cloudream/common/pkgs/logger" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/storage/agent/internal/config" @@ -14,7 +15,6 @@ import ( "gitlink.org.cn/cloudream/storage/common/pkgs/distlock" "gitlink.org.cn/cloudream/storage/common/pkgs/downloader" agtrpc "gitlink.org.cn/cloudream/storage/common/pkgs/grpc/agent" - "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" // TODO 注册OpUnion,但在mq包中注册会造成循环依赖,所以只能放到这里 _ "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch/ops" @@ -92,15 +92,15 @@ func main() { log.Fatalf("new ipfs failed, err: %s", err.Error()) } - sw := ioswitch.NewManager() + sw := exec.NewWorker() dlder := downloader.NewDownloader(config.Cfg().Downloader, &conCol) - taskMgr := task.NewManager(distlock, &sw, &conCol, &dlder) + taskMgr := task.NewManager(distlock, &conCol, &dlder) // 启动命令服务器 // TODO 需要设计AgentID持久化机制 - agtSvr, err := agtmq.NewServer(cmdsvc.NewService(&taskMgr, &sw), config.Cfg().ID, &config.Cfg().RabbitMQ) + agtSvr, err := agtmq.NewServer(cmdsvc.NewService(&taskMgr), config.Cfg().ID, &config.Cfg().RabbitMQ) if err != nil { log.Fatalf("new agent server failed, err: %s", err.Error()) } diff --git a/client/internal/cmdline/test.go b/client/internal/cmdline/test.go new file mode 100644 index 0000000..6bb6ac4 --- /dev/null +++ b/client/internal/cmdline/test.go @@ -0,0 +1,119 @@ +package cmdline + +import ( + "context" + "fmt" + "io" + + "github.com/spf13/cobra" + "gitlink.org.cn/cloudream/common/pkgs/future" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + stgglb "gitlink.org.cn/cloudream/storage/common/globals" + "gitlink.org.cn/cloudream/storage/common/pkgs/downloader" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch/plans" + coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" +) + +func init() { + cmd := &cobra.Command{ + Use: "test2", + Short: "test2", + // Args: cobra.ExactArgs(1), + Run: func(cmd *cobra.Command, args []string) { + // cmdCtx := GetCmdCtx(cmd) + coorCli, err := stgglb.CoordinatorMQPool.Acquire() + if err != nil { + panic(err) + } + defer stgglb.CoordinatorMQPool.Release(coorCli) + + nodes, err := coorCli.GetNodes(coormq.NewGetNodes([]cdssdk.NodeID{1, 2})) + if err != nil { + panic(err) + } + + ft := plans.NewFromTo() + + // ft.AddFrom(plans.NewFromNode("Qmf412jQZiuVUtdgnB36FXFX7xg5V6KEbSJ4dpQuhkLyfD", &nodes.Nodes[0], -1)) + // ft.AddTo(plans.NewToNode(nodes.Nodes[1], -1, "asd")) + // len := int64(3) + // toExec, hd := plans.NewToExecutorWithRange(-1, plans.Range{Offset: 5, Length: &len}) + // ft.AddTo(toExec) + // ft.AddTo(plans.NewToNode(nodes.Nodes[1], 0, "0")) + // ft.AddTo(plans.NewToNode(nodes.Nodes[1], 1, "1")) + // ft.AddTo(plans.NewToNode(nodes.Nodes[1], 2, "2")) + + ft.AddFrom(plans.NewFromNode("QmS2s8GRYHEurXL7V1zUtKvf2H1BGcQc5NN1T1hiSnWvbd", &nodes.Nodes[0], 1)) + ft.AddFrom(plans.NewFromNode("QmUgUEUMzdnjPNx6xu9PDGXpSyXTk8wzPWvyYZ9zasE1WW", &nodes.Nodes[1], 2)) + le := int64(3) + toExec, hd := plans.NewToExecutorWithRange(-1, plans.Range{Offset: 5, Length: &le}) + // toExec, hd := plans.NewToExecutorWithRange(1, plans.Range{Offset: 0, Length: nil}) + // toExec2, hd2 := plans.NewToExecutorWithRange(2, plans.Range{Offset: 0, Length: nil}) + ft.AddTo(toExec) + // ft.AddTo(toExec2) + + // fromExec, hd := 
plans.NewFromExecutor(-1) + // ft.AddFrom(fromExec) + // ft.AddTo(plans.NewToNode(nodes.Nodes[1], -1, "asd")) + + parser := plans.NewParser(cdssdk.DefaultECRedundancy) + + plans := plans.NewPlanBuilder() + err = parser.Parse(ft, plans) + if err != nil { + panic(err) + } + + exec := plans.Execute() + + fut := future.NewSetVoid() + go func() { + mp, err := exec.Wait(context.Background()) + if err != nil { + panic(err) + } + + fmt.Printf("mp: %+v\n", mp) + fut.SetVoid() + }() + + go func() { + // exec.BeginWrite(io.NopCloser(bytes.NewBuffer([]byte("hello world"))), hd) + // if err != nil { + // panic(err) + // } + + str, err := exec.BeginRead(hd) + if err != nil { + panic(err) + } + defer str.Close() + data, err := io.ReadAll(str) + if err != nil && err != io.EOF { + panic(err) + } + fmt.Printf("data: %v(%v)\n", string(data), len(data)) + }() + + fut.Wait(context.TODO()) + }, + } + + cmd2 := &cobra.Command{ + Use: "test", + Short: "test", + // Args: cobra.ExactArgs(1), + Run: func(cmd *cobra.Command, args []string) { + cmdCtx := GetCmdCtx(cmd) + file, _ := cmdCtx.Cmdline.Svc.ObjectSvc().Download(1, downloader.DownloadReqeust{ + ObjectID: 27379, + Length: -1, + }) + data, _ := io.ReadAll(file.File) + fmt.Printf("data: %v(%v)\n", string(data), len(data)) + }, + } + + rootCmd.AddCommand(cmd) + rootCmd.AddCommand(cmd2) +} diff --git a/common/pkgs/downloader/iterator.go b/common/pkgs/downloader/iterator.go index 5336944..d43a202 100644 --- a/common/pkgs/downloader/iterator.go +++ b/common/pkgs/downloader/iterator.go @@ -11,6 +11,7 @@ import ( "github.com/samber/lo" "gitlink.org.cn/cloudream/common/pkgs/bitmap" + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/common/pkgs/logger" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" @@ -362,7 +363,7 @@ func (iter *DownloadObjectIterator) getNodeDistance(node cdssdk.Node) float64 { } func (iter *DownloadObjectIterator) downloadFromNode(node *cdssdk.Node, req downloadReqeust2) (io.ReadCloser, error) { - var strHandle *plans.ExecutorReadStream + var strHandle *exec.DriverReadStream ft := plans.NewFromTo() toExec, handle := plans.NewToExecutor(-1) diff --git a/common/pkgs/grpc/agent/client.go b/common/pkgs/grpc/agent/client.go index b206ad7..3f7a762 100644 --- a/common/pkgs/grpc/agent/client.go +++ b/common/pkgs/grpc/agent/client.go @@ -5,8 +5,8 @@ import ( "fmt" "io" + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/common/utils/serder" - "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" ) @@ -28,7 +28,7 @@ func NewClient(addr string) (*Client, error) { }, nil } -func (c *Client) ExecuteIOPlan(ctx context.Context, plan ioswitch.Plan) error { +func (c *Client) ExecuteIOPlan(ctx context.Context, plan exec.Plan) error { data, err := serder.ObjectToJSONEx(plan) if err != nil { return err @@ -83,7 +83,7 @@ func (s *grpcStreamReadCloser) Close() error { return nil } -func (c *Client) SendStream(ctx context.Context, planID ioswitch.PlanID, varID ioswitch.VarID, str io.Reader) error { +func (c *Client) SendStream(ctx context.Context, planID exec.PlanID, varID exec.VarID, str io.Reader) error { sendCli, err := c.cli.SendStream(ctx) if err != nil { return err @@ -132,7 +132,7 @@ func (c *Client) SendStream(ctx context.Context, planID ioswitch.PlanID, varID i } } -func (c *Client) GetStream(planID ioswitch.PlanID, varID ioswitch.VarID, signal *ioswitch.SignalVar) (io.ReadCloser, error) { +func (c *Client) GetStream(planID 
exec.PlanID, varID exec.VarID, signal *exec.SignalVar) (io.ReadCloser, error) { ctx, cancel := context.WithCancel(context.Background()) sdata, err := serder.ObjectToJSONEx(signal) @@ -156,7 +156,7 @@ func (c *Client) GetStream(planID ioswitch.PlanID, varID ioswitch.VarID, signal }, nil } -func (c *Client) SendVar(ctx context.Context, planID ioswitch.PlanID, v ioswitch.Var) error { +func (c *Client) SendVar(ctx context.Context, planID exec.PlanID, v exec.Var) error { data, err := serder.ObjectToJSONEx(v) if err != nil { return err @@ -169,7 +169,7 @@ func (c *Client) SendVar(ctx context.Context, planID ioswitch.PlanID, v ioswitch return err } -func (c *Client) GetVar(ctx context.Context, planID ioswitch.PlanID, v ioswitch.Var, signal *ioswitch.SignalVar) (ioswitch.Var, error) { +func (c *Client) GetVar(ctx context.Context, planID exec.PlanID, v exec.Var, signal *exec.SignalVar) (exec.Var, error) { vdata, err := serder.ObjectToJSONEx(v) if err != nil { return nil, err @@ -189,7 +189,7 @@ func (c *Client) GetVar(ctx context.Context, planID ioswitch.PlanID, v ioswitch. return nil, err } - v2, err := serder.JSONToObjectEx[ioswitch.Var]([]byte(resp.Var)) + v2, err := serder.JSONToObjectEx[exec.Var]([]byte(resp.Var)) if err != nil { return nil, err } diff --git a/common/pkgs/ioswitch/dag/graph.go b/common/pkgs/ioswitch/dag/graph.go deleted file mode 100644 index 811a6fe..0000000 --- a/common/pkgs/ioswitch/dag/graph.go +++ /dev/null @@ -1,74 +0,0 @@ -package dag - -import ( - "gitlink.org.cn/cloudream/common/utils/lo2" -) - -type Graph[NP any, VP any] struct { - Nodes []*Node[NP, VP] - isWalking bool - nextVarID int -} - -func NewGraph[NP any, VP any]() *Graph[NP, VP] { - return &Graph[NP, VP]{} -} - -func (g *Graph[NP, VP]) NewNode(typ NodeType[NP, VP], props NP) *Node[NP, VP] { - n := &Node[NP, VP]{ - Type: typ, - Props: props, - Graph: g, - } - typ.InitNode(n) - g.Nodes = append(g.Nodes, n) - return n -} - -func (g *Graph[NP, VP]) RemoveNode(node *Node[NP, VP]) { - for i, n := range g.Nodes { - if n == node { - if g.isWalking { - g.Nodes[i] = nil - } else { - g.Nodes = lo2.RemoveAt(g.Nodes, i) - } - break - } - } -} - -func (g *Graph[NP, VP]) Walk(cb func(node *Node[NP, VP]) bool) { - g.isWalking = true - for i := 0; i < len(g.Nodes); i++ { - if g.Nodes[i] == nil { - continue - } - - if !cb(g.Nodes[i]) { - break - } - } - g.isWalking = false - - g.Nodes = lo2.RemoveAllDefault(g.Nodes) -} - -func (g *Graph[NP, VP]) genVarID() int { - g.nextVarID++ - return g.nextVarID -} - -func NewNode[NP any, VP any, NT NodeType[NP, VP]](graph *Graph[NP, VP], typ NT, props NP) (*Node[NP, VP], NT) { - return graph.NewNode(typ, props), typ -} - -func WalkOnlyType[N NodeType[NP, VP], NP any, VP any](g *Graph[NP, VP], cb func(node *Node[NP, VP], typ N) bool) { - g.Walk(func(node *Node[NP, VP]) bool { - typ, ok := node.Type.(N) - if ok { - return cb(node, typ) - } - return true - }) -} diff --git a/common/pkgs/ioswitch/dag/node.go b/common/pkgs/ioswitch/dag/node.go deleted file mode 100644 index 5e5dd1f..0000000 --- a/common/pkgs/ioswitch/dag/node.go +++ /dev/null @@ -1,68 +0,0 @@ -package dag - -import ( - "fmt" - - "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch/exec" -) - -type NodeType[NP any, VP any] interface { - InitNode(node *Node[NP, VP]) - String(node *Node[NP, VP]) string - GenerateOp(node *Node[NP, VP], blder *exec.PlanBuilder) error -} - -type NodeEnvType string - -const ( - EnvUnknown NodeEnvType = "" - EnvExecutor NodeEnvType = "Executor" - EnvWorker NodeEnvType = "Worker" -) - -type NodeEnv 
struct { - Type NodeEnvType - Worker exec.Worker -} - -func (e *NodeEnv) ToEnvUnknown() { - e.Type = EnvUnknown - e.Worker = nil -} - -func (e *NodeEnv) ToEnvExecutor() { - e.Type = EnvExecutor - e.Worker = nil -} - -func (e *NodeEnv) ToEnvWorker(worker exec.Worker) { - e.Type = EnvWorker - e.Worker = worker -} - -func (e *NodeEnv) Equals(other NodeEnv) bool { - if e.Type != other.Type { - return false - } - - if e.Type != EnvWorker { - return true - } - - return e.Worker.Equals(other.Worker) -} - -type Node[NP any, VP any] struct { - Type NodeType[NP, VP] - Env NodeEnv - Props NP - InputStreams []*StreamVar[NP, VP] - OutputStreams []*StreamVar[NP, VP] - InputValues []*ValueVar[NP, VP] - OutputValues []*ValueVar[NP, VP] - Graph *Graph[NP, VP] -} - -func (n *Node[NP, VP]) String() string { - return fmt.Sprintf("%v", n.Type.String(n)) -} diff --git a/common/pkgs/ioswitch/dag/var.go b/common/pkgs/ioswitch/dag/var.go deleted file mode 100644 index e1caa7b..0000000 --- a/common/pkgs/ioswitch/dag/var.go +++ /dev/null @@ -1,112 +0,0 @@ -package dag - -import "gitlink.org.cn/cloudream/common/utils/lo2" - -type EndPoint[NP any, VP any] struct { - Node *Node[NP, VP] - SlotIndex int // 所连接的Node的Output或Input数组的索引 -} - -type StreamVar[NP any, VP any] struct { - ID int - From EndPoint[NP, VP] - Toes []EndPoint[NP, VP] - Props VP -} - -func (v *StreamVar[NP, VP]) To(to *Node[NP, VP], slotIdx int) int { - v.Toes = append(v.Toes, EndPoint[NP, VP]{Node: to, SlotIndex: slotIdx}) - to.InputStreams[slotIdx] = v - return len(v.Toes) - 1 -} - -// func (v *StreamVar[NP, VP]) NotTo(toIdx int) EndPoint[NP, VP] { -// ed := v.Toes[toIdx] -// lo2.RemoveAt(v.Toes, toIdx) -// ed.Node.InputStreams[ed.SlotIndex] = nil -// return ed -// } - -func (v *StreamVar[NP, VP]) NotTo(node *Node[NP, VP]) (EndPoint[NP, VP], bool) { - for i, ed := range v.Toes { - if ed.Node == node { - v.Toes = lo2.RemoveAt(v.Toes, i) - ed.Node.InputStreams[ed.SlotIndex] = nil - return ed, true - } - } - - return EndPoint[NP, VP]{}, false -} - -func (v *StreamVar[NP, VP]) NotToWhere(pred func(to EndPoint[NP, VP]) bool) []EndPoint[NP, VP] { - var newToes []EndPoint[NP, VP] - var rmed []EndPoint[NP, VP] - for _, ed := range v.Toes { - if pred(ed) { - ed.Node.InputStreams[ed.SlotIndex] = nil - rmed = append(rmed, ed) - } else { - newToes = append(newToes, ed) - } - } - v.Toes = newToes - return rmed -} - -func (v *StreamVar[NP, VP]) NotToAll() []EndPoint[NP, VP] { - for _, ed := range v.Toes { - ed.Node.InputStreams[ed.SlotIndex] = nil - } - toes := v.Toes - v.Toes = nil - return toes -} - -func NodeNewOutputStream[NP any, VP any](node *Node[NP, VP], props VP) *StreamVar[NP, VP] { - str := &StreamVar[NP, VP]{ - ID: node.Graph.genVarID(), - From: EndPoint[NP, VP]{Node: node, SlotIndex: len(node.OutputStreams)}, - Props: props, - } - node.OutputStreams = append(node.OutputStreams, str) - return str -} - -func NodeDeclareInputStream[NP any, VP any](node *Node[NP, VP], cnt int) { - node.InputStreams = make([]*StreamVar[NP, VP], cnt) -} - -type ValueVarType int - -const ( - StringValueVar ValueVarType = iota - SignalValueVar -) - -type ValueVar[NP any, VP any] struct { - ID int - From EndPoint[NP, VP] - Toes []EndPoint[NP, VP] - Props VP -} - -func (v *ValueVar[NP, VP]) To(to *Node[NP, VP], slotIdx int) int { - v.Toes = append(v.Toes, EndPoint[NP, VP]{Node: to, SlotIndex: slotIdx}) - to.InputValues[slotIdx] = v - return len(v.Toes) - 1 -} - -func NodeNewOutputValue[NP any, VP any](node *Node[NP, VP], props VP) *ValueVar[NP, VP] { - val := &ValueVar[NP, VP]{ 
- ID: node.Graph.genVarID(), - From: EndPoint[NP, VP]{Node: node, SlotIndex: len(node.OutputStreams)}, - Props: props, - } - node.OutputValues = append(node.OutputValues, val) - return val -} - -func NodeDeclareInputValue[NP any, VP any](node *Node[NP, VP], cnt int) { - node.InputValues = make([]*ValueVar[NP, VP], cnt) -} diff --git a/common/pkgs/ioswitch/exec/exec.go b/common/pkgs/ioswitch/exec/exec.go deleted file mode 100644 index 9c4000f..0000000 --- a/common/pkgs/ioswitch/exec/exec.go +++ /dev/null @@ -1,8 +0,0 @@ -package exec - -type Worker interface { - // 获取连接到这个worker的GRPC服务的地址 - GetAddress() string - // 判断两个worker是否相同 - Equals(worker Worker) bool -} diff --git a/common/pkgs/ioswitch/exec/executor.go b/common/pkgs/ioswitch/exec/executor.go deleted file mode 100644 index 4a11160..0000000 --- a/common/pkgs/ioswitch/exec/executor.go +++ /dev/null @@ -1,120 +0,0 @@ -package exec - -import ( - "context" - "fmt" - "io" - "sync" - - "gitlink.org.cn/cloudream/common/pkgs/future" - "gitlink.org.cn/cloudream/common/utils/io2" - stgglb "gitlink.org.cn/cloudream/storage/common/globals" - "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" -) - -type Executor struct { - planID ioswitch.PlanID - planBlder *PlanBuilder - callback *future.SetVoidFuture - ctx context.Context - cancel context.CancelFunc - executorSw *ioswitch.Switch -} - -// 开始写入一个流。此函数会将输入视为一个完整的流,因此会给流包装一个Range来获取只需要的部分。 -func (e *Executor) BeginWrite(str io.ReadCloser, handle *ExecutorWriteStream) { - handle.Var.Stream = io2.NewRange(str, handle.RangeHint.Offset, handle.RangeHint.Length) - e.executorSw.PutVars(handle.Var) -} - -// 开始写入一个流。此函数默认输入流已经是Handle的RangeHint锁描述的范围,因此不会做任何其他处理 -func (e *Executor) BeginWriteRanged(str io.ReadCloser, handle *ExecutorWriteStream) { - handle.Var.Stream = str - e.executorSw.PutVars(handle.Var) -} - -func (e *Executor) BeginRead(handle *ExecutorReadStream) (io.ReadCloser, error) { - err := e.executorSw.BindVars(e.ctx, handle.Var) - if err != nil { - return nil, fmt.Errorf("bind vars: %w", err) - } - - return handle.Var.Stream, nil -} - -func (e *Executor) Signal(signal *ExecutorSignalVar) { - e.executorSw.PutVars(signal.Var) -} - -func (e *Executor) Wait(ctx context.Context) (map[string]any, error) { - err := e.callback.Wait(ctx) - if err != nil { - return nil, err - } - - ret := make(map[string]any) - e.planBlder.ExecutorPlan.StoreMap.Range(func(k, v any) bool { - ret[k.(string)] = v - return true - }) - - return ret, nil -} - -func (e *Executor) execute() { - wg := sync.WaitGroup{} - - for _, p := range e.planBlder.AgentPlans { - wg.Add(1) - - go func(p *AgentPlanBuilder) { - defer wg.Done() - - plan := ioswitch.Plan{ - ID: e.planID, - Ops: p.Ops, - } - - cli, err := stgglb.AgentRPCPool.Acquire(stgglb.SelectGRPCAddress(&p.Node)) - if err != nil { - e.stopWith(fmt.Errorf("new agent rpc client of node %v: %w", p.Node.NodeID, err)) - return - } - defer stgglb.AgentRPCPool.Release(cli) - - err = cli.ExecuteIOPlan(e.ctx, plan) - if err != nil { - e.stopWith(fmt.Errorf("execute plan at %v: %w", p.Node.NodeID, err)) - return - } - }(p) - } - - err := e.executorSw.Run(e.ctx) - if err != nil { - e.stopWith(fmt.Errorf("run executor switch: %w", err)) - return - } - - wg.Wait() - - e.callback.SetVoid() -} - -func (e *Executor) stopWith(err error) { - e.callback.SetError(err) - e.cancel() -} - -type ExecutorWriteStream struct { - Var *ioswitch.StreamVar - RangeHint *Range -} - -type ExecutorReadStream struct { - Var *ioswitch.StreamVar -} - -type ExecutorSignalVar struct { - Var *ioswitch.SignalVar 
-} diff --git a/common/pkgs/ioswitch/exec/plan_builder.go b/common/pkgs/ioswitch/exec/plan_builder.go deleted file mode 100644 index ad2ae81..0000000 --- a/common/pkgs/ioswitch/exec/plan_builder.go +++ /dev/null @@ -1,127 +0,0 @@ -package exec - -import ( - "context" - "sync" - - "gitlink.org.cn/cloudream/common/pkgs/future" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - "gitlink.org.cn/cloudream/common/utils/lo2" - "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" -) - -type PlanBuilder struct { - Vars []ioswitch.Var - AgentPlans map[cdssdk.NodeID]*AgentPlanBuilder - ExecutorPlan ExecutorPlanBuilder -} - -func NewPlanBuilder() *PlanBuilder { - bld := &PlanBuilder{ - AgentPlans: make(map[cdssdk.NodeID]*AgentPlanBuilder), - ExecutorPlan: ExecutorPlanBuilder{ - StoreMap: &sync.Map{}, - }, - } - - return bld -} - -func (b *PlanBuilder) AtExecutor() *ExecutorPlanBuilder { - return &b.ExecutorPlan -} - -func (b *PlanBuilder) AtAgent(node cdssdk.Node) *AgentPlanBuilder { - agtPlan, ok := b.AgentPlans[node.NodeID] - if !ok { - agtPlan = &AgentPlanBuilder{ - Node: node, - } - b.AgentPlans[node.NodeID] = agtPlan - } - - return agtPlan -} - -func (b *PlanBuilder) NewStreamVar() *ioswitch.StreamVar { - v := &ioswitch.StreamVar{ - ID: ioswitch.VarID(len(b.Vars)), - } - b.Vars = append(b.Vars, v) - - return v -} - -func (b *PlanBuilder) NewIntVar() *ioswitch.IntVar { - v := &ioswitch.IntVar{ - ID: ioswitch.VarID(len(b.Vars)), - } - b.Vars = append(b.Vars, v) - - return v -} - -func (b *PlanBuilder) NewStringVar() *ioswitch.StringVar { - v := &ioswitch.StringVar{ - ID: ioswitch.VarID(len(b.Vars)), - } - b.Vars = append(b.Vars, v) - - return v -} -func (b *PlanBuilder) NewSignalVar() *ioswitch.SignalVar { - v := &ioswitch.SignalVar{ - ID: ioswitch.VarID(len(b.Vars)), - } - b.Vars = append(b.Vars, v) - - return v -} - -func (b *PlanBuilder) Execute() *Executor { - ctx, cancel := context.WithCancel(context.Background()) - planID := genRandomPlanID() - - execPlan := ioswitch.Plan{ - ID: planID, - Ops: b.ExecutorPlan.Ops, - } - - exec := Executor{ - planID: planID, - planBlder: b, - callback: future.NewSetVoid(), - ctx: ctx, - cancel: cancel, - executorSw: ioswitch.NewSwitch(execPlan), - } - go exec.execute() - - return &exec -} - -type AgentPlanBuilder struct { - Node cdssdk.Node - Ops []ioswitch.Op -} - -func (b *AgentPlanBuilder) AddOp(op ioswitch.Op) { - b.Ops = append(b.Ops, op) -} - -func (b *AgentPlanBuilder) RemoveOp(op ioswitch.Op) { - b.Ops = lo2.Remove(b.Ops, op) -} - -type ExecutorPlanBuilder struct { - Ops []ioswitch.Op - StoreMap *sync.Map -} - -func (b *ExecutorPlanBuilder) AddOp(op ioswitch.Op) { - b.Ops = append(b.Ops, op) -} - -func (b *ExecutorPlanBuilder) RemoveOp(op ioswitch.Op) { - b.Ops = lo2.Remove(b.Ops, op) -} diff --git a/common/pkgs/ioswitch/exec/utils.go b/common/pkgs/ioswitch/exec/utils.go deleted file mode 100644 index 3124af6..0000000 --- a/common/pkgs/ioswitch/exec/utils.go +++ /dev/null @@ -1,10 +0,0 @@ -package exec - -import ( - "github.com/google/uuid" - "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" -) - -func genRandomPlanID() ioswitch.PlanID { - return ioswitch.PlanID(uuid.NewString()) -} diff --git a/common/pkgs/ioswitch/ioswitch.go b/common/pkgs/ioswitch/ioswitch.go deleted file mode 100644 index a72989a..0000000 --- a/common/pkgs/ioswitch/ioswitch.go +++ /dev/null @@ -1,69 +0,0 @@ -package ioswitch - -import ( - "context" - "io" - - "gitlink.org.cn/cloudream/common/pkgs/types" - "gitlink.org.cn/cloudream/common/utils/serder" -) - -type 
PlanID string - -type VarID int - -type Plan struct { - ID PlanID `json:"id"` - Ops []Op `json:"ops"` -} - -type Var interface { - GetID() VarID -} - -var VarUnion = types.NewTypeUnion[Var]( - (*IntVar)(nil), - (*StringVar)(nil), - (*SignalVar)(nil), - (*StreamVar)(nil), -) -var _ = serder.UseTypeUnionExternallyTagged(&VarUnion) - -type StreamVar struct { - ID VarID `json:"id"` - Stream io.ReadCloser `json:"-"` -} - -func (v *StreamVar) GetID() VarID { - return v.ID -} - -type IntVar struct { - ID VarID `json:"id"` - Value string `json:"value"` -} - -func (v *IntVar) GetID() VarID { - return v.ID -} - -type StringVar struct { - ID VarID `json:"id"` - Value string `json:"value"` -} - -func (v *StringVar) GetID() VarID { - return v.ID -} - -type SignalVar struct { - ID VarID `json:"id"` -} - -func (v *SignalVar) GetID() VarID { - return v.ID -} - -type Op interface { - Execute(ctx context.Context, sw *Switch) error -} diff --git a/common/pkgs/ioswitch/manager.go b/common/pkgs/ioswitch/manager.go deleted file mode 100644 index 940aa66..0000000 --- a/common/pkgs/ioswitch/manager.go +++ /dev/null @@ -1,85 +0,0 @@ -package ioswitch - -import ( - "context" - "sync" - - "github.com/samber/lo" - "gitlink.org.cn/cloudream/common/pkgs/future" - "gitlink.org.cn/cloudream/common/utils/lo2" -) - -type finding struct { - PlanID PlanID - Callback *future.SetValueFuture[*Switch] -} - -type Manager struct { - lock sync.Mutex - switchs map[PlanID]*Switch - findings []*finding -} - -func NewManager() Manager { - return Manager{ - switchs: make(map[PlanID]*Switch), - } -} - -func (s *Manager) Add(sw *Switch) { - s.lock.Lock() - defer s.lock.Unlock() - - s.switchs[sw.Plan().ID] = sw - - s.findings = lo.Reject(s.findings, func(f *finding, idx int) bool { - if f.PlanID != sw.Plan().ID { - return false - } - - f.Callback.SetValue(sw) - return true - }) -} - -func (s *Manager) Remove(sw *Switch) { - s.lock.Lock() - defer s.lock.Unlock() - - delete(s.switchs, sw.Plan().ID) -} - -func (s *Manager) FindByID(id PlanID) *Switch { - s.lock.Lock() - defer s.lock.Unlock() - - return s.switchs[id] -} - -func (s *Manager) FindByIDContexted(ctx context.Context, id PlanID) *Switch { - s.lock.Lock() - - sw := s.switchs[id] - if sw != nil { - s.lock.Unlock() - return sw - } - - cb := future.NewSetValue[*Switch]() - f := &finding{ - PlanID: id, - Callback: cb, - } - s.findings = append(s.findings, f) - - s.lock.Unlock() - - sw, _ = cb.WaitValue(ctx) - - s.lock.Lock() - defer s.lock.Unlock() - - s.findings = lo2.Remove(s.findings, f) - - return sw -} diff --git a/common/pkgs/ioswitch/ops/chunked.go b/common/pkgs/ioswitch/ops/chunked.go index 048761c..ff65b97 100644 --- a/common/pkgs/ioswitch/ops/chunked.go +++ b/common/pkgs/ioswitch/ops/chunked.go @@ -2,23 +2,31 @@ package ops import ( "context" + "fmt" "io" + "github.com/samber/lo" "gitlink.org.cn/cloudream/common/pkgs/future" + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/common/utils/io2" - "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" "golang.org/x/sync/semaphore" ) +func init() { + OpUnion.AddT((*ChunkedSplit)(nil)) + OpUnion.AddT((*ChunkedJoin)(nil)) +} + type ChunkedSplit struct { - Input *ioswitch.StreamVar `json:"input"` - Outputs []*ioswitch.StreamVar `json:"outputs"` - ChunkSize int `json:"chunkSize"` - PaddingZeros bool `json:"paddingZeros"` + Input *exec.StreamVar `json:"input"` + Outputs []*exec.StreamVar `json:"outputs"` + ChunkSize int `json:"chunkSize"` + PaddingZeros 
bool `json:"paddingZeros"` } -func (o *ChunkedSplit) Execute(ctx context.Context, sw *ioswitch.Switch) error { - err := sw.BindVars(ctx, o.Input) +func (o *ChunkedSplit) Execute(ctx context.Context, e *exec.Executor) error { + err := e.BindVars(ctx, o.Input) if err != nil { return err } @@ -36,19 +44,19 @@ func (o *ChunkedSplit) Execute(ctx context.Context, sw *ioswitch.Switch) error { sem.Release(1) }) } - ioswitch.PutArrayVars(sw, o.Outputs) + exec.PutArrayVars(e, o.Outputs) return sem.Acquire(ctx, int64(len(outputs))) } type ChunkedJoin struct { - Inputs []*ioswitch.StreamVar `json:"inputs"` - Output *ioswitch.StreamVar `json:"output"` - ChunkSize int `json:"chunkSize"` + Inputs []*exec.StreamVar `json:"inputs"` + Output *exec.StreamVar `json:"output"` + ChunkSize int `json:"chunkSize"` } -func (o *ChunkedJoin) Execute(ctx context.Context, sw *ioswitch.Switch) error { - err := ioswitch.BindArrayVars(sw, ctx, o.Inputs) +func (o *ChunkedJoin) Execute(ctx context.Context, e *exec.Executor) error { + err := exec.BindArrayVars(e, ctx, o.Inputs) if err != nil { return err } @@ -67,12 +75,60 @@ func (o *ChunkedJoin) Execute(ctx context.Context, sw *ioswitch.Switch) error { o.Output.Stream = io2.AfterReadClosedOnce(io2.BufferedChunkedJoin(strReaders, o.ChunkSize), func(closer io.ReadCloser) { fut.SetVoid() }) - sw.PutVars(o.Output) + e.PutVars(o.Output) return fut.Wait(ctx) } -func init() { - OpUnion.AddT((*ChunkedSplit)(nil)) - OpUnion.AddT((*ChunkedJoin)(nil)) +type ChunkedSplitType struct { + OutputCount int + ChunkSize int +} + +func (t *ChunkedSplitType) InitNode(node *Node) { + dag.NodeDeclareInputStream(node, 1) + for i := 0; i < t.OutputCount; i++ { + dag.NodeNewOutputStream(node, VarProps{}) + } +} + +func (t *ChunkedSplitType) GenerateOp(op *Node, blder *exec.PlanBuilder) error { + addOpByEnv(&ChunkedSplit{ + Input: op.InputStreams[0].Props.Var.(*exec.StreamVar), + Outputs: lo.Map(op.OutputStreams, func(v *StreamVar, idx int) *exec.StreamVar { + return v.Props.Var.(*exec.StreamVar) + }), + ChunkSize: t.ChunkSize, + PaddingZeros: true, + }, op.Env, blder) + return nil +} + +func (t *ChunkedSplitType) String(node *Node) string { + return fmt.Sprintf("ChunkedSplit[%v]", t.ChunkSize, formatStreamIO(node), formatValueIO(node)) +} + +type ChunkedJoinType struct { + InputCount int + ChunkSize int +} + +func (t *ChunkedJoinType) InitNode(node *Node) { + dag.NodeDeclareInputStream(node, t.InputCount) + dag.NodeNewOutputStream(node, VarProps{}) +} + +func (t *ChunkedJoinType) GenerateOp(op *Node, blder *exec.PlanBuilder) error { + addOpByEnv(&ChunkedJoin{ + Inputs: lo.Map(op.InputStreams, func(v *StreamVar, idx int) *exec.StreamVar { + return v.Props.Var.(*exec.StreamVar) + }), + Output: op.OutputStreams[0].Props.Var.(*exec.StreamVar), + ChunkSize: t.ChunkSize, + }, op.Env, blder) + return nil +} + +func (t *ChunkedJoinType) String(node *Node) string { + return fmt.Sprintf("ChunkedJoin[%v]", t.ChunkSize, formatStreamIO(node), formatValueIO(node)) } diff --git a/common/pkgs/ioswitch/ops/clone.go b/common/pkgs/ioswitch/ops/clone.go index c71d6c3..a5421f7 100644 --- a/common/pkgs/ioswitch/ops/clone.go +++ b/common/pkgs/ioswitch/ops/clone.go @@ -5,18 +5,25 @@ import ( "fmt" "io" + "github.com/samber/lo" + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/common/utils/io2" - "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" "golang.org/x/sync/semaphore" ) +func init() { + OpUnion.AddT((*CloneStream)(nil)) + 
OpUnion.AddT((*CloneVar)(nil)) +} + type CloneStream struct { - Input *ioswitch.StreamVar `json:"input"` - Outputs []*ioswitch.StreamVar `json:"outputs"` + Input *exec.StreamVar `json:"input"` + Outputs []*exec.StreamVar `json:"outputs"` } -func (o *CloneStream) Execute(ctx context.Context, sw *ioswitch.Switch) error { - err := sw.BindVars(ctx, o.Input) +func (o *CloneStream) Execute(ctx context.Context, e *exec.Executor) error { + err := e.BindVars(ctx, o.Input) if err != nil { return err } @@ -32,33 +39,76 @@ func (o *CloneStream) Execute(ctx context.Context, sw *ioswitch.Switch) error { sem.Release(1) }) } - ioswitch.PutArrayVars(sw, o.Outputs) + exec.PutArrayVars(e, o.Outputs) return sem.Acquire(ctx, int64(len(o.Outputs))) } type CloneVar struct { - Raw ioswitch.Var `json:"raw"` - Cloneds []ioswitch.Var `json:"cloneds"` + Raw exec.Var `json:"raw"` + Cloneds []exec.Var `json:"cloneds"` } -func (o *CloneVar) Execute(ctx context.Context, sw *ioswitch.Switch) error { - err := sw.BindVars(ctx, o.Raw) +func (o *CloneVar) Execute(ctx context.Context, e *exec.Executor) error { + err := e.BindVars(ctx, o.Raw) if err != nil { return err } for _, v := range o.Cloneds { - if err := ioswitch.AssignVar(o.Raw, v); err != nil { + if err := exec.AssignVar(o.Raw, v); err != nil { return fmt.Errorf("clone var: %w", err) } } - sw.PutVars(o.Cloneds...) + e.PutVars(o.Cloneds...) return nil } -func init() { - OpUnion.AddT((*CloneStream)(nil)) - OpUnion.AddT((*CloneVar)(nil)) +type CloneStreamType struct{} + +func (t *CloneStreamType) InitNode(node *Node) { + dag.NodeDeclareInputStream(node, 1) +} + +func (t *CloneStreamType) GenerateOp(op *Node, blder *exec.PlanBuilder) error { + addOpByEnv(&CloneStream{ + Input: op.InputStreams[0].Props.Var.(*exec.StreamVar), + Outputs: lo.Map(op.OutputStreams, func(v *StreamVar, idx int) *exec.StreamVar { + return v.Props.Var.(*exec.StreamVar) + }), + }, op.Env, blder) + return nil +} + +func (t *CloneStreamType) NewOutput(node *Node) *StreamVar { + return dag.NodeNewOutputStream(node, VarProps{}) +} + +func (t *CloneStreamType) String(node *Node) string { + return fmt.Sprintf("CloneStream[]%v%v", formatStreamIO(node), formatValueIO(node)) +} + +type CloneVarType struct{} + +func (t *CloneVarType) InitNode(node *Node) { + dag.NodeDeclareInputValue(node, 1) +} + +func (t *CloneVarType) GenerateOp(op *Node, blder *exec.PlanBuilder) error { + addOpByEnv(&CloneVar{ + Raw: op.InputValues[0].Props.Var, + Cloneds: lo.Map(op.OutputValues, func(v *ValueVar, idx int) exec.Var { + return v.Props.Var + }), + }, op.Env, blder) + return nil +} + +func (t *CloneVarType) NewOutput(node *Node) *ValueVar { + return dag.NodeNewOutputValue(node, VarProps{}) +} + +func (t *CloneVarType) String(node *Node) string { + return fmt.Sprintf("CloneVar[]%v%v", formatStreamIO(node), formatValueIO(node)) } diff --git a/common/pkgs/ioswitch/ops/driver.go b/common/pkgs/ioswitch/ops/driver.go new file mode 100644 index 0000000..b87aa72 --- /dev/null +++ b/common/pkgs/ioswitch/ops/driver.go @@ -0,0 +1,43 @@ +package ops + +import ( + "fmt" + + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" +) + +type FromDriverType struct { + Handle *exec.DriverWriteStream +} + +func (t *FromDriverType) InitNode(node *Node) { + dag.NodeNewOutputStream(node, VarProps{}) +} + +func (t *FromDriverType) GenerateOp(op *Node, blder *exec.PlanBuilder) error { + t.Handle.Var = op.OutputStreams[0].Props.Var.(*exec.StreamVar) + return nil +} + +func (t *FromDriverType) 
String(node *Node) string { + return fmt.Sprintf("FromDriver[]%v%v", formatStreamIO(node), formatValueIO(node)) +} + +type ToDriverType struct { + Handle *exec.DriverReadStream + Range exec.Range +} + +func (t *ToDriverType) InitNode(node *Node) { + dag.NodeDeclareInputStream(node, 1) +} + +func (t *ToDriverType) GenerateOp(op *Node, blder *exec.PlanBuilder) error { + t.Handle.Var = op.InputStreams[0].Props.Var.(*exec.StreamVar) + return nil +} + +func (t *ToDriverType) String(node *Node) string { + return fmt.Sprintf("ToDriver[%v+%v]%v%v", t.Range.Offset, t.Range.Length, formatStreamIO(node), formatValueIO(node)) +} diff --git a/common/pkgs/ioswitch/ops/drop.go b/common/pkgs/ioswitch/ops/drop.go index 4a78596..3623855 100644 --- a/common/pkgs/ioswitch/ops/drop.go +++ b/common/pkgs/ioswitch/ops/drop.go @@ -2,17 +2,23 @@ package ops import ( "context" + "fmt" "io" - "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" ) +func init() { + OpUnion.AddT((*DropStream)(nil)) +} + type DropStream struct { - Input *ioswitch.StreamVar `json:"input"` + Input *exec.StreamVar `json:"input"` } -func (o *DropStream) Execute(ctx context.Context, sw *ioswitch.Switch) error { - err := sw.BindVars(ctx, o.Input) +func (o *DropStream) Execute(ctx context.Context, e *exec.Executor) error { + err := e.BindVars(ctx, o.Input) if err != nil { return err } @@ -29,6 +35,19 @@ func (o *DropStream) Execute(ctx context.Context, sw *ioswitch.Switch) error { } } -func init() { - OpUnion.AddT((*DropStream)(nil)) +type DropType struct{} + +func (t *DropType) InitNode(node *Node) { + dag.NodeDeclareInputStream(node, 1) +} + +func (t *DropType) GenerateOp(op *Node, blder *exec.PlanBuilder) error { + addOpByEnv(&DropStream{ + Input: op.InputStreams[0].Props.Var.(*exec.StreamVar), + }, op.Env, blder) + return nil +} + +func (t *DropType) String(node *Node) string { + return fmt.Sprintf("Drop[]%v%v", formatStreamIO(node), formatValueIO(node)) } diff --git a/common/pkgs/ioswitch/ops/ec.go b/common/pkgs/ioswitch/ops/ec.go index b64f351..b56628f 100644 --- a/common/pkgs/ioswitch/ops/ec.go +++ b/common/pkgs/ioswitch/ops/ec.go @@ -5,30 +5,38 @@ import ( "fmt" "io" + "github.com/samber/lo" "gitlink.org.cn/cloudream/common/pkgs/future" + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/common/utils/io2" "gitlink.org.cn/cloudream/common/utils/sync2" "gitlink.org.cn/cloudream/storage/common/pkgs/ec" - "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" "golang.org/x/sync/semaphore" ) +func init() { + OpUnion.AddT((*ECReconstructAny)(nil)) + OpUnion.AddT((*ECReconstruct)(nil)) + OpUnion.AddT((*ECMultiply)(nil)) +} + type ECReconstructAny struct { - EC cdssdk.ECRedundancy `json:"ec"` - Inputs []*ioswitch.StreamVar `json:"inputs"` - Outputs []*ioswitch.StreamVar `json:"outputs"` - InputBlockIndexes []int `json:"inputBlockIndexes"` - OutputBlockIndexes []int `json:"outputBlockIndexes"` + EC cdssdk.ECRedundancy `json:"ec"` + Inputs []*exec.StreamVar `json:"inputs"` + Outputs []*exec.StreamVar `json:"outputs"` + InputBlockIndexes []int `json:"inputBlockIndexes"` + OutputBlockIndexes []int `json:"outputBlockIndexes"` } -func (o *ECReconstructAny) Execute(ctx context.Context, sw *ioswitch.Switch) error { +func (o *ECReconstructAny) Execute(ctx context.Context, e *exec.Executor) error { rs, err := 
ec.NewStreamRs(o.EC.K, o.EC.N, o.EC.ChunkSize) if err != nil { return fmt.Errorf("new ec: %w", err) } - err = ioswitch.BindArrayVars(sw, ctx, o.Inputs) + err = exec.BindArrayVars(e, ctx, o.Inputs) if err != nil { return err } @@ -53,25 +61,25 @@ func (o *ECReconstructAny) Execute(ctx context.Context, sw *ioswitch.Switch) err sem.Release(1) }) } - ioswitch.PutArrayVars(sw, o.Outputs) + exec.PutArrayVars(e, o.Outputs) return sem.Acquire(ctx, int64(len(o.Outputs))) } type ECReconstruct struct { - EC cdssdk.ECRedundancy `json:"ec"` - Inputs []*ioswitch.StreamVar `json:"inputs"` - Outputs []*ioswitch.StreamVar `json:"outputs"` - InputBlockIndexes []int `json:"inputBlockIndexes"` + EC cdssdk.ECRedundancy `json:"ec"` + Inputs []*exec.StreamVar `json:"inputs"` + Outputs []*exec.StreamVar `json:"outputs"` + InputBlockIndexes []int `json:"inputBlockIndexes"` } -func (o *ECReconstruct) Execute(ctx context.Context, sw *ioswitch.Switch) error { +func (o *ECReconstruct) Execute(ctx context.Context, e *exec.Executor) error { rs, err := ec.NewStreamRs(o.EC.K, o.EC.N, o.EC.ChunkSize) if err != nil { return fmt.Errorf("new ec: %w", err) } - err = ioswitch.BindArrayVars(sw, ctx, o.Inputs) + err = exec.BindArrayVars(e, ctx, o.Inputs) if err != nil { return err } @@ -96,20 +104,20 @@ func (o *ECReconstruct) Execute(ctx context.Context, sw *ioswitch.Switch) error sem.Release(1) }) } - ioswitch.PutArrayVars(sw, o.Outputs) + exec.PutArrayVars(e, o.Outputs) return sem.Acquire(ctx, int64(len(o.Outputs))) } type ECMultiply struct { - Coef [][]byte `json:"coef"` - Inputs []*ioswitch.StreamVar `json:"inputs"` - Outputs []*ioswitch.StreamVar `json:"outputs"` - ChunkSize int `json:"chunkSize"` + Coef [][]byte `json:"coef"` + Inputs []*exec.StreamVar `json:"inputs"` + Outputs []*exec.StreamVar `json:"outputs"` + ChunkSize int `json:"chunkSize"` } -func (o *ECMultiply) Execute(ctx context.Context, sw *ioswitch.Switch) error { - err := ioswitch.BindArrayVars(sw, ctx, o.Inputs) +func (o *ECMultiply) Execute(ctx context.Context, e *exec.Executor) error { + err := exec.BindArrayVars(e, ctx, o.Inputs) if err != nil { return err } @@ -141,7 +149,7 @@ func (o *ECMultiply) Execute(ctx context.Context, sw *ioswitch.Switch) error { } for { - err := sync2.ParallelDo(o.Inputs, func(s *ioswitch.StreamVar, i int) error { + err := sync2.ParallelDo(o.Inputs, func(s *exec.StreamVar, i int) error { _, err := io.ReadFull(s.Stream, inputChunks[i]) return err }) @@ -170,7 +178,7 @@ func (o *ECMultiply) Execute(ctx context.Context, sw *ioswitch.Switch) error { } }() - ioswitch.PutArrayVars(sw, o.Outputs) + exec.PutArrayVars(e, o.Outputs) err = fut.Wait(ctx) if err != nil { for _, wr := range outputWrs { @@ -185,8 +193,46 @@ func (o *ECMultiply) Execute(ctx context.Context, sw *ioswitch.Switch) error { return nil } -func init() { - OpUnion.AddT((*ECReconstructAny)(nil)) - OpUnion.AddT((*ECReconstruct)(nil)) - OpUnion.AddT((*ECMultiply)(nil)) +type MultiplyType struct { + EC cdssdk.ECRedundancy +} + +func (t *MultiplyType) InitNode(node *Node) {} + +func (t *MultiplyType) GenerateOp(op *Node, blder *exec.PlanBuilder) error { + var inputIdxs []int + var outputIdxs []int + for _, in := range op.InputStreams { + inputIdxs = append(inputIdxs, in.Props.StreamIndex) + } + for _, out := range op.OutputStreams { + outputIdxs = append(outputIdxs, out.Props.StreamIndex) + } + + rs, err := ec.NewRs(t.EC.K, t.EC.N) + coef, err := rs.GenerateMatrix(inputIdxs, outputIdxs) + if err != nil { + return err + } + + addOpByEnv(&ECMultiply{ + Coef: coef, + Inputs: 
lo.Map(op.InputStreams, func(v *StreamVar, idx int) *exec.StreamVar { return v.Props.Var.(*exec.StreamVar) }), + Outputs: lo.Map(op.OutputStreams, func(v *StreamVar, idx int) *exec.StreamVar { return v.Props.Var.(*exec.StreamVar) }), + ChunkSize: t.EC.ChunkSize, + }, op.Env, blder) + return nil +} + +func (t *MultiplyType) AddInput(node *Node, str *StreamVar) { + node.InputStreams = append(node.InputStreams, str) + str.To(node, len(node.InputStreams)-1) +} + +func (t *MultiplyType) NewOutput(node *Node, dataIndex int) *StreamVar { + return dag.NodeNewOutputStream(node, VarProps{StreamIndex: dataIndex}) +} + +func (t *MultiplyType) String(node *Node) string { + return fmt.Sprintf("Multiply[]%v%v", formatStreamIO(node), formatValueIO(node)) } diff --git a/common/pkgs/ioswitch/ops/file.go b/common/pkgs/ioswitch/ops/file.go index ceb007e..cbadf2d 100644 --- a/common/pkgs/ioswitch/ops/file.go +++ b/common/pkgs/ioswitch/ops/file.go @@ -8,17 +8,23 @@ import ( "path" "gitlink.org.cn/cloudream/common/pkgs/future" + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/common/utils/io2" - "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" ) +func init() { + OpUnion.AddT((*FileRead)(nil)) + OpUnion.AddT((*FileWrite)(nil)) +} + type FileWrite struct { - Input *ioswitch.StreamVar `json:"input"` - FilePath string `json:"filePath"` + Input *exec.StreamVar `json:"input"` + FilePath string `json:"filePath"` } -func (o *FileWrite) Execute(ctx context.Context, sw *ioswitch.Switch) error { - err := sw.BindVars(ctx, o.Input) +func (o *FileWrite) Execute(ctx context.Context, e *exec.Executor) error { + err := e.BindVars(ctx, o.Input) if err != nil { return err } @@ -45,11 +51,11 @@ func (o *FileWrite) Execute(ctx context.Context, sw *ioswitch.Switch) error { } type FileRead struct { - Output *ioswitch.StreamVar `json:"output"` - FilePath string `json:"filePath"` + Output *exec.StreamVar `json:"output"` + FilePath string `json:"filePath"` } -func (o *FileRead) Execute(ctx context.Context, sw *ioswitch.Switch) error { +func (o *FileRead) Execute(ctx context.Context, e *exec.Executor) error { file, err := os.Open(o.FilePath) if err != nil { return fmt.Errorf("opening file: %w", err) @@ -59,13 +65,44 @@ func (o *FileRead) Execute(ctx context.Context, sw *ioswitch.Switch) error { o.Output.Stream = io2.AfterReadClosed(file, func(closer io.ReadCloser) { fut.SetVoid() }) - sw.PutVars(o.Output) + e.PutVars(o.Output) fut.Wait(ctx) return nil } -func init() { - OpUnion.AddT((*FileRead)(nil)) - OpUnion.AddT((*FileWrite)(nil)) +type FileReadType struct { + FilePath string +} + +func (t *FileReadType) InitNode(node *Node) { + dag.NodeNewOutputStream(node, VarProps{}) +} + +func (t *FileReadType) GenerateOp(op *Node, blder *exec.PlanBuilder) error { + addOpByEnv(&FileRead{ + Output: op.OutputStreams[0].Props.Var.(*exec.StreamVar), + FilePath: t.FilePath, + }, op.Env, blder) + return nil +} + +func (t *FileReadType) String(node *Node) string { + return fmt.Sprintf("FileRead[%s]%v%v", t.FilePath, formatStreamIO(node), formatValueIO(node)) +} + +type FileWriteType struct { + FilePath string +} + +func (t *FileWriteType) InitNode(node *Node) { + dag.NodeDeclareInputStream(node, 1) +} + +func (t *FileWriteType) GenerateOp(op *Node, blder *exec.PlanBuilder) error { + addOpByEnv(&FileWrite{ + Input: op.InputStreams[0].Props.Var.(*exec.StreamVar), + FilePath: t.FilePath, + }, op.Env, blder) + return nil } diff --git 
a/common/pkgs/ioswitch/ops/fromto.go b/common/pkgs/ioswitch/ops/fromto.go new file mode 100644 index 0000000..6bfe73d --- /dev/null +++ b/common/pkgs/ioswitch/ops/fromto.go @@ -0,0 +1,134 @@ +package ops + +import ( + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" +) + +type From interface { + GetDataIndex() int +} + +type To interface { + // To所需要的文件流的范围。具体含义与DataIndex有关系: + // 如果DataIndex == -1,则表示在整个文件的范围。 + // 如果DataIndex >= 0,则表示在文件的某个分片的范围。 + GetRange() exec.Range + GetDataIndex() int +} + +type FromExecutor struct { + Handle *exec.DriverWriteStream + DataIndex int +} + +func NewFromExecutor(dataIndex int) (*FromExecutor, *exec.DriverWriteStream) { + handle := &exec.DriverWriteStream{ + RangeHint: &exec.Range{}, + } + return &FromExecutor{ + Handle: handle, + DataIndex: dataIndex, + }, handle +} + +func (f *FromExecutor) GetDataIndex() int { + return f.DataIndex +} + +type FromWorker struct { + FileHash string + Node *cdssdk.Node + DataIndex int +} + +func NewFromNode(fileHash string, node *cdssdk.Node, dataIndex int) *FromWorker { + return &FromWorker{ + FileHash: fileHash, + Node: node, + DataIndex: dataIndex, + } +} + +func (f *FromWorker) GetDataIndex() int { + return f.DataIndex +} + +type ToExecutor struct { + Handle *exec.DriverReadStream + DataIndex int + Range exec.Range +} + +func NewToExecutor(dataIndex int) (*ToExecutor, *exec.DriverReadStream) { + str := exec.DriverReadStream{} + return &ToExecutor{ + Handle: &str, + DataIndex: dataIndex, + }, &str +} + +func NewToExecutorWithRange(dataIndex int, rng exec.Range) (*ToExecutor, *exec.DriverReadStream) { + str := exec.DriverReadStream{} + return &ToExecutor{ + Handle: &str, + DataIndex: dataIndex, + Range: rng, + }, &str +} + +func (t *ToExecutor) GetDataIndex() int { + return t.DataIndex +} + +func (t *ToExecutor) GetRange() exec.Range { + return t.Range +} + +type ToNode struct { + Node cdssdk.Node + DataIndex int + Range exec.Range + FileHashStoreKey string +} + +func NewToNode(node cdssdk.Node, dataIndex int, fileHashStoreKey string) *ToNode { + return &ToNode{ + Node: node, + DataIndex: dataIndex, + FileHashStoreKey: fileHashStoreKey, + } +} + +func NewToNodeWithRange(node cdssdk.Node, dataIndex int, fileHashStoreKey string, rng exec.Range) *ToNode { + return &ToNode{ + Node: node, + DataIndex: dataIndex, + FileHashStoreKey: fileHashStoreKey, + Range: rng, + } +} + +func (t *ToNode) GetDataIndex() int { + return t.DataIndex +} + +func (t *ToNode) GetRange() exec.Range { + return t.Range +} + +// type ToStorage struct { +// Storage cdssdk.Storage +// DataIndex int +// } + +// func NewToStorage(storage cdssdk.Storage, dataIndex int) *ToStorage { +// return &ToStorage{ +// Storage: storage, +// DataIndex: dataIndex, +// } +// } + +// func (t *ToStorage) GetDataIndex() int { +// return t.DataIndex +// } diff --git a/common/pkgs/ioswitch/ops/grpc.go b/common/pkgs/ioswitch/ops/grpc.go index 4455486..7ad799d 100644 --- a/common/pkgs/ioswitch/ops/grpc.go +++ b/common/pkgs/ioswitch/ops/grpc.go @@ -6,21 +6,29 @@ import ( "io" "gitlink.org.cn/cloudream/common/pkgs/future" + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/common/pkgs/logger" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/common/utils/io2" stgglb "gitlink.org.cn/cloudream/storage/common/globals" - "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" ) +func init() { + 
OpUnion.AddT((*SendStream)(nil)) + OpUnion.AddT((*GetStream)(nil)) + OpUnion.AddT((*SendVar)(nil)) + OpUnion.AddT((*GetVar)(nil)) +} + type SendStream struct { - Input *ioswitch.StreamVar `json:"input"` - Send *ioswitch.StreamVar `json:"send"` - Node cdssdk.Node `json:"node"` + Input *exec.StreamVar `json:"input"` + Send *exec.StreamVar `json:"send"` + Node cdssdk.Node `json:"node"` } -func (o *SendStream) Execute(ctx context.Context, sw *ioswitch.Switch) error { - err := sw.BindVars(ctx, o.Input) +func (o *SendStream) Execute(ctx context.Context, e *exec.Executor) error { + err := e.BindVars(ctx, o.Input) if err != nil { return err } @@ -35,7 +43,7 @@ func (o *SendStream) Execute(ctx context.Context, sw *ioswitch.Switch) error { logger.Debugf("sending stream %v as %v to node %v", o.Input.ID, o.Send.ID, o.Node) // 发送后流的ID不同 - err = agtCli.SendStream(ctx, sw.Plan().ID, o.Send.ID, o.Input.Stream) + err = agtCli.SendStream(ctx, e.Plan().ID, o.Send.ID, o.Input.Stream) if err != nil { return fmt.Errorf("sending stream: %w", err) } @@ -44,13 +52,13 @@ func (o *SendStream) Execute(ctx context.Context, sw *ioswitch.Switch) error { } type GetStream struct { - Signal *ioswitch.SignalVar `json:"signal"` - Target *ioswitch.StreamVar `json:"target"` - Output *ioswitch.StreamVar `json:"output"` - Node cdssdk.Node `json:"node"` + Signal *exec.SignalVar `json:"signal"` + Target *exec.StreamVar `json:"target"` + Output *exec.StreamVar `json:"output"` + Node cdssdk.Node `json:"node"` } -func (o *GetStream) Execute(ctx context.Context, sw *ioswitch.Switch) error { +func (o *GetStream) Execute(ctx context.Context, e *exec.Executor) error { agtCli, err := stgglb.AgentRPCPool.Acquire(stgglb.SelectGRPCAddress(&o.Node)) if err != nil { return fmt.Errorf("new agent rpc client: %w", err) @@ -59,7 +67,7 @@ func (o *GetStream) Execute(ctx context.Context, sw *ioswitch.Switch) error { logger.Debugf("getting stream %v as %v from node %v", o.Target.ID, o.Output.ID, o.Node) - str, err := agtCli.GetStream(sw.Plan().ID, o.Target.ID, o.Signal) + str, err := agtCli.GetStream(e.Plan().ID, o.Target.ID, o.Signal) if err != nil { return fmt.Errorf("getting stream: %w", err) } @@ -69,19 +77,19 @@ func (o *GetStream) Execute(ctx context.Context, sw *ioswitch.Switch) error { o.Output.Stream = io2.AfterReadClosedOnce(str, func(closer io.ReadCloser) { fut.SetVoid() }) - sw.PutVars(o.Output) + e.PutVars(o.Output) return fut.Wait(ctx) } type SendVar struct { - Input ioswitch.Var `json:"input"` - Send ioswitch.Var `json:"send"` - Node cdssdk.Node `json:"node"` + Input exec.Var `json:"input"` + Send exec.Var `json:"send"` + Node cdssdk.Node `json:"node"` } -func (o *SendVar) Execute(ctx context.Context, sw *ioswitch.Switch) error { - err := sw.BindVars(ctx, o.Input) +func (o *SendVar) Execute(ctx context.Context, e *exec.Executor) error { + err := e.BindVars(ctx, o.Input) if err != nil { return err } @@ -94,8 +102,8 @@ func (o *SendVar) Execute(ctx context.Context, sw *ioswitch.Switch) error { logger.Debugf("sending var %v as %v to node %v", o.Input.GetID(), o.Send.GetID(), o.Node) - ioswitch.AssignVar(o.Input, o.Send) - err = agtCli.SendVar(ctx, sw.Plan().ID, o.Send) + exec.AssignVar(o.Input, o.Send) + err = agtCli.SendVar(ctx, e.Plan().ID, o.Send) if err != nil { return fmt.Errorf("sending var: %w", err) } @@ -104,13 +112,13 @@ func (o *SendVar) Execute(ctx context.Context, sw *ioswitch.Switch) error { } type GetVar struct { - Signal *ioswitch.SignalVar `json:"signal"` - Target ioswitch.Var `json:"target"` - Output ioswitch.Var 
`json:"output"` - Node cdssdk.Node `json:"node"` + Signal *exec.SignalVar `json:"signal"` + Target exec.Var `json:"target"` + Output exec.Var `json:"output"` + Node cdssdk.Node `json:"node"` } -func (o *GetVar) Execute(ctx context.Context, sw *ioswitch.Switch) error { +func (o *GetVar) Execute(ctx context.Context, e *exec.Executor) error { agtCli, err := stgglb.AgentRPCPool.Acquire(stgglb.SelectGRPCAddress(&o.Node)) if err != nil { return fmt.Errorf("new agent rpc client: %w", err) @@ -119,19 +127,104 @@ func (o *GetVar) Execute(ctx context.Context, sw *ioswitch.Switch) error { logger.Debugf("getting var %v as %v from node %v", o.Target.GetID(), o.Output.GetID(), o.Node) - v2, err := agtCli.GetVar(ctx, sw.Plan().ID, o.Target, o.Signal) + v2, err := agtCli.GetVar(ctx, e.Plan().ID, o.Target, o.Signal) if err != nil { return fmt.Errorf("getting var: %w", err) } - ioswitch.AssignVar(v2, o.Output) - sw.PutVars(o.Output) + exec.AssignVar(v2, o.Output) + e.PutVars(o.Output) return nil } -func init() { - OpUnion.AddT((*SendStream)(nil)) - OpUnion.AddT((*GetStream)(nil)) - OpUnion.AddT((*SendVar)(nil)) - OpUnion.AddT((*GetVar)(nil)) +type SendStreamType struct { +} + +func (t *SendStreamType) InitNode(node *Node) { + dag.NodeDeclareInputStream(node, 1) + dag.NodeNewOutputStream(node, VarProps{}) +} + +func (t *SendStreamType) GenerateOp(op *Node, blder *exec.PlanBuilder) error { + toAgt := op.OutputStreams[0].Toes[0].Node.Env.Worker.(*AgentWorker) + addOpByEnv(&SendStream{ + Input: op.InputStreams[0].Props.Var.(*exec.StreamVar), + Send: op.OutputStreams[0].Props.Var.(*exec.StreamVar), + Node: toAgt.Node, + }, op.Env, blder) + return nil +} + +func (t *SendStreamType) String(node *Node) string { + return fmt.Sprintf("SendStream[]%v%v", formatStreamIO(node), formatValueIO(node)) +} + +type SendVarType struct { +} + +func (t *SendVarType) InitNode(node *Node) { + dag.NodeDeclareInputValue(node, 1) + dag.NodeNewOutputValue(node, VarProps{}) +} + +func (t *SendVarType) GenerateOp(op *Node, blder *exec.PlanBuilder) error { + toAgt := op.OutputValues[0].Toes[0].Node.Env.Worker.(*AgentWorker) + addOpByEnv(&SendVar{ + Input: op.InputValues[0].Props.Var, + Send: op.OutputValues[0].Props.Var, + Node: toAgt.Node, + }, op.Env, blder) + return nil +} + +func (t *SendVarType) String(node *Node) string { + return fmt.Sprintf("SendVar[]%v%v", formatStreamIO(node), formatValueIO(node)) +} + +type GetStreamType struct { +} + +func (t *GetStreamType) InitNode(node *Node) { + dag.NodeDeclareInputStream(node, 1) + dag.NodeNewOutputValue(node, VarProps{}) + dag.NodeNewOutputStream(node, VarProps{}) +} + +func (t *GetStreamType) GenerateOp(op *Node, blder *exec.PlanBuilder) error { + fromAgt := op.InputStreams[0].From.Node.Env.Worker.(*AgentWorker) + addOpByEnv(&GetStream{ + Signal: op.OutputValues[0].Props.Var.(*exec.SignalVar), + Output: op.OutputStreams[0].Props.Var.(*exec.StreamVar), + Target: op.InputStreams[0].Props.Var.(*exec.StreamVar), + Node: fromAgt.Node, + }, op.Env, blder) + return nil +} + +func (t *GetStreamType) String(node *Node) string { + return fmt.Sprintf("GetStream[]%v%v", formatStreamIO(node), formatValueIO(node)) +} + +type GetVaType struct { +} + +func (t *GetVaType) InitNode(node *Node) { + dag.NodeDeclareInputValue(node, 1) + dag.NodeNewOutputValue(node, VarProps{}) + dag.NodeNewOutputValue(node, VarProps{}) +} + +func (t *GetVaType) GenerateOp(op *Node, blder *exec.PlanBuilder) error { + fromAgt := op.InputValues[0].From.Node.Env.Worker.(*AgentWorker) + addOpByEnv(&GetVar{ + Signal: 
op.OutputValues[0].Props.Var.(*exec.SignalVar), + Output: op.OutputValues[1].Props.Var, + Target: op.InputValues[0].Props.Var, + Node: fromAgt.Node, + }, op.Env, blder) + return nil +} + +func (t *GetVaType) String(node *Node) string { + return fmt.Sprintf("GetVar[]%v%v", formatStreamIO(node), formatValueIO(node)) } diff --git a/common/pkgs/ioswitch/ops/ipfs.go b/common/pkgs/ioswitch/ops/ipfs.go index 2a485bb..6656881 100644 --- a/common/pkgs/ioswitch/ops/ipfs.go +++ b/common/pkgs/ioswitch/ops/ipfs.go @@ -6,20 +6,26 @@ import ( "io" "gitlink.org.cn/cloudream/common/pkgs/future" + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/common/pkgs/ipfs" "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/utils/io2" stgglb "gitlink.org.cn/cloudream/storage/common/globals" - "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" ) +func init() { + OpUnion.AddT((*IPFSRead)(nil)) + OpUnion.AddT((*IPFSWrite)(nil)) +} + type IPFSRead struct { - Output *ioswitch.StreamVar `json:"output"` - FileHash string `json:"fileHash"` - Option ipfs.ReadOption `json:"option"` + Output *exec.StreamVar `json:"output"` + FileHash string `json:"fileHash"` + Option ipfs.ReadOption `json:"option"` } -func (o *IPFSRead) Execute(ctx context.Context, sw *ioswitch.Switch) error { +func (o *IPFSRead) Execute(ctx context.Context, e *exec.Executor) error { logger. WithField("FileHash", o.FileHash). Debugf("ipfs read op") @@ -41,17 +47,17 @@ func (o *IPFSRead) Execute(ctx context.Context, sw *ioswitch.Switch) error { o.Output.Stream = io2.AfterReadClosedOnce(file, func(closer io.ReadCloser) { fut.SetVoid() }) - sw.PutVars(o.Output) + e.PutVars(o.Output) return fut.Wait(ctx) } type IPFSWrite struct { - Input *ioswitch.StreamVar `json:"input"` - FileHash *ioswitch.StringVar `json:"fileHash"` + Input *exec.StreamVar `json:"input"` + FileHash *exec.StringVar `json:"fileHash"` } -func (o *IPFSWrite) Execute(ctx context.Context, sw *ioswitch.Switch) error { +func (o *IPFSWrite) Execute(ctx context.Context, e *exec.Executor) error { logger. WithField("Input", o.Input.ID). WithField("FileHashVar", o.FileHash.ID). 
@@ -63,7 +69,7 @@ func (o *IPFSWrite) Execute(ctx context.Context, sw *ioswitch.Switch) error { } defer stgglb.IPFSPool.Release(ipfsCli) - err = sw.BindVars(ctx, o.Input) + err = e.BindVars(ctx, o.Input) if err != nil { return err } @@ -74,12 +80,51 @@ func (o *IPFSWrite) Execute(ctx context.Context, sw *ioswitch.Switch) error { return fmt.Errorf("creating ipfs file: %w", err) } - sw.PutVars(o.FileHash) + e.PutVars(o.FileHash) return nil } -func init() { - OpUnion.AddT((*IPFSRead)(nil)) - OpUnion.AddT((*IPFSWrite)(nil)) +type IPFSReadType struct { + FileHash string + Option ipfs.ReadOption +} + +func (t *IPFSReadType) InitNode(node *Node) { + dag.NodeNewOutputStream(node, VarProps{}) +} + +func (t *IPFSReadType) GenerateOp(node *Node, blder *exec.PlanBuilder) error { + addOpByEnv(&IPFSRead{ + Output: node.OutputStreams[0].Props.Var.(*exec.StreamVar), + FileHash: t.FileHash, + Option: t.Option, + }, node.Env, blder) + return nil +} + +func (t *IPFSReadType) String(node *Node) string { + return fmt.Sprintf("IPFSRead[%s,%v+%v]%v%v", t.FileHash, t.Option.Offset, t.Option.Length, formatStreamIO(node), formatValueIO(node)) +} + +type IPFSWriteType struct { + FileHashStoreKey string + Range exec.Range +} + +func (t *IPFSWriteType) InitNode(node *Node) { + dag.NodeDeclareInputStream(node, 1) + dag.NodeNewOutputValue(node, VarProps{}) +} + +func (t *IPFSWriteType) GenerateOp(op *Node, blder *exec.PlanBuilder) error { + addOpByEnv(&IPFSWrite{ + Input: op.InputStreams[0].Props.Var.(*exec.StreamVar), + FileHash: op.OutputValues[0].Props.Var.(*exec.StringVar), + }, op.Env, blder) + return nil +} + +func (t *IPFSWriteType) String(node *Node) string { + return fmt.Sprintf("IPFSWrite[%s,%v+%v](%v>)", t.FileHashStoreKey, t.Range.Offset, t.Range.Length, formatStreamIO(node), formatValueIO(node)) } diff --git a/common/pkgs/ioswitch/ops/join.go b/common/pkgs/ioswitch/ops/join.go index 5fab8e8..fe7300c 100644 --- a/common/pkgs/ioswitch/ops/join.go +++ b/common/pkgs/ioswitch/ops/join.go @@ -5,18 +5,18 @@ import ( "io" "gitlink.org.cn/cloudream/common/pkgs/future" + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/common/utils/io2" - "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" ) type Join struct { - Inputs []*ioswitch.StreamVar `json:"inputs"` - Output *ioswitch.StreamVar `json:"output"` - Length int64 `json:"length"` + Inputs []*exec.StreamVar `json:"inputs"` + Output *exec.StreamVar `json:"output"` + Length int64 `json:"length"` } -func (o *Join) Execute(ctx context.Context, sw *ioswitch.Switch) error { - err := ioswitch.BindArrayVars(sw, ctx, o.Inputs) +func (o *Join) Execute(ctx context.Context, e *exec.Executor) error { + err := exec.BindArrayVars(e, ctx, o.Inputs) if err != nil { return err } @@ -35,7 +35,7 @@ func (o *Join) Execute(ctx context.Context, sw *ioswitch.Switch) error { o.Output.Stream = io2.AfterReadClosedOnce(io2.Length(io2.Join(strReaders), o.Length), func(closer io.ReadCloser) { fut.SetVoid() }) - sw.PutVars(o.Output) + e.PutVars(o.Output) return fut.Wait(ctx) } diff --git a/common/pkgs/ioswitch/ops/length.go b/common/pkgs/ioswitch/ops/length.go index 95203c5..b503b4e 100644 --- a/common/pkgs/ioswitch/ops/length.go +++ b/common/pkgs/ioswitch/ops/length.go @@ -5,18 +5,18 @@ import ( "io" "gitlink.org.cn/cloudream/common/pkgs/future" + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/common/utils/io2" - "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" ) type Length struct { - Input *ioswitch.StreamVar 
`json:"input"` - Output *ioswitch.StreamVar `json:"output"` - Length int64 `json:"length"` + Input *exec.StreamVar `json:"input"` + Output *exec.StreamVar `json:"output"` + Length int64 `json:"length"` } -func (o *Length) Execute(ctx context.Context, sw *ioswitch.Switch) error { - err := sw.BindVars(ctx, o.Input) +func (o *Length) Execute(ctx context.Context, e *exec.Executor) error { + err := e.BindVars(ctx, o.Input) if err != nil { return err } @@ -26,7 +26,7 @@ func (o *Length) Execute(ctx context.Context, sw *ioswitch.Switch) error { o.Output.Stream = io2.AfterReadClosedOnce(io2.Length(o.Input.Stream, o.Length), func(closer io.ReadCloser) { fut.SetVoid() }) - sw.PutVars(o.Output) + e.PutVars(o.Output) return fut.Wait(ctx) } diff --git a/common/pkgs/ioswitch/ops/ops.go b/common/pkgs/ioswitch/ops/ops.go index ee903c6..252ad8c 100644 --- a/common/pkgs/ioswitch/ops/ops.go +++ b/common/pkgs/ioswitch/ops/ops.go @@ -1,9 +1,134 @@ package ops import ( + "fmt" + + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/common/pkgs/types" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/common/utils/serder" - "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" ) -var OpUnion = serder.UseTypeUnionExternallyTagged(types.Ref(types.NewTypeUnion[ioswitch.Op]())) +var OpUnion = serder.UseTypeUnionExternallyTagged(types.Ref(types.NewTypeUnion[exec.Op]())) + +type AgentWorker struct { + Node cdssdk.Node +} + +func (w *AgentWorker) GetAddress() string { + // TODO 选择地址 + return fmt.Sprintf("%v:%v", w.Node.ExternalIP, w.Node.ExternalGRPCPort) +} + +func (w *AgentWorker) Equals(worker dag.WorkerInfo) bool { + aw, ok := worker.(*AgentWorker) + if !ok { + return false + } + + return w.Node.NodeID == aw.Node.NodeID +} + +type NodeProps struct { + From From + To To +} + +type ValueVarType int + +const ( + StringValueVar ValueVarType = iota + SignalValueVar +) + +type VarProps struct { + StreamIndex int // 流的编号,只在StreamVar上有意义 + ValueType ValueVarType // 值类型,只在ValueVar上有意义 + Var exec.Var // 生成Plan的时候创建的对应的Var +} + +type Graph = dag.Graph[NodeProps, VarProps] + +type Node = dag.Node[NodeProps, VarProps] + +type StreamVar = dag.StreamVar[NodeProps, VarProps] + +type ValueVar = dag.ValueVar[NodeProps, VarProps] + +func addOpByEnv(op exec.Op, env dag.NodeEnv, blder *exec.PlanBuilder) { + switch env.Type { + case dag.EnvWorker: + blder.AtAgent(env.Worker.(*AgentWorker).Node).AddOp(op) + case dag.EnvExecutor: + blder.AtExecutor().AddOp(op) + } +} + +func formatStreamIO(node *Node) string { + is := "" + for i, in := range node.InputStreams { + if i > 0 { + is += "," + } + + if in == nil { + is += "." + } else { + is += fmt.Sprintf("%v", in.ID) + } + } + + os := "" + for i, out := range node.OutputStreams { + if i > 0 { + os += "," + } + + if out == nil { + os += "." + } else { + os += fmt.Sprintf("%v", out.ID) + } + } + + if is == "" && os == "" { + return "" + } + + return fmt.Sprintf("S{%s>%s}", is, os) +} + +func formatValueIO(node *Node) string { + is := "" + for i, in := range node.InputValues { + if i > 0 { + is += "," + } + + if in == nil { + is += "." + } else { + is += fmt.Sprintf("%v", in.ID) + } + } + + os := "" + for i, out := range node.OutputValues { + if i > 0 { + os += "," + } + + if out == nil { + os += "." 
+ } else { + os += fmt.Sprintf("%v", out.ID) + } + } + + if is == "" && os == "" { + return "" + } + + return fmt.Sprintf("V{%s>%s}", is, os) +} diff --git a/common/pkgs/ioswitch/ops/range.go b/common/pkgs/ioswitch/ops/range.go index 4c9634b..78f3285 100644 --- a/common/pkgs/ioswitch/ops/range.go +++ b/common/pkgs/ioswitch/ops/range.go @@ -2,23 +2,29 @@ package ops import ( "context" + "fmt" "io" "gitlink.org.cn/cloudream/common/pkgs/future" + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/common/utils/io2" "gitlink.org.cn/cloudream/common/utils/math2" - "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" ) +func init() { + OpUnion.AddT((*Range)(nil)) +} + type Range struct { - Input *ioswitch.StreamVar `json:"input"` - Output *ioswitch.StreamVar `json:"output"` - Offset int64 `json:"offset"` - Length *int64 `json:"length"` + Input *exec.StreamVar `json:"input"` + Output *exec.StreamVar `json:"output"` + Offset int64 `json:"offset"` + Length *int64 `json:"length"` } -func (o *Range) Execute(ctx context.Context, sw *ioswitch.Switch) error { - err := sw.BindVars(ctx, o.Input) +func (o *Range) Execute(ctx context.Context, e *exec.Executor) error { + err := e.BindVars(ctx, o.Input) if err != nil { return err } @@ -47,7 +53,7 @@ func (o *Range) Execute(ctx context.Context, sw *ioswitch.Switch) error { fut.SetVoid() }) - sw.PutVars(o.Output) + e.PutVars(o.Output) return fut.Wait(ctx) } @@ -55,7 +61,7 @@ func (o *Range) Execute(ctx context.Context, sw *ioswitch.Switch) error { fut.SetVoid() }) - sw.PutVars(o.Output) + e.PutVars(o.Output) err = fut.Wait(ctx) if err != nil { return err @@ -65,6 +71,25 @@ func (o *Range) Execute(ctx context.Context, sw *ioswitch.Switch) error { return nil } -func init() { - OpUnion.AddT((*Range)(nil)) +type RangeType struct { + Range exec.Range +} + +func (t *RangeType) InitNode(node *Node) { + dag.NodeDeclareInputStream(node, 1) + dag.NodeNewOutputStream(node, VarProps{}) +} + +func (t *RangeType) GenerateOp(op *Node, blder *exec.PlanBuilder) error { + addOpByEnv(&Range{ + Input: op.InputStreams[0].Props.Var.(*exec.StreamVar), + Output: op.OutputStreams[0].Props.Var.(*exec.StreamVar), + Offset: t.Range.Offset, + Length: t.Range.Length, + }, op.Env, blder) + return nil +} + +func (t *RangeType) String(node *Node) string { + return fmt.Sprintf("Range[%v+%v]%v%v", t.Range.Offset, t.Range.Length, formatStreamIO(node), formatValueIO(node)) } diff --git a/common/pkgs/ioswitch/ops/store.go b/common/pkgs/ioswitch/ops/store.go index 6e07b62..41d39b4 100644 --- a/common/pkgs/ioswitch/ops/store.go +++ b/common/pkgs/ioswitch/ops/store.go @@ -2,29 +2,52 @@ package ops import ( "context" + "fmt" "sync" - "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" ) type Store struct { - Var ioswitch.Var + Var exec.Var Key string Store *sync.Map } -func (o *Store) Execute(ctx context.Context, sw *ioswitch.Switch) error { - err := sw.BindVars(ctx, o.Var) +func (o *Store) Execute(ctx context.Context, e *exec.Executor) error { + err := e.BindVars(ctx, o.Var) if err != nil { return err } switch v := o.Var.(type) { - case *ioswitch.IntVar: + case *exec.IntVar: o.Store.Store(o.Key, v.Value) - case *ioswitch.StringVar: + case *exec.StringVar: o.Store.Store(o.Key, v.Value) } return nil } + +type StoreType struct { + StoreKey string +} + +func (t *StoreType) InitNode(node *Node) { + 
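+	// A Store node consumes exactly one value var (for example the file hash
+	// emitted by IPFSWrite; see storeIPFSWriteResult in the parser). GenerateOp
+	// below always emits the op via blder.AtExecutor() and wires that value into
+	// blder.DriverPlan.StoreMap under StoreKey, which is how results produced
+	// inside a plan are handed back to the caller.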
dag.NodeDeclareInputValue(node, 1) +} + +func (t *StoreType) GenerateOp(op *Node, blder *exec.PlanBuilder) error { + blder.AtExecutor().AddOp(&Store{ + Var: op.InputValues[0].Props.Var, + Key: t.StoreKey, + Store: blder.DriverPlan.StoreMap, + }) + return nil +} + +func (t *StoreType) String(node *Node) string { + return fmt.Sprintf("Store[%s]%v%v", t.StoreKey, formatStreamIO(node), formatValueIO(node)) +} diff --git a/common/pkgs/ioswitch/ops/sync.go b/common/pkgs/ioswitch/ops/sync.go index a932a46..e84e064 100644 --- a/common/pkgs/ioswitch/ops/sync.go +++ b/common/pkgs/ioswitch/ops/sync.go @@ -6,31 +6,40 @@ import ( "io" "gitlink.org.cn/cloudream/common/pkgs/future" - "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" ) +func init() { + OpUnion.AddT((*OnStreamBegin)(nil)) + OpUnion.AddT((*OnStreamEnd)(nil)) + OpUnion.AddT((*HoldUntil)(nil)) + OpUnion.AddT((*HangUntil)(nil)) + OpUnion.AddT((*Broadcast)(nil)) +} + type OnStreamBegin struct { - Raw *ioswitch.StreamVar `json:"raw"` - New *ioswitch.StreamVar `json:"new"` - Signal *ioswitch.SignalVar `json:"signal"` + Raw *exec.StreamVar `json:"raw"` + New *exec.StreamVar `json:"new"` + Signal *exec.SignalVar `json:"signal"` } -func (o *OnStreamBegin) Execute(ctx context.Context, sw *ioswitch.Switch) error { - err := sw.BindVars(ctx, o.Raw) +func (o *OnStreamBegin) Execute(ctx context.Context, e *exec.Executor) error { + err := e.BindVars(ctx, o.Raw) if err != nil { return err } o.New.Stream = o.Raw.Stream - sw.PutVars(o.New, o.Signal) + e.PutVars(o.New, o.Signal) return nil } type OnStreamEnd struct { - Raw *ioswitch.StreamVar `json:"raw"` - New *ioswitch.StreamVar `json:"new"` - Signal *ioswitch.SignalVar `json:"signal"` + Raw *exec.StreamVar `json:"raw"` + New *exec.StreamVar `json:"new"` + Signal *exec.SignalVar `json:"signal"` } type onStreamEnd struct { @@ -53,8 +62,8 @@ func (o *onStreamEnd) Close() error { return o.inner.Close() } -func (o *OnStreamEnd) Execute(ctx context.Context, sw *ioswitch.Switch) error { - err := sw.BindVars(ctx, o.Raw) +func (o *OnStreamEnd) Execute(ctx context.Context, e *exec.Executor) error { + err := e.BindVars(ctx, o.Raw) if err != nil { return err } @@ -65,78 +74,100 @@ func (o *OnStreamEnd) Execute(ctx context.Context, sw *ioswitch.Switch) error { inner: o.Raw.Stream, callback: cb, } - sw.PutVars(o.New) + e.PutVars(o.New) err = cb.Wait(ctx) if err != nil { return err } - sw.PutVars(o.Signal) + e.PutVars(o.Signal) return nil } type HoldUntil struct { - Waits []*ioswitch.SignalVar `json:"waits"` - Holds []ioswitch.Var `json:"holds"` - Emits []ioswitch.Var `json:"emits"` + Waits []*exec.SignalVar `json:"waits"` + Holds []exec.Var `json:"holds"` + Emits []exec.Var `json:"emits"` } -func (w *HoldUntil) Execute(ctx context.Context, sw *ioswitch.Switch) error { - err := sw.BindVars(ctx, w.Holds...) +func (w *HoldUntil) Execute(ctx context.Context, e *exec.Executor) error { + err := e.BindVars(ctx, w.Holds...) if err != nil { return err } - err = ioswitch.BindArrayVars(sw, ctx, w.Waits) + err = exec.BindArrayVars(e, ctx, w.Waits) if err != nil { return err } for i := 0; i < len(w.Holds); i++ { - err := ioswitch.AssignVar(w.Holds[i], w.Emits[i]) + err := exec.AssignVar(w.Holds[i], w.Emits[i]) if err != nil { return err } } - sw.PutVars(w.Emits...) + e.PutVars(w.Emits...) 
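+	// Every wait signal has arrived and each held var has been copied to and
+	// re-published under its emit var, so ops blocked on those vars can resume.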
return nil } type HangUntil struct { - Waits []*ioswitch.SignalVar `json:"waits"` - Op ioswitch.Op `json:"op"` + Waits []*exec.SignalVar `json:"waits"` + Op exec.Op `json:"op"` } -func (h *HangUntil) Execute(ctx context.Context, sw *ioswitch.Switch) error { - err := ioswitch.BindArrayVars(sw, ctx, h.Waits) +func (h *HangUntil) Execute(ctx context.Context, e *exec.Executor) error { + err := exec.BindArrayVars(e, ctx, h.Waits) if err != nil { return err } - return h.Op.Execute(ctx, sw) + return h.Op.Execute(ctx, e) } type Broadcast struct { - Source *ioswitch.SignalVar `json:"source"` - Targets []*ioswitch.SignalVar `json:"targets"` + Source *exec.SignalVar `json:"source"` + Targets []*exec.SignalVar `json:"targets"` } -func (b *Broadcast) Execute(ctx context.Context, sw *ioswitch.Switch) error { - err := sw.BindVars(ctx, b.Source) +func (b *Broadcast) Execute(ctx context.Context, e *exec.Executor) error { + err := e.BindVars(ctx, b.Source) if err != nil { return err } - ioswitch.PutArrayVars(sw, b.Targets) + exec.PutArrayVars(e, b.Targets) return nil } -func init() { - OpUnion.AddT((*OnStreamBegin)(nil)) - OpUnion.AddT((*OnStreamEnd)(nil)) - OpUnion.AddT((*HoldUntil)(nil)) - OpUnion.AddT((*HangUntil)(nil)) - OpUnion.AddT((*Broadcast)(nil)) +type HoldUntilType struct { +} + +func (t *HoldUntilType) InitNode(node *Node) { + dag.NodeDeclareInputValue(node, 1) +} + +func (t *HoldUntilType) GenerateOp(op *Node, blder *exec.PlanBuilder) error { + o := &HoldUntil{ + Waits: []*exec.SignalVar{op.InputValues[0].Props.Var.(*exec.SignalVar)}, + } + + for i := 0; i < len(op.OutputValues); i++ { + o.Holds = append(o.Holds, op.InputValues[i+1].Props.Var) + o.Emits = append(o.Emits, op.OutputValues[i].Props.Var) + } + + for i := 0; i < len(op.OutputStreams); i++ { + o.Holds = append(o.Holds, op.InputStreams[i].Props.Var) + o.Emits = append(o.Emits, op.OutputStreams[i].Props.Var) + } + + addOpByEnv(o, op.Env, blder) + return nil +} + +func (t *HoldUntilType) String(node *Node) string { + return fmt.Sprintf("HoldUntil[]%v%v", formatStreamIO(node), formatValueIO(node)) } diff --git a/common/pkgs/ioswitch/ops/var.go b/common/pkgs/ioswitch/ops/var.go index 91f0456..e4513e4 100644 --- a/common/pkgs/ioswitch/ops/var.go +++ b/common/pkgs/ioswitch/ops/var.go @@ -3,15 +3,15 @@ package ops import ( "context" - "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" ) type ConstVar struct { - Var *ioswitch.StringVar `json:"var"` + Var *exec.StringVar `json:"var"` } -func (o *ConstVar) Execute(ctx context.Context, sw *ioswitch.Switch) error { - sw.PutVars(o.Var) +func (o *ConstVar) Execute(ctx context.Context, e *exec.Executor) error { + e.PutVars(o.Var) return nil } diff --git a/common/pkgs/ioswitch/plans/fromto.go b/common/pkgs/ioswitch/plans/fromto.go deleted file mode 100644 index a7ff81d..0000000 --- a/common/pkgs/ioswitch/plans/fromto.go +++ /dev/null @@ -1,223 +0,0 @@ -package plans - -import ( - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - "gitlink.org.cn/cloudream/common/utils/math2" - "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch/exec" -) - -type FromTo struct { - Froms []From - Toes []To -} - -func NewFromTo() FromTo { - return FromTo{} -} - -func (ft *FromTo) AddFrom(from From) *FromTo { - ft.Froms = append(ft.Froms, from) - return ft -} - -func (ft *FromTo) AddTo(to To) *FromTo { - ft.Toes = append(ft.Toes, to) - return ft -} - -type FromTos []FromTo - -type From interface { - GetDataIndex() int -} - -type To interface { - // 
To所需要的文件流的范围。具体含义与DataIndex有关系: - // 如果DataIndex == -1,则表示在整个文件的范围。 - // 如果DataIndex >= 0,则表示在文件的某个分片的范围。 - GetRange() Range - GetDataIndex() int -} - -type Range struct { - Offset int64 - Length *int64 -} - -func (r *Range) Extend(other Range) { - newOffset := math2.Min(r.Offset, other.Offset) - - if r.Length == nil { - r.Offset = newOffset - return - } - - if other.Length == nil { - r.Offset = newOffset - r.Length = nil - return - } - - otherEnd := other.Offset + *other.Length - rEnd := r.Offset + *r.Length - - newEnd := math2.Max(otherEnd, rEnd) - r.Offset = newOffset - *r.Length = newEnd - newOffset -} - -func (r *Range) ExtendStart(start int64) { - r.Offset = math2.Min(r.Offset, start) -} - -func (r *Range) ExtendEnd(end int64) { - if r.Length == nil { - return - } - - rEnd := r.Offset + *r.Length - newLen := math2.Max(end, rEnd) - r.Offset - r.Length = &newLen -} - -func (r *Range) Fix(maxLength int64) { - if r.Length != nil { - return - } - - len := maxLength - r.Offset - r.Length = &len -} - -func (r *Range) ToStartEnd(maxLen int64) (start int64, end int64) { - if r.Length == nil { - return r.Offset, maxLen - } - - end = r.Offset + *r.Length - return r.Offset, end -} - -func (r *Range) ClampLength(maxLen int64) { - if r.Length == nil { - return - } - - *r.Length = math2.Min(*r.Length, maxLen-r.Offset) -} - -type FromExecutor struct { - Handle *ExecutorWriteStream - DataIndex int -} - -func NewFromExecutor(dataIndex int) (*FromExecutor, *ExecutorWriteStream) { - handle := &ExecutorWriteStream{ - RangeHint: &Range{}, - } - return &FromExecutor{ - Handle: handle, - DataIndex: dataIndex, - }, handle -} - -func (f *FromExecutor) GetDataIndex() int { - return f.DataIndex -} - -type FromWorker struct { - FileHash string - Node *cdssdk.Node - DataIndex int -} - -func NewFromNode(fileHash string, node *cdssdk.Node, dataIndex int) *FromWorker { - return &FromWorker{ - FileHash: fileHash, - Node: node, - DataIndex: dataIndex, - } -} - -func (f *FromWorker) GetDataIndex() int { - return f.DataIndex -} - -type ToExecutor struct { - Handle *exec.ExecutorReadStream - DataIndex int - Range Range -} - -func NewToExecutor(dataIndex int) (*ToExecutor, *exec.ExecutorReadStream) { - str := exec.ExecutorReadStream{} - return &ToExecutor{ - Handle: &str, - DataIndex: dataIndex, - }, &str -} - -func NewToExecutorWithRange(dataIndex int, rng Range) (*ToExecutor, *exec.ExecutorReadStream) { - str := exec.ExecutorReadStream{} - return &ToExecutor{ - Handle: &str, - DataIndex: dataIndex, - Range: rng, - }, &str -} - -func (t *ToExecutor) GetDataIndex() int { - return t.DataIndex -} - -func (t *ToExecutor) GetRange() Range { - return t.Range -} - -type ToNode struct { - Node cdssdk.Node - DataIndex int - Range Range - FileHashStoreKey string -} - -func NewToNode(node cdssdk.Node, dataIndex int, fileHashStoreKey string) *ToNode { - return &ToNode{ - Node: node, - DataIndex: dataIndex, - FileHashStoreKey: fileHashStoreKey, - } -} - -func NewToNodeWithRange(node cdssdk.Node, dataIndex int, fileHashStoreKey string, rng Range) *ToNode { - return &ToNode{ - Node: node, - DataIndex: dataIndex, - FileHashStoreKey: fileHashStoreKey, - Range: rng, - } -} - -func (t *ToNode) GetDataIndex() int { - return t.DataIndex -} - -func (t *ToNode) GetRange() Range { - return t.Range -} - -// type ToStorage struct { -// Storage cdssdk.Storage -// DataIndex int -// } - -// func NewToStorage(storage cdssdk.Storage, dataIndex int) *ToStorage { -// return &ToStorage{ -// Storage: storage, -// DataIndex: dataIndex, -// } -// } - -// 
func (t *ToStorage) GetDataIndex() int { -// return t.DataIndex -// } diff --git a/common/pkgs/ioswitch/plans/ops.go b/common/pkgs/ioswitch/plans/ops.go deleted file mode 100644 index 410698c..0000000 --- a/common/pkgs/ioswitch/plans/ops.go +++ /dev/null @@ -1,534 +0,0 @@ -package plans - -import ( - "fmt" - - "github.com/samber/lo" - "gitlink.org.cn/cloudream/common/pkgs/ipfs" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - "gitlink.org.cn/cloudream/storage/common/pkgs/ec" - "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" - "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch/dag" - "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch/exec" - "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch/ops" -) - -type IPFSReadType struct { - FileHash string - Option ipfs.ReadOption -} - -func (t *IPFSReadType) InitNode(node *Node) { - dag.NodeNewOutputStream(node, VarProps{}) -} - -func (t *IPFSReadType) GenerateOp(node *Node, blder *exec.PlanBuilder) error { - addOpByEnv(&ops.IPFSRead{ - Output: node.OutputStreams[0].Props.Var.(*ioswitch.StreamVar), - FileHash: t.FileHash, - Option: t.Option, - }, node.Env, blder) - return nil -} - -func (t *IPFSReadType) String(node *Node) string { - return fmt.Sprintf("IPFSRead[%s,%v+%v]%v%v", t.FileHash, t.Option.Offset, t.Option.Length, formatStreamIO(node), formatValueIO(node)) -} - -type IPFSWriteType struct { - FileHashStoreKey string - Range Range -} - -func (t *IPFSWriteType) InitNode(node *Node) { - dag.NodeDeclareInputStream(node, 1) - dag.NodeNewOutputValue(node, VarProps{}) -} - -func (t *IPFSWriteType) GenerateOp(op *Node, blder *exec.PlanBuilder) error { - addOpByEnv(&ops.IPFSWrite{ - Input: op.InputStreams[0].Props.Var.(*ioswitch.StreamVar), - FileHash: op.OutputValues[0].Props.Var.(*ioswitch.StringVar), - }, op.Env, blder) - return nil -} - -func (t *IPFSWriteType) String(node *Node) string { - return fmt.Sprintf("IPFSWrite[%s,%v+%v](%v>)", t.FileHashStoreKey, t.Range.Offset, t.Range.Length, formatStreamIO(node), formatValueIO(node)) -} - -type ChunkedSplitType struct { - OutputCount int - ChunkSize int -} - -func (t *ChunkedSplitType) InitNode(node *Node) { - dag.NodeDeclareInputStream(node, 1) - for i := 0; i < t.OutputCount; i++ { - dag.NodeNewOutputStream(node, VarProps{}) - } -} - -func (t *ChunkedSplitType) GenerateOp(op *Node, blder *exec.PlanBuilder) error { - addOpByEnv(&ops.ChunkedSplit{ - Input: op.InputStreams[0].Props.Var.(*ioswitch.StreamVar), - Outputs: lo.Map(op.OutputStreams, func(v *StreamVar, idx int) *ioswitch.StreamVar { - return v.Props.Var.(*ioswitch.StreamVar) - }), - ChunkSize: t.ChunkSize, - PaddingZeros: true, - }, op.Env, blder) - return nil -} - -func (t *ChunkedSplitType) String(node *Node) string { - return fmt.Sprintf("ChunkedSplit[%v]", t.ChunkSize, formatStreamIO(node), formatValueIO(node)) -} - -type ChunkedJoinType struct { - InputCount int - ChunkSize int -} - -func (t *ChunkedJoinType) InitNode(node *Node) { - dag.NodeDeclareInputStream(node, t.InputCount) - dag.NodeNewOutputStream(node, VarProps{}) -} - -func (t *ChunkedJoinType) GenerateOp(op *Node, blder *exec.PlanBuilder) error { - addOpByEnv(&ops.ChunkedJoin{ - Inputs: lo.Map(op.InputStreams, func(v *StreamVar, idx int) *ioswitch.StreamVar { - return v.Props.Var.(*ioswitch.StreamVar) - }), - Output: op.OutputStreams[0].Props.Var.(*ioswitch.StreamVar), - ChunkSize: t.ChunkSize, - }, op.Env, blder) - return nil -} - -func (t *ChunkedJoinType) String(node *Node) string { - return fmt.Sprintf("ChunkedJoin[%v]", t.ChunkSize, 
formatStreamIO(node), formatValueIO(node)) -} - -type CloneStreamType struct{} - -func (t *CloneStreamType) InitNode(node *Node) { - dag.NodeDeclareInputStream(node, 1) -} - -func (t *CloneStreamType) GenerateOp(op *Node, blder *exec.PlanBuilder) error { - addOpByEnv(&ops.CloneStream{ - Input: op.InputStreams[0].Props.Var.(*ioswitch.StreamVar), - Outputs: lo.Map(op.OutputStreams, func(v *StreamVar, idx int) *ioswitch.StreamVar { - return v.Props.Var.(*ioswitch.StreamVar) - }), - }, op.Env, blder) - return nil -} - -func (t *CloneStreamType) NewOutput(node *Node) *StreamVar { - return dag.NodeNewOutputStream(node, VarProps{}) -} - -func (t *CloneStreamType) String(node *Node) string { - return fmt.Sprintf("CloneStream[]%v%v", formatStreamIO(node), formatValueIO(node)) -} - -type CloneVarType struct{} - -func (t *CloneVarType) InitNode(node *Node) { - dag.NodeDeclareInputValue(node, 1) -} - -func (t *CloneVarType) GenerateOp(op *Node, blder *exec.PlanBuilder) error { - addOpByEnv(&ops.CloneVar{ - Raw: op.InputValues[0].Props.Var, - Cloneds: lo.Map(op.OutputValues, func(v *ValueVar, idx int) ioswitch.Var { - return v.Props.Var - }), - }, op.Env, blder) - return nil -} - -func (t *CloneVarType) NewOutput(node *Node) *ValueVar { - return dag.NodeNewOutputValue(node, VarProps{}) -} - -func (t *CloneVarType) String(node *Node) string { - return fmt.Sprintf("CloneVar[]%v%v", formatStreamIO(node), formatValueIO(node)) -} - -type MultiplyType struct { - EC cdssdk.ECRedundancy -} - -func (t *MultiplyType) InitNode(node *Node) {} - -func (t *MultiplyType) GenerateOp(op *Node, blder *exec.PlanBuilder) error { - var inputIdxs []int - var outputIdxs []int - for _, in := range op.InputStreams { - inputIdxs = append(inputIdxs, in.Props.StreamIndex) - } - for _, out := range op.OutputStreams { - outputIdxs = append(outputIdxs, out.Props.StreamIndex) - } - - rs, err := ec.NewRs(t.EC.K, t.EC.N) - coef, err := rs.GenerateMatrix(inputIdxs, outputIdxs) - if err != nil { - return err - } - - addOpByEnv(&ops.ECMultiply{ - Coef: coef, - Inputs: lo.Map(op.InputStreams, func(v *StreamVar, idx int) *ioswitch.StreamVar { return v.Props.Var.(*ioswitch.StreamVar) }), - Outputs: lo.Map(op.OutputStreams, func(v *StreamVar, idx int) *ioswitch.StreamVar { return v.Props.Var.(*ioswitch.StreamVar) }), - ChunkSize: t.EC.ChunkSize, - }, op.Env, blder) - return nil -} - -func (t *MultiplyType) AddInput(node *Node, str *StreamVar) { - node.InputStreams = append(node.InputStreams, str) - str.To(node, len(node.InputStreams)-1) -} - -func (t *MultiplyType) NewOutput(node *Node, dataIndex int) *StreamVar { - return dag.NodeNewOutputStream(node, VarProps{StreamIndex: dataIndex}) -} - -func (t *MultiplyType) String(node *Node) string { - return fmt.Sprintf("Multiply[]%v%v", formatStreamIO(node), formatValueIO(node)) -} - -type FileReadType struct { - FilePath string -} - -func (t *FileReadType) InitNode(node *Node) { - dag.NodeNewOutputStream(node, VarProps{}) -} - -func (t *FileReadType) GenerateOp(op *Node, blder *exec.PlanBuilder) error { - addOpByEnv(&ops.FileRead{ - Output: op.OutputStreams[0].Props.Var.(*ioswitch.StreamVar), - FilePath: t.FilePath, - }, op.Env, blder) - return nil -} - -func (t *FileReadType) String(node *Node) string { - return fmt.Sprintf("FileRead[%s]%v%v", t.FilePath, formatStreamIO(node), formatValueIO(node)) -} - -type FileWriteType struct { - FilePath string -} - -func (t *FileWriteType) InitNode(node *Node) { - dag.NodeDeclareInputStream(node, 1) -} - -func (t *FileWriteType) GenerateOp(op *Node, blder 
*exec.PlanBuilder) error { - addOpByEnv(&ops.FileWrite{ - Input: op.InputStreams[0].Props.Var.(*ioswitch.StreamVar), - FilePath: t.FilePath, - }, op.Env, blder) - return nil -} - -type FromExecutorType struct { - Handle *exec.ExecutorWriteStream -} - -func (t *FromExecutorType) InitNode(node *Node) { - dag.NodeNewOutputStream(node, VarProps{}) -} - -func (t *FromExecutorType) GenerateOp(op *Node, blder *exec.PlanBuilder) error { - t.Handle.Var = op.OutputStreams[0].Props.Var.(*ioswitch.StreamVar) - return nil -} - -func (t *FromExecutorType) String(node *Node) string { - return fmt.Sprintf("FromExecutor[]%v%v", formatStreamIO(node), formatValueIO(node)) -} - -type ToExecutorType struct { - Handle *exec.ExecutorReadStream - Range Range -} - -func (t *ToExecutorType) InitNode(node *Node) { - dag.NodeDeclareInputStream(node, 1) -} - -func (t *ToExecutorType) GenerateOp(op *Node, blder *exec.PlanBuilder) error { - t.Handle.Var = op.InputStreams[0].Props.Var.(*ioswitch.StreamVar) - return nil -} - -func (t *ToExecutorType) String(node *Node) string { - return fmt.Sprintf("ToExecutor[%v+%v]%v%v", t.Range.Offset, t.Range.Length, formatStreamIO(node), formatValueIO(node)) -} - -type StoreType struct { - StoreKey string -} - -func (t *StoreType) InitNode(node *Node) { - dag.NodeDeclareInputValue(node, 1) -} - -func (t *StoreType) GenerateOp(op *Node, blder *exec.PlanBuilder) error { - blder.AtExecutor().AddOp(&ops.Store{ - Var: op.InputValues[0].Props.Var, - Key: t.StoreKey, - Store: blder.ExecutorPlan.StoreMap, - }) - return nil -} - -func (t *StoreType) String(node *Node) string { - return fmt.Sprintf("Store[%s]%v%v", t.StoreKey, formatStreamIO(node), formatValueIO(node)) -} - -type DropType struct{} - -func (t *DropType) InitNode(node *Node) { - dag.NodeDeclareInputStream(node, 1) -} - -func (t *DropType) GenerateOp(op *Node, blder *exec.PlanBuilder) error { - addOpByEnv(&ops.DropStream{ - Input: op.InputStreams[0].Props.Var.(*ioswitch.StreamVar), - }, op.Env, blder) - return nil -} - -func (t *DropType) String(node *Node) string { - return fmt.Sprintf("Drop[]%v%v", formatStreamIO(node), formatValueIO(node)) -} - -type SendStreamType struct { -} - -func (t *SendStreamType) InitNode(node *Node) { - dag.NodeDeclareInputStream(node, 1) - dag.NodeNewOutputStream(node, VarProps{}) -} - -func (t *SendStreamType) GenerateOp(op *Node, blder *exec.PlanBuilder) error { - toAgt := op.OutputStreams[0].Toes[0].Node.Env.Worker.(*AgentWorker) - addOpByEnv(&ops.SendStream{ - Input: op.InputStreams[0].Props.Var.(*ioswitch.StreamVar), - Send: op.OutputStreams[0].Props.Var.(*ioswitch.StreamVar), - Node: toAgt.Node, - }, op.Env, blder) - return nil -} - -func (t *SendStreamType) String(node *Node) string { - return fmt.Sprintf("SendStream[]%v%v", formatStreamIO(node), formatValueIO(node)) -} - -type SendVarType struct { -} - -func (t *SendVarType) InitNode(node *Node) { - dag.NodeDeclareInputValue(node, 1) - dag.NodeNewOutputValue(node, VarProps{}) -} - -func (t *SendVarType) GenerateOp(op *Node, blder *exec.PlanBuilder) error { - toAgt := op.OutputValues[0].Toes[0].Node.Env.Worker.(*AgentWorker) - addOpByEnv(&ops.SendVar{ - Input: op.InputValues[0].Props.Var, - Send: op.OutputValues[0].Props.Var, - Node: toAgt.Node, - }, op.Env, blder) - return nil -} - -func (t *SendVarType) String(node *Node) string { - return fmt.Sprintf("SendVar[]%v%v", formatStreamIO(node), formatValueIO(node)) -} - -type GetStreamType struct { -} - -func (t *GetStreamType) InitNode(node *Node) { - dag.NodeDeclareInputStream(node, 1) - 
dag.NodeNewOutputValue(node, VarProps{}) - dag.NodeNewOutputStream(node, VarProps{}) -} - -func (t *GetStreamType) GenerateOp(op *Node, blder *exec.PlanBuilder) error { - fromAgt := op.InputStreams[0].From.Node.Env.Worker.(*AgentWorker) - addOpByEnv(&ops.GetStream{ - Signal: op.OutputValues[0].Props.Var.(*ioswitch.SignalVar), - Output: op.OutputStreams[0].Props.Var.(*ioswitch.StreamVar), - Target: op.InputStreams[0].Props.Var.(*ioswitch.StreamVar), - Node: fromAgt.Node, - }, op.Env, blder) - return nil -} - -func (t *GetStreamType) String(node *Node) string { - return fmt.Sprintf("GetStream[]%v%v", formatStreamIO(node), formatValueIO(node)) -} - -type GetVaType struct { -} - -func (t *GetVaType) InitNode(node *Node) { - dag.NodeDeclareInputValue(node, 1) - dag.NodeNewOutputValue(node, VarProps{}) - dag.NodeNewOutputValue(node, VarProps{}) -} - -func (t *GetVaType) GenerateOp(op *Node, blder *exec.PlanBuilder) error { - fromAgt := op.InputValues[0].From.Node.Env.Worker.(*AgentWorker) - addOpByEnv(&ops.GetVar{ - Signal: op.OutputValues[0].Props.Var.(*ioswitch.SignalVar), - Output: op.OutputValues[1].Props.Var, - Target: op.InputValues[0].Props.Var, - Node: fromAgt.Node, - }, op.Env, blder) - return nil -} - -func (t *GetVaType) String(node *Node) string { - return fmt.Sprintf("GetVar[]%v%v", formatStreamIO(node), formatValueIO(node)) -} - -type RangeType struct { - Range Range -} - -func (t *RangeType) InitNode(node *Node) { - dag.NodeDeclareInputStream(node, 1) - dag.NodeNewOutputStream(node, VarProps{}) -} - -func (t *RangeType) GenerateOp(op *Node, blder *exec.PlanBuilder) error { - addOpByEnv(&ops.Range{ - Input: op.InputStreams[0].Props.Var.(*ioswitch.StreamVar), - Output: op.OutputStreams[0].Props.Var.(*ioswitch.StreamVar), - Offset: t.Range.Offset, - Length: t.Range.Length, - }, op.Env, blder) - return nil -} - -func (t *RangeType) String(node *Node) string { - return fmt.Sprintf("Range[%v+%v]%v%v", t.Range.Offset, t.Range.Length, formatStreamIO(node), formatValueIO(node)) -} - -type HoldUntilType struct { -} - -func (t *HoldUntilType) InitNode(node *Node) { - dag.NodeDeclareInputValue(node, 1) -} - -func (t *HoldUntilType) GenerateOp(op *Node, blder *exec.PlanBuilder) error { - o := &ops.HoldUntil{ - Waits: []*ioswitch.SignalVar{op.InputValues[0].Props.Var.(*ioswitch.SignalVar)}, - } - - for i := 0; i < len(op.OutputValues); i++ { - o.Holds = append(o.Holds, op.InputValues[i+1].Props.Var) - o.Emits = append(o.Emits, op.OutputValues[i].Props.Var) - } - - for i := 0; i < len(op.OutputStreams); i++ { - o.Holds = append(o.Holds, op.InputStreams[i].Props.Var) - o.Emits = append(o.Emits, op.OutputStreams[i].Props.Var) - } - - addOpByEnv(o, op.Env, blder) - return nil -} - -func (t *HoldUntilType) String(node *Node) string { - return fmt.Sprintf("HoldUntil[]%v%v", formatStreamIO(node), formatValueIO(node)) -} - -func addOpByEnv(op ioswitch.Op, env dag.NodeEnv, blder *exec.PlanBuilder) { - switch env.Type { - case dag.EnvWorker: - blder.AtAgent(env.Worker.(*AgentWorker).Node).AddOp(op) - case dag.EnvExecutor: - blder.AtExecutor().AddOp(op) - } -} - -func formatStreamIO(node *Node) string { - is := "" - for i, in := range node.InputStreams { - if i > 0 { - is += "," - } - - if in == nil { - is += "." - } else { - is += fmt.Sprintf("%v", in.ID) - } - } - - os := "" - for i, out := range node.OutputStreams { - if i > 0 { - os += "," - } - - if out == nil { - os += "." 
- } else { - os += fmt.Sprintf("%v", out.ID) - } - } - - if is == "" && os == "" { - return "" - } - - return fmt.Sprintf("S{%s>%s}", is, os) -} - -func formatValueIO(node *Node) string { - is := "" - for i, in := range node.InputValues { - if i > 0 { - is += "," - } - - if in == nil { - is += "." - } else { - is += fmt.Sprintf("%v", in.ID) - } - } - - os := "" - for i, out := range node.OutputValues { - if i > 0 { - os += "," - } - - if out == nil { - os += "." - } else { - os += fmt.Sprintf("%v", out.ID) - } - } - - if is == "" && os == "" { - return "" - } - - return fmt.Sprintf("V{%s>%s}", is, os) -} diff --git a/common/pkgs/ioswitch/plans/parser.go b/common/pkgs/ioswitch/plans/parser.go index e46798b..d410ad4 100644 --- a/common/pkgs/ioswitch/plans/parser.go +++ b/common/pkgs/ioswitch/plans/parser.go @@ -4,63 +4,16 @@ import ( "fmt" "math" + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/parser" "gitlink.org.cn/cloudream/common/pkgs/ipfs" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/common/utils/lo2" "gitlink.org.cn/cloudream/common/utils/math2" - "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" - "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch/dag" - "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch/exec" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch/ops" ) -type NodeProps struct { - From From - To To -} - -type ValueVarType int - -const ( - StringValueVar ValueVarType = iota - SignalValueVar -) - -type VarProps struct { - StreamIndex int // 流的编号,只在StreamVar上有意义 - ValueType ValueVarType // 值类型,只在ValueVar上有意义 - Var ioswitch.Var // 生成Plan的时候创建的对应的Var -} - -type Graph = dag.Graph[NodeProps, VarProps] - -type Node = dag.Node[NodeProps, VarProps] - -type StreamVar = dag.StreamVar[NodeProps, VarProps] - -type ValueVar = dag.ValueVar[NodeProps, VarProps] - -type AgentWorker struct { - Node cdssdk.Node -} - -func (w *AgentWorker) GetAddress() string { - // TODO 选择地址 - return fmt.Sprintf("%v:%v", w.Node.ExternalIP, w.Node.ExternalGRPCPort) -} - -func (w *AgentWorker) Equals(worker exec.Worker) bool { - aw, ok := worker.(*AgentWorker) - if !ok { - return false - } - - return w.Node.NodeID == aw.Node.NodeID -} - -type FromToParser interface { - Parse(ft FromTo, blder *builder.PlanBuilder) error -} - type DefaultParser struct { EC cdssdk.ECRedundancy } @@ -72,14 +25,14 @@ func NewParser(ec cdssdk.ECRedundancy) *DefaultParser { } type ParseContext struct { - Ft FromTo - DAG *Graph + Ft parser.FromTo + DAG *ops.Graph // 为了产生所有To所需的数据范围,而需要From打开的范围。 // 这个范围是基于整个文件的,且上下界都取整到条带大小的整数倍,因此上界是有可能超过文件大小的。 - StreamRange Range + StreamRange exec.Range } -func (p *DefaultParser) Parse(ft FromTo, blder *builder.PlanBuilder) error { +func (p *DefaultParser) Parse(ft parser.FromTo, blder *exec.PlanBuilder) error { ctx := ParseContext{Ft: ft} // 分成两个阶段: @@ -130,9 +83,9 @@ func (p *DefaultParser) Parse(ft FromTo, blder *builder.PlanBuilder) error { return p.buildPlan(&ctx, blder) } -func (p *DefaultParser) findOutputStream(ctx *ParseContext, streamIndex int) *StreamVar { - var ret *StreamVar - ctx.DAG.Walk(func(n *dag.Node[NodeProps, VarProps]) bool { +func (p *DefaultParser) findOutputStream(ctx *ParseContext, streamIndex int) *ops.StreamVar { + var ret *ops.StreamVar + ctx.DAG.Walk(func(n *dag.Node[ops.NodeProps, ops.VarProps]) bool { for _, o := range n.OutputStreams { if o != nil && o.Props.StreamIndex == streamIndex { ret = o @@ -149,11 
+102,12 @@ func (p *DefaultParser) findOutputStream(ctx *ParseContext, streamIndex int) *St func (p *DefaultParser) calcStreamRange(ctx *ParseContext) { stripSize := int64(p.EC.ChunkSize * p.EC.K) - rng := Range{ + rng := exec.Range{ Offset: math.MaxInt64, } - for _, to := range ctx.Ft.Toes { + for _, t := range ctx.Ft.Toes { + to := t.(ops.To) if to.GetDataIndex() == -1 { toRng := to.GetRange() rng.ExtendStart(math2.Floor(toRng.Offset, stripSize)) @@ -180,16 +134,18 @@ func (p *DefaultParser) calcStreamRange(ctx *ParseContext) { ctx.StreamRange = rng } -func (p *DefaultParser) extend(ctx *ParseContext, ft FromTo) error { +func (p *DefaultParser) extend(ctx *ParseContext, ft parser.FromTo) error { for _, f := range ft.Froms { - _, err := p.buildFromNode(ctx, &ft, f) + fr := f.(ops.From) + + _, err := p.buildFromNode(ctx, &ft, fr) if err != nil { return err } // 对于完整文件的From,生成Split指令 - if f.GetDataIndex() == -1 { - n, _ := dag.NewNode(ctx.DAG, &ChunkedSplitType{ChunkSize: p.EC.ChunkSize, OutputCount: p.EC.K}, NodeProps{}) + if fr.GetDataIndex() == -1 { + n, _ := dag.NewNode(ctx.DAG, &ops.ChunkedSplitType{ChunkSize: p.EC.ChunkSize, OutputCount: p.EC.K}, ops.NodeProps{}) for i := 0; i < p.EC.K; i++ { n.OutputStreams[i].Props.StreamIndex = i } @@ -197,7 +153,7 @@ func (p *DefaultParser) extend(ctx *ParseContext, ft FromTo) error { } // 如果有K个不同的文件块流,则生成Multiply指令,同时针对其生成的流,生成Join指令 - ecInputStrs := make(map[int]*StreamVar) + ecInputStrs := make(map[int]*ops.StreamVar) loop: for _, o := range ctx.DAG.Nodes { for _, s := range o.OutputStreams { @@ -210,9 +166,9 @@ loop: } } if len(ecInputStrs) == p.EC.K { - mulNode, mulType := dag.NewNode(ctx.DAG, &MultiplyType{ + mulNode, mulType := dag.NewNode(ctx.DAG, &ops.MultiplyType{ EC: p.EC, - }, NodeProps{}) + }, ops.NodeProps{}) for _, s := range ecInputStrs { mulType.AddInput(mulNode, s) @@ -221,10 +177,10 @@ loop: mulType.NewOutput(mulNode, i) } - joinNode, _ := dag.NewNode(ctx.DAG, &ChunkedJoinType{ + joinNode, _ := dag.NewNode(ctx.DAG, &ops.ChunkedJoinType{ InputCount: p.EC.K, ChunkSize: p.EC.ChunkSize, - }, NodeProps{}) + }, ops.NodeProps{}) for i := 0; i < p.EC.K; i++ { // 不可能找不到流 @@ -235,14 +191,16 @@ loop: // 为每一个To找到一个输入流 for _, t := range ft.Toes { - n, err := p.buildToNode(ctx, &ft, t) + to := t.(ops.To) + + n, err := p.buildToNode(ctx, &ft, to) if err != nil { return err } - str := p.findOutputStream(ctx, t.GetDataIndex()) + str := p.findOutputStream(ctx, to.GetDataIndex()) if str == nil { - return fmt.Errorf("no output stream found for data index %d", t.GetDataIndex()) + return fmt.Errorf("no output stream found for data index %d", to.GetDataIndex()) } str.To(n, 0) @@ -251,9 +209,9 @@ loop: return nil } -func (p *DefaultParser) buildFromNode(ctx *ParseContext, ft *FromTo, f From) (*Node, error) { - var repRange Range - var blkRange Range +func (p *DefaultParser) buildFromNode(ctx *ParseContext, ft *parser.FromTo, f ops.From) (*ops.Node, error) { + var repRange exec.Range + var blkRange exec.Range repRange.Offset = ctx.StreamRange.Offset blkRange.Offset = ctx.StreamRange.Offset / int64(p.EC.ChunkSize*p.EC.K) * int64(p.EC.ChunkSize) @@ -266,14 +224,14 @@ func (p *DefaultParser) buildFromNode(ctx *ParseContext, ft *FromTo, f From) (*N } switch f := f.(type) { - case *FromWorker: - n, t := dag.NewNode(ctx.DAG, &IPFSReadType{ + case *ops.FromWorker: + n, t := dag.NewNode(ctx.DAG, &ops.IPFSReadType{ FileHash: f.FileHash, Option: ipfs.ReadOption{ Offset: 0, Length: -1, }, - }, NodeProps{ + }, ops.NodeProps{ From: f, }) 
n.OutputStreams[0].Props.StreamIndex = f.DataIndex @@ -291,13 +249,13 @@ func (p *DefaultParser) buildFromNode(ctx *ParseContext, ft *FromTo, f From) (*N } if f.Node != nil { - n.Env.ToEnvWorker(&AgentWorker{*f.Node}) + n.Env.ToEnvWorker(&ops.AgentWorker{*f.Node}) } return n, nil - case *FromExecutor: - n, _ := dag.NewNode(ctx.DAG, &FromExecutorType{Handle: f.Handle}, NodeProps{From: f}) + case *ops.FromExecutor: + n, _ := dag.NewNode(ctx.DAG, &ops.FromDriverType{Handle: f.Handle}, ops.NodeProps{From: f}) n.Env.ToEnvExecutor() n.OutputStreams[0].Props.StreamIndex = f.DataIndex @@ -316,20 +274,20 @@ func (p *DefaultParser) buildFromNode(ctx *ParseContext, ft *FromTo, f From) (*N } } -func (p *DefaultParser) buildToNode(ctx *ParseContext, ft *FromTo, t To) (*Node, error) { +func (p *DefaultParser) buildToNode(ctx *ParseContext, ft *parser.FromTo, t ops.To) (*ops.Node, error) { switch t := t.(type) { - case *ToNode: - n, _ := dag.NewNode(ctx.DAG, &IPFSWriteType{ + case *ops.ToNode: + n, _ := dag.NewNode(ctx.DAG, &ops.IPFSWriteType{ FileHashStoreKey: t.FileHashStoreKey, Range: t.Range, - }, NodeProps{ + }, ops.NodeProps{ To: t, }) return n, nil - case *ToExecutor: - n, _ := dag.NewNode(ctx.DAG, &ToExecutorType{Handle: t.Handle, Range: t.Range}, NodeProps{To: t}) + case *ops.ToExecutor: + n, _ := dag.NewNode(ctx.DAG, &ops.ToDriverType{Handle: t.Handle, Range: t.Range}, ops.NodeProps{To: t}) n.Env.ToEnvExecutor() return n, nil @@ -343,7 +301,7 @@ func (p *DefaultParser) buildToNode(ctx *ParseContext, ft *FromTo, t To) (*Node, func (p *DefaultParser) removeUnusedJoin(ctx *ParseContext) bool { changed := false - dag.WalkOnlyType[*ChunkedJoinType](ctx.DAG, func(node *Node, typ *ChunkedJoinType) bool { + dag.WalkOnlyType[*ops.ChunkedJoinType](ctx.DAG, func(node *ops.Node, typ *ops.ChunkedJoinType) bool { if len(node.OutputStreams[0].Toes) > 0 { return true } @@ -362,7 +320,7 @@ func (p *DefaultParser) removeUnusedJoin(ctx *ParseContext) bool { // 减少未使用的Multiply指令的输出流。如果减少到0,则删除该指令 func (p *DefaultParser) removeUnusedMultiplyOutput(ctx *ParseContext) bool { changed := false - dag.WalkOnlyType[*MultiplyType](ctx.DAG, func(node *Node, typ *MultiplyType) bool { + dag.WalkOnlyType[*ops.MultiplyType](ctx.DAG, func(node *ops.Node, typ *ops.MultiplyType) bool { for i2, out := range node.OutputStreams { if len(out.Toes) > 0 { continue @@ -391,7 +349,7 @@ func (p *DefaultParser) removeUnusedMultiplyOutput(ctx *ParseContext) bool { // 删除未使用的Split指令 func (p *DefaultParser) removeUnusedSplit(ctx *ParseContext) bool { changed := false - dag.WalkOnlyType[*ChunkedSplitType](ctx.DAG, func(node *Node, typ *ChunkedSplitType) bool { + dag.WalkOnlyType[*ops.ChunkedSplitType](ctx.DAG, func(node *ops.Node, typ *ops.ChunkedSplitType) bool { // Split出来的每一个流都没有被使用,才能删除这个指令 for _, out := range node.OutputStreams { if len(out.Toes) > 0 { @@ -412,9 +370,9 @@ func (p *DefaultParser) removeUnusedSplit(ctx *ParseContext) bool { func (p *DefaultParser) omitSplitJoin(ctx *ParseContext) bool { changed := false - dag.WalkOnlyType[*ChunkedSplitType](ctx.DAG, func(splitNode *Node, typ *ChunkedSplitType) bool { + dag.WalkOnlyType[*ops.ChunkedSplitType](ctx.DAG, func(splitNode *ops.Node, typ *ops.ChunkedSplitType) bool { // Split指令的每一个输出都有且只有一个目的地 - var joinNode *Node + var joinNode *ops.Node for _, out := range splitNode.OutputStreams { if len(out.Toes) != 1 { continue @@ -432,7 +390,7 @@ func (p *DefaultParser) omitSplitJoin(ctx *ParseContext) bool { } // 且这个目的地要是一个Join指令 - _, ok := joinNode.Type.(*ChunkedJoinType) + _, ok := 
joinNode.Type.(*ops.ChunkedJoinType) if !ok { return true } @@ -466,7 +424,7 @@ func (p *DefaultParser) omitSplitJoin(ctx *ParseContext) bool { // 所以理论上不会出现有指令的位置始终无法确定的情况。 func (p *DefaultParser) pin(ctx *ParseContext) bool { changed := false - ctx.DAG.Walk(func(node *Node) bool { + ctx.DAG.Walk(func(node *ops.Node) bool { var toEnv *dag.NodeEnv for _, out := range node.OutputStreams { for _, to := range out.Toes { @@ -522,10 +480,10 @@ func (p *DefaultParser) pin(ctx *ParseContext) bool { // 对于所有未使用的流,增加Drop指令 func (p *DefaultParser) dropUnused(ctx *ParseContext) { - ctx.DAG.Walk(func(node *Node) bool { + ctx.DAG.Walk(func(node *ops.Node) bool { for _, out := range node.OutputStreams { if len(out.Toes) == 0 { - n := ctx.DAG.NewNode(&DropType{}, NodeProps{}) + n := ctx.DAG.NewNode(&ops.DropType{}, ops.NodeProps{}) n.Env = node.Env out.To(n, 0) } @@ -536,14 +494,14 @@ func (p *DefaultParser) dropUnused(ctx *ParseContext) { // 为IPFS写入指令存储结果 func (p *DefaultParser) storeIPFSWriteResult(ctx *ParseContext) { - dag.WalkOnlyType[*IPFSWriteType](ctx.DAG, func(node *Node, typ *IPFSWriteType) bool { + dag.WalkOnlyType[*ops.IPFSWriteType](ctx.DAG, func(node *ops.Node, typ *ops.IPFSWriteType) bool { if typ.FileHashStoreKey == "" { return true } - n := ctx.DAG.NewNode(&StoreType{ + n := ctx.DAG.NewNode(&ops.StoreType{ StoreKey: typ.FileHashStoreKey, - }, NodeProps{}) + }, ops.NodeProps{}) n.Env.ToEnvExecutor() node.OutputValues[0].To(n, 0) @@ -553,7 +511,7 @@ func (p *DefaultParser) storeIPFSWriteResult(ctx *ParseContext) { // 生成Range指令。StreamRange可能超过文件总大小,但Range指令会在数据量不够时不报错而是正常返回 func (p *DefaultParser) generateRange(ctx *ParseContext) { - ctx.DAG.Walk(func(node *dag.Node[NodeProps, VarProps]) bool { + ctx.DAG.Walk(func(node *ops.Node) bool { if node.Props.To == nil { return true } @@ -562,12 +520,12 @@ func (p *DefaultParser) generateRange(ctx *ParseContext) { toRng := node.Props.To.GetRange() if toDataIdx == -1 { - n := ctx.DAG.NewNode(&RangeType{ - Range: Range{ + n := ctx.DAG.NewNode(&ops.RangeType{ + Range: exec.Range{ Offset: toRng.Offset - ctx.StreamRange.Offset, Length: toRng.Length, }, - }, NodeProps{}) + }, ops.NodeProps{}) n.Env = node.InputStreams[0].From.Node.Env node.InputStreams[0].To(n, 0) @@ -580,12 +538,12 @@ func (p *DefaultParser) generateRange(ctx *ParseContext) { blkStart := blkStartIdx * int64(p.EC.ChunkSize) - n := ctx.DAG.NewNode(&RangeType{ - Range: Range{ + n := ctx.DAG.NewNode(&ops.RangeType{ + Range: exec.Range{ Offset: toRng.Offset - blkStart, Length: toRng.Length, }, - }, NodeProps{}) + }, ops.NodeProps{}) n.Env = node.InputStreams[0].From.Node.Env node.InputStreams[0].To(n, 0) @@ -599,13 +557,13 @@ func (p *DefaultParser) generateRange(ctx *ParseContext) { // 生成Clone指令 func (p *DefaultParser) generateClone(ctx *ParseContext) { - ctx.DAG.Walk(func(node *dag.Node[NodeProps, VarProps]) bool { + ctx.DAG.Walk(func(node *ops.Node) bool { for _, out := range node.OutputStreams { if len(out.Toes) <= 1 { continue } - n, t := dag.NewNode(ctx.DAG, &CloneStreamType{}, NodeProps{}) + n, t := dag.NewNode(ctx.DAG, &ops.CloneStreamType{}, ops.NodeProps{}) n.Env = node.Env for _, to := range out.Toes { t.NewOutput(node).To(to.Node, to.SlotIndex) @@ -619,7 +577,7 @@ func (p *DefaultParser) generateClone(ctx *ParseContext) { continue } - n, t := dag.NewNode(ctx.DAG, &CloneVarType{}, NodeProps{}) + n, t := dag.NewNode(ctx.DAG, &ops.CloneVarType{}, ops.NodeProps{}) n.Env = node.Env for _, to := range out.Toes { t.NewOutput(node).To(to.Node, to.SlotIndex) @@ -634,7 +592,7 @@ func (p 
*DefaultParser) generateClone(ctx *ParseContext) { // 生成Send指令 func (p *DefaultParser) generateSend(ctx *ParseContext) { - ctx.DAG.Walk(func(node *dag.Node[NodeProps, VarProps]) bool { + ctx.DAG.Walk(func(node *ops.Node) bool { for _, out := range node.OutputStreams { to := out.Toes[0] if to.Node.Env.Equals(node.Env) { @@ -644,11 +602,11 @@ func (p *DefaultParser) generateSend(ctx *ParseContext) { switch to.Node.Env.Type { case dag.EnvExecutor: // // 如果是要送到Executor,则只能由Executor主动去拉取 - getNode := ctx.DAG.NewNode(&GetStreamType{}, NodeProps{}) + getNode := ctx.DAG.NewNode(&ops.GetStreamType{}, ops.NodeProps{}) getNode.Env.ToEnvExecutor() // // 同时需要对此变量生成HoldUntil指令,避免Plan结束时Get指令还未到达 - holdNode := ctx.DAG.NewNode(&HoldUntilType{}, NodeProps{}) + holdNode := ctx.DAG.NewNode(&ops.HoldUntilType{}, ops.NodeProps{}) holdNode.Env = node.Env // 将Get指令的信号送到Hold指令 @@ -663,7 +621,7 @@ func (p *DefaultParser) generateSend(ctx *ParseContext) { case dag.EnvWorker: // 如果是要送到Agent,则可以直接发送 - n := ctx.DAG.NewNode(&SendStreamType{}, NodeProps{}) + n := ctx.DAG.NewNode(&ops.SendStreamType{}, ops.NodeProps{}) n.Env = node.Env n.OutputStreams[0].To(to.Node, to.SlotIndex) out.Toes = nil @@ -680,11 +638,11 @@ func (p *DefaultParser) generateSend(ctx *ParseContext) { switch to.Node.Env.Type { case dag.EnvExecutor: // // 如果是要送到Executor,则只能由Executor主动去拉取 - getNode := ctx.DAG.NewNode(&GetVaType{}, NodeProps{}) + getNode := ctx.DAG.NewNode(&ops.GetVaType{}, ops.NodeProps{}) getNode.Env.ToEnvExecutor() // // 同时需要对此变量生成HoldUntil指令,避免Plan结束时Get指令还未到达 - holdNode := ctx.DAG.NewNode(&HoldUntilType{}, NodeProps{}) + holdNode := ctx.DAG.NewNode(&ops.HoldUntilType{}, ops.NodeProps{}) holdNode.Env = node.Env // 将Get指令的信号送到Hold指令 @@ -699,7 +657,7 @@ func (p *DefaultParser) generateSend(ctx *ParseContext) { case dag.EnvWorker: // 如果是要送到Agent,则可以直接发送 - n := ctx.DAG.NewNode(&SendVarType{}, NodeProps{}) + n := ctx.DAG.NewNode(&ops.SendVarType{}, ops.NodeProps{}) n.Env = node.Env n.OutputValues[0].To(to.Node, to.SlotIndex) out.Toes = nil @@ -712,9 +670,9 @@ func (p *DefaultParser) generateSend(ctx *ParseContext) { } // 生成Plan -func (p *DefaultParser) buildPlan(ctx *ParseContext, blder *builder.PlanBuilder) error { +func (p *DefaultParser) buildPlan(ctx *ParseContext, blder *exec.PlanBuilder) error { var retErr error - ctx.DAG.Walk(func(node *dag.Node[NodeProps, VarProps]) bool { + ctx.DAG.Walk(func(node *dag.Node[ops.NodeProps, ops.VarProps]) bool { for _, out := range node.OutputStreams { if out.Props.Var != nil { continue @@ -737,9 +695,9 @@ func (p *DefaultParser) buildPlan(ctx *ParseContext, blder *builder.PlanBuilder) } switch out.Props.ValueType { - case StringValueVar: + case ops.StringValueVar: out.Props.Var = blder.NewStringVar() - case SignalValueVar: + case ops.SignalValueVar: out.Props.Var = blder.NewSignalVar() } @@ -751,9 +709,9 @@ func (p *DefaultParser) buildPlan(ctx *ParseContext, blder *builder.PlanBuilder) } switch in.Props.ValueType { - case StringValueVar: + case ops.StringValueVar: in.Props.Var = blder.NewStringVar() - case SignalValueVar: + case ops.SignalValueVar: in.Props.Var = blder.NewSignalVar() } } diff --git a/common/pkgs/ioswitch/switch.go b/common/pkgs/ioswitch/switch.go deleted file mode 100644 index f077a00..0000000 --- a/common/pkgs/ioswitch/switch.go +++ /dev/null @@ -1,151 +0,0 @@ -package ioswitch - -import ( - "context" - "fmt" - "sync" - - "gitlink.org.cn/cloudream/common/pkgs/future" - "gitlink.org.cn/cloudream/common/utils/lo2" - "gitlink.org.cn/cloudream/common/utils/sync2" -) - -type 
bindingVars struct { - Waittings []Var - Bindeds []Var - Callback *future.SetVoidFuture -} - -type Switch struct { - plan Plan - vars map[VarID]Var - bindings []*bindingVars - lock sync.Mutex -} - -func NewSwitch(plan Plan) *Switch { - planning := Switch{ - plan: plan, - vars: make(map[VarID]Var), - } - - return &planning -} - -func (s *Switch) Plan() *Plan { - return &s.plan -} - -func (s *Switch) Run(ctx context.Context) error { - ctx2, cancel := context.WithCancel(ctx) - defer cancel() - - return sync2.ParallelDo(s.plan.Ops, func(o Op, idx int) error { - err := o.Execute(ctx2, s) - - s.lock.Lock() - defer s.lock.Unlock() - - if err != nil { - cancel() - return fmt.Errorf("%T: %w", o, err) - } - - return nil - }) -} - -func (s *Switch) BindVars(ctx context.Context, vs ...Var) error { - s.lock.Lock() - - callback := future.NewSetVoid() - binding := &bindingVars{ - Callback: callback, - } - - for _, v := range vs { - v2 := s.vars[v.GetID()] - if v2 == nil { - binding.Waittings = append(binding.Waittings, v) - continue - } - - if err := AssignVar(v2, v); err != nil { - s.lock.Unlock() - return fmt.Errorf("assign var %v to %v: %w", v2.GetID(), v.GetID(), err) - } - - binding.Bindeds = append(binding.Bindeds, v) - } - - if len(binding.Waittings) == 0 { - s.lock.Unlock() - return nil - } - - s.bindings = append(s.bindings, binding) - s.lock.Unlock() - - err := callback.Wait(ctx) - - s.lock.Lock() - defer s.lock.Unlock() - - s.bindings = lo2.Remove(s.bindings, binding) - - return err -} - -func (s *Switch) PutVars(vs ...Var) { - s.lock.Lock() - defer s.lock.Unlock() - -loop: - for _, v := range vs { - for ib, b := range s.bindings { - for iw, w := range b.Waittings { - if w.GetID() != v.GetID() { - continue - } - - if err := AssignVar(v, w); err != nil { - b.Callback.SetError(fmt.Errorf("assign var %v to %v: %w", v.GetID(), w.GetID(), err)) - // 绑定类型不对,说明生成的执行计划有问题,怎么处理都可以,因为最终会执行失败 - continue loop - } - - b.Bindeds = append(b.Bindeds, w) - b.Waittings = lo2.RemoveAt(b.Waittings, iw) - if len(b.Waittings) == 0 { - b.Callback.SetVoid() - s.bindings = lo2.RemoveAt(s.bindings, ib) - } - - // 绑定成功,继续最外层循环 - continue loop - } - - } - - // 如果没有绑定,则直接放入变量表中 - s.vars[v.GetID()] = v - } -} - -func BindArrayVars[T Var](sw *Switch, ctx context.Context, vs []T) error { - var vs2 []Var - for _, v := range vs { - vs2 = append(vs2, v) - } - - return sw.BindVars(ctx, vs2...) -} - -func PutArrayVars[T Var](sw *Switch, vs []T) { - var vs2 []Var - for _, v := range vs { - vs2 = append(vs2, v) - } - - sw.PutVars(vs2...) -} diff --git a/common/pkgs/ioswitch/utils.go b/common/pkgs/ioswitch/utils.go deleted file mode 100644 index ccbe9c1..0000000 --- a/common/pkgs/ioswitch/utils.go +++ /dev/null @@ -1,24 +0,0 @@ -package ioswitch - -import ( - "fmt" - "reflect" -) - -func AssignVar(from Var, to Var) error { - if reflect.TypeOf(from) != reflect.TypeOf(to) { - return fmt.Errorf("cannot assign %T to %T", from, to) - } - - switch from := from.(type) { - case *StreamVar: - to.(*StreamVar).Stream = from.Stream - case *IntVar: - to.(*IntVar).Value = from.Value - case *StringVar: - to.(*StringVar).Value = from.Value - case *SignalVar: - } - - return nil -}
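For reference, every op touched by this patch follows the same executor protocol: Execute first binds its input vars with BindVars (blocking until some other op publishes them), does its work, and then publishes its outputs with PutVars so downstream ops can continue. The sketch below shows a minimal custom op written against that protocol; the Echo type, its fields, and its registration are illustrative only and are not part of this change.

package ops

import (
	"context"

	"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
)

// Echo copies one string var to another. Purely illustrative; not part of the patch.
type Echo struct {
	Input  *exec.StringVar `json:"input"`
	Output *exec.StringVar `json:"output"`
}

func (o *Echo) Execute(ctx context.Context, e *exec.Executor) error {
	// Block until the producer of Input has called PutVars with the same var ID.
	if err := e.BindVars(ctx, o.Input); err != nil {
		return err
	}

	o.Output.Value = o.Input.Value

	// Publish the result so downstream ops waiting on Output can proceed.
	e.PutVars(o.Output)
	return nil
}

func init() {
	// Registration in OpUnion is required for the op to survive plan JSON (de)serialization.
	OpUnion.AddT((*Echo)(nil))
}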