Browse Source

解决调试问题

gitlink
Sydonian 1 year ago
parent
commit
8b1b58c654
7 changed files with 90 additions and 49 deletions
  1. +4
    -2
      agent/internal/grpc/io.go
  2. +77
    -40
      common/pkgs/db2/object.go
  3. +2
    -1
      common/pkgs/db2/object_block.go
  4. +1
    -0
      common/pkgs/ioswitch2/fromto.go
  5. +2
    -2
      common/pkgs/ioswitch2/ops2/shard_store.go
  6. +1
    -1
      common/pkgs/ioswitchlrc/ops2/shard_store.go
  7. +3
    -3
      common/pkgs/storage/local/shard_store.go

+ 4
- 2
agent/internal/grpc/io.go View File

@@ -20,8 +20,8 @@ func (s *Service) ExecuteIOPlan(ctx context.Context, req *agtrpc.ExecuteIOPlanRe
return nil, fmt.Errorf("deserializing plan: %w", err)
}

logger.WithField("PlanID", plan.ID).Infof("begin execute io plan")
defer logger.WithField("PlanID", plan.ID).Infof("plan finished")
log := logger.WithField("PlanID", plan.ID)
log.Infof("begin execute io plan")

sw := exec.NewExecutor(plan)

@@ -32,9 +32,11 @@ func (s *Service) ExecuteIOPlan(ctx context.Context, req *agtrpc.ExecuteIOPlanRe
exec.SetValueByType(execCtx, s.stgMgr)
_, err = sw.Run(execCtx)
if err != nil {
log.Warnf("running io plan: %v", err)
return nil, fmt.Errorf("running io plan: %w", err)
}

log.Infof("plan finished")
return &agtrpc.ExecuteIOPlanResp{}, nil
}



+ 77
- 40
common/pkgs/db2/object.go View File

@@ -2,10 +2,8 @@ package db2

import (
"fmt"
"strings"
"time"

"gitlink.org.cn/cloudream/common/utils/sort2"
"gorm.io/gorm/clause"

cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
@@ -83,6 +81,15 @@ func (db *ObjectDB) Create(ctx SQLContext, obj cdssdk.Object) (cdssdk.ObjectID,
return obj.ObjectID, nil
}

// 批量创建对象,创建完成后会填充ObjectID。
func (db *ObjectDB) BatchCreate(ctx SQLContext, objs *[]cdssdk.Object) error {
if len(*objs) == 0 {
return nil
}

return ctx.Table("Object").Create(objs).Error
}

func (db *ObjectDB) BatchUpsertByPackagePath(ctx SQLContext, objs []cdssdk.Object) error {
if len(objs) == 0 {
return nil
@@ -165,68 +172,98 @@ func (db *ObjectDB) BatchAdd(ctx SQLContext, packageID cdssdk.PackageID, adds []
return nil, nil
}

objs := make([]cdssdk.Object, 0, len(adds))
// 收集所有路径
pathes := make([]string, 0, len(adds))
for _, add := range adds {
objs = append(objs, cdssdk.Object{
PackageID: packageID,
Path: add.Path,
Size: add.Size,
FileHash: add.FileHash,
Redundancy: cdssdk.NewNoneRedundancy(), // 首次上传默认使用不分块的none模式
CreateTime: add.UploadTime,
UpdateTime: add.UploadTime,
})
pathes = append(pathes, add.Path)
}

err := db.BatchUpsertByPackagePath(ctx, objs)
// 先查询要更新的对象,不存在也没关系
existsObjs, err := db.BatchGetByPackagePath(ctx, packageID, pathes)
if err != nil {
return nil, fmt.Errorf("batch create or update objects: %w", err)
return nil, fmt.Errorf("batch get object by path: %w", err)
}

// 收集所有路径
pathes := make([]string, 0, len(adds))
for _, add := range adds {
pathes = append(pathes, add.Path)
existsObjsMap := make(map[string]cdssdk.Object)
for _, obj := range existsObjs {
existsObjsMap[obj.Path] = obj
}

// 批量获取对象
addedObjs := []cdssdk.Object{}
err = ctx.Table("Object").Where("PackageID = ? AND Path IN ?", packageID, pathes).Find(&addedObjs).Error
if err != nil {
return nil, fmt.Errorf("batch get object ids: %w", err)
var updatingObjs []cdssdk.Object
var addingObjs []cdssdk.Object
for i := range adds {
o := cdssdk.Object{
PackageID: packageID,
Path: adds[i].Path,
Size: adds[i].Size,
FileHash: adds[i].FileHash,
Redundancy: cdssdk.NewNoneRedundancy(), // 首次上传默认使用不分块的none模式
CreateTime: adds[i].UploadTime,
UpdateTime: adds[i].UploadTime,
}

e, ok := existsObjsMap[adds[i].Path]
if ok {
o.ObjectID = e.ObjectID
o.CreateTime = e.CreateTime
updatingObjs = append(updatingObjs, o)

} else {
addingObjs = append(addingObjs, o)
}
}

// 对添加的对象和获取的对象进行排序
adds = sort2.Sort(adds, func(l, r coormq.AddObjectEntry) int { return strings.Compare(l.Path, r.Path) })
addedObjs = sort2.Sort(addedObjs, func(l, r cdssdk.Object) int { return strings.Compare(l.Path, r.Path) })
// 先进行更新
err = db.BatchUpert(ctx, updatingObjs)
if err != nil {
return nil, fmt.Errorf("batch update objects: %w", err)
}

// 收集对象 ID
addedObjIDs := make([]cdssdk.ObjectID, len(addedObjs))
for i := range addedObjs {
addedObjIDs[i] = addedObjs[i].ObjectID
// 再执行插入,Create函数插入后会填充ObjectID
err = db.BatchCreate(ctx, &addingObjs)
if err != nil {
return nil, fmt.Errorf("batch create objects: %w", err)
}

// 批量删除 ObjectBlock
if err := ctx.Table("ObjectBlock").Where("ObjectID IN ?", addedObjIDs).Delete(&stgmod.ObjectBlock{}).Error; err != nil {
return nil, fmt.Errorf("batch delete object blocks: %w", err)
// 按照add参数的顺序返回结果
affectedObjsMp := make(map[string]cdssdk.Object)
for _, o := range updatingObjs {
affectedObjsMp[o.Path] = o
}
for _, o := range addingObjs {
affectedObjsMp[o.Path] = o
}
affectedObjs := make([]cdssdk.Object, 0, len(affectedObjsMp))
affectedObjIDs := make([]cdssdk.ObjectID, 0, len(affectedObjsMp))
for i := range adds {
obj := affectedObjsMp[adds[i].Path]
affectedObjs = append(affectedObjs, obj)
affectedObjIDs = append(affectedObjIDs, obj.ObjectID)
}

// 批量删除 PinnedObject
if err := ctx.Table("PinnedObject").Where("ObjectID IN ?", addedObjIDs).Delete(&cdssdk.PinnedObject{}).Error; err != nil {
return nil, fmt.Errorf("batch delete pinned objects: %w", err)
if len(affectedObjIDs) > 0 {
// 批量删除 ObjectBlock
if err := ctx.Table("ObjectBlock").Where("ObjectID IN ?", affectedObjIDs).Delete(&stgmod.ObjectBlock{}).Error; err != nil {
return nil, fmt.Errorf("batch delete object blocks: %w", err)
}

// 批量删除 PinnedObject
if err := ctx.Table("PinnedObject").Where("ObjectID IN ?", affectedObjIDs).Delete(&cdssdk.PinnedObject{}).Error; err != nil {
return nil, fmt.Errorf("batch delete pinned objects: %w", err)
}
}

// 创建 ObjectBlock
objBlocks := make([]stgmod.ObjectBlock, len(adds))
for i, add := range adds {
objBlocks[i] = stgmod.ObjectBlock{
ObjectID: addedObjIDs[i],
ObjectID: affectedObjIDs[i],
Index: 0,
StorageID: add.StorageID,
FileHash: add.FileHash,
}
}
if err := ctx.Table("ObjectBlock").Create(&objBlocks).Error; err != nil {
if err := db.ObjectBlock().BatchCreate(ctx, objBlocks); err != nil {
return nil, fmt.Errorf("batch create object blocks: %w", err)
}

@@ -240,11 +277,11 @@ func (db *ObjectDB) BatchAdd(ctx SQLContext, packageID cdssdk.PackageID, adds []
Priority: 0,
}
}
if err := ctx.Table("Cache").Create(&caches).Error; err != nil {
if err := db.Cache().BatchCreate(ctx, caches); err != nil {
return nil, fmt.Errorf("batch create caches: %w", err)
}

return addedObjs, nil
return affectedObjs, nil
}

func (db *ObjectDB) BatchUpdateRedundancy(ctx SQLContext, objs []coormq.UpdatingObjectRedundancy) error {


+ 2
- 1
common/pkgs/db2/object_block.go View File

@@ -6,6 +6,7 @@ import (

cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
stgmod "gitlink.org.cn/cloudream/storage/common/models"
"gorm.io/gorm/clause"
)

type ObjectBlockDB struct {
@@ -42,7 +43,7 @@ func (db *ObjectBlockDB) BatchCreate(ctx SQLContext, blocks []stgmod.ObjectBlock
return nil
}

return ctx.Table("ObjectBlock").Create(&blocks).Error
return ctx.Clauses(clause.Insert{Modifier: "ignore"}).Create(&blocks).Error
}

func (db *ObjectBlockDB) DeleteByObjectID(ctx SQLContext, objectID cdssdk.ObjectID) error {


+ 1
- 0
common/pkgs/ioswitch2/fromto.go View File

@@ -68,6 +68,7 @@ func NewFromShardstore(fileHash cdssdk.FileHash, hub cdssdk.Node, storage cdssdk
return &FromShardstore{
FileHash: fileHash,
Hub: hub,
Storage: storage,
DataIndex: dataIndex,
}
}


+ 2
- 2
common/pkgs/ioswitch2/ops2/shard_store.go View File

@@ -36,7 +36,7 @@ type ShardRead struct {

func (o *ShardRead) Execute(ctx *exec.ExecContext, e *exec.Executor) error {
logger.
WithField("Open", o.Open).
WithField("Open", o.Open.String()).
Debugf("reading from shard store")
defer logger.Debugf("reading from shard store finished")

@@ -66,7 +66,7 @@ func (o *ShardRead) Execute(ctx *exec.ExecContext, e *exec.Executor) error {
}

func (o *ShardRead) String() string {
return fmt.Sprintf("ShardRead %v -> %v", o.Open, o.Output)
return fmt.Sprintf("ShardRead %v -> %v", o.Open.String(), o.Output)
}

type ShardWrite struct {


+ 1
- 1
common/pkgs/ioswitchlrc/ops2/shard_store.go View File

@@ -36,7 +36,7 @@ type ShardRead struct {

func (o *ShardRead) Execute(ctx *exec.ExecContext, e *exec.Executor) error {
logger.
WithField("Open", o.Open).
WithField("Open", o.Open.String()).
Debugf("reading from shard store")
defer logger.Debugf("reading from shard store finished")



+ 3
- 3
common/pkgs/storage/local/shard_store.go View File

@@ -75,7 +75,7 @@ func (s *ShardStore) Open(opt types.OpenOption) (io.ReadCloser, error) {
return nil, fmt.Errorf("invalid file name")
}

filePath := filepath.Join(s.cfg.Root, BlocksDir, fileName)
filePath := filepath.Join(s.cfg.Root, BlocksDir, fileName[:2], fileName)
file, err := os.Open(filePath)
if err != nil {
return nil, err
@@ -117,7 +117,7 @@ func (s *ShardStore) ListAll() ([]types.FileInfo, error) {
// TODO 简单检查一下文件名是否合法

infos = append(infos, types.FileInfo{
Hash: cdssdk.FileHash(info.Name()),
Hash: cdssdk.FileHash(filepath.Base(info.Name())),
Size: info.Size(),
Description: filepath.Join(blockDir, path),
})
@@ -193,5 +193,5 @@ func (s *ShardStore) removeTempFile(path string) {
}

func (s *ShardStore) getLogger() logger.Logger {
return logger.WithField("Svc", SvcName).WithField("Storage", s.stg)
return logger.WithField("S", SvcName).WithField("Storage", s.stg.String())
}

Loading…
Cancel
Save