Browse Source

修复调试问题

gitlink
Sydonian 2 years ago
parent
commit
8f2e26b18f
12 changed files with 59 additions and 21 deletions
  1. +6
    -4
      client/internal/cmdline/package.go
  2. +0
    -0
      client/internal/task/create_package.go
  3. +0
    -0
      client/internal/task/update_package.go
  4. +2
    -2
      common/assets/scripts/create_database.sql
  5. +0
    -0
      common/pkgs/cmd/create_package.go
  6. +0
    -0
      common/pkgs/cmd/update_package.go
  7. +2
    -2
      common/pkgs/db/cache.go
  8. +35
    -0
      common/pkgs/db/model/model.go
  9. +6
    -5
      common/pkgs/db/object.go
  10. +5
    -5
      common/pkgs/db/object_block.go
  11. +2
    -2
      coordinator/internal/services/object.go
  12. +1
    -1
      coordinator/internal/services/package.go

+ 6
- 4
client/internal/cmdline/package.go View File

@@ -33,7 +33,7 @@ func PackageListBucketPackages(ctx CommandContext, bucketID cdssdk.BucketID) err
return nil
}

func PackageDownloadPackage(ctx CommandContext, outputDir string, packageID cdssdk.PackageID) error {
func PackageDownloadPackage(ctx CommandContext, packageID cdssdk.PackageID, outputDir string) error {
err := os.MkdirAll(outputDir, os.ModePerm)
if err != nil {
return fmt.Errorf("create output directory %s failed, err: %w", outputDir, err)
@@ -86,7 +86,7 @@ func PackageDownloadPackage(ctx CommandContext, outputDir string, packageID cdss
return nil
}

func PackageCreatePackage(ctx CommandContext, rootPath string, bucketID cdssdk.BucketID, name string, nodeAffinity []cdssdk.NodeID) error {
func PackageCreatePackage(ctx CommandContext, name string, rootPath string, bucketID cdssdk.BucketID, nodeAffinity []cdssdk.NodeID) error {
rootPath = filepath.Clean(rootPath)

var uploadFilePathes []string
@@ -194,7 +194,8 @@ func PackageDeletePackage(ctx CommandContext, packageID cdssdk.PackageID) error
return nil
}

func PackageGetCachedNodes(ctx CommandContext, packageID cdssdk.PackageID, userID cdssdk.UserID) error {
func PackageGetCachedNodes(ctx CommandContext, packageID cdssdk.PackageID) error {
userID := cdssdk.UserID(0)
resp, err := ctx.Cmdline.Svc.PackageSvc().GetCachedNodes(userID, packageID)
fmt.Printf("resp: %v\n", resp)
if err != nil {
@@ -203,7 +204,8 @@ func PackageGetCachedNodes(ctx CommandContext, packageID cdssdk.PackageID, userI
return nil
}

func PackageGetLoadedNodes(ctx CommandContext, packageID cdssdk.PackageID, userID cdssdk.UserID) error {
func PackageGetLoadedNodes(ctx CommandContext, packageID cdssdk.PackageID) error {
userID := cdssdk.UserID(0)
nodeIDs, err := ctx.Cmdline.Svc.PackageSvc().GetLoadedNodes(userID, packageID)
fmt.Printf("nodeIDs: %v\n", nodeIDs)
if err != nil {


client/internal/task/create_ec_package.go → client/internal/task/create_package.go View File


client/internal/task/update_ec_package.go → client/internal/task/update_package.go View File


+ 2
- 2
common/assets/scripts/create_database.sql View File

@@ -112,8 +112,7 @@ create table Package (
PackageID int not null auto_increment primary key comment '包ID',
Name varchar(100) not null comment '对象名',
BucketID int not null comment '桶ID',
State varchar(100) not null comment '状态',
Redundancy JSON not null comment '冗余策略'
State varchar(100) not null comment '状态'
);

create table Object (
@@ -122,6 +121,7 @@ create table Object (
Path varchar(500) not null comment '对象路径',
Size bigint not null comment '对象大小(Byte)',
FileHash varchar(100) not null comment '完整对象的FileHash',
Redundancy JSON not null comment '冗余策略',
UNIQUE KEY PackagePath (PackageID, Path)
) comment = '对象表';



common/pkgs/cmd/create_ec_package.go → common/pkgs/cmd/create_package.go View File


common/pkgs/cmd/update_ec_package.go → common/pkgs/cmd/update_package.go View File


+ 2
- 2
common/pkgs/db/cache.go View File

@@ -37,7 +37,7 @@ func (*CacheDB) GetNodeCaches(ctx SQLContext, nodeID cdssdk.NodeID) ([]model.Cac

// CreateNew 创建一条新的缓存记录
func (*CacheDB) CreateNew(ctx SQLContext, fileHash string, nodeID cdssdk.NodeID) error {
_, err := ctx.Exec("insert into Cache values(?,?,?,?,?)", fileHash, nodeID, consts.CacheStatePinned, nil, time.Now())
_, err := ctx.Exec("insert into Cache values(?,?,?,?,?,?)", fileHash, nodeID, consts.CacheStatePinned, nil, time.Now(), 0)
if err != nil {
return err
}
@@ -91,7 +91,7 @@ func (*CacheDB) BatchCreatePinned(ctx SQLContext, fileHashes []string, nodeID cd

// Create 创建一条Temp状态的缓存记录,如果已存在则不产生效果
func (*CacheDB) CreateTemp(ctx SQLContext, fileHash string, nodeID cdssdk.NodeID) error {
_, err := ctx.Exec("insert ignore into Cache values(?,?,?,?,?)", fileHash, nodeID, nil, consts.CacheStateTemp, time.Now())
_, err := ctx.Exec("insert ignore into Cache values(?,?,?,?,?,?)", fileHash, nodeID, consts.CacheStateTemp, nil, time.Now(), 0)
return err
}



+ 35
- 0
common/pkgs/db/model/model.go View File

@@ -1,9 +1,12 @@
package model

import (
"fmt"
"reflect"
"time"

cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/common/utils/serder"
stgmod "gitlink.org.cn/cloudream/storage/common/models"
)

@@ -65,6 +68,38 @@ type Package = cdssdk.Package

type Object = cdssdk.Object

// 由于Object的Redundancy字段是interface,所以不能直接将查询结果scan成Object,必须先scan成TempObject,
// 再.ToObject()转成Object
type TempObject struct {
cdssdk.Object
Redundancy RedundancyWarpper `db:"Redundancy"`
}

func (o *TempObject) ToObject() cdssdk.Object {
obj := o.Object
obj.Redundancy = o.Redundancy.Value
return obj
}

type RedundancyWarpper struct {
Value cdssdk.Redundancy
}

func (o *RedundancyWarpper) Scan(src interface{}) error {
data, ok := src.([]uint8)
if !ok {
return fmt.Errorf("unknow src type: %v", reflect.TypeOf(data))
}

red, err := serder.JSONToObjectEx[cdssdk.Redundancy](data)
if err != nil {
return err
}

o.Value = red
return nil
}

type ObjectBlock = stgmod.ObjectBlock

type Cache struct {


+ 6
- 5
common/pkgs/db/object.go View File

@@ -4,6 +4,7 @@ import (
"fmt"

"github.com/jmoiron/sqlx"
"github.com/samber/lo"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/storage/common/pkgs/db/model"
coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"
@@ -18,9 +19,9 @@ func (db *DB) Object() *ObjectDB {
}

func (db *ObjectDB) GetByID(ctx SQLContext, objectID cdssdk.ObjectID) (model.Object, error) {
var ret model.Object
var ret model.TempObject
err := sqlx.Get(ctx, &ret, "select * from Object where ObjectID = ?", objectID)
return ret, err
return ret.ToObject(), err
}

func (db *ObjectDB) Create(ctx SQLContext, packageID cdssdk.PackageID, path string, size int64, fileHash string, redundancy cdssdk.Redundancy) (int64, error) {
@@ -88,9 +89,9 @@ func (*ObjectDB) UpdateFileInfo(ctx SQLContext, objectID cdssdk.ObjectID, fileSi
}

func (*ObjectDB) GetPackageObjects(ctx SQLContext, packageID cdssdk.PackageID) ([]model.Object, error) {
var ret []model.Object
var ret []model.TempObject
err := sqlx.Select(ctx, &ret, "select * from Object where PackageID = ? order by ObjectID asc", packageID)
return ret, err
return lo.Map(ret, func(o model.TempObject, idx int) model.Object { return o.ToObject() }), err
}

func (db *ObjectDB) BatchAdd(ctx SQLContext, packageID cdssdk.PackageID, objs []coormq.AddObjectInfo) ([]cdssdk.ObjectID, error) {
@@ -112,7 +113,7 @@ func (db *ObjectDB) BatchAdd(ctx SQLContext, packageID cdssdk.PackageID, objs []
}

// 首次上传默认使用不分块的rep模式
err = db.ObjectBlock().Create(ctx, objID, 0, obj.FileHash, obj.NodeID)
err = db.ObjectBlock().Create(ctx, objID, 0, obj.NodeID, obj.FileHash)
if err != nil {
return nil, fmt.Errorf("creating object block: %w", err)
}


+ 5
- 5
common/pkgs/db/object_block.go View File

@@ -20,8 +20,8 @@ func (db *DB) ObjectBlock() *ObjectBlockDB {
return &ObjectBlockDB{DB: db}
}

func (db *ObjectBlockDB) Create(ctx SQLContext, objectID cdssdk.ObjectID, index int, fileHash string, nodeID cdssdk.NodeID) error {
_, err := ctx.Exec("insert into ObjectBlock values(?,?,?,?)", objectID, index, fileHash, nodeID)
func (db *ObjectBlockDB) Create(ctx SQLContext, objectID cdssdk.ObjectID, index int, nodeID cdssdk.NodeID, fileHash string) error {
_, err := ctx.Exec("insert into ObjectBlock values(?,?,?,?)", objectID, index, nodeID, fileHash)
return err
}

@@ -50,10 +50,10 @@ func (db *ObjectBlockDB) CountBlockWithHash(ctx SQLContext, fileHash string) (in
}

func (db *ObjectBlockDB) GetPackageBlockDetails(ctx SQLContext, packageID cdssdk.PackageID) ([]stgmod.ObjectDetail, error) {
var objs []model.Object
var objs []model.TempObject
err := sqlx.Select(ctx, &objs, "select * from Object where PackageID = ? order by ObjectID asc", packageID)
if err != nil {
return nil, fmt.Errorf("query objectIDs: %w", err)
return nil, fmt.Errorf("getting objects: %w", err)
}

rets := make([]stgmod.ObjectDetail, 0, len(objs))
@@ -101,7 +101,7 @@ func (db *ObjectBlockDB) GetPackageBlockDetails(ctx SQLContext, packageID cdssdk
blocks = append(blocks, block)
}

rets = append(rets, stgmod.NewObjectDetail(obj, cachedObjectNodeIDs, blocks))
rets = append(rets, stgmod.NewObjectDetail(obj.ToObject(), cachedObjectNodeIDs, blocks))
}

return rets, nil


+ 2
- 2
coordinator/internal/services/object.go View File

@@ -24,9 +24,9 @@ func (svc *Service) GetPackageObjectDetails(msg *coormq.GetPackageObjectDetails)
data, err := svc.db.ObjectBlock().GetPackageBlockDetails(svc.db.SQLCtx(), msg.PackageID)
if err != nil {
logger.WithField("PackageID", msg.PackageID).
Warnf("query object ec and node id in package: %s", err.Error())
Warnf("getting package block details: %s", err.Error())

return nil, mq.Failed(errorcode.OperationFailed, "query object ec and node id in package failed")
return nil, mq.Failed(errorcode.OperationFailed, "get package object block details failed")
}

return mq.ReplyOK(coormq.NewGetPackageObjectDetailsResp(data))


+ 1
- 1
coordinator/internal/services/package.go View File

@@ -73,7 +73,7 @@ func (svc *Service) UpdateECPackage(msg *coormq.UpdatePackage) (*coormq.UpdatePa
})
if err != nil {
logger.Warn(err.Error())
return nil, mq.Failed(errorcode.OperationFailed, "update ec package failed")
return nil, mq.Failed(errorcode.OperationFailed, "update package failed")
}

return mq.ReplyOK(coormq.NewUpdatePackageResp())


Loading…
Cancel
Save