Browse Source

Merge branch 'master' into feature_gxh

gitlink
Sydonian 8 months ago
parent
commit
417f9bfd9c
7 changed files with 115 additions and 83 deletions
  1. +18
    -0
      .devops/基于gitlink构建.yml
  2. +17
    -11
      client/internal/http/object.go
  3. +5
    -4
      client/internal/http/presigned.go
  4. +5
    -5
      client/internal/services/object.go
  5. +35
    -26
      common/pkgs/db2/object.go
  6. +14
    -24
      common/pkgs/mq/coordinator/object.go
  7. +21
    -13
      coordinator/internal/mq/object.go

+ 18
- 0
.devops/基于gitlink构建.yml View File

@@ -98,6 +98,7 @@ workflow:
task: end
needs:
- ssh_cmd_0
- dockerBuild_0
- ref: docker_image_build_0
name: agent镜像构建
cache:
@@ -137,4 +138,21 @@ workflow:
needs:
- git_clone_0
- git_clone_1
- ref: dockerBuild_0
name: Docker镜像构建(kaylee)
task: kaylee/dockerBuild@1.0.0
input:
context: git_clone_0.git_path
dockerfile: '"Dockerfile"'
docker_build_tags: '"latest"'
docker_build_args: '"TARGET=agentimage"'
docker_login_registry: '""'
docker_login_username: ((docker_registry.registry_user))
docker_login_password: ((docker_registry.registry_password))
docker_run_image: '""'
docker_run_name: '""'
docker_run_network: '""'
docker_run_volumes: '""'
needs:
- shell_0


+ 17
- 11
client/internal/http/object.go View File

@@ -39,14 +39,14 @@ func (s *ObjectService) ListByPath(ctx *gin.Context) {
return
}

coms, objs, err := s.svc.ObjectSvc().GetByPath(req.UserID, req.PackageID, req.Path, req.IsPrefix, req.NoRecursive)
resp, err := s.svc.ObjectSvc().GetByPath(req)
if err != nil {
log.Warnf("listing objects: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, fmt.Sprintf("listing objects: %v", err)))
return
}

ctx.JSON(http.StatusOK, OK(cdsapi.ObjectListByPathResp{CommonPrefixes: coms, Objects: objs}))
ctx.JSON(http.StatusOK, OK(resp))
}

func (s *ObjectService) ListByIDs(ctx *gin.Context) {
@@ -191,14 +191,16 @@ func (s *ObjectService) DownloadByPath(ctx *gin.Context) {
return
}

_, obj, err := s.svc.ObjectSvc().GetByPath(req.UserID, req.PackageID, req.Path, false, false)
resp, err := s.svc.ObjectSvc().GetByPath(cdsapi.ObjectListByPath{
UserID: req.UserID, PackageID: req.PackageID, Path: req.Path,
})
if err != nil {
log.Warnf("getting object by path: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get object by path failed"))
return
}

if len(obj) == 0 {
if len(resp.Objects) == 0 {
log.Warnf("object not found: %s", req.Path)
ctx.JSON(http.StatusOK, Failed(errorcode.DataNotFound, "object not found"))
return
@@ -211,7 +213,7 @@ func (s *ObjectService) DownloadByPath(ctx *gin.Context) {
}

file, err := s.svc.ObjectSvc().Download(req.UserID, downloader.DownloadReqeust{
ObjectID: obj[0].ObjectID,
ObjectID: resp.Objects[0].ObjectID,
Offset: off,
Length: len,
})
@@ -266,20 +268,22 @@ func (s *ObjectService) UpdateInfoByPath(ctx *gin.Context) {
return
}

_, obj, err := s.svc.ObjectSvc().GetByPath(req.UserID, req.PackageID, req.Path, true, false)
resp, err := s.svc.ObjectSvc().GetByPath(cdsapi.ObjectListByPath{
UserID: req.UserID, PackageID: req.PackageID, Path: req.Path,
})
if err != nil {
log.Warnf("getting object by path: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get object by path failed"))
return
}
if len(obj) == 0 {
if len(resp.Objects) == 0 {
log.Warnf("object not found: %s", req.Path)
ctx.JSON(http.StatusOK, Failed(errorcode.DataNotFound, "object not found"))
return
}

sucs, err := s.svc.ObjectSvc().UpdateInfo(req.UserID, []cdsapi.UpdatingObject{{
ObjectID: obj[0].ObjectID,
ObjectID: resp.Objects[0].ObjectID,
UpdateTime: req.UpdateTime,
}})
if err != nil {
@@ -343,18 +347,20 @@ func (s *ObjectService) DeleteByPath(ctx *gin.Context) {
return
}

_, obj, err := s.svc.ObjectSvc().GetByPath(req.UserID, req.PackageID, req.Path, false, false)
resp, err := s.svc.ObjectSvc().GetByPath(cdsapi.ObjectListByPath{
UserID: req.UserID, PackageID: req.PackageID, Path: req.Path,
})
if err != nil {
log.Warnf("getting object by path: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get object by path failed"))
return
}
if len(obj) == 0 {
if len(resp.Objects) == 0 {
ctx.JSON(http.StatusOK, OK(nil))
return
}

err = s.svc.ObjectSvc().Delete(req.UserID, []cdssdk.ObjectID{obj[0].ObjectID})
err = s.svc.ObjectSvc().Delete(req.UserID, []cdssdk.ObjectID{resp.Objects[0].ObjectID})
if err != nil {
log.Warnf("deleting objects: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "delete objects failed"))


+ 5
- 4
client/internal/http/presigned.go View File

@@ -37,14 +37,15 @@ func (s *PresignedService) ObjectDownloadByPath(ctx *gin.Context) {
return
}

_, obj, err := s.svc.ObjectSvc().GetByPath(req.UserID, req.PackageID, req.Path, false, false)
resp, err := s.svc.ObjectSvc().GetByPath(cdsapi.ObjectListByPath{
UserID: req.UserID, PackageID: req.PackageID, Path: req.Path,
})
if err != nil {
log.Warnf("getting object by path: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get object by path failed"))
return
}

if len(obj) == 0 {
if len(resp.Objects) == 0 {
log.Warnf("object not found: %s", req.Path)
ctx.JSON(http.StatusOK, Failed(errorcode.DataNotFound, "object not found"))
return
@@ -57,7 +58,7 @@ func (s *PresignedService) ObjectDownloadByPath(ctx *gin.Context) {
}

file, err := s.svc.ObjectSvc().Download(req.UserID, downloader.DownloadReqeust{
ObjectID: obj[0].ObjectID,
ObjectID: resp.Objects[0].ObjectID,
Offset: off,
Length: len,
})


+ 5
- 5
client/internal/services/object.go View File

@@ -26,19 +26,19 @@ func (svc *Service) ObjectSvc() *ObjectService {
return &ObjectService{Service: svc}
}

func (svc *ObjectService) GetByPath(userID cdssdk.UserID, pkgID cdssdk.PackageID, path string, isPrefix bool, noRecursive bool) ([]string, []cdssdk.Object, error) {
func (svc *ObjectService) GetByPath(req cdsapi.ObjectListByPath) (cdsapi.ObjectListByPathResp, error) {
coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil {
return nil, nil, fmt.Errorf("new coordinator client: %w", err)
return cdsapi.ObjectListByPathResp{}, fmt.Errorf("new coordinator client: %w", err)
}
defer stgglb.CoordinatorMQPool.Release(coorCli)

listResp, err := coorCli.GetObjectsByPath(coormq.ReqGetObjectsByPath(userID, pkgID, path, isPrefix, noRecursive))
listResp, err := coorCli.ListObjectsByPath(coormq.ReqListObjectsByPath(req))
if err != nil {
return nil, nil, fmt.Errorf("requsting to coodinator: %w", err)
return cdsapi.ObjectListByPathResp{}, fmt.Errorf("requsting to coodinator: %w", err)
}

return listResp.CommonPrefixes, listResp.Objects, nil
return listResp.ObjectListByPathResp, nil
}

func (svc *ObjectService) GetByIDs(userID cdssdk.UserID, objectIDs []cdssdk.ObjectID) ([]*cdssdk.Object, error) {


+ 35
- 26
common/pkgs/db2/object.go View File

@@ -49,42 +49,51 @@ func (db *ObjectDB) GetWithPathPrefix(ctx SQLContext, packageID cdssdk.PackageID
return ret, err
}

func (db *ObjectDB) GetCommonPrefixes(ctx SQLContext, packageID cdssdk.PackageID, pathPrefix string) ([]string, error) {
var ret []string
// 查询结果将按照Path升序,而不是ObjectID升序
func (db *ObjectDB) GetWithPathPrefixPaged(ctx SQLContext, packageID cdssdk.PackageID, pathPrefix string, startPath string, limit int) ([]cdssdk.Object, error) {
var ret []cdssdk.Object
err := ctx.Table("Object").Where("PackageID = ? AND Path > ? AND Path LIKE ?", packageID, startPath, pathPrefix+"%").Order("Path ASC").Limit(limit).Find(&ret).Error
return ret, err
}

func (db *ObjectDB) GetByPrefixGroupedPaged(ctx SQLContext, packageID cdssdk.PackageID, pathPrefix string, startPath string, limit int) (objs []cdssdk.Object, commonPrefixes []string, nextStartPath string, err error) {
type ObjectOrDir struct {
cdssdk.Object
IsObject bool `gorm:"IsObject"`
Prefix string `gorm:"Prefix"`
}

sepCnt := strings.Count(pathPrefix, cdssdk.ObjectPathSeparator) + 1

prefixStatm := fmt.Sprintf("Substring_Index(Path, '%s', %d)", cdssdk.ObjectPathSeparator, sepCnt)

err := ctx.Table("Object").Select(prefixStatm+" as Prefix").
grouping := ctx.Table("Object").
Select(fmt.Sprintf("%s as Prefix, Max(ObjectID) as ObjectID, %s = Path as IsObject", prefixStatm, prefixStatm)).
Where("PackageID = ?", packageID).
Where("Path like ?", escapeLike("", "%", pathPrefix)).
Where(prefixStatm + " <> Path").
Group("Prefix").Find(&ret).Error
Where("Path like ?", pathPrefix+"%").
Group("Prefix, IsObject").
Having("Prefix > ?", startPath).
Limit(limit).
Order("Prefix ASC")

var ret []ObjectOrDir
err = ctx.Table("Object").
Select("Grouped.IsObject, Grouped.Prefix, Object.*").
Joins("right join (?) as Grouped on Object.ObjectID = Grouped.ObjectID and Grouped.IsObject = 1", grouping).
Find(&ret).Error
if err != nil {
return nil, err
return
}

for i := range ret {
ret[i] = ret[i] + cdssdk.ObjectPathSeparator
for _, o := range ret {
if o.IsObject {
objs = append(objs, o.Object)
} else {
commonPrefixes = append(commonPrefixes, o.Prefix+cdssdk.ObjectPathSeparator)
}
nextStartPath = o.Prefix
}

return ret, nil
}

func (db *ObjectDB) GetDirectChildren(ctx SQLContext, packageID cdssdk.PackageID, pathPrefix string) ([]cdssdk.Object, error) {
var ret []cdssdk.Object

sepCnt := strings.Count(pathPrefix, cdssdk.ObjectPathSeparator) + 1

prefixStatm := fmt.Sprintf("Substring_Index(Path, '%s', %d)", cdssdk.ObjectPathSeparator, sepCnt)

err := ctx.Table("Object").
Where("PackageID = ?", packageID).
Where("Path like ?", escapeLike("", "%", pathPrefix)).
Where(prefixStatm + " = Path").
Find(&ret).Error
return ret, err
return
}

func (db *ObjectDB) HasObjectWithPrefix(ctx SQLContext, packageID cdssdk.PackageID, pathPrefix string) (bool, error) {


+ 14
- 24
common/pkgs/mq/coordinator/object.go View File

@@ -12,7 +12,7 @@ import (
type ObjectService interface {
GetObjects(msg *GetObjects) (*GetObjectsResp, *mq.CodeMessage)

GetObjectsByPath(msg *GetObjectsByPath) (*GetObjectsByPathResp, *mq.CodeMessage)
ListObjectsByPath(msg *ListObjectsByPath) (*ListObjectsByPathResp, *mq.CodeMessage)

GetPackageObjects(msg *GetPackageObjects) (*GetPackageObjectsResp, *mq.CodeMessage)

@@ -67,39 +67,29 @@ func (client *Client) GetObjects(msg *GetObjects) (*GetObjectsResp, error) {
}

// 查询指定前缀的Object,返回的Objects会按照ObjectID升序
var _ = Register(Service.GetObjectsByPath)
var _ = Register(Service.ListObjectsByPath)

type GetObjectsByPath struct {
type ListObjectsByPath struct {
mq.MessageBodyBase
UserID cdssdk.UserID `json:"userID"`
PackageID cdssdk.PackageID `json:"packageID"`
Path string `json:"path"`
IsPrefix bool `json:"isPrefix"`
NoRecursive bool `json:"noRecursive"`
cdsapi.ObjectListByPath
}
type GetObjectsByPathResp struct {
type ListObjectsByPathResp struct {
mq.MessageBodyBase
CommonPrefixes []string `json:"commonPrefixes"`
Objects []model.Object `json:"objects"`
cdsapi.ObjectListByPathResp
}

func ReqGetObjectsByPath(userID cdssdk.UserID, packageID cdssdk.PackageID, path string, isPrefix bool, noRecursive bool) *GetObjectsByPath {
return &GetObjectsByPath{
UserID: userID,
PackageID: packageID,
Path: path,
IsPrefix: isPrefix,
NoRecursive: noRecursive,
func ReqListObjectsByPath(req cdsapi.ObjectListByPath) *ListObjectsByPath {
return &ListObjectsByPath{
ObjectListByPath: req,
}
}
func RespGetObjectsByPath(commonPrefixes []string, objects []model.Object) *GetObjectsByPathResp {
return &GetObjectsByPathResp{
CommonPrefixes: commonPrefixes,
Objects: objects,
func RespListObjectsByPath(resp cdsapi.ObjectListByPathResp) *ListObjectsByPathResp {
return &ListObjectsByPathResp{
ObjectListByPathResp: resp,
}
}
func (client *Client) GetObjectsByPath(msg *GetObjectsByPath) (*GetObjectsByPathResp, error) {
return mq.Request(Service.GetObjectsByPath, client.rabbitCli, msg)
func (client *Client) ListObjectsByPath(msg *ListObjectsByPath) (*ListObjectsByPathResp, error) {
return mq.Request(Service.ListObjectsByPath, client.rabbitCli, msg)
}

// 查询Package中的所有Object,返回的Objects会按照ObjectID升序


+ 21
- 13
coordinator/internal/mq/object.go View File

@@ -55,9 +55,16 @@ func (svc *Service) GetObjects(msg *coormq.GetObjects) (*coormq.GetObjectsResp,
return mq.ReplyOK(coormq.RespGetObjects(ret))
}

func (svc *Service) GetObjectsByPath(msg *coormq.GetObjectsByPath) (*coormq.GetObjectsByPathResp, *mq.CodeMessage) {
func (svc *Service) ListObjectsByPath(msg *coormq.ListObjectsByPath) (*coormq.ListObjectsByPathResp, *mq.CodeMessage) {
var coms []string
var objs []cdssdk.Object
var conToken string

maxKeys := 1000
if msg.MaxKeys > 0 {
maxKeys = msg.MaxKeys
}

err := svc.db2.DoTx(func(tx db2.SQLContext) error {
var err error

@@ -77,31 +84,32 @@ func (svc *Service) GetObjectsByPath(msg *coormq.GetObjectsByPath) (*coormq.GetO
}

if !msg.NoRecursive {
objs, err = svc.db2.Object().GetWithPathPrefix(tx, msg.PackageID, msg.Path)
objs, err = svc.db2.Object().GetWithPathPrefixPaged(tx, msg.PackageID, msg.Path, msg.ContinuationToken, maxKeys)
if err != nil {
return fmt.Errorf("getting objects with prefix: %w", err)
}
return nil
}

coms, err = svc.db2.Object().GetCommonPrefixes(tx, msg.PackageID, msg.Path)
if err != nil {
return fmt.Errorf("getting common prefixes: %w", err)
}
if len(objs) > 0 {
conToken = objs[len(objs)-1].Path
}

objs, err = svc.db2.Object().GetDirectChildren(tx, msg.PackageID, msg.Path)
if err != nil {
return fmt.Errorf("getting direct children: %w", err)
return nil
}

return nil
objs, coms, conToken, err = svc.db2.Object().GetByPrefixGroupedPaged(tx, msg.PackageID, msg.Path, msg.ContinuationToken, maxKeys)
return err
})
if err != nil {
logger.WithField("PathPrefix", msg.Path).Warn(err.Error())
return nil, mq.Failed(errorcode.OperationFailed, "get objects with prefix failed")
}

return mq.ReplyOK(coormq.RespGetObjectsByPath(coms, objs))
return mq.ReplyOK(coormq.RespListObjectsByPath(cdsapi.ObjectListByPathResp{
CommonPrefixes: coms,
Objects: objs,
IsTruncated: len(coms)+len(objs) >= maxKeys,
NextContinuationToken: conToken,
}))
}

func (svc *Service) GetPackageObjects(msg *coormq.GetPackageObjects) (*coormq.GetPackageObjectsResp, *mq.CodeMessage) {


Loading…
Cancel
Save