From e7791206180ec7a9d4ab544b6d5c91abbd2c9e29 Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Tue, 8 Apr 2025 10:56:42 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8Dcoordinator=E7=AB=AF=E4=BB=A3?= =?UTF-8?q?=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- coordinator/internal/cmd/migrate.go | 25 +- coordinator/internal/cmd/serve.go | 21 +- coordinator/internal/config/config.go | 2 +- coordinator/internal/db/config.go | 21 + coordinator/internal/db/db.go | 77 ++ coordinator/internal/db/hub.go | 48 ++ coordinator/internal/db/hub_connectivity.go | 36 + coordinator/internal/db/location.go | 36 + coordinator/internal/db/storage.go | 43 + coordinator/internal/db/union_serializer.go | 44 + coordinator/internal/db/user.go | 44 + coordinator/internal/mq/agent.go | 1 - coordinator/internal/mq/bucket.go | 143 ---- coordinator/internal/mq/cache.go | 66 -- coordinator/internal/mq/hub.go | 93 +- coordinator/internal/mq/object.go | 891 -------------------- coordinator/internal/mq/package.go | 337 -------- coordinator/internal/mq/service.go | 11 +- coordinator/internal/mq/storage.go | 124 +-- coordinator/internal/mq/temp.go | 52 -- coordinator/internal/mq/user.go | 127 --- coordinator/internal/mq/utils.go | 23 - coordinator/types/storage.go | 39 +- coordinator/types/storage_credential.go | 92 ++ coordinator/types/types.go | 20 + 25 files changed, 495 insertions(+), 1921 deletions(-) create mode 100644 coordinator/internal/db/config.go create mode 100644 coordinator/internal/db/db.go create mode 100644 coordinator/internal/db/hub.go create mode 100644 coordinator/internal/db/hub_connectivity.go create mode 100644 coordinator/internal/db/location.go create mode 100644 coordinator/internal/db/storage.go create mode 100644 coordinator/internal/db/union_serializer.go create mode 100644 coordinator/internal/db/user.go delete mode 100644 coordinator/internal/mq/agent.go delete mode 100644 coordinator/internal/mq/bucket.go delete mode 100644 coordinator/internal/mq/cache.go delete mode 100644 coordinator/internal/mq/object.go delete mode 100644 coordinator/internal/mq/package.go delete mode 100644 coordinator/internal/mq/temp.go delete mode 100644 coordinator/internal/mq/user.go delete mode 100644 coordinator/internal/mq/utils.go create mode 100644 coordinator/types/storage_credential.go diff --git a/coordinator/internal/cmd/migrate.go b/coordinator/internal/cmd/migrate.go index 4cd89ab..a7723cf 100644 --- a/coordinator/internal/cmd/migrate.go +++ b/coordinator/internal/cmd/migrate.go @@ -5,10 +5,8 @@ import ( "os" "github.com/spf13/cobra" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - stgmod "gitlink.org.cn/cloudream/storage2/common/models" - "gitlink.org.cn/cloudream/storage2/common/pkgs/db2/model" "gitlink.org.cn/cloudream/storage2/coordinator/internal/config" + cortypes "gitlink.org.cn/cloudream/storage2/coordinator/types" "gorm.io/driver/mysql" "gorm.io/gorm" ) @@ -42,22 +40,11 @@ func migrate(configPath string) { } db = db.Set("gorm:table_options", "CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci") - migrateOne(db, cdssdk.Bucket{}) - migrateOne(db, model.Cache{}) - migrateOne(db, model.Location{}) - migrateOne(db, model.HubConnectivity{}) - migrateOne(db, cdssdk.Hub{}) - migrateOne(db, stgmod.ObjectAccessStat{}) - migrateOne(db, stgmod.ObjectBlock{}) - migrateOne(db, cdssdk.Object{}) - migrateOne(db, stgmod.PackageAccessStat{}) - migrateOne(db, cdssdk.Package{}) - migrateOne(db, cdssdk.PinnedObject{}) - migrateOne(db, cdssdk.Storage{}) - migrateOne(db, model.UserStorage{}) - migrateOne(db, model.UserBucket{}) - migrateOne(db, cdssdk.User{}) - migrateOne(db, model.UserHub{}) + migrateOne(db, cortypes.HubConnectivity{}) + migrateOne(db, cortypes.Hub{}) + migrateOne(db, cortypes.Location{}) + migrateOne(db, cortypes.Storage{}) + migrateOne(db, cortypes.User{}) fmt.Println("migrate success") } diff --git a/coordinator/internal/cmd/serve.go b/coordinator/internal/cmd/serve.go index fbb33e6..8603b07 100644 --- a/coordinator/internal/cmd/serve.go +++ b/coordinator/internal/cmd/serve.go @@ -7,11 +7,10 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/mq" - stgmod "gitlink.org.cn/cloudream/storage2/common/models" - "gitlink.org.cn/cloudream/storage2/common/pkgs/db2" coormq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/coordinator" "gitlink.org.cn/cloudream/storage2/common/pkgs/sysevent" "gitlink.org.cn/cloudream/storage2/coordinator/internal/config" + "gitlink.org.cn/cloudream/storage2/coordinator/internal/db" mymq "gitlink.org.cn/cloudream/storage2/coordinator/internal/mq" ) @@ -28,20 +27,20 @@ func serve(configPath string) { os.Exit(1) } - db2, err := db2.NewDB(&config.Cfg().DB) + db2, err := db.NewDB(&config.Cfg().DB) if err != nil { logger.Fatalf("new db2 failed, err: %s", err.Error()) } // 初始化系统事件发布器 - evtPub, err := sysevent.NewPublisher(sysevent.ConfigFromMQConfig(config.Cfg().RabbitMQ), &stgmod.SourceCoordinator{}) - if err != nil { - logger.Errorf("new sysevent publisher: %v", err) - os.Exit(1) - } - go servePublisher(evtPub) - - coorSvr, err := coormq.NewServer(mymq.NewService(db2, evtPub), config.Cfg().RabbitMQ) + // evtPub, err := sysevent.NewPublisher(sysevent.ConfigFromMQConfig(config.Cfg().RabbitMQ), &cortypes.SourceCoordinator{}) + // if err != nil { + // logger.Errorf("new sysevent publisher: %v", err) + // os.Exit(1) + // } + // go servePublisher(evtPub) + + coorSvr, err := coormq.NewServer(mymq.NewService(db2), config.Cfg().RabbitMQ) if err != nil { logger.Fatalf("new coordinator server failed, err: %s", err.Error()) } diff --git a/coordinator/internal/config/config.go b/coordinator/internal/config/config.go index 4197dae..dc1daf0 100644 --- a/coordinator/internal/config/config.go +++ b/coordinator/internal/config/config.go @@ -4,7 +4,7 @@ import ( log "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/mq" c "gitlink.org.cn/cloudream/common/utils/config" - db "gitlink.org.cn/cloudream/storage2/common/pkgs/db2/config" + "gitlink.org.cn/cloudream/storage2/coordinator/internal/db" ) type Config struct { diff --git a/coordinator/internal/db/config.go b/coordinator/internal/db/config.go new file mode 100644 index 0000000..0879503 --- /dev/null +++ b/coordinator/internal/db/config.go @@ -0,0 +1,21 @@ +package db + +import "fmt" + +type Config struct { + Address string `json:"address"` + Account string `json:"account"` + Password string `json:"password"` + DatabaseName string `json:"databaseName"` +} + +func (cfg *Config) MakeSourceString() string { + return fmt.Sprintf( + "%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=true&loc=%s", + cfg.Account, + cfg.Password, + cfg.Address, + cfg.DatabaseName, + "Asia%2FShanghai", + ) +} diff --git a/coordinator/internal/db/db.go b/coordinator/internal/db/db.go new file mode 100644 index 0000000..bd83200 --- /dev/null +++ b/coordinator/internal/db/db.go @@ -0,0 +1,77 @@ +package db + +import ( + _ "github.com/go-sql-driver/mysql" + "github.com/sirupsen/logrus" + "gorm.io/driver/mysql" + "gorm.io/gorm" +) + +type DB struct { + db *gorm.DB +} + +func NewDB(cfg *Config) (*DB, error) { + mydb, err := gorm.Open(mysql.Open(cfg.MakeSourceString()), &gorm.Config{}) + if err != nil { + logrus.Fatalf("failed to connect to database: %v", err) + } + + return &DB{ + db: mydb, + }, nil +} + +func (db *DB) DoTx(do func(tx SQLContext) error) error { + return db.db.Transaction(func(tx *gorm.DB) error { + return do(SQLContext{tx}) + }) +} + +func DoTx02[R any](db *DB, do func(tx SQLContext) (R, error)) (R, error) { + var ret R + err := db.db.Transaction(func(tx *gorm.DB) error { + var err error + ret, err = do(SQLContext{tx}) + return err + }) + return ret, err +} + +func DoTx12[T any, R any](db *DB, do func(tx SQLContext, t T) (R, error), t T) (R, error) { + var ret R + err := db.db.Transaction(func(tx *gorm.DB) error { + var err error + ret, err = do(SQLContext{tx}, t) + return err + }) + return ret, err +} + +func DoTx22[T1 any, T2 any, R any](db *DB, do func(tx SQLContext, t1 T1, t2 T2) (R, error), t1 T1, t2 T2) (R, error) { + var ret R + err := db.db.Transaction(func(tx *gorm.DB) error { + var err error + ret, err = do(SQLContext{tx}, t1, t2) + return err + }) + return ret, err +} + +func DoTx32[T1 any, T2 any, T3 any, R any](db *DB, do func(tx SQLContext, t1 T1, t2 T2, t3 T3) (R, error), t1 T1, t2 T2, t3 T3) (R, error) { + var ret R + err := db.db.Transaction(func(tx *gorm.DB) error { + var err error + ret, err = do(SQLContext{tx}, t1, t2, t3) + return err + }) + return ret, err +} + +type SQLContext struct { + *gorm.DB +} + +func (db *DB) DefCtx() SQLContext { + return SQLContext{db.db} +} diff --git a/coordinator/internal/db/hub.go b/coordinator/internal/db/hub.go new file mode 100644 index 0000000..065a877 --- /dev/null +++ b/coordinator/internal/db/hub.go @@ -0,0 +1,48 @@ +package db + +import ( + "time" + + cortypes "gitlink.org.cn/cloudream/storage2/coordinator/types" +) + +type HubDB struct { + *DB +} + +func (db *DB) Hub() *HubDB { + return &HubDB{DB: db} +} + +func (*HubDB) GetAllHubs(ctx SQLContext) ([]cortypes.Hub, error) { + var ret []cortypes.Hub + + err := ctx.Table("Hub").Find(&ret).Error + return ret, err +} + +func (*HubDB) GetByID(ctx SQLContext, hubID cortypes.HubID) (cortypes.Hub, error) { + var ret cortypes.Hub + err := ctx.Table("Hub").Where("HubID = ?", hubID).Find(&ret).Error + + return ret, err +} + +func (*HubDB) BatchGetByID(ctx SQLContext, hubIDs []cortypes.HubID) ([]cortypes.Hub, error) { + var ret []cortypes.Hub + err := ctx.Table("Hub").Where("HubID IN (?)", hubIDs).Find(&ret).Error + + return ret, err +} + +// UpdateState 更新状态,并且设置上次上报时间为现在 +func (*HubDB) UpdateState(ctx SQLContext, hubID cortypes.HubID, state string) error { + err := ctx. + Model(&cortypes.Hub{}). + Where("HubID = ?", hubID). + Updates(map[string]interface{}{ + "State": state, + "LastReportTime": time.Now(), + }).Error + return err +} diff --git a/coordinator/internal/db/hub_connectivity.go b/coordinator/internal/db/hub_connectivity.go new file mode 100644 index 0000000..754c4b9 --- /dev/null +++ b/coordinator/internal/db/hub_connectivity.go @@ -0,0 +1,36 @@ +package db + +import ( + cortypes "gitlink.org.cn/cloudream/storage2/coordinator/types" + "gorm.io/gorm/clause" +) + +type HubConnectivityDB struct { + *DB +} + +func (db *DB) HubConnectivity() *HubConnectivityDB { + return &HubConnectivityDB{DB: db} +} + +func (db *HubConnectivityDB) BatchGetByFromHub(ctx SQLContext, fromHubIDs []cortypes.HubID) ([]cortypes.HubConnectivity, error) { + if len(fromHubIDs) == 0 { + return nil, nil + } + + var ret []cortypes.HubConnectivity + + err := ctx.Table("HubConnectivity").Where("FromHubID IN (?)", fromHubIDs).Find(&ret).Error + return ret, err +} + +func (db *HubConnectivityDB) BatchUpdateOrCreate(ctx SQLContext, cons []cortypes.HubConnectivity) error { + if len(cons) == 0 { + return nil + } + + // 使用 GORM 的批量插入或更新 + return ctx.Table("HubConnectivity").Clauses(clause.OnConflict{ + UpdateAll: true, + }).Create(&cons).Error +} diff --git a/coordinator/internal/db/location.go b/coordinator/internal/db/location.go new file mode 100644 index 0000000..d4bfe2c --- /dev/null +++ b/coordinator/internal/db/location.go @@ -0,0 +1,36 @@ +package db + +import ( + "fmt" + + cortypes "gitlink.org.cn/cloudream/storage2/coordinator/types" +) + +type LocationDB struct { + *DB +} + +func (db *DB) Location() *LocationDB { + return &LocationDB{DB: db} +} + +func (*LocationDB) GetByID(ctx SQLContext, id int64) (cortypes.Location, error) { + var ret cortypes.Location + err := ctx.First(&ret, id).Error + return ret, err +} + +func (db *LocationDB) FindLocationByExternalIP(ctx SQLContext, ip string) (cortypes.Location, error) { + var locID int64 + err := ctx.Table("Hub").Select("LocationID").Where("ExternalIP = ?", ip).Scan(&locID).Error + if err != nil { + return cortypes.Location{}, fmt.Errorf("finding hub by external ip: %w", err) + } + + loc, err := db.GetByID(ctx, locID) + if err != nil { + return cortypes.Location{}, fmt.Errorf("getting location by id: %w", err) + } + + return loc, nil +} diff --git a/coordinator/internal/db/storage.go b/coordinator/internal/db/storage.go new file mode 100644 index 0000000..41a06a3 --- /dev/null +++ b/coordinator/internal/db/storage.go @@ -0,0 +1,43 @@ +package db + +import ( + cortypes "gitlink.org.cn/cloudream/storage2/coordinator/types" +) + +type StorageDB struct { + *DB +} + +func (db *DB) Storage() *StorageDB { + return &StorageDB{DB: db} +} + +func (db *StorageDB) GetByID(ctx SQLContext, stgID cortypes.StorageID) (cortypes.Storage, error) { + var stg cortypes.Storage + err := ctx.Table("Storage").First(&stg, stgID).Error + return stg, err +} + +func (StorageDB) GetAllIDs(ctx SQLContext) ([]cortypes.StorageID, error) { + var stgs []cortypes.StorageID + err := ctx.Table("Storage").Select("StorageID").Find(&stgs).Error + return stgs, err +} + +func (db *StorageDB) BatchGetByID(ctx SQLContext, stgIDs []cortypes.StorageID) ([]cortypes.Storage, error) { + var stgs []cortypes.Storage + err := ctx.Table("Storage").Find(&stgs, "StorageID IN (?)", stgIDs).Error + return stgs, err +} + +func (db *StorageDB) BatchGetAllStorageIDs(ctx SQLContext, start int, count int) ([]cortypes.StorageID, error) { + var ret []cortypes.StorageID + err := ctx.Table("Storage").Select("StorageID").Find(&ret).Limit(count).Offset(start).Error + return ret, err +} + +func (db *StorageDB) GetHubStorages(ctx SQLContext, hubID cortypes.HubID) ([]cortypes.Storage, error) { + var stgs []cortypes.Storage + err := ctx.Table("Storage").Select("Storage.*").Find(&stgs, "MasterHub = ?", hubID).Error + return stgs, err +} diff --git a/coordinator/internal/db/union_serializer.go b/coordinator/internal/db/union_serializer.go new file mode 100644 index 0000000..490cd30 --- /dev/null +++ b/coordinator/internal/db/union_serializer.go @@ -0,0 +1,44 @@ +package db + +import ( + "context" + "fmt" + "reflect" + + "gitlink.org.cn/cloudream/common/utils/serder" + "gorm.io/gorm/schema" +) + +type UnionSerializer struct { +} + +func (UnionSerializer) Scan(ctx context.Context, field *schema.Field, dst reflect.Value, dbValue interface{}) error { + fieldValue := reflect.New(field.FieldType) + if dbValue != nil { + var data []byte + switch v := dbValue.(type) { + case []byte: + data = v + case string: + data = []byte(v) + default: + return fmt.Errorf("failed to unmarshal JSONB value: %#v", dbValue) + } + + err := serder.JSONToObjectExRaw(data, fieldValue.Interface()) + if err != nil { + return err + } + } + + field.ReflectValueOf(ctx, dst).Set(fieldValue.Elem()) + return nil +} + +func (UnionSerializer) Value(ctx context.Context, field *schema.Field, dst reflect.Value, fieldValue interface{}) (interface{}, error) { + return serder.ObjectToJSONEx(fieldValue) +} + +func init() { + schema.RegisterSerializer("union", UnionSerializer{}) +} diff --git a/coordinator/internal/db/user.go b/coordinator/internal/db/user.go new file mode 100644 index 0000000..a9fd156 --- /dev/null +++ b/coordinator/internal/db/user.go @@ -0,0 +1,44 @@ +package db + +import ( + cortypes "gitlink.org.cn/cloudream/storage2/coordinator/types" + "gorm.io/gorm" +) + +type UserDB struct { + *DB +} + +func (db *DB) User() *UserDB { + return &UserDB{DB: db} +} + +func (db *UserDB) GetByID(ctx SQLContext, userID cortypes.UserID) (cortypes.User, error) { + var ret cortypes.User + err := ctx.Table("User").Where("UserID = ?", userID).First(&ret).Error + return ret, err +} + +func (db *UserDB) GetByName(ctx SQLContext, name string) (cortypes.User, error) { + var ret cortypes.User + err := ctx.Table("User").Where("Name = ?", name).First(&ret).Error + return ret, err +} + +func (db *UserDB) Create(ctx SQLContext, name string) (cortypes.User, error) { + _, err := db.GetByName(ctx, name) + if err == nil { + return cortypes.User{}, gorm.ErrDuplicatedKey + } + if err != gorm.ErrRecordNotFound { + return cortypes.User{}, err + } + + user := cortypes.User{Name: name} + err = ctx.Table("User").Create(&user).Error + return user, err +} + +func (*UserDB) Delete(ctx SQLContext, userID cortypes.UserID) error { + return ctx.Table("User").Delete(&cortypes.User{UserID: userID}).Error +} diff --git a/coordinator/internal/mq/agent.go b/coordinator/internal/mq/agent.go deleted file mode 100644 index 71893fd..0000000 --- a/coordinator/internal/mq/agent.go +++ /dev/null @@ -1 +0,0 @@ -package mq diff --git a/coordinator/internal/mq/bucket.go b/coordinator/internal/mq/bucket.go deleted file mode 100644 index a2f13ca..0000000 --- a/coordinator/internal/mq/bucket.go +++ /dev/null @@ -1,143 +0,0 @@ -package mq - -import ( - "errors" - "fmt" - "time" - - stgmod "gitlink.org.cn/cloudream/storage2/common/models" - "gitlink.org.cn/cloudream/storage2/common/pkgs/db2" - "gorm.io/gorm" - - "gitlink.org.cn/cloudream/common/consts/errorcode" - "gitlink.org.cn/cloudream/common/pkgs/logger" - "gitlink.org.cn/cloudream/common/pkgs/mq" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - "gitlink.org.cn/cloudream/storage2/common/pkgs/db2/model" - coormq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/coordinator" -) - -func (svc *Service) GetBucket(userID cdssdk.UserID, bucketID cdssdk.BucketID) (model.Bucket, error) { - // TODO - panic("not implement yet") -} - -func (svc *Service) GetBucketByName(msg *coormq.GetBucketByName) (*coormq.GetBucketByNameResp, *mq.CodeMessage) { - bucket, err := svc.db2.Bucket().GetUserBucketByName(svc.db2.DefCtx(), msg.UserID, msg.Name) - if err != nil { - logger.WithField("UserID", msg.UserID). - WithField("Name", msg.Name). - Warnf("getting bucket by name: %s", err.Error()) - - if errors.Is(err, gorm.ErrRecordNotFound) { - return nil, mq.Failed(errorcode.DataNotFound, "bucket not found") - } - - return nil, mq.Failed(errorcode.OperationFailed, err.Error()) - } - - return mq.ReplyOK(coormq.RespGetBucketByName(bucket)) -} - -func (svc *Service) GetUserBuckets(msg *coormq.GetUserBuckets) (*coormq.GetUserBucketsResp, *mq.CodeMessage) { - buckets, err := svc.db2.Bucket().GetUserBuckets(svc.db2.DefCtx(), msg.UserID) - - if err != nil { - logger.WithField("UserID", msg.UserID). - Warnf("get user buckets failed, err: %s", err.Error()) - return nil, mq.Failed(errorcode.OperationFailed, err.Error()) - } - - return mq.ReplyOK(coormq.NewGetUserBucketsResp(buckets)) -} - -func (svc *Service) GetBucketPackages(msg *coormq.GetBucketPackages) (*coormq.GetBucketPackagesResp, *mq.CodeMessage) { - packages, err := svc.db2.Package().GetUserBucketPackages(svc.db2.DefCtx(), msg.UserID, msg.BucketID) - - if err != nil { - logger.WithField("UserID", msg.UserID). - WithField("BucketID", msg.BucketID). - Warnf("get bucket packages failed, err: %s", err.Error()) - return nil, mq.Failed(errorcode.OperationFailed, "get bucket packages failed") - } - - return mq.ReplyOK(coormq.NewGetBucketPackagesResp(packages)) -} - -func (svc *Service) CreateBucket(msg *coormq.CreateBucket) (*coormq.CreateBucketResp, *mq.CodeMessage) { - var bucket cdssdk.Bucket - err := svc.db2.DoTx(func(tx db2.SQLContext) error { - _, err := svc.db2.User().GetByID(tx, msg.UserID) - if err != nil { - return fmt.Errorf("getting user by id: %w", err) - } - - bucket, err = svc.db2.Bucket().Create(tx, msg.UserID, msg.BucketName, time.Now()) - if err != nil { - return fmt.Errorf("creating bucket: %w", err) - } - - return nil - }) - if err != nil { - logger.WithField("UserID", msg.UserID). - WithField("BucketName", msg.BucketName). - Warn(err.Error()) - - if errors.Is(err, gorm.ErrDuplicatedKey) { - return nil, mq.Failed(errorcode.DataExists, "bucket name already exists") - } - - return nil, mq.Failed(errorcode.OperationFailed, err.Error()) - } - - svc.evtPub.Publish(&stgmod.BodyNewBucket{ - Info: bucket, - }) - - return mq.ReplyOK(coormq.NewCreateBucketResp(bucket)) -} - -func (svc *Service) DeleteBucket(msg *coormq.DeleteBucket) (*coormq.DeleteBucketResp, *mq.CodeMessage) { - err := svc.db2.DoTx(func(tx db2.SQLContext) error { - isAvai, _ := svc.db2.Bucket().IsAvailable(tx, msg.BucketID, msg.UserID) - if !isAvai { - return fmt.Errorf("bucket is not avaiable to the user") - } - - if err := svc.db2.UserBucket().DeleteByBucketID(tx, msg.BucketID); err != nil { - return fmt.Errorf("deleting user bucket: %w", err) - } - - pkgs, err := svc.db2.Package().GetBucketPackages(tx, msg.BucketID) - if err != nil { - return fmt.Errorf("getting bucket packages: %w", err) - } - - for _, pkg := range pkgs { - err := svc.db2.Package().DeleteComplete(tx, pkg.PackageID) - if err != nil { - return fmt.Errorf("deleting package %v: %w", pkg.PackageID, err) - } - } - - err = svc.db2.Bucket().Delete(tx, msg.BucketID) - if err != nil { - return fmt.Errorf("deleting bucket: %w", err) - } - - return nil - }) - if err != nil { - logger.WithField("UserID", msg.UserID). - WithField("BucketID", msg.BucketID). - Warn(err.Error()) - return nil, mq.Failed(errorcode.OperationFailed, "delete bucket failed") - } - - svc.evtPub.Publish(&stgmod.BodyBucketDeleted{ - BucketID: msg.BucketID, - }) - - return mq.ReplyOK(coormq.NewDeleteBucketResp()) -} diff --git a/coordinator/internal/mq/cache.go b/coordinator/internal/mq/cache.go deleted file mode 100644 index dc7db2b..0000000 --- a/coordinator/internal/mq/cache.go +++ /dev/null @@ -1,66 +0,0 @@ -package mq - -import ( - "fmt" - - "gitlink.org.cn/cloudream/storage2/common/pkgs/db2" - - "gitlink.org.cn/cloudream/common/consts/errorcode" - "gitlink.org.cn/cloudream/common/pkgs/logger" - "gitlink.org.cn/cloudream/common/pkgs/mq" - coormq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/coordinator" -) - -func (svc *Service) CachePackageMoved(msg *coormq.CachePackageMoved) (*coormq.CachePackageMovedResp, *mq.CodeMessage) { - err := svc.db2.DoTx(func(tx db2.SQLContext) error { - _, err := svc.db2.Package().GetByID(tx, msg.PackageID) - if err != nil { - return fmt.Errorf("getting package by id: %w", err) - } - - _, err = svc.db2.Storage().GetByID(tx, msg.StorageID) - if err != nil { - return fmt.Errorf("getting storage by id: %w", err) - } - - err = svc.db2.PinnedObject().CreateFromPackage(tx, msg.PackageID, msg.StorageID) - if err != nil { - return fmt.Errorf("creating pinned objects from package: %w", err) - } - - return nil - }) - if err != nil { - logger.WithField("PackageID", msg.PackageID).WithField("HubID", msg.StorageID).Warn(err.Error()) - return nil, mq.Failed(errorcode.OperationFailed, "create package pinned objects failed") - } - - return mq.ReplyOK(coormq.NewCachePackageMovedResp()) -} - -func (svc *Service) CacheRemovePackage(msg *coormq.CacheRemovePackage) (*coormq.CacheRemovePackageResp, *mq.CodeMessage) { - err := svc.db2.DoTx(func(tx db2.SQLContext) error { - _, err := svc.db2.Package().GetByID(tx, msg.PackageID) - if err != nil { - return fmt.Errorf("getting package by id: %w", err) - } - - _, err = svc.db2.Storage().GetByID(tx, msg.StorageID) - if err != nil { - return fmt.Errorf("getting storage by id: %w", err) - } - - err = svc.db2.PinnedObject().DeleteInPackageAtStorage(tx, msg.PackageID, msg.StorageID) - if err != nil { - return fmt.Errorf("delete pinned objects in package at storage: %w", err) - } - - return nil - }) - if err != nil { - logger.WithField("PackageID", msg.PackageID).WithField("HubID", msg.StorageID).Warn(err.Error()) - return nil, mq.Failed(errorcode.OperationFailed, "remove pinned package failed") - } - - return mq.ReplyOK(coormq.RespCacheRemovePackage()) -} diff --git a/coordinator/internal/mq/hub.go b/coordinator/internal/mq/hub.go index 217d82c..e7b1d8d 100644 --- a/coordinator/internal/mq/hub.go +++ b/coordinator/internal/mq/hub.go @@ -3,64 +3,18 @@ package mq import ( "fmt" - stgmod "gitlink.org.cn/cloudream/storage2/common/models" - "gitlink.org.cn/cloudream/storage2/common/pkgs/db2" - "gitlink.org.cn/cloudream/common/consts/errorcode" "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/mq" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" coormq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/coordinator" + cortypes "gitlink.org.cn/cloudream/storage2/coordinator/types" ) -func (svc *Service) GetHubConfig(msg *coormq.GetHubConfig) (*coormq.GetHubConfigResp, *mq.CodeMessage) { - log := logger.WithField("HubID", msg.HubID) - - hub, err := svc.db2.Hub().GetByID(svc.db2.DefCtx(), msg.HubID) - if err != nil { - log.Warnf("getting hub: %v", err) - return nil, mq.Failed(errorcode.OperationFailed, fmt.Sprintf("getting hub: %v", err)) - } - - detailsMap := make(map[cdssdk.StorageID]*stgmod.StorageDetail) - - stgs, err := svc.db2.Storage().GetHubStorages(svc.db2.DefCtx(), msg.HubID) - if err != nil { - log.Warnf("getting hub storages: %v", err) - return nil, mq.Failed(errorcode.OperationFailed, fmt.Sprintf("getting hub storages: %v", err)) - } - - for _, stg := range stgs { - detailsMap[stg.StorageID] = &stgmod.StorageDetail{ - Storage: stg, - MasterHub: &hub, - } - } - - var details []stgmod.StorageDetail - for _, detail := range detailsMap { - details = append(details, *detail) - } - - return mq.ReplyOK(coormq.RespGetHubConfig(hub, details)) -} - -func (svc *Service) GetUserHubs(msg *coormq.GetUserHubs) (*coormq.GetUserHubsResp, *mq.CodeMessage) { - hubs, err := svc.db2.Hub().GetUserHubs(svc.db2.DefCtx(), msg.UserID) - if err != nil { - logger.WithField("UserID", msg.UserID). - Warnf("query user hubs failed, err: %s", err.Error()) - return nil, mq.Failed(errorcode.OperationFailed, "query user hubs failed") - } - - return mq.ReplyOK(coormq.NewGetUserHubsResp(hubs)) -} - func (svc *Service) GetHubs(msg *coormq.GetHubs) (*coormq.GetHubsResp, *mq.CodeMessage) { - var hubs []*cdssdk.Hub + var hubs []*cortypes.Hub if msg.HubIDs == nil { - get, err := svc.db2.Hub().GetAllHubs(svc.db2.DefCtx()) + get, err := svc.db.Hub().GetAllHubs(svc.db.DefCtx()) if err != nil { logger.Warnf("getting all hubs: %s", err.Error()) return nil, mq.Failed(errorcode.OperationFailed, "get all hub failed") @@ -72,13 +26,13 @@ func (svc *Service) GetHubs(msg *coormq.GetHubs) (*coormq.GetHubsResp, *mq.CodeM } else { // 可以不用事务 - get, err := svc.db2.Hub().BatchGetByID(svc.db2.DefCtx(), msg.HubIDs) + get, err := svc.db.Hub().BatchGetByID(svc.db.DefCtx(), msg.HubIDs) if err != nil { logger.Warnf("batch get hubs by id: %s", err.Error()) return nil, mq.Failed(errorcode.OperationFailed, fmt.Sprintf("batch get hubs by id: %v", err)) } - getMp := make(map[cdssdk.HubID]cdssdk.Hub) + getMp := make(map[cortypes.HubID]cortypes.Hub) for _, hub := range get { getMp[hub.HubID] = hub } @@ -97,7 +51,7 @@ func (svc *Service) GetHubs(msg *coormq.GetHubs) (*coormq.GetHubsResp, *mq.CodeM } func (svc *Service) GetHubConnectivities(msg *coormq.GetHubConnectivities) (*coormq.GetHubConnectivitiesResp, *mq.CodeMessage) { - cons, err := svc.db2.HubConnectivity().BatchGetByFromHub(svc.db2.DefCtx(), msg.HubIDs) + cons, err := svc.db.HubConnectivity().BatchGetByFromHub(svc.db.DefCtx(), msg.HubIDs) if err != nil { logger.Warnf("batch get hub connectivities by from hub: %s", err.Error()) return nil, mq.Failed(errorcode.OperationFailed, "batch get hub connectivities by from hub failed") @@ -105,38 +59,3 @@ func (svc *Service) GetHubConnectivities(msg *coormq.GetHubConnectivities) (*coo return mq.ReplyOK(coormq.RespGetHubConnectivities(cons)) } - -func (svc *Service) UpdateHubConnectivities(msg *coormq.UpdateHubConnectivities) (*coormq.UpdateHubConnectivitiesResp, *mq.CodeMessage) { - err := svc.db2.DoTx(func(tx db2.SQLContext) error { - // 只有发起节点和目的节点都存在,才能插入这条记录到数据库 - allHubs, err := svc.db2.Hub().GetAllHubs(tx) - if err != nil { - return fmt.Errorf("getting all hubs: %w", err) - } - - allHubID := make(map[cdssdk.HubID]bool) - for _, hub := range allHubs { - allHubID[hub.HubID] = true - } - - var avaiCons []cdssdk.HubConnectivity - for _, con := range msg.Connectivities { - if allHubID[con.FromHubID] && allHubID[con.ToHubID] { - avaiCons = append(avaiCons, con) - } - } - - err = svc.db2.HubConnectivity().BatchUpdateOrCreate(tx, avaiCons) - if err != nil { - return fmt.Errorf("batch update or create hub connectivities: %s", err) - } - - return nil - }) - if err != nil { - logger.Warn(err.Error()) - return nil, mq.Failed(errorcode.OperationFailed, err.Error()) - } - - return mq.ReplyOK(coormq.RespUpdateHubConnectivities()) -} diff --git a/coordinator/internal/mq/object.go b/coordinator/internal/mq/object.go deleted file mode 100644 index d9ee1c2..0000000 --- a/coordinator/internal/mq/object.go +++ /dev/null @@ -1,891 +0,0 @@ -package mq - -import ( - "errors" - "fmt" - "time" - - "gitlink.org.cn/cloudream/storage2/common/pkgs/db2" - "gitlink.org.cn/cloudream/storage2/common/pkgs/db2/model" - "gorm.io/gorm" - - "github.com/samber/lo" - "gitlink.org.cn/cloudream/common/consts/errorcode" - "gitlink.org.cn/cloudream/common/pkgs/logger" - "gitlink.org.cn/cloudream/common/pkgs/mq" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - "gitlink.org.cn/cloudream/common/sdks/storage/cdsapi" - "gitlink.org.cn/cloudream/common/utils/sort2" - stgmod "gitlink.org.cn/cloudream/storage2/common/models" - coormq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/coordinator" -) - -func (svc *Service) GetObjects(msg *coormq.GetObjects) (*coormq.GetObjectsResp, *mq.CodeMessage) { - var ret []*cdssdk.Object - err := svc.db2.DoTx(func(tx db2.SQLContext) error { - // TODO 应该检查用户是否有每一个Object所在Package的权限 - objs, err := svc.db2.Object().BatchGet(tx, msg.ObjectIDs) - if err != nil { - return err - } - - objMp := make(map[cdssdk.ObjectID]cdssdk.Object) - for _, obj := range objs { - objMp[obj.ObjectID] = obj - } - - for _, objID := range msg.ObjectIDs { - o, ok := objMp[objID] - if ok { - ret = append(ret, &o) - } else { - ret = append(ret, nil) - } - } - - return err - }) - if err != nil { - logger.WithField("UserID", msg.UserID). - Warn(err.Error()) - - return nil, mq.Failed(errorcode.OperationFailed, "get objects failed") - } - - return mq.ReplyOK(coormq.RespGetObjects(ret)) -} - -func (svc *Service) ListObjectsByPath(msg *coormq.ListObjectsByPath) (*coormq.ListObjectsByPathResp, *mq.CodeMessage) { - var coms []string - var objs []cdssdk.Object - var conToken string - - maxKeys := 1000 - if msg.MaxKeys > 0 { - maxKeys = msg.MaxKeys - } - - err := svc.db2.DoTx(func(tx db2.SQLContext) error { - var err error - - _, err = svc.db2.Package().GetUserPackage(tx, msg.UserID, msg.PackageID) - if err != nil { - return fmt.Errorf("getting package by id: %w", err) - } - - if !msg.IsPrefix { - obj, err := svc.db2.Object().GetByPath(tx, msg.PackageID, msg.Path) - if err != nil { - return fmt.Errorf("getting object by path: %w", err) - } - objs = append(objs, obj) - - return nil - } - - if !msg.NoRecursive { - objs, err = svc.db2.Object().GetWithPathPrefixPaged(tx, msg.PackageID, msg.Path, msg.ContinuationToken, maxKeys) - if err != nil { - return fmt.Errorf("getting objects with prefix: %w", err) - } - - if len(objs) > 0 { - conToken = objs[len(objs)-1].Path - } - - return nil - } - - objs, coms, conToken, err = svc.db2.Object().GetByPrefixGroupedPaged(tx, msg.PackageID, msg.Path, msg.ContinuationToken, maxKeys) - return err - }) - if err != nil { - logger.WithField("PathPrefix", msg.Path).Warn(err.Error()) - return nil, mq.Failed(errorcode.OperationFailed, "get objects with prefix failed") - } - - return mq.ReplyOK(coormq.RespListObjectsByPath(cdsapi.ObjectListByPathResp{ - CommonPrefixes: coms, - Objects: objs, - IsTruncated: len(coms)+len(objs) >= maxKeys, - NextContinuationToken: conToken, - })) -} - -func (svc *Service) GetPackageObjects(msg *coormq.GetPackageObjects) (*coormq.GetPackageObjectsResp, *mq.CodeMessage) { - var objs []cdssdk.Object - err := svc.db2.DoTx(func(tx db2.SQLContext) error { - _, err := svc.db2.Package().GetUserPackage(tx, msg.UserID, msg.PackageID) - if err != nil { - return fmt.Errorf("getting package by id: %w", err) - } - - objs, err = svc.db2.Object().GetPackageObjects(tx, msg.PackageID) - if err != nil { - return fmt.Errorf("getting package objects: %w", err) - } - - return nil - }) - if err != nil { - logger.WithField("UserID", msg.UserID).WithField("PackageID", msg.PackageID). - Warn(err.Error()) - - return nil, mq.Failed(errorcode.OperationFailed, "get package objects failed") - } - - return mq.ReplyOK(coormq.RespGetPackageObjects(objs)) -} - -func (svc *Service) GetPackageObjectDetails(msg *coormq.GetPackageObjectDetails) (*coormq.GetPackageObjectDetailsResp, *mq.CodeMessage) { - var details []stgmod.ObjectDetail - // 必须放在事务里进行,因为GetPackageBlockDetails是由多次数据库操作组成,必须保证数据的一致性 - err := svc.db2.DoTx(func(tx db2.SQLContext) error { - var err error - _, err = svc.db2.Package().GetByID(tx, msg.PackageID) - if err != nil { - return fmt.Errorf("getting package by id: %w", err) - } - - details, err = svc.db2.Object().GetPackageObjectDetails(tx, msg.PackageID) - if err != nil { - return fmt.Errorf("getting package block details: %w", err) - } - - return nil - }) - - if err != nil { - logger.WithField("PackageID", msg.PackageID).Warn(err.Error()) - return nil, mq.Failed(errorcode.OperationFailed, "get package object block details failed") - } - - return mq.ReplyOK(coormq.RespPackageObjectDetails(details)) -} - -func (svc *Service) GetObjectDetails(msg *coormq.GetObjectDetails) (*coormq.GetObjectDetailsResp, *mq.CodeMessage) { - detailsMp := make(map[cdssdk.ObjectID]*stgmod.ObjectDetail) - - err := svc.db2.DoTx(func(tx db2.SQLContext) error { - var err error - - msg.ObjectIDs = sort2.SortAsc(msg.ObjectIDs) - - // 根据ID依次查询Object,ObjectBlock,PinnedObject,并根据升序的特点进行合并 - objs, err := svc.db2.Object().BatchGet(tx, msg.ObjectIDs) - if err != nil { - return fmt.Errorf("batch get objects: %w", err) - } - for _, obj := range objs { - detailsMp[obj.ObjectID] = &stgmod.ObjectDetail{ - Object: obj, - } - } - - // 查询合并 - blocks, err := svc.db2.ObjectBlock().BatchGetByObjectID(tx, msg.ObjectIDs) - if err != nil { - return fmt.Errorf("batch get object blocks: %w", err) - } - for _, block := range blocks { - d := detailsMp[block.ObjectID] - d.Blocks = append(d.Blocks, block) - } - - // 查询合并 - pinneds, err := svc.db2.PinnedObject().BatchGetByObjectID(tx, msg.ObjectIDs) - if err != nil { - return fmt.Errorf("batch get pinned objects: %w", err) - } - for _, pinned := range pinneds { - d := detailsMp[pinned.ObjectID] - d.PinnedAt = append(d.PinnedAt, pinned.StorageID) - } - - return nil - }) - - if err != nil { - logger.Warn(err.Error()) - return nil, mq.Failed(errorcode.OperationFailed, "get object details failed") - } - - details := make([]*stgmod.ObjectDetail, len(msg.ObjectIDs)) - for i, objID := range msg.ObjectIDs { - details[i] = detailsMp[objID] - } - - return mq.ReplyOK(coormq.RespGetObjectDetails(details)) -} - -func (svc *Service) UpdateObjectRedundancy(msg *coormq.UpdateObjectRedundancy) (*coormq.UpdateObjectRedundancyResp, *mq.CodeMessage) { - err := svc.db2.DoTx(func(ctx db2.SQLContext) error { - db := svc.db2 - objs := msg.Updatings - - nowTime := time.Now() - objIDs := make([]cdssdk.ObjectID, 0, len(objs)) - for _, obj := range objs { - objIDs = append(objIDs, obj.ObjectID) - } - - avaiIDs, err := db.Object().BatchTestObjectID(ctx, objIDs) - if err != nil { - return fmt.Errorf("batch test object id: %w", err) - } - - // 过滤掉已经不存在的对象。 - // 注意,objIDs没有被过滤,因为后续逻辑不过滤也不会出错 - objs = lo.Filter(objs, func(obj coormq.UpdatingObjectRedundancy, _ int) bool { - return avaiIDs[obj.ObjectID] - }) - - dummyObjs := make([]cdssdk.Object, 0, len(objs)) - for _, obj := range objs { - dummyObjs = append(dummyObjs, cdssdk.Object{ - ObjectID: obj.ObjectID, - FileHash: obj.FileHash, - Size: obj.Size, - Redundancy: obj.Redundancy, - CreateTime: nowTime, // 实际不会更新,只因为不能是0值 - UpdateTime: nowTime, - }) - } - - err = db.Object().BatchUpdateColumns(ctx, dummyObjs, []string{"FileHash", "Size", "Redundancy", "UpdateTime"}) - if err != nil { - return fmt.Errorf("batch update object redundancy: %w", err) - } - - // 删除原本所有的编码块记录,重新添加 - err = db.ObjectBlock().BatchDeleteByObjectID(ctx, objIDs) - if err != nil { - return fmt.Errorf("batch delete object blocks: %w", err) - } - - // 删除原本Pin住的Object。暂不考虑FileHash没有变化的情况 - err = db.PinnedObject().BatchDeleteByObjectID(ctx, objIDs) - if err != nil { - return fmt.Errorf("batch delete pinned object: %w", err) - } - - blocks := make([]stgmod.ObjectBlock, 0, len(objs)) - for _, obj := range objs { - blocks = append(blocks, obj.Blocks...) - } - err = db.ObjectBlock().BatchCreate(ctx, blocks) - if err != nil { - return fmt.Errorf("batch create object blocks: %w", err) - } - - caches := make([]model.Cache, 0, len(objs)) - for _, obj := range objs { - for _, blk := range obj.Blocks { - caches = append(caches, model.Cache{ - FileHash: blk.FileHash, - StorageID: blk.StorageID, - CreateTime: nowTime, - Priority: 0, - }) - } - } - err = db.Cache().BatchCreate(ctx, caches) - if err != nil { - return fmt.Errorf("batch create object caches: %w", err) - } - - pinneds := make([]cdssdk.PinnedObject, 0, len(objs)) - for _, obj := range objs { - for _, p := range obj.PinnedAt { - pinneds = append(pinneds, cdssdk.PinnedObject{ - ObjectID: obj.ObjectID, - StorageID: p, - CreateTime: nowTime, - }) - } - } - err = db.PinnedObject().BatchTryCreate(ctx, pinneds) - if err != nil { - return fmt.Errorf("batch create pinned objects: %w", err) - } - - return nil - }) - if err != nil { - logger.Warnf("batch updating redundancy: %s", err.Error()) - return nil, mq.Failed(errorcode.OperationFailed, "batch update redundancy failed") - } - - return mq.ReplyOK(coormq.RespUpdateObjectRedundancy()) -} - -func (svc *Service) UpdateObjectInfos(msg *coormq.UpdateObjectInfos) (*coormq.UpdateObjectInfosResp, *mq.CodeMessage) { - var sucs []cdssdk.ObjectID - err := svc.db2.DoTx(func(tx db2.SQLContext) error { - msg.Updatings = sort2.Sort(msg.Updatings, func(o1, o2 cdsapi.UpdatingObject) int { - return sort2.Cmp(o1.ObjectID, o2.ObjectID) - }) - - objIDs := make([]cdssdk.ObjectID, len(msg.Updatings)) - for i, obj := range msg.Updatings { - objIDs[i] = obj.ObjectID - } - - oldObjs, err := svc.db2.Object().BatchGet(tx, objIDs) - if err != nil { - return fmt.Errorf("batch getting objects: %w", err) - } - oldObjIDs := make([]cdssdk.ObjectID, len(oldObjs)) - for i, obj := range oldObjs { - oldObjIDs[i] = obj.ObjectID - } - - avaiUpdatings, notExistsObjs := pickByObjectIDs(msg.Updatings, oldObjIDs, func(obj cdsapi.UpdatingObject) cdssdk.ObjectID { return obj.ObjectID }) - if len(notExistsObjs) > 0 { - // TODO 部分对象已经不存在 - } - - newObjs := make([]cdssdk.Object, len(avaiUpdatings)) - for i := range newObjs { - newObjs[i] = oldObjs[i] - avaiUpdatings[i].ApplyTo(&newObjs[i]) - } - - err = svc.db2.Object().BatchUpdate(tx, newObjs) - if err != nil { - return fmt.Errorf("batch create or update: %w", err) - } - - sucs = lo.Map(newObjs, func(obj cdssdk.Object, _ int) cdssdk.ObjectID { return obj.ObjectID }) - return nil - }) - - if err != nil { - logger.Warnf("batch updating objects: %s", err.Error()) - return nil, mq.Failed(errorcode.OperationFailed, "batch update objects failed") - } - - return mq.ReplyOK(coormq.RespUpdateObjectInfos(sucs)) -} - -// 根据objIDs从objs中挑选Object。 -// len(objs) >= len(objIDs) -func pickByObjectIDs[T any](objs []T, objIDs []cdssdk.ObjectID, getID func(T) cdssdk.ObjectID) (picked []T, notFound []T) { - objIdx := 0 - idIdx := 0 - - for idIdx < len(objIDs) && objIdx < len(objs) { - if getID(objs[objIdx]) < objIDs[idIdx] { - notFound = append(notFound, objs[objIdx]) - objIdx++ - continue - } - - picked = append(picked, objs[objIdx]) - objIdx++ - idIdx++ - } - - return -} - -func (svc *Service) MoveObjects(msg *coormq.MoveObjects) (*coormq.MoveObjectsResp, *mq.CodeMessage) { - var sucs []cdssdk.ObjectID - var evt []*stgmod.BodyObjectInfoUpdated - - err := svc.db2.DoTx(func(tx db2.SQLContext) error { - msg.Movings = sort2.Sort(msg.Movings, func(o1, o2 cdsapi.MovingObject) int { - return sort2.Cmp(o1.ObjectID, o2.ObjectID) - }) - - objIDs := make([]cdssdk.ObjectID, len(msg.Movings)) - for i, obj := range msg.Movings { - objIDs[i] = obj.ObjectID - } - - oldObjs, err := svc.db2.Object().BatchGet(tx, objIDs) - if err != nil { - return fmt.Errorf("batch getting objects: %w", err) - } - oldObjIDs := make([]cdssdk.ObjectID, len(oldObjs)) - for i, obj := range oldObjs { - oldObjIDs[i] = obj.ObjectID - } - - // 找出仍在数据库的Object - avaiMovings, notExistsObjs := pickByObjectIDs(msg.Movings, oldObjIDs, func(obj cdsapi.MovingObject) cdssdk.ObjectID { return obj.ObjectID }) - if len(notExistsObjs) > 0 { - // TODO 部分对象已经不存在 - } - - // 筛选出PackageID变化、Path变化的对象,这两种对象要检测改变后是否有冲突 - var pkgIDChangedObjs []cdssdk.Object - var pathChangedObjs []cdssdk.Object - for i := range avaiMovings { - if avaiMovings[i].PackageID != oldObjs[i].PackageID { - newObj := oldObjs[i] - avaiMovings[i].ApplyTo(&newObj) - pkgIDChangedObjs = append(pkgIDChangedObjs, newObj) - } else if avaiMovings[i].Path != oldObjs[i].Path { - newObj := oldObjs[i] - avaiMovings[i].ApplyTo(&newObj) - pathChangedObjs = append(pathChangedObjs, newObj) - } - } - - var newObjs []cdssdk.Object - // 对于PackageID发生变化的对象,需要检查目标Package内是否存在同Path的对象 - checkedObjs, err := svc.checkPackageChangedObjects(tx, msg.UserID, pkgIDChangedObjs) - if err != nil { - return err - } - newObjs = append(newObjs, checkedObjs...) - - // 对于只有Path发生变化的对象,则检查同Package内有没有同Path的对象 - checkedObjs, err = svc.checkPathChangedObjects(tx, msg.UserID, pathChangedObjs) - if err != nil { - return err - } - newObjs = append(newObjs, checkedObjs...) - - err = svc.db2.Object().BatchUpdate(tx, newObjs) - if err != nil { - return fmt.Errorf("batch create or update: %w", err) - } - - sucs = lo.Map(newObjs, func(obj cdssdk.Object, _ int) cdssdk.ObjectID { return obj.ObjectID }) - evt = lo.Map(newObjs, func(obj cdssdk.Object, _ int) *stgmod.BodyObjectInfoUpdated { - return &stgmod.BodyObjectInfoUpdated{ - Object: obj, - } - }) - return nil - }) - if err != nil { - logger.Warn(err.Error()) - return nil, mq.Failed(errorcode.OperationFailed, "move objects failed") - } - - for _, e := range evt { - svc.evtPub.Publish(e) - } - - return mq.ReplyOK(coormq.RespMoveObjects(sucs)) -} - -func (svc *Service) checkPackageChangedObjects(tx db2.SQLContext, userID cdssdk.UserID, objs []cdssdk.Object) ([]cdssdk.Object, error) { - if len(objs) == 0 { - return nil, nil - } - - type PackageObjects struct { - PackageID cdssdk.PackageID - ObjectByPath map[string]*cdssdk.Object - } - - packages := make(map[cdssdk.PackageID]*PackageObjects) - for _, obj := range objs { - pkg, ok := packages[obj.PackageID] - if !ok { - pkg = &PackageObjects{ - PackageID: obj.PackageID, - ObjectByPath: make(map[string]*cdssdk.Object), - } - packages[obj.PackageID] = pkg - } - - if pkg.ObjectByPath[obj.Path] == nil { - o := obj - pkg.ObjectByPath[obj.Path] = &o - } else { - // TODO 有两个对象移动到同一个路径,有冲突 - } - } - - var willUpdateObjs []cdssdk.Object - for _, pkg := range packages { - _, err := svc.db2.Package().GetUserPackage(tx, userID, pkg.PackageID) - if errors.Is(err, gorm.ErrRecordNotFound) { - continue - } - if err != nil { - return nil, fmt.Errorf("getting user package by id: %w", err) - } - - existsObjs, err := svc.db2.Object().BatchGetByPackagePath(tx, pkg.PackageID, lo.Keys(pkg.ObjectByPath)) - if err != nil { - return nil, fmt.Errorf("batch getting objects by package path: %w", err) - } - - // 标记冲突的对象 - for _, obj := range existsObjs { - pkg.ObjectByPath[obj.Path] = nil - // TODO 目标Package内有冲突的对象 - } - - for _, obj := range pkg.ObjectByPath { - if obj == nil { - continue - } - willUpdateObjs = append(willUpdateObjs, *obj) - } - } - - return willUpdateObjs, nil -} - -func (svc *Service) checkPathChangedObjects(tx db2.SQLContext, userID cdssdk.UserID, objs []cdssdk.Object) ([]cdssdk.Object, error) { - if len(objs) == 0 { - return nil, nil - } - - objByPath := make(map[string]*cdssdk.Object) - for _, obj := range objs { - if objByPath[obj.Path] == nil { - o := obj - objByPath[obj.Path] = &o - } else { - // TODO 有两个对象移动到同一个路径,有冲突 - } - - } - - _, err := svc.db2.Package().GetUserPackage(tx, userID, objs[0].PackageID) - if errors.Is(err, gorm.ErrRecordNotFound) { - return nil, nil - } - if err != nil { - return nil, fmt.Errorf("getting user package by id: %w", err) - } - - existsObjs, err := svc.db2.Object().BatchGetByPackagePath(tx, objs[0].PackageID, lo.Map(objs, func(obj cdssdk.Object, idx int) string { return obj.Path })) - if err != nil { - return nil, fmt.Errorf("batch getting objects by package path: %w", err) - } - - // 不支持两个对象交换位置的情况,因为数据库不支持 - for _, obj := range existsObjs { - objByPath[obj.Path] = nil - } - - var willMoveObjs []cdssdk.Object - for _, obj := range objByPath { - if obj == nil { - continue - } - willMoveObjs = append(willMoveObjs, *obj) - } - - return willMoveObjs, nil -} - -func (svc *Service) DeleteObjects(msg *coormq.DeleteObjects) (*coormq.DeleteObjectsResp, *mq.CodeMessage) { - var sucs []cdssdk.ObjectID - err := svc.db2.DoTx(func(tx db2.SQLContext) error { - avaiIDs, err := svc.db2.Object().BatchTestObjectID(tx, msg.ObjectIDs) - if err != nil { - return fmt.Errorf("batch testing object id: %w", err) - } - sucs = lo.Keys(avaiIDs) - - err = svc.db2.Object().BatchDelete(tx, msg.ObjectIDs) - if err != nil { - return fmt.Errorf("batch deleting objects: %w", err) - } - - err = svc.db2.ObjectBlock().BatchDeleteByObjectID(tx, msg.ObjectIDs) - if err != nil { - return fmt.Errorf("batch deleting object blocks: %w", err) - } - - err = svc.db2.PinnedObject().BatchDeleteByObjectID(tx, msg.ObjectIDs) - if err != nil { - return fmt.Errorf("batch deleting pinned objects: %w", err) - } - - err = svc.db2.ObjectAccessStat().BatchDeleteByObjectID(tx, msg.ObjectIDs) - if err != nil { - return fmt.Errorf("batch deleting object access stats: %w", err) - } - - return nil - }) - if err != nil { - logger.Warnf("batch deleting objects: %s", err.Error()) - return nil, mq.Failed(errorcode.OperationFailed, "batch delete objects failed") - } - - for _, objID := range sucs { - svc.evtPub.Publish(&stgmod.BodyObjectDeleted{ - ObjectID: objID, - }) - } - - return mq.ReplyOK(coormq.RespDeleteObjects(sucs)) -} - -func (svc *Service) CloneObjects(msg *coormq.CloneObjects) (*coormq.CloneObjectsResp, *mq.CodeMessage) { - type CloningObject struct { - Cloning cdsapi.CloningObject - OrgIndex int - } - type PackageClonings struct { - PackageID cdssdk.PackageID - Clonings map[string]CloningObject - } - - var evt []*stgmod.BodyNewOrUpdateObject - - // TODO 要检查用户是否有Object、Package的权限 - clonings := make(map[cdssdk.PackageID]*PackageClonings) - for i, cloning := range msg.Clonings { - pkg, ok := clonings[cloning.NewPackageID] - if !ok { - pkg = &PackageClonings{ - PackageID: cloning.NewPackageID, - Clonings: make(map[string]CloningObject), - } - clonings[cloning.NewPackageID] = pkg - } - pkg.Clonings[cloning.NewPath] = CloningObject{ - Cloning: cloning, - OrgIndex: i, - } - } - - ret := make([]*cdssdk.Object, len(msg.Clonings)) - err := svc.db2.DoTx(func(tx db2.SQLContext) error { - // 剔除掉新路径已经存在的对象 - for _, pkg := range clonings { - exists, err := svc.db2.Object().BatchGetByPackagePath(tx, pkg.PackageID, lo.Keys(pkg.Clonings)) - if err != nil { - return fmt.Errorf("batch getting objects by package path: %w", err) - } - - for _, obj := range exists { - delete(pkg.Clonings, obj.Path) - } - } - - // 删除目的Package不存在的对象 - newPkg, err := svc.db2.Package().BatchTestPackageID(tx, lo.Keys(clonings)) - if err != nil { - return fmt.Errorf("batch testing package id: %w", err) - } - for _, pkg := range clonings { - if !newPkg[pkg.PackageID] { - delete(clonings, pkg.PackageID) - } - } - - var avaiClonings []CloningObject - var avaiObjIDs []cdssdk.ObjectID - for _, pkg := range clonings { - for _, cloning := range pkg.Clonings { - avaiClonings = append(avaiClonings, cloning) - avaiObjIDs = append(avaiObjIDs, cloning.Cloning.ObjectID) - } - } - - avaiDetails, err := svc.db2.Object().BatchGetDetails(tx, avaiObjIDs) - if err != nil { - return fmt.Errorf("batch getting object details: %w", err) - } - - avaiDetailsMap := make(map[cdssdk.ObjectID]stgmod.ObjectDetail) - for _, detail := range avaiDetails { - avaiDetailsMap[detail.Object.ObjectID] = detail - } - - oldAvaiClonings := avaiClonings - avaiClonings = nil - - var newObjs []cdssdk.Object - for _, cloning := range oldAvaiClonings { - // 进一步剔除原始对象不存在的情况 - detail, ok := avaiDetailsMap[cloning.Cloning.ObjectID] - if !ok { - continue - } - - avaiClonings = append(avaiClonings, cloning) - - newObj := detail.Object - newObj.ObjectID = 0 - newObj.Path = cloning.Cloning.NewPath - newObj.PackageID = cloning.Cloning.NewPackageID - newObjs = append(newObjs, newObj) - } - - // 先创建出新对象 - err = svc.db2.Object().BatchCreate(tx, &newObjs) - if err != nil { - return fmt.Errorf("batch creating objects: %w", err) - } - - // 创建了新对象就能拿到新对象ID,再创建新对象块 - var newBlks []stgmod.ObjectBlock - for i, cloning := range avaiClonings { - oldBlks := avaiDetailsMap[cloning.Cloning.ObjectID].Blocks - for _, blk := range oldBlks { - newBlk := blk - newBlk.ObjectID = newObjs[i].ObjectID - newBlks = append(newBlks, newBlk) - } - } - - err = svc.db2.ObjectBlock().BatchCreate(tx, newBlks) - if err != nil { - return fmt.Errorf("batch creating object blocks: %w", err) - } - - for i, cloning := range avaiClonings { - ret[cloning.OrgIndex] = &newObjs[i] - } - - for i, cloning := range avaiClonings { - var evtBlks []stgmod.BlockDistributionObjectInfo - blkType := getBlockTypeFromRed(newObjs[i].Redundancy) - - oldBlks := avaiDetailsMap[cloning.Cloning.ObjectID].Blocks - for _, blk := range oldBlks { - evtBlks = append(evtBlks, stgmod.BlockDistributionObjectInfo{ - BlockType: blkType, - Index: blk.Index, - StorageID: blk.StorageID, - }) - } - - evt = append(evt, &stgmod.BodyNewOrUpdateObject{ - Info: newObjs[i], - BlockDistribution: evtBlks, - }) - } - return nil - }) - - if err != nil { - logger.Warnf("cloning objects: %s", err.Error()) - return nil, mq.Failed(errorcode.OperationFailed, err.Error()) - } - - for _, e := range evt { - svc.evtPub.Publish(e) - } - - return mq.ReplyOK(coormq.RespCloneObjects(ret)) -} - -func (svc *Service) NewMultipartUploadObject(msg *coormq.NewMultipartUploadObject) (*coormq.NewMultipartUploadObjectResp, *mq.CodeMessage) { - var obj cdssdk.Object - err := svc.db2.DoTx(func(tx db2.SQLContext) error { - oldObj, err := svc.db2.Object().GetByPath(tx, msg.PackageID, msg.Path) - if err == nil { - obj = oldObj - err := svc.db2.ObjectBlock().DeleteByObjectID(tx, obj.ObjectID) - if err != nil { - return fmt.Errorf("delete object blocks: %w", err) - } - - obj.FileHash = cdssdk.EmptyHash - obj.Size = 0 - obj.Redundancy = cdssdk.NewMultipartUploadRedundancy() - obj.UpdateTime = time.Now() - - err = svc.db2.Object().BatchUpdate(tx, []cdssdk.Object{obj}) - if err != nil { - return fmt.Errorf("update object: %w", err) - } - - return nil - } - - obj = cdssdk.Object{ - PackageID: msg.PackageID, - Path: msg.Path, - FileHash: cdssdk.EmptyHash, - Size: 0, - Redundancy: cdssdk.NewMultipartUploadRedundancy(), - CreateTime: time.Now(), - UpdateTime: time.Now(), - } - objID, err := svc.db2.Object().Create(tx, obj) - if err != nil { - return fmt.Errorf("create object: %w", err) - } - - obj.ObjectID = objID - return nil - }) - if err != nil { - logger.Warnf("new multipart upload object: %s", err.Error()) - return nil, mq.Failed(errorcode.OperationFailed, fmt.Sprintf("new multipart upload object: %v", err)) - } - - return mq.ReplyOK(coormq.RespNewMultipartUploadObject(obj)) -} - -func (svc *Service) AddMultipartUploadPart(msg *coormq.AddMultipartUploadPart) (*coormq.AddMultipartUploadPartResp, *mq.CodeMessage) { - err := svc.db2.DoTx(func(tx db2.SQLContext) error { - obj, err := svc.db2.Object().GetByID(tx, msg.ObjectID) - if err != nil { - return fmt.Errorf("getting object by id: %w", err) - } - - _, ok := obj.Redundancy.(*cdssdk.MultipartUploadRedundancy) - if !ok { - return fmt.Errorf("object is not a multipart upload object") - } - - blks, err := svc.db2.ObjectBlock().BatchGetByObjectID(tx, []cdssdk.ObjectID{obj.ObjectID}) - if err != nil { - return fmt.Errorf("batch getting object blocks: %w", err) - } - - blks = lo.Reject(blks, func(blk stgmod.ObjectBlock, idx int) bool { return blk.Index == msg.Block.Index }) - blks = append(blks, msg.Block) - - blks = sort2.Sort(blks, func(a, b stgmod.ObjectBlock) int { return a.Index - b.Index }) - - totalSize := int64(0) - var hashes [][]byte - for _, blk := range blks { - totalSize += blk.Size - hashes = append(hashes, blk.FileHash.GetHashBytes()) - } - - newObjHash := cdssdk.CalculateCompositeHash(hashes) - obj.Size = totalSize - obj.FileHash = newObjHash - obj.UpdateTime = time.Now() - - err = svc.db2.ObjectBlock().DeleteByObjectIDIndex(tx, msg.ObjectID, msg.Block.Index) - if err != nil { - return fmt.Errorf("delete object block: %w", err) - } - - err = svc.db2.ObjectBlock().Create(tx, msg.ObjectID, msg.Block.Index, msg.Block.StorageID, msg.Block.FileHash, msg.Block.Size) - if err != nil { - return fmt.Errorf("create object block: %w", err) - } - - err = svc.db2.Object().BatchUpdate(tx, []cdssdk.Object{obj}) - if err != nil { - return fmt.Errorf("update object: %w", err) - } - - return nil - }) - if err != nil { - logger.Warnf("add multipart upload part: %s", err.Error()) - - code := errorcode.OperationFailed - if errors.Is(err, gorm.ErrRecordNotFound) { - code = errorcode.DataNotFound - } - - return nil, mq.Failed(code, fmt.Sprintf("add multipart upload part: %v", err)) - } - - return mq.ReplyOK(coormq.RespAddMultipartUploadPart()) -} diff --git a/coordinator/internal/mq/package.go b/coordinator/internal/mq/package.go deleted file mode 100644 index 6779705..0000000 --- a/coordinator/internal/mq/package.go +++ /dev/null @@ -1,337 +0,0 @@ -package mq - -import ( - "errors" - "fmt" - "sort" - - stgmod "gitlink.org.cn/cloudream/storage2/common/models" - "gitlink.org.cn/cloudream/storage2/common/pkgs/db2" - "gorm.io/gorm" - - "gitlink.org.cn/cloudream/common/consts/errorcode" - "gitlink.org.cn/cloudream/common/pkgs/logger" - "gitlink.org.cn/cloudream/common/pkgs/mq" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - coormq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/coordinator" -) - -func (svc *Service) GetPackage(msg *coormq.GetPackage) (*coormq.GetPackageResp, *mq.CodeMessage) { - pkg, err := svc.db2.Package().GetByID(svc.db2.DefCtx(), msg.PackageID) - if err != nil { - logger.WithField("PackageID", msg.PackageID). - Warnf("get package: %s", err.Error()) - - if errors.Is(err, gorm.ErrRecordNotFound) { - return nil, mq.Failed(errorcode.DataNotFound, "package not found") - } - - return nil, mq.Failed(errorcode.OperationFailed, "get package failed") - } - - return mq.ReplyOK(coormq.NewGetPackageResp(pkg)) -} - -func (svc *Service) GetPackageByName(msg *coormq.GetPackageByName) (*coormq.GetPackageByNameResp, *mq.CodeMessage) { - pkg, err := svc.db2.Package().GetUserPackageByName(svc.db2.DefCtx(), msg.UserID, msg.BucketName, msg.PackageName) - if err != nil { - logger.WithField("UserID", msg.UserID). - WithField("BucketName", msg.BucketName). - WithField("PackageName", msg.PackageName). - Warnf("get package by name: %s", err.Error()) - - if errors.Is(err, gorm.ErrRecordNotFound) { - return nil, mq.Failed(errorcode.DataNotFound, "package not found") - } - - return nil, mq.Failed(errorcode.OperationFailed, "get package by name failed") - } - - return mq.ReplyOK(coormq.NewGetPackageByNameResp(pkg)) -} - -func (svc *Service) CreatePackage(msg *coormq.CreatePackage) (*coormq.CreatePackageResp, *mq.CodeMessage) { - var pkg cdssdk.Package - err := svc.db2.DoTx(func(tx db2.SQLContext) error { - var err error - - isAvai, _ := svc.db2.Bucket().IsAvailable(tx, msg.BucketID, msg.UserID) - if !isAvai { - return fmt.Errorf("bucket is not avaiable to the user") - } - - pkg, err = svc.db2.Package().Create(tx, msg.BucketID, msg.Name) - if err != nil { - return fmt.Errorf("creating package: %w", err) - } - - return nil - }) - if err != nil { - logger.WithField("BucketID", msg.BucketID). - WithField("Name", msg.Name). - Warn(err.Error()) - - if errors.Is(err, gorm.ErrDuplicatedKey) { - return nil, mq.Failed(errorcode.DataExists, "package already exists") - } - - return nil, mq.Failed(errorcode.OperationFailed, err.Error()) - } - - svc.evtPub.Publish(&stgmod.BodyNewPackage{ - Info: pkg, - }) - - return mq.ReplyOK(coormq.NewCreatePackageResp(pkg)) -} - -func (svc *Service) UpdatePackage(msg *coormq.UpdatePackage) (*coormq.UpdatePackageResp, *mq.CodeMessage) { - var added []cdssdk.Object - err := svc.db2.DoTx(func(tx db2.SQLContext) error { - _, err := svc.db2.Package().GetByID(tx, msg.PackageID) - if err != nil { - return fmt.Errorf("getting package by id: %w", err) - } - - ad, err := svc.db2.Object().BatchAdd(tx, msg.PackageID, msg.Adds) - if err != nil { - return fmt.Errorf("adding objects: %w", err) - } - added = ad - - return nil - }) - if err != nil { - logger.WithField("PackageID", msg.PackageID).Warn(err.Error()) - return nil, mq.Failed(errorcode.OperationFailed, "update package failed") - } - - addedMp := make(map[string]cdssdk.Object) - for _, obj := range added { - addedMp[obj.Path] = obj - } - - for _, add := range msg.Adds { - var blks []stgmod.BlockDistributionObjectInfo - for _, stgID := range add.StorageIDs { - blks = append(blks, stgmod.BlockDistributionObjectInfo{ - BlockType: stgmod.BlockTypeRaw, - StorageID: stgID, - }) - } - - svc.evtPub.Publish(&stgmod.BodyNewOrUpdateObject{ - Info: addedMp[add.Path], - BlockDistribution: blks, - }) - } - - return mq.ReplyOK(coormq.NewUpdatePackageResp(added)) -} - -func (svc *Service) DeletePackage(msg *coormq.DeletePackage) (*coormq.DeletePackageResp, *mq.CodeMessage) { - err := svc.db2.DoTx(func(tx db2.SQLContext) error { - isAvai, _ := svc.db2.Package().IsAvailable(tx, msg.UserID, msg.PackageID) - if !isAvai { - return fmt.Errorf("package is not available to the user") - } - - err := svc.db2.Package().DeleteComplete(tx, msg.PackageID) - if err != nil { - return fmt.Errorf("deleting package: %w", err) - } - - return nil - }) - if err != nil { - logger.WithField("UserID", msg.UserID). - WithField("PackageID", msg.PackageID). - Warnf(err.Error()) - return nil, mq.Failed(errorcode.OperationFailed, "delete package failed") - } - - svc.evtPub.Publish(&stgmod.BodyPackageDeleted{ - PackageID: msg.PackageID, - }) - - return mq.ReplyOK(coormq.NewDeletePackageResp()) -} - -func (svc *Service) ClonePackage(msg *coormq.ClonePackage) (*coormq.ClonePackageResp, *mq.CodeMessage) { - var pkg cdssdk.Package - var oldObjIDs []cdssdk.ObjectID - var newObjIDs []cdssdk.ObjectID - err := svc.db2.DoTx(func(tx db2.SQLContext) error { - var err error - - isAvai, _ := svc.db2.Bucket().IsAvailable(tx, msg.BucketID, msg.UserID) - if !isAvai { - return fmt.Errorf("bucket is not avaiable to the user") - } - - pkg, err = svc.db2.Package().Create(tx, msg.BucketID, msg.Name) - if err != nil { - return fmt.Errorf("creating package: %w", err) - } - - objs, err := svc.db2.Object().GetPackageObjects(tx, msg.PackageID) - if err != nil { - return fmt.Errorf("getting package objects: %w", err) - } - - objBlks, err := svc.db2.ObjectBlock().GetInPackageID(tx, msg.PackageID) - if err != nil { - return fmt.Errorf("getting object blocks: %w", err) - } - - clonedObjs := make([]cdssdk.Object, len(objs)) - for i, obj := range objs { - clonedObjs[i] = obj - clonedObjs[i].ObjectID = 0 - clonedObjs[i].PackageID = pkg.PackageID - } - - err = svc.db2.Object().BatchCreate(tx, &clonedObjs) - if err != nil { - return fmt.Errorf("batch creating objects: %w", err) - } - - oldToNew := make(map[cdssdk.ObjectID]cdssdk.ObjectID) - for i, obj := range clonedObjs { - oldToNew[objs[i].ObjectID] = obj.ObjectID - - oldObjIDs = append(oldObjIDs, objs[i].ObjectID) - newObjIDs = append(newObjIDs, obj.ObjectID) - } - - clonedBlks := make([]stgmod.ObjectBlock, len(objBlks)) - for i, blk := range objBlks { - clonedBlks[i] = blk - clonedBlks[i].ObjectID = oldToNew[blk.ObjectID] - } - - err = svc.db2.ObjectBlock().BatchCreate(tx, clonedBlks) - if err != nil { - return fmt.Errorf("batch creating object blocks: %w", err) - } - - return nil - }) - if err != nil { - if errors.Is(err, gorm.ErrDuplicatedKey) { - return nil, mq.Failed(errorcode.DataExists, "package already exists") - } - - return nil, mq.Failed(errorcode.OperationFailed, err.Error()) - } - - svc.evtPub.Publish(&stgmod.BodyPackageCloned{ - SourcePackageID: msg.PackageID, - NewPackage: pkg, - SourceObjectIDs: oldObjIDs, - NewObjectIDs: newObjIDs, - }) - - return mq.ReplyOK(coormq.RespClonePackage(pkg)) -} - -func (svc *Service) GetPackageCachedStorages(msg *coormq.GetPackageCachedStorages) (*coormq.GetPackageCachedStoragesResp, *mq.CodeMessage) { - isAva, err := svc.db2.Package().IsAvailable(svc.db2.DefCtx(), msg.UserID, msg.PackageID) - if err != nil { - logger.WithField("UserID", msg.UserID). - WithField("PackageID", msg.PackageID). - Warnf("check package available failed, err: %s", err.Error()) - return nil, mq.Failed(errorcode.OperationFailed, "check package available failed") - } - if !isAva { - logger.WithField("UserID", msg.UserID). - WithField("PackageID", msg.PackageID). - Warnf("package is not available to the user") - return nil, mq.Failed(errorcode.OperationFailed, "package is not available to the user") - } - - // 这个函数只是统计哪些节点缓存了Package中的数据,不需要多么精确,所以可以不用事务 - objDetails, err := svc.db2.Object().GetPackageObjectDetails(svc.db2.DefCtx(), msg.PackageID) - if err != nil { - logger.WithField("PackageID", msg.PackageID). - Warnf("get package block details: %s", err.Error()) - - return nil, mq.Failed(errorcode.OperationFailed, "get package block details failed") - } - - var packageSize int64 - stgInfoMap := make(map[cdssdk.StorageID]*cdssdk.StoragePackageCachingInfo) - for _, obj := range objDetails { - // 只要存了文件的一个块,就认为此节点存了整个文件 - for _, block := range obj.Blocks { - info, ok := stgInfoMap[block.StorageID] - if !ok { - info = &cdssdk.StoragePackageCachingInfo{ - StorageID: block.StorageID, - } - stgInfoMap[block.StorageID] = info - - } - - info.FileSize += obj.Object.Size - info.ObjectCount++ - } - } - - var stgInfos []cdssdk.StoragePackageCachingInfo - for _, stgInfo := range stgInfoMap { - stgInfos = append(stgInfos, *stgInfo) - } - - sort.Slice(stgInfos, func(i, j int) bool { - return stgInfos[i].StorageID < stgInfos[j].StorageID - }) - return mq.ReplyOK(coormq.ReqGetPackageCachedStoragesResp(stgInfos, packageSize)) -} - -func (svc *Service) AddAccessStat(msg *coormq.AddAccessStat) { - pkgIDs := make([]cdssdk.PackageID, len(msg.Entries)) - objIDs := make([]cdssdk.ObjectID, len(msg.Entries)) - for i, e := range msg.Entries { - pkgIDs[i] = e.PackageID - objIDs[i] = e.ObjectID - } - - err := svc.db2.DoTx(func(tx db2.SQLContext) error { - avaiPkgIDs, err := svc.db2.Package().BatchTestPackageID(tx, pkgIDs) - if err != nil { - return fmt.Errorf("batch test package id: %w", err) - } - - avaiObjIDs, err := svc.db2.Object().BatchTestObjectID(tx, objIDs) - if err != nil { - return fmt.Errorf("batch test object id: %w", err) - } - - var willAdds []coormq.AddAccessStatEntry - for _, e := range msg.Entries { - if avaiPkgIDs[e.PackageID] && avaiObjIDs[e.ObjectID] { - willAdds = append(willAdds, e) - } - } - - if len(willAdds) > 0 { - err := svc.db2.PackageAccessStat().BatchAddCounter(tx, willAdds) - if err != nil { - return fmt.Errorf("batch add package access stat counter: %w", err) - } - - err = svc.db2.ObjectAccessStat().BatchAddCounter(tx, willAdds) - if err != nil { - return fmt.Errorf("batch add object access stat counter: %w", err) - } - } - - return nil - }) - - if err != nil { - logger.Warn(err.Error()) - } -} diff --git a/coordinator/internal/mq/service.go b/coordinator/internal/mq/service.go index 8f65b12..4a05373 100644 --- a/coordinator/internal/mq/service.go +++ b/coordinator/internal/mq/service.go @@ -1,18 +1,15 @@ package mq import ( - "gitlink.org.cn/cloudream/storage2/common/pkgs/db2" - "gitlink.org.cn/cloudream/storage2/common/pkgs/sysevent" + "gitlink.org.cn/cloudream/storage2/coordinator/internal/db" ) type Service struct { - db2 *db2.DB - evtPub *sysevent.Publisher + db *db.DB } -func NewService(db2 *db2.DB, evtPub *sysevent.Publisher) *Service { +func NewService(db *db.DB) *Service { return &Service{ - db2: db2, - evtPub: evtPub, + db: db, } } diff --git a/coordinator/internal/mq/storage.go b/coordinator/internal/mq/storage.go index 06b15bc..df38e6c 100644 --- a/coordinator/internal/mq/storage.go +++ b/coordinator/internal/mq/storage.go @@ -1,23 +1,15 @@ package mq import ( - "errors" - "fmt" - "time" - "gitlink.org.cn/cloudream/common/consts/errorcode" "gitlink.org.cn/cloudream/common/pkgs/logger" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - "gorm.io/gorm" "gitlink.org.cn/cloudream/common/pkgs/mq" - stgmod "gitlink.org.cn/cloudream/storage2/common/models" - "gitlink.org.cn/cloudream/storage2/common/pkgs/db2" coormq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/coordinator" ) func (svc *Service) GetStorage(msg *coormq.GetStorage) (*coormq.GetStorageResp, *mq.CodeMessage) { - stg, err := svc.db2.Storage().GetUserStorage(svc.db2.DefCtx(), msg.UserID, msg.StorageID) + stg, err := svc.db.Storage().GetByID(svc.db.DefCtx(), msg.StorageID) if err != nil { logger.Warnf("getting user storage: %s", err.Error()) return nil, mq.Failed(errorcode.OperationFailed, "get user storage failed") @@ -25,117 +17,3 @@ func (svc *Service) GetStorage(msg *coormq.GetStorage) (*coormq.GetStorageResp, return mq.ReplyOK(coormq.RespGetStorage(stg)) } - -func (svc *Service) GetStorageDetails(msg *coormq.GetStorageDetails) (*coormq.GetStorageDetailsResp, *mq.CodeMessage) { - stgsMp := make(map[cdssdk.StorageID]*stgmod.StorageDetail) - - svc.db2.DoTx(func(tx db2.SQLContext) error { - stgs, err := svc.db2.Storage().BatchGetByID(tx, msg.StorageIDs) - if err != nil && err != gorm.ErrRecordNotFound { - return fmt.Errorf("getting storage: %w", err) - } - - details := make([]stgmod.StorageDetail, len(stgs)) - for i, stg := range stgs { - details[i] = stgmod.StorageDetail{ - Storage: stg, - } - stgsMp[stg.StorageID] = &details[i] - } - err = svc.db2.Storage().FillDetails(tx, details) - if err != nil { - return err - } - - return nil - }) - - ret := make([]*stgmod.StorageDetail, len(msg.StorageIDs)) - for i, id := range msg.StorageIDs { - stg, ok := stgsMp[id] - if !ok { - ret[i] = nil - continue - } - ret[i] = stg - } - - return mq.ReplyOK(coormq.RespGetStorageDetails(ret)) -} - -func (svc *Service) GetUserStorageDetails(msg *coormq.GetUserStorageDetails) (*coormq.GetUserStorageDetailsResp, *mq.CodeMessage) { - var ret []stgmod.StorageDetail - - svc.db2.DoTx(func(tx db2.SQLContext) error { - stgs, err := svc.db2.Storage().GetUserStorages(tx, msg.UserID) - if err != nil && err != gorm.ErrRecordNotFound { - return fmt.Errorf("getting user storages: %w", err) - } - - for _, stg := range stgs { - ret = append(ret, stgmod.StorageDetail{ - Storage: stg, - }) - } - err = svc.db2.Storage().FillDetails(tx, ret) - if err != nil { - return err - } - - return nil - }) - - return mq.ReplyOK(coormq.RespGetUserStorageDetails(ret)) -} - -func (svc *Service) GetStorageByName(msg *coormq.GetStorageByName) (*coormq.GetStorageByNameResp, *mq.CodeMessage) { - stg, err := svc.db2.Storage().GetUserStorageByName(svc.db2.DefCtx(), msg.UserID, msg.Name) - if err != nil { - logger.Warnf("getting user storage by name: %s", err.Error()) - - if errors.Is(err, gorm.ErrRecordNotFound) { - return nil, mq.Failed(errorcode.DataNotFound, "storage not found") - } - - return nil, mq.Failed(errorcode.OperationFailed, "get user storage failed") - } - - return mq.ReplyOK(coormq.RespGetStorageByNameResp(stg)) -} - -func (svc *Service) StoragePackageLoaded(msg *coormq.StoragePackageLoaded) (*coormq.StoragePackageLoadedResp, *mq.CodeMessage) { - err := svc.db2.DoTx(func(tx db2.SQLContext) error { - // TODO 权限检查 - exists, err := svc.db2.Object().BatchTestObjectID(tx, msg.PinnedObjects) - if err != nil { - return fmt.Errorf("testing object id: %w", err) - } - - pinned := make([]cdssdk.PinnedObject, 0, len(msg.PinnedObjects)) - for _, obj := range msg.PinnedObjects { - if exists[obj] { - pinned = append(pinned, cdssdk.PinnedObject{ - StorageID: msg.StorageID, - ObjectID: obj, - CreateTime: time.Now(), - }) - } - } - - err = svc.db2.PinnedObject().BatchTryCreate(tx, pinned) - if err != nil { - return fmt.Errorf("batch creating pinned object: %w", err) - } - - return nil - }) - if err != nil { - logger.WithField("UserID", msg.UserID). - WithField("StorageID", msg.StorageID). - WithField("PackageID", msg.PackageID). - Warn(err.Error()) - return nil, mq.Failed(errorcode.OperationFailed, "user load package to storage failed") - } - - return mq.ReplyOK(coormq.RespStoragePackageLoaded()) -} diff --git a/coordinator/internal/mq/temp.go b/coordinator/internal/mq/temp.go deleted file mode 100644 index b1f565b..0000000 --- a/coordinator/internal/mq/temp.go +++ /dev/null @@ -1,52 +0,0 @@ -package mq - -import ( - "fmt" - - "gitlink.org.cn/cloudream/storage2/common/pkgs/db2" - - "gitlink.org.cn/cloudream/common/consts/errorcode" - "gitlink.org.cn/cloudream/common/pkgs/logger" - "gitlink.org.cn/cloudream/common/pkgs/mq" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - stgmod "gitlink.org.cn/cloudream/storage2/common/models" - coormq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/coordinator" -) - -func (svc *Service) GetDatabaseAll(msg *coormq.GetDatabaseAll) (*coormq.GetDatabaseAllResp, *mq.CodeMessage) { - var bkts []cdssdk.Bucket - var pkgs []cdssdk.Package - var objs []stgmod.ObjectDetail - - err := svc.db2.DoTx(func(tx db2.SQLContext) error { - var err error - bkts, err = svc.db2.Bucket().GetUserBuckets(tx, msg.UserID) - if err != nil { - return fmt.Errorf("get user buckets: %w", err) - } - - for _, bkt := range bkts { - ps, err := svc.db2.Package().GetUserBucketPackages(tx, msg.UserID, bkt.BucketID) - if err != nil { - return fmt.Errorf("get bucket packages: %w", err) - } - pkgs = append(pkgs, ps...) - } - - for _, pkg := range pkgs { - os, err := svc.db2.Object().GetPackageObjectDetails(tx, pkg.PackageID) - if err != nil { - return fmt.Errorf("get package object details: %w", err) - } - objs = append(objs, os...) - } - - return nil - }) - if err != nil { - logger.Warnf("batch deleting objects: %s", err.Error()) - return nil, mq.Failed(errorcode.OperationFailed, "batch delete objects failed") - } - - return mq.ReplyOK(coormq.RespGetDatabaseAll(bkts, pkgs, objs)) -} diff --git a/coordinator/internal/mq/user.go b/coordinator/internal/mq/user.go deleted file mode 100644 index 1cfad67..0000000 --- a/coordinator/internal/mq/user.go +++ /dev/null @@ -1,127 +0,0 @@ -package mq - -import ( - "errors" - "fmt" - - "gitlink.org.cn/cloudream/common/consts/errorcode" - "gitlink.org.cn/cloudream/common/pkgs/logger" - "gitlink.org.cn/cloudream/common/pkgs/mq" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - "gitlink.org.cn/cloudream/storage2/common/pkgs/db2" - coormq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/coordinator" - "gorm.io/gorm" -) - -func (svc *Service) CreateUser(msg *coormq.CreateUser) (*coormq.CreateUserResp, *mq.CodeMessage) { - var user cdssdk.User - err := svc.db2.DoTx(func(tx db2.SQLContext) error { - var err error - - user, err = svc.db2.User().Create(tx, msg.Name) - if err != nil { - return fmt.Errorf("creating user: %w", err) - } - - // TODO 目前新建用户的权限与ID 1的相同 - hubs, err := svc.db2.UserHub().GetByUserID(tx, 1) - if err != nil { - return fmt.Errorf("getting user hubs: %w", err) - } - - stgs, err := svc.db2.UserStorage().GetByUserID(tx, 1) - if err != nil { - return fmt.Errorf("getting user storages: %w", err) - } - - for _, hub := range hubs { - err := svc.db2.UserHub().Create(tx, user.UserID, hub.HubID) - if err != nil { - return fmt.Errorf("creating user hub: %w", err) - } - } - - for _, stg := range stgs { - err := svc.db2.UserStorage().Create(tx, user.UserID, stg.StorageID) - if err != nil { - return fmt.Errorf("creating user storage: %w", err) - } - } - - return nil - }) - if err != nil { - logger.WithField("Name", msg.Name). - Warn(err.Error()) - - if errors.Is(err, gorm.ErrDuplicatedKey) { - return nil, mq.Failed(errorcode.DataExists, "user name already exists") - } - - return nil, mq.Failed(errorcode.OperationFailed, err.Error()) - } - - return mq.ReplyOK(coormq.RespCreateUser(user)) -} - -func (svc *Service) DeleteUser(msg *coormq.DeleteUser) (*coormq.DeleteUserResp, *mq.CodeMessage) { - // TODO 目前不能删除ID 1的用户 - if msg.UserID == 1 { - return nil, mq.Failed(errorcode.OperationFailed, "cannot delete the default user") - } - - err := svc.db2.DoTx(func(tx db2.SQLContext) error { - err := svc.db2.User().Delete(tx, msg.UserID) - if err != nil { - return fmt.Errorf("deleting user: %w", err) - } - - err = svc.db2.UserHub().DeleteByUserID(tx, msg.UserID) - if err != nil { - return fmt.Errorf("deleting user hubs: %w", err) - } - - err = svc.db2.UserStorage().DeleteByUserID(tx, msg.UserID) - if err != nil { - return fmt.Errorf("deleting user storages: %w", err) - } - - bkts, err := svc.db2.UserBucket().GetByUserID(tx, msg.UserID) - if err != nil { - return fmt.Errorf("getting user buckets: %w", err) - } - - for _, bkt := range bkts { - pkgs, err := svc.db2.Package().GetBucketPackages(tx, bkt.BucketID) - if err != nil { - return fmt.Errorf("getting bucket packages: %w", err) - } - - for _, pkg := range pkgs { - err := svc.db2.Package().DeleteComplete(tx, pkg.PackageID) - if err != nil { - return fmt.Errorf("deleting package %v: %w", pkg.PackageID, err) - } - } - - err = svc.db2.Bucket().Delete(tx, bkt.BucketID) - if err != nil { - return fmt.Errorf("deleting bucket: %w", err) - } - } - - err = svc.db2.UserBucket().DeleteByUserID(tx, msg.UserID) - if err != nil { - return fmt.Errorf("deleting user buckets: %w", err) - } - - return nil - }) - if err != nil { - logger.WithField("UserID", msg.UserID). - Warn(err.Error()) - return nil, mq.Failed(errorcode.OperationFailed, err.Error()) - } - - return mq.ReplyOK(coormq.RespDeleteUser()) -} diff --git a/coordinator/internal/mq/utils.go b/coordinator/internal/mq/utils.go deleted file mode 100644 index d1c3971..0000000 --- a/coordinator/internal/mq/utils.go +++ /dev/null @@ -1,23 +0,0 @@ -package mq - -import ( - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - stgmod "gitlink.org.cn/cloudream/storage2/common/models" -) - -func getBlockTypeFromRed(red cdssdk.Redundancy) string { - switch red.(type) { - case *cdssdk.NoneRedundancy: - return stgmod.BlockTypeRaw - - case *cdssdk.ECRedundancy: - return stgmod.BlockTypeEC - - case *cdssdk.LRCRedundancy: - return stgmod.BlockTypeEC - - case *cdssdk.SegmentRedundancy: - return stgmod.BlockTypeSegment - } - return "" -} diff --git a/coordinator/types/storage.go b/coordinator/types/storage.go index 1e4bf5f..a87f3ba 100644 --- a/coordinator/types/storage.go +++ b/coordinator/types/storage.go @@ -7,10 +7,6 @@ import ( "gitlink.org.cn/cloudream/common/utils/serder" ) -type UserStorageConfig interface { - GetUserStorageConfigType() string -} - type Storage struct { StorageID StorageID `json:"storageID" gorm:"column:StorageID; primaryKey; type:bigint; autoIncrement;"` Name string `json:"name" gorm:"column:Name; type:varchar(256); not null"` @@ -51,7 +47,7 @@ var _ = serder.UseTypeUnionInternallyTagged(types.Ref(types.NewTypeUnion[Storage type MashupStorageType struct { serder.Metadata `union:"Mashup"` Type string `json:"type"` - Agent StorageType `json:"agent"` // 创建Agent时,使用的存储服务类型 + Store StorageType `json:"store"` // 创建ShardStore或PublicStore时,使用的存储服务类型 Feature StorageType `json:"feature"` // 根据Feature创建组件时使用的存储服务类型 } @@ -79,11 +75,6 @@ func (a *LocalStorageType) String() string { type OSSType struct { serder.Metadata `union:"OSS"` Type string `json:"type"` - Region string `json:"region"` - AK string `json:"accessKeyId"` - SK string `json:"secretAccessKey"` - Endpoint string `json:"endpoint"` - Bucket string `json:"bucket"` } func (a *OSSType) GetStorageType() string { @@ -97,12 +88,6 @@ func (a *OSSType) String() string { type OBSType struct { serder.Metadata `union:"OBS"` Type string `json:"type"` - Region string `json:"region"` - AK string `json:"accessKeyId"` - SK string `json:"secretAccessKey"` - Endpoint string `json:"endpoint"` - Bucket string `json:"bucket"` - ProjectID string `json:"projectID"` } func (a *OBSType) GetStorageType() string { @@ -116,11 +101,6 @@ func (a *OBSType) String() string { type COSType struct { serder.Metadata `union:"COS"` Type string `json:"type"` - Region string `json:"region"` - AK string `json:"accessKeyId"` - SK string `json:"secretAccessKey"` - Endpoint string `json:"endpoint"` - Bucket string `json:"bucket"` } func (a *COSType) GetStorageType() string { @@ -134,13 +114,6 @@ func (a *COSType) String() string { type EFileType struct { serder.Metadata `union:"EFile"` Type string `json:"type"` - TokenURL string `json:"tokenURL"` - APIURL string `json:"apiURL"` - TokenExpire int `json:"tokenExpire"` // 单位秒 - User string `json:"user"` - Password string `json:"password"` - OrgID string `json:"orgID"` - ClusterID string `json:"clusterID"` } func (a *EFileType) GetStorageType() string { @@ -155,11 +128,6 @@ func (a *EFileType) String() string { type S3Type struct { serder.Metadata `union:"S3"` Type string `json:"type"` - Region string `json:"region"` - AK string `json:"accessKeyId"` - SK string `json:"secretAccessKey"` - Endpoint string `json:"endpoint"` - Bucket string `json:"bucket"` } func (a *S3Type) GetStorageType() string { @@ -169,3 +137,8 @@ func (a *S3Type) GetStorageType() string { func (a *S3Type) String() string { return "S3" } + +type ShardStoreUserConfig struct { + Root string `json:"root"` + MaxSize int64 `json:"maxSize"` +} diff --git a/coordinator/types/storage_credential.go b/coordinator/types/storage_credential.go new file mode 100644 index 0000000..98dbcfb --- /dev/null +++ b/coordinator/types/storage_credential.go @@ -0,0 +1,92 @@ +package types + +import ( + "gitlink.org.cn/cloudream/common/pkgs/types" + "gitlink.org.cn/cloudream/common/utils/serder" +) + +type StorageCredential interface { + GetStorageCredentialType() string +} + +var _ = serder.UseTypeUnionInternallyTagged(types.Ref(types.NewTypeUnion[StorageCredential]( + (*LocalCred)(nil), + (*MashupCred)(nil), + (*OBSCred)(nil), + (*OSSCred)(nil), + (*COSCred)(nil), + (*EFileCred)(nil), + (*S3Cred)(nil), +)), "type") + +type LocalCred struct { + StorageCredential + serder.Metadata `union:"Local"` + Type string `json:"type"` +} + +type MashupCred struct { + StorageCredential + serder.Metadata `union:"Mashup"` + Store StorageCredential `json:"store"` + Feature StorageCredential `json:"feature"` +} + +type OSSCred struct { + StorageCredential + serder.Metadata `union:"OSS"` + Type string `json:"type"` + Region string `json:"region"` + AK string `json:"accessKeyId"` + SK string `json:"secretAccessKey"` + Endpoint string `json:"endpoint"` + Bucket string `json:"bucket"` +} + +type OBSCred struct { + StorageCredential + serder.Metadata `union:"OBS"` + Type string `json:"type"` + Region string `json:"region"` + AK string `json:"accessKeyId"` + SK string `json:"secretAccessKey"` + Endpoint string `json:"endpoint"` + Bucket string `json:"bucket"` + ProjectID string `json:"projectID"` +} + +type COSCred struct { + StorageCredential + serder.Metadata `union:"COS"` + Type string `json:"type"` + Region string `json:"region"` + AK string `json:"accessKeyId"` + SK string `json:"secretAccessKey"` + Endpoint string `json:"endpoint"` + Bucket string `json:"bucket"` +} + +type EFileCred struct { + StorageCredential + serder.Metadata `union:"EFile"` + Type string `json:"type"` + TokenURL string `json:"tokenURL"` + APIURL string `json:"apiURL"` + TokenExpire int `json:"tokenExpire"` // 单位秒 + User string `json:"user"` + Password string `json:"password"` + OrgID string `json:"orgID"` + ClusterID string `json:"clusterID"` +} + +// 通用的S3协议的存储服务 +type S3Cred struct { + StorageCredential + serder.Metadata `union:"S3"` + Type string `json:"type"` + Region string `json:"region"` + AK string `json:"accessKeyId"` + SK string `json:"secretAccessKey"` + Endpoint string `json:"endpoint"` + Bucket string `json:"bucket"` +} diff --git a/coordinator/types/types.go b/coordinator/types/types.go index a116dfc..1a889bd 100644 --- a/coordinator/types/types.go +++ b/coordinator/types/types.go @@ -14,6 +14,8 @@ type HubID int64 type LocationID int64 +type UserID int64 + type Hub struct { HubID HubID `gorm:"column:HubID; primaryKey; type:bigint; autoIncrement" json:"hubID"` Name string `gorm:"column:Name; type:varchar(255); not null" json:"name"` @@ -68,3 +70,21 @@ type HubConnectivity struct { func (HubConnectivity) TableName() string { return "HubConnectivity" } + +type Location struct { + LocationID LocationID `gorm:"column:LocationID; primaryKey; type:bigint; autoIncrement" json:"locationID"` + Name string `gorm:"column:Name; type:varchar(255); not null" json:"name"` +} + +func (Location) TableName() string { + return "Location" +} + +type User struct { + UserID UserID `gorm:"column:UserID; primaryKey; type:bigint; autoIncrement" json:"userID"` + Name string `gorm:"column:Name; type:varchar(255); not null" json:"name"` +} + +func (User) TableName() string { + return "User" +}