Browse Source

Merge branch 'feature_gxh'

gitlink
Sydonian 1 year ago
parent
commit
677c5128fd
4 changed files with 34 additions and 30 deletions
  1. +17
    -13
      common/pkgs/db2/object.go
  2. +7
    -7
      common/pkgs/mq/coordinator/package.go
  3. +9
    -9
      common/pkgs/uploader/create_load.go
  4. +1
    -1
      common/pkgs/uploader/update.go

+ 17
- 13
common/pkgs/db2/object.go View File

@@ -262,13 +262,15 @@ func (db *ObjectDB) BatchAdd(ctx SQLContext, packageID cdssdk.PackageID, adds []
}

// 创建 ObjectBlock
objBlocks := make([]stgmod.ObjectBlock, len(adds))
objBlocks := make([]stgmod.ObjectBlock, 0, len(adds))
for i, add := range adds {
objBlocks[i] = stgmod.ObjectBlock{
ObjectID: affectedObjIDs[i],
Index: 0,
StorageID: add.StorageID,
FileHash: add.FileHash,
for _, stgID := range add.StorageIDs {
objBlocks = append(objBlocks, stgmod.ObjectBlock{
ObjectID: affectedObjIDs[i],
Index: 0,
StorageID: stgID,
FileHash: add.FileHash,
})
}
}
if err := db.ObjectBlock().BatchCreate(ctx, objBlocks); err != nil {
@@ -276,13 +278,15 @@ func (db *ObjectDB) BatchAdd(ctx SQLContext, packageID cdssdk.PackageID, adds []
}

// 创建 Cache
caches := make([]model.Cache, len(adds))
for i, add := range adds {
caches[i] = model.Cache{
FileHash: add.FileHash,
StorageID: add.StorageID,
CreateTime: time.Now(),
Priority: 0,
caches := make([]model.Cache, 0, len(adds))
for _, add := range adds {
for _, stgID := range add.StorageIDs {
caches = append(caches, model.Cache{
FileHash: add.FileHash,
StorageID: stgID,
CreateTime: time.Now(),
Priority: 0,
})
}
}
if err := db.Cache().BatchCreate(ctx, caches); err != nil {


+ 7
- 7
common/pkgs/mq/coordinator/package.go View File

@@ -129,11 +129,11 @@ type UpdatePackageResp struct {
Added []cdssdk.Object `json:"added"`
}
type AddObjectEntry struct {
Path string `json:"path"`
Size int64 `json:"size,string"`
FileHash cdssdk.FileHash `json:"fileHash"`
UploadTime time.Time `json:"uploadTime"` // 开始上传文件的时间
StorageID cdssdk.StorageID `json:"storageID"`
Path string `json:"path"`
Size int64 `json:"size,string"`
FileHash cdssdk.FileHash `json:"fileHash"`
UploadTime time.Time `json:"uploadTime"` // 开始上传文件的时间
StorageIDs []cdssdk.StorageID `json:"storageIDs"`
}

func NewUpdatePackage(packageID cdssdk.PackageID, adds []AddObjectEntry, deletes []cdssdk.ObjectID) *UpdatePackage {
@@ -148,13 +148,13 @@ func NewUpdatePackageResp(added []cdssdk.Object) *UpdatePackageResp {
Added: added,
}
}
func NewAddObjectEntry(path string, size int64, fileHash cdssdk.FileHash, uploadTime time.Time, stgID cdssdk.StorageID) AddObjectEntry {
func NewAddObjectEntry(path string, size int64, fileHash cdssdk.FileHash, uploadTime time.Time, stgIDs []cdssdk.StorageID) AddObjectEntry {
return AddObjectEntry{
Path: path,
Size: size,
FileHash: fileHash,
UploadTime: uploadTime,
StorageID: stgID,
StorageIDs: stgIDs,
}
}
func (client *Client) UpdatePackage(msg *UpdatePackage) (*UpdatePackageResp, error) {


+ 9
- 9
common/pkgs/uploader/create_load.go View File

@@ -38,6 +38,7 @@ type CreateLoadResult struct {

func (u *CreateLoadUploader) Upload(path string, size int64, stream io.Reader) error {
uploadTime := time.Now()
stgIDs := make([]cdssdk.StorageID, 0, len(u.targetStgs))

ft := ioswitch2.FromTo{}
fromExec, hd := ioswitch2.NewFromDriver(ioswitch2.RawStream())
@@ -45,6 +46,7 @@ func (u *CreateLoadUploader) Upload(path string, size int64, stream io.Reader) e
for _, stg := range u.targetStgs {
ft.AddTo(ioswitch2.NewToShardStore(*stg.MasterHub, stg.Storage, ioswitch2.RawStream(), "fileHash"))
ft.AddTo(ioswitch2.NewLoadToShared(*stg.MasterHub, stg.Storage, u.userID, u.pkg.PackageID, path))
stgIDs = append(stgIDs, stg.Storage.StorageID)
}

plans := exec.NewPlanBuilder()
@@ -67,15 +69,13 @@ func (u *CreateLoadUploader) Upload(path string, size int64, stream io.Reader) e

// 记录上传结果
fileHash := ret["fileHash"].(*ops2.FileHashValue).Hash
for _, stg := range u.targetStgs {
u.successes = append(u.successes, coormq.AddObjectEntry{
Path: path,
Size: size,
FileHash: fileHash,
UploadTime: uploadTime,
StorageID: stg.Storage.StorageID,
})
}
u.successes = append(u.successes, coormq.AddObjectEntry{
Path: path,
Size: size,
FileHash: fileHash,
UploadTime: uploadTime,
StorageIDs: stgIDs,
})
return nil
}



+ 1
- 1
common/pkgs/uploader/update.go View File

@@ -70,7 +70,7 @@ func (w *UpdateUploader) Upload(path string, size int64, stream io.Reader) error
Size: size,
FileHash: ret["fileHash"].(*ops2.FileHashValue).Hash,
UploadTime: uploadTime,
StorageID: w.targetStg.Storage.StorageID,
StorageIDs: []cdssdk.StorageID{w.targetStg.Storage.StorageID},
})
return nil
}


Loading…
Cancel
Save