From 6aa40deca0a761870dd11f945cebe8bd4da936b4 Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Fri, 10 Jan 2025 10:16:56 +0800 Subject: [PATCH] =?UTF-8?q?=E5=88=9D=E6=AD=A5=E5=AE=8C=E6=88=90S2S?= =?UTF-8?q?=E7=9A=84Op?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- client/internal/cmdline/test.go | 22 ++-- client/internal/services/storage.go | 6 +- common/pkgs/downloader/iterator.go | 2 +- common/pkgs/downloader/strip_iterator.go | 2 +- common/pkgs/ioswitch2/fromto.go | 8 +- common/pkgs/ioswitch2/ops2/bypass.go | 104 +++++++++++++-- common/pkgs/ioswitch2/ops2/multipart.go | 27 ++-- common/pkgs/ioswitch2/ops2/s2s.go | 114 +++++++++++++++++ common/pkgs/ioswitch2/ops2/shared_store.go | 9 +- common/pkgs/ioswitch2/parser/parser.go | 25 ++-- common/pkgs/ioswitch2/parser/s2s.go | 119 ++++++++++++++++++ common/pkgs/storage/factory/empty_builder.go | 22 ---- common/pkgs/storage/factory/factory.go | 8 +- common/pkgs/storage/local/local.go | 11 ++ common/pkgs/storage/local/s2s.go | 73 +++++++++++ common/pkgs/storage/local/shard_store.go | 26 +++- common/pkgs/storage/local/shared_store.go | 2 +- common/pkgs/storage/s3/obs/client.go | 27 ++++ common/pkgs/storage/s3/obs/s2s.go | 4 + common/pkgs/storage/s3/s3.go | 21 +--- common/pkgs/storage/s3/shard_store.go | 6 +- common/pkgs/storage/s3/shared_store.go | 2 +- common/pkgs/storage/types/bypass.go | 16 ++- common/pkgs/storage/types/empty_builder.go | 59 +++++++++ common/pkgs/storage/types/s2s.go | 15 ++- common/pkgs/storage/types/types.go | 8 +- common/pkgs/uploader/create_load.go | 2 +- common/pkgs/uploader/update.go | 2 +- .../event/check_package_redundancy.go | 12 +- scanner/internal/event/clean_pinned.go | 4 +- 30 files changed, 644 insertions(+), 114 deletions(-) create mode 100644 common/pkgs/ioswitch2/ops2/s2s.go create mode 100644 common/pkgs/ioswitch2/parser/s2s.go delete mode 100644 common/pkgs/storage/factory/empty_builder.go create mode 100644 common/pkgs/storage/local/s2s.go create mode 100644 common/pkgs/storage/s3/obs/client.go create mode 100644 common/pkgs/storage/s3/obs/s2s.go create mode 100644 common/pkgs/storage/types/empty_builder.go diff --git a/client/internal/cmdline/test.go b/client/internal/cmdline/test.go index d768084..35ecdf2 100644 --- a/client/internal/cmdline/test.go +++ b/client/internal/cmdline/test.go @@ -34,9 +34,9 @@ func init() { ft := ioswitch2.NewFromTo() ft.SegmentParam = cdssdk.NewSegmentRedundancy(1024*100*3, 3) - ft.AddFrom(ioswitch2.NewFromShardstore("FullE58B075E9F7C5744CB1C2CBBECC30F163DE699DCDA94641DDA34A0C2EB01E240", *stgs.Storages[1].MasterHub, stgs.Storages[1].Storage, ioswitch2.SegmentStream(0))) - ft.AddFrom(ioswitch2.NewFromShardstore("FullEA14D17544786427C3A766F0C5E6DEB221D00D3DE1875BBE3BD0AD5C8118C1A0", *stgs.Storages[1].MasterHub, stgs.Storages[1].Storage, ioswitch2.SegmentStream(1))) - ft.AddFrom(ioswitch2.NewFromShardstore("Full4D142C458F2399175232D5636235B09A84664D60869E925EB20FFBE931045BDD", *stgs.Storages[1].MasterHub, stgs.Storages[1].Storage, ioswitch2.SegmentStream(2))) + ft.AddFrom(ioswitch2.NewFromShardstore("FullE58B075E9F7C5744CB1C2CBBECC30F163DE699DCDA94641DDA34A0C2EB01E240", *stgs.Storages[1].MasterHub, *stgs.Storages[1], ioswitch2.SegmentStream(0))) + ft.AddFrom(ioswitch2.NewFromShardstore("FullEA14D17544786427C3A766F0C5E6DEB221D00D3DE1875BBE3BD0AD5C8118C1A0", *stgs.Storages[1].MasterHub, *stgs.Storages[1], ioswitch2.SegmentStream(1))) + ft.AddFrom(ioswitch2.NewFromShardstore("Full4D142C458F2399175232D5636235B09A84664D60869E925EB20FFBE931045BDD", *stgs.Storages[1].MasterHub, *stgs.Storages[1], ioswitch2.SegmentStream(2))) ft.AddTo(ioswitch2.NewToShardStore(*stgs.Storages[2].MasterHub, *stgs.Storages[2], ioswitch2.RawStream(), "0")) // ft.AddFrom(ioswitch2.NewFromShardstore("CA56E5934859E0220D1F3B848F41619D937D7B874D4EBF63A6CC98D2D8E3280F", *stgs.Storages[0].MasterHub, stgs.Storages[0].Storage, ioswitch2.RawStream())) // ft.AddTo(ioswitch2.NewToShardStore(*stgs.Storages[1].MasterHub, stgs.Storages[1].Storage, ioswitch2.SegmentStream(0), "0")) @@ -87,7 +87,7 @@ func init() { ft := ioswitch2.NewFromTo() ft.SegmentParam = cdssdk.NewSegmentRedundancy(1293, 3) - ft.AddFrom(ioswitch2.NewFromShardstore("4E69A8B8CD9F42EDE371DA94458BADFB2308AFCA736AA393784A3D81F4746377", *stgs.Storages[0].MasterHub, stgs.Storages[0].Storage, ioswitch2.RawStream())) + ft.AddFrom(ioswitch2.NewFromShardstore("4E69A8B8CD9F42EDE371DA94458BADFB2308AFCA736AA393784A3D81F4746377", *stgs.Storages[0].MasterHub, *stgs.Storages[0], ioswitch2.RawStream())) // ft.AddTo(ioswitch2.NewToShardStore(*stgs.Storages[1].MasterHub, stgs.Storages[1].Storage, ioswitch2.SegmentStream(0), "0")) ft.AddTo(ioswitch2.NewToShardStoreWithRange(*stgs.Storages[1].MasterHub, *stgs.Storages[1], ioswitch2.SegmentStream(1), "1", math2.Range{Offset: 1})) ft.AddTo(ioswitch2.NewToShardStore(*stgs.Storages[1].MasterHub, *stgs.Storages[1], ioswitch2.SegmentStream(2), "2")) @@ -134,9 +134,9 @@ func init() { ft := ioswitch2.NewFromTo() ft.SegmentParam = cdssdk.NewSegmentRedundancy(1293, 3) ft.ECParam = &cdssdk.DefaultECRedundancy - ft.AddFrom(ioswitch2.NewFromShardstore("22CC59CE3297F78F2D20DC1E33181B77F21E6782097C94E1664F99F129834069", *stgs.Storages[1].MasterHub, stgs.Storages[1].Storage, ioswitch2.SegmentStream(0))) - ft.AddFrom(ioswitch2.NewFromShardstore("5EAC20EB3EBC7B5FA176C5BD1C01041FB2A6D14C35D6A232CA83D7F1E4B01ADE", *stgs.Storages[1].MasterHub, stgs.Storages[1].Storage, ioswitch2.SegmentStream(1))) - ft.AddFrom(ioswitch2.NewFromShardstore("A9BC1802F37100C80C72A1D6E8F53C0E0B73F85F99153D8C78FB01CEC9D8D903", *stgs.Storages[1].MasterHub, stgs.Storages[1].Storage, ioswitch2.SegmentStream(2))) + ft.AddFrom(ioswitch2.NewFromShardstore("22CC59CE3297F78F2D20DC1E33181B77F21E6782097C94E1664F99F129834069", *stgs.Storages[1].MasterHub, *stgs.Storages[1], ioswitch2.SegmentStream(0))) + ft.AddFrom(ioswitch2.NewFromShardstore("5EAC20EB3EBC7B5FA176C5BD1C01041FB2A6D14C35D6A232CA83D7F1E4B01ADE", *stgs.Storages[1].MasterHub, *stgs.Storages[1], ioswitch2.SegmentStream(1))) + ft.AddFrom(ioswitch2.NewFromShardstore("A9BC1802F37100C80C72A1D6E8F53C0E0B73F85F99153D8C78FB01CEC9D8D903", *stgs.Storages[1].MasterHub, *stgs.Storages[1], ioswitch2.SegmentStream(2))) toDrv, drvStr := ioswitch2.NewToDriverWithRange(ioswitch2.RawStream(), math2.NewRange(0, 1293)) ft.AddTo(toDrv) @@ -203,7 +203,7 @@ func init() { ft := ioswitch2.NewFromTo() ft.ECParam = &cdssdk.DefaultECRedundancy - ft.AddFrom(ioswitch2.NewFromShardstore("4E69A8B8CD9F42EDE371DA94458BADFB2308AFCA736AA393784A3D81F4746377", *stgs.Storages[1].MasterHub, stgs.Storages[1].Storage, ioswitch2.RawStream())) + ft.AddFrom(ioswitch2.NewFromShardstore("4E69A8B8CD9F42EDE371DA94458BADFB2308AFCA736AA393784A3D81F4746377", *stgs.Storages[1].MasterHub, *stgs.Storages[1], ioswitch2.RawStream())) ft.AddTo(ioswitch2.NewToShardStore(*stgs.Storages[0].MasterHub, *stgs.Storages[0], ioswitch2.ECStream(0), "EC0")) ft.AddTo(ioswitch2.NewToShardStore(*stgs.Storages[0].MasterHub, *stgs.Storages[0], ioswitch2.ECStream(1), "EC1")) ft.AddTo(ioswitch2.NewToShardStore(*stgs.Storages[0].MasterHub, *stgs.Storages[0], ioswitch2.ECStream(2), "EC2")) @@ -254,9 +254,9 @@ func init() { ft := ioswitch2.NewFromTo() ft.SegmentParam = cdssdk.NewSegmentRedundancy(1293, 3) ft.ECParam = &cdssdk.DefaultECRedundancy - ft.AddFrom(ioswitch2.NewFromShardstore("22CC59CE3297F78F2D20DC1E33181B77F21E6782097C94E1664F99F129834069", *stgs.Storages[1].MasterHub, stgs.Storages[1].Storage, ioswitch2.SegmentStream(0))) - ft.AddFrom(ioswitch2.NewFromShardstore("5EAC20EB3EBC7B5FA176C5BD1C01041FB2A6D14C35D6A232CA83D7F1E4B01ADE", *stgs.Storages[1].MasterHub, stgs.Storages[1].Storage, ioswitch2.SegmentStream(1))) - ft.AddFrom(ioswitch2.NewFromShardstore("A9BC1802F37100C80C72A1D6E8F53C0E0B73F85F99153D8C78FB01CEC9D8D903", *stgs.Storages[1].MasterHub, stgs.Storages[1].Storage, ioswitch2.SegmentStream(2))) + ft.AddFrom(ioswitch2.NewFromShardstore("22CC59CE3297F78F2D20DC1E33181B77F21E6782097C94E1664F99F129834069", *stgs.Storages[1].MasterHub, *stgs.Storages[1], ioswitch2.SegmentStream(0))) + ft.AddFrom(ioswitch2.NewFromShardstore("5EAC20EB3EBC7B5FA176C5BD1C01041FB2A6D14C35D6A232CA83D7F1E4B01ADE", *stgs.Storages[1].MasterHub, *stgs.Storages[1], ioswitch2.SegmentStream(1))) + ft.AddFrom(ioswitch2.NewFromShardstore("A9BC1802F37100C80C72A1D6E8F53C0E0B73F85F99153D8C78FB01CEC9D8D903", *stgs.Storages[1].MasterHub, *stgs.Storages[1], ioswitch2.SegmentStream(2))) ft.AddTo(ioswitch2.NewToShardStoreWithRange(*stgs.Storages[0].MasterHub, *stgs.Storages[0], ioswitch2.RawStream(), "raw", math2.NewRange(10, 645))) plans := exec.NewPlanBuilder() diff --git a/client/internal/services/storage.go b/client/internal/services/storage.go index 768361c..046de63 100644 --- a/client/internal/services/storage.go +++ b/client/internal/services/storage.go @@ -92,18 +92,18 @@ func (svc *StorageService) LoadPackage(userID cdssdk.UserID, packageID cdssdk.Pa ft := ioswitch2.NewFromTo() switch strg := strg.(type) { case *strategy.DirectStrategy: - ft.AddFrom(ioswitch2.NewFromShardstore(strg.Detail.Object.FileHash, *strg.Storage.MasterHub, strg.Storage.Storage, ioswitch2.RawStream())) + ft.AddFrom(ioswitch2.NewFromShardstore(strg.Detail.Object.FileHash, *strg.Storage.MasterHub, strg.Storage, ioswitch2.RawStream())) case *strategy.ECReconstructStrategy: for i, b := range strg.Blocks { - ft.AddFrom(ioswitch2.NewFromShardstore(b.FileHash, *strg.Storages[i].MasterHub, strg.Storages[i].Storage, ioswitch2.ECStream(b.Index))) + ft.AddFrom(ioswitch2.NewFromShardstore(b.FileHash, *strg.Storages[i].MasterHub, strg.Storages[i], ioswitch2.ECStream(b.Index))) ft.ECParam = &strg.Redundancy } default: return fmt.Errorf("unsupported download strategy: %T", strg) } - ft.AddTo(ioswitch2.NewLoadToShared(*destStg.MasterHub, destStg.Storage, path.Join(rootPath, obj.Object.Path))) + ft.AddTo(ioswitch2.NewLoadToShared(*destStg.MasterHub, *destStg, path.Join(rootPath, obj.Object.Path))) // 顺便保存到同存储服务的分片存储中 if factory.GetBuilder(*destStg).ShardStoreDesc().Enabled() { ft.AddTo(ioswitch2.NewToShardStore(*destStg.MasterHub, *destStg, ioswitch2.RawStream(), "")) diff --git a/common/pkgs/downloader/iterator.go b/common/pkgs/downloader/iterator.go index 37c061f..df66758 100644 --- a/common/pkgs/downloader/iterator.go +++ b/common/pkgs/downloader/iterator.go @@ -127,7 +127,7 @@ func (i *DownloadObjectIterator) downloadDirect(req downloadReqeust2, strg strat toExec.Range.Length = &len } - ft.AddFrom(ioswitch2.NewFromShardstore(req.Detail.Object.FileHash, *strg.Storage.MasterHub, strg.Storage.Storage, ioswitch2.RawStream())).AddTo(toExec) + ft.AddFrom(ioswitch2.NewFromShardstore(req.Detail.Object.FileHash, *strg.Storage.MasterHub, strg.Storage, ioswitch2.RawStream())).AddTo(toExec) strHandle = handle plans := exec.NewPlanBuilder() diff --git a/common/pkgs/downloader/strip_iterator.go b/common/pkgs/downloader/strip_iterator.go index 1d6a117..ad7f94c 100644 --- a/common/pkgs/downloader/strip_iterator.go +++ b/common/pkgs/downloader/strip_iterator.go @@ -203,7 +203,7 @@ func (s *StripIterator) readStrip(stripIndex int64, buf []byte) (int, error) { ft.ECParam = &s.red for _, b := range s.blocks { stg := b.Storage - ft.AddFrom(ioswitch2.NewFromShardstore(b.Block.FileHash, *stg.MasterHub, stg.Storage, ioswitch2.ECStream(b.Block.Index))) + ft.AddFrom(ioswitch2.NewFromShardstore(b.Block.FileHash, *stg.MasterHub, stg, ioswitch2.ECStream(b.Block.Index))) } toExec, hd := ioswitch2.NewToDriverWithRange(ioswitch2.RawStream(), math2.Range{ diff --git a/common/pkgs/ioswitch2/fromto.go b/common/pkgs/ioswitch2/fromto.go index 62833e1..2c72e9b 100644 --- a/common/pkgs/ioswitch2/fromto.go +++ b/common/pkgs/ioswitch2/fromto.go @@ -112,11 +112,11 @@ func (f *FromDriver) GetStreamIndex() StreamIndex { type FromShardstore struct { FileHash cdssdk.FileHash Hub cdssdk.Hub - Storage cdssdk.Storage + Storage stgmod.StorageDetail StreamIndex StreamIndex } -func NewFromShardstore(fileHash cdssdk.FileHash, hub cdssdk.Hub, storage cdssdk.Storage, strIdx StreamIndex) *FromShardstore { +func NewFromShardstore(fileHash cdssdk.FileHash, hub cdssdk.Hub, storage stgmod.StorageDetail, strIdx StreamIndex) *FromShardstore { return &FromShardstore{ FileHash: fileHash, Hub: hub, @@ -197,11 +197,11 @@ func (t *ToShardStore) GetRange() math2.Range { type LoadToShared struct { Hub cdssdk.Hub - Storage cdssdk.Storage + Storage stgmod.StorageDetail ObjectPath string } -func NewLoadToShared(hub cdssdk.Hub, storage cdssdk.Storage, objectPath string) *LoadToShared { +func NewLoadToShared(hub cdssdk.Hub, storage stgmod.StorageDetail, objectPath string) *LoadToShared { return &LoadToShared{ Hub: hub, Storage: storage, diff --git a/common/pkgs/ioswitch2/ops2/bypass.go b/common/pkgs/ioswitch2/ops2/bypass.go index 09613e8..c6726b9 100644 --- a/common/pkgs/ioswitch2/ops2/bypass.go +++ b/common/pkgs/ioswitch2/ops2/bypass.go @@ -14,6 +14,9 @@ func init() { exec.UseOp[*BypassToShardStore]() exec.UseVarValue[*BypassFileInfoValue]() exec.UseVarValue[*BypassHandleResultValue]() + + exec.UseOp[*BypassFromShardStore]() + exec.UseVarValue[*BypassFilePathValue]() } type BypassFileInfoValue struct { @@ -54,7 +57,7 @@ func (o *BypassToShardStore) Execute(ctx *exec.ExecContext, e *exec.Executor) er return err } - notifier, ok := shardStore.(types.BypassNotifier) + br, ok := shardStore.(types.BypassWrite) if !ok { return fmt.Errorf("shard store %v not support bypass", o.StorageID) } @@ -64,7 +67,7 @@ func (o *BypassToShardStore) Execute(ctx *exec.ExecContext, e *exec.Executor) er return err } - err = notifier.BypassUploaded(fileInfo.BypassFileInfo) + err = br.BypassUploaded(fileInfo.BypassFileInfo) if err != nil { return err } @@ -78,6 +81,52 @@ func (o *BypassToShardStore) String() string { return fmt.Sprintf("BypassToShardStore[StorageID:%v] Info: %v, Callback: %v", o.StorageID, o.BypassFileInfo, o.BypassCallback) } +type BypassFilePathValue struct { + types.BypassFilePath +} + +func (v *BypassFilePathValue) Clone() exec.VarValue { + return &BypassFilePathValue{ + BypassFilePath: v.BypassFilePath, + } +} + +type BypassFromShardStore struct { + StorageID cdssdk.StorageID + FileHash cdssdk.FileHash + Output exec.VarID +} + +func (o *BypassFromShardStore) Execute(ctx *exec.ExecContext, e *exec.Executor) error { + stgAgts, err := exec.GetValueByType[*agtpool.AgentPool](ctx) + if err != nil { + return err + } + + shardStore, err := stgAgts.GetShardStore(o.StorageID) + if err != nil { + return err + } + + br, ok := shardStore.(types.BypassRead) + if !ok { + return fmt.Errorf("shard store %v not support bypass", o.StorageID) + } + + path, err := br.BypassRead(o.FileHash) + if err != nil { + return err + } + + e.PutVar(o.Output, &BypassFilePathValue{BypassFilePath: path}) + return nil +} + +func (o *BypassFromShardStore) String() string { + return fmt.Sprintf("BypassFromShardStore[StorageID:%v] FileHash: %v, Output: %v", o.StorageID, o.FileHash, o.Output) +} + +// 旁路写入 type BypassToShardStoreNode struct { dag.NodeBase StorageID cdssdk.StorageID @@ -103,19 +152,58 @@ func (n *BypassToShardStoreNode) BypassFileInfoSlot() dag.ValueInputSlot { } } -func (n *BypassToShardStoreNode) BypassCallbackVar() *dag.ValueVar { - return n.OutputValues().Get(0) +func (n *BypassToShardStoreNode) BypassCallbackVar() dag.ValueOutputSlot { + return dag.ValueOutputSlot{ + Node: n, + Index: 0, + } } -func (n *BypassToShardStoreNode) FileHashVar() *dag.ValueVar { - return n.OutputValues().Get(1) +func (n *BypassToShardStoreNode) FileHashVar() dag.ValueOutputSlot { + return dag.ValueOutputSlot{ + Node: n, + Index: 1, + } } func (t *BypassToShardStoreNode) GenerateOp() (exec.Op, error) { return &BypassToShardStore{ StorageID: t.StorageID, BypassFileInfo: t.BypassFileInfoSlot().Var().VarID, - BypassCallback: t.BypassCallbackVar().VarID, - FileHash: t.FileHashVar().VarID, + BypassCallback: t.BypassCallbackVar().Var().VarID, + FileHash: t.FileHashVar().Var().VarID, + }, nil +} + +// 旁路读取 +type BypassFromShardStoreNode struct { + dag.NodeBase + StorageID cdssdk.StorageID + FileHash cdssdk.FileHash +} + +func (b *GraphNodeBuilder) NewBypassFromShardStore(storageID cdssdk.StorageID, fileHash cdssdk.FileHash) *BypassFromShardStoreNode { + node := &BypassFromShardStoreNode{ + StorageID: storageID, + FileHash: fileHash, + } + b.AddNode(node) + + node.OutputValues().Init(node, 1) + return node +} + +func (n *BypassFromShardStoreNode) FilePathVar() dag.ValueOutputSlot { + return dag.ValueOutputSlot{ + Node: n, + Index: 0, + } +} + +func (n *BypassFromShardStoreNode) GenerateOp() (exec.Op, error) { + return &BypassFromShardStore{ + StorageID: n.StorageID, + FileHash: n.FileHash, + Output: n.FilePathVar().Var().VarID, }, nil } diff --git a/common/pkgs/ioswitch2/ops2/multipart.go b/common/pkgs/ioswitch2/ops2/multipart.go index 133aec4..db64d2a 100644 --- a/common/pkgs/ioswitch2/ops2/multipart.go +++ b/common/pkgs/ioswitch2/ops2/multipart.go @@ -166,12 +166,18 @@ func (b *GraphNodeBuilder) NewMultipartInitiator(storage stgmod.StorageDetail) * return node } -func (n *MultipartInitiatorNode) UploadArgsVar() *dag.ValueVar { - return n.OutputValues().Get(0) +func (n *MultipartInitiatorNode) UploadArgsVar() dag.ValueOutputSlot { + return dag.ValueOutputSlot{ + Node: n, + Index: 0, + } } -func (n *MultipartInitiatorNode) BypassFileInfoVar() *dag.ValueVar { - return n.OutputValues().Get(1) +func (n *MultipartInitiatorNode) BypassFileInfoVar() dag.ValueOutputSlot { + return dag.ValueOutputSlot{ + Node: n, + Index: 1, + } } func (n *MultipartInitiatorNode) BypassCallbackSlot() dag.ValueInputSlot { @@ -191,9 +197,9 @@ func (n *MultipartInitiatorNode) AppendPartInfoSlot() dag.ValueInputSlot { func (n *MultipartInitiatorNode) GenerateOp() (exec.Op, error) { return &MultipartInitiator{ Storage: n.Storage, - UploadArgs: n.UploadArgsVar().VarID, + UploadArgs: n.UploadArgsVar().Var().VarID, UploadedParts: n.InputValues().GetVarIDsStart(1), - BypassFileOutput: n.BypassFileInfoVar().VarID, + BypassFileOutput: n.BypassFileInfoVar().Var().VarID, BypassCallback: n.BypassCallbackSlot().Var().VarID, }, nil } @@ -226,8 +232,11 @@ func (n *MultipartUploadNode) UploadArgsSlot() dag.ValueInputSlot { } } -func (n *MultipartUploadNode) UploadResultVar() *dag.ValueVar { - return n.OutputValues().Get(0) +func (n *MultipartUploadNode) UploadResultVar() dag.ValueOutputSlot { + return dag.ValueOutputSlot{ + Node: n, + Index: 0, + } } func (n *MultipartUploadNode) PartStreamSlot() dag.StreamInputSlot { @@ -241,7 +250,7 @@ func (n *MultipartUploadNode) GenerateOp() (exec.Op, error) { return &MultipartUpload{ Storage: n.Storage, UploadArgs: n.UploadArgsSlot().Var().VarID, - UploadResult: n.UploadResultVar().VarID, + UploadResult: n.UploadResultVar().Var().VarID, PartStream: n.PartStreamSlot().Var().VarID, PartNumber: n.PartNumber, PartSize: n.PartSize, diff --git a/common/pkgs/ioswitch2/ops2/s2s.go b/common/pkgs/ioswitch2/ops2/s2s.go new file mode 100644 index 0000000..202cad8 --- /dev/null +++ b/common/pkgs/ioswitch2/ops2/s2s.go @@ -0,0 +1,114 @@ +package ops2 + +import ( + "fmt" + + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" + stgmod "gitlink.org.cn/cloudream/storage/common/models" + "gitlink.org.cn/cloudream/storage/common/pkgs/storage/factory" + "gitlink.org.cn/cloudream/storage/common/pkgs/storage/types" +) + +func init() { + exec.UseOp[*S2STransfer]() +} + +type S2STransfer struct { + Src stgmod.StorageDetail + SrcPath exec.VarID + Dst stgmod.StorageDetail + Output exec.VarID + BypassCallback exec.VarID +} + +func (o *S2STransfer) Execute(ctx *exec.ExecContext, e *exec.Executor) error { + srcPath, err := exec.BindVar[*BypassFilePathValue](e, ctx.Context, o.SrcPath) + if err != nil { + return err + } + + s2s, err := factory.GetBuilder(o.Dst).CreateS2STransfer() + if err != nil { + return err + } + + // 传输文件 + dstPath, err := s2s.Transfer(ctx.Context, o.Src, srcPath.Path) + if err != nil { + return err + } + defer s2s.Abort() + + // 告知后续Op处理临时文件 + e.PutVar(o.Output, &BypassFileInfoValue{BypassFileInfo: types.BypassFileInfo{ + TempFilePath: dstPath, + FileHash: srcPath.Info.Hash, + Size: srcPath.Info.Size, + }}) + + // 等待后续Op处理临时文件 + cb, err := exec.BindVar[*BypassHandleResultValue](e, ctx.Context, o.BypassCallback) + if err != nil { + return fmt.Errorf("getting temp file callback: %v", err) + } + + if cb.Commited { + s2s.Complete() + } + + return nil +} + +func (o *S2STransfer) String() string { + return fmt.Sprintf("S2STransfer(%v@%v -> %v:%v)", o.Src.Storage.String(), o.SrcPath, o.Dst.Storage.String(), o.Output) +} + +type S2STransferNode struct { + dag.NodeBase + Src stgmod.StorageDetail + Dst stgmod.StorageDetail +} + +func (b *GraphNodeBuilder) NewS2STransfer(src stgmod.StorageDetail, dst stgmod.StorageDetail) *S2STransferNode { + n := &S2STransferNode{ + Src: src, + Dst: dst, + } + + n.OutputValues().Init(n, 1) + n.InputValues().Init(2) + + return n +} + +func (n *S2STransferNode) SrcPathSlot() dag.ValueInputSlot { + return dag.ValueInputSlot{ + Node: n, + Index: 0, + } +} + +func (n *S2STransferNode) BypassCallbackSlot() dag.ValueInputSlot { + return dag.ValueInputSlot{ + Node: n, + Index: 1, + } +} + +func (n *S2STransferNode) BypassFileInfoVar() dag.ValueOutputSlot { + return dag.ValueOutputSlot{ + Node: n, + Index: 0, + } +} + +func (n *S2STransferNode) GenerateOp() (exec.Op, error) { + return &S2STransfer{ + Src: n.Src, + SrcPath: n.SrcPathSlot().Var().VarID, + Dst: n.Dst, + Output: n.BypassFileInfoVar().Var().VarID, + BypassCallback: n.BypassCallbackSlot().Var().VarID, + }, nil +} diff --git a/common/pkgs/ioswitch2/ops2/shared_store.go b/common/pkgs/ioswitch2/ops2/shared_store.go index d370c5e..1d1bb11 100644 --- a/common/pkgs/ioswitch2/ops2/shared_store.go +++ b/common/pkgs/ioswitch2/ops2/shared_store.go @@ -7,6 +7,7 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/common/pkgs/logger" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + stgmod "gitlink.org.cn/cloudream/storage/common/models" "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2" "gitlink.org.cn/cloudream/storage/common/pkgs/storage/agtpool" ) @@ -53,14 +54,14 @@ func (o *SharedLoad) String() string { type SharedLoadNode struct { dag.NodeBase To ioswitch2.To - StorageID cdssdk.StorageID + Storage stgmod.StorageDetail ObjectPath string } -func (b *GraphNodeBuilder) NewSharedLoad(to ioswitch2.To, stgID cdssdk.StorageID, objPath string) *SharedLoadNode { +func (b *GraphNodeBuilder) NewSharedLoad(to ioswitch2.To, stg stgmod.StorageDetail, objPath string) *SharedLoadNode { node := &SharedLoadNode{ To: to, - StorageID: stgID, + Storage: stg, ObjectPath: objPath, } b.AddNode(node) @@ -87,7 +88,7 @@ func (t *SharedLoadNode) Input() dag.StreamInputSlot { func (t *SharedLoadNode) GenerateOp() (exec.Op, error) { return &SharedLoad{ Input: t.InputStreams().Get(0).VarID, - StorageID: t.StorageID, + StorageID: t.Storage.Storage.StorageID, ObjectPath: t.ObjectPath, }, nil } diff --git a/common/pkgs/ioswitch2/parser/parser.go b/common/pkgs/ioswitch2/parser/parser.go index dbc945e..fb933ad 100644 --- a/common/pkgs/ioswitch2/parser/parser.go +++ b/common/pkgs/ioswitch2/parser/parser.go @@ -27,6 +27,7 @@ type ParseContext struct { // 为了产生所有To所需的数据范围,而需要From打开的范围。 // 这个范围是基于整个文件的,且上下界都取整到条带大小的整数倍,因此上界是有可能超过文件大小的。 ToNodes map[ioswitch2.To]ops2.ToNode + FromNodes map[ioswitch2.From]ops2.FromNode IndexedStreams []IndexedStream StreamRange math2.Range UseEC bool // 是否使用纠删码 @@ -35,9 +36,10 @@ type ParseContext struct { func Parse(ft ioswitch2.FromTo, blder *exec.PlanBuilder) error { ctx := ParseContext{ - Ft: ft, - DAG: ops2.NewGraphNodeBuilder(), - ToNodes: make(map[ioswitch2.To]ops2.ToNode), + Ft: ft, + DAG: ops2.NewGraphNodeBuilder(), + ToNodes: make(map[ioswitch2.To]ops2.ToNode), + FromNodes: make(map[ioswitch2.From]ops2.FromNode), } // 分成两个阶段: @@ -105,6 +107,7 @@ func Parse(ft ioswitch2.FromTo, blder *exec.PlanBuilder) error { // 下面这些只需要执行一次,但需要按顺序 removeUnusedFromNode(&ctx) + useS2STransfer(&ctx) useMultipartUploadToShardStore(&ctx) dropUnused(&ctx) storeShardWriteResult(&ctx) @@ -221,6 +224,7 @@ func extend(ctx *ParseContext) error { if err != nil { return err } + ctx.FromNodes[fr] = frNode ctx.IndexedStreams = append(ctx.IndexedStreams, IndexedStream{ Stream: frNode.Output().Var(), @@ -368,7 +372,7 @@ func buildFromNode(ctx *ParseContext, f ioswitch2.From) (ops2.FromNode, error) { switch f := f.(type) { case *ioswitch2.FromShardstore: - t := ctx.DAG.NewShardRead(f, f.Storage.StorageID, types.NewOpen(f.FileHash)) + t := ctx.DAG.NewShardRead(f, f.Storage.Storage.StorageID, types.NewOpen(f.FileHash)) if f.StreamIndex.IsRaw() { t.Open.WithNullableLength(repRange.Offset, repRange.Length) @@ -471,7 +475,7 @@ func buildToNode(ctx *ParseContext, t ioswitch2.To) (ops2.ToNode, error) { return n, nil case *ioswitch2.LoadToShared: - n := ctx.DAG.NewSharedLoad(t, t.Storage.StorageID, t.ObjectPath) + n := ctx.DAG.NewSharedLoad(t, t.Storage, t.ObjectPath) if err := setEnvByAddress(n, t.Hub, t.Hub.Address); err != nil { return nil, err @@ -979,9 +983,7 @@ func useMultipartUploadToShardStore(ctx *ParseContext) { // 最后删除Join指令和ToShardStore指令 ctx.DAG.RemoveNode(joinNode) ctx.DAG.RemoveNode(shardNode) - // 因为ToShardStore已经被替换,所以对应的To也要删除。 - // 虽然会跳过后续的Range过程,但由于之前做的流范围判断,不加Range也可以 - ctx.Ft.Toes = lo2.Remove(ctx.Ft.Toes, shardNode.GetTo()) + delete(ctx.ToNodes, shardNode.GetTo()) return true }) } @@ -1008,17 +1010,14 @@ func storeShardWriteResult(ctx *ParseContext) { storeNode := ctx.DAG.NewStore() storeNode.Env().ToEnvDriver() - storeNode.Store(n.FileHashStoreKey, n.FileHashVar()) + storeNode.Store(n.FileHashStoreKey, n.FileHashVar().Var()) return true }) } // 生成Range指令。StreamRange可能超过文件总大小,但Range指令会在数据量不够时不报错而是正常返回 func generateRange(ctx *ParseContext) { - for i := 0; i < len(ctx.Ft.Toes); i++ { - to := ctx.Ft.Toes[i] - toNode := ctx.ToNodes[to] - + for to, toNode := range ctx.ToNodes { toStrIdx := to.GetStreamIndex() toRng := to.GetRange() diff --git a/common/pkgs/ioswitch2/parser/s2s.go b/common/pkgs/ioswitch2/parser/s2s.go new file mode 100644 index 0000000..409bafc --- /dev/null +++ b/common/pkgs/ioswitch2/parser/s2s.go @@ -0,0 +1,119 @@ +package parser + +import ( + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2/ops2" + "gitlink.org.cn/cloudream/storage/common/pkgs/storage/factory" +) + +// 将直接从一个存储服务传到另一个存储服务的过程换成S2S传输 +func useS2STransfer(ctx *ParseContext) { + // S2S传输暂不支持只传文件的一部分 + if ctx.StreamRange.Offset != 0 || ctx.StreamRange.Length != nil { + return + } + + for fr, frNode := range ctx.FromNodes { + fromShard, ok := fr.(*ioswitch2.FromShardstore) + if !ok { + continue + } + + s2s, err := factory.GetBuilder(fromShard.Storage).CreateS2STransfer() + if err != nil { + continue + } + + // 此输出流的所有目的地都要能支持S2S传输 + outVar := frNode.Output().Var() + if outVar.Dst.Len() == 0 { + continue + } + + failed := false + var toShards []*ops2.ShardWriteNode + // var toShareds []*ops2.SharedLoadNode + + loop: + for i := 0; i < outVar.Dst.Len(); i++ { + dstNode := outVar.Dst.Get(i) + + switch dstNode := dstNode.(type) { + case *ops2.ShardWriteNode: + if !s2s.CanTransfer(dstNode.Storage) { + failed = true + break + } + + toShards = append(toShards, dstNode) + + /* TODO 暂不支持共享存储服务 + case *ops2.SharedLoadNode: + if !s2s.CanTransfer(to.Storage) { + failed = true + break + } + toShareds = append(toShareds, to) + */ + default: + failed = true + break loop + } + } + if failed { + continue + } + + for _, toShard := range toShards { + s2sNode := ctx.DAG.NewS2STransfer(fromShard.Storage, toShard.Storage) + // 直传指令在目的地Hub上执行 + s2sNode.Env().CopyFrom(toShard.Env()) + + // 先获取文件路径,送到S2S节点 + brNode := ctx.DAG.NewBypassFromShardStore(fromShard.Storage.Storage.StorageID, fromShard.FileHash) + brNode.Env().CopyFrom(frNode.Env()) + brNode.FilePathVar().ToSlot(s2sNode.SrcPathSlot()) + + // 传输结果通知目的节点 + to := toShard.To.(*ioswitch2.ToShardStore) + bwNode := ctx.DAG.NewBypassToShardStore(toShard.Storage.Storage.StorageID, to.FileHashStoreKey) + bwNode.Env().CopyFrom(toShard.Env()) + + s2sNode.BypassFileInfoVar().ToSlot(bwNode.BypassFileInfoSlot()) + bwNode.BypassCallbackVar().ToSlot(s2sNode.BypassCallbackSlot()) + + // 从计划中删除目标节点 + ctx.DAG.RemoveNode(toShard) + delete(ctx.ToNodes, toShard.To) + } + + /* + for _, toShared := range toShareds { + s2sNode := ctx.DAG.NewS2STransfer(fromShard.Storage, toShared.Storage) + // 直传指令在目的地Hub上执行 + s2sNode.Env().CopyFrom(toShared.Env()) + + // 先获取文件路径,送到S2S节点 + brNode := ctx.DAG.NewBypassFromShardStore(fromShard.Storage.Storage.StorageID, fromShard.FileHash) + brNode.Env().CopyFrom(toShared.Env()) + brNode.FilePathVar().ToSlot(s2sNode.SrcPathSlot()) + + // 传输结果通知目的节点 + to := toShared.To.(*ioswitch2.LoadToShared) + bwNode := ctx.DAG.NewBypassToShardStore(toShard.Storage.Storage.StorageID, to.FileHashStoreKey) + bwNode.Env().CopyFrom(toShard.Env()) + + s2sNode.BypassFileInfoVar().ToSlot(bwNode.BypassFileInfoSlot()) + bwNode.BypassCallbackVar().ToSlot(s2sNode.BypassCallbackSlot()) + + // 从计划中删除目标节点 + ctx.DAG.RemoveNode(toShared) + delete(ctx.ToNodes, toShared.To) + } + */ + + // 从计划中删除源节点 + ctx.DAG.RemoveNode(frNode) + delete(ctx.FromNodes, fr) + } +} diff --git a/common/pkgs/storage/factory/empty_builder.go b/common/pkgs/storage/factory/empty_builder.go deleted file mode 100644 index 0b13b1d..0000000 --- a/common/pkgs/storage/factory/empty_builder.go +++ /dev/null @@ -1,22 +0,0 @@ -package factory - -import ( - "fmt" - - stgmod "gitlink.org.cn/cloudream/storage/common/models" - "gitlink.org.cn/cloudream/storage/common/pkgs/storage/types" -) - -type EmptyBuilder struct { - detail stgmod.StorageDetail -} - -// 创建一个在MasterHub上长期运行的存储服务 -func (b *EmptyBuilder) CreateAgent() (types.StorageAgent, error) { - return nil, fmt.Errorf("create agent for %T: %w", b.detail.Storage.Type, types.ErrUnsupported) -} - -// 创建一个分片上传组件 -func (b *EmptyBuilder) CreateMultiparter() (types.Multiparter, error) { - return nil, fmt.Errorf("create multipart initiator for %T: %w", b.detail.Storage.Type, types.ErrUnsupported) -} diff --git a/common/pkgs/storage/factory/factory.go b/common/pkgs/storage/factory/factory.go index 5026e9d..ae46ceb 100644 --- a/common/pkgs/storage/factory/factory.go +++ b/common/pkgs/storage/factory/factory.go @@ -16,5 +16,11 @@ import ( // 此Builder的所有函数都会返回否定值或者封装后的ErrUnsupported错误(需要使用errors.Is检查) func GetBuilder(detail stgmod.StorageDetail) types.StorageBuilder { typ := reflect.TypeOf(detail.Storage.Type) - return reg.StorageBuilders[typ](detail) + + ctor, ok := reg.StorageBuilders[typ] + if !ok { + return &types.EmptyBuilder{} + } + + return ctor(detail) } diff --git a/common/pkgs/storage/local/local.go b/common/pkgs/storage/local/local.go index 64b17ab..ca49db8 100644 --- a/common/pkgs/storage/local/local.go +++ b/common/pkgs/storage/local/local.go @@ -76,3 +76,14 @@ func (b *builder) CreateMultiparter() (types.Multiparter, error) { feat: feat, }, nil } + +func (b *builder) CreateS2STransfer() (types.S2STransfer, error) { + feat := utils.FindFeature[*cdssdk.S2STransferFeature](b.detail) + if feat == nil { + return nil, fmt.Errorf("feature %T not found", cdssdk.S2STransferFeature{}) + } + + return &S2STransfer{ + detail: b.detail, + }, nil +} diff --git a/common/pkgs/storage/local/s2s.go b/common/pkgs/storage/local/s2s.go new file mode 100644 index 0000000..c7b8126 --- /dev/null +++ b/common/pkgs/storage/local/s2s.go @@ -0,0 +1,73 @@ +package local + +import ( + "context" + "fmt" + "io" + "os" + "path/filepath" + + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "gitlink.org.cn/cloudream/common/utils/os2" + stgmod "gitlink.org.cn/cloudream/storage/common/models" +) + +type S2STransfer struct { + feat cdssdk.S2STransferFeature + detail stgmod.StorageDetail + dstPath string +} + +// 只有同一个机器的存储之间才可以进行数据直传 +func (s *S2STransfer) CanTransfer(src stgmod.StorageDetail) bool { + _, ok := src.Storage.Type.(*cdssdk.LocalStorageType) + if !ok { + return false + } + + if src.Storage.MasterHub != s.detail.Storage.MasterHub { + return false + } + + return true +} + +// 执行数据直传 +func (s *S2STransfer) Transfer(ctx context.Context, src stgmod.StorageDetail, srcPath string) (string, error) { + absTempDir, err := filepath.Abs(s.feat.TempDir) + if err != nil { + return "", fmt.Errorf("get abs temp dir %v: %v", s.feat.TempDir, err) + } + + tempFileName := os2.GenerateRandomFileName(10) + s.dstPath = filepath.Join(absTempDir, tempFileName) + + copy, err := os.OpenFile(s.dstPath, os.O_WRONLY|os.O_CREATE, 0644) + if err != nil { + return "", err + } + defer copy.Close() + + srcFile, err := os.Open(srcPath) + if err != nil { + return "", err + } + defer srcFile.Close() + + _, err = io.Copy(copy, srcFile) + if err != nil { + return "", err + } + + return s.dstPath, nil +} + +func (s *S2STransfer) Complete() { + +} + +func (s *S2STransfer) Abort() { + if s.dstPath != "" { + os.Remove(s.dstPath) + } +} diff --git a/common/pkgs/storage/local/shard_store.go b/common/pkgs/storage/local/shard_store.go index 7682f59..f367160 100644 --- a/common/pkgs/storage/local/shard_store.go +++ b/common/pkgs/storage/local/shard_store.go @@ -30,7 +30,11 @@ func (s *ShardStoreDesc) Enabled() bool { return s.builder.detail.Storage.ShardStore != nil } -func (s *ShardStoreDesc) HasBypassNotifier() bool { +func (s *ShardStoreDesc) HasBypassWrite() bool { + return true +} + +func (s *ShardStoreDesc) HasBypassRead() bool { return true } @@ -422,6 +426,26 @@ func (s *ShardStore) BypassUploaded(info types.BypassFileInfo) error { return nil } +func (s *ShardStore) BypassRead(fileHash cdssdk.FileHash) (types.BypassFilePath, error) { + s.lock.Lock() + defer s.lock.Unlock() + + filePath := s.getFilePathFromHash(fileHash) + stat, err := os.Stat(filePath) + if err != nil { + return types.BypassFilePath{}, err + } + + return types.BypassFilePath{ + Path: filePath, + Info: types.FileInfo{ + Hash: fileHash, + Size: stat.Size(), + Description: filePath, + }, + }, nil +} + func (s *ShardStore) getLogger() logger.Logger { return logger.WithField("ShardStore", "Local").WithField("Storage", s.agt.Detail.Storage.String()) } diff --git a/common/pkgs/storage/local/shared_store.go b/common/pkgs/storage/local/shared_store.go index 9bfd895..bfd9730 100644 --- a/common/pkgs/storage/local/shared_store.go +++ b/common/pkgs/storage/local/shared_store.go @@ -18,7 +18,7 @@ func (d *SharedStoreDesc) Enabled() bool { return d.builder.detail.Storage.SharedStore != nil } -func (d *SharedStoreDesc) HasBypassNotifier() bool { +func (d *SharedStoreDesc) HasBypassWrite() bool { return false } diff --git a/common/pkgs/storage/s3/obs/client.go b/common/pkgs/storage/s3/obs/client.go new file mode 100644 index 0000000..e2f73c0 --- /dev/null +++ b/common/pkgs/storage/s3/obs/client.go @@ -0,0 +1,27 @@ +package obs + +import ( + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/credentials" + "github.com/aws/aws-sdk-go-v2/service/s3" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" +) + +func CreateS2Client(addr *cdssdk.OBSType) (*s3.Client, string, error) { + awsConfig := aws.Config{} + + cre := aws.Credentials{ + AccessKeyID: addr.AK, + SecretAccessKey: addr.SK, + } + awsConfig.Credentials = &credentials.StaticCredentialsProvider{Value: cre} + awsConfig.Region = addr.Region + + options := []func(*s3.Options){} + options = append(options, func(s3Opt *s3.Options) { + s3Opt.BaseEndpoint = &addr.Endpoint + }) + + cli := s3.NewFromConfig(awsConfig, options...) + return cli, addr.Bucket, nil +} diff --git a/common/pkgs/storage/s3/obs/s2s.go b/common/pkgs/storage/s3/obs/s2s.go new file mode 100644 index 0000000..b583ef8 --- /dev/null +++ b/common/pkgs/storage/s3/obs/s2s.go @@ -0,0 +1,4 @@ +package obs + +type S2STransfer struct { +} diff --git a/common/pkgs/storage/s3/s3.go b/common/pkgs/storage/s3/s3.go index e84cfef..7b42391 100644 --- a/common/pkgs/storage/s3/s3.go +++ b/common/pkgs/storage/s3/s3.go @@ -3,12 +3,11 @@ package s3 import ( "fmt" - "github.com/aws/aws-sdk-go-v2/aws" - "github.com/aws/aws-sdk-go-v2/credentials" "github.com/aws/aws-sdk-go-v2/service/s3" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" stgmod "gitlink.org.cn/cloudream/storage/common/models" "gitlink.org.cn/cloudream/storage/common/pkgs/storage/factory/reg" + "gitlink.org.cn/cloudream/storage/common/pkgs/storage/s3/obs" "gitlink.org.cn/cloudream/storage/common/pkgs/storage/types" "gitlink.org.cn/cloudream/storage/common/pkgs/storage/utils" ) @@ -20,6 +19,7 @@ func init() { } type builder struct { + types.EmptyBuilder detail stgmod.StorageDetail } @@ -86,22 +86,7 @@ func createS3Client(addr cdssdk.StorageType) (*s3.Client, string, error) { // case *cdssdk.OSSType: case *cdssdk.OBSType: - awsConfig := aws.Config{} - - cre := aws.Credentials{ - AccessKeyID: addr.AK, - SecretAccessKey: addr.SK, - } - awsConfig.Credentials = &credentials.StaticCredentialsProvider{Value: cre} - awsConfig.Region = addr.Region - - options := []func(*s3.Options){} - options = append(options, func(s3Opt *s3.Options) { - s3Opt.BaseEndpoint = &addr.Endpoint - }) - - cli := s3.NewFromConfig(awsConfig, options...) - return cli, addr.Bucket, nil + return obs.CreateS2Client(addr) default: return nil, "", fmt.Errorf("unsupported storage type %T", addr) diff --git a/common/pkgs/storage/s3/shard_store.go b/common/pkgs/storage/s3/shard_store.go index 498e618..ac75883 100644 --- a/common/pkgs/storage/s3/shard_store.go +++ b/common/pkgs/storage/s3/shard_store.go @@ -33,10 +33,14 @@ func (s *ShardStoreDesc) Enabled() bool { return s.builder.detail.Storage.ShardStore != nil } -func (s *ShardStoreDesc) HasBypassNotifier() bool { +func (s *ShardStoreDesc) HasBypassWrite() bool { return true } +func (s *ShardStoreDesc) HasBypassRead() bool { + return false +} + type ShardStoreOption struct { UseAWSSha256 bool // 能否直接使用AWS提供的SHA256校验,如果不行,则使用本地计算。默认使用本地计算。 } diff --git a/common/pkgs/storage/s3/shared_store.go b/common/pkgs/storage/s3/shared_store.go index 1805946..bb822e8 100644 --- a/common/pkgs/storage/s3/shared_store.go +++ b/common/pkgs/storage/s3/shared_store.go @@ -7,6 +7,6 @@ func (d *SharedStoreDesc) Enabled() bool { return false } -func (d *SharedStoreDesc) HasBypassNotifier() bool { +func (d *SharedStoreDesc) HasBypassWrite() bool { return false } diff --git a/common/pkgs/storage/types/bypass.go b/common/pkgs/storage/types/bypass.go index 7c33e68..5e8d23f 100644 --- a/common/pkgs/storage/types/bypass.go +++ b/common/pkgs/storage/types/bypass.go @@ -10,6 +10,20 @@ type BypassFileInfo struct { Size int64 } -type BypassNotifier interface { +// 不通过ShardStore上传文件,但上传完成后需要通知ShardStore。 +// 也可以用于共享存储。 +type BypassWrite interface { BypassUploaded(info BypassFileInfo) error } + +// 描述指定文件在分片存储中的路径。可以考虑设计成interface。 +type BypassFilePath struct { + Path string + Info FileInfo +} + +// 不通过ShardStore读取文件,但需要它返回文件的路径。 +// 仅用于分片存储。 +type BypassRead interface { + BypassRead(fileHash cdssdk.FileHash) (BypassFilePath, error) +} diff --git a/common/pkgs/storage/types/empty_builder.go b/common/pkgs/storage/types/empty_builder.go new file mode 100644 index 0000000..be0178c --- /dev/null +++ b/common/pkgs/storage/types/empty_builder.go @@ -0,0 +1,59 @@ +package types + +import ( + "fmt" + + stgmod "gitlink.org.cn/cloudream/storage/common/models" +) + +type EmptyBuilder struct { + Detail stgmod.StorageDetail +} + +// 创建一个在MasterHub上长期运行的存储服务 +func (b *EmptyBuilder) CreateAgent() (StorageAgent, error) { + return nil, fmt.Errorf("create agent for %T: %w", b.Detail.Storage.Type, ErrUnsupported) +} + +func (b *EmptyBuilder) ShardStoreDesc() ShardStoreDesc { + return &EmptyShardStoreDesc{} +} + +func (b *EmptyBuilder) SharedStoreDesc() SharedStoreDesc { + return &EmptySharedStoreDesc{} +} + +// 创建一个分片上传组件 +func (b *EmptyBuilder) CreateMultiparter() (Multiparter, error) { + return nil, fmt.Errorf("create multipart initiator for %T: %w", b.Detail.Storage.Type, ErrUnsupported) +} + +func (b *EmptyBuilder) CreateS2STransfer() (S2STransfer, error) { + return nil, fmt.Errorf("create s2s transfer for %T: %w", b.Detail.Storage.Type, ErrUnsupported) +} + +type EmptyShardStoreDesc struct { +} + +func (d *EmptyShardStoreDesc) Enabled() bool { + return false +} + +func (d *EmptyShardStoreDesc) HasBypassWrite() bool { + return false +} + +func (d *EmptyShardStoreDesc) HasBypassRead() bool { + return false +} + +type EmptySharedStoreDesc struct { +} + +func (d *EmptySharedStoreDesc) Enabled() bool { + return false +} + +func (d *EmptySharedStoreDesc) HasBypassWrite() bool { + return false +} diff --git a/common/pkgs/storage/types/s2s.go b/common/pkgs/storage/types/s2s.go index 58103b7..fccb1f2 100644 --- a/common/pkgs/storage/types/s2s.go +++ b/common/pkgs/storage/types/s2s.go @@ -1,7 +1,18 @@ package types -import stgmod "gitlink.org.cn/cloudream/storage/common/models" +import ( + "context" + + stgmod "gitlink.org.cn/cloudream/storage/common/models" +) type S2STransfer interface { - Transfer(src stgmod.StorageDetail, srcPath string, dstPath string) error + // 判断是否能从指定的源存储中直传到当前存储的目的路径 + CanTransfer(src stgmod.StorageDetail) bool + // 执行数据直传。返回传输后的文件路径 + Transfer(ctx context.Context, src stgmod.StorageDetail, srcPath string) (string, error) + // 完成传输 + Complete() + // 取消传输。如果已经调用了Complete,则这个方法应该无效果 + Abort() } diff --git a/common/pkgs/storage/types/types.go b/common/pkgs/storage/types/types.go index 9612d5f..11b2516 100644 --- a/common/pkgs/storage/types/types.go +++ b/common/pkgs/storage/types/types.go @@ -48,18 +48,22 @@ type StorageBuilder interface { SharedStoreDesc() SharedStoreDesc // 创建一个分片上传组件 CreateMultiparter() (Multiparter, error) + // 创建一个存储服务直传组件 + CreateS2STransfer() (S2STransfer, error) } type ShardStoreDesc interface { // 是否已启动 Enabled() bool // 是否能旁路上传 - HasBypassNotifier() bool + HasBypassWrite() bool + // 是否能旁路读取 + HasBypassRead() bool } type SharedStoreDesc interface { // 是否已启动 Enabled() bool // 是否能旁路上传 - HasBypassNotifier() bool + HasBypassWrite() bool } diff --git a/common/pkgs/uploader/create_load.go b/common/pkgs/uploader/create_load.go index a0ec252..16590b4 100644 --- a/common/pkgs/uploader/create_load.go +++ b/common/pkgs/uploader/create_load.go @@ -45,7 +45,7 @@ func (u *CreateLoadUploader) Upload(pa string, size int64, stream io.Reader) err ft.AddFrom(fromExec) for i, stg := range u.targetStgs { ft.AddTo(ioswitch2.NewToShardStore(*stg.MasterHub, stg, ioswitch2.RawStream(), "fileHash")) - ft.AddTo(ioswitch2.NewLoadToShared(*stg.MasterHub, stg.Storage, path.Join(u.loadRoots[i], pa))) + ft.AddTo(ioswitch2.NewLoadToShared(*stg.MasterHub, stg, path.Join(u.loadRoots[i], pa))) stgIDs = append(stgIDs, stg.Storage.StorageID) } diff --git a/common/pkgs/uploader/update.go b/common/pkgs/uploader/update.go index 0a19a86..862b9f6 100644 --- a/common/pkgs/uploader/update.go +++ b/common/pkgs/uploader/update.go @@ -51,7 +51,7 @@ func (w *UpdateUploader) Upload(pat string, size int64, stream io.Reader) error AddTo(ioswitch2.NewToShardStore(*w.targetStg.MasterHub, w.targetStg, ioswitch2.RawStream(), "fileHash")) for i, stg := range w.loadToStgs { - ft.AddTo(ioswitch2.NewLoadToShared(*stg.MasterHub, stg.Storage, path.Join(w.loadToPath[i], pat))) + ft.AddTo(ioswitch2.NewLoadToShared(*stg.MasterHub, stg, path.Join(w.loadToPath[i], pat))) } plans := exec.NewPlanBuilder() diff --git a/scanner/internal/event/check_package_redundancy.go b/scanner/internal/event/check_package_redundancy.go index affd35c..47e149a 100644 --- a/scanner/internal/event/check_package_redundancy.go +++ b/scanner/internal/event/check_package_redundancy.go @@ -463,7 +463,7 @@ func (t *CheckPackageRedundancy) noneToRep(ctx ExecuteContext, obj stgmod.Object uploadStgs = lo.UniqBy(uploadStgs, func(item *StorageLoadInfo) cdssdk.StorageID { return item.Storage.Storage.StorageID }) ft := ioswitch2.NewFromTo() - ft.AddFrom(ioswitch2.NewFromShardstore(obj.Object.FileHash, *srcStg.Storage.MasterHub, srcStg.Storage.Storage, ioswitch2.RawStream())) + ft.AddFrom(ioswitch2.NewFromShardstore(obj.Object.FileHash, *srcStg.Storage.MasterHub, srcStg.Storage, ioswitch2.RawStream())) for i, stg := range uploadStgs { ft.AddTo(ioswitch2.NewToShardStore(*stg.Storage.MasterHub, stg.Storage, ioswitch2.RawStream(), fmt.Sprintf("%d", i))) } @@ -514,7 +514,7 @@ func (t *CheckPackageRedundancy) noneToEC(ctx ExecuteContext, obj stgmod.ObjectD ft := ioswitch2.NewFromTo() ft.ECParam = red - ft.AddFrom(ioswitch2.NewFromShardstore(obj.Object.FileHash, *srcStg.Storage.MasterHub, srcStg.Storage.Storage, ioswitch2.RawStream())) + ft.AddFrom(ioswitch2.NewFromShardstore(obj.Object.FileHash, *srcStg.Storage.MasterHub, srcStg.Storage, ioswitch2.RawStream())) for i := 0; i < red.N; i++ { ft.AddTo(ioswitch2.NewToShardStore(*uploadStgs[i].Storage.MasterHub, uploadStgs[i].Storage, ioswitch2.ECStream(i), fmt.Sprintf("%d", i))) } @@ -614,7 +614,7 @@ func (t *CheckPackageRedundancy) noneToSeg(ctx ExecuteContext, obj stgmod.Object ft := ioswitch2.NewFromTo() ft.SegmentParam = red - ft.AddFrom(ioswitch2.NewFromShardstore(obj.Object.FileHash, *srcStg.Storage.MasterHub, srcStg.Storage.Storage, ioswitch2.RawStream())) + ft.AddFrom(ioswitch2.NewFromShardstore(obj.Object.FileHash, *srcStg.Storage.MasterHub, srcStg.Storage, ioswitch2.RawStream())) for i, stg := range uploadStgs { ft.AddTo(ioswitch2.NewToShardStore(*stg.Storage.MasterHub, stg.Storage, ioswitch2.SegmentStream(i), fmt.Sprintf("%d", i))) } @@ -667,7 +667,7 @@ func (t *CheckPackageRedundancy) repToRep(ctx ExecuteContext, obj stgmod.ObjectD uploadStgs = lo.UniqBy(uploadStgs, func(item *StorageLoadInfo) cdssdk.StorageID { return item.Storage.Storage.StorageID }) ft := ioswitch2.NewFromTo() - ft.AddFrom(ioswitch2.NewFromShardstore(obj.Object.FileHash, *srcStg.Storage.MasterHub, srcStg.Storage.Storage, ioswitch2.RawStream())) + ft.AddFrom(ioswitch2.NewFromShardstore(obj.Object.FileHash, *srcStg.Storage.MasterHub, srcStg.Storage, ioswitch2.RawStream())) for i, stg := range uploadStgs { ft.AddTo(ioswitch2.NewToShardStore(*stg.Storage.MasterHub, stg.Storage, ioswitch2.RawStream(), fmt.Sprintf("%d", i))) } @@ -746,7 +746,7 @@ func (t *CheckPackageRedundancy) ecToRep(ctx ExecuteContext, obj stgmod.ObjectDe ft.ECParam = srcRed for i2, block := range chosenBlocks { - ft.AddFrom(ioswitch2.NewFromShardstore(block.FileHash, *chosenBlockStg[i2].MasterHub, chosenBlockStg[i2].Storage, ioswitch2.ECStream(block.Index))) + ft.AddFrom(ioswitch2.NewFromShardstore(block.FileHash, *chosenBlockStg[i2].MasterHub, chosenBlockStg[i2], ioswitch2.ECStream(block.Index))) } len := obj.Object.Size @@ -842,7 +842,7 @@ func (t *CheckPackageRedundancy) ecToEC(ctx ExecuteContext, obj stgmod.ObjectDet ft := ioswitch2.NewFromTo() ft.ECParam = srcRed for i2, block := range chosenBlocks { - ft.AddFrom(ioswitch2.NewFromShardstore(block.FileHash, *chosenBlockStg[i2].MasterHub, chosenBlockStg[i2].Storage, ioswitch2.ECStream(block.Index))) + ft.AddFrom(ioswitch2.NewFromShardstore(block.FileHash, *chosenBlockStg[i2].MasterHub, chosenBlockStg[i2], ioswitch2.ECStream(block.Index))) } // 输出只需要自己要保存的那一块 diff --git a/scanner/internal/event/clean_pinned.go b/scanner/internal/event/clean_pinned.go index a8ff0b0..4af4c90 100644 --- a/scanner/internal/event/clean_pinned.go +++ b/scanner/internal/event/clean_pinned.go @@ -741,7 +741,7 @@ func (t *CleanPinned) makePlansForRepObject(allStgInfos map[cdssdk.StorageID]*st ft := ioswitch2.NewFromTo() fromStg := allStgInfos[obj.Blocks[0].StorageID] - ft.AddFrom(ioswitch2.NewFromShardstore(obj.Object.FileHash, *fromStg.MasterHub, fromStg.Storage, ioswitch2.RawStream())) + ft.AddFrom(ioswitch2.NewFromShardstore(obj.Object.FileHash, *fromStg.MasterHub, *fromStg, ioswitch2.RawStream())) toStg := allStgInfos[solu.blockList[i].StorageID] ft.AddTo(ioswitch2.NewToShardStore(*toStg.MasterHub, *toStg, ioswitch2.RawStream(), fmt.Sprintf("%d.0", obj.Object.ObjectID))) @@ -799,7 +799,7 @@ func (t *CleanPinned) makePlansForECObject(allStgInfos map[cdssdk.StorageID]*stg for id, idxs := range reconstrct { ft := ioswitch2.NewFromTo() ft.ECParam = ecRed - ft.AddFrom(ioswitch2.NewFromShardstore(obj.Object.FileHash, *allStgInfos[id].MasterHub, allStgInfos[id].Storage, ioswitch2.RawStream())) + ft.AddFrom(ioswitch2.NewFromShardstore(obj.Object.FileHash, *allStgInfos[id].MasterHub, *allStgInfos[id], ioswitch2.RawStream())) for _, i := range *idxs { ft.AddTo(ioswitch2.NewToShardStore(*allStgInfos[id].MasterHub, *allStgInfos[id], ioswitch2.ECStream(i), fmt.Sprintf("%d.%d", obj.Object.ObjectID, i)))