From 72c72f15ca0bd189e3eb71af360a33ec587cf014 Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Fri, 22 Nov 2024 19:54:40 +0800 Subject: [PATCH] =?UTF-8?q?=E8=A7=A3=E5=86=B3=E4=B8=8A=E4=BC=A0=E8=B0=83?= =?UTF-8?q?=E5=BA=A6=E6=8E=A5=E5=8F=A3=E9=87=8D=E5=A4=8D=E6=B7=BB=E5=8A=A0?= =?UTF-8?q?=E5=85=83=E6=95=B0=E6=8D=AE=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- common/pkgs/db2/object.go | 26 +++++++++++++++----------- common/pkgs/mq/coordinator/package.go | 14 +++++++------- common/pkgs/uploader/create_load.go | 18 +++++++++--------- common/pkgs/uploader/update.go | 2 +- 4 files changed, 32 insertions(+), 28 deletions(-) diff --git a/common/pkgs/db2/object.go b/common/pkgs/db2/object.go index 94a4a77..e365057 100644 --- a/common/pkgs/db2/object.go +++ b/common/pkgs/db2/object.go @@ -264,11 +264,13 @@ func (db *ObjectDB) BatchAdd(ctx SQLContext, packageID cdssdk.PackageID, adds [] // 创建 ObjectBlock objBlocks := make([]stgmod.ObjectBlock, len(adds)) for i, add := range adds { - objBlocks[i] = stgmod.ObjectBlock{ - ObjectID: affectedObjIDs[i], - Index: 0, - StorageID: add.StorageID, - FileHash: add.FileHash, + for _, stgID := range add.StorageIDs { + objBlocks = append(objBlocks, stgmod.ObjectBlock{ + ObjectID: affectedObjIDs[i], + Index: 0, + StorageID: stgID, + FileHash: add.FileHash, + }) } } if err := db.ObjectBlock().BatchCreate(ctx, objBlocks); err != nil { @@ -277,12 +279,14 @@ func (db *ObjectDB) BatchAdd(ctx SQLContext, packageID cdssdk.PackageID, adds [] // 创建 Cache caches := make([]model.Cache, len(adds)) - for i, add := range adds { - caches[i] = model.Cache{ - FileHash: add.FileHash, - StorageID: add.StorageID, - CreateTime: time.Now(), - Priority: 0, + for _, add := range adds { + for _, stgID := range add.StorageIDs { + caches = append(caches, model.Cache{ + FileHash: add.FileHash, + StorageID: stgID, + CreateTime: time.Now(), + Priority: 0, + }) } } if err := db.Cache().BatchCreate(ctx, caches); err != nil { diff --git a/common/pkgs/mq/coordinator/package.go b/common/pkgs/mq/coordinator/package.go index 2ea60c2..1ae60b7 100644 --- a/common/pkgs/mq/coordinator/package.go +++ b/common/pkgs/mq/coordinator/package.go @@ -129,11 +129,11 @@ type UpdatePackageResp struct { Added []cdssdk.Object `json:"added"` } type AddObjectEntry struct { - Path string `json:"path"` - Size int64 `json:"size,string"` - FileHash cdssdk.FileHash `json:"fileHash"` - UploadTime time.Time `json:"uploadTime"` // 开始上传文件的时间 - StorageID cdssdk.StorageID `json:"storageID"` + Path string `json:"path"` + Size int64 `json:"size,string"` + FileHash cdssdk.FileHash `json:"fileHash"` + UploadTime time.Time `json:"uploadTime"` // 开始上传文件的时间 + StorageIDs []cdssdk.StorageID `json:"storageIDs"` } func NewUpdatePackage(packageID cdssdk.PackageID, adds []AddObjectEntry, deletes []cdssdk.ObjectID) *UpdatePackage { @@ -148,13 +148,13 @@ func NewUpdatePackageResp(added []cdssdk.Object) *UpdatePackageResp { Added: added, } } -func NewAddObjectEntry(path string, size int64, fileHash cdssdk.FileHash, uploadTime time.Time, stgID cdssdk.StorageID) AddObjectEntry { +func NewAddObjectEntry(path string, size int64, fileHash cdssdk.FileHash, uploadTime time.Time, stgIDs []cdssdk.StorageID) AddObjectEntry { return AddObjectEntry{ Path: path, Size: size, FileHash: fileHash, UploadTime: uploadTime, - StorageID: stgID, + StorageIDs: stgIDs, } } func (client *Client) UpdatePackage(msg *UpdatePackage) (*UpdatePackageResp, error) { diff --git a/common/pkgs/uploader/create_load.go b/common/pkgs/uploader/create_load.go index 1605a2e..59772ed 100644 --- a/common/pkgs/uploader/create_load.go +++ b/common/pkgs/uploader/create_load.go @@ -38,6 +38,7 @@ type CreateLoadResult struct { func (u *CreateLoadUploader) Upload(path string, size int64, stream io.Reader) error { uploadTime := time.Now() + stgIDs := make([]cdssdk.StorageID, 0, len(u.targetStgs)) ft := ioswitch2.NewFromTo() fromExec, hd := ioswitch2.NewFromDriver(-1) @@ -45,6 +46,7 @@ func (u *CreateLoadUploader) Upload(path string, size int64, stream io.Reader) e for _, stg := range u.targetStgs { ft.AddTo(ioswitch2.NewToShardStore(*stg.MasterHub, stg.Storage, -1, "fileHash")) ft.AddTo(ioswitch2.NewLoadToShared(*stg.MasterHub, stg.Storage, u.userID, u.pkg.PackageID, path)) + stgIDs = append(stgIDs, stg.Storage.StorageID) } plans := exec.NewPlanBuilder() @@ -67,15 +69,13 @@ func (u *CreateLoadUploader) Upload(path string, size int64, stream io.Reader) e // 记录上传结果 fileHash := ret["fileHash"].(*ops2.FileHashValue).Hash - for _, stg := range u.targetStgs { - u.successes = append(u.successes, coormq.AddObjectEntry{ - Path: path, - Size: size, - FileHash: fileHash, - UploadTime: uploadTime, - StorageID: stg.Storage.StorageID, - }) - } + u.successes = append(u.successes, coormq.AddObjectEntry{ + Path: path, + Size: size, + FileHash: fileHash, + UploadTime: uploadTime, + StorageIDs: stgIDs, + }) return nil } diff --git a/common/pkgs/uploader/update.go b/common/pkgs/uploader/update.go index aed711b..1d3f837 100644 --- a/common/pkgs/uploader/update.go +++ b/common/pkgs/uploader/update.go @@ -70,7 +70,7 @@ func (w *UpdateUploader) Upload(path string, size int64, stream io.Reader) error Size: size, FileHash: ret["fileHash"].(*ops2.FileHashValue).Hash, UploadTime: uploadTime, - StorageID: w.targetStg.Storage.StorageID, + StorageIDs: []cdssdk.StorageID{w.targetStg.Storage.StorageID}, }) return nil }