diff --git a/client/internal/http/object.go b/client/internal/http/object.go index fb2c1f1..ae2eb6d 100644 --- a/client/internal/http/object.go +++ b/client/internal/http/object.go @@ -129,14 +129,34 @@ func (s *ObjectService) UpdateInfo(ctx *gin.Context) { return } - err := s.svc.ObjectSvc().UpdateInfo(req.UserID, req.Updatings) + sucs, err := s.svc.ObjectSvc().UpdateInfo(req.UserID, req.Updatings) if err != nil { log.Warnf("updating objects: %s", err.Error()) ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "update objects failed")) return } - ctx.JSON(http.StatusOK, OK(nil)) + ctx.JSON(http.StatusOK, OK(cdssdk.ObjectUpdateInfoResp{Successes: sucs})) +} + +func (s *ObjectService) Move(ctx *gin.Context) { + log := logger.WithField("HTTP", "Object.Move") + + var req cdssdk.ObjectMoveReq + if err := ctx.ShouldBindJSON(&req); err != nil { + log.Warnf("binding body: %s", err.Error()) + ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) + return + } + + sucs, err := s.svc.ObjectSvc().Move(req.UserID, req.Movings) + if err != nil { + log.Warnf("moving objects: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "move objects failed")) + return + } + + ctx.JSON(http.StatusOK, OK(cdssdk.ObjectMoveResp{Successes: sucs})) } func (s *ObjectService) Delete(ctx *gin.Context) { diff --git a/client/internal/http/server.go b/client/internal/http/server.go index ce4e276..d3c8fa9 100644 --- a/client/internal/http/server.go +++ b/client/internal/http/server.go @@ -43,6 +43,7 @@ func (s *Server) initRouters() { s.engine.POST(cdssdk.ObjectUploadPath, s.Object().Upload) s.engine.GET(cdssdk.ObjectGetPackageObjectsPath, s.Object().GetPackageObjects) s.engine.POST(cdssdk.ObjectUpdateInfoPath, s.Object().UpdateInfo) + s.engine.POST(cdssdk.ObjectMovePath, s.Object().Move) s.engine.POST(cdssdk.ObjectDeletePath, s.Object().Delete) s.engine.GET(cdssdk.PackageGetPath, s.Package().Get) diff --git a/client/internal/services/object.go b/client/internal/services/object.go index 5e3e568..620ce35 100644 --- a/client/internal/services/object.go +++ b/client/internal/services/object.go @@ -36,19 +36,34 @@ func (svc *ObjectService) WaitUploading(taskID string, waitTimeout time.Duration return false, nil, nil } -func (svc *ObjectService) UpdateInfo(userID cdssdk.UserID, updatings []cdssdk.UpdatingObject) error { +func (svc *ObjectService) UpdateInfo(userID cdssdk.UserID, updatings []cdssdk.UpdatingObject) ([]cdssdk.ObjectID, error) { coorCli, err := stgglb.CoordinatorMQPool.Acquire() if err != nil { - return fmt.Errorf("new coordinator client: %w", err) + return nil, fmt.Errorf("new coordinator client: %w", err) } defer stgglb.CoordinatorMQPool.Release(coorCli) - _, err = coorCli.UpdateObjectInfos(coormq.ReqUpdateObjectInfos(userID, updatings)) + resp, err := coorCli.UpdateObjectInfos(coormq.ReqUpdateObjectInfos(userID, updatings)) if err != nil { - return fmt.Errorf("requsting to coodinator: %w", err) + return nil, fmt.Errorf("requsting to coodinator: %w", err) } - return nil + return resp.Successes, nil +} + +func (svc *ObjectService) Move(userID cdssdk.UserID, movings []cdssdk.MovingObject) ([]cdssdk.ObjectID, error) { + coorCli, err := stgglb.CoordinatorMQPool.Acquire() + if err != nil { + return nil, fmt.Errorf("new coordinator client: %w", err) + } + defer stgglb.CoordinatorMQPool.Release(coorCli) + + resp, err := coorCli.MoveObjects(coormq.ReqMoveObjects(userID, movings)) + if err != nil { + return nil, fmt.Errorf("requsting to coodinator: %w", err) + } + + return resp.Successes, nil } func (svc *ObjectService) Download(userID cdssdk.UserID, objectID cdssdk.ObjectID) (io.ReadCloser, error) { diff --git a/common/pkgs/db/object.go b/common/pkgs/db/object.go index ec9a669..7fd20ef 100644 --- a/common/pkgs/db/object.go +++ b/common/pkgs/db/object.go @@ -47,7 +47,7 @@ func (db *ObjectDB) BatchGet(ctx SQLContext, objectIDs []cdssdk.ObjectID) ([]mod return lo.Map(objs, func(o model.TempObject, idx int) cdssdk.Object { return o.ToObject() }), nil } -func (db *ObjectDB) BatchByPackagePath(ctx SQLContext, pkgID cdssdk.PackageID, pathes []string) ([]cdssdk.Object, error) { +func (db *ObjectDB) BatchGetByPackagePath(ctx SQLContext, pkgID cdssdk.PackageID, pathes []string) ([]cdssdk.Object, error) { if len(pathes) == 0 { return nil, nil } @@ -87,7 +87,7 @@ func (db *ObjectDB) Create(ctx SQLContext, obj cdssdk.Object) (cdssdk.ObjectID, // 可以用于批量创建或者更新记录。 // 用于创建时,需要额外检查PackageID+Path的唯一性。 // 用于更新时,需要额外检查现存的PackageID+Path对应的ObjectID是否与待更新的ObjectID相同。不会更新CreateTime。 -func (db *ObjectDB) BatchCreateOrUpdate(ctx SQLContext, objs []cdssdk.Object) error { +func (db *ObjectDB) BatchUpsertByPackagePath(ctx SQLContext, objs []cdssdk.Object) error { if len(objs) == 0 { return nil } @@ -99,6 +99,18 @@ func (db *ObjectDB) BatchCreateOrUpdate(ctx SQLContext, objs []cdssdk.Object) er return BatchNamedExec(ctx, sql, 7, objs, nil) } +func (db *ObjectDB) BatchUpert(ctx SQLContext, objs []cdssdk.Object) error { + if len(objs) == 0 { + return nil + } + + sql := "insert into Object(ObjectID, PackageID, Path, Size, FileHash, Redundancy, CreateTime ,UpdateTime)" + + " values(:ObjectID, :PackageID,:Path,:Size,:FileHash,:Redundancy, :CreateTime, :UpdateTime) as new" + + " on duplicate key update PackageID = new.PackageID, Path = new.Path, Size = new.Size, FileHash = new.FileHash, Redundancy = new.Redundancy, UpdateTime = new.UpdateTime" + + return BatchNamedExec(ctx, sql, 8, objs, nil) +} + func (*ObjectDB) GetPackageObjects(ctx SQLContext, packageID cdssdk.PackageID) ([]model.Object, error) { var ret []model.TempObject err := sqlx.Select(ctx, &ret, "select * from Object where PackageID = ? order by ObjectID asc", packageID) @@ -175,7 +187,7 @@ func (db *ObjectDB) BatchAdd(ctx SQLContext, packageID cdssdk.PackageID, adds [] }) } - err := db.BatchCreateOrUpdate(ctx, objs) + err := db.BatchUpsertByPackagePath(ctx, objs) if err != nil { return nil, fmt.Errorf("batch create or update objects: %w", err) } @@ -185,7 +197,7 @@ func (db *ObjectDB) BatchAdd(ctx SQLContext, packageID cdssdk.PackageID, adds [] pathes = append(pathes, add.Path) } // 这里可以不用检查查询结果是否与pathes的数量相同 - addedObjs, err := db.BatchByPackagePath(ctx, packageID, pathes) + addedObjs, err := db.BatchGetByPackagePath(ctx, packageID, pathes) if err != nil { return nil, fmt.Errorf("batch get object ids: %w", err) } diff --git a/common/pkgs/mq/coordinator/object.go b/common/pkgs/mq/coordinator/object.go index 12fcac5..7b3c90f 100644 --- a/common/pkgs/mq/coordinator/object.go +++ b/common/pkgs/mq/coordinator/object.go @@ -19,6 +19,8 @@ type ObjectService interface { UpdateObjectInfos(msg *UpdateObjectInfos) (*UpdateObjectInfosResp, *mq.CodeMessage) + MoveObjects(msg *MoveObjects) (*MoveObjectsResp, *mq.CodeMessage) + DeleteObjects(msg *DeleteObjects) (*DeleteObjectsResp, *mq.CodeMessage) } @@ -142,6 +144,7 @@ type UpdateObjectInfos struct { type UpdateObjectInfosResp struct { mq.MessageBodyBase + Successes []cdssdk.ObjectID `json:"successes"` } func ReqUpdateObjectInfos(userID cdssdk.UserID, updatings []cdssdk.UpdatingObject) *UpdateObjectInfos { @@ -150,13 +153,44 @@ func ReqUpdateObjectInfos(userID cdssdk.UserID, updatings []cdssdk.UpdatingObjec Updatings: updatings, } } -func RespUpdateObjectInfos() *UpdateObjectInfosResp { - return &UpdateObjectInfosResp{} +func RespUpdateObjectInfos(successes []cdssdk.ObjectID) *UpdateObjectInfosResp { + return &UpdateObjectInfosResp{ + Successes: successes, + } } func (client *Client) UpdateObjectInfos(msg *UpdateObjectInfos) (*UpdateObjectInfosResp, error) { return mq.Request(Service.UpdateObjectInfos, client.rabbitCli, msg) } +// 移动Object +var _ = Register(Service.MoveObjects) + +type MoveObjects struct { + mq.MessageBodyBase + UserID cdssdk.UserID `json:"userID"` + Movings []cdssdk.MovingObject `json:"movings"` +} + +type MoveObjectsResp struct { + mq.MessageBodyBase + Successes []cdssdk.ObjectID `json:"successes"` +} + +func ReqMoveObjects(userID cdssdk.UserID, movings []cdssdk.MovingObject) *MoveObjects { + return &MoveObjects{ + UserID: userID, + Movings: movings, + } +} +func RespMoveObjects(successes []cdssdk.ObjectID) *MoveObjectsResp { + return &MoveObjectsResp{ + Successes: successes, + } +} +func (client *Client) MoveObjects(msg *MoveObjects) (*MoveObjectsResp, error) { + return mq.Request(Service.MoveObjects, client.rabbitCli, msg) +} + // 删除Object var _ = Register(Service.DeleteObjects) diff --git a/coordinator/internal/mq/object.go b/coordinator/internal/mq/object.go index fd53755..a5dd496 100644 --- a/coordinator/internal/mq/object.go +++ b/coordinator/internal/mq/object.go @@ -165,6 +165,7 @@ func (svc *Service) UpdateObjectRedundancy(msg *coormq.UpdateObjectRedundancy) ( } func (svc *Service) UpdateObjectInfos(msg *coormq.UpdateObjectInfos) (*coormq.UpdateObjectInfosResp, *mq.CodeMessage) { + var sucs []cdssdk.ObjectID err := svc.db.DoTx(sql.LevelSerializable, func(tx *sqlx.Tx) error { msg.Updatings = sort2.Sort(msg.Updatings, func(o1, o2 cdssdk.UpdatingObject) int { return sort2.Cmp(o1.ObjectID, o2.ObjectID) @@ -184,43 +185,23 @@ func (svc *Service) UpdateObjectInfos(msg *coormq.UpdateObjectInfos) (*coormq.Up oldObjIDs[i] = obj.ObjectID } - avaiUpdatings, notExistsObjs := pickByObjectIDs(msg.Updatings, oldObjIDs) + avaiUpdatings, notExistsObjs := pickByObjectIDs(msg.Updatings, oldObjIDs, func(obj cdssdk.UpdatingObject) cdssdk.ObjectID { return obj.ObjectID }) if len(notExistsObjs) > 0 { // TODO 部分对象已经不存在 } - // 筛选出PackageID变化、Path变化的对象,这两种对象要检测改变后是否有冲突 - // 否则,直接更新即可 - //var pkgIDChangedObjs []cdssdk.Object - //var pathChangedObjs []cdssdk.Object - //var infoChangedObjs []cdssdk.Object - //for i := range willUpdateObjs { - // if willUpdateObjs[i].PackageID != oldObjs[i].PackageID { - // newObj := oldObjs[i] - // willUpdateObjs[i].ApplyTo(&newObj) - // pkgIDChangedObjs = append(pkgIDChangedObjs, newObj) - // } else if willUpdateObjs[i].Path != oldObjs[i].Path { - // newObj := oldObjs[i] - // willUpdateObjs[i].ApplyTo(&newObj) - // pathChangedObjs = append(pathChangedObjs, newObj) - // } else { - // newObj := oldObjs[i] - // willUpdateObjs[i].ApplyTo(&newObj) - // infoChangedObjs = append(infoChangedObjs, newObj) - // } - //} - newObjs := make([]cdssdk.Object, len(avaiUpdatings)) for i := range newObjs { newObjs[i] = oldObjs[i] avaiUpdatings[i].ApplyTo(&newObjs[i]) } - err = svc.db.Object().BatchCreateOrUpdate(tx, newObjs) + err = svc.db.Object().BatchUpsertByPackagePath(tx, newObjs) if err != nil { return fmt.Errorf("batch create or update: %w", err) } + sucs = lo.Map(newObjs, func(obj cdssdk.Object, _ int) cdssdk.ObjectID { return obj.ObjectID }) return nil }) @@ -229,23 +210,23 @@ func (svc *Service) UpdateObjectInfos(msg *coormq.UpdateObjectInfos) (*coormq.Up return nil, mq.Failed(errorcode.OperationFailed, "batch update objects failed") } - return mq.ReplyOK(coormq.RespUpdateObjectInfos()) + return mq.ReplyOK(coormq.RespUpdateObjectInfos(sucs)) } // 根据objIDs从objs中挑选Object。 // len(objs) >= len(objIDs) -func pickByObjectIDs(objs []cdssdk.UpdatingObject, objIDs []cdssdk.ObjectID) (pickedObjs []cdssdk.UpdatingObject, notFoundObjs []cdssdk.UpdatingObject) { +func pickByObjectIDs[T any](objs []T, objIDs []cdssdk.ObjectID, getID func(T) cdssdk.ObjectID) (picked []T, notFound []T) { objIdx := 0 idIdx := 0 for idIdx < len(objIDs) && objIdx < len(objs) { - if objs[objIdx].ObjectID < objIDs[idIdx] { - notFoundObjs = append(notFoundObjs, objs[objIdx]) + if getID(objs[objIdx]) < objIDs[idIdx] { + notFound = append(notFound, objs[objIdx]) objIdx++ continue } - pickedObjs = append(pickedObjs, objs[objIdx]) + picked = append(picked, objs[objIdx]) objIdx++ idIdx++ } @@ -253,7 +234,83 @@ func pickByObjectIDs(objs []cdssdk.UpdatingObject, objIDs []cdssdk.ObjectID) (pi return } -func (svc *Service) ensurePackageChangedObjects(tx *sqlx.Tx, objs []cdssdk.Object) ([]cdssdk.Object, error) { +func (svc *Service) MoveObjects(msg *coormq.MoveObjects) (*coormq.MoveObjectsResp, *mq.CodeMessage) { + var sucs []cdssdk.ObjectID + err := svc.db.DoTx(sql.LevelSerializable, func(tx *sqlx.Tx) error { + msg.Movings = sort2.Sort(msg.Movings, func(o1, o2 cdssdk.MovingObject) int { + return sort2.Cmp(o1.ObjectID, o2.ObjectID) + }) + + objIDs := make([]cdssdk.ObjectID, len(msg.Movings)) + for i, obj := range msg.Movings { + objIDs[i] = obj.ObjectID + } + + oldObjs, err := svc.db.Object().BatchGet(tx, objIDs) + if err != nil { + return fmt.Errorf("batch getting objects: %w", err) + } + oldObjIDs := make([]cdssdk.ObjectID, len(oldObjs)) + for i, obj := range oldObjs { + oldObjIDs[i] = obj.ObjectID + } + + avaiMovings, notExistsObjs := pickByObjectIDs(msg.Movings, oldObjIDs, func(obj cdssdk.MovingObject) cdssdk.ObjectID { return obj.ObjectID }) + if len(notExistsObjs) > 0 { + // TODO 部分对象已经不存在 + } + + // 筛选出PackageID变化、Path变化的对象,这两种对象要检测改变后是否有冲突 + var pkgIDChangedObjs []cdssdk.Object + var pathChangedObjs []cdssdk.Object + for i := range avaiMovings { + if avaiMovings[i].PackageID != oldObjs[i].PackageID { + newObj := oldObjs[i] + avaiMovings[i].ApplyTo(&newObj) + pkgIDChangedObjs = append(pkgIDChangedObjs, newObj) + } else if avaiMovings[i].Path != oldObjs[i].Path { + newObj := oldObjs[i] + avaiMovings[i].ApplyTo(&newObj) + pathChangedObjs = append(pathChangedObjs, newObj) + } + } + + var newObjs []cdssdk.Object + // 对于PackageID发生变化的对象,需要检查目标Package内是否存在同Path的对象 + ensuredObjs, err := svc.ensurePackageChangedObjects(tx, msg.UserID, pkgIDChangedObjs) + if err != nil { + return err + } + newObjs = append(newObjs, ensuredObjs...) + + // 对于只有Path发生变化的对象,则检查同Package内有没有同Path的对象 + ensuredObjs, err = svc.ensurePathChangedObjects(tx, msg.UserID, pathChangedObjs) + if err != nil { + return err + } + newObjs = append(newObjs, ensuredObjs...) + + err = svc.db.Object().BatchUpert(tx, newObjs) + if err != nil { + return fmt.Errorf("batch create or update: %w", err) + } + + sucs = lo.Map(newObjs, func(obj cdssdk.Object, _ int) cdssdk.ObjectID { return obj.ObjectID }) + return nil + }) + if err != nil { + logger.Warn(err.Error()) + return nil, mq.Failed(errorcode.OperationFailed, "move objects failed") + } + + return mq.ReplyOK(coormq.RespMoveObjects(sucs)) +} + +func (svc *Service) ensurePackageChangedObjects(tx *sqlx.Tx, userID cdssdk.UserID, objs []cdssdk.Object) ([]cdssdk.Object, error) { + if len(objs) == 0 { + return nil, nil + } + type PackageObjects struct { PackageID cdssdk.PackageID ObjectByPath map[string]*cdssdk.Object @@ -274,19 +331,29 @@ func (svc *Service) ensurePackageChangedObjects(tx *sqlx.Tx, objs []cdssdk.Objec o := obj pkg.ObjectByPath[obj.Path] = &o } else { - // TODO 有冲突 + // TODO 有两个对象移动到同一个路径,有冲突 } } var willUpdateObjs []cdssdk.Object for _, pkg := range packages { - existsObjs, err := svc.db.Object().BatchByPackagePath(tx, pkg.PackageID, lo.Keys(pkg.ObjectByPath)) + _, err := svc.db.Package().GetUserPackage(tx, userID, pkg.PackageID) + if err == sql.ErrNoRows { + continue + } + if err != nil { + return nil, fmt.Errorf("getting user package by id: %w", err) + } + + existsObjs, err := svc.db.Object().BatchGetByPackagePath(tx, pkg.PackageID, lo.Keys(pkg.ObjectByPath)) if err != nil { return nil, fmt.Errorf("batch getting objects by package path: %w", err) } + // 标记冲突的对象 for _, obj := range existsObjs { pkg.ObjectByPath[obj.Path] = nil + // TODO 目标Package内有冲突的对象 } for _, obj := range pkg.ObjectByPath { @@ -295,12 +362,56 @@ func (svc *Service) ensurePackageChangedObjects(tx *sqlx.Tx, objs []cdssdk.Objec } willUpdateObjs = append(willUpdateObjs, *obj) } - } return willUpdateObjs, nil } +func (svc *Service) ensurePathChangedObjects(tx *sqlx.Tx, userID cdssdk.UserID, objs []cdssdk.Object) ([]cdssdk.Object, error) { + if len(objs) == 0 { + return nil, nil + } + + objByPath := make(map[string]*cdssdk.Object) + for _, obj := range objs { + if objByPath[obj.Path] == nil { + o := obj + objByPath[obj.Path] = &o + } else { + // TODO 有两个对象移动到同一个路径,有冲突 + } + + } + + _, err := svc.db.Package().GetUserPackage(tx, userID, objs[0].PackageID) + if err == sql.ErrNoRows { + return nil, nil + } + if err != nil { + return nil, fmt.Errorf("getting user package by id: %w", err) + } + + existsObjs, err := svc.db.Object().BatchGetByPackagePath(tx, objs[0].PackageID, lo.Map(objs, func(obj cdssdk.Object, idx int) string { return obj.Path })) + if err != nil { + return nil, fmt.Errorf("batch getting objects by package path: %w", err) + } + + // 不支持两个对象交换位置的情况,因为数据库不支持 + for _, obj := range existsObjs { + objByPath[obj.Path] = nil + } + + var willMoveObjs []cdssdk.Object + for _, obj := range objByPath { + if obj == nil { + continue + } + willMoveObjs = append(willMoveObjs, *obj) + } + + return willMoveObjs, nil +} + func (svc *Service) DeleteObjects(msg *coormq.DeleteObjects) (*coormq.DeleteObjectsResp, *mq.CodeMessage) { err := svc.db.DoTx(sql.LevelSerializable, func(tx *sqlx.Tx) error { err := svc.db.Object().BatchDelete(tx, msg.ObjectIDs)