diff --git a/common/pkgs/db2/object.go b/common/pkgs/db2/object.go index 94a4a77..a8bb0bd 100644 --- a/common/pkgs/db2/object.go +++ b/common/pkgs/db2/object.go @@ -262,13 +262,15 @@ func (db *ObjectDB) BatchAdd(ctx SQLContext, packageID cdssdk.PackageID, adds [] } // 创建 ObjectBlock - objBlocks := make([]stgmod.ObjectBlock, len(adds)) + objBlocks := make([]stgmod.ObjectBlock, 0, len(adds)) for i, add := range adds { - objBlocks[i] = stgmod.ObjectBlock{ - ObjectID: affectedObjIDs[i], - Index: 0, - StorageID: add.StorageID, - FileHash: add.FileHash, + for _, stgID := range add.StorageIDs { + objBlocks = append(objBlocks, stgmod.ObjectBlock{ + ObjectID: affectedObjIDs[i], + Index: 0, + StorageID: stgID, + FileHash: add.FileHash, + }) } } if err := db.ObjectBlock().BatchCreate(ctx, objBlocks); err != nil { @@ -276,13 +278,15 @@ func (db *ObjectDB) BatchAdd(ctx SQLContext, packageID cdssdk.PackageID, adds [] } // 创建 Cache - caches := make([]model.Cache, len(adds)) - for i, add := range adds { - caches[i] = model.Cache{ - FileHash: add.FileHash, - StorageID: add.StorageID, - CreateTime: time.Now(), - Priority: 0, + caches := make([]model.Cache, 0, len(adds)) + for _, add := range adds { + for _, stgID := range add.StorageIDs { + caches = append(caches, model.Cache{ + FileHash: add.FileHash, + StorageID: stgID, + CreateTime: time.Now(), + Priority: 0, + }) } } if err := db.Cache().BatchCreate(ctx, caches); err != nil { diff --git a/common/pkgs/mq/coordinator/package.go b/common/pkgs/mq/coordinator/package.go index 2ea60c2..1ae60b7 100644 --- a/common/pkgs/mq/coordinator/package.go +++ b/common/pkgs/mq/coordinator/package.go @@ -129,11 +129,11 @@ type UpdatePackageResp struct { Added []cdssdk.Object `json:"added"` } type AddObjectEntry struct { - Path string `json:"path"` - Size int64 `json:"size,string"` - FileHash cdssdk.FileHash `json:"fileHash"` - UploadTime time.Time `json:"uploadTime"` // 开始上传文件的时间 - StorageID cdssdk.StorageID `json:"storageID"` + Path string `json:"path"` + Size int64 `json:"size,string"` + FileHash cdssdk.FileHash `json:"fileHash"` + UploadTime time.Time `json:"uploadTime"` // 开始上传文件的时间 + StorageIDs []cdssdk.StorageID `json:"storageIDs"` } func NewUpdatePackage(packageID cdssdk.PackageID, adds []AddObjectEntry, deletes []cdssdk.ObjectID) *UpdatePackage { @@ -148,13 +148,13 @@ func NewUpdatePackageResp(added []cdssdk.Object) *UpdatePackageResp { Added: added, } } -func NewAddObjectEntry(path string, size int64, fileHash cdssdk.FileHash, uploadTime time.Time, stgID cdssdk.StorageID) AddObjectEntry { +func NewAddObjectEntry(path string, size int64, fileHash cdssdk.FileHash, uploadTime time.Time, stgIDs []cdssdk.StorageID) AddObjectEntry { return AddObjectEntry{ Path: path, Size: size, FileHash: fileHash, UploadTime: uploadTime, - StorageID: stgID, + StorageIDs: stgIDs, } } func (client *Client) UpdatePackage(msg *UpdatePackage) (*UpdatePackageResp, error) { diff --git a/common/pkgs/uploader/create_load.go b/common/pkgs/uploader/create_load.go index 84c2689..a435ee8 100644 --- a/common/pkgs/uploader/create_load.go +++ b/common/pkgs/uploader/create_load.go @@ -38,6 +38,7 @@ type CreateLoadResult struct { func (u *CreateLoadUploader) Upload(path string, size int64, stream io.Reader) error { uploadTime := time.Now() + stgIDs := make([]cdssdk.StorageID, 0, len(u.targetStgs)) ft := ioswitch2.FromTo{} fromExec, hd := ioswitch2.NewFromDriver(ioswitch2.RawStream()) @@ -45,6 +46,7 @@ func (u *CreateLoadUploader) Upload(path string, size int64, stream io.Reader) e for _, stg := range u.targetStgs { ft.AddTo(ioswitch2.NewToShardStore(*stg.MasterHub, stg.Storage, ioswitch2.RawStream(), "fileHash")) ft.AddTo(ioswitch2.NewLoadToShared(*stg.MasterHub, stg.Storage, u.userID, u.pkg.PackageID, path)) + stgIDs = append(stgIDs, stg.Storage.StorageID) } plans := exec.NewPlanBuilder() @@ -67,15 +69,13 @@ func (u *CreateLoadUploader) Upload(path string, size int64, stream io.Reader) e // 记录上传结果 fileHash := ret["fileHash"].(*ops2.FileHashValue).Hash - for _, stg := range u.targetStgs { - u.successes = append(u.successes, coormq.AddObjectEntry{ - Path: path, - Size: size, - FileHash: fileHash, - UploadTime: uploadTime, - StorageID: stg.Storage.StorageID, - }) - } + u.successes = append(u.successes, coormq.AddObjectEntry{ + Path: path, + Size: size, + FileHash: fileHash, + UploadTime: uploadTime, + StorageIDs: stgIDs, + }) return nil } diff --git a/common/pkgs/uploader/update.go b/common/pkgs/uploader/update.go index a583bc9..8f8ea10 100644 --- a/common/pkgs/uploader/update.go +++ b/common/pkgs/uploader/update.go @@ -70,7 +70,7 @@ func (w *UpdateUploader) Upload(path string, size int64, stream io.Reader) error Size: size, FileHash: ret["fileHash"].(*ops2.FileHashValue).Hash, UploadTime: uploadTime, - StorageID: w.targetStg.Storage.StorageID, + StorageIDs: []cdssdk.StorageID{w.targetStg.Storage.StorageID}, }) return nil }