Browse Source

增加移动对象接口

gitlink
Sydonian 1 year ago
parent
commit
786a654293
6 changed files with 238 additions and 45 deletions
  1. +22
    -2
      client/internal/http/object.go
  2. +1
    -0
      client/internal/http/server.go
  3. +20
    -5
      client/internal/services/object.go
  4. +16
    -4
      common/pkgs/db/object.go
  5. +36
    -2
      common/pkgs/mq/coordinator/object.go
  6. +143
    -32
      coordinator/internal/mq/object.go

+ 22
- 2
client/internal/http/object.go View File

@@ -129,14 +129,34 @@ func (s *ObjectService) UpdateInfo(ctx *gin.Context) {
return
}

err := s.svc.ObjectSvc().UpdateInfo(req.UserID, req.Updatings)
sucs, err := s.svc.ObjectSvc().UpdateInfo(req.UserID, req.Updatings)
if err != nil {
log.Warnf("updating objects: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "update objects failed"))
return
}

ctx.JSON(http.StatusOK, OK(nil))
ctx.JSON(http.StatusOK, OK(cdssdk.ObjectUpdateInfoResp{Successes: sucs}))
}

func (s *ObjectService) Move(ctx *gin.Context) {
log := logger.WithField("HTTP", "Object.Move")

var req cdssdk.ObjectMoveReq
if err := ctx.ShouldBindJSON(&req); err != nil {
log.Warnf("binding body: %s", err.Error())
ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument"))
return
}

sucs, err := s.svc.ObjectSvc().Move(req.UserID, req.Movings)
if err != nil {
log.Warnf("moving objects: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "move objects failed"))
return
}

ctx.JSON(http.StatusOK, OK(cdssdk.ObjectMoveResp{Successes: sucs}))
}

func (s *ObjectService) Delete(ctx *gin.Context) {


+ 1
- 0
client/internal/http/server.go View File

@@ -43,6 +43,7 @@ func (s *Server) initRouters() {
s.engine.POST(cdssdk.ObjectUploadPath, s.Object().Upload)
s.engine.GET(cdssdk.ObjectGetPackageObjectsPath, s.Object().GetPackageObjects)
s.engine.POST(cdssdk.ObjectUpdateInfoPath, s.Object().UpdateInfo)
s.engine.POST(cdssdk.ObjectMovePath, s.Object().Move)
s.engine.POST(cdssdk.ObjectDeletePath, s.Object().Delete)

s.engine.GET(cdssdk.PackageGetPath, s.Package().Get)


+ 20
- 5
client/internal/services/object.go View File

@@ -36,19 +36,34 @@ func (svc *ObjectService) WaitUploading(taskID string, waitTimeout time.Duration
return false, nil, nil
}

func (svc *ObjectService) UpdateInfo(userID cdssdk.UserID, updatings []cdssdk.UpdatingObject) error {
func (svc *ObjectService) UpdateInfo(userID cdssdk.UserID, updatings []cdssdk.UpdatingObject) ([]cdssdk.ObjectID, error) {
coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil {
return fmt.Errorf("new coordinator client: %w", err)
return nil, fmt.Errorf("new coordinator client: %w", err)
}
defer stgglb.CoordinatorMQPool.Release(coorCli)

_, err = coorCli.UpdateObjectInfos(coormq.ReqUpdateObjectInfos(userID, updatings))
resp, err := coorCli.UpdateObjectInfos(coormq.ReqUpdateObjectInfos(userID, updatings))
if err != nil {
return fmt.Errorf("requsting to coodinator: %w", err)
return nil, fmt.Errorf("requsting to coodinator: %w", err)
}

return nil
return resp.Successes, nil
}

func (svc *ObjectService) Move(userID cdssdk.UserID, movings []cdssdk.MovingObject) ([]cdssdk.ObjectID, error) {
coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil {
return nil, fmt.Errorf("new coordinator client: %w", err)
}
defer stgglb.CoordinatorMQPool.Release(coorCli)

resp, err := coorCli.MoveObjects(coormq.ReqMoveObjects(userID, movings))
if err != nil {
return nil, fmt.Errorf("requsting to coodinator: %w", err)
}

return resp.Successes, nil
}

func (svc *ObjectService) Download(userID cdssdk.UserID, objectID cdssdk.ObjectID) (io.ReadCloser, error) {


+ 16
- 4
common/pkgs/db/object.go View File

@@ -47,7 +47,7 @@ func (db *ObjectDB) BatchGet(ctx SQLContext, objectIDs []cdssdk.ObjectID) ([]mod
return lo.Map(objs, func(o model.TempObject, idx int) cdssdk.Object { return o.ToObject() }), nil
}

func (db *ObjectDB) BatchByPackagePath(ctx SQLContext, pkgID cdssdk.PackageID, pathes []string) ([]cdssdk.Object, error) {
func (db *ObjectDB) BatchGetByPackagePath(ctx SQLContext, pkgID cdssdk.PackageID, pathes []string) ([]cdssdk.Object, error) {
if len(pathes) == 0 {
return nil, nil
}
@@ -87,7 +87,7 @@ func (db *ObjectDB) Create(ctx SQLContext, obj cdssdk.Object) (cdssdk.ObjectID,
// 可以用于批量创建或者更新记录。
// 用于创建时,需要额外检查PackageID+Path的唯一性。
// 用于更新时,需要额外检查现存的PackageID+Path对应的ObjectID是否与待更新的ObjectID相同。不会更新CreateTime。
func (db *ObjectDB) BatchCreateOrUpdate(ctx SQLContext, objs []cdssdk.Object) error {
func (db *ObjectDB) BatchUpsertByPackagePath(ctx SQLContext, objs []cdssdk.Object) error {
if len(objs) == 0 {
return nil
}
@@ -99,6 +99,18 @@ func (db *ObjectDB) BatchCreateOrUpdate(ctx SQLContext, objs []cdssdk.Object) er
return BatchNamedExec(ctx, sql, 7, objs, nil)
}

func (db *ObjectDB) BatchUpert(ctx SQLContext, objs []cdssdk.Object) error {
if len(objs) == 0 {
return nil
}

sql := "insert into Object(ObjectID, PackageID, Path, Size, FileHash, Redundancy, CreateTime ,UpdateTime)" +
" values(:ObjectID, :PackageID,:Path,:Size,:FileHash,:Redundancy, :CreateTime, :UpdateTime) as new" +
" on duplicate key update PackageID = new.PackageID, Path = new.Path, Size = new.Size, FileHash = new.FileHash, Redundancy = new.Redundancy, UpdateTime = new.UpdateTime"

return BatchNamedExec(ctx, sql, 8, objs, nil)
}

func (*ObjectDB) GetPackageObjects(ctx SQLContext, packageID cdssdk.PackageID) ([]model.Object, error) {
var ret []model.TempObject
err := sqlx.Select(ctx, &ret, "select * from Object where PackageID = ? order by ObjectID asc", packageID)
@@ -175,7 +187,7 @@ func (db *ObjectDB) BatchAdd(ctx SQLContext, packageID cdssdk.PackageID, adds []
})
}

err := db.BatchCreateOrUpdate(ctx, objs)
err := db.BatchUpsertByPackagePath(ctx, objs)
if err != nil {
return nil, fmt.Errorf("batch create or update objects: %w", err)
}
@@ -185,7 +197,7 @@ func (db *ObjectDB) BatchAdd(ctx SQLContext, packageID cdssdk.PackageID, adds []
pathes = append(pathes, add.Path)
}
// 这里可以不用检查查询结果是否与pathes的数量相同
addedObjs, err := db.BatchByPackagePath(ctx, packageID, pathes)
addedObjs, err := db.BatchGetByPackagePath(ctx, packageID, pathes)
if err != nil {
return nil, fmt.Errorf("batch get object ids: %w", err)
}


+ 36
- 2
common/pkgs/mq/coordinator/object.go View File

@@ -19,6 +19,8 @@ type ObjectService interface {

UpdateObjectInfos(msg *UpdateObjectInfos) (*UpdateObjectInfosResp, *mq.CodeMessage)

MoveObjects(msg *MoveObjects) (*MoveObjectsResp, *mq.CodeMessage)

DeleteObjects(msg *DeleteObjects) (*DeleteObjectsResp, *mq.CodeMessage)
}

@@ -142,6 +144,7 @@ type UpdateObjectInfos struct {

type UpdateObjectInfosResp struct {
mq.MessageBodyBase
Successes []cdssdk.ObjectID `json:"successes"`
}

func ReqUpdateObjectInfos(userID cdssdk.UserID, updatings []cdssdk.UpdatingObject) *UpdateObjectInfos {
@@ -150,13 +153,44 @@ func ReqUpdateObjectInfos(userID cdssdk.UserID, updatings []cdssdk.UpdatingObjec
Updatings: updatings,
}
}
func RespUpdateObjectInfos() *UpdateObjectInfosResp {
return &UpdateObjectInfosResp{}
func RespUpdateObjectInfos(successes []cdssdk.ObjectID) *UpdateObjectInfosResp {
return &UpdateObjectInfosResp{
Successes: successes,
}
}
func (client *Client) UpdateObjectInfos(msg *UpdateObjectInfos) (*UpdateObjectInfosResp, error) {
return mq.Request(Service.UpdateObjectInfos, client.rabbitCli, msg)
}

// 移动Object
var _ = Register(Service.MoveObjects)

type MoveObjects struct {
mq.MessageBodyBase
UserID cdssdk.UserID `json:"userID"`
Movings []cdssdk.MovingObject `json:"movings"`
}

type MoveObjectsResp struct {
mq.MessageBodyBase
Successes []cdssdk.ObjectID `json:"successes"`
}

func ReqMoveObjects(userID cdssdk.UserID, movings []cdssdk.MovingObject) *MoveObjects {
return &MoveObjects{
UserID: userID,
Movings: movings,
}
}
func RespMoveObjects(successes []cdssdk.ObjectID) *MoveObjectsResp {
return &MoveObjectsResp{
Successes: successes,
}
}
func (client *Client) MoveObjects(msg *MoveObjects) (*MoveObjectsResp, error) {
return mq.Request(Service.MoveObjects, client.rabbitCli, msg)
}

// 删除Object
var _ = Register(Service.DeleteObjects)



+ 143
- 32
coordinator/internal/mq/object.go View File

@@ -165,6 +165,7 @@ func (svc *Service) UpdateObjectRedundancy(msg *coormq.UpdateObjectRedundancy) (
}

func (svc *Service) UpdateObjectInfos(msg *coormq.UpdateObjectInfos) (*coormq.UpdateObjectInfosResp, *mq.CodeMessage) {
var sucs []cdssdk.ObjectID
err := svc.db.DoTx(sql.LevelSerializable, func(tx *sqlx.Tx) error {
msg.Updatings = sort2.Sort(msg.Updatings, func(o1, o2 cdssdk.UpdatingObject) int {
return sort2.Cmp(o1.ObjectID, o2.ObjectID)
@@ -184,43 +185,23 @@ func (svc *Service) UpdateObjectInfos(msg *coormq.UpdateObjectInfos) (*coormq.Up
oldObjIDs[i] = obj.ObjectID
}

avaiUpdatings, notExistsObjs := pickByObjectIDs(msg.Updatings, oldObjIDs)
avaiUpdatings, notExistsObjs := pickByObjectIDs(msg.Updatings, oldObjIDs, func(obj cdssdk.UpdatingObject) cdssdk.ObjectID { return obj.ObjectID })
if len(notExistsObjs) > 0 {
// TODO 部分对象已经不存在
}

// 筛选出PackageID变化、Path变化的对象,这两种对象要检测改变后是否有冲突
// 否则,直接更新即可
//var pkgIDChangedObjs []cdssdk.Object
//var pathChangedObjs []cdssdk.Object
//var infoChangedObjs []cdssdk.Object
//for i := range willUpdateObjs {
// if willUpdateObjs[i].PackageID != oldObjs[i].PackageID {
// newObj := oldObjs[i]
// willUpdateObjs[i].ApplyTo(&newObj)
// pkgIDChangedObjs = append(pkgIDChangedObjs, newObj)
// } else if willUpdateObjs[i].Path != oldObjs[i].Path {
// newObj := oldObjs[i]
// willUpdateObjs[i].ApplyTo(&newObj)
// pathChangedObjs = append(pathChangedObjs, newObj)
// } else {
// newObj := oldObjs[i]
// willUpdateObjs[i].ApplyTo(&newObj)
// infoChangedObjs = append(infoChangedObjs, newObj)
// }
//}

newObjs := make([]cdssdk.Object, len(avaiUpdatings))
for i := range newObjs {
newObjs[i] = oldObjs[i]
avaiUpdatings[i].ApplyTo(&newObjs[i])
}

err = svc.db.Object().BatchCreateOrUpdate(tx, newObjs)
err = svc.db.Object().BatchUpsertByPackagePath(tx, newObjs)
if err != nil {
return fmt.Errorf("batch create or update: %w", err)
}

sucs = lo.Map(newObjs, func(obj cdssdk.Object, _ int) cdssdk.ObjectID { return obj.ObjectID })
return nil
})

@@ -229,23 +210,23 @@ func (svc *Service) UpdateObjectInfos(msg *coormq.UpdateObjectInfos) (*coormq.Up
return nil, mq.Failed(errorcode.OperationFailed, "batch update objects failed")
}

return mq.ReplyOK(coormq.RespUpdateObjectInfos())
return mq.ReplyOK(coormq.RespUpdateObjectInfos(sucs))
}

// 根据objIDs从objs中挑选Object。
// len(objs) >= len(objIDs)
func pickByObjectIDs(objs []cdssdk.UpdatingObject, objIDs []cdssdk.ObjectID) (pickedObjs []cdssdk.UpdatingObject, notFoundObjs []cdssdk.UpdatingObject) {
func pickByObjectIDs[T any](objs []T, objIDs []cdssdk.ObjectID, getID func(T) cdssdk.ObjectID) (picked []T, notFound []T) {
objIdx := 0
idIdx := 0

for idIdx < len(objIDs) && objIdx < len(objs) {
if objs[objIdx].ObjectID < objIDs[idIdx] {
notFoundObjs = append(notFoundObjs, objs[objIdx])
if getID(objs[objIdx]) < objIDs[idIdx] {
notFound = append(notFound, objs[objIdx])
objIdx++
continue
}

pickedObjs = append(pickedObjs, objs[objIdx])
picked = append(picked, objs[objIdx])
objIdx++
idIdx++
}
@@ -253,7 +234,83 @@ func pickByObjectIDs(objs []cdssdk.UpdatingObject, objIDs []cdssdk.ObjectID) (pi
return
}

func (svc *Service) ensurePackageChangedObjects(tx *sqlx.Tx, objs []cdssdk.Object) ([]cdssdk.Object, error) {
func (svc *Service) MoveObjects(msg *coormq.MoveObjects) (*coormq.MoveObjectsResp, *mq.CodeMessage) {
var sucs []cdssdk.ObjectID
err := svc.db.DoTx(sql.LevelSerializable, func(tx *sqlx.Tx) error {
msg.Movings = sort2.Sort(msg.Movings, func(o1, o2 cdssdk.MovingObject) int {
return sort2.Cmp(o1.ObjectID, o2.ObjectID)
})

objIDs := make([]cdssdk.ObjectID, len(msg.Movings))
for i, obj := range msg.Movings {
objIDs[i] = obj.ObjectID
}

oldObjs, err := svc.db.Object().BatchGet(tx, objIDs)
if err != nil {
return fmt.Errorf("batch getting objects: %w", err)
}
oldObjIDs := make([]cdssdk.ObjectID, len(oldObjs))
for i, obj := range oldObjs {
oldObjIDs[i] = obj.ObjectID
}

avaiMovings, notExistsObjs := pickByObjectIDs(msg.Movings, oldObjIDs, func(obj cdssdk.MovingObject) cdssdk.ObjectID { return obj.ObjectID })
if len(notExistsObjs) > 0 {
// TODO 部分对象已经不存在
}

// 筛选出PackageID变化、Path变化的对象,这两种对象要检测改变后是否有冲突
var pkgIDChangedObjs []cdssdk.Object
var pathChangedObjs []cdssdk.Object
for i := range avaiMovings {
if avaiMovings[i].PackageID != oldObjs[i].PackageID {
newObj := oldObjs[i]
avaiMovings[i].ApplyTo(&newObj)
pkgIDChangedObjs = append(pkgIDChangedObjs, newObj)
} else if avaiMovings[i].Path != oldObjs[i].Path {
newObj := oldObjs[i]
avaiMovings[i].ApplyTo(&newObj)
pathChangedObjs = append(pathChangedObjs, newObj)
}
}

var newObjs []cdssdk.Object
// 对于PackageID发生变化的对象,需要检查目标Package内是否存在同Path的对象
ensuredObjs, err := svc.ensurePackageChangedObjects(tx, msg.UserID, pkgIDChangedObjs)
if err != nil {
return err
}
newObjs = append(newObjs, ensuredObjs...)

// 对于只有Path发生变化的对象,则检查同Package内有没有同Path的对象
ensuredObjs, err = svc.ensurePathChangedObjects(tx, msg.UserID, pathChangedObjs)
if err != nil {
return err
}
newObjs = append(newObjs, ensuredObjs...)

err = svc.db.Object().BatchUpert(tx, newObjs)
if err != nil {
return fmt.Errorf("batch create or update: %w", err)
}

sucs = lo.Map(newObjs, func(obj cdssdk.Object, _ int) cdssdk.ObjectID { return obj.ObjectID })
return nil
})
if err != nil {
logger.Warn(err.Error())
return nil, mq.Failed(errorcode.OperationFailed, "move objects failed")
}

return mq.ReplyOK(coormq.RespMoveObjects(sucs))
}

func (svc *Service) ensurePackageChangedObjects(tx *sqlx.Tx, userID cdssdk.UserID, objs []cdssdk.Object) ([]cdssdk.Object, error) {
if len(objs) == 0 {
return nil, nil
}

type PackageObjects struct {
PackageID cdssdk.PackageID
ObjectByPath map[string]*cdssdk.Object
@@ -274,19 +331,29 @@ func (svc *Service) ensurePackageChangedObjects(tx *sqlx.Tx, objs []cdssdk.Objec
o := obj
pkg.ObjectByPath[obj.Path] = &o
} else {
// TODO 有冲突
// TODO 有两个对象移动到同一个路径,有冲突
}
}

var willUpdateObjs []cdssdk.Object
for _, pkg := range packages {
existsObjs, err := svc.db.Object().BatchByPackagePath(tx, pkg.PackageID, lo.Keys(pkg.ObjectByPath))
_, err := svc.db.Package().GetUserPackage(tx, userID, pkg.PackageID)
if err == sql.ErrNoRows {
continue
}
if err != nil {
return nil, fmt.Errorf("getting user package by id: %w", err)
}

existsObjs, err := svc.db.Object().BatchGetByPackagePath(tx, pkg.PackageID, lo.Keys(pkg.ObjectByPath))
if err != nil {
return nil, fmt.Errorf("batch getting objects by package path: %w", err)
}

// 标记冲突的对象
for _, obj := range existsObjs {
pkg.ObjectByPath[obj.Path] = nil
// TODO 目标Package内有冲突的对象
}

for _, obj := range pkg.ObjectByPath {
@@ -295,12 +362,56 @@ func (svc *Service) ensurePackageChangedObjects(tx *sqlx.Tx, objs []cdssdk.Objec
}
willUpdateObjs = append(willUpdateObjs, *obj)
}

}

return willUpdateObjs, nil
}

func (svc *Service) ensurePathChangedObjects(tx *sqlx.Tx, userID cdssdk.UserID, objs []cdssdk.Object) ([]cdssdk.Object, error) {
if len(objs) == 0 {
return nil, nil
}

objByPath := make(map[string]*cdssdk.Object)
for _, obj := range objs {
if objByPath[obj.Path] == nil {
o := obj
objByPath[obj.Path] = &o
} else {
// TODO 有两个对象移动到同一个路径,有冲突
}

}

_, err := svc.db.Package().GetUserPackage(tx, userID, objs[0].PackageID)
if err == sql.ErrNoRows {
return nil, nil
}
if err != nil {
return nil, fmt.Errorf("getting user package by id: %w", err)
}

existsObjs, err := svc.db.Object().BatchGetByPackagePath(tx, objs[0].PackageID, lo.Map(objs, func(obj cdssdk.Object, idx int) string { return obj.Path }))
if err != nil {
return nil, fmt.Errorf("batch getting objects by package path: %w", err)
}

// 不支持两个对象交换位置的情况,因为数据库不支持
for _, obj := range existsObjs {
objByPath[obj.Path] = nil
}

var willMoveObjs []cdssdk.Object
for _, obj := range objByPath {
if obj == nil {
continue
}
willMoveObjs = append(willMoveObjs, *obj)
}

return willMoveObjs, nil
}

func (svc *Service) DeleteObjects(msg *coormq.DeleteObjects) (*coormq.DeleteObjectsResp, *mq.CodeMessage) {
err := svc.db.DoTx(sql.LevelSerializable, func(tx *sqlx.Tx) error {
err := svc.db.Object().BatchDelete(tx, msg.ObjectIDs)


Loading…
Cancel
Save