Browse Source

修复测试中发现的问题

gitlink
Sydonian 2 years ago
parent
commit
f2b2762082
19 changed files with 458 additions and 128 deletions
  1. +3
    -3
      agent/internal/mq/object.go
  2. +8
    -2
      client/internal/cmdline/package.go
  3. +2
    -1
      common/assets/confs/agent.config.json
  4. +2
    -1
      common/assets/confs/client.config.json
  5. +2
    -1
      common/assets/confs/scanner.config.json
  6. +16
    -3
      common/pkgs/db/cache.go
  7. +5
    -0
      common/pkgs/db/db.go
  8. +181
    -68
      common/pkgs/db/object.go
  9. +13
    -2
      common/pkgs/db/object_block.go
  10. +1
    -1
      common/pkgs/db/package.go
  11. +14
    -0
      common/pkgs/db/pinned_object.go
  12. +75
    -0
      common/pkgs/db/utils.go
  13. +23
    -8
      common/pkgs/grpc/agent/pool.go
  14. +56
    -25
      common/pkgs/iterator/download_object_iterator.go
  15. +22
    -4
      common/pkgs/mq/agent/client.go
  16. +16
    -3
      common/pkgs/mq/coordinator/client.go
  17. +2
    -2
      common/pkgs/mq/coordinator/object.go
  18. +16
    -3
      common/pkgs/mq/scanner/client.go
  19. +1
    -1
      scanner/internal/event/agent_check_cache.go

+ 3
- 3
agent/internal/mq/object.go View File

@@ -2,19 +2,19 @@ package mq

import (
"gitlink.org.cn/cloudream/common/consts/errorcode"
log "gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/pkgs/mq"
"gitlink.org.cn/cloudream/storage/agent/internal/task"
agtmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/agent"
)

func (svc *Service) PinObject(msg *agtmq.PinObject) (*agtmq.PinObjectResp, *mq.CodeMessage) {
log.WithField("FileHash", msg.FileHash).Debugf("pin object")
logger.WithField("FileHash", msg.FileHash).Debugf("pin object")

tsk := svc.taskManager.StartComparable(task.NewIPFSPin(msg.FileHash))

if tsk.Error() != nil {
log.WithField("FileHash", msg.FileHash).
logger.WithField("FileHash", msg.FileHash).
Warnf("pin object failed, err: %s", tsk.Error().Error())
return nil, mq.Failed(errorcode.OperationFailed, "pin object failed")
}


+ 8
- 2
client/internal/cmdline/package.go View File

@@ -46,6 +46,8 @@ func PackageDownloadPackage(ctx CommandContext, packageID cdssdk.PackageID, outp
}
defer objIter.Close()

madeDirs := make(map[string]bool)

for {
objInfo, err := objIter.MoveNext()
if err == iterator.ErrNoMoreItem {
@@ -61,8 +63,11 @@ func PackageDownloadPackage(ctx CommandContext, packageID cdssdk.PackageID, outp
fullPath := filepath.Join(outputDir, objInfo.Object.Path)

dirPath := filepath.Dir(fullPath)
if err := os.MkdirAll(dirPath, 0755); err != nil {
return fmt.Errorf("creating object dir: %w", err)
if !madeDirs[dirPath] {
if err := os.MkdirAll(dirPath, 0755); err != nil {
return fmt.Errorf("creating object dir: %w", err)
}
madeDirs[dirPath] = true
}

outputFile, err := os.Create(fullPath)
@@ -135,6 +140,7 @@ func PackageCreatePackage(ctx CommandContext, name string, rootPath string, buck
})
}
fmt.Print(tb.Render())
fmt.Printf("\n%v", uploadObjectResult.PackageID)
return nil
}



+ 2
- 1
common/assets/confs/agent.config.json View File

@@ -30,6 +30,7 @@
"etcdUsername": "",
"etcdPassword": "",
"etcdLockLeaseTimeSec": 5,
"description": "I am a agent"
"randomReleasingDelayMs": 3000,
"serviceDescription": "I am a agent"
}
}

+ 2
- 1
common/assets/confs/client.config.json View File

@@ -23,6 +23,7 @@
"etcdUsername": "",
"etcdPassword": "",
"etcdLockLeaseTimeSec": 5,
"description": "I am a client"
"randomReleasingDelayMs": 3000,
"serviceDescription": "I am a client"
}
}

+ 2
- 1
common/assets/confs/scanner.config.json View File

@@ -24,6 +24,7 @@
"etcdUsername": "",
"etcdPassword": "",
"etcdLockLeaseTimeSec": 5,
"description": "I am a scanner"
"randomReleasingDelayMs": 3000,
"serviceDescription": "I am a scanner"
}
}

+ 16
- 3
common/pkgs/db/cache.go View File

@@ -44,7 +44,19 @@ func (*CacheDB) Create(ctx SQLContext, fileHash string, nodeID cdssdk.NodeID, pr
return nil
}

func (*CacheDB) BatchCreate(ctx SQLContext, fileHashes []string, nodeID cdssdk.NodeID, priority int) error {
// 批量创建缓存记录
func (*CacheDB) BatchCreate(ctx SQLContext, caches []model.Cache) error {
return BatchNamedExec(
ctx,
"insert into Cache(FileHash,NodeID,CreateTime,Priority) values(:FileHash,:NodeID,:CreateTime,:Priority)"+
" on duplicate key update CreateTime=values(CreateTime), Priority=values(Priority)",
4,
caches,
nil,
)
}

func (*CacheDB) BatchCreateOnSameNode(ctx SQLContext, fileHashes []string, nodeID cdssdk.NodeID, priority int) error {
var caches []model.Cache
var nowTime = time.Now()
for _, hash := range fileHashes {
@@ -56,8 +68,9 @@ func (*CacheDB) BatchCreate(ctx SQLContext, fileHashes []string, nodeID cdssdk.N
})
}

_, err := sqlx.NamedExec(ctx, "insert into Cache(FileHash,NodeID,CreateTime,Priority) values(:FileHash,:NodeID,:CreateTime,:Priority)"+
" on duplicate key update CreateTime=values(CreateTime), Priority=values(Priority)",
_, err := sqlx.NamedExec(ctx,
"insert into Cache(FileHash,NodeID,CreateTime,Priority) values(:FileHash,:NodeID,:CreateTime,:Priority)"+
" on duplicate key update CreateTime=values(CreateTime), Priority=values(Priority)",
caches,
)
return err


+ 5
- 0
common/pkgs/db/db.go View File

@@ -18,6 +18,11 @@ type SQLContext interface {
sqlx.Queryer
sqlx.Execer
sqlx.Ext
sqlx.Preparer

NamedQuery(query string, arg interface{}) (*sqlx.Rows, error)
NamedExec(query string, arg interface{}) (sql.Result, error)
PrepareNamed(query string) (*sqlx.NamedStmt, error)
}

func NewDB(cfg *config.Config) (*DB, error) {


+ 181
- 68
common/pkgs/db/object.go View File

@@ -2,6 +2,7 @@ package db

import (
"fmt"
"time"

"github.com/jmoiron/sqlx"
"github.com/samber/lo"
@@ -25,6 +26,23 @@ func (db *ObjectDB) GetByID(ctx SQLContext, objectID cdssdk.ObjectID) (model.Obj
return ret.ToObject(), err
}

func (db *ObjectDB) BatchGetPackageObjectIDs(ctx SQLContext, pkgID cdssdk.PackageID, pathes []string) ([]cdssdk.ObjectID, error) {
// TODO In语句
stmt, args, err := sqlx.In("select ObjectID from Object force index(PackagePath) where PackageID=? and Path in (?)", pkgID, pathes)
if err != nil {
return nil, err
}
stmt = ctx.Rebind(stmt)

objIDs := make([]cdssdk.ObjectID, 0, len(pathes))
err = sqlx.Select(ctx, &objIDs, stmt, args...)
if err != nil {
return nil, err
}

return objIDs, nil
}

func (db *ObjectDB) Create(ctx SQLContext, packageID cdssdk.PackageID, path string, size int64, fileHash string, redundancy cdssdk.Redundancy) (int64, error) {
sql := "insert into Object(PackageID, Path, Size, FileHash, Redundancy) values(?,?,?,?,?)"

@@ -75,6 +93,15 @@ func (db *ObjectDB) CreateOrUpdate(ctx SQLContext, packageID cdssdk.PackageID, p
return objID, false, nil
}

// 批量创建或者更新记录
func (db *ObjectDB) BatchCreateOrUpdate(ctx SQLContext, objs []cdssdk.Object) error {
sql := "insert into Object(PackageID, Path, Size, FileHash, Redundancy)" +
" values(:PackageID,:Path,:Size,:FileHash,:Redundancy)" +
" on duplicate key update Size = values(Size), FileHash = values(FileHash), Redundancy = values(Redundancy)"

return BatchNamedExec(ctx, sql, 5, objs, nil)
}

func (*ObjectDB) UpdateFileInfo(ctx SQLContext, objectID cdssdk.ObjectID, fileSize int64) (bool, error) {
ret, err := ctx.Exec("update Object set FileSize = ? where ObjectID = ?", fileSize, objectID)
if err != nil {
@@ -104,103 +131,189 @@ func (db *ObjectDB) GetPackageObjectDetails(ctx SQLContext, packageID cdssdk.Pac

rets := make([]stgmod.ObjectDetail, 0, len(objs))

for _, obj := range objs {
var blocks []stgmod.ObjectBlock
err = sqlx.Select(ctx,
&blocks,
"select * from ObjectBlock where ObjectID = ? order by `Index`",
obj.ObjectID,
)
if err != nil {
return nil, err
var allBlocks []stgmod.ObjectBlock
err = sqlx.Select(ctx, &allBlocks, "select ObjectBlock.* from ObjectBlock, Object where PackageID = ? and ObjectBlock.ObjectID = Object.ObjectID order by ObjectBlock.ObjectID, `Index` asc", packageID)
if err != nil {
return nil, fmt.Errorf("getting all object blocks: %w", err)
}

var allPinnedObjs []cdssdk.PinnedObject
err = sqlx.Select(ctx, &allPinnedObjs, "select PinnedObject.* from PinnedObject, Object where PackageID = ? and PinnedObject.ObjectID = Object.ObjectID order by PinnedObject.ObjectID", packageID)
if err != nil {
return nil, fmt.Errorf("getting all pinned objects: %w", err)
}

blksCur := 0
pinnedsCur := 0
for _, temp := range objs {
detail := stgmod.ObjectDetail{
Object: temp.ToObject(),
}

var pinnedAt []cdssdk.NodeID
err = sqlx.Select(ctx, &pinnedAt, "select NodeID from PinnedObject where ObjectID = ?", obj.ObjectID)
if err != nil {
return nil, err
// 1. 查询Object和ObjectBlock时均按照ObjectID升序排序
// 2. ObjectBlock结果集中的不同ObjectID数只会比Object结果集的少
// 因此在两个结果集上同时从头开始遍历时,如果两边的ObjectID字段不同,那么一定是ObjectBlock这边的ObjectID > Object的ObjectID,
// 此时让Object的遍历游标前进,直到两边的ObjectID再次相等
for ; blksCur < len(allBlocks); blksCur++ {
if allBlocks[blksCur].ObjectID != temp.ObjectID {
break
}
detail.Blocks = append(detail.Blocks, allBlocks[blksCur])
}

rets = append(rets, stgmod.NewObjectDetail(obj.ToObject(), pinnedAt, blocks))
for ; pinnedsCur < len(allPinnedObjs); pinnedsCur++ {
if allPinnedObjs[pinnedsCur].ObjectID != temp.ObjectID {
break
}
detail.PinnedAt = append(detail.PinnedAt, allPinnedObjs[pinnedsCur].NodeID)
}

rets = append(rets, detail)
}

return rets, nil
}

func (db *ObjectDB) BatchAdd(ctx SQLContext, packageID cdssdk.PackageID, objs []coormq.AddObjectEntry) ([]cdssdk.ObjectID, error) {
objIDs := make([]cdssdk.ObjectID, 0, len(objs))
for _, obj := range objs {
// 创建对象的记录
objID, isCreate, err := db.CreateOrUpdate(ctx, packageID, obj.Path, obj.Size, obj.FileHash)
if err != nil {
return nil, fmt.Errorf("creating object: %w", err)
}
func (db *ObjectDB) BatchAdd(ctx SQLContext, packageID cdssdk.PackageID, adds []coormq.AddObjectEntry) ([]cdssdk.ObjectID, error) {
objs := make([]cdssdk.Object, 0, len(adds))
for _, add := range adds {
objs = append(objs, cdssdk.Object{
PackageID: packageID,
Path: add.Path,
Size: add.Size,
FileHash: add.FileHash,
Redundancy: cdssdk.NewNoneRedundancy(), // 首次上传默认使用不分块的none模式
})
}

objIDs = append(objIDs, objID)
err := db.BatchCreateOrUpdate(ctx, objs)
if err != nil {
return nil, fmt.Errorf("batch create or update objects: %w", err)
}

if !isCreate {
// 删除原本所有的编码块记录,重新添加
if err = db.ObjectBlock().DeleteByObjectID(ctx, objID); err != nil {
return nil, fmt.Errorf("deleting all object block: %w", err)
}
pathes := make([]string, 0, len(adds))
for _, add := range adds {
pathes = append(pathes, add.Path)
}
objIDs, err := db.BatchGetPackageObjectIDs(ctx, packageID, pathes)
if err != nil {
return nil, fmt.Errorf("batch get object ids: %w", err)
}

// 删除原本Pin住的Object。暂不考虑FileHash没有变化的情况
if err = db.PinnedObject().DeleteByObjectID(ctx, objID); err != nil {
return nil, fmt.Errorf("deleting all pinned object: %w", err)
}
}
err = db.ObjectBlock().BatchDeleteByObjectID(ctx, objIDs)
if err != nil {
return nil, fmt.Errorf("batch delete object blocks: %w", err)
}

// 首次上传默认使用不分块的none模式
err = db.ObjectBlock().Create(ctx, objID, 0, obj.NodeID, obj.FileHash)
if err != nil {
return nil, fmt.Errorf("creating object block: %w", err)
}
err = db.PinnedObject().BatchDeleteByObjectID(ctx, objIDs)
if err != nil {
return nil, fmt.Errorf("batch delete pinned objects: %w", err)
}

// 创建缓存记录
err = db.Cache().Create(ctx, obj.FileHash, obj.NodeID, 0)
if err != nil {
return nil, fmt.Errorf("creating cache: %w", err)
}
objBlocks := make([]stgmod.ObjectBlock, 0, len(adds))
for i, add := range adds {
objBlocks = append(objBlocks, stgmod.ObjectBlock{
ObjectID: objIDs[i],
Index: 0,
NodeID: add.NodeID,
FileHash: add.FileHash,
})
}

err = db.ObjectBlock().BatchCreate(ctx, objBlocks)
if err != nil {
return nil, fmt.Errorf("batch create object blocks: %w", err)
}

caches := make([]model.Cache, 0, len(adds))
for _, add := range adds {
caches = append(caches, model.Cache{
FileHash: add.FileHash,
NodeID: add.NodeID,
CreateTime: time.Now(),
Priority: 0,
})
}

err = db.Cache().BatchCreate(ctx, caches)
if err != nil {
return nil, fmt.Errorf("batch create caches: %w", err)
}

return objIDs, nil
}

func (db *ObjectDB) BatchUpdateRedundancy(ctx SQLContext, objs []coormq.ChangeObjectRedundancyEntry) error {
objIDs := make([]cdssdk.ObjectID, 0, len(objs))
dummyObjs := make([]cdssdk.Object, 0, len(objs))
for _, obj := range objs {
_, err := ctx.Exec("update Object set Redundancy = ? where ObjectID = ?", obj.Redundancy, obj.ObjectID)
if err != nil {
return fmt.Errorf("updating object: %w", err)
}
objIDs = append(objIDs, obj.ObjectID)
dummyObjs = append(dummyObjs, cdssdk.Object{
ObjectID: obj.ObjectID,
Redundancy: obj.Redundancy,
})
}

// 删除原本所有的编码块记录,重新添加
if err = db.ObjectBlock().DeleteByObjectID(ctx, obj.ObjectID); err != nil {
return fmt.Errorf("deleting all object block: %w", err)
}
// 目前只能使用这种方式来同时更新大量数据
err := BatchNamedExec(ctx,
"insert into Object(ObjectID, PackageID, Path, Size, FileHash, Redundancy)"+
" values(:ObjectID, :PackageID, :Path, :Size, :FileHash, :Redundancy) as new"+
" on duplicate key update Redundancy=new.Redundancy", 6, dummyObjs, nil)
if err != nil {
return fmt.Errorf("batch update object redundancy: %w", err)
}

// 删除原本Pin住的Object。暂不考虑FileHash没有变化的情况
if err = db.PinnedObject().DeleteByObjectID(ctx, obj.ObjectID); err != nil {
return fmt.Errorf("deleting all pinned object: %w", err)
}
// 删除原本所有的编码块记录,重新添加
err = db.ObjectBlock().BatchDeleteByObjectID(ctx, objIDs)
if err != nil {
return fmt.Errorf("batch delete object blocks: %w", err)
}

for _, block := range obj.Blocks {
err = db.ObjectBlock().Create(ctx, obj.ObjectID, block.Index, block.NodeID, block.FileHash)
if err != nil {
return fmt.Errorf("creating object block: %w", err)
}
// 删除原本Pin住的Object。暂不考虑FileHash没有变化的情况
err = db.PinnedObject().BatchDeleteByObjectID(ctx, objIDs)
if err != nil {
return fmt.Errorf("batch delete pinned object: %w", err)
}

// 创建缓存记录
err = db.Cache().Create(ctx, block.FileHash, block.NodeID, 0)
if err != nil {
return fmt.Errorf("creating cache: %w", err)
}
blocks := make([]stgmod.ObjectBlock, 0, len(objs))
for _, obj := range objs {
blocks = append(blocks, obj.Blocks...)
}
err = db.ObjectBlock().BatchCreate(ctx, blocks)
if err != nil {
return fmt.Errorf("batch create object blocks: %w", err)
}

caches := make([]model.Cache, 0, len(objs))
for _, obj := range objs {
for _, blk := range obj.Blocks {
caches = append(caches, model.Cache{
FileHash: blk.FileHash,
NodeID: blk.NodeID,
CreateTime: time.Now(),
Priority: 0,
})
}
}
err = db.Cache().BatchCreate(ctx, caches)
if err != nil {
return fmt.Errorf("batch create object caches: %w", err)
}

err = db.PinnedObject().ObjectBatchCreate(ctx, obj.ObjectID, obj.PinnedAt)
if err != nil {
return fmt.Errorf("creating pinned object: %w", err)
pinneds := make([]cdssdk.PinnedObject, 0, len(objs))
for _, obj := range objs {
for _, p := range obj.PinnedAt {
pinneds = append(pinneds, cdssdk.PinnedObject{
ObjectID: obj.ObjectID,
NodeID: p,
CreateTime: time.Now(),
})
}
}
err = db.PinnedObject().BatchTryCreate(ctx, pinneds)
if err != nil {
return fmt.Errorf("batch create pinned objects: %w", err)
}

return nil
}


+ 13
- 2
common/pkgs/db/object_block.go View File

@@ -30,11 +30,12 @@ func (db *ObjectBlockDB) Create(ctx SQLContext, objectID cdssdk.ObjectID, index
}

func (db *ObjectBlockDB) BatchCreate(ctx SQLContext, blocks []stgmod.ObjectBlock) error {
_, err := sqlx.NamedExec(ctx,
return BatchNamedExec(ctx,
"insert ignore into ObjectBlock(ObjectID, `Index`, NodeID, FileHash) values(:ObjectID, :Index, :NodeID, :FileHash)",
4,
blocks,
nil,
)
return err
}

func (db *ObjectBlockDB) DeleteByObjectID(ctx SQLContext, objectID cdssdk.ObjectID) error {
@@ -42,6 +43,16 @@ func (db *ObjectBlockDB) DeleteByObjectID(ctx SQLContext, objectID cdssdk.Object
return err
}

func (db *ObjectBlockDB) BatchDeleteByObjectID(ctx SQLContext, objectIDs []cdssdk.ObjectID) error {
// TODO in语句有长度限制
query, args, err := sqlx.In("delete from ObjectBlock where ObjectID in (?)", objectIDs)
if err != nil {
return err
}
_, err = ctx.Exec(query, args...)
return err
}

func (db *ObjectBlockDB) DeleteInPackage(ctx SQLContext, packageID cdssdk.PackageID) error {
_, err := ctx.Exec("delete ObjectBlock from ObjectBlock inner join Object on ObjectBlock.ObjectID = Object.ObjectID where PackageID = ?", packageID)
return err


+ 1
- 1
common/pkgs/db/package.go View File

@@ -80,7 +80,7 @@ func (db *PackageDB) GetUserPackage(ctx SQLContext, userID cdssdk.UserID, packag
func (db *PackageDB) Create(ctx SQLContext, bucketID cdssdk.BucketID, name string) (cdssdk.PackageID, error) {
// 根据packagename和bucketid查询,若不存在则插入,若存在则返回错误
var packageID int64
err := sqlx.Get(ctx, &packageID, "select PackageID from Package where Name = ? AND BucketID = ?", name, bucketID)
err := sqlx.Get(ctx, &packageID, "select PackageID from Package where Name = ? AND BucketID = ? for update", name, bucketID)
// 无错误代表存在记录
if err == nil {
return 0, fmt.Errorf("package with given Name and BucketID already exists")


+ 14
- 0
common/pkgs/db/pinned_object.go View File

@@ -39,6 +39,10 @@ func (*PinnedObjectDB) TryCreate(ctx SQLContext, nodeID cdssdk.NodeID, objectID
return err
}

func (*PinnedObjectDB) BatchTryCreate(ctx SQLContext, pinneds []cdssdk.PinnedObject) error {
return BatchNamedExec(ctx, "insert ignore into PinnedObject values(:NodeID,:ObjectID,:CreateTime)", 3, pinneds, nil)
}

func (*PinnedObjectDB) CreateFromPackage(ctx SQLContext, packageID cdssdk.PackageID, nodeID cdssdk.NodeID) error {
_, err := ctx.Exec(
"insert ignore into PinnedObject(NodeID, ObjectID, CreateTime) select ? as NodeID, ObjectID, ? as CreateTime from Object where PackageID = ?",
@@ -69,6 +73,16 @@ func (*PinnedObjectDB) DeleteByObjectID(ctx SQLContext, objectID cdssdk.ObjectID
return err
}

func (*PinnedObjectDB) BatchDeleteByObjectID(ctx SQLContext, objectIDs []cdssdk.ObjectID) error {
// TODO in语句有长度限制
query, args, err := sqlx.In("delete from PinnedObject where ObjectID in (?)", objectIDs)
if err != nil {
return err
}
_, err = ctx.Exec(query, args...)
return err
}

func (*PinnedObjectDB) DeleteInPackage(ctx SQLContext, packageID cdssdk.PackageID) error {
_, err := ctx.Exec("delete PinnedObject from PinnedObject inner join Object on PinnedObject.ObjectID = Object.ObjectID where PackageID = ?", packageID)
return err


+ 75
- 0
common/pkgs/db/utils.go View File

@@ -0,0 +1,75 @@
package db

import (
"database/sql"

"github.com/jmoiron/sqlx"
"gitlink.org.cn/cloudream/common/utils/math"
)

const (
maxPlaceholderCount = 65535
)

// BatchNamedExec executes a named statement over arr in batches sized so that
// the total number of SQL placeholders per statement stays below the driver
// limit (maxPlaceholderCount, 65535 for MySQL).
//
// argCnt is the number of named parameters consumed per element of arr; when
// it is 0 the whole slice is executed as a single statement. The optional
// callback receives the sql.Result of each batch; returning false from it
// stops the remaining batches without reporting an error.
func BatchNamedExec[T any](ctx SQLContext, sql string, argCnt int, arr []T, callback func(sql.Result) bool) error {
	if argCnt == 0 {
		ret, err := ctx.NamedExec(sql, arr)
		if err != nil {
			return err
		}

		if callback != nil {
			callback(ret)
		}

		return nil
	}

	batchSize := maxPlaceholderCount / argCnt
	for len(arr) > 0 {
		curBatchSize := math.Min(batchSize, len(arr))

		ret, err := ctx.NamedExec(sql, arr[:curBatchSize])
		if err != nil {
			// BUGFIX: previously `return nil` here, which silently swallowed
			// the execution error and reported success to the caller.
			return err
		}
		if callback != nil && !callback(ret) {
			// Caller requested an early stop; this is not an error.
			return nil
		}

		arr = arr[curBatchSize:]
	}

	return nil
}

// BatchNamedQuery runs a named query over arr in batches sized so that the
// total number of SQL placeholders per statement stays below the driver
// limit (maxPlaceholderCount, 65535 for MySQL).
//
// argCnt is the number of named parameters consumed per element of arr; when
// it is 0 the whole slice is queried as a single statement. The callback
// receives the rows of each batch and is responsible for closing them;
// returning false from it stops the remaining batches without an error.
// When callback is nil the rows are closed here to avoid leaking them.
func BatchNamedQuery[T any](ctx SQLContext, sql string, argCnt int, arr []T, callback func(*sqlx.Rows) bool) error {
	if argCnt == 0 {
		ret, err := ctx.NamedQuery(sql, arr)
		if err != nil {
			return err
		}

		if callback != nil {
			callback(ret)
		} else {
			// No consumer for the rows; release them immediately.
			ret.Close()
		}

		return nil
	}

	batchSize := maxPlaceholderCount / argCnt
	for len(arr) > 0 {
		curBatchSize := math.Min(batchSize, len(arr))

		ret, err := ctx.NamedQuery(sql, arr[:curBatchSize])
		if err != nil {
			// BUGFIX: previously `return nil` here, which silently swallowed
			// the query error and reported success to the caller.
			return err
		}
		if callback != nil {
			if !callback(ret) {
				// Caller requested an early stop; this is not an error.
				return nil
			}
		} else {
			// No consumer for this batch's rows; release them before the
			// next iteration so the connection is not pinned.
			ret.Close()
		}

		arr = arr[curBatchSize:]
	}
	return nil
}

+ 23
- 8
common/pkgs/grpc/agent/pool.go View File

@@ -2,6 +2,7 @@ package agent

import (
"fmt"
sync "sync"
)

type PoolConfig struct {
@@ -18,28 +19,42 @@ func (c *PoolClient) Close() {

type Pool struct {
grpcCfg *PoolConfig
shareds map[string]*PoolClient
lock sync.Mutex
}

func NewPool(grpcCfg *PoolConfig) *Pool {
return &Pool{
grpcCfg: grpcCfg,
shareds: make(map[string]*PoolClient),
}
}

// 获取一个GRPC客户端。由于事先不能知道所有agent的GRPC配置信息,所以只能让调用者把建立连接所需的配置都传递进来,
// Pool来决定要不要新建客户端。
func (p *Pool) Acquire(ip string, port int) (*PoolClient, error) {
cli, err := NewClient(fmt.Sprintf("%s:%d", ip, port))
if err != nil {
return nil, err
addr := fmt.Sprintf("%s:%d", ip, port)

p.lock.Lock()
defer p.lock.Unlock()

cli, ok := p.shareds[addr]
if !ok {
c, err := NewClient(addr)
if err != nil {
return nil, err
}
cli = &PoolClient{
Client: c,
owner: p,
}
p.shareds[addr] = cli
}

return &PoolClient{
Client: cli,
owner: p,
}, nil
return cli, nil

}

func (p *Pool) Release(cli *PoolClient) {
cli.Client.Close()
// TODO 释放长时间未使用的client
}

+ 56
- 25
common/pkgs/iterator/download_object_iterator.go View File

@@ -46,8 +46,11 @@ type DownloadObjectIterator struct {

objectDetails []stgmodels.ObjectDetail
currentIndex int
inited bool

downloadCtx *DownloadContext
coorCli *coormq.Client
allNodes map[cdssdk.NodeID]cdssdk.Node
}

func NewDownloadObjectIterator(objectDetails []stgmodels.ObjectDetail, downloadCtx *DownloadContext) *DownloadObjectIterator {
@@ -58,27 +61,60 @@ func NewDownloadObjectIterator(objectDetails []stgmodels.ObjectDetail, downloadC
}

func (i *DownloadObjectIterator) MoveNext() (*IterDownloadingObject, error) {
coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil {
return nil, fmt.Errorf("new coordinator client: %w", err)
if !i.inited {
if err := i.init(); err != nil {
return nil, err
}

i.inited = true
}
defer stgglb.CoordinatorMQPool.Release(coorCli)

if i.currentIndex >= len(i.objectDetails) {
return nil, ErrNoMoreItem
}

item, err := i.doMove(coorCli)
item, err := i.doMove()
i.currentIndex++
return item, err
}

func (iter *DownloadObjectIterator) doMove(coorCli *coormq.Client) (*IterDownloadingObject, error) {
func (i *DownloadObjectIterator) init() error {
coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil {
return fmt.Errorf("new coordinator client: %w", err)
}
i.coorCli = coorCli

allNodeIDs := make(map[cdssdk.NodeID]bool)
for _, obj := range i.objectDetails {
for _, p := range obj.PinnedAt {
allNodeIDs[p] = true
}

for _, b := range obj.Blocks {
allNodeIDs[b.NodeID] = true
}
}

getNodes, err := coorCli.GetNodes(coormq.NewGetNodes(lo.Keys(allNodeIDs)))
if err != nil {
return fmt.Errorf("getting nodes: %w", err)
}

i.allNodes = make(map[cdssdk.NodeID]cdssdk.Node)
for _, n := range getNodes.Nodes {
i.allNodes[n.NodeID] = n
}

return nil
}

func (iter *DownloadObjectIterator) doMove() (*IterDownloadingObject, error) {
obj := iter.objectDetails[iter.currentIndex]

switch red := obj.Object.Redundancy.(type) {
case *cdssdk.NoneRedundancy:
reader, err := iter.downloadNoneOrRepObject(coorCli, iter.downloadCtx, obj)
reader, err := iter.downloadNoneOrRepObject(obj)
if err != nil {
return nil, fmt.Errorf("downloading object: %w", err)
}
@@ -89,7 +125,7 @@ func (iter *DownloadObjectIterator) doMove(coorCli *coormq.Client) (*IterDownloa
}, nil

case *cdssdk.RepRedundancy:
reader, err := iter.downloadNoneOrRepObject(coorCli, iter.downloadCtx, obj)
reader, err := iter.downloadNoneOrRepObject(obj)
if err != nil {
return nil, fmt.Errorf("downloading rep object: %w", err)
}
@@ -100,7 +136,7 @@ func (iter *DownloadObjectIterator) doMove(coorCli *coormq.Client) (*IterDownloa
}, nil

case *cdssdk.ECRedundancy:
reader, err := iter.downloadECObject(coorCli, iter.downloadCtx, obj, red)
reader, err := iter.downloadECObject(obj, red)
if err != nil {
return nil, fmt.Errorf("downloading ec object: %w", err)
}
@@ -120,15 +156,15 @@ func (i *DownloadObjectIterator) Close() {
}
}

func (iter *DownloadObjectIterator) downloadNoneOrRepObject(coorCli *coormq.Client, ctx *DownloadContext, obj stgmodels.ObjectDetail) (io.ReadCloser, error) {
allNodes, err := iter.sortDownloadNodes(coorCli, ctx, obj)
func (iter *DownloadObjectIterator) downloadNoneOrRepObject(obj stgmodels.ObjectDetail) (io.ReadCloser, error) {
allNodes, err := iter.sortDownloadNodes(obj)
if err != nil {
return nil, err
}
bsc, blocks := iter.getMinReadingBlockSolution(allNodes, 1)
osc, node := iter.getMinReadingObjectSolution(allNodes, 1)
if bsc < osc {
return downloadFile(ctx, blocks[0].Node, blocks[0].Block.FileHash)
return downloadFile(iter.downloadCtx, blocks[0].Node, blocks[0].Block.FileHash)
}

// bsc >= osc,如果osc是MaxFloat64,那么bsc也一定是,也就意味着没有足够块来恢复文件
@@ -136,11 +172,11 @@ func (iter *DownloadObjectIterator) downloadNoneOrRepObject(coorCli *coormq.Clie
return nil, fmt.Errorf("no node has this object")
}

return downloadFile(ctx, *node, obj.Object.FileHash)
return downloadFile(iter.downloadCtx, *node, obj.Object.FileHash)
}

func (iter *DownloadObjectIterator) downloadECObject(coorCli *coormq.Client, ctx *DownloadContext, obj stgmodels.ObjectDetail, ecRed *cdssdk.ECRedundancy) (io.ReadCloser, error) {
allNodes, err := iter.sortDownloadNodes(coorCli, ctx, obj)
func (iter *DownloadObjectIterator) downloadECObject(obj stgmodels.ObjectDetail, ecRed *cdssdk.ECRedundancy) (io.ReadCloser, error) {
allNodes, err := iter.sortDownloadNodes(obj)
if err != nil {
return nil, err
}
@@ -155,7 +191,7 @@ func (iter *DownloadObjectIterator) downloadECObject(coorCli *coormq.Client, ctx
}

for i, b := range blocks {
str, err := downloadFile(ctx, b.Node, b.Block.FileHash)
str, err := downloadFile(iter.downloadCtx, b.Node, b.Block.FileHash)
if err != nil {
for i -= 1; i >= 0; i-- {
fileStrs[i].Close()
@@ -185,10 +221,10 @@ func (iter *DownloadObjectIterator) downloadECObject(coorCli *coormq.Client, ctx
return nil, fmt.Errorf("no enough blocks to reconstruct the file, want %d, get only %d", ecRed.K, len(blocks))
}

return downloadFile(ctx, *node, obj.Object.FileHash)
return downloadFile(iter.downloadCtx, *node, obj.Object.FileHash)
}

func (iter *DownloadObjectIterator) sortDownloadNodes(coorCli *coormq.Client, ctx *DownloadContext, obj stgmodels.ObjectDetail) ([]*DownloadNodeInfo, error) {
func (iter *DownloadObjectIterator) sortDownloadNodes(obj stgmodels.ObjectDetail) ([]*DownloadNodeInfo, error) {
var nodeIDs []cdssdk.NodeID
for _, id := range obj.PinnedAt {
if !lo.Contains(nodeIDs, id) {
@@ -201,16 +237,11 @@ func (iter *DownloadObjectIterator) sortDownloadNodes(coorCli *coormq.Client, ct
}
}

getNodes, err := coorCli.GetNodes(coormq.NewGetNodes(nodeIDs))
if err != nil {
return nil, fmt.Errorf("getting nodes: %w", err)
}

downloadNodeMap := make(map[cdssdk.NodeID]*DownloadNodeInfo)
for _, id := range obj.PinnedAt {
node, ok := downloadNodeMap[id]
if !ok {
mod := *getNodes.GetNode(id)
mod := iter.allNodes[id]
node = &DownloadNodeInfo{
Node: mod,
ObjectPinned: true,
@@ -225,7 +256,7 @@ func (iter *DownloadObjectIterator) sortDownloadNodes(coorCli *coormq.Client, ct
for _, b := range obj.Blocks {
node, ok := downloadNodeMap[b.NodeID]
if !ok {
mod := *getNodes.GetNode(b.NodeID)
mod := iter.allNodes[b.NodeID]
node = &DownloadNodeInfo{
Node: mod,
Distance: iter.getNodeDistance(mod),


+ 22
- 4
common/pkgs/mq/agent/client.go View File

@@ -1,6 +1,8 @@
package agent

import (
"sync"

"gitlink.org.cn/cloudream/common/pkgs/mq"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
stgmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq"
@@ -33,18 +35,34 @@ type Pool interface {
}

type pool struct {
mqcfg *stgmq.Config
mqcfg *stgmq.Config
shareds map[cdssdk.NodeID]*Client
lock sync.Mutex
}

func NewPool(mqcfg *stgmq.Config) Pool {
return &pool{
mqcfg: mqcfg,
mqcfg: mqcfg,
shareds: make(map[cdssdk.NodeID]*Client),
}
}
func (p *pool) Acquire(id cdssdk.NodeID) (*Client, error) {
return NewClient(id, p.mqcfg)
p.lock.Lock()
defer p.lock.Unlock()

cli, ok := p.shareds[id]
if !ok {
var err error
cli, err = NewClient(id, p.mqcfg)
if err != nil {
return nil, err
}
p.shareds[id] = cli
}

return cli, nil
}

func (p *pool) Release(cli *Client) {
cli.Close()
// TODO 定时关闭
}

+ 16
- 3
common/pkgs/mq/coordinator/client.go View File

@@ -1,6 +1,8 @@
package coordinator

import (
"sync"

"gitlink.org.cn/cloudream/common/pkgs/mq"
stgmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq"
)
@@ -30,7 +32,9 @@ type Pool interface {
}

type pool struct {
mqcfg *stgmq.Config
mqcfg *stgmq.Config
shared *Client
lock sync.Mutex
}

func NewPool(mqcfg *stgmq.Config) Pool {
@@ -39,9 +43,18 @@ func NewPool(mqcfg *stgmq.Config) Pool {
}
}
func (p *pool) Acquire() (*Client, error) {
return NewClient(p.mqcfg)
p.lock.Lock()
defer p.lock.Unlock()
if p.shared == nil {
var err error
p.shared, err = NewClient(p.mqcfg)
if err != nil {
return nil, err
}
}

return p.shared, nil
}

func (p *pool) Release(cli *Client) {
cli.Close()
}

+ 2
- 2
common/pkgs/mq/coordinator/object.go View File

@@ -81,8 +81,8 @@ type ChangeObjectRedundancyResp struct {
mq.MessageBodyBase
}
type ChangeObjectRedundancyEntry struct {
ObjectID cdssdk.ObjectID `json:"objectID"`
Redundancy cdssdk.Redundancy `json:"redundancy"`
ObjectID cdssdk.ObjectID `json:"objectID" db:"ObjectID"`
Redundancy cdssdk.Redundancy `json:"redundancy" db:"Redundancy"`
PinnedAt []cdssdk.NodeID `json:"pinnedAt"`
Blocks []stgmod.ObjectBlock `json:"blocks"`
}


+ 16
- 3
common/pkgs/mq/scanner/client.go View File

@@ -1,6 +1,8 @@
package scanner

import (
"sync"

"gitlink.org.cn/cloudream/common/pkgs/mq"
stgmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq"
)
@@ -30,7 +32,9 @@ type Pool interface {
}

type pool struct {
mqcfg *stgmq.Config
mqcfg *stgmq.Config
shared *Client
lock sync.Mutex
}

func NewPool(mqcfg *stgmq.Config) Pool {
@@ -39,9 +43,18 @@ func NewPool(mqcfg *stgmq.Config) Pool {
}
}
func (p *pool) Acquire() (*Client, error) {
return NewClient(p.mqcfg)
p.lock.Lock()
defer p.lock.Unlock()
if p.shared == nil {
var err error
p.shared, err = NewClient(p.mqcfg)
if err != nil {
return nil, err
}
}

return p.shared, nil
}

func (p *pool) Release(cli *Client) {
cli.Close()
}

+ 1
- 1
scanner/internal/event/agent_check_cache.go View File

@@ -105,7 +105,7 @@ func (t *AgentCheckCache) checkCache(execCtx ExecuteContext, tx *sqlx.Tx, realFi
}

if len(realFileHashesCp) > 0 {
err = execCtx.Args.DB.Cache().BatchCreate(tx, lo.Keys(realFileHashesCp), t.NodeID, 0)
err = execCtx.Args.DB.Cache().BatchCreateOnSameNode(tx, lo.Keys(realFileHashesCp), t.NodeID, 0)
if err != nil {
log.Warnf("batch create node caches: %w", err)
return


Loading…
Cancel
Save