From a5f34619aeef7489e1a69098d6893b7081b514d6 Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Tue, 3 Dec 2024 10:01:19 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E5=88=86=E7=89=87=E4=B8=8A?= =?UTF-8?q?=E4=BC=A0Op?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- common/pkgs/ioswitch2/ops2/bypass.go | 111 ++++++++ common/pkgs/ioswitch2/ops2/clone.go | 4 +- common/pkgs/ioswitch2/ops2/driver.go | 12 +- common/pkgs/ioswitch2/ops2/ec.go | 2 +- common/pkgs/ioswitch2/ops2/file.go | 12 +- common/pkgs/ioswitch2/ops2/multipart.go | 285 ++++++++++++------- common/pkgs/ioswitch2/ops2/ops.go | 4 +- common/pkgs/ioswitch2/ops2/shard_store.go | 12 +- common/pkgs/ioswitch2/ops2/shared_store.go | 6 +- common/pkgs/ioswitch2/parser/parser.go | 26 +- common/pkgs/ioswitchlrc/ops2/clone.go | 4 +- common/pkgs/ioswitchlrc/ops2/ec.go | 2 +- common/pkgs/ioswitchlrc/ops2/ops.go | 4 +- common/pkgs/ioswitchlrc/ops2/shard_store.go | 12 +- common/pkgs/ioswitchlrc/parser/generator.go | 12 +- common/pkgs/ioswitchlrc/parser/passes.go | 12 +- common/pkgs/storage/cos/multiPartUploader.go | 21 +- common/pkgs/storage/obs/multiPartUploader.go | 13 +- common/pkgs/storage/oss/multiPartUploader.go | 17 +- common/pkgs/storage/svcmgr/mgr.go | 12 + common/pkgs/storage/types/bypass.go | 17 +- common/pkgs/storage/types/s3_client.go | 25 +- common/pkgs/storage/types/temp_store.go | 4 +- common/pkgs/storage/utils/utils.go | 13 + 24 files changed, 440 insertions(+), 202 deletions(-) create mode 100644 common/pkgs/ioswitch2/ops2/bypass.go diff --git a/common/pkgs/ioswitch2/ops2/bypass.go b/common/pkgs/ioswitch2/ops2/bypass.go new file mode 100644 index 0000000..473079e --- /dev/null +++ b/common/pkgs/ioswitch2/ops2/bypass.go @@ -0,0 +1,111 @@ +package ops2 + +import ( + "fmt" + + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "gitlink.org.cn/cloudream/storage/common/pkgs/storage/svcmgr" + "gitlink.org.cn/cloudream/storage/common/pkgs/storage/types" +) + +func init() { + exec.UseOp[*BypassToShardStore]() + exec.UseVarValue[*BypassFileInfoValue]() +} + +type BypassFileInfoValue struct { + types.BypassFileInfo +} + +func (v *BypassFileInfoValue) Clone() exec.VarValue { + return &BypassFileInfoValue{ + BypassFileInfo: v.BypassFileInfo, + } +} + +type BypassHandleResultValue struct { + Commited bool +} + +func (r *BypassHandleResultValue) Clone() exec.VarValue { + return &BypassHandleResultValue{ + Commited: r.Commited, + } +} + +type BypassToShardStore struct { + StorageID cdssdk.StorageID + BypassFileInfo exec.VarID + BypassCallback exec.VarID +} + +func (o *BypassToShardStore) Execute(ctx *exec.ExecContext, e *exec.Executor) error { + svcMgr, err := exec.GetValueByType[*svcmgr.Manager](ctx) + if err != nil { + return err + } + + shardStore, err := svcMgr.GetShardStore(o.StorageID) + if err != nil { + return err + } + + notifier, ok := shardStore.(types.BypassNotifier) + if !ok { + return fmt.Errorf("shard store %v not support bypass", o.StorageID) + } + + fileInfo, err := exec.BindVar[*BypassFileInfoValue](e, ctx.Context, o.BypassFileInfo) + if err != nil { + return err + } + + err = notifier.BypassUploaded(fileInfo.BypassFileInfo) + if err != nil { + return err + } + + e.PutVar(o.BypassCallback, &BypassHandleResultValue{Commited: true}) + return nil +} + +func (o *BypassToShardStore) String() string { + return fmt.Sprintf("BypassToShardStore[StorageID:%v] Info: %v, Callback: %v", o.StorageID, o.BypassFileInfo, o.BypassCallback) +} + +type BypassToShardStoreNode struct { + dag.NodeBase + StorageID cdssdk.StorageID +} + +func (b *GraphNodeBuilder) NewBypassToShardStore(storageID cdssdk.StorageID) *BypassToShardStoreNode { + node := &BypassToShardStoreNode{ + StorageID: storageID, + } + b.AddNode(node) + + node.InputValues().Init(1) + node.OutputValues().Init(node, 1) + return node +} + +func (n *BypassToShardStoreNode) BypassFileInfoSlot() dag.ValueInputSlot { + return dag.ValueInputSlot{ + Node: n, + Index: 0, + } +} + +func (n *BypassToShardStoreNode) BypassCallbackVar() *dag.ValueVar { + return n.OutputValues().Get(0) +} + +func (t *BypassToShardStoreNode) GenerateOp() (exec.Op, error) { + return &BypassToShardStore{ + StorageID: t.StorageID, + BypassFileInfo: t.BypassFileInfoSlot().Var().VarID, + BypassCallback: t.BypassCallbackVar().VarID, + }, nil +} diff --git a/common/pkgs/ioswitch2/ops2/clone.go b/common/pkgs/ioswitch2/ops2/clone.go index 4bf1cdc..328249e 100644 --- a/common/pkgs/ioswitch2/ops2/clone.go +++ b/common/pkgs/ioswitch2/ops2/clone.go @@ -89,7 +89,7 @@ func (t *CloneStreamType) SetInput(raw *dag.StreamVar) { } func (t *CloneStreamType) NewOutput() *dag.StreamVar { - return t.OutputStreams().AppendNew(t).Var + return t.OutputStreams().AppendNew(t).Var() } func (t *CloneStreamType) GenerateOp() (exec.Op, error) { @@ -119,7 +119,7 @@ func (t *CloneVarType) SetInput(raw *dag.ValueVar) { } func (t *CloneVarType) NewOutput() *dag.ValueVar { - return t.OutputValues().AppendNew(t).Var + return t.OutputValues().AppendNew(t).Var() } func (t *CloneVarType) GenerateOp() (exec.Op, error) { diff --git a/common/pkgs/ioswitch2/ops2/driver.go b/common/pkgs/ioswitch2/ops2/driver.go index a3e539c..e38cb91 100644 --- a/common/pkgs/ioswitch2/ops2/driver.go +++ b/common/pkgs/ioswitch2/ops2/driver.go @@ -29,9 +29,9 @@ func (f *FromDriverNode) GetFrom() ioswitch2.From { return f.From } -func (t *FromDriverNode) Output() dag.StreamSlot { - return dag.StreamSlot{ - Var: t.OutputStreams().Get(0), +func (t *FromDriverNode) Output() dag.StreamOutputSlot { + return dag.StreamOutputSlot{ + Node: t, Index: 0, } } @@ -67,9 +67,9 @@ func (t *ToDriverNode) SetInput(v *dag.StreamVar) { v.To(t, 0) } -func (t *ToDriverNode) Input() dag.StreamSlot { - return dag.StreamSlot{ - Var: t.InputStreams().Get(0), +func (t *ToDriverNode) Input() dag.StreamOutputSlot { + return dag.StreamOutputSlot{ + Node: t, Index: 0, } } diff --git a/common/pkgs/ioswitch2/ops2/ec.go b/common/pkgs/ioswitch2/ops2/ec.go index e87666d..2e81063 100644 --- a/common/pkgs/ioswitch2/ops2/ec.go +++ b/common/pkgs/ioswitch2/ops2/ec.go @@ -142,7 +142,7 @@ func (t *ECMultiplyNode) RemoveAllInputs() { func (t *ECMultiplyNode) NewOutput(dataIndex int) *dag.StreamVar { t.OutputIndexes = append(t.OutputIndexes, dataIndex) - return t.OutputStreams().AppendNew(t).Var + return t.OutputStreams().AppendNew(t).Var() } func (t *ECMultiplyNode) GenerateOp() (exec.Op, error) { diff --git a/common/pkgs/ioswitch2/ops2/file.go b/common/pkgs/ioswitch2/ops2/file.go index f490dc6..edd1fff 100644 --- a/common/pkgs/ioswitch2/ops2/file.go +++ b/common/pkgs/ioswitch2/ops2/file.go @@ -93,9 +93,9 @@ func (b *GraphNodeBuilder) NewFileRead(filePath string) *FileReadNode { return node } -func (t *FileReadNode) Output() dag.StreamSlot { - return dag.StreamSlot{ - Var: t.OutputStreams().Get(0), +func (t *FileReadNode) Output() dag.StreamOutputSlot { + return dag.StreamOutputSlot{ + Node: t, Index: 0, } } @@ -126,9 +126,9 @@ func (b *GraphNodeBuilder) NewFileWrite(filePath string) *FileWriteNode { return node } -func (t *FileWriteNode) Input() dag.StreamSlot { - return dag.StreamSlot{ - Var: t.InputStreams().Get(0), +func (t *FileWriteNode) Input() dag.StreamOutputSlot { + return dag.StreamOutputSlot{ + Node: t, Index: 0, } } diff --git a/common/pkgs/ioswitch2/ops2/multipart.go b/common/pkgs/ioswitch2/ops2/multipart.go index c98d2cb..69131f2 100644 --- a/common/pkgs/ioswitch2/ops2/multipart.go +++ b/common/pkgs/ioswitch2/ops2/multipart.go @@ -1,190 +1,265 @@ package ops2 -/* import ( "fmt" + "time" + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" log "gitlink.org.cn/cloudream/common/pkgs/logger" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - "gitlink.org.cn/cloudream/storage/common/pkgs/storage/cos" - "gitlink.org.cn/cloudream/storage/common/pkgs/storage/mgr" - "gitlink.org.cn/cloudream/storage/common/pkgs/storage/obs" - "gitlink.org.cn/cloudream/storage/common/pkgs/storage/oss" + stgmod "gitlink.org.cn/cloudream/storage/common/models" + "gitlink.org.cn/cloudream/storage/common/pkgs/storage/factory" + "gitlink.org.cn/cloudream/storage/common/pkgs/storage/svcmgr" "gitlink.org.cn/cloudream/storage/common/pkgs/storage/types" - "io" - "time" ) func init() { - exec.UseOp[*MultipartManage]() + exec.UseOp[*MultipartInitiator]() exec.UseOp[*MultipartUpload]() - exec.UseVarValue[*InitUploadValue]() + exec.UseVarValue[*MultipartUploadArgsValue]() + exec.UseVarValue[*UploadedPartInfoValue]() } -type InitUploadValue struct { - Key string `xml:"Key"` // Object name to upload - UploadID string `xml:"UploadId"` // Generated UploadId +type MultipartUploadArgsValue struct { + Key string + InitState types.MultipartInitState } -func (v *InitUploadValue) Clone() exec.VarValue { - return &*v +func (v *MultipartUploadArgsValue) Clone() exec.VarValue { + return &MultipartUploadArgsValue{ + InitState: v.InitState, + } } -type MultipartManage struct { - Address cdssdk.StorageAddress `json:"address"` - UploadArgs exec.VarID `json:"uploadArgs"` - UploadOutput exec.VarID `json:"uploadOutput"` - StorageID cdssdk.StorageID `json:"storageID"` +type UploadedPartInfoValue struct { + types.UploadedPartInfo } -func (o *MultipartManage) Execute(ctx *exec.ExecContext, e *exec.Executor) error { - manager, err := exec.GetValueByType[*mgr.Manager](ctx) - if err != nil { - return err +func (v *UploadedPartInfoValue) Clone() exec.VarValue { + return &UploadedPartInfoValue{ + UploadedPartInfo: v.UploadedPartInfo, } +} - var client types.MultipartUploader - switch addr := o.Address.(type) { - case *cdssdk.OSSAddress: - client = oss.NewMultiPartUpload(addr) - case *cdssdk.OBSAddress: - client = obs.NewMultiPartUpload(addr) - case *cdssdk.COSAddress: - client = cos.NewMultiPartUpload(addr) +type MultipartInitiator struct { + StorageID cdssdk.StorageID + UploadArgs exec.VarID + UploadedParts []exec.VarID + BypassFileOutput exec.VarID // 分片上传之后的临时文件的路径 + BypassCallback exec.VarID // 临时文件使用结果,用于告知Initiator如何处理临时文件 +} + +func (o *MultipartInitiator) Execute(ctx *exec.ExecContext, e *exec.Executor) error { + stgMgr, err := exec.GetValueByType[*svcmgr.Manager](ctx) + if err != nil { + return err } - defer client.Close() - tempStore, err := manager.GetTempStore(o.StorageID) + tempStore, err := svcmgr.GetComponent[types.TempStore](stgMgr, o.StorageID) if err != nil { return err } objName := tempStore.CreateTemp() + defer tempStore.Drop(objName) - uploadID, err := client.InitiateMultipartUpload(objName) + initiator, err := svcmgr.GetComponent[types.MultipartInitiator](stgMgr, o.StorageID) if err != nil { return err } - e.PutVar(o.UploadArgs, &InitUploadValue{ - UploadID: uploadID, - Key: objName, - }) - parts, err := exec.BindVar[*UploadPartOutputValue](e, ctx.Context, o.UploadOutput) + // 启动一个新的上传任务 + initState, err := initiator.Initiate(ctx.Context, objName) if err != nil { return err } - err = client.CompleteMultipartUpload(uploadID, objName, parts.Parts) + // 分发上传参数 + e.PutVar(o.UploadArgs, &MultipartUploadArgsValue{ + Key: objName, + InitState: initState, + }) + + // 收集分片上传结果 + partInfoValues, err := exec.BindArray[*UploadedPartInfoValue](e, ctx.Context, o.UploadedParts) if err != nil { - return err + return fmt.Errorf("getting uploaded parts: %v", err) } - return nil -} + partInfos := make([]types.UploadedPartInfo, len(partInfoValues)) + for i, v := range partInfoValues { + partInfos[i] = v.UploadedPartInfo + } -func (o *MultipartManage) String() string { - return "MultipartManage" -} + // 完成分片上传 + compInfo, err := initiator.Complete(ctx.Context, partInfos) + if err != nil { + return fmt.Errorf("completing multipart upload: %v", err) + } -type MultipartManageNode struct { - dag.NodeBase - Address cdssdk.StorageAddress - StorageID cdssdk.StorageID `json:"storageID"` -} + // 告知后续Op临时文件的路径 + e.PutVar(o.BypassFileOutput, &BypassFileInfoValue{ + BypassFileInfo: types.BypassFileInfo{ + TempFilePath: objName, + FileHash: compInfo.FileHash, + }, + }) -func (b *GraphNodeBuilder) NewMultipartManage(addr cdssdk.StorageAddress, storageID cdssdk.StorageID) *MultipartManageNode { - node := &MultipartManageNode{ - Address: addr, - StorageID: storageID, + // 等待后续Op处理临时文件 + cb, err := exec.BindVar[*BypassHandleResultValue](e, ctx.Context, o.BypassCallback) + if err != nil { + return fmt.Errorf("getting temp file callback: %v", err) } - b.AddNode(node) - return node -} -func (t *MultipartManageNode) GenerateOp() (exec.Op, error) { - return &MultipartManage{ - Address: t.Address, - StorageID: t.StorageID, - }, nil -} + if cb.Commited { + tempStore.Commited(objName) + } -type MultipartUpload struct { - Address cdssdk.StorageAddress `json:"address"` - UploadArgs exec.VarID `json:"uploadArgs"` - UploadOutput exec.VarID `json:"uploadOutput"` - PartNumbers []int `json:"partNumbers"` - PartSize []int64 `json:"partSize"` - Input exec.VarID `json:"input"` + return nil } -type UploadPartOutputValue struct { - Parts []*types.UploadPartOutput `json:"parts"` +func (o *MultipartInitiator) String() string { + return "MultipartInitiator" } -func (v *UploadPartOutputValue) Clone() exec.VarValue { - return &*v +type MultipartUpload struct { + Storage stgmod.StorageDetail + UploadArgs exec.VarID + UploadResult exec.VarID + PartStream exec.VarID + PartNumber int + PartSize int64 } func (o *MultipartUpload) Execute(ctx *exec.ExecContext, e *exec.Executor) error { - initUploadResult, err := exec.BindVar[*InitUploadValue](e, ctx.Context, o.UploadArgs) + uploadArgs, err := exec.BindVar[*MultipartUploadArgsValue](e, ctx.Context, o.UploadArgs) if err == nil { return err } - input, err := exec.BindVar[*exec.StreamValue](e, ctx.Context, o.Input) + partStr, err := exec.BindVar[*exec.StreamValue](e, ctx.Context, o.PartStream) if err != nil { return err } - defer input.Stream.Close() + defer partStr.Stream.Close() - var client types.MultipartUploader - switch addr := o.Address.(type) { - case *cdssdk.OSSAddress: - client = oss.NewMultiPartUpload(addr) + uploader, err := factory.CreateComponent[types.MultipartUploader](o.Storage) + if err != nil { + return err } - var parts UploadPartOutputValue - for i := 0; i < len(o.PartNumbers); i++ { - startTime := time.Now() - uploadPart, err := client.UploadPart(initUploadResult.UploadID, initUploadResult.Key, o.PartSize[i], o.PartNumbers[i], io.LimitReader(input.Stream, o.PartSize[i])) - log.Debugf("upload multipart spend time: %v", time.Since(startTime)) - if err != nil { - return fmt.Errorf("failed to upload part: %w", err) - } - parts.Parts = append(parts.Parts, uploadPart) + startTime := time.Now() + uploadedInfo, err := uploader.UploadPart(ctx.Context, uploadArgs.InitState, uploadArgs.Key, o.PartSize, o.PartNumber, partStr.Stream) + log.Debugf("upload finished in %v", time.Since(startTime)) + + if err != nil { + return err } - e.PutVar(o.UploadOutput, &parts) + e.PutVar(o.UploadResult, &UploadedPartInfoValue{ + uploadedInfo, + }) return nil } func (o *MultipartUpload) String() string { - return "MultipartUpload" + return fmt.Sprintf("MultipartUpload[PartNumber=%v,PartSize=%v] Args: %v, Result: %v, Stream: %v", o.PartNumber, o.PartSize, o.UploadArgs, o.UploadResult, o.PartStream) +} + +type MultipartInitiatorNode struct { + dag.NodeBase + StorageID cdssdk.StorageID `json:"storageID"` +} + +func (b *GraphNodeBuilder) NewMultipartInitiator(storageID cdssdk.StorageID) *MultipartInitiatorNode { + node := &MultipartInitiatorNode{ + StorageID: storageID, + } + b.AddNode(node) + + node.OutputValues().Init(node, 2) + node.InputValues().Init(1) + return node +} + +func (n *MultipartInitiatorNode) UploadArgsVar() *dag.ValueVar { + return n.OutputValues().Get(0) +} + +func (n *MultipartInitiatorNode) BypassFileInfoVar() *dag.ValueVar { + return n.OutputValues().Get(1) +} + +func (n *MultipartInitiatorNode) BypassCallbackSlot() dag.ValueInputSlot { + return dag.ValueInputSlot{ + Node: n, + Index: 0, + } +} + +func (n *MultipartInitiatorNode) AppendPartInfoSlot() dag.ValueInputSlot { + return dag.ValueInputSlot{ + Node: n, + Index: n.InputStreams().EnlargeOne(), + } +} + +func (n *MultipartInitiatorNode) GenerateOp() (exec.Op, error) { + return &MultipartInitiator{ + StorageID: n.StorageID, + UploadArgs: n.UploadArgsVar().VarID, + UploadedParts: n.InputValues().GetVarIDsStart(1), + BypassFileOutput: n.BypassFileInfoVar().VarID, + BypassCallback: n.BypassCallbackSlot().Var().VarID, + }, nil } type MultipartUploadNode struct { dag.NodeBase - Address cdssdk.StorageAddress - PartNumbers []int `json:"partNumbers"` - PartSize []int64 `json:"partSize"` + Storage stgmod.StorageDetail + PartNumber int + PartSize int64 } -func (b *GraphNodeBuilder) NewMultipartUpload(addr cdssdk.StorageAddress, partNumbers []int, partSize []int64) *MultipartUploadNode { +func (b *GraphNodeBuilder) NewMultipartUpload(stg stgmod.StorageDetail, partNumber int, partSize int64) *MultipartUploadNode { node := &MultipartUploadNode{ - Address: addr, - PartNumbers: partNumbers, - PartSize: partSize, + Storage: stg, + PartNumber: partNumber, + PartSize: partSize, } b.AddNode(node) + + node.InputValues().Init(1) + node.OutputValues().Init(node, 1) + node.InputStreams().Init(1) return node } -func (t MultipartUploadNode) GenerateOp() (exec.Op, error) { +func (n *MultipartUploadNode) UploadArgsSlot() dag.ValueInputSlot { + return dag.ValueInputSlot{ + Node: n, + Index: 0, + } +} + +func (n *MultipartUploadNode) UploadResultVar() *dag.ValueVar { + return n.OutputValues().Get(0) +} + +func (n *MultipartUploadNode) PartStreamSlot() dag.ValueInputSlot { + return dag.ValueInputSlot{ + Node: n, + Index: 0, + } +} + +func (n *MultipartUploadNode) GenerateOp() (exec.Op, error) { return &MultipartUpload{ - Address: t.Address, - PartNumbers: t.PartNumbers, - PartSize: t.PartSize, + Storage: n.Storage, + UploadArgs: n.UploadArgsSlot().Var().VarID, + UploadResult: n.UploadResultVar().VarID, + PartStream: n.PartStreamSlot().Var().VarID, + PartNumber: n.PartNumber, + PartSize: n.PartSize, }, nil } -*/ diff --git a/common/pkgs/ioswitch2/ops2/ops.go b/common/pkgs/ioswitch2/ops2/ops.go index 701bad9..9138841 100644 --- a/common/pkgs/ioswitch2/ops2/ops.go +++ b/common/pkgs/ioswitch2/ops2/ops.go @@ -17,13 +17,13 @@ func NewGraphNodeBuilder() *GraphNodeBuilder { type FromNode interface { dag.Node GetFrom() ioswitch2.From - Output() dag.StreamSlot + Output() dag.StreamOutputSlot } type ToNode interface { dag.Node GetTo() ioswitch2.To - Input() dag.StreamSlot + Input() dag.StreamOutputSlot SetInput(input *dag.StreamVar) } diff --git a/common/pkgs/ioswitch2/ops2/shard_store.go b/common/pkgs/ioswitch2/ops2/shard_store.go index 3a7906c..800e1ab 100644 --- a/common/pkgs/ioswitch2/ops2/shard_store.go +++ b/common/pkgs/ioswitch2/ops2/shard_store.go @@ -137,9 +137,9 @@ func (t *ShardReadNode) GetFrom() ioswitch2.From { return t.From } -func (t *ShardReadNode) Output() dag.StreamSlot { - return dag.StreamSlot{ - Var: t.OutputStreams().Get(0), +func (t *ShardReadNode) Output() dag.StreamOutputSlot { + return dag.StreamOutputSlot{ + Node: t, Index: 0, } } @@ -184,9 +184,9 @@ func (t *ShardWriteNode) SetInput(input *dag.StreamVar) { input.To(t, 0) } -func (t *ShardWriteNode) Input() dag.StreamSlot { - return dag.StreamSlot{ - Var: t.InputStreams().Get(0), +func (t *ShardWriteNode) Input() dag.StreamOutputSlot { + return dag.StreamOutputSlot{ + Node: t, Index: 0, } } diff --git a/common/pkgs/ioswitch2/ops2/shared_store.go b/common/pkgs/ioswitch2/ops2/shared_store.go index 2c6b629..18f06e8 100644 --- a/common/pkgs/ioswitch2/ops2/shared_store.go +++ b/common/pkgs/ioswitch2/ops2/shared_store.go @@ -95,9 +95,9 @@ func (t *SharedLoadNode) SetInput(input *dag.StreamVar) { input.To(t, 0) } -func (t *SharedLoadNode) Input() dag.StreamSlot { - return dag.StreamSlot{ - Var: t.InputStreams().Get(0), +func (t *SharedLoadNode) Input() dag.StreamOutputSlot { + return dag.StreamOutputSlot{ + Node: t, Index: 0, } } diff --git a/common/pkgs/ioswitch2/parser/parser.go b/common/pkgs/ioswitch2/parser/parser.go index e618e7b..94d7cc2 100644 --- a/common/pkgs/ioswitch2/parser/parser.go +++ b/common/pkgs/ioswitch2/parser/parser.go @@ -221,7 +221,7 @@ func extend(ctx *ParseContext) error { } ctx.IndexedStreams = append(ctx.IndexedStreams, IndexedStream{ - Stream: frNode.Output().Var, + Stream: frNode.Output().Var(), StreamIndex: fr.GetStreamIndex(), }) @@ -230,7 +230,7 @@ func extend(ctx *ParseContext) error { // 只有输入输出需要EC编码的块时,才生成相关指令 if ctx.UseEC { splitNode := ctx.DAG.NewChunkedSplit(ctx.Ft.ECParam.ChunkSize, ctx.Ft.ECParam.K) - splitNode.Split(frNode.Output().Var) + splitNode.Split(frNode.Output().Var()) for i := 0; i < ctx.Ft.ECParam.K; i++ { ctx.IndexedStreams = append(ctx.IndexedStreams, IndexedStream{ Stream: splitNode.SubStream(i), @@ -242,7 +242,7 @@ func extend(ctx *ParseContext) error { // 同上 if ctx.UseSegment { splitNode := ctx.DAG.NewSegmentSplit(ctx.Ft.SegmentParam.Segments) - splitNode.SetInput(frNode.Output().Var) + splitNode.SetInput(frNode.Output().Var()) for i := 0; i < len(ctx.Ft.SegmentParam.Segments); i++ { ctx.IndexedStreams = append(ctx.IndexedStreams, IndexedStream{ Stream: splitNode.Segment(i), @@ -893,12 +893,12 @@ func generateRange(ctx *ParseContext) { if toStrIdx.IsRaw() { n := ctx.DAG.NewRange() toInput := toNode.Input() - *n.Env() = *toInput.Var.Src.Env() - rnged := n.RangeStream(toInput.Var, exec.Range{ + *n.Env() = *toInput.Var().Src.Env() + rnged := n.RangeStream(toInput.Var(), exec.Range{ Offset: toRng.Offset - ctx.StreamRange.Offset, Length: toRng.Length, }) - toInput.Var.NotTo(toNode) + toInput.Var().NotTo(toNode) toNode.SetInput(rnged) } else if toStrIdx.IsEC() { @@ -909,15 +909,15 @@ func generateRange(ctx *ParseContext) { n := ctx.DAG.NewRange() toInput := toNode.Input() - *n.Env() = *toInput.Var.Src.Env() - rnged := n.RangeStream(toInput.Var, exec.Range{ + *n.Env() = *toInput.Var().Src.Env() + rnged := n.RangeStream(toInput.Var(), exec.Range{ Offset: toRng.Offset - blkStart, Length: toRng.Length, }) - toInput.Var.NotTo(toNode) + toInput.Var().NotTo(toNode) toNode.SetInput(rnged) } else if toStrIdx.IsSegment() { - // if frNode, ok := toNode.Input().Var.From().Node.(ops2.FromNode); ok { + // if frNode, ok := toNode.Input().Var().From().Node.(ops2.FromNode); ok { // // 目前只有To也是分段时,才可能对接一个提供分段的From,此时不需要再生成Range指令 // if frNode.GetFrom().GetStreamIndex().IsSegment() { // continue @@ -929,12 +929,12 @@ func generateRange(ctx *ParseContext) { // n := ctx.DAG.NewRange() // toInput := toNode.Input() - // *n.Env() = *toInput.Var.From().Node.Env() - // rnged := n.RangeStream(toInput.Var, exec.Range{ + // *n.Env() = *toInput.Var().From().Node.Env() + // rnged := n.RangeStream(toInput.Var(), exec.Range{ // Offset: strStart - ctx.StreamRange.Offset, // Length: toRng.Length, // }) - // toInput.Var.NotTo(toNode, toInput.Index) + // toInput.Var().NotTo(toNode, toInput.Index) // toNode.SetInput(rnged) } } diff --git a/common/pkgs/ioswitchlrc/ops2/clone.go b/common/pkgs/ioswitchlrc/ops2/clone.go index 5e730be..88f3f4b 100644 --- a/common/pkgs/ioswitchlrc/ops2/clone.go +++ b/common/pkgs/ioswitchlrc/ops2/clone.go @@ -89,7 +89,7 @@ func (t *CloneStreamType) SetInput(raw *dag.StreamVar) { } func (t *CloneStreamType) NewOutput() *dag.StreamVar { - return t.OutputStreams().AppendNew(t).Var + return t.OutputStreams().AppendNew(t).Var() } func (t *CloneStreamType) GenerateOp() (exec.Op, error) { @@ -119,7 +119,7 @@ func (t *CloneVarType) SetInput(raw *dag.ValueVar) { } func (t *CloneVarType) NewOutput() *dag.ValueVar { - return t.OutputValues().AppendNew(t).Var + return t.OutputValues().AppendNew(t).Var() } func (t *CloneVarType) GenerateOp() (exec.Op, error) { diff --git a/common/pkgs/ioswitchlrc/ops2/ec.go b/common/pkgs/ioswitchlrc/ops2/ec.go index 4e180f8..b12b71c 100644 --- a/common/pkgs/ioswitchlrc/ops2/ec.go +++ b/common/pkgs/ioswitchlrc/ops2/ec.go @@ -141,7 +141,7 @@ func (t *LRCConstructAnyNode) RemoveAllInputs() { func (t *LRCConstructAnyNode) NewOutput(dataIndex int) *dag.StreamVar { t.OutputIndexes = append(t.OutputIndexes, dataIndex) - return t.OutputStreams().AppendNew(t).Var + return t.OutputStreams().AppendNew(t).Var() } func (t *LRCConstructAnyNode) GenerateOp() (exec.Op, error) { diff --git a/common/pkgs/ioswitchlrc/ops2/ops.go b/common/pkgs/ioswitchlrc/ops2/ops.go index a41ec08..412cb1d 100644 --- a/common/pkgs/ioswitchlrc/ops2/ops.go +++ b/common/pkgs/ioswitchlrc/ops2/ops.go @@ -15,12 +15,12 @@ func NewGraphNodeBuilder() *GraphNodeBuilder { type FromNode interface { dag.Node - Output() dag.StreamSlot + Output() dag.StreamOutputSlot } type ToNode interface { dag.Node - Input() dag.StreamSlot + Input() dag.StreamOutputSlot SetInput(input *dag.StreamVar) } diff --git a/common/pkgs/ioswitchlrc/ops2/shard_store.go b/common/pkgs/ioswitchlrc/ops2/shard_store.go index 1f8033e..d6348f5 100644 --- a/common/pkgs/ioswitchlrc/ops2/shard_store.go +++ b/common/pkgs/ioswitchlrc/ops2/shard_store.go @@ -137,9 +137,9 @@ func (t *ShardReadNode) GetFrom() ioswitchlrc.From { return t.From } -func (t *ShardReadNode) Output() dag.StreamSlot { - return dag.StreamSlot{ - Var: t.OutputStreams().Get(0), +func (t *ShardReadNode) Output() dag.StreamOutputSlot { + return dag.StreamOutputSlot{ + Node: t, Index: 0, } } @@ -184,9 +184,9 @@ func (t *ShardWriteNode) SetInput(input *dag.StreamVar) { input.To(t, 0) } -func (t *ShardWriteNode) Input() dag.StreamSlot { - return dag.StreamSlot{ - Var: t.InputStreams().Get(0), +func (t *ShardWriteNode) Input() dag.StreamOutputSlot { + return dag.StreamOutputSlot{ + Node: t, Index: 0, } } diff --git a/common/pkgs/ioswitchlrc/parser/generator.go b/common/pkgs/ioswitchlrc/parser/generator.go index 27e5425..8cd720e 100644 --- a/common/pkgs/ioswitchlrc/parser/generator.go +++ b/common/pkgs/ioswitchlrc/parser/generator.go @@ -70,7 +70,7 @@ func buildDAGEncode(ctx *GenerateContext, fr ioswitchlrc.From, toes []ioswitchlr } ctx.ToNodes[to] = toNode - toNode.SetInput(frNode.Output().Var) + toNode.SetInput(frNode.Output().Var()) } else if idx < ctx.LRC.K { dataToes = append(dataToes, to) } else { @@ -84,7 +84,7 @@ func buildDAGEncode(ctx *GenerateContext, fr ioswitchlrc.From, toes []ioswitchlr // 需要文件块,则生成Split指令 splitNode := ctx.DAG.NewChunkedSplit(ctx.LRC.ChunkSize, ctx.LRC.K) - splitNode.Split(frNode.Output().Var) + splitNode.Split(frNode.Output().Var()) for _, to := range dataToes { toNode, err := buildToNode(ctx, to) @@ -173,7 +173,7 @@ func buildDAGReconstructAny(ctx *GenerateContext, frs []ioswitchlrc.From, toes [ } ctx.ToNodes[to] = toNode - toNode.SetInput(fr.Output().Var) + toNode.SetInput(fr.Output().Var()) continue } @@ -192,7 +192,7 @@ func buildDAGReconstructAny(ctx *GenerateContext, frs []ioswitchlrc.From, toes [ conNode := ctx.DAG.NewLRCConstructAny(ctx.LRC) for i, fr := range frNodes { - conNode.AddInput(fr.Output().Var, i) + conNode.AddInput(fr.Output().Var(), i) } for _, to := range missedToes { @@ -218,7 +218,7 @@ func buildDAGReconstructAny(ctx *GenerateContext, frs []ioswitchlrc.From, toes [ if fr == nil { joinNode.AddInput(conNode.NewOutput(i)) } else { - joinNode.AddInput(fr.Output().Var) + joinNode.AddInput(fr.Output().Var()) } } @@ -278,7 +278,7 @@ func buildDAGReconstructGroup(ctx *GenerateContext, frs []ioswitchlrc.From, toes return fmt.Errorf("building from node: %w", err) } - inputs = append(inputs, frNode.Output().Var) + inputs = append(inputs, frNode.Output().Var()) } missedGrpIdx := toes[0].GetDataIndex() diff --git a/common/pkgs/ioswitchlrc/parser/passes.go b/common/pkgs/ioswitchlrc/parser/passes.go index 08c6153..d028b22 100644 --- a/common/pkgs/ioswitchlrc/parser/passes.go +++ b/common/pkgs/ioswitchlrc/parser/passes.go @@ -233,12 +233,12 @@ func generateRange(ctx *GenerateContext) { if toDataIdx == -1 { n := ctx.DAG.NewRange() toInput := toNode.Input() - *n.Env() = *toInput.Var.Src.Env() - rnged := n.RangeStream(toInput.Var, exec.Range{ + *n.Env() = *toInput.Var().Src.Env() + rnged := n.RangeStream(toInput.Var(), exec.Range{ Offset: toRng.Offset - ctx.StreamRange.Offset, Length: toRng.Length, }) - toInput.Var.NotTo(toNode) + toInput.Var().NotTo(toNode) toNode.SetInput(rnged) } else { @@ -249,12 +249,12 @@ func generateRange(ctx *GenerateContext) { n := ctx.DAG.NewRange() toInput := toNode.Input() - *n.Env() = *toInput.Var.Src.Env() - rnged := n.RangeStream(toInput.Var, exec.Range{ + *n.Env() = *toInput.Var().Src.Env() + rnged := n.RangeStream(toInput.Var(), exec.Range{ Offset: toRng.Offset - blkStart, Length: toRng.Length, }) - toInput.Var.NotTo(toNode) + toInput.Var().NotTo(toNode) toNode.SetInput(rnged) } } diff --git a/common/pkgs/storage/cos/multiPartUploader.go b/common/pkgs/storage/cos/multiPartUploader.go index 3155bcb..696de55 100644 --- a/common/pkgs/storage/cos/multiPartUploader.go +++ b/common/pkgs/storage/cos/multiPartUploader.go @@ -3,13 +3,13 @@ package cos import ( "context" "fmt" - "github.com/tencentyun/cos-go-sdk-v5" - log "gitlink.org.cn/cloudream/common/pkgs/logger" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - "gitlink.org.cn/cloudream/storage/common/pkgs/storage/types" "io" "net/http" "net/url" + + "github.com/tencentyun/cos-go-sdk-v5" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "gitlink.org.cn/cloudream/storage/common/pkgs/storage/types" ) type MultiPartUploader struct { @@ -32,16 +32,15 @@ func NewMultiPartUpload(address *cdssdk.COSAddress) *MultiPartUploader { } } -func (c *MultiPartUploader) InitiateMultipartUpload(objectName string) (string, error) { +func (c *MultiPartUploader) Initiate(objectName string) (string, error) { v, _, err := c.client.Object.InitiateMultipartUpload(context.Background(), objectName, nil) if err != nil { - log.Error("Failed to initiate multipart upload: %v", err) - return "", err + return "", fmt.Errorf("failed to initiate multipart upload: %w", err) } return v.UploadID, nil } -func (c *MultiPartUploader) UploadPart(uploadID string, key string, partSize int64, partNumber int, stream io.Reader) (*types.UploadPartOutput, error) { +func (c *MultiPartUploader) UploadPart(uploadID string, key string, partSize int64, partNumber int, stream io.Reader) (*types.UploadedPartInfo, error) { resp, err := c.client.Object.UploadPart( context.Background(), key, uploadID, partNumber, stream, nil, ) @@ -49,14 +48,14 @@ func (c *MultiPartUploader) UploadPart(uploadID string, key string, partSize int return nil, fmt.Errorf("failed to upload part: %w", err) } - result := &types.UploadPartOutput{ + result := &types.UploadedPartInfo{ ETag: resp.Header.Get("ETag"), PartNumber: partNumber, } return result, nil } -func (c *MultiPartUploader) CompleteMultipartUpload(uploadID string, key string, parts []*types.UploadPartOutput) error { +func (c *MultiPartUploader) Complete(uploadID string, key string, parts []*types.UploadedPartInfo) error { opt := &cos.CompleteMultipartUploadOptions{} for i := 0; i < len(parts); i++ { opt.Parts = append(opt.Parts, cos.Object{ @@ -72,7 +71,7 @@ func (c *MultiPartUploader) CompleteMultipartUpload(uploadID string, key string, return nil } -func (c *MultiPartUploader) AbortMultipartUpload() { +func (c *MultiPartUploader) Abort() { } diff --git a/common/pkgs/storage/obs/multiPartUploader.go b/common/pkgs/storage/obs/multiPartUploader.go index 7d4b48e..26a18d4 100644 --- a/common/pkgs/storage/obs/multiPartUploader.go +++ b/common/pkgs/storage/obs/multiPartUploader.go @@ -2,11 +2,12 @@ package obs import ( "fmt" + "io" + "github.com/huaweicloud/huaweicloud-sdk-go-obs/obs" log "gitlink.org.cn/cloudream/common/pkgs/logger" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/storage/common/pkgs/storage/types" - "io" ) type MultiPartUploader struct { @@ -26,7 +27,7 @@ func NewMultiPartUpload(address *cdssdk.OBSAddress) *MultiPartUploader { } } -func (c *MultiPartUploader) InitiateMultipartUpload(objectName string) (string, error) { +func (c *MultiPartUploader) Initiate(objectName string) (string, error) { input := &obs.InitiateMultipartUploadInput{} input.Bucket = c.bucket input.Key = objectName @@ -37,7 +38,7 @@ func (c *MultiPartUploader) InitiateMultipartUpload(objectName string) (string, return imur.UploadId, nil } -func (c *MultiPartUploader) UploadPart(uploadID string, key string, partSize int64, partNumber int, stream io.Reader) (*types.UploadPartOutput, error) { +func (c *MultiPartUploader) UploadPart(uploadID string, key string, partSize int64, partNumber int, stream io.Reader) (*types.UploadedPartInfo, error) { uploadParam := &obs.UploadPartInput{ Bucket: c.bucket, Key: key, @@ -51,14 +52,14 @@ func (c *MultiPartUploader) UploadPart(uploadID string, key string, partSize int if err != nil { return nil, fmt.Errorf("failed to upload part: %w", err) } - result := &types.UploadPartOutput{ + result := &types.UploadedPartInfo{ ETag: part.ETag, PartNumber: partNumber, } return result, nil } -func (c *MultiPartUploader) CompleteMultipartUpload(uploadID string, Key string, parts []*types.UploadPartOutput) error { +func (c *MultiPartUploader) Complete(uploadID string, Key string, parts []*types.UploadedPartInfo) error { var uploadPart []obs.Part for i := 0; i < len(parts); i++ { uploadPart = append(uploadPart, obs.Part{ @@ -80,7 +81,7 @@ func (c *MultiPartUploader) CompleteMultipartUpload(uploadID string, Key string, } return nil } -func (c *MultiPartUploader) AbortMultipartUpload() { +func (c *MultiPartUploader) Abort() { } diff --git a/common/pkgs/storage/oss/multiPartUploader.go b/common/pkgs/storage/oss/multiPartUploader.go index 201e66a..7ce5771 100644 --- a/common/pkgs/storage/oss/multiPartUploader.go +++ b/common/pkgs/storage/oss/multiPartUploader.go @@ -2,11 +2,12 @@ package oss import ( "fmt" + "io" + "log" + "github.com/aliyun/aliyun-oss-go-sdk/oss" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/storage/common/pkgs/storage/types" - "io" - "log" ) type MultiPartUploader struct { @@ -14,7 +15,7 @@ type MultiPartUploader struct { bucket *oss.Bucket } -func NewMultiPartUpload(address *cdssdk.OSSAddress) *MultiPartUploader { +func NewMultiPartUpload(address *cdssdk.OSSType) *MultiPartUploader { // 创建OSSClient实例。 client, err := oss.New(address.Endpoint, address.AK, address.SK) if err != nil { @@ -32,7 +33,7 @@ func NewMultiPartUpload(address *cdssdk.OSSAddress) *MultiPartUploader { } } -func (c *MultiPartUploader) InitiateMultipartUpload(objectName string) (string, error) { +func (c *MultiPartUploader) Initiate(objectName string) (string, error) { imur, err := c.bucket.InitiateMultipartUpload(objectName) if err != nil { return "", fmt.Errorf("failed to initiate multipart upload: %w", err) @@ -40,7 +41,7 @@ func (c *MultiPartUploader) InitiateMultipartUpload(objectName string) (string, return imur.UploadID, nil } -func (c *MultiPartUploader) UploadPart(uploadID string, key string, partSize int64, partNumber int, stream io.Reader) (*types.UploadPartOutput, error) { +func (c *MultiPartUploader) UploadPart(uploadID string, key string, partSize int64, partNumber int, stream io.Reader) (*types.UploadedPartInfo, error) { uploadParam := oss.InitiateMultipartUploadResult{ UploadID: uploadID, Key: key, @@ -50,14 +51,14 @@ func (c *MultiPartUploader) UploadPart(uploadID string, key string, partSize int if err != nil { return nil, fmt.Errorf("failed to upload part: %w", err) } - result := &types.UploadPartOutput{ + result := &types.UploadedPartInfo{ ETag: part.ETag, PartNumber: partNumber, } return result, nil } -func (c *MultiPartUploader) CompleteMultipartUpload(uploadID string, Key string, parts []*types.UploadPartOutput) error { +func (c *MultiPartUploader) Complete(uploadID string, Key string, parts []*types.UploadedPartInfo) error { notifyParam := oss.InitiateMultipartUploadResult{ UploadID: uploadID, Key: Key, @@ -77,7 +78,7 @@ func (c *MultiPartUploader) CompleteMultipartUpload(uploadID string, Key string, return nil } -func (c *MultiPartUploader) AbortMultipartUpload() { +func (c *MultiPartUploader) Abort() { } diff --git a/common/pkgs/storage/svcmgr/mgr.go b/common/pkgs/storage/svcmgr/mgr.go index c3cb161..83f627e 100644 --- a/common/pkgs/storage/svcmgr/mgr.go +++ b/common/pkgs/storage/svcmgr/mgr.go @@ -51,6 +51,18 @@ func (m *Manager) CreateService(detail stgmod.StorageDetail) error { return nil } +func (m *Manager) GetInfo(stgID cdssdk.StorageID) (stgmod.StorageDetail, error) { + m.lock.Lock() + defer m.lock.Unlock() + + stg := m.storages[stgID] + if stg == nil { + return stgmod.StorageDetail{}, types.ErrStorageNotFound + } + + return stg.Service.Info(), nil +} + // 查找指定Storage的ShardStore组件 func (m *Manager) GetShardStore(stgID cdssdk.StorageID) (types.ShardStore, error) { return GetComponent[types.ShardStore](m, stgID) diff --git a/common/pkgs/storage/types/bypass.go b/common/pkgs/storage/types/bypass.go index f56c2d3..e8795db 100644 --- a/common/pkgs/storage/types/bypass.go +++ b/common/pkgs/storage/types/bypass.go @@ -1,7 +1,18 @@ package types -import "io" +import ( + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" +) -type BypassWriter interface { - Write(stream io.Reader) (string, error) +// type BypassWriter interface { +// Write(stream io.Reader) (string, error) +// } + +type BypassFileInfo struct { + TempFilePath string + FileHash cdssdk.FileHash +} + +type BypassNotifier interface { + BypassUploaded(info BypassFileInfo) error } diff --git a/common/pkgs/storage/types/s3_client.go b/common/pkgs/storage/types/s3_client.go index 4090218..9f9c11b 100644 --- a/common/pkgs/storage/types/s3_client.go +++ b/common/pkgs/storage/types/s3_client.go @@ -1,18 +1,33 @@ package types import ( + "context" "io" + + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" ) +type MultipartInitiator interface { + MultipartUploader + Initiate(ctx context.Context, objectName string) (MultipartInitState, error) + Complete(ctx context.Context, parts []UploadedPartInfo) (CompletedFileInfo, error) + Abort() +} + type MultipartUploader interface { - InitiateMultipartUpload(objectName string) (string, error) - UploadPart(uploadID string, key string, partSize int64, partNumber int, stream io.Reader) (*UploadPartOutput, error) - CompleteMultipartUpload(uploadID string, Key string, Parts []*UploadPartOutput) error - AbortMultipartUpload() + UploadPart(ctx context.Context, init MultipartInitState, objectName string, partSize int64, partNumber int, stream io.Reader) (UploadedPartInfo, error) Close() } -type UploadPartOutput struct { +type MultipartInitState struct { + UploadID string +} + +type UploadedPartInfo struct { PartNumber int ETag string } + +type CompletedFileInfo struct { + FileHash cdssdk.FileHash // 可以为空,为空代表获取不到FileHash值 +} diff --git a/common/pkgs/storage/types/temp_store.go b/common/pkgs/storage/types/temp_store.go index c88af30..02533c6 100644 --- a/common/pkgs/storage/types/temp_store.go +++ b/common/pkgs/storage/types/temp_store.go @@ -1,10 +1,10 @@ package types type TempStore interface { - // 生成并注册一个临时文件名。在名字有效期间此临时文件不会被清理 + // 生成并注册一个临时文件路径,在Commited或Drop之前,此文件不会被清理。 CreateTemp() string // 指示一个临时文件已经被移动作它用,不需要再关注它了(也不需要删除这个文件)。 Commited(filePath string) - // 临时文件被放弃,可以删除这个文件了 + // 临时文件被放弃,可以删除这个文件了。如果提前调用了Commited,则此函数应该什么也不做。 Drop(filePath string) } diff --git a/common/pkgs/storage/utils/utils.go b/common/pkgs/storage/utils/utils.go index 97b2ec7..df1ecf5 100644 --- a/common/pkgs/storage/utils/utils.go +++ b/common/pkgs/storage/utils/utils.go @@ -5,8 +5,21 @@ import ( "path/filepath" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + stgmod "gitlink.org.cn/cloudream/storage/common/models" ) func MakeLoadedPackagePath(userID cdssdk.UserID, packageID cdssdk.PackageID) string { return filepath.Join(fmt.Sprintf("%v", userID), fmt.Sprintf("%v", packageID)) } + +func FindFeature[T cdssdk.StorageFeature](detail stgmod.StorageDetail) T { + for _, f := range detail.Storage.Features { + f2, ok := f.(T) + if ok { + return f2 + } + } + + var def T + return def +}