Browse Source

增加获取package里所有object的filehash的接口

gitlink
Sydonian 2 years ago
parent
commit
4d64287421
13 changed files with 226 additions and 45 deletions
  1. +1
    -1
      agent/internal/task/cache_move_package.go
  2. +27
    -0
      client/internal/http/cacah.go
  3. +29
    -0
      client/internal/http/package.go
  4. +2
    -0
      client/internal/http/server.go
  5. +16
    -0
      client/internal/services/cacah.go
  6. +23
    -1
      client/internal/services/object.go
  7. +15
    -0
      client/internal/services/package.go
  8. +4
    -13
      common/pkgs/db/model/model.go
  9. +22
    -0
      common/pkgs/db/object_rep.go
  10. +31
    -0
      common/pkgs/mq/coordinator/cache.go
  11. +31
    -0
      common/pkgs/mq/coordinator/object.go
  12. +0
    -30
      common/pkgs/mq/coordinator/package.go
  13. +25
    -0
      coordinator/internal/services/object.go

+ 1
- 1
agent/internal/task/cache_move_package.go View File

@@ -96,7 +96,7 @@ func (t *CacheMovePackage) moveRep(ctx TaskContext, coorCli *coormq.Client, pkg
}

fileHashes = append(fileHashes, rep.FileHash)
t.ResultCacheInfos = append(t.ResultCacheInfos, stgsdk.NewObjectCacheInfo(rep.Object.ObjectID, rep.FileHash))
t.ResultCacheInfos = append(t.ResultCacheInfos, stgsdk.NewObjectCacheInfo(rep.Object, rep.FileHash))
}

_, err = coorCli.CachePackageMoved(coormq.NewCachePackageMoved(pkg.PackageID, *stgglb.Local.NodeID, fileHashes))


+ 27
- 0
client/internal/http/cacah.go View File

@@ -68,3 +68,30 @@ func (s *CacheService) MovePackage(ctx *gin.Context) {
}
}
}

type CacheGetPackageObjectCacheInfosReq struct {
UserID *int64 `form:"userID" binding:"required"`
PackageID *int64 `form:"packageID" binding:"required"`
}

type CacheGetPackageObjectCacheInfosResp = stgsdk.GetPackageObjectCacheInfosResp

func (s *CacheService) GetPackageObjectCacheInfos(ctx *gin.Context) {
log := logger.WithField("HTTP", "Cache.GetPackageObjectCacheInfos")

var req CacheGetPackageObjectCacheInfosReq
if err := ctx.ShouldBindQuery(&req); err != nil {
log.Warnf("binding body: %s", err.Error())
ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument"))
return
}

infos, err := s.svc.CacheSvc().GetPackageObjectCacheInfos(*req.UserID, *req.PackageID)
if err != nil {
log.Warnf("getting package object cache infos: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get package object cache infos failed"))
return
}

ctx.JSON(http.StatusOK, OK(CacheGetPackageObjectCacheInfosResp{Infos: infos}))
}

+ 29
- 0
client/internal/http/package.go View File

@@ -11,6 +11,7 @@ import (
"gitlink.org.cn/cloudream/common/pkgs/logger"
stgsdk "gitlink.org.cn/cloudream/common/sdks/storage"

"gitlink.org.cn/cloudream/storage/common/pkgs/db/model"
stgiter "gitlink.org.cn/cloudream/storage/common/pkgs/iterator"
)

@@ -24,6 +25,34 @@ func (s *Server) PackageSvc() *PackageService {
}
}

type PackageGetReq struct {
UserID *int64 `form:"userID" binding:"required"`
PackageID *int64 `form:"packageID" binding:"required"`
}
type PackageGetResp struct {
model.Package
}

func (s *PackageService) Get(ctx *gin.Context) {
log := logger.WithField("HTTP", "Package.Get")

var req PackageGetReq
if err := ctx.ShouldBindQuery(&req); err != nil {
log.Warnf("binding body: %s", err.Error())
ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument"))
return
}

pkg, err := s.svc.PackageSvc().Get(*req.UserID, *req.PackageID)
if err != nil {
log.Warnf("getting package: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get package failed"))
return
}

ctx.JSON(http.StatusOK, OK(PackageGetResp{Package: *pkg}))
}

type PackageUploadReq struct {
Info PackageUploadInfo `form:"info" binding:"required"`
Files []*multipart.FileHeader `form:"files"`


+ 2
- 0
client/internal/http/server.go View File

@@ -40,6 +40,7 @@ func (s *Server) Serve() error {
func (s *Server) initRouters() {
s.engine.GET("/object/download", s.ObjectSvc().Download)

s.engine.GET("/package/get", s.PackageSvc().Get)
s.engine.POST("/package/upload", s.PackageSvc().Upload)
s.engine.POST("/package/delete", s.PackageSvc().Delete)
s.engine.GET("/package/getCachedNodes", s.PackageSvc().GetCachedNodes)
@@ -49,4 +50,5 @@ func (s *Server) initRouters() {
s.engine.POST("/storage/createPackage", s.StorageSvc().CreatePackage)

s.engine.POST("/cache/movePackage", s.CacheSvc().MovePackage)
s.engine.GET("/cache/getPackageObjectCacheInfos", s.CacheSvc().GetPackageObjectCacheInfos)
}

+ 16
- 0
client/internal/services/cacah.go View File

@@ -8,6 +8,7 @@ import (

stgglb "gitlink.org.cn/cloudream/storage/common/globals"
agtmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/agent"
coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"
)

type CacheService struct {
@@ -55,3 +56,18 @@ func (svc *CacheService) WaitCacheMovePackage(nodeID int64, taskID string, waitT

return true, waitResp.CacheInfos, nil
}

func (svc *CacheService) GetPackageObjectCacheInfos(userID int64, packageID int64) ([]stgsdk.ObjectCacheInfo, error) {
coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil {
return nil, fmt.Errorf("new coordinator client: %w", err)
}
defer stgglb.CoordinatorMQPool.Release(coorCli)

getResp, err := coorCli.GetPackageObjectCacheInfos(coormq.NewGetPackageObjectCacheInfos(userID, packageID))
if err != nil {
return nil, fmt.Errorf("requesting to coodinator: %w", err)
}

return getResp.Infos, nil
}

+ 23
- 1
client/internal/services/object.go View File

@@ -1,6 +1,13 @@
package services

import "io"
import (
"fmt"
"io"

stgglb "gitlink.org.cn/cloudream/storage/common/globals"
"gitlink.org.cn/cloudream/storage/common/pkgs/db/model"
coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"
)

type ObjectService struct {
*Service
@@ -13,3 +20,18 @@ func (svc *Service) ObjectSvc() *ObjectService {
func (svc *ObjectService) Download(userID int64, objectID int64) (io.ReadCloser, error) {
panic("not implement yet!")
}

func (svc *ObjectService) GetPackageObjects(userID int64, packageID int64) ([]model.Object, error) {
coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil {
return nil, fmt.Errorf("new coordinator client: %w", err)
}
defer stgglb.CoordinatorMQPool.Release(coorCli)

getResp, err := coorCli.GetPackageObjects(coormq.NewGetPackageObjects(userID, packageID))
if err != nil {
return nil, fmt.Errorf("requsting to coodinator: %w", err)
}

return getResp.Objects, nil
}

+ 15
- 0
client/internal/services/package.go View File

@@ -23,6 +23,21 @@ func (svc *Service) PackageSvc() *PackageService {
return &PackageService{Service: svc}
}

func (svc *PackageService) Get(userID int64, packageID int64) (*model.Package, error) {
coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil {
return nil, fmt.Errorf("new coordinator client: %w", err)
}
defer stgglb.CoordinatorMQPool.Release(coorCli)

getResp, err := coorCli.GetPackage(coormq.NewGetPackage(userID, packageID))
if err != nil {
return nil, fmt.Errorf("requsting to coodinator: %w", err)
}

return &getResp.Package, nil
}

func (svc *PackageService) DownloadPackage(userID int64, packageID int64) (iterator.DownloadingObjectIterator, error) {
coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil {


+ 4
- 13
common/pkgs/db/model/model.go View File

@@ -6,6 +6,8 @@ import (
stgsdk "gitlink.org.cn/cloudream/common/sdks/storage"
)

// TODO 可以考虑逐步迁移到stgsdk中。迁移思路:数据对象应该包含的字段都迁移到stgsdk中,内部使用的一些特殊字段则留在这里

type Node struct {
NodeID int64 `db:"NodeID" json:"nodeID"`
Name string `db:"Name" json:"name"`
@@ -56,20 +58,9 @@ type Bucket struct {
CreatorID int64 `db:"CreatorID" json:"creatorID"`
}

type Package struct {
PackageID int64 `db:"PackageID" json:"packageID"`
Name string `db:"Name" json:"name"`
BucketID int64 `db:"BucketID" json:"bucketID"`
State string `db:"State" json:"state"`
Redundancy stgsdk.TypedRedundancyInfo `db:"Redundancy" json:"redundancy"`
}
type Package = stgsdk.Package

type Object struct {
ObjectID int64 `db:"ObjectID" json:"objectID"`
PackageID int64 `db:"PackageID" json:"packageID"`
Path string `db:"Path" json:"path"`
Size int64 `db:"Size" json:"size,string"`
}
type Object = stgsdk.Object

type ObjectRep struct {
ObjectID int64 `db:"ObjectID" json:"objectID"`


+ 22
- 0
common/pkgs/db/object_rep.go View File

@@ -7,6 +7,7 @@ import (
"strings"

"github.com/jmoiron/sqlx"
stgsdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/storage/common/consts"
stgmod "gitlink.org.cn/cloudream/storage/common/models"
"gitlink.org.cn/cloudream/storage/common/pkgs/db/model"
@@ -116,6 +117,27 @@ func (db *ObjectRepDB) GetWithNodeIDInPackage(ctx SQLContext, packageID int64) (
return rets, nil
}

func (db *ObjectRepDB) GetPackageObjectCacheInfos(ctx SQLContext, packageID int64) ([]stgsdk.ObjectCacheInfo, error) {
var tmpRet []struct {
stgsdk.Object
FileHash string `db:"FileHash"`
}

err := sqlx.Select(ctx, &tmpRet, "select Object.*, ObjectRep.FileHash from Object"+
" left join ObjectRep on Object.ObjectID = ObjectRep.ObjectID"+
" where Object.PackageID = ? order by Object.ObjectID asc", packageID)
if err != nil {
return nil, err
}

ret := make([]stgsdk.ObjectCacheInfo, len(tmpRet))
for i, r := range tmpRet {
ret[i] = stgsdk.NewObjectCacheInfo(r.Object, r.FileHash)
}

return ret, nil
}

// 按逗号切割字符串,并将每一个部分解析为一个int64的ID。
// 注:需要外部保证分隔的每一个部分都是正确的10进制数字格式
func splitIDStringUnsafe(idStr string) []int64 {


+ 31
- 0
common/pkgs/mq/coordinator/cache.go View File

@@ -2,10 +2,13 @@ package coordinator

import (
"gitlink.org.cn/cloudream/common/pkgs/mq"
stgsdk "gitlink.org.cn/cloudream/common/sdks/storage"
)

type CacheService interface {
CachePackageMoved(msg *CachePackageMoved) (*CachePackageMovedResp, *mq.CodeMessage)

GetPackageObjectCacheInfos(msg *GetPackageObjectCacheInfos) (*GetPackageObjectCacheInfosResp, *mq.CodeMessage)
}

// Package的Object移动到了节点的Cache中
@@ -34,3 +37,31 @@ func NewCachePackageMovedResp() *CachePackageMovedResp {
func (client *Client) CachePackageMoved(msg *CachePackageMoved) (*CachePackageMovedResp, error) {
return mq.Request(Service.CachePackageMoved, client.rabbitCli, msg)
}

// 获取Package中所有Object的FileHash
var _ = Register(Service.GetPackageObjectCacheInfos)

type GetPackageObjectCacheInfos struct {
mq.MessageBodyBase
UserID int64 `json:"userID"`
PackageID int64 `json:"packageID"`
}
type GetPackageObjectCacheInfosResp struct {
mq.MessageBodyBase
Infos []stgsdk.ObjectCacheInfo
}

func NewGetPackageObjectCacheInfos(userID int64, packageID int64) *GetPackageObjectCacheInfos {
return &GetPackageObjectCacheInfos{
UserID: userID,
PackageID: packageID,
}
}
func NewGetPackageObjectCacheInfosResp(infos []stgsdk.ObjectCacheInfo) *GetPackageObjectCacheInfosResp {
return &GetPackageObjectCacheInfosResp{
Infos: infos,
}
}
func (client *Client) GetPackageObjectCacheInfos(msg *GetPackageObjectCacheInfos) (*GetPackageObjectCacheInfosResp, error) {
return mq.Request(Service.GetPackageObjectCacheInfos, client.rabbitCli, msg)
}

+ 31
- 0
common/pkgs/mq/coordinator/object.go View File

@@ -4,14 +4,45 @@ import (
"gitlink.org.cn/cloudream/common/pkgs/mq"

stgmod "gitlink.org.cn/cloudream/storage/common/models"
"gitlink.org.cn/cloudream/storage/common/pkgs/db/model"
)

type ObjectService interface {
GetPackageObjects(msg *GetPackageObjects) (*GetPackageObjectsResp, *mq.CodeMessage)

GetPackageObjectRepData(msg *GetPackageObjectRepData) (*GetPackageObjectRepDataResp, *mq.CodeMessage)

GetPackageObjectECData(msg *GetPackageObjectECData) (*GetPackageObjectECDataResp, *mq.CodeMessage)
}

// 查询Package中的所有Object,返回的Objects会按照ObjectID升序
var _ = Register(Service.GetPackageObjects)

type GetPackageObjects struct {
mq.MessageBodyBase
UserID int64 `json:"userID"`
PackageID int64 `json:"packageID"`
}
type GetPackageObjectsResp struct {
mq.MessageBodyBase
Objects []model.Object `json:"objects"`
}

func NewGetPackageObjects(userID int64, packageID int64) *GetPackageObjects {
return &GetPackageObjects{
UserID: userID,
PackageID: packageID,
}
}
func NewGetPackageObjectsResp(objects []model.Object) *GetPackageObjectsResp {
return &GetPackageObjectsResp{
Objects: objects,
}
}
func (client *Client) GetPackageObjects(msg *GetPackageObjects) (*GetPackageObjectsResp, error) {
return mq.Request(Service.GetPackageObjects, client.rabbitCli, msg)
}

// 获取指定Object的Rep数据,返回的Objects会按照ObjectID升序
var _ = Register(Service.GetPackageObjectRepData)



+ 0
- 30
common/pkgs/mq/coordinator/package.go View File

@@ -10,8 +10,6 @@ import (
type PackageService interface {
GetPackage(msg *GetPackage) (*GetPackageResp, *mq.CodeMessage)

GetPackageObjects(msg *GetPackageObjects) (*GetPackageObjectsResp, *mq.CodeMessage)

CreatePackage(msg *CreatePackage) (*CreatePackageResp, *mq.CodeMessage)

UpdateRepPackage(msg *UpdateRepPackage) (*UpdateRepPackageResp, *mq.CodeMessage)
@@ -53,34 +51,6 @@ func (client *Client) GetPackage(msg *GetPackage) (*GetPackageResp, error) {
return mq.Request(Service.GetPackage, client.rabbitCli, msg)
}

// 查询Package中的所有Object,返回的Objects会按照ObjectID升序
var _ = Register(Service.GetPackageObjects)

type GetPackageObjects struct {
mq.MessageBodyBase
UserID int64 `json:"userID"`
PackageID int64 `json:"packageID"`
}
type GetPackageObjectsResp struct {
mq.MessageBodyBase
Objects []model.Object `json:"objects"`
}

func NewGetPackageObjects(userID int64, packageID int64) *GetPackageObjects {
return &GetPackageObjects{
UserID: userID,
PackageID: packageID,
}
}
func NewGetPackageObjectsResp(objects []model.Object) *GetPackageObjectsResp {
return &GetPackageObjectsResp{
Objects: objects,
}
}
func (client *Client) GetPackageObjects(msg *GetPackageObjects) (*GetPackageObjectsResp, error) {
return mq.Request(Service.GetPackageObjects, client.rabbitCli, msg)
}

// 创建一个Package
var _ = Register(Service.CreatePackage)



+ 25
- 0
coordinator/internal/services/object.go View File

@@ -7,6 +7,31 @@ import (
coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"
)

func (svc *Service) GetPackageObjectCacheInfos(msg *coormq.GetPackageObjectCacheInfos) (*coormq.GetPackageObjectCacheInfosResp, *mq.CodeMessage) {
pkg, err := svc.db.Package().GetUserPackage(svc.db.SQLCtx(), msg.UserID, msg.PackageID)
if err != nil {
logger.WithField("PackageID", msg.PackageID).
Warnf("getting package: %s", err.Error())

return nil, mq.Failed(errorcode.OperationFailed, "get package failed")
}

if pkg.Redundancy.IsRepInfo() {
infos, err := svc.db.ObjectRep().GetPackageObjectCacheInfos(svc.db.SQLCtx(), msg.PackageID)
if err != nil {
logger.WithField("PackageID", msg.PackageID).
Warnf("getting rep package object cache infos: %s", err.Error())

return nil, mq.Failed(errorcode.OperationFailed, "get rep package object cache infos failed")
}

return mq.ReplyOK(coormq.NewGetPackageObjectCacheInfosResp(infos))
}
// TODO EC

return nil, mq.Failed(errorcode.OperationFailed, "not implement yet")
}

func (svc *Service) GetPackageObjectRepData(msg *coormq.GetPackageObjectRepData) (*coormq.GetPackageObjectRepDataResp, *mq.CodeMessage) {
data, err := svc.db.ObjectRep().GetWithNodeIDInPackage(svc.db.SQLCtx(), msg.PackageID)
if err != nil {


Loading…
Cancel
Save