diff --git a/client/internal/http/object.go b/client/internal/http/object.go index 39e84ab..2ba37f5 100644 --- a/client/internal/http/object.go +++ b/client/internal/http/object.go @@ -39,14 +39,14 @@ func (s *ObjectService) ListByPath(ctx *gin.Context) { return } - coms, objs, err := s.svc.ObjectSvc().GetByPath(req.UserID, req.PackageID, req.Path, req.IsPrefix, req.NoRecursive) + resp, err := s.svc.ObjectSvc().GetByPath(req) if err != nil { log.Warnf("listing objects: %s", err.Error()) ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, fmt.Sprintf("listing objects: %v", err))) return } - ctx.JSON(http.StatusOK, OK(cdsapi.ObjectListByPathResp{CommonPrefixes: coms, Objects: objs})) + ctx.JSON(http.StatusOK, OK(resp)) } func (s *ObjectService) ListByIDs(ctx *gin.Context) { @@ -191,14 +191,16 @@ func (s *ObjectService) DownloadByPath(ctx *gin.Context) { return } - _, obj, err := s.svc.ObjectSvc().GetByPath(req.UserID, req.PackageID, req.Path, false, false) + resp, err := s.svc.ObjectSvc().GetByPath(cdsapi.ObjectListByPath{ + UserID: req.UserID, PackageID: req.PackageID, Path: req.Path, + }) if err != nil { log.Warnf("getting object by path: %s", err.Error()) ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get object by path failed")) return } - if len(obj) == 0 { + if len(resp.Objects) == 0 { log.Warnf("object not found: %s", req.Path) ctx.JSON(http.StatusOK, Failed(errorcode.DataNotFound, "object not found")) return @@ -211,7 +213,7 @@ func (s *ObjectService) DownloadByPath(ctx *gin.Context) { } file, err := s.svc.ObjectSvc().Download(req.UserID, downloader.DownloadReqeust{ - ObjectID: obj[0].ObjectID, + ObjectID: resp.Objects[0].ObjectID, Offset: off, Length: len, }) @@ -266,20 +268,22 @@ func (s *ObjectService) UpdateInfoByPath(ctx *gin.Context) { return } - _, obj, err := s.svc.ObjectSvc().GetByPath(req.UserID, req.PackageID, req.Path, true, false) + resp, err := s.svc.ObjectSvc().GetByPath(cdsapi.ObjectListByPath{ + UserID: req.UserID, PackageID: req.PackageID, Path: req.Path, + }) if err != nil { log.Warnf("getting object by path: %s", err.Error()) ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get object by path failed")) return } - if len(obj) == 0 { + if len(resp.Objects) == 0 { log.Warnf("object not found: %s", req.Path) ctx.JSON(http.StatusOK, Failed(errorcode.DataNotFound, "object not found")) return } sucs, err := s.svc.ObjectSvc().UpdateInfo(req.UserID, []cdsapi.UpdatingObject{{ - ObjectID: obj[0].ObjectID, + ObjectID: resp.Objects[0].ObjectID, UpdateTime: req.UpdateTime, }}) if err != nil { @@ -343,18 +347,20 @@ func (s *ObjectService) DeleteByPath(ctx *gin.Context) { return } - _, obj, err := s.svc.ObjectSvc().GetByPath(req.UserID, req.PackageID, req.Path, false, false) + resp, err := s.svc.ObjectSvc().GetByPath(cdsapi.ObjectListByPath{ + UserID: req.UserID, PackageID: req.PackageID, Path: req.Path, + }) if err != nil { log.Warnf("getting object by path: %s", err.Error()) ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get object by path failed")) return } - if len(obj) == 0 { + if len(resp.Objects) == 0 { ctx.JSON(http.StatusOK, OK(nil)) return } - err = s.svc.ObjectSvc().Delete(req.UserID, []cdssdk.ObjectID{obj[0].ObjectID}) + err = s.svc.ObjectSvc().Delete(req.UserID, []cdssdk.ObjectID{resp.Objects[0].ObjectID}) if err != nil { log.Warnf("deleting objects: %s", err.Error()) ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "delete objects failed")) diff --git a/client/internal/http/presigned.go b/client/internal/http/presigned.go index aef1c0e..926ef36 100644 --- a/client/internal/http/presigned.go +++ b/client/internal/http/presigned.go @@ -37,14 +37,15 @@ func (s *PresignedService) ObjectDownloadByPath(ctx *gin.Context) { return } - _, obj, err := s.svc.ObjectSvc().GetByPath(req.UserID, req.PackageID, req.Path, false, false) + resp, err := s.svc.ObjectSvc().GetByPath(cdsapi.ObjectListByPath{ + UserID: req.UserID, PackageID: req.PackageID, Path: req.Path, + }) if err != nil { log.Warnf("getting object by path: %s", err.Error()) ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get object by path failed")) return } - - if len(obj) == 0 { + if len(resp.Objects) == 0 { log.Warnf("object not found: %s", req.Path) ctx.JSON(http.StatusOK, Failed(errorcode.DataNotFound, "object not found")) return @@ -57,7 +58,7 @@ func (s *PresignedService) ObjectDownloadByPath(ctx *gin.Context) { } file, err := s.svc.ObjectSvc().Download(req.UserID, downloader.DownloadReqeust{ - ObjectID: obj[0].ObjectID, + ObjectID: resp.Objects[0].ObjectID, Offset: off, Length: len, }) diff --git a/client/internal/services/object.go b/client/internal/services/object.go index 780be64..6488769 100644 --- a/client/internal/services/object.go +++ b/client/internal/services/object.go @@ -26,19 +26,19 @@ func (svc *Service) ObjectSvc() *ObjectService { return &ObjectService{Service: svc} } -func (svc *ObjectService) GetByPath(userID cdssdk.UserID, pkgID cdssdk.PackageID, path string, isPrefix bool, noRecursive bool) ([]string, []cdssdk.Object, error) { +func (svc *ObjectService) GetByPath(req cdsapi.ObjectListByPath) (cdsapi.ObjectListByPathResp, error) { coorCli, err := stgglb.CoordinatorMQPool.Acquire() if err != nil { - return nil, nil, fmt.Errorf("new coordinator client: %w", err) + return cdsapi.ObjectListByPathResp{}, fmt.Errorf("new coordinator client: %w", err) } defer stgglb.CoordinatorMQPool.Release(coorCli) - listResp, err := coorCli.GetObjectsByPath(coormq.ReqGetObjectsByPath(userID, pkgID, path, isPrefix, noRecursive)) + listResp, err := coorCli.ListObjectsByPath(coormq.ReqListObjectsByPath(req)) if err != nil { - return nil, nil, fmt.Errorf("requsting to coodinator: %w", err) + return cdsapi.ObjectListByPathResp{}, fmt.Errorf("requsting to coodinator: %w", err) } - return listResp.CommonPrefixes, listResp.Objects, nil + return listResp.ObjectListByPathResp, nil } func (svc *ObjectService) GetByIDs(userID cdssdk.UserID, objectIDs []cdssdk.ObjectID) ([]*cdssdk.Object, error) { diff --git a/common/pkgs/db2/object.go b/common/pkgs/db2/object.go index 14633ed..a90b25d 100644 --- a/common/pkgs/db2/object.go +++ b/common/pkgs/db2/object.go @@ -39,42 +39,51 @@ func (db *ObjectDB) GetWithPathPrefix(ctx SQLContext, packageID cdssdk.PackageID return ret, err } -func (db *ObjectDB) GetCommonPrefixes(ctx SQLContext, packageID cdssdk.PackageID, pathPrefix string) ([]string, error) { - var ret []string +// 查询结果将按照Path升序,而不是ObjectID升序 +func (db *ObjectDB) GetWithPathPrefixPaged(ctx SQLContext, packageID cdssdk.PackageID, pathPrefix string, startPath string, limit int) ([]cdssdk.Object, error) { + var ret []cdssdk.Object + err := ctx.Table("Object").Where("PackageID = ? AND Path > ? AND Path LIKE ?", packageID, startPath, pathPrefix+"%").Order("Path ASC").Limit(limit).Find(&ret).Error + return ret, err +} + +func (db *ObjectDB) GetByPrefixGroupedPaged(ctx SQLContext, packageID cdssdk.PackageID, pathPrefix string, startPath string, limit int) (objs []cdssdk.Object, commonPrefixes []string, nextStartPath string, err error) { + type ObjectOrDir struct { + cdssdk.Object + IsObject bool `gorm:"IsObject"` + Prefix string `gorm:"Prefix"` + } sepCnt := strings.Count(pathPrefix, cdssdk.ObjectPathSeparator) + 1 prefixStatm := fmt.Sprintf("Substring_Index(Path, '%s', %d)", cdssdk.ObjectPathSeparator, sepCnt) - - err := ctx.Table("Object").Select(prefixStatm+" as Prefix"). + grouping := ctx.Table("Object"). + Select(fmt.Sprintf("%s as Prefix, Max(ObjectID) as ObjectID, %s = Path as IsObject", prefixStatm, prefixStatm)). Where("PackageID = ?", packageID). Where("Path like ?", pathPrefix+"%"). - Where(prefixStatm + " <> Path"). - Group("Prefix").Find(&ret).Error + Group("Prefix, IsObject"). + Having("Prefix > ?", startPath). + Limit(limit). + Order("Prefix ASC") + + var ret []ObjectOrDir + err = ctx.Table("Object"). + Select("Grouped.IsObject, Grouped.Prefix, Object.*"). + Joins("right join (?) as Grouped on Object.ObjectID = Grouped.ObjectID and Grouped.IsObject = 1", grouping). + Find(&ret).Error if err != nil { - return nil, err + return } - for i := range ret { - ret[i] = ret[i] + cdssdk.ObjectPathSeparator + for _, o := range ret { + if o.IsObject { + objs = append(objs, o.Object) + } else { + commonPrefixes = append(commonPrefixes, o.Prefix+cdssdk.ObjectPathSeparator) + } + nextStartPath = o.Prefix } - return ret, nil -} - -func (db *ObjectDB) GetDirectChildren(ctx SQLContext, packageID cdssdk.PackageID, pathPrefix string) ([]cdssdk.Object, error) { - var ret []cdssdk.Object - - sepCnt := strings.Count(pathPrefix, cdssdk.ObjectPathSeparator) + 1 - - prefixStatm := fmt.Sprintf("Substring_Index(Path, '%s', %d)", cdssdk.ObjectPathSeparator, sepCnt) - - err := ctx.Table("Object"). - Where("PackageID = ?", packageID). - Where("Path like ?", pathPrefix+"%"). - Where(prefixStatm + " = Path"). - Find(&ret).Error - return ret, err + return } func (db *ObjectDB) BatchTestObjectID(ctx SQLContext, objectIDs []cdssdk.ObjectID) (map[cdssdk.ObjectID]bool, error) { diff --git a/common/pkgs/mq/coordinator/object.go b/common/pkgs/mq/coordinator/object.go index 6ac2248..51a800c 100644 --- a/common/pkgs/mq/coordinator/object.go +++ b/common/pkgs/mq/coordinator/object.go @@ -12,7 +12,7 @@ import ( type ObjectService interface { GetObjects(msg *GetObjects) (*GetObjectsResp, *mq.CodeMessage) - GetObjectsByPath(msg *GetObjectsByPath) (*GetObjectsByPathResp, *mq.CodeMessage) + ListObjectsByPath(msg *ListObjectsByPath) (*ListObjectsByPathResp, *mq.CodeMessage) GetPackageObjects(msg *GetPackageObjects) (*GetPackageObjectsResp, *mq.CodeMessage) @@ -67,39 +67,29 @@ func (client *Client) GetObjects(msg *GetObjects) (*GetObjectsResp, error) { } // 查询指定前缀的Object,返回的Objects会按照ObjectID升序 -var _ = Register(Service.GetObjectsByPath) +var _ = Register(Service.ListObjectsByPath) -type GetObjectsByPath struct { +type ListObjectsByPath struct { mq.MessageBodyBase - UserID cdssdk.UserID `json:"userID"` - PackageID cdssdk.PackageID `json:"packageID"` - Path string `json:"path"` - IsPrefix bool `json:"isPrefix"` - NoRecursive bool `json:"noRecursive"` + cdsapi.ObjectListByPath } -type GetObjectsByPathResp struct { +type ListObjectsByPathResp struct { mq.MessageBodyBase - CommonPrefixes []string `json:"commonPrefixes"` - Objects []model.Object `json:"objects"` + cdsapi.ObjectListByPathResp } -func ReqGetObjectsByPath(userID cdssdk.UserID, packageID cdssdk.PackageID, path string, isPrefix bool, noRecursive bool) *GetObjectsByPath { - return &GetObjectsByPath{ - UserID: userID, - PackageID: packageID, - Path: path, - IsPrefix: isPrefix, - NoRecursive: noRecursive, +func ReqListObjectsByPath(req cdsapi.ObjectListByPath) *ListObjectsByPath { + return &ListObjectsByPath{ + ObjectListByPath: req, } } -func RespGetObjectsByPath(commonPrefixes []string, objects []model.Object) *GetObjectsByPathResp { - return &GetObjectsByPathResp{ - CommonPrefixes: commonPrefixes, - Objects: objects, +func RespListObjectsByPath(resp cdsapi.ObjectListByPathResp) *ListObjectsByPathResp { + return &ListObjectsByPathResp{ + ObjectListByPathResp: resp, } } -func (client *Client) GetObjectsByPath(msg *GetObjectsByPath) (*GetObjectsByPathResp, error) { - return mq.Request(Service.GetObjectsByPath, client.rabbitCli, msg) +func (client *Client) ListObjectsByPath(msg *ListObjectsByPath) (*ListObjectsByPathResp, error) { + return mq.Request(Service.ListObjectsByPath, client.rabbitCli, msg) } // 查询Package中的所有Object,返回的Objects会按照ObjectID升序 diff --git a/coordinator/internal/mq/object.go b/coordinator/internal/mq/object.go index 9fb9ba2..38a1a13 100644 --- a/coordinator/internal/mq/object.go +++ b/coordinator/internal/mq/object.go @@ -55,9 +55,16 @@ func (svc *Service) GetObjects(msg *coormq.GetObjects) (*coormq.GetObjectsResp, return mq.ReplyOK(coormq.RespGetObjects(ret)) } -func (svc *Service) GetObjectsByPath(msg *coormq.GetObjectsByPath) (*coormq.GetObjectsByPathResp, *mq.CodeMessage) { +func (svc *Service) ListObjectsByPath(msg *coormq.ListObjectsByPath) (*coormq.ListObjectsByPathResp, *mq.CodeMessage) { var coms []string var objs []cdssdk.Object + var conToken string + + maxKeys := 1000 + if msg.MaxKeys > 0 { + maxKeys = msg.MaxKeys + } + err := svc.db2.DoTx(func(tx db2.SQLContext) error { var err error @@ -76,31 +83,32 @@ func (svc *Service) GetObjectsByPath(msg *coormq.GetObjectsByPath) (*coormq.GetO } if !msg.NoRecursive { - objs, err = svc.db2.Object().GetWithPathPrefix(tx, msg.PackageID, msg.Path) + objs, err = svc.db2.Object().GetWithPathPrefixPaged(tx, msg.PackageID, msg.Path, msg.ContinuationToken, maxKeys) if err != nil { return fmt.Errorf("getting objects with prefix: %w", err) } - return nil - } - coms, err = svc.db2.Object().GetCommonPrefixes(tx, msg.PackageID, msg.Path) - if err != nil { - return fmt.Errorf("getting common prefixes: %w", err) - } + if len(objs) > 0 { + conToken = objs[len(objs)-1].Path + } - objs, err = svc.db2.Object().GetDirectChildren(tx, msg.PackageID, msg.Path) - if err != nil { - return fmt.Errorf("getting direct children: %w", err) + return nil } - return nil + objs, coms, conToken, err = svc.db2.Object().GetByPrefixGroupedPaged(tx, msg.PackageID, msg.Path, msg.ContinuationToken, maxKeys) + return err }) if err != nil { logger.WithField("PathPrefix", msg.Path).Warn(err.Error()) return nil, mq.Failed(errorcode.OperationFailed, "get objects with prefix failed") } - return mq.ReplyOK(coormq.RespGetObjectsByPath(coms, objs)) + return mq.ReplyOK(coormq.RespListObjectsByPath(cdsapi.ObjectListByPathResp{ + CommonPrefixes: coms, + Objects: objs, + IsTruncated: len(coms)+len(objs) >= maxKeys, + NextContinuationToken: conToken, + })) } func (svc *Service) GetPackageObjects(msg *coormq.GetPackageObjects) (*coormq.GetPackageObjectsResp, *mq.CodeMessage) {