From 181fb5e5be0ba0f56519641d5cb64d8affc32886 Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Thu, 9 Jan 2025 11:23:05 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E6=95=B0=E6=8D=AE=E7=BB=93?= =?UTF-8?q?=E6=9E=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- client/internal/services/cache.go | 3 +- client/internal/services/storage.go | 3 +- common/pkgs/ioswitch2/ops2/multipart.go | 27 ++-- common/pkgs/ioswitch2/parser/parser.go | 12 +- common/pkgs/storage/agtpool/pool.go | 7 +- common/pkgs/storage/factory/empty_builder.go | 22 ++++ common/pkgs/storage/factory/factory.go | 4 +- common/pkgs/storage/factory/reg/reg.go | 9 +- common/pkgs/storage/local/local.go | 53 ++++---- common/pkgs/storage/local/multipart_upload.go | 104 +++++++++------ common/pkgs/storage/s3/multipart_upload.go | 122 +++++++++++------- common/pkgs/storage/s3/s3.go | 63 ++++----- common/pkgs/storage/types/s3_client.go | 17 ++- common/pkgs/storage/types/types.go | 27 ++-- common/pkgs/uploader/uploader.go | 3 +- 15 files changed, 270 insertions(+), 206 deletions(-) create mode 100644 common/pkgs/storage/factory/empty_builder.go diff --git a/client/internal/services/cache.go b/client/internal/services/cache.go index 964c2a5..8842593 100644 --- a/client/internal/services/cache.go +++ b/client/internal/services/cache.go @@ -9,6 +9,7 @@ import ( stgglb "gitlink.org.cn/cloudream/storage/common/globals" agtmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/agent" coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" + "gitlink.org.cn/cloudream/storage/common/pkgs/storage/factory" ) type CacheService struct { @@ -31,7 +32,7 @@ func (svc *CacheService) StartCacheMovePackage(userID cdssdk.UserID, packageID c return 0, "", fmt.Errorf("get storage detail: %w", err) } - if getStg.Storages[0].Storage.ShardStore == nil { + if !factory.GetBuilder(*getStg.Storages[0]).HasShardStore() { return 0, "", fmt.Errorf("shard storage is 
not enabled") } diff --git a/client/internal/services/storage.go b/client/internal/services/storage.go index de2673f..8240c35 100644 --- a/client/internal/services/storage.go +++ b/client/internal/services/storage.go @@ -17,6 +17,7 @@ import ( "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2/parser" agtmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/agent" coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" + "gitlink.org.cn/cloudream/storage/common/pkgs/storage/factory" ) type StorageService struct { @@ -104,7 +105,7 @@ func (svc *StorageService) LoadPackage(userID cdssdk.UserID, packageID cdssdk.Pa ft.AddTo(ioswitch2.NewLoadToShared(*destStg.MasterHub, destStg.Storage, path.Join(rootPath, obj.Object.Path))) // 顺便保存到同存储服务的分片存储中 - if destStg.Storage.ShardStore != nil { + if factory.GetBuilder(*destStg).HasShardStore() { ft.AddTo(ioswitch2.NewToShardStore(*destStg.MasterHub, *destStg, ioswitch2.RawStream(), "")) pinned = append(pinned, obj.Object.ObjectID) } diff --git a/common/pkgs/ioswitch2/ops2/multipart.go b/common/pkgs/ioswitch2/ops2/multipart.go index f853edd..133aec4 100644 --- a/common/pkgs/ioswitch2/ops2/multipart.go +++ b/common/pkgs/ioswitch2/ops2/multipart.go @@ -49,24 +49,21 @@ type MultipartInitiator struct { func (o *MultipartInitiator) Execute(ctx *exec.ExecContext, e *exec.Executor) error { blder := factory.GetBuilder(o.Storage) - if blder == nil { - return fmt.Errorf("unsupported storage type: %T", o.Storage.Storage.Type) - } - - initiator, err := blder.CreateMultipartInitiator(o.Storage) + multi, err := blder.CreateMultiparter() if err != nil { return err } - defer initiator.Abort() - // 启动一个新的上传任务 - initState, err := initiator.Initiate(ctx.Context) + // 启动一个新的上传任务W + multiTask, err := multi.Initiate(ctx.Context) if err != nil { return err } + defer multiTask.Abort() + // 分发上传参数 e.PutVar(o.UploadArgs, &MultipartUploadArgsValue{ - InitState: initState, + InitState: multiTask.InitState(), }) // 收集分片上传结果 @@ -81,7 +78,7 @@ 
func (o *MultipartInitiator) Execute(ctx *exec.ExecContext, e *exec.Executor) er } // 合并分片 - fileInfo, err := initiator.JoinParts(ctx.Context, partInfos) + fileInfo, err := multiTask.JoinParts(ctx.Context, partInfos) if err != nil { return fmt.Errorf("completing multipart upload: %v", err) } @@ -98,7 +95,7 @@ func (o *MultipartInitiator) Execute(ctx *exec.ExecContext, e *exec.Executor) er } if cb.Commited { - initiator.Complete() + multiTask.Complete() } return nil @@ -119,10 +116,6 @@ type MultipartUpload struct { func (o *MultipartUpload) Execute(ctx *exec.ExecContext, e *exec.Executor) error { blder := factory.GetBuilder(o.Storage) - if blder == nil { - return fmt.Errorf("unsupported storage type: %T", o.Storage.Storage.Type) - } - uploadArgs, err := exec.BindVar[*MultipartUploadArgsValue](e, ctx.Context, o.UploadArgs) if err != nil { return err @@ -134,13 +127,13 @@ func (o *MultipartUpload) Execute(ctx *exec.ExecContext, e *exec.Executor) error } defer partStr.Stream.Close() - uploader, err := blder.CreateMultipartUploader(o.Storage) + multi, err := blder.CreateMultiparter() if err != nil { return err } startTime := time.Now() - uploadedInfo, err := uploader.UploadPart(ctx.Context, uploadArgs.InitState, o.PartSize, o.PartNumber, partStr.Stream) + uploadedInfo, err := multi.UploadPart(ctx.Context, uploadArgs.InitState, o.PartSize, o.PartNumber, partStr.Stream) if err != nil { return err } diff --git a/common/pkgs/ioswitch2/parser/parser.go b/common/pkgs/ioswitch2/parser/parser.go index e3134dc..dbc945e 100644 --- a/common/pkgs/ioswitch2/parser/parser.go +++ b/common/pkgs/ioswitch2/parser/parser.go @@ -12,8 +12,8 @@ import ( "gitlink.org.cn/cloudream/common/utils/math2" "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2" "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2/ops2" + "gitlink.org.cn/cloudream/storage/common/pkgs/storage/factory" "gitlink.org.cn/cloudream/storage/common/pkgs/storage/types" - 
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/utils" ) type IndexedStream struct { @@ -914,15 +914,15 @@ func useMultipartUploadToShardStore(ctx *ParseContext) { } // Join的目的地必须支持MultipartUpload功能才能替换成分片上传 - multiUpload := utils.FindFeature[*cdssdk.MultipartUploadFeature](shardNode.Storage) - if multiUpload == nil { + multiUpload, err := factory.GetBuilder(shardNode.Storage).CreateMultiparter() + if err != nil { return true } // Join的每一个段的大小必须超过最小分片大小。 // 目前只支持拆分超过最大分片的流,不支持合并多个小段流以达到最小分片大小。 for _, size := range joinNode.Segments { - if size < multiUpload.MinPartSize { + if size < multiUpload.MinPartSize() { return true } } @@ -934,10 +934,10 @@ func useMultipartUploadToShardStore(ctx *ParseContext) { for i, size := range joinNode.Segments { joinInput := joinNode.InputSlot(i) - if size > multiUpload.MaxPartSize { + if size > multiUpload.MaxPartSize() { // 如果一个分段的大小大于最大分片大小,则需要拆分为多个小段上传 // 拆分以及上传指令直接在流的产生节点执行 - splits := math2.SplitLessThan(size, multiUpload.MaxPartSize) + splits := math2.SplitLessThan(size, multiUpload.MaxPartSize()) splitNode := ctx.DAG.NewSegmentSplit(splits) splitNode.Env().CopyFrom(joinInput.Var().Src.Env()) diff --git a/common/pkgs/storage/agtpool/pool.go b/common/pkgs/storage/agtpool/pool.go index 29dfd20..7b5bb1a 100644 --- a/common/pkgs/storage/agtpool/pool.go +++ b/common/pkgs/storage/agtpool/pool.go @@ -1,7 +1,6 @@ package agtpool import ( - "fmt" "sync" "gitlink.org.cn/cloudream/common/pkgs/async" @@ -39,11 +38,7 @@ func (m *AgentPool) SetupAgent(detail stgmod.StorageDetail) error { stg := &storage{} bld := factory.GetBuilder(detail) - if bld == nil { - return fmt.Errorf("unsupported storage type: %T", detail.Storage.Type) - } - - svc, err := bld.CreateAgent(detail) + svc, err := bld.CreateAgent() if err != nil { return err } diff --git a/common/pkgs/storage/factory/empty_builder.go b/common/pkgs/storage/factory/empty_builder.go new file mode 100644 index 0000000..0b13b1d --- /dev/null +++ 
b/common/pkgs/storage/factory/empty_builder.go @@ -0,0 +1,32 @@ +package factory + +import ( + "fmt" + + stgmod "gitlink.org.cn/cloudream/storage/common/models" + "gitlink.org.cn/cloudream/storage/common/pkgs/storage/types" +) + +type EmptyBuilder struct { + detail stgmod.StorageDetail +} + +// 创建一个在MasterHub上长期运行的存储服务 +func (b *EmptyBuilder) CreateAgent() (types.StorageAgent, error) { + return nil, fmt.Errorf("create agent for %T: %w", b.detail.Storage.Type, types.ErrUnsupported) +} + +// 是否支持分片存储服务 +func (b *EmptyBuilder) HasShardStore() bool { + return false +} + +// 是否支持共享存储服务 +func (b *EmptyBuilder) HasSharedStore() bool { + return false +} + +// 创建一个分片上传组件 +func (b *EmptyBuilder) CreateMultiparter() (types.Multiparter, error) { + return nil, fmt.Errorf("create multipart initiator for %T: %w", b.detail.Storage.Type, types.ErrUnsupported) +} diff --git a/common/pkgs/storage/factory/factory.go b/common/pkgs/storage/factory/factory.go index 51cf953..5026e9d 100644 --- a/common/pkgs/storage/factory/factory.go +++ b/common/pkgs/storage/factory/factory.go @@ -12,7 +12,13 @@ import ( _ "gitlink.org.cn/cloudream/storage/common/pkgs/storage/s3" ) +// 此函数永远不会返回nil。如果找不到对应的Builder,则会返回EmptyBuilder, +// 此Builder的所有函数都会返回否定值或者封装后的ErrUnsupported错误(需要使用errors.Is检查) func GetBuilder(detail stgmod.StorageDetail) types.StorageBuilder { typ := reflect.TypeOf(detail.Storage.Type) - return reg.StorageBuilders[typ] + ctor, ok := reg.StorageBuilders[typ] + if !ok { + return &EmptyBuilder{detail: detail} + } + return ctor(detail) } diff --git a/common/pkgs/storage/factory/reg/reg.go b/common/pkgs/storage/factory/reg/reg.go index bafcb66..4161080 100644 --- a/common/pkgs/storage/factory/reg/reg.go +++ b/common/pkgs/storage/factory/reg/reg.go @@ -5,12 +5,15 @@ import ( cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/common/utils/reflect2" + stgmod "gitlink.org.cn/cloudream/storage/common/models" "gitlink.org.cn/cloudream/storage/common/pkgs/storage/types" ) -var StorageBuilders = make(map[reflect.Type]types.StorageBuilder) +type BuilderCtor func(detail stgmod.StorageDetail) types.StorageBuilder + +var StorageBuilders = make(map[reflect.Type]BuilderCtor) // 注册针对指定存储服务类型的Builder -func
RegisterBuilder[T cdssdk.StorageType](builder types.StorageBuilder) { - StorageBuilders[reflect2.TypeOf[T]()] = builder +func RegisterBuilder[T cdssdk.StorageType](ctor BuilderCtor) { + StorageBuilders[reflect2.TypeOf[T]()] = ctor } diff --git a/common/pkgs/storage/local/local.go b/common/pkgs/storage/local/local.go index 0c0b3fd..5dd3193 100644 --- a/common/pkgs/storage/local/local.go +++ b/common/pkgs/storage/local/local.go @@ -2,7 +2,6 @@ package local import ( "fmt" - "path/filepath" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" stgmod "gitlink.org.cn/cloudream/storage/common/models" @@ -12,20 +11,26 @@ import ( ) func init() { - reg.RegisterBuilder[*cdssdk.LocalStorageType](&builder{}) + reg.RegisterBuilder[*cdssdk.LocalStorageType](func(detail stgmod.StorageDetail) types.StorageBuilder { + return &builder{ + detail: detail, + } + }) } -type builder struct{} +type builder struct { + detail stgmod.StorageDetail +} -func (b *builder) CreateAgent(detail stgmod.StorageDetail) (types.StorageAgent, error) { +func (b *builder) CreateAgent() (types.StorageAgent, error) { agt := &agent{ - Detail: detail, + Detail: b.detail, } - if detail.Storage.ShardStore != nil { - local, ok := detail.Storage.ShardStore.(*cdssdk.LocalShardStorage) + if b.detail.Storage.ShardStore != nil { + local, ok := b.detail.Storage.ShardStore.(*cdssdk.LocalShardStorage) if !ok { - return nil, fmt.Errorf("invalid shard store type %T for local storage", detail.Storage.ShardStore) + return nil, fmt.Errorf("invalid shard store type %T for local storage", b.detail.Storage.ShardStore) } store, err := NewShardStore(agt, *local) @@ -36,10 +41,10 @@ func (b *builder) CreateAgent(detail stgmod.StorageDetail) (types.StorageAgent, agt.ShardStore = store } - if detail.Storage.SharedStore != nil { - local, ok := detail.Storage.SharedStore.(*cdssdk.LocalSharedStorage) + if b.detail.Storage.SharedStore != nil { + local, ok := b.detail.Storage.SharedStore.(*cdssdk.LocalSharedStorage) if !ok { - return 
nil, fmt.Errorf("invalid shared store type %T for local storage", detail.Storage.SharedStore) + return nil, fmt.Errorf("invalid shared store type %T for local storage", b.detail.Storage.SharedStore) } store, err := NewSharedStore(agt, *local) @@ -53,27 +58,21 @@ func (b *builder) CreateAgent(detail stgmod.StorageDetail) (types.StorageAgent, return agt, nil } -func (b *builder) CreateMultipartInitiator(detail stgmod.StorageDetail) (types.MultipartInitiator, error) { - feat := utils.FindFeature[*cdssdk.MultipartUploadFeature](detail) - if feat == nil { - return nil, fmt.Errorf("feature %T not found", cdssdk.MultipartUploadFeature{}) - } - - absTempDir, err := filepath.Abs(feat.TempDir) - if err != nil { - return nil, fmt.Errorf("get abs temp dir %v: %v", feat.TempDir, err) - } +func (b *builder) HasShardStore() bool { + return b.detail.Storage.ShardStore != nil +} - return &MultipartInitiator{ - absTempDir: absTempDir, - }, nil +func (b *builder) HasSharedStore() bool { + return b.detail.Storage.SharedStore != nil } -func (b *builder) CreateMultipartUploader(detail stgmod.StorageDetail) (types.MultipartUploader, error) { - feat := utils.FindFeature[*cdssdk.MultipartUploadFeature](detail) +func (b *builder) CreateMultiparter() (types.Multiparter, error) { + feat := utils.FindFeature[*cdssdk.MultipartUploadFeature](b.detail) if feat == nil { return nil, fmt.Errorf("feature %T not found", cdssdk.MultipartUploadFeature{}) } - return &MultipartUploader{}, nil + return &Multiparter{ + feat: feat, + }, nil } diff --git a/common/pkgs/storage/local/multipart_upload.go b/common/pkgs/storage/local/multipart_upload.go index 50eeae7..b5c4374 100644 --- a/common/pkgs/storage/local/multipart_upload.go +++ b/common/pkgs/storage/local/multipart_upload.go @@ -16,30 +16,76 @@ import ( "gitlink.org.cn/cloudream/storage/common/pkgs/storage/types" ) -type MultipartInitiator struct { - absTempDir string // 应该要是绝对路径 - tempFileName string - tempPartsDir string - joinedFilePath string +type 
Multiparter struct { + feat *cdssdk.MultipartUploadFeature } -func (i *MultipartInitiator) Initiate(ctx context.Context) (types.MultipartInitState, error) { - i.tempFileName = os2.GenerateRandomFileName(10) - i.tempPartsDir = filepath.Join(i.absTempDir, i.tempFileName) - i.joinedFilePath = filepath.Join(i.absTempDir, i.tempFileName+".joined") +func (m *Multiparter) MinPartSize() int64 { + return m.feat.MinPartSize +} - err := os.MkdirAll(i.tempPartsDir, 0777) +func (m *Multiparter) MaxPartSize() int64 { + return m.feat.MaxPartSize +} +func (m *Multiparter) Initiate(ctx context.Context) (types.MultipartTask, error) { + absTempDir, err := filepath.Abs(m.feat.TempDir) if err != nil { - return types.MultipartInitState{}, err + return nil, fmt.Errorf("get abs temp dir %v: %v", m.feat.TempDir, err) } - return types.MultipartInitState{ - UploadID: i.tempPartsDir, + tempFileName := os2.GenerateRandomFileName(10) + tempPartsDir := filepath.Join(absTempDir, tempFileName) + joinedFilePath := filepath.Join(absTempDir, tempFileName+".joined") + + err = os.MkdirAll(tempPartsDir, 0777) + + if err != nil { + return nil, err + } + + return &MultipartTask{ + absTempDir: absTempDir, + tempFileName: tempFileName, + tempPartsDir: tempPartsDir, + joinedFilePath: joinedFilePath, + uploadID: tempPartsDir, + }, nil +} + +func (m *Multiparter) UploadPart(ctx context.Context, init types.MultipartInitState, partSize int64, partNumber int, stream io.Reader) (types.UploadedPartInfo, error) { + partFilePath := filepath.Join(init.UploadID, fmt.Sprintf("%v", partNumber)) + partFile, err := os.Create(partFilePath) + if err != nil { + return types.UploadedPartInfo{}, err + } + defer partFile.Close() + + _, err = io.Copy(partFile, stream) + if err != nil { + return types.UploadedPartInfo{}, err + } + return types.UploadedPartInfo{ + ETag: partFilePath, + PartNumber: partNumber, }, nil } -func (i *MultipartInitiator) JoinParts(ctx context.Context, parts []types.UploadedPartInfo) (types.BypassFileInfo, 
error) { +type MultipartTask struct { + absTempDir string // 应该要是绝对路径 + tempFileName string + tempPartsDir string + joinedFilePath string + uploadID string +} + +func (i *MultipartTask) InitState() types.MultipartInitState { + return types.MultipartInitState{ + UploadID: i.uploadID, + } +} + +func (i *MultipartTask) JoinParts(ctx context.Context, parts []types.UploadedPartInfo) (types.BypassFileInfo, error) { parts = sort2.Sort(parts, func(l, r types.UploadedPartInfo) int { return l.PartNumber - r.PartNumber }) @@ -70,7 +116,7 @@ func (i *MultipartInitiator) JoinParts(ctx context.Context, parts []types.Upload }, nil } -func (i *MultipartInitiator) writePart(partInfo types.UploadedPartInfo, joined *os.File, hasher hash.Hash) (int64, error) { +func (i *MultipartTask) writePart(partInfo types.UploadedPartInfo, joined *os.File, hasher hash.Hash) (int64, error) { part, err := os.Open(partInfo.ETag) if err != nil { return 0, err @@ -100,35 +146,11 @@ func (i *MultipartInitiator) writePart(partInfo types.UploadedPartInfo, joined * return size, nil } -func (i *MultipartInitiator) Complete() { +func (i *MultipartTask) Complete() { i.Abort() } -func (i *MultipartInitiator) Abort() { +func (i *MultipartTask) Abort() { os.Remove(i.joinedFilePath) os.RemoveAll(i.tempPartsDir) } - -type MultipartUploader struct{} - -func (u *MultipartUploader) UploadPart(ctx context.Context, init types.MultipartInitState, partSize int64, partNumber int, stream io.Reader) (types.UploadedPartInfo, error) { - partFilePath := filepath.Join(init.UploadID, fmt.Sprintf("%v", partNumber)) - partFile, err := os.Create(partFilePath) - if err != nil { - return types.UploadedPartInfo{}, err - } - defer partFile.Close() - - _, err = io.Copy(partFile, stream) - if err != nil { - return types.UploadedPartInfo{}, err - } - return types.UploadedPartInfo{ - ETag: partFilePath, - PartNumber: partNumber, - }, nil -} - -func (u *MultipartUploader) Close() { - -} diff --git 
a/common/pkgs/storage/s3/multipart_upload.go b/common/pkgs/storage/s3/multipart_upload.go index e6e371d..b80ac1e 100644 --- a/common/pkgs/storage/s3/multipart_upload.go +++ b/common/pkgs/storage/s3/multipart_upload.go @@ -13,41 +13,94 @@ import ( "gitlink.org.cn/cloudream/common/utils/io2" "gitlink.org.cn/cloudream/common/utils/os2" "gitlink.org.cn/cloudream/common/utils/sort2" + stgmod "gitlink.org.cn/cloudream/storage/common/models" "gitlink.org.cn/cloudream/storage/common/pkgs/storage/types" ) -type MultipartInitiator struct { - cli *s3.Client - bucket string - tempDir string - tempFileName string - tempFilePath string - uploadID string +type Multiparter struct { + detail stgmod.StorageDetail + feat *cdssdk.MultipartUploadFeature +} + +func (m *Multiparter) MinPartSize() int64 { + return m.feat.MinPartSize +} + +func (m *Multiparter) MaxPartSize() int64 { + return m.feat.MaxPartSize } -func (i *MultipartInitiator) Initiate(ctx context.Context) (types.MultipartInitState, error) { - i.tempFileName = os2.GenerateRandomFileName(10) - i.tempFilePath = filepath.Join(i.tempDir, i.tempFileName) +func (m *Multiparter) Initiate(ctx context.Context) (types.MultipartTask, error) { + tempFileName := os2.GenerateRandomFileName(10) + tempFilePath := filepath.Join(m.feat.TempDir, tempFileName) - resp, err := i.cli.CreateMultipartUpload(ctx, &s3.CreateMultipartUploadInput{ - Bucket: aws.String(i.bucket), - Key: aws.String(i.tempFilePath), + cli, bkt, err := createS3Client(m.detail.Storage.Type) + if err != nil { + return nil, err + } + + resp, err := cli.CreateMultipartUpload(ctx, &s3.CreateMultipartUploadInput{ + Bucket: aws.String(bkt), + Key: aws.String(tempFilePath), ChecksumAlgorithm: s3types.ChecksumAlgorithmSha256, }) if err != nil { - return types.MultipartInitState{}, err + return nil, err } - i.uploadID = *resp.UploadId + return &MultipartTask{ + cli: cli, + bucket: bkt, + tempDir: m.feat.TempDir, + tempFileName: tempFileName, + tempFilePath: tempFilePath, + uploadID: 
*resp.UploadId, + }, nil +} + +func (m *Multiparter) UploadPart(ctx context.Context, init types.MultipartInitState, partSize int64, partNumber int, stream io.Reader) (types.UploadedPartInfo, error) { + cli, _, err := createS3Client(m.detail.Storage.Type) + if err != nil { + return types.UploadedPartInfo{}, err + } + + hashStr := io2.NewReadHasher(sha256.New(), stream) + resp, err := cli.UploadPart(ctx, &s3.UploadPartInput{ + Bucket: aws.String(init.Bucket), + Key: aws.String(init.Key), + UploadId: aws.String(init.UploadID), + PartNumber: aws.Int32(int32(partNumber)), + Body: hashStr, + }) + if err != nil { + return types.UploadedPartInfo{}, err + } + + return types.UploadedPartInfo{ + ETag: *resp.ETag, + PartNumber: partNumber, + PartHash: hashStr.Sum(), + }, nil +} + +type MultipartTask struct { + cli *s3.Client + bucket string + tempDir string + tempFileName string + tempFilePath string + uploadID string +} +func (i *MultipartTask) InitState() types.MultipartInitState { return types.MultipartInitState{ - UploadID: *resp.UploadId, + UploadID: i.uploadID, Bucket: i.bucket, Key: i.tempFilePath, - }, nil + } } -func (i *MultipartInitiator) JoinParts(ctx context.Context, parts []types.UploadedPartInfo) (types.BypassFileInfo, error) { +func (i *MultipartTask) JoinParts(ctx context.Context, parts []types.UploadedPartInfo) (types.BypassFileInfo, error) { parts = sort2.Sort(parts, func(l, r types.UploadedPartInfo) int { return l.PartNumber - r.PartNumber }) @@ -94,11 +147,11 @@ func (i *MultipartInitiator) JoinParts(ctx context.Context, parts []types.Upload } -func (i *MultipartInitiator) Complete() { +func (i *MultipartTask) Complete() { } -func (i *MultipartInitiator) Abort() { +func (i *MultipartTask) Abort() { // TODO2 根据注释描述,Abort不能停止正在上传的分片,需要等待其上传完成才能彻底删除, // 考虑增加定时任务去定时清理 i.cli.AbortMultipartUpload(context.Background(), &s3.AbortMultipartUploadInput{ @@ -111,32 +164,3 @@ func (i *MultipartInitiator) Abort() { Key: aws.String(i.tempFilePath), }) } - -type 
MultipartUploader struct { - cli *s3.Client - bucket string -} - -func (u *MultipartUploader) UploadPart(ctx context.Context, init types.MultipartInitState, partSize int64, partNumber int, stream io.Reader) (types.UploadedPartInfo, error) { - hashStr := io2.NewReadHasher(sha256.New(), stream) - resp, err := u.cli.UploadPart(ctx, &s3.UploadPartInput{ - Bucket: aws.String(init.Bucket), - Key: aws.String(init.Key), - UploadId: aws.String(init.UploadID), - PartNumber: aws.Int32(int32(partNumber)), - Body: hashStr, - }) - if err != nil { - return types.UploadedPartInfo{}, err - } - - return types.UploadedPartInfo{ - ETag: *resp.ETag, - PartNumber: partNumber, - PartHash: hashStr.Sum(), - }, nil -} - -func (u *MultipartUploader) Close() { - -} diff --git a/common/pkgs/storage/s3/s3.go b/common/pkgs/storage/s3/s3.go index 1c0f6ca..ea97523 100644 --- a/common/pkgs/storage/s3/s3.go +++ b/common/pkgs/storage/s3/s3.go @@ -14,25 +14,33 @@ import ( ) func init() { - reg.RegisterBuilder[*cdssdk.COSType](&builder{}) - reg.RegisterBuilder[*cdssdk.OSSType](&builder{}) - reg.RegisterBuilder[*cdssdk.OBSType](&builder{}) + reg.RegisterBuilder[*cdssdk.COSType](newBuilder) + reg.RegisterBuilder[*cdssdk.OSSType](newBuilder) + reg.RegisterBuilder[*cdssdk.OBSType](newBuilder) } -type builder struct{} +type builder struct { + detail stgmod.StorageDetail +} + +func newBuilder(detail stgmod.StorageDetail) types.StorageBuilder { + return &builder{ + detail: detail, + } +} -func (b *builder) CreateAgent(detail stgmod.StorageDetail) (types.StorageAgent, error) { +func (b *builder) CreateAgent() (types.StorageAgent, error) { agt := &Agent{ - Detail: detail, + Detail: b.detail, } - if detail.Storage.ShardStore != nil { - cfg, ok := detail.Storage.ShardStore.(*cdssdk.S3ShardStorage) + if b.detail.Storage.ShardStore != nil { + cfg, ok := b.detail.Storage.ShardStore.(*cdssdk.S3ShardStorage) if !ok { - return nil, fmt.Errorf("invalid shard store type %T for local storage", detail.Storage.ShardStore) + 
return nil, fmt.Errorf("invalid shard store type %T for local storage", b.detail.Storage.ShardStore) } - cli, bkt, err := createS3Client(detail.Storage.Type) + cli, bkt, err := createS3Client(b.detail.Storage.Type) if err != nil { return nil, err } @@ -51,38 +59,23 @@ func (b *builder) CreateAgent(detail stgmod.StorageDetail) (types.StorageAgent, return agt, nil } -func (b *builder) CreateMultipartInitiator(detail stgmod.StorageDetail) (types.MultipartInitiator, error) { - feat := utils.FindFeature[*cdssdk.MultipartUploadFeature](detail) - if feat == nil { - return nil, fmt.Errorf("feature %T not found", cdssdk.MultipartUploadFeature{}) - } - - cli, bkt, err := createS3Client(detail.Storage.Type) - if err != nil { - return nil, err - } +func (b *builder) HasShardStore() bool { + return b.detail.Storage.ShardStore != nil +} - return &MultipartInitiator{ - cli: cli, - bucket: bkt, - tempDir: feat.TempDir, - }, nil +func (b *builder) HasSharedStore() bool { + return false } -func (b *builder) CreateMultipartUploader(detail stgmod.StorageDetail) (types.MultipartUploader, error) { - feat := utils.FindFeature[*cdssdk.MultipartUploadFeature](detail) +func (b *builder) CreateMultiparter() (types.Multiparter, error) { + feat := utils.FindFeature[*cdssdk.MultipartUploadFeature](b.detail) if feat == nil { return nil, fmt.Errorf("feature %T not found", cdssdk.MultipartUploadFeature{}) } - cli, bkt, err := createS3Client(detail.Storage.Type) - if err != nil { - return nil, err - } - - return &MultipartUploader{ - cli: cli, - bucket: bkt, + return &Multiparter{ + detail: b.detail, + feat: feat, }, nil } diff --git a/common/pkgs/storage/types/s3_client.go b/common/pkgs/storage/types/s3_client.go index 9514d24..4ec3f9c 100644 --- a/common/pkgs/storage/types/s3_client.go +++ b/common/pkgs/storage/types/s3_client.go @@ -5,9 +5,17 @@ import ( "io" ) -type MultipartInitiator interface { +type Multiparter interface { + MaxPartSize() int64 + MinPartSize() int64 // 启动一个分片上传 - 
Initiate(ctx context.Context) (MultipartInitState, error) + Initiate(ctx context.Context) (MultipartTask, error) + // 上传一个分片 + UploadPart(ctx context.Context, init MultipartInitState, partSize int64, partNumber int, stream io.Reader) (UploadedPartInfo, error) +} + +type MultipartTask interface { + InitState() MultipartInitState // 所有分片上传完成后,合并分片 JoinParts(ctx context.Context, parts []UploadedPartInfo) (BypassFileInfo, error) // 合成之后的文件已被使用 @@ -16,11 +24,6 @@ type MultipartInitiator interface { Abort() } -type MultipartUploader interface { - UploadPart(ctx context.Context, init MultipartInitState, partSize int64, partNumber int, stream io.Reader) (UploadedPartInfo, error) - Close() -} - // TODO 可以考虑重构成一个接口,支持不同的类型的分片有不同内容的实现 type MultipartInitState struct { UploadID string diff --git a/common/pkgs/storage/types/types.go b/common/pkgs/storage/types/types.go index 22fa6f4..0f355d5 100644 --- a/common/pkgs/storage/types/types.go +++ b/common/pkgs/storage/types/types.go @@ -18,12 +18,11 @@ type StorageEvent interface{} type StorageEventChan = async.UnboundChannel[StorageEvent] -/* -如果一个组件需要与Agent交互(比如实际是ShardStore功能的一部分),那么就将Create函数放到StorageAgent接口中。 -如果组件十分独立,仅需要存储服务的配置信息就行,那么就把Create函数放到StorageBuilder中去。 -*/ - // 在MasterHub上运行,代理一个存储服务。 +// +// 存放Storage的运行时数据。如果一个组件需要与Agent交互(比如实际是ShardStore功能的一部分),或者是需要长期运行, +// 那么就将该组件的Get函数放到StorageAgent接口中。可以同时在StorageBuilder中同时提供HasXXX函数, +// 用于判断该Storage是否支持某个功能,用于生成ioswitch计划时判断是否能利用此功能。 type StorageAgent interface { Start(ch *StorageEventChan) Stop() @@ -35,12 +34,18 @@ type StorageAgent interface { GetSharedStore() (SharedStore, error) } -// 创建存储服务的指定组件 +// 创建存储服务的指定组件。 +// +// 如果指定组件比较独立,不需要依赖运行时数据,或者不需要与Agent交互,那么就可以将Create函数放到这个接口中。 +// 增加Has函数用于判断该Storage是否有某个组件。 +// 如果Create函数仅仅只是创建一个结构体,没有其他副作用,那么也可以用Create函数来判断是否支持某个功能。 type StorageBuilder interface { // 创建一个在MasterHub上长期运行的存储服务 - CreateAgent(detail stgmod.StorageDetail) (StorageAgent, error) - // 创建一个分片上传功能的初始化器 - CreateMultipartInitiator(detail 
stgmod.StorageDetail) (MultipartInitiator, error) - // 创建一个分片上传功能的上传器 - CreateMultipartUploader(detail stgmod.StorageDetail) (MultipartUploader, error) + CreateAgent() (StorageAgent, error) + // 是否支持分片存储服务 + HasShardStore() bool + // 是否支持共享存储服务 + HasSharedStore() bool + // 创建一个分片上传组件 + CreateMultiparter() (Multiparter, error) } diff --git a/common/pkgs/uploader/uploader.go b/common/pkgs/uploader/uploader.go index bcb3f4d..08ebb14 100644 --- a/common/pkgs/uploader/uploader.go +++ b/common/pkgs/uploader/uploader.go @@ -17,6 +17,7 @@ import ( "gitlink.org.cn/cloudream/storage/common/pkgs/metacache" coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" "gitlink.org.cn/cloudream/storage/common/pkgs/storage/agtpool" + "gitlink.org.cn/cloudream/storage/common/pkgs/storage/factory" ) type Uploader struct { @@ -85,7 +86,7 @@ func (u *Uploader) BeginUpdate(userID cdssdk.UserID, pkgID cdssdk.PackageID, aff if stg.MasterHub == nil { return nil, fmt.Errorf("load to storage %v has no master hub", stgID) } - if stg.Storage.SharedStore == nil { + if !factory.GetBuilder(stg).HasSharedStore() { return nil, fmt.Errorf("load to storage %v has no shared store", stgID) }