Browse Source

完善下载Object接口

gitlink
Sydonian 2 years ago
parent
commit
dbf33cd7f2
5 changed files with 190 additions and 12 deletions
  1. +27
    -1
      client/internal/services/object.go
  2. +20
    -0
      common/pkgs/db/object_block.go
  3. +20
    -0
      common/pkgs/db/pinned_object.go
  4. +28
    -0
      common/pkgs/mq/coordinator/object.go
  5. +95
    -11
      coordinator/internal/mq/object.go

+ 27
- 1
client/internal/services/object.go View File

@@ -8,6 +8,7 @@ import (
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
mytask "gitlink.org.cn/cloudream/storage/client/internal/task"
stgglb "gitlink.org.cn/cloudream/storage/common/globals"
stgmod "gitlink.org.cn/cloudream/storage/common/models"
"gitlink.org.cn/cloudream/storage/common/pkgs/db/model"
"gitlink.org.cn/cloudream/storage/common/pkgs/iterator"
coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"
@@ -51,7 +52,32 @@ func (svc *ObjectService) UpdateInfo(userID cdssdk.UserID, updatings []cdssdk.Up
}

func (svc *ObjectService) Download(userID cdssdk.UserID, objectID cdssdk.ObjectID) (io.ReadCloser, error) {
panic("not implement yet!")
coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil {
return nil, fmt.Errorf("new coordinator client: %w", err)
}
defer stgglb.CoordinatorMQPool.Release(coorCli)

resp, err := coorCli.GetObjectDetails(coormq.ReqGetObjectDetails([]cdssdk.ObjectID{objectID}))
if err != nil {
return nil, fmt.Errorf("requesting to coordinator")
}

if resp.Objects[0] == nil {
return nil, fmt.Errorf("object not found")
}

iter := iterator.NewDownloadObjectIterator([]stgmod.ObjectDetail{*resp.Objects[0]}, &iterator.DownloadContext{
Distlock: svc.DistLock,
})
defer iter.Close()

downloading, err := iter.MoveNext()
if err != nil {
return nil, err
}

return downloading.File, nil
}

func (svc *ObjectService) Delete(userID cdssdk.UserID, objectIDs []cdssdk.ObjectID) error {


+ 20
- 0
common/pkgs/db/object_block.go View File

@@ -24,6 +24,26 @@ func (db *ObjectBlockDB) GetByNodeID(ctx SQLContext, nodeID cdssdk.NodeID) ([]st
return rets, err
}

func (db *ObjectBlockDB) BatchGetByObjectID(ctx SQLContext, objectIDs []cdssdk.ObjectID) ([]stgmod.ObjectBlock, error) {
if len(objectIDs) == 0 {
return nil, nil
}

stmt, args, err := sqlx.In("select * from ObjectBlock where ObjectID in (?) order by ObjectID, `Index` asc", objectIDs)
if err != nil {
return nil, err
}
stmt = ctx.Rebind(stmt)

var blocks []stgmod.ObjectBlock
err = sqlx.Select(ctx, &blocks, stmt, args...)
if err != nil {
return nil, err
}

return blocks, nil
}

func (db *ObjectBlockDB) Create(ctx SQLContext, objectID cdssdk.ObjectID, index int, nodeID cdssdk.NodeID, fileHash string) error {
_, err := ctx.Exec("insert into ObjectBlock values(?,?,?,?)", objectID, index, nodeID, fileHash)
return err


+ 20
- 0
common/pkgs/db/pinned_object.go View File

@@ -34,6 +34,26 @@ func (*PinnedObjectDB) Create(ctx SQLContext, nodeID cdssdk.NodeID, objectID cds
return err
}

func (*PinnedObjectDB) BatchGetByObjectID(ctx SQLContext, objectIDs []cdssdk.ObjectID) ([]cdssdk.PinnedObject, error) {
if len(objectIDs) == 0 {
return nil, nil
}

stmt, args, err := sqlx.In("select * from PinnedObject where ObjectID in (?) order by ObjectID asc", objectIDs)
if err != nil {
return nil, err
}
stmt = ctx.Rebind(stmt)

var pinneds []cdssdk.PinnedObject
err = sqlx.Select(ctx, &pinneds, stmt, args...)
if err != nil {
return nil, err
}

return pinneds, nil
}

func (*PinnedObjectDB) TryCreate(ctx SQLContext, nodeID cdssdk.NodeID, objectID cdssdk.ObjectID, createTime time.Time) error {
_, err := ctx.Exec("insert ignore into PinnedObject values(?,?,?)", nodeID, objectID, createTime)
return err


+ 28
- 0
common/pkgs/mq/coordinator/object.go View File

@@ -13,6 +13,8 @@ type ObjectService interface {

GetPackageObjectDetails(msg *GetPackageObjectDetails) (*GetPackageObjectDetailsResp, *mq.CodeMessage)

GetObjectDetails(msg *GetObjectDetails) (*GetObjectDetailsResp, *mq.CodeMessage)

UpdateObjectRedundancy(msg *UpdateObjectRedundancy) (*UpdateObjectRedundancyResp, *mq.CodeMessage)

UpdateObjectInfos(msg *UpdateObjectInfos) (*UpdateObjectInfosResp, *mq.CodeMessage)
@@ -74,6 +76,32 @@ func (client *Client) GetPackageObjectDetails(msg *GetPackageObjectDetails) (*Ge
return mq.Request(Service.GetPackageObjectDetails, client.rabbitCli, msg)
}

// 获取多个Object以及它们的分块详细信息,返回的Objects会按照ObjectID升序。
var _ = Register(Service.GetObjectDetails)

type GetObjectDetails struct {
mq.MessageBodyBase
ObjectIDs []cdssdk.ObjectID `json:"objectIDs"`
}
type GetObjectDetailsResp struct {
mq.MessageBodyBase
Objects []*stgmod.ObjectDetail `json:"objects"` // 如果没有查询到某个ID对应的信息,则此数组对应位置为nil
}

func ReqGetObjectDetails(objectIDs []cdssdk.ObjectID) *GetObjectDetails {
return &GetObjectDetails{
ObjectIDs: objectIDs,
}
}
func RespGetObjectDetails(objects []*stgmod.ObjectDetail) *GetObjectDetailsResp {
return &GetObjectDetailsResp{
Objects: objects,
}
}
func (client *Client) GetObjectDetails(msg *GetObjectDetails) (*GetObjectDetailsResp, error) {
return mq.Request(Service.GetObjectDetails, client.rabbitCli, msg)
}

// 更新Object的冗余方式
var _ = Register(Service.UpdateObjectRedundancy)



+ 95
- 11
coordinator/internal/mq/object.go View File

@@ -54,6 +54,92 @@ func (svc *Service) GetPackageObjectDetails(msg *coormq.GetPackageObjectDetails)
return mq.ReplyOK(coormq.NewGetPackageObjectDetailsResp(details))
}

func (svc *Service) GetObjectDetails(msg *coormq.GetObjectDetails) (*coormq.GetObjectDetailsResp, *mq.CodeMessage) {
details := make([]*stgmod.ObjectDetail, len(msg.ObjectIDs))
err := svc.db.DoTx(sql.LevelSerializable, func(tx *sqlx.Tx) error {
var err error

msg.ObjectIDs = sort2.SortAsc(msg.ObjectIDs)

// 根据ID依次查询Object,ObjectBlock,PinnedObject,并根据升序的特点进行合并
objs, err := svc.db.Object().BatchGet(tx, msg.ObjectIDs)
if err != nil {
return fmt.Errorf("batch get objects: %w", err)
}

objIDIdx := 0
objIdx := 0
for objIDIdx < len(msg.ObjectIDs) && objIdx < len(objs) {
if msg.ObjectIDs[objIDIdx] < objs[objIdx].ObjectID {
objIDIdx++
continue
}

// 由于是使用msg.ObjectIDs去查询Object,因此不存在msg.ObjectIDs > Object.ObjectID的情况,
// 下面同理
obj := stgmod.ObjectDetail{
Object: objs[objIDIdx],
}
details[objIDIdx] = &obj
objIdx++
}

// 查询合并
blocks, err := svc.db.ObjectBlock().BatchGetByObjectID(tx, msg.ObjectIDs)
if err != nil {
return fmt.Errorf("batch get object blocks: %w", err)
}

objIDIdx = 0
blkIdx := 0
for objIDIdx < len(msg.ObjectIDs) && blkIdx < len(blocks) {
if details[objIDIdx] == nil {
objIDIdx++
continue
}

if msg.ObjectIDs[objIDIdx] < blocks[blkIdx].ObjectID {
objIDIdx++
continue
}

details[objIDIdx].Blocks = append(details[objIDIdx].Blocks, blocks[blkIdx])
blkIdx++
}

// 查询合并
pinneds, err := svc.db.PinnedObject().BatchGetByObjectID(tx, msg.ObjectIDs)
if err != nil {
return fmt.Errorf("batch get pinned objects: %w", err)
}

objIDIdx = 0
pinIdx := 0
for objIDIdx < len(msg.ObjectIDs) && pinIdx < len(pinneds) {
if details[objIDIdx] == nil {
objIDIdx++
continue
}

if msg.ObjectIDs[objIDIdx] < pinneds[pinIdx].ObjectID {
objIDIdx++
continue
}

details[objIDIdx].PinnedAt = append(details[objIDIdx].PinnedAt, pinneds[pinIdx].NodeID)
pinIdx++
}
return nil
})

if err != nil {
logger.Warn(err.Error())
return nil, mq.Failed(errorcode.OperationFailed, "get object details failed")
}

return mq.ReplyOK(coormq.RespGetObjectDetails(details))
}

func (svc *Service) UpdateObjectRedundancy(msg *coormq.UpdateObjectRedundancy) (*coormq.UpdateObjectRedundancyResp, *mq.CodeMessage) {
err := svc.db.DoTx(sql.LevelSerializable, func(tx *sqlx.Tx) error {
return svc.db.Object().BatchUpdateRedundancy(tx, msg.Updatings)
@@ -136,19 +222,17 @@ func (svc *Service) UpdateObjectInfos(msg *coormq.UpdateObjectInfos) (*coormq.Up

func pickByObjectIDs(objs []cdssdk.UpdatingObject, objIDs []cdssdk.ObjectID) (pickedObjs []cdssdk.UpdatingObject, notFoundObjs []cdssdk.ObjectID) {
objIdx := 0
IDIdx := 0
idIdx := 0

for IDIdx < len(objIDs) {
if objs[objIdx].ObjectID == objIDs[IDIdx] {
pickedObjs = append(pickedObjs, objs[objIdx])
IDIdx++
objIdx++
} else if objs[objIdx].ObjectID < objIDs[IDIdx] {
objIdx++
} else {
notFoundObjs = append(notFoundObjs, objIDs[IDIdx])
IDIdx++
for idIdx < len(objIDs) && objIdx < len(objs) {
if objIDs[idIdx] < objs[objIdx].ObjectID {
notFoundObjs = append(notFoundObjs, objIDs[idIdx])
idIdx++
continue
}

pickedObjs = append(pickedObjs, objs[objIdx])
objIdx++
}

return


Loading…
Cancel
Save