| @@ -39,14 +39,14 @@ func (s *ObjectService) ListByPath(ctx *gin.Context) { | |||
| return | |||
| } | |||
| coms, objs, err := s.svc.ObjectSvc().GetByPath(req.UserID, req.PackageID, req.Path, req.IsPrefix, req.NoRecursive) | |||
| resp, err := s.svc.ObjectSvc().GetByPath(req) | |||
| if err != nil { | |||
| log.Warnf("listing objects: %s", err.Error()) | |||
| ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, fmt.Sprintf("listing objects: %v", err))) | |||
| return | |||
| } | |||
| ctx.JSON(http.StatusOK, OK(cdsapi.ObjectListByPathResp{CommonPrefixes: coms, Objects: objs})) | |||
| ctx.JSON(http.StatusOK, OK(resp)) | |||
| } | |||
| func (s *ObjectService) ListByIDs(ctx *gin.Context) { | |||
| @@ -191,14 +191,16 @@ func (s *ObjectService) DownloadByPath(ctx *gin.Context) { | |||
| return | |||
| } | |||
| _, obj, err := s.svc.ObjectSvc().GetByPath(req.UserID, req.PackageID, req.Path, false, false) | |||
| resp, err := s.svc.ObjectSvc().GetByPath(cdsapi.ObjectListByPath{ | |||
| UserID: req.UserID, PackageID: req.PackageID, Path: req.Path, | |||
| }) | |||
| if err != nil { | |||
| log.Warnf("getting object by path: %s", err.Error()) | |||
| ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get object by path failed")) | |||
| return | |||
| } | |||
| if len(obj) == 0 { | |||
| if len(resp.Objects) == 0 { | |||
| log.Warnf("object not found: %s", req.Path) | |||
| ctx.JSON(http.StatusOK, Failed(errorcode.DataNotFound, "object not found")) | |||
| return | |||
| @@ -211,7 +213,7 @@ func (s *ObjectService) DownloadByPath(ctx *gin.Context) { | |||
| } | |||
| file, err := s.svc.ObjectSvc().Download(req.UserID, downloader.DownloadReqeust{ | |||
| ObjectID: obj[0].ObjectID, | |||
| ObjectID: resp.Objects[0].ObjectID, | |||
| Offset: off, | |||
| Length: len, | |||
| }) | |||
| @@ -266,20 +268,22 @@ func (s *ObjectService) UpdateInfoByPath(ctx *gin.Context) { | |||
| return | |||
| } | |||
| _, obj, err := s.svc.ObjectSvc().GetByPath(req.UserID, req.PackageID, req.Path, true, false) | |||
| resp, err := s.svc.ObjectSvc().GetByPath(cdsapi.ObjectListByPath{ | |||
| UserID: req.UserID, PackageID: req.PackageID, Path: req.Path, | |||
| }) | |||
| if err != nil { | |||
| log.Warnf("getting object by path: %s", err.Error()) | |||
| ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get object by path failed")) | |||
| return | |||
| } | |||
| if len(obj) == 0 { | |||
| if len(resp.Objects) == 0 { | |||
| log.Warnf("object not found: %s", req.Path) | |||
| ctx.JSON(http.StatusOK, Failed(errorcode.DataNotFound, "object not found")) | |||
| return | |||
| } | |||
| sucs, err := s.svc.ObjectSvc().UpdateInfo(req.UserID, []cdsapi.UpdatingObject{{ | |||
| ObjectID: obj[0].ObjectID, | |||
| ObjectID: resp.Objects[0].ObjectID, | |||
| UpdateTime: req.UpdateTime, | |||
| }}) | |||
| if err != nil { | |||
| @@ -343,18 +347,20 @@ func (s *ObjectService) DeleteByPath(ctx *gin.Context) { | |||
| return | |||
| } | |||
| _, obj, err := s.svc.ObjectSvc().GetByPath(req.UserID, req.PackageID, req.Path, false, false) | |||
| resp, err := s.svc.ObjectSvc().GetByPath(cdsapi.ObjectListByPath{ | |||
| UserID: req.UserID, PackageID: req.PackageID, Path: req.Path, | |||
| }) | |||
| if err != nil { | |||
| log.Warnf("getting object by path: %s", err.Error()) | |||
| ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get object by path failed")) | |||
| return | |||
| } | |||
| if len(obj) == 0 { | |||
| if len(resp.Objects) == 0 { | |||
| ctx.JSON(http.StatusOK, OK(nil)) | |||
| return | |||
| } | |||
| err = s.svc.ObjectSvc().Delete(req.UserID, []cdssdk.ObjectID{obj[0].ObjectID}) | |||
| err = s.svc.ObjectSvc().Delete(req.UserID, []cdssdk.ObjectID{resp.Objects[0].ObjectID}) | |||
| if err != nil { | |||
| log.Warnf("deleting objects: %s", err.Error()) | |||
| ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "delete objects failed")) | |||
| @@ -37,14 +37,15 @@ func (s *PresignedService) ObjectDownloadByPath(ctx *gin.Context) { | |||
| return | |||
| } | |||
| _, obj, err := s.svc.ObjectSvc().GetByPath(req.UserID, req.PackageID, req.Path, false, false) | |||
| resp, err := s.svc.ObjectSvc().GetByPath(cdsapi.ObjectListByPath{ | |||
| UserID: req.UserID, PackageID: req.PackageID, Path: req.Path, | |||
| }) | |||
| if err != nil { | |||
| log.Warnf("getting object by path: %s", err.Error()) | |||
| ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get object by path failed")) | |||
| return | |||
| } | |||
| if len(obj) == 0 { | |||
| if len(resp.Objects) == 0 { | |||
| log.Warnf("object not found: %s", req.Path) | |||
| ctx.JSON(http.StatusOK, Failed(errorcode.DataNotFound, "object not found")) | |||
| return | |||
| @@ -57,7 +58,7 @@ func (s *PresignedService) ObjectDownloadByPath(ctx *gin.Context) { | |||
| } | |||
| file, err := s.svc.ObjectSvc().Download(req.UserID, downloader.DownloadReqeust{ | |||
| ObjectID: obj[0].ObjectID, | |||
| ObjectID: resp.Objects[0].ObjectID, | |||
| Offset: off, | |||
| Length: len, | |||
| }) | |||
| @@ -26,19 +26,19 @@ func (svc *Service) ObjectSvc() *ObjectService { | |||
| return &ObjectService{Service: svc} | |||
| } | |||
| func (svc *ObjectService) GetByPath(userID cdssdk.UserID, pkgID cdssdk.PackageID, path string, isPrefix bool, noRecursive bool) ([]string, []cdssdk.Object, error) { | |||
| func (svc *ObjectService) GetByPath(req cdsapi.ObjectListByPath) (cdsapi.ObjectListByPathResp, error) { | |||
| coorCli, err := stgglb.CoordinatorMQPool.Acquire() | |||
| if err != nil { | |||
| return nil, nil, fmt.Errorf("new coordinator client: %w", err) | |||
| return cdsapi.ObjectListByPathResp{}, fmt.Errorf("new coordinator client: %w", err) | |||
| } | |||
| defer stgglb.CoordinatorMQPool.Release(coorCli) | |||
| listResp, err := coorCli.GetObjectsByPath(coormq.ReqGetObjectsByPath(userID, pkgID, path, isPrefix, noRecursive)) | |||
| listResp, err := coorCli.ListObjectsByPath(coormq.ReqListObjectsByPath(req)) | |||
| if err != nil { | |||
| return nil, nil, fmt.Errorf("requsting to coodinator: %w", err) | |||
| return cdsapi.ObjectListByPathResp{}, fmt.Errorf("requsting to coodinator: %w", err) | |||
| } | |||
| return listResp.CommonPrefixes, listResp.Objects, nil | |||
| return listResp.ObjectListByPathResp, nil | |||
| } | |||
| func (svc *ObjectService) GetByIDs(userID cdssdk.UserID, objectIDs []cdssdk.ObjectID) ([]*cdssdk.Object, error) { | |||
| @@ -39,42 +39,51 @@ func (db *ObjectDB) GetWithPathPrefix(ctx SQLContext, packageID cdssdk.PackageID | |||
| return ret, err | |||
| } | |||
| func (db *ObjectDB) GetCommonPrefixes(ctx SQLContext, packageID cdssdk.PackageID, pathPrefix string) ([]string, error) { | |||
| var ret []string | |||
| // 查询结果将按照Path升序,而不是ObjectID升序 | |||
| func (db *ObjectDB) GetWithPathPrefixPaged(ctx SQLContext, packageID cdssdk.PackageID, pathPrefix string, startPath string, limit int) ([]cdssdk.Object, error) { | |||
| var ret []cdssdk.Object | |||
| err := ctx.Table("Object").Where("PackageID = ? AND Path > ? AND Path LIKE ?", packageID, startPath, pathPrefix+"%").Order("Path ASC").Limit(limit).Find(&ret).Error | |||
| return ret, err | |||
| } | |||
| func (db *ObjectDB) GetByPrefixGroupedPaged(ctx SQLContext, packageID cdssdk.PackageID, pathPrefix string, startPath string, limit int) (objs []cdssdk.Object, commonPrefixes []string, nextStartPath string, err error) { | |||
| type ObjectOrDir struct { | |||
| cdssdk.Object | |||
| IsObject bool `gorm:"IsObject"` | |||
| Prefix string `gorm:"Prefix"` | |||
| } | |||
| sepCnt := strings.Count(pathPrefix, cdssdk.ObjectPathSeparator) + 1 | |||
| prefixStatm := fmt.Sprintf("Substring_Index(Path, '%s', %d)", cdssdk.ObjectPathSeparator, sepCnt) | |||
| err := ctx.Table("Object").Select(prefixStatm+" as Prefix"). | |||
| grouping := ctx.Table("Object"). | |||
| Select(fmt.Sprintf("%s as Prefix, Max(ObjectID) as ObjectID, %s = Path as IsObject", prefixStatm, prefixStatm)). | |||
| Where("PackageID = ?", packageID). | |||
| Where("Path like ?", pathPrefix+"%"). | |||
| Where(prefixStatm + " <> Path"). | |||
| Group("Prefix").Find(&ret).Error | |||
| Group("Prefix, IsObject"). | |||
| Having("Prefix > ?", startPath). | |||
| Limit(limit). | |||
| Order("Prefix ASC") | |||
| var ret []ObjectOrDir | |||
| err = ctx.Table("Object"). | |||
| Select("Grouped.IsObject, Grouped.Prefix, Object.*"). | |||
| Joins("right join (?) as Grouped on Object.ObjectID = Grouped.ObjectID and Grouped.IsObject = 1", grouping). | |||
| Find(&ret).Error | |||
| if err != nil { | |||
| return nil, err | |||
| return | |||
| } | |||
| for i := range ret { | |||
| ret[i] = ret[i] + cdssdk.ObjectPathSeparator | |||
| for _, o := range ret { | |||
| if o.IsObject { | |||
| objs = append(objs, o.Object) | |||
| } else { | |||
| commonPrefixes = append(commonPrefixes, o.Prefix+cdssdk.ObjectPathSeparator) | |||
| } | |||
| nextStartPath = o.Prefix | |||
| } | |||
| return ret, nil | |||
| } | |||
| func (db *ObjectDB) GetDirectChildren(ctx SQLContext, packageID cdssdk.PackageID, pathPrefix string) ([]cdssdk.Object, error) { | |||
| var ret []cdssdk.Object | |||
| sepCnt := strings.Count(pathPrefix, cdssdk.ObjectPathSeparator) + 1 | |||
| prefixStatm := fmt.Sprintf("Substring_Index(Path, '%s', %d)", cdssdk.ObjectPathSeparator, sepCnt) | |||
| err := ctx.Table("Object"). | |||
| Where("PackageID = ?", packageID). | |||
| Where("Path like ?", pathPrefix+"%"). | |||
| Where(prefixStatm + " = Path"). | |||
| Find(&ret).Error | |||
| return ret, err | |||
| return | |||
| } | |||
| func (db *ObjectDB) BatchTestObjectID(ctx SQLContext, objectIDs []cdssdk.ObjectID) (map[cdssdk.ObjectID]bool, error) { | |||
| @@ -12,7 +12,7 @@ import ( | |||
| type ObjectService interface { | |||
| GetObjects(msg *GetObjects) (*GetObjectsResp, *mq.CodeMessage) | |||
| GetObjectsByPath(msg *GetObjectsByPath) (*GetObjectsByPathResp, *mq.CodeMessage) | |||
| ListObjectsByPath(msg *ListObjectsByPath) (*ListObjectsByPathResp, *mq.CodeMessage) | |||
| GetPackageObjects(msg *GetPackageObjects) (*GetPackageObjectsResp, *mq.CodeMessage) | |||
| @@ -67,39 +67,29 @@ func (client *Client) GetObjects(msg *GetObjects) (*GetObjectsResp, error) { | |||
| } | |||
| // 查询指定前缀的Object,返回的Objects会按照ObjectID升序 | |||
| var _ = Register(Service.GetObjectsByPath) | |||
| var _ = Register(Service.ListObjectsByPath) | |||
| type GetObjectsByPath struct { | |||
| type ListObjectsByPath struct { | |||
| mq.MessageBodyBase | |||
| UserID cdssdk.UserID `json:"userID"` | |||
| PackageID cdssdk.PackageID `json:"packageID"` | |||
| Path string `json:"path"` | |||
| IsPrefix bool `json:"isPrefix"` | |||
| NoRecursive bool `json:"noRecursive"` | |||
| cdsapi.ObjectListByPath | |||
| } | |||
| type GetObjectsByPathResp struct { | |||
| type ListObjectsByPathResp struct { | |||
| mq.MessageBodyBase | |||
| CommonPrefixes []string `json:"commonPrefixes"` | |||
| Objects []model.Object `json:"objects"` | |||
| cdsapi.ObjectListByPathResp | |||
| } | |||
| func ReqGetObjectsByPath(userID cdssdk.UserID, packageID cdssdk.PackageID, path string, isPrefix bool, noRecursive bool) *GetObjectsByPath { | |||
| return &GetObjectsByPath{ | |||
| UserID: userID, | |||
| PackageID: packageID, | |||
| Path: path, | |||
| IsPrefix: isPrefix, | |||
| NoRecursive: noRecursive, | |||
| func ReqListObjectsByPath(req cdsapi.ObjectListByPath) *ListObjectsByPath { | |||
| return &ListObjectsByPath{ | |||
| ObjectListByPath: req, | |||
| } | |||
| } | |||
| func RespGetObjectsByPath(commonPrefixes []string, objects []model.Object) *GetObjectsByPathResp { | |||
| return &GetObjectsByPathResp{ | |||
| CommonPrefixes: commonPrefixes, | |||
| Objects: objects, | |||
| func RespListObjectsByPath(resp cdsapi.ObjectListByPathResp) *ListObjectsByPathResp { | |||
| return &ListObjectsByPathResp{ | |||
| ObjectListByPathResp: resp, | |||
| } | |||
| } | |||
| func (client *Client) GetObjectsByPath(msg *GetObjectsByPath) (*GetObjectsByPathResp, error) { | |||
| return mq.Request(Service.GetObjectsByPath, client.rabbitCli, msg) | |||
| func (client *Client) ListObjectsByPath(msg *ListObjectsByPath) (*ListObjectsByPathResp, error) { | |||
| return mq.Request(Service.ListObjectsByPath, client.rabbitCli, msg) | |||
| } | |||
| // 查询Package中的所有Object,返回的Objects会按照ObjectID升序 | |||
| @@ -55,9 +55,16 @@ func (svc *Service) GetObjects(msg *coormq.GetObjects) (*coormq.GetObjectsResp, | |||
| return mq.ReplyOK(coormq.RespGetObjects(ret)) | |||
| } | |||
| func (svc *Service) GetObjectsByPath(msg *coormq.GetObjectsByPath) (*coormq.GetObjectsByPathResp, *mq.CodeMessage) { | |||
| func (svc *Service) ListObjectsByPath(msg *coormq.ListObjectsByPath) (*coormq.ListObjectsByPathResp, *mq.CodeMessage) { | |||
| var coms []string | |||
| var objs []cdssdk.Object | |||
| var conToken string | |||
| maxKeys := 1000 | |||
| if msg.MaxKeys > 0 { | |||
| maxKeys = msg.MaxKeys | |||
| } | |||
| err := svc.db2.DoTx(func(tx db2.SQLContext) error { | |||
| var err error | |||
| @@ -76,31 +83,32 @@ func (svc *Service) GetObjectsByPath(msg *coormq.GetObjectsByPath) (*coormq.GetO | |||
| } | |||
| if !msg.NoRecursive { | |||
| objs, err = svc.db2.Object().GetWithPathPrefix(tx, msg.PackageID, msg.Path) | |||
| objs, err = svc.db2.Object().GetWithPathPrefixPaged(tx, msg.PackageID, msg.Path, msg.ContinuationToken, maxKeys) | |||
| if err != nil { | |||
| return fmt.Errorf("getting objects with prefix: %w", err) | |||
| } | |||
| return nil | |||
| } | |||
| coms, err = svc.db2.Object().GetCommonPrefixes(tx, msg.PackageID, msg.Path) | |||
| if err != nil { | |||
| return fmt.Errorf("getting common prefixes: %w", err) | |||
| } | |||
| if len(objs) > 0 { | |||
| conToken = objs[len(objs)-1].Path | |||
| } | |||
| objs, err = svc.db2.Object().GetDirectChildren(tx, msg.PackageID, msg.Path) | |||
| if err != nil { | |||
| return fmt.Errorf("getting direct children: %w", err) | |||
| return nil | |||
| } | |||
| return nil | |||
| objs, coms, conToken, err = svc.db2.Object().GetByPrefixGroupedPaged(tx, msg.PackageID, msg.Path, msg.ContinuationToken, maxKeys) | |||
| return err | |||
| }) | |||
| if err != nil { | |||
| logger.WithField("PathPrefix", msg.Path).Warn(err.Error()) | |||
| return nil, mq.Failed(errorcode.OperationFailed, "get objects with prefix failed") | |||
| } | |||
| return mq.ReplyOK(coormq.RespGetObjectsByPath(coms, objs)) | |||
| return mq.ReplyOK(coormq.RespListObjectsByPath(cdsapi.ObjectListByPathResp{ | |||
| CommonPrefixes: coms, | |||
| Objects: objs, | |||
| IsTruncated: len(coms)+len(objs) >= maxKeys, | |||
| NextContinuationToken: conToken, | |||
| })) | |||
| } | |||
| func (svc *Service) GetPackageObjects(msg *coormq.GetPackageObjects) (*coormq.GetPackageObjectsResp, *mq.CodeMessage) { | |||