From 7575bfbb9d889c4713794132228f85cab11cfaac Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Mon, 18 Nov 2024 09:39:25 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0Load=E6=93=8D=E4=BD=9C?= =?UTF-8?q?=E7=9A=84op?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- common/pkgs/ioswitch2/ops2/shared_store.go | 107 +++++++++++++++++++++ 1 file changed, 107 insertions(+) create mode 100644 common/pkgs/ioswitch2/ops2/shared_store.go diff --git a/common/pkgs/ioswitch2/ops2/shared_store.go b/common/pkgs/ioswitch2/ops2/shared_store.go new file mode 100644 index 0000000..112d06c --- /dev/null +++ b/common/pkgs/ioswitch2/ops2/shared_store.go @@ -0,0 +1,107 @@ +package ops2 + +import ( + "fmt" + + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" + "gitlink.org.cn/cloudream/common/pkgs/logger" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "gitlink.org.cn/cloudream/storage/common/pkgs/storage/mgr" +) + +func init() { + exec.UseOp[*ShardWrite]() +} + +type SharedLoad struct { + Input exec.VarID `json:"input"` + StorageID cdssdk.StorageID `json:"storageID"` + UserID cdssdk.UserID `json:"userID"` + PackageID cdssdk.PackageID `json:"packageID"` + Path string `json:"path"` + FullPathOutput exec.VarID `json:"fullPathOutput"` +} + +func (o *SharedLoad) Execute(ctx *exec.ExecContext, e *exec.Executor) error { + logger. + WithField("Input", o.Input). + Debugf("load file to shared store") + defer logger.Debugf("load file to shared store finished") + + stgMgr, err := exec.GetValueByType[*mgr.Manager](ctx) + if err != nil { + return fmt.Errorf("getting storage manager: %w", err) + } + + store, err := stgMgr.GetSharedStore(o.StorageID) + if err != nil { + return fmt.Errorf("getting shard store of storage %v: %w", o.StorageID, err) + } + + input, err := exec.BindVar[*exec.StreamValue](e, ctx.Context, o.Input) + if err != nil { + return err + } + defer input.Stream.Close() + + fullPath, err := store.WritePackageObject(o.UserID, o.PackageID, o.Path, input.Stream) + if err != nil { + return fmt.Errorf("writing file to shard store: %w", err) + } + + if o.FullPathOutput > 0 { + e.PutVar(o.FullPathOutput, &exec.StringValue{ + Value: fullPath, + }) + } + return nil +} + +func (o *SharedLoad) String() string { + return fmt.Sprintf("SharedLoad %v -> %v:%v/%v/%v", o.Input, o.StorageID, o.UserID, o.PackageID, o.Path) +} + +type SharedLoadNode struct { + dag.NodeBase + StorageID cdssdk.StorageID + UserID cdssdk.UserID + PackageID cdssdk.PackageID + Path string +} + +func (b *GraphNodeBuilder) NewSharedLoad(stgID cdssdk.StorageID, userID cdssdk.UserID, packageID cdssdk.PackageID, path string) *SharedLoadNode { + node := &SharedLoadNode{ + StorageID: stgID, + UserID: userID, + PackageID: packageID, + Path: path, + } + b.AddNode(node) + return node +} + +func (t *SharedLoadNode) SetInput(input *dag.Var) { + t.InputStreams().EnsureSize(1) + input.StreamTo(t, 0) + t.OutputValues().SetupNew(t, t.Graph().NewVar()) +} + +func (t *SharedLoadNode) Input() dag.Slot { + return dag.Slot{ + Var: t.InputStreams().Get(0), + Index: 0, + } +} + +func (t *SharedLoadNode) FullPathVar() *dag.Var { + return t.OutputValues().Get(0) +} + +func (t *SharedLoadNode) GenerateOp() (exec.Op, error) { + return &ShardWrite{ + Input: t.InputStreams().Get(0).VarID, + FileHash: t.OutputValues().Get(0).VarID, + StorageID: t.StorageID, + }, nil +}