Browse Source

解决上传调度接口重复添加元数据的问题

gitlink
Sydonian 1 year ago
parent
commit
72c72f15ca
4 changed files with 32 additions and 28 deletions
  1. +15
    -11
      common/pkgs/db2/object.go
  2. +7
    -7
      common/pkgs/mq/coordinator/package.go
  3. +9
    -9
      common/pkgs/uploader/create_load.go
  4. +1
    -1
      common/pkgs/uploader/update.go

+ 15
- 11
common/pkgs/db2/object.go View File

@@ -264,11 +264,13 @@ func (db *ObjectDB) BatchAdd(ctx SQLContext, packageID cdssdk.PackageID, adds []
// 创建 ObjectBlock
objBlocks := make([]stgmod.ObjectBlock, len(adds))
for i, add := range adds {
objBlocks[i] = stgmod.ObjectBlock{
ObjectID: affectedObjIDs[i],
Index: 0,
StorageID: add.StorageID,
FileHash: add.FileHash,
for _, stgID := range add.StorageIDs {
objBlocks = append(objBlocks, stgmod.ObjectBlock{
ObjectID: affectedObjIDs[i],
Index: 0,
StorageID: stgID,
FileHash: add.FileHash,
})
}
}
if err := db.ObjectBlock().BatchCreate(ctx, objBlocks); err != nil {
@@ -277,12 +279,14 @@ func (db *ObjectDB) BatchAdd(ctx SQLContext, packageID cdssdk.PackageID, adds []

// 创建 Cache
caches := make([]model.Cache, len(adds))
for i, add := range adds {
caches[i] = model.Cache{
FileHash: add.FileHash,
StorageID: add.StorageID,
CreateTime: time.Now(),
Priority: 0,
for _, add := range adds {
for _, stgID := range add.StorageIDs {
caches = append(caches, model.Cache{
FileHash: add.FileHash,
StorageID: stgID,
CreateTime: time.Now(),
Priority: 0,
})
}
}
if err := db.Cache().BatchCreate(ctx, caches); err != nil {


+ 7
- 7
common/pkgs/mq/coordinator/package.go View File

@@ -129,11 +129,11 @@ type UpdatePackageResp struct {
Added []cdssdk.Object `json:"added"`
}
type AddObjectEntry struct {
Path string `json:"path"`
Size int64 `json:"size,string"`
FileHash cdssdk.FileHash `json:"fileHash"`
UploadTime time.Time `json:"uploadTime"` // 开始上传文件的时间
StorageID cdssdk.StorageID `json:"storageID"`
Path string `json:"path"`
Size int64 `json:"size,string"`
FileHash cdssdk.FileHash `json:"fileHash"`
UploadTime time.Time `json:"uploadTime"` // 开始上传文件的时间
StorageIDs []cdssdk.StorageID `json:"storageIDs"`
}

func NewUpdatePackage(packageID cdssdk.PackageID, adds []AddObjectEntry, deletes []cdssdk.ObjectID) *UpdatePackage {
@@ -148,13 +148,13 @@ func NewUpdatePackageResp(added []cdssdk.Object) *UpdatePackageResp {
Added: added,
}
}
func NewAddObjectEntry(path string, size int64, fileHash cdssdk.FileHash, uploadTime time.Time, stgID cdssdk.StorageID) AddObjectEntry {
func NewAddObjectEntry(path string, size int64, fileHash cdssdk.FileHash, uploadTime time.Time, stgIDs []cdssdk.StorageID) AddObjectEntry {
return AddObjectEntry{
Path: path,
Size: size,
FileHash: fileHash,
UploadTime: uploadTime,
StorageID: stgID,
StorageIDs: stgIDs,
}
}
func (client *Client) UpdatePackage(msg *UpdatePackage) (*UpdatePackageResp, error) {


+ 9
- 9
common/pkgs/uploader/create_load.go View File

@@ -38,6 +38,7 @@ type CreateLoadResult struct {

func (u *CreateLoadUploader) Upload(path string, size int64, stream io.Reader) error {
uploadTime := time.Now()
stgIDs := make([]cdssdk.StorageID, 0, len(u.targetStgs))

ft := ioswitch2.NewFromTo()
fromExec, hd := ioswitch2.NewFromDriver(-1)
@@ -45,6 +46,7 @@ func (u *CreateLoadUploader) Upload(path string, size int64, stream io.Reader) e
for _, stg := range u.targetStgs {
ft.AddTo(ioswitch2.NewToShardStore(*stg.MasterHub, stg.Storage, -1, "fileHash"))
ft.AddTo(ioswitch2.NewLoadToShared(*stg.MasterHub, stg.Storage, u.userID, u.pkg.PackageID, path))
stgIDs = append(stgIDs, stg.Storage.StorageID)
}

plans := exec.NewPlanBuilder()
@@ -67,15 +69,13 @@ func (u *CreateLoadUploader) Upload(path string, size int64, stream io.Reader) e

// 记录上传结果
fileHash := ret["fileHash"].(*ops2.FileHashValue).Hash
for _, stg := range u.targetStgs {
u.successes = append(u.successes, coormq.AddObjectEntry{
Path: path,
Size: size,
FileHash: fileHash,
UploadTime: uploadTime,
StorageID: stg.Storage.StorageID,
})
}
u.successes = append(u.successes, coormq.AddObjectEntry{
Path: path,
Size: size,
FileHash: fileHash,
UploadTime: uploadTime,
StorageIDs: stgIDs,
})
return nil
}



+ 1
- 1
common/pkgs/uploader/update.go View File

@@ -70,7 +70,7 @@ func (w *UpdateUploader) Upload(path string, size int64, stream io.Reader) error
Size: size,
FileHash: ret["fileHash"].(*ops2.FileHashValue).Hash,
UploadTime: uploadTime,
StorageID: w.targetStg.Storage.StorageID,
StorageIDs: []cdssdk.StorageID{w.targetStg.Storage.StorageID},
})
return nil
}


Loading…
Cancel
Save