diff --git a/common/pkgs/db2/bucket.go b/common/pkgs/db2/bucket.go deleted file mode 100644 index a46d6eb..0000000 --- a/common/pkgs/db2/bucket.go +++ /dev/null @@ -1,149 +0,0 @@ -package db2 - -import ( - "errors" - "fmt" - "time" - - "gorm.io/gorm" - - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - "gitlink.org.cn/cloudream/storage2/common/pkgs/db2/model" -) - -type BucketDB struct { - *DB -} - -func (db *DB) Bucket() *BucketDB { - return &BucketDB{DB: db} -} - -func (db *BucketDB) GetByID(ctx SQLContext, bucketID cdssdk.BucketID) (cdssdk.Bucket, error) { - var ret cdssdk.Bucket - err := ctx.Table("Bucket").Where("BucketID = ?", bucketID).First(&ret).Error - return ret, err -} - -func (db *BucketDB) GetByName(ctx SQLContext, bucketName string) (cdssdk.Bucket, error) { - var ret cdssdk.Bucket - err := ctx.Table("Bucket").Where("Name = ?", bucketName).First(&ret).Error - return ret, err -} - -// GetIDByName 根据BucketName查询BucketID -func (db *BucketDB) GetIDByName(ctx SQLContext, bucketName string) (int64, error) { - var result struct { - BucketID int64 `gorm:"column:BucketID"` - BucketName string `gorm:"column:BucketName"` - } - - err := ctx.Table("Bucket").Select("BucketID, BucketName").Where("BucketName = ?", bucketName).Scan(&result).Error - if err != nil { - return 0, err - } - - return result.BucketID, nil -} - -func (*BucketDB) GetAll(ctx SQLContext) ([]cdssdk.Bucket, error) { - var ret []cdssdk.Bucket - err := ctx.Table("Bucket").Find(&ret).Error - return ret, err -} - -// IsAvailable 判断用户是否有指定Bucekt的权限 -func (db *BucketDB) IsAvailable(ctx SQLContext, bucketID cdssdk.BucketID, userID cdssdk.UserID) (bool, error) { - _, err := db.GetUserBucket(ctx, userID, bucketID) - if errors.Is(err, gorm.ErrRecordNotFound) { - return false, nil - } - - if err != nil { - return false, fmt.Errorf("find bucket failed, err: %w", err) - } - - return true, nil -} - -func (*BucketDB) GetUserBucket(ctx SQLContext, userID cdssdk.UserID, bucketID cdssdk.BucketID) (model.Bucket, error) { - var ret model.Bucket - err := ctx.Table("UserBucket"). - Select("Bucket.*"). - Joins("JOIN Bucket ON UserBucket.BucketID = Bucket.BucketID"). - Where("UserBucket.UserID = ? AND Bucket.BucketID = ?", userID, bucketID). - First(&ret).Error - return ret, err -} - -func (*BucketDB) GetUserBucketByName(ctx SQLContext, userID cdssdk.UserID, bucketName string) (model.Bucket, error) { - var ret model.Bucket - err := ctx.Table("UserBucket"). - Select("Bucket.*"). - Joins("JOIN Bucket ON UserBucket.BucketID = Bucket.BucketID"). - Where("UserBucket.UserID = ? AND Bucket.Name = ?", userID, bucketName). - First(&ret).Error - return ret, err -} - -func (*BucketDB) GetUserBuckets(ctx SQLContext, userID cdssdk.UserID) ([]model.Bucket, error) { - var ret []model.Bucket - err := ctx.Table("UserBucket"). - Select("Bucket.*"). - Joins("JOIN Bucket ON UserBucket.BucketID = Bucket.BucketID"). - Where("UserBucket.UserID = ?", userID). - Find(&ret).Error - return ret, err -} - -func (db *BucketDB) Create(ctx SQLContext, userID cdssdk.UserID, bucketName string, createTime time.Time) (cdssdk.Bucket, error) { - var bucketID int64 - err := ctx.Table("UserBucket"). - Select("Bucket.BucketID"). - Joins("JOIN Bucket ON UserBucket.BucketID = Bucket.BucketID"). - Where("UserBucket.UserID = ? AND Bucket.Name = ?", userID, bucketName). - Scan(&bucketID).Error - - if err != nil { - return cdssdk.Bucket{}, err - } - - if bucketID > 0 { - return cdssdk.Bucket{}, gorm.ErrDuplicatedKey - } - - newBucket := cdssdk.Bucket{Name: bucketName, CreateTime: createTime, CreatorID: userID} - if err := ctx.Table("Bucket").Create(&newBucket).Error; err != nil { - return cdssdk.Bucket{}, fmt.Errorf("insert bucket failed, err: %w", err) - } - - err = ctx.Table("UserBucket").Create(&model.UserBucket{UserID: userID, BucketID: newBucket.BucketID}).Error - if err != nil { - return cdssdk.Bucket{}, fmt.Errorf("insert user bucket: %w", err) - } - - return newBucket, nil -} - -func (db *BucketDB) Rename(ctx SQLContext, bucketID cdssdk.BucketID, bucketName string) error { - return ctx.Table("Bucket").Where("BucketID = ?", bucketID).Update("Name", bucketName).Error -} - -func (db *BucketDB) Delete(ctx SQLContext, bucketID cdssdk.BucketID) error { - return ctx.Delete(&cdssdk.Bucket{}, "BucketID = ?", bucketID).Error -} - -func (db *BucketDB) DeleteComplete(tx SQLContext, bucketID cdssdk.BucketID) error { - pkgs, err := db.Package().GetBucketPackages(tx, bucketID) - if err != nil { - return err - } - - for _, pkg := range pkgs { - err := db.Package().DeleteComplete(tx, pkg.PackageID) - if err != nil { - return err - } - } - return db.Bucket().Delete(tx, bucketID) -} diff --git a/common/pkgs/db2/cache.go b/common/pkgs/db2/cache.go deleted file mode 100644 index 556f9c5..0000000 --- a/common/pkgs/db2/cache.go +++ /dev/null @@ -1,105 +0,0 @@ -package db2 - -import ( - "time" - - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - "gitlink.org.cn/cloudream/storage2/common/pkgs/db2/model" - "gorm.io/gorm/clause" -) - -type CacheDB struct { - *DB -} - -func (db *DB) Cache() *CacheDB { - return &CacheDB{DB: db} -} - -func (*CacheDB) Get(ctx SQLContext, fileHash cdssdk.FileHash, stgID cdssdk.StorageID) (model.Cache, error) { - var ret model.Cache - err := ctx.Table("Cache").Where("FileHash = ? AND StorageID = ?", fileHash, stgID).First(&ret).Error - return ret, err -} - -func (*CacheDB) BatchGetAllFileHashes(ctx SQLContext, start int, count int) ([]string, error) { - var ret []string - err := ctx.Table("Cache").Distinct("FileHash").Offset(start).Limit(count).Pluck("FileHash", &ret).Error - return ret, err -} - -func (*CacheDB) GetByStorageID(ctx SQLContext, stgID cdssdk.StorageID) ([]model.Cache, error) { - var ret []model.Cache - err := ctx.Table("Cache").Where("StorageID = ?", stgID).Find(&ret).Error - return ret, err -} - -// Create 创建一条缓存记录,如果已有则不进行操作 -func (*CacheDB) Create(ctx SQLContext, fileHash cdssdk.FileHash, stgID cdssdk.StorageID, priority int) error { - cache := model.Cache{FileHash: fileHash, StorageID: stgID, CreateTime: time.Now(), Priority: priority} - return ctx.Where(cache).Attrs(cache).FirstOrCreate(&cache).Error -} - -// 批量创建缓存记录 -func (*CacheDB) BatchCreate(ctx SQLContext, caches []model.Cache) error { - if len(caches) == 0 { - return nil - } - - return ctx.Clauses(clause.OnConflict{ - Columns: []clause.Column{{Name: "FileHash"}, {Name: "StorageID"}}, - DoUpdates: clause.AssignmentColumns([]string{"CreateTime", "Priority"}), - }).Create(&caches).Error -} - -func (db *CacheDB) BatchCreateOnSameStorage(ctx SQLContext, fileHashes []cdssdk.FileHash, stgID cdssdk.StorageID, priority int) error { - if len(fileHashes) == 0 { - return nil - } - - var caches []model.Cache - var nowTime = time.Now() - for _, hash := range fileHashes { - caches = append(caches, model.Cache{ - FileHash: hash, - StorageID: stgID, - CreateTime: nowTime, - Priority: priority, - }) - } - - return db.BatchCreate(ctx, caches) -} - -func (*CacheDB) StorageBatchDelete(ctx SQLContext, stgID cdssdk.StorageID, fileHashes []cdssdk.FileHash) error { - if len(fileHashes) == 0 { - return nil - } - - return ctx.Table("Cache").Where("StorageID = ? AND FileHash IN (?)", stgID, fileHashes).Delete(&model.Cache{}).Error -} - -// GetCachingFileStorages 查找缓存了指定文件的存储服务 -func (*CacheDB) GetCachingFileStorages(ctx SQLContext, fileHash cdssdk.FileHash) ([]cdssdk.Storage, error) { - var stgs []cdssdk.Storage - err := ctx.Table("Cache").Select("Storage.*"). - Joins("JOIN Storage ON Cache.StorageID = Storage.StorageID"). - Where("Cache.FileHash = ?", fileHash). - Find(&stgs).Error - return stgs, err -} - -// DeleteStorageAll 删除一个存储服务所有的记录 -func (*CacheDB) DeleteStorageAll(ctx SQLContext, StorageID cdssdk.StorageID) error { - return ctx.Where("StorageID = ?", StorageID).Delete(&model.Cache{}).Error -} - -// FindCachingFileUserStorages 在缓存表中查询指定数据所在的节点 -func (*CacheDB) FindCachingFileUserStorages(ctx SQLContext, userID cdssdk.UserID, fileHash string) ([]cdssdk.Storage, error) { - var stgs []cdssdk.Storage - err := ctx.Table("Cache").Select("Storage.*"). - Joins("JOIN UserStorage ON Cache.StorageID = UserStorage.StorageID"). - Where("Cache.FileHash = ? AND UserStorage.UserID = ?", fileHash, userID). - Find(&stgs).Error - return stgs, err -} diff --git a/common/pkgs/db2/config/config.go b/common/pkgs/db2/config/config.go deleted file mode 100644 index 9495b71..0000000 --- a/common/pkgs/db2/config/config.go +++ /dev/null @@ -1,21 +0,0 @@ -package config - -import "fmt" - -type Config struct { - Address string `json:"address"` - Account string `json:"account"` - Password string `json:"password"` - DatabaseName string `json:"databaseName"` -} - -func (cfg *Config) MakeSourceString() string { - return fmt.Sprintf( - "%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=true&loc=%s", - cfg.Account, - cfg.Password, - cfg.Address, - cfg.DatabaseName, - "Asia%2FShanghai", - ) -} diff --git a/common/pkgs/db2/db2.go b/common/pkgs/db2/db2.go deleted file mode 100644 index 8e807bc..0000000 --- a/common/pkgs/db2/db2.go +++ /dev/null @@ -1,38 +0,0 @@ -package db2 - -import ( - _ "github.com/go-sql-driver/mysql" - "github.com/sirupsen/logrus" - "gitlink.org.cn/cloudream/storage2/common/pkgs/db2/config" - "gorm.io/driver/mysql" - "gorm.io/gorm" -) - -type DB struct { - db *gorm.DB -} - -func NewDB(cfg *config.Config) (*DB, error) { - mydb, err := gorm.Open(mysql.Open(cfg.MakeSourceString()), &gorm.Config{}) - if err != nil { - logrus.Fatalf("failed to connect to database: %v", err) - } - - return &DB{ - db: mydb, - }, nil -} - -func (db *DB) DoTx(do func(tx SQLContext) error) error { - return db.db.Transaction(func(tx *gorm.DB) error { - return do(SQLContext{tx}) - }) -} - -type SQLContext struct { - *gorm.DB -} - -func (db *DB) DefCtx() SQLContext { - return SQLContext{db.db} -} diff --git a/common/pkgs/db2/hub.go b/common/pkgs/db2/hub.go deleted file mode 100644 index dc66181..0000000 --- a/common/pkgs/db2/hub.go +++ /dev/null @@ -1,60 +0,0 @@ -package db2 - -import ( - "time" - - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" -) - -type HubDB struct { - *DB -} - -func (db *DB) Hub() *HubDB { - return &HubDB{DB: db} -} - -func (*HubDB) GetAllHubs(ctx SQLContext) ([]cdssdk.Hub, error) { - var ret []cdssdk.Hub - - err := ctx.Table("Hub").Find(&ret).Error - return ret, err -} - -func (*HubDB) GetByID(ctx SQLContext, hubID cdssdk.HubID) (cdssdk.Hub, error) { - var ret cdssdk.Hub - err := ctx.Table("Hub").Where("HubID = ?", hubID).Find(&ret).Error - - return ret, err -} - -func (*HubDB) BatchGetByID(ctx SQLContext, hubIDs []cdssdk.HubID) ([]cdssdk.Hub, error) { - var ret []cdssdk.Hub - err := ctx.Table("Hub").Where("HubID IN (?)", hubIDs).Find(&ret).Error - - return ret, err -} - -// GetUserHubs 根据用户id查询可用hub -func (*HubDB) GetUserHubs(ctx SQLContext, userID cdssdk.UserID) ([]cdssdk.Hub, error) { - var hubs []cdssdk.Hub - err := ctx. - Table("Hub"). - Select("Hub.*"). - Joins("JOIN UserHub ON UserHub.HubID = Hub.HubID"). - Where("UserHub.UserID = ?", userID). - Find(&hubs).Error - return hubs, err -} - -// UpdateState 更新状态,并且设置上次上报时间为现在 -func (*HubDB) UpdateState(ctx SQLContext, hubID cdssdk.HubID, state string) error { - err := ctx. - Model(&cdssdk.Hub{}). - Where("HubID = ?", hubID). - Updates(map[string]interface{}{ - "State": state, - "LastReportTime": time.Now(), - }).Error - return err -} diff --git a/common/pkgs/db2/hub_connectivity.go b/common/pkgs/db2/hub_connectivity.go deleted file mode 100644 index 5fe27e7..0000000 --- a/common/pkgs/db2/hub_connectivity.go +++ /dev/null @@ -1,37 +0,0 @@ -package db2 - -import ( - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - "gitlink.org.cn/cloudream/storage2/common/pkgs/db2/model" - "gorm.io/gorm/clause" -) - -type HubConnectivityDB struct { - *DB -} - -func (db *DB) HubConnectivity() *HubConnectivityDB { - return &HubConnectivityDB{DB: db} -} - -func (db *HubConnectivityDB) BatchGetByFromHub(ctx SQLContext, fromHubIDs []cdssdk.HubID) ([]model.HubConnectivity, error) { - if len(fromHubIDs) == 0 { - return nil, nil - } - - var ret []model.HubConnectivity - - err := ctx.Table("HubConnectivity").Where("FromHubID IN (?)", fromHubIDs).Find(&ret).Error - return ret, err -} - -func (db *HubConnectivityDB) BatchUpdateOrCreate(ctx SQLContext, cons []model.HubConnectivity) error { - if len(cons) == 0 { - return nil - } - - // 使用 GORM 的批量插入或更新 - return ctx.Table("HubConnectivity").Clauses(clause.OnConflict{ - UpdateAll: true, - }).Create(&cons).Error -} diff --git a/common/pkgs/db2/location.go b/common/pkgs/db2/location.go deleted file mode 100644 index 49bfc96..0000000 --- a/common/pkgs/db2/location.go +++ /dev/null @@ -1,36 +0,0 @@ -package db2 - -import ( - "fmt" - - "gitlink.org.cn/cloudream/storage2/common/pkgs/db2/model" -) - -type LocationDB struct { - *DB -} - -func (db *DB) Location() *LocationDB { - return &LocationDB{DB: db} -} - -func (*LocationDB) GetByID(ctx SQLContext, id int64) (model.Location, error) { - var ret model.Location - err := ctx.First(&ret, id).Error - return ret, err -} - -func (db *LocationDB) FindLocationByExternalIP(ctx SQLContext, ip string) (model.Location, error) { - var locID int64 - err := ctx.Table("Hub").Select("LocationID").Where("ExternalIP = ?", ip).Scan(&locID).Error - if err != nil { - return model.Location{}, fmt.Errorf("finding hub by external ip: %w", err) - } - - loc, err := db.GetByID(ctx, locID) - if err != nil { - return model.Location{}, fmt.Errorf("getting location by id: %w", err) - } - - return loc, nil -} diff --git a/common/pkgs/db2/model/model.go b/common/pkgs/db2/model/model.go deleted file mode 100644 index 70d7c28..0000000 --- a/common/pkgs/db2/model/model.go +++ /dev/null @@ -1,68 +0,0 @@ -package model - -import ( - "time" - - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - stgmod "gitlink.org.cn/cloudream/storage2/common/models" -) - -// TODO 可以考虑逐步迁移到cdssdk中。迁移思路:数据对象应该包含的字段都迁移到cdssdk中,内部使用的一些特殊字段则留在这里 -type Storage = cdssdk.Storage - -type UserBucket struct { - UserID cdssdk.UserID `gorm:"column:UserID; primaryKey; type:bigint" json:"userID"` - BucketID cdssdk.BucketID `gorm:"column:BucketID; primaryKey; type:bigint" json:"bucketID"` -} - -func (UserBucket) TableName() string { - return "UserBucket" -} - -type UserHub struct { - UserID cdssdk.UserID `gorm:"column:UserID; primaryKey; type:bigint" json:"userID"` - HubID cdssdk.HubID `gorm:"column:HubID; primaryKey; type:bigint" json:"hubID"` -} - -func (UserHub) TableName() string { - return "UserHub" -} - -type UserStorage struct { - UserID cdssdk.UserID `gorm:"column:UserID; primaryKey; type:bigint" json:"userID"` - StorageID cdssdk.StorageID `gorm:"column:StorageID; primaryKey; type:bigint" json:"storageID"` -} - -func (UserStorage) TableName() string { - return "UserStorage" -} - -type Bucket = cdssdk.Bucket - -type Package = cdssdk.Package - -type Object = cdssdk.Object - -type HubConnectivity = cdssdk.HubConnectivity - -type ObjectBlock = stgmod.ObjectBlock - -type Cache struct { - FileHash cdssdk.FileHash `gorm:"column:FileHash; primaryKey; type: char(68)" json:"fileHash"` - StorageID cdssdk.StorageID `gorm:"column:StorageID; primaryKey; type: bigint" json:"storageID"` - CreateTime time.Time `gorm:"column:CreateTime; type:datetime" json:"createTime"` - Priority int `gorm:"column:Priority; type:int" json:"priority"` -} - -func (Cache) TableName() string { - return "Cache" -} - -type Location struct { - LocationID cdssdk.LocationID `gorm:"column:LocationID; primaryKey; type:bigint; autoIncrement" json:"locationID"` - Name string `gorm:"column:Name; type:varchar(255); not null" json:"name"` -} - -func (Location) TableName() string { - return "Location" -} diff --git a/common/pkgs/db2/object.go b/common/pkgs/db2/object.go deleted file mode 100644 index 5325ac4..0000000 --- a/common/pkgs/db2/object.go +++ /dev/null @@ -1,511 +0,0 @@ -package db2 - -import ( - "fmt" - "strings" - "time" - - "gorm.io/gorm" - "gorm.io/gorm/clause" - - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - stgmod "gitlink.org.cn/cloudream/storage2/common/models" - "gitlink.org.cn/cloudream/storage2/common/pkgs/db2/model" - coormq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/coordinator" -) - -type ObjectDB struct { - *DB -} - -func (db *DB) Object() *ObjectDB { - return &ObjectDB{DB: db} -} - -func (db *ObjectDB) GetByID(ctx SQLContext, objectID cdssdk.ObjectID) (cdssdk.Object, error) { - var ret cdssdk.Object - err := ctx.Table("Object").Where("ObjectID = ?", objectID).First(&ret).Error - return ret, err -} - -func (db *ObjectDB) GetByPath(ctx SQLContext, packageID cdssdk.PackageID, path string) (cdssdk.Object, error) { - var ret cdssdk.Object - err := ctx.Table("Object").Where("PackageID = ? AND Path = ?", packageID, path).First(&ret).Error - return ret, err -} - -func (db *ObjectDB) GetByFullPath(ctx SQLContext, bktName string, pkgName string, path string) (cdssdk.Object, error) { - var ret cdssdk.Object - err := ctx.Table("Object"). - Joins("join Package on Package.PackageID = Object.PackageID and Package.Name = ?", pkgName). - Joins("join Bucket on Bucket.BucketID = Package.BucketID and Bucket.Name = ?", bktName). - Where("Object.Path = ?", path).First(&ret).Error - return ret, err -} - -func (db *ObjectDB) GetWithPathPrefix(ctx SQLContext, packageID cdssdk.PackageID, pathPrefix string) ([]cdssdk.Object, error) { - var ret []cdssdk.Object - err := ctx.Table("Object").Where("PackageID = ? AND Path LIKE ?", packageID, escapeLike("", "%", pathPrefix)).Order("ObjectID ASC").Find(&ret).Error - return ret, err -} - -// 查询结果将按照Path升序,而不是ObjectID升序 -func (db *ObjectDB) GetWithPathPrefixPaged(ctx SQLContext, packageID cdssdk.PackageID, pathPrefix string, startPath string, limit int) ([]cdssdk.Object, error) { - var ret []cdssdk.Object - err := ctx.Table("Object").Where("PackageID = ? AND Path > ? AND Path LIKE ?", packageID, startPath, pathPrefix+"%").Order("Path ASC").Limit(limit).Find(&ret).Error - return ret, err -} - -func (db *ObjectDB) GetByPrefixGrouped(ctx SQLContext, packageID cdssdk.PackageID, pathPrefix string) (objs []cdssdk.Object, commonPrefixes []string, err error) { - type ObjectOrDir struct { - cdssdk.Object - IsObject bool `gorm:"IsObject"` - Prefix string `gorm:"Prefix"` - } - - sepCnt := strings.Count(pathPrefix, cdssdk.ObjectPathSeparator) + 1 - - prefixStatm := fmt.Sprintf("Substring_Index(Path, '%s', %d)", cdssdk.ObjectPathSeparator, sepCnt) - grouping := ctx.Table("Object"). - Select(fmt.Sprintf("%s as Prefix, Max(ObjectID) as ObjectID, %s = Path as IsObject", prefixStatm, prefixStatm)). - Where("PackageID = ?", packageID). - Where("Path like ?", pathPrefix+"%"). - Group("Prefix, IsObject"). - Order("Prefix ASC") - - var ret []ObjectOrDir - err = ctx.Table("Object"). - Select("Grouped.IsObject, Grouped.Prefix, Object.*"). - Joins("right join (?) as Grouped on Object.ObjectID = Grouped.ObjectID and Grouped.IsObject = 1", grouping). - Find(&ret).Error - if err != nil { - return - } - - for _, o := range ret { - if o.IsObject { - objs = append(objs, o.Object) - } else { - commonPrefixes = append(commonPrefixes, o.Prefix+cdssdk.ObjectPathSeparator) - } - } - - return -} - -func (db *ObjectDB) GetByPrefixGroupedPaged(ctx SQLContext, packageID cdssdk.PackageID, pathPrefix string, startPath string, limit int) (objs []cdssdk.Object, commonPrefixes []string, nextStartPath string, err error) { - type ObjectOrDir struct { - cdssdk.Object - IsObject bool `gorm:"IsObject"` - Prefix string `gorm:"Prefix"` - } - - sepCnt := strings.Count(pathPrefix, cdssdk.ObjectPathSeparator) + 1 - - prefixStatm := fmt.Sprintf("Substring_Index(Path, '%s', %d)", cdssdk.ObjectPathSeparator, sepCnt) - grouping := ctx.Table("Object"). - Select(fmt.Sprintf("%s as Prefix, Max(ObjectID) as ObjectID, %s = Path as IsObject", prefixStatm, prefixStatm)). - Where("PackageID = ?", packageID). - Where("Path like ?", pathPrefix+"%"). - Group("Prefix, IsObject"). - Having("Prefix > ?", startPath). - Limit(limit). - Order("Prefix ASC") - - var ret []ObjectOrDir - err = ctx.Table("Object"). - Select("Grouped.IsObject, Grouped.Prefix, Object.*"). - Joins("right join (?) as Grouped on Object.ObjectID = Grouped.ObjectID and Grouped.IsObject = 1", grouping). - Find(&ret).Error - if err != nil { - return - } - - for _, o := range ret { - if o.IsObject { - objs = append(objs, o.Object) - } else { - commonPrefixes = append(commonPrefixes, o.Prefix+cdssdk.ObjectPathSeparator) - } - nextStartPath = o.Prefix - } - - return -} - -func (db *ObjectDB) HasObjectWithPrefix(ctx SQLContext, packageID cdssdk.PackageID, pathPrefix string) (bool, error) { - var obj cdssdk.Object - err := ctx.Table("Object").Where("PackageID = ? AND Path LIKE ?", packageID, escapeLike("", "%", pathPrefix)).First(&obj).Error - if err == nil { - return true, nil - } - - if err == gorm.ErrRecordNotFound { - return false, nil - } - - return false, err -} - -func (db *ObjectDB) BatchTestObjectID(ctx SQLContext, objectIDs []cdssdk.ObjectID) (map[cdssdk.ObjectID]bool, error) { - if len(objectIDs) == 0 { - return make(map[cdssdk.ObjectID]bool), nil - } - - var avaiIDs []cdssdk.ObjectID - err := ctx.Table("Object").Where("ObjectID IN ?", objectIDs).Pluck("ObjectID", &avaiIDs).Error - if err != nil { - return nil, err - } - - avaiIDMap := make(map[cdssdk.ObjectID]bool) - for _, pkgID := range avaiIDs { - avaiIDMap[pkgID] = true - } - - return avaiIDMap, nil -} - -func (db *ObjectDB) BatchGet(ctx SQLContext, objectIDs []cdssdk.ObjectID) ([]cdssdk.Object, error) { - if len(objectIDs) == 0 { - return nil, nil - } - - var objs []cdssdk.Object - err := ctx.Table("Object").Where("ObjectID IN ?", objectIDs).Order("ObjectID ASC").Find(&objs).Error - if err != nil { - return nil, err - } - - return objs, nil -} - -func (db *ObjectDB) BatchGetByPackagePath(ctx SQLContext, pkgID cdssdk.PackageID, pathes []string) ([]cdssdk.Object, error) { - if len(pathes) == 0 { - return nil, nil - } - - var objs []cdssdk.Object - err := ctx.Table("Object").Where("PackageID = ? AND Path IN ?", pkgID, pathes).Find(&objs).Error - if err != nil { - return nil, err - } - - return objs, nil -} - -func (db *ObjectDB) GetDetail(ctx SQLContext, objectID cdssdk.ObjectID) (stgmod.ObjectDetail, error) { - var obj cdssdk.Object - err := ctx.Table("Object").Where("ObjectID = ?", objectID).First(&obj).Error - if err != nil { - return stgmod.ObjectDetail{}, fmt.Errorf("getting object: %w", err) - } - - // 获取所有的 ObjectBlock - var allBlocks []stgmod.ObjectBlock - err = ctx.Table("ObjectBlock").Where("ObjectID = ?", objectID).Order("`Index` ASC").Find(&allBlocks).Error - if err != nil { - return stgmod.ObjectDetail{}, fmt.Errorf("getting all object blocks: %w", err) - } - - // 获取所有的 PinnedObject - var allPinnedObjs []cdssdk.PinnedObject - err = ctx.Table("PinnedObject").Where("ObjectID = ?", objectID).Order("ObjectID ASC").Find(&allPinnedObjs).Error - if err != nil { - return stgmod.ObjectDetail{}, fmt.Errorf("getting all pinned objects: %w", err) - } - - pinnedAt := make([]cdssdk.StorageID, len(allPinnedObjs)) - for i, po := range allPinnedObjs { - pinnedAt[i] = po.StorageID - } - - return stgmod.ObjectDetail{ - Object: obj, - Blocks: allBlocks, - PinnedAt: pinnedAt, - }, nil -} - -// 仅返回查询到的对象 -func (db *ObjectDB) BatchGetDetails(ctx SQLContext, objectIDs []cdssdk.ObjectID) ([]stgmod.ObjectDetail, error) { - var objs []cdssdk.Object - - err := ctx.Table("Object").Where("ObjectID IN ?", objectIDs).Order("ObjectID ASC").Find(&objs).Error - if err != nil { - return nil, err - } - - // 获取所有的 ObjectBlock - var allBlocks []stgmod.ObjectBlock - err = ctx.Table("ObjectBlock").Where("ObjectID IN ?", objectIDs).Order("ObjectID, `Index` ASC").Find(&allBlocks).Error - if err != nil { - return nil, err - } - - // 获取所有的 PinnedObject - var allPinnedObjs []cdssdk.PinnedObject - err = ctx.Table("PinnedObject").Where("ObjectID IN ?", objectIDs).Order("ObjectID ASC").Find(&allPinnedObjs).Error - if err != nil { - return nil, err - } - - details := make([]stgmod.ObjectDetail, len(objs)) - for i, obj := range objs { - details[i] = stgmod.ObjectDetail{ - Object: obj, - } - } - - stgmod.DetailsFillObjectBlocks(details, allBlocks) - stgmod.DetailsFillPinnedAt(details, allPinnedObjs) - return details, nil -} - -func (db *ObjectDB) Create(ctx SQLContext, obj cdssdk.Object) (cdssdk.ObjectID, error) { - err := ctx.Table("Object").Create(&obj).Error - if err != nil { - return 0, fmt.Errorf("insert object failed, err: %w", err) - } - return obj.ObjectID, nil -} - -// 批量创建对象,创建完成后会填充ObjectID。 -func (db *ObjectDB) BatchCreate(ctx SQLContext, objs *[]cdssdk.Object) error { - if len(*objs) == 0 { - return nil - } - - return ctx.Table("Object").Create(objs).Error -} - -// 批量更新对象所有属性,objs中的对象必须包含ObjectID -func (db *ObjectDB) BatchUpdate(ctx SQLContext, objs []cdssdk.Object) error { - if len(objs) == 0 { - return nil - } - - return ctx.Clauses(clause.OnConflict{ - Columns: []clause.Column{{Name: "ObjectID"}}, - UpdateAll: true, - }).Create(objs).Error -} - -// 批量更新对象指定属性,objs中的对象只需设置需要更新的属性即可,但: -// 1. 必须包含ObjectID -// 2. 日期类型属性不能设置为0值 -func (db *ObjectDB) BatchUpdateColumns(ctx SQLContext, objs []cdssdk.Object, columns []string) error { - if len(objs) == 0 { - return nil - } - - return ctx.Clauses(clause.OnConflict{ - Columns: []clause.Column{{Name: "ObjectID"}}, - DoUpdates: clause.AssignmentColumns(columns), - }).Create(objs).Error -} - -func (db *ObjectDB) GetPackageObjects(ctx SQLContext, packageID cdssdk.PackageID) ([]cdssdk.Object, error) { - var ret []cdssdk.Object - err := ctx.Table("Object").Where("PackageID = ?", packageID).Order("ObjectID ASC").Find(&ret).Error - return ret, err -} - -func (db *ObjectDB) GetPackageObjectDetails(ctx SQLContext, packageID cdssdk.PackageID) ([]stgmod.ObjectDetail, error) { - var objs []cdssdk.Object - err := ctx.Table("Object").Where("PackageID = ?", packageID).Order("ObjectID ASC").Find(&objs).Error - if err != nil { - return nil, fmt.Errorf("getting objects: %w", err) - } - - // 获取所有的 ObjectBlock - var allBlocks []stgmod.ObjectBlock - err = ctx.Table("ObjectBlock"). - Select("ObjectBlock.*"). - Joins("JOIN Object ON ObjectBlock.ObjectID = Object.ObjectID"). - Where("Object.PackageID = ?", packageID). - Order("ObjectBlock.ObjectID, `Index` ASC"). - Find(&allBlocks).Error - if err != nil { - return nil, fmt.Errorf("getting all object blocks: %w", err) - } - - // 获取所有的 PinnedObject - var allPinnedObjs []cdssdk.PinnedObject - err = ctx.Table("PinnedObject"). - Select("PinnedObject.*"). - Joins("JOIN Object ON PinnedObject.ObjectID = Object.ObjectID"). - Where("Object.PackageID = ?", packageID). - Order("PinnedObject.ObjectID"). - Find(&allPinnedObjs).Error - if err != nil { - return nil, fmt.Errorf("getting all pinned objects: %w", err) - } - - details := make([]stgmod.ObjectDetail, len(objs)) - for i, obj := range objs { - details[i] = stgmod.ObjectDetail{ - Object: obj, - } - } - - stgmod.DetailsFillObjectBlocks(details, allBlocks) - stgmod.DetailsFillPinnedAt(details, allPinnedObjs) - return details, nil -} - -func (db *ObjectDB) GetObjectsIfAnyBlockOnStorage(ctx SQLContext, stgID cdssdk.StorageID) ([]cdssdk.Object, error) { - var objs []cdssdk.Object - err := ctx.Table("Object").Where("ObjectID IN (SELECT ObjectID FROM ObjectBlock WHERE StorageID = ?)", stgID).Order("ObjectID ASC").Find(&objs).Error - if err != nil { - return nil, fmt.Errorf("getting objects: %w", err) - } - - return objs, nil -} - -func (db *ObjectDB) BatchAdd(ctx SQLContext, packageID cdssdk.PackageID, adds []coormq.AddObjectEntry) ([]cdssdk.Object, error) { - if len(adds) == 0 { - return nil, nil - } - - // 收集所有路径 - pathes := make([]string, 0, len(adds)) - for _, add := range adds { - pathes = append(pathes, add.Path) - } - - // 先查询要更新的对象,不存在也没关系 - existsObjs, err := db.BatchGetByPackagePath(ctx, packageID, pathes) - if err != nil { - return nil, fmt.Errorf("batch get object by path: %w", err) - } - - existsObjsMap := make(map[string]cdssdk.Object) - for _, obj := range existsObjs { - existsObjsMap[obj.Path] = obj - } - - var updatingObjs []cdssdk.Object - var addingObjs []cdssdk.Object - for i := range adds { - o := cdssdk.Object{ - PackageID: packageID, - Path: adds[i].Path, - Size: adds[i].Size, - FileHash: adds[i].FileHash, - Redundancy: cdssdk.NewNoneRedundancy(), // 首次上传默认使用不分块的none模式 - CreateTime: adds[i].UploadTime, - UpdateTime: adds[i].UploadTime, - } - - e, ok := existsObjsMap[adds[i].Path] - if ok { - o.ObjectID = e.ObjectID - o.CreateTime = e.CreateTime - updatingObjs = append(updatingObjs, o) - - } else { - addingObjs = append(addingObjs, o) - } - } - - // 先进行更新 - err = db.BatchUpdate(ctx, updatingObjs) - if err != nil { - return nil, fmt.Errorf("batch update objects: %w", err) - } - - // 再执行插入,Create函数插入后会填充ObjectID - err = db.BatchCreate(ctx, &addingObjs) - if err != nil { - return nil, fmt.Errorf("batch create objects: %w", err) - } - - // 按照add参数的顺序返回结果 - affectedObjsMp := make(map[string]cdssdk.Object) - for _, o := range updatingObjs { - affectedObjsMp[o.Path] = o - } - for _, o := range addingObjs { - affectedObjsMp[o.Path] = o - } - affectedObjs := make([]cdssdk.Object, 0, len(affectedObjsMp)) - affectedObjIDs := make([]cdssdk.ObjectID, 0, len(affectedObjsMp)) - for i := range adds { - obj := affectedObjsMp[adds[i].Path] - affectedObjs = append(affectedObjs, obj) - affectedObjIDs = append(affectedObjIDs, obj.ObjectID) - } - - if len(affectedObjIDs) > 0 { - // 批量删除 ObjectBlock - if err := db.ObjectBlock().BatchDeleteByObjectID(ctx, affectedObjIDs); err != nil { - return nil, fmt.Errorf("batch delete object blocks: %w", err) - } - - // 批量删除 PinnedObject - if err := db.PinnedObject().BatchDeleteByObjectID(ctx, affectedObjIDs); err != nil { - return nil, fmt.Errorf("batch delete pinned objects: %w", err) - } - } - - // 创建 ObjectBlock - objBlocks := make([]stgmod.ObjectBlock, 0, len(adds)) - for i, add := range adds { - for _, stgID := range add.StorageIDs { - objBlocks = append(objBlocks, stgmod.ObjectBlock{ - ObjectID: affectedObjIDs[i], - Index: 0, - StorageID: stgID, - FileHash: add.FileHash, - Size: add.Size, - }) - } - } - if err := db.ObjectBlock().BatchCreate(ctx, objBlocks); err != nil { - return nil, fmt.Errorf("batch create object blocks: %w", err) - } - - // 创建 Cache - caches := make([]model.Cache, 0, len(adds)) - for _, add := range adds { - for _, stgID := range add.StorageIDs { - caches = append(caches, model.Cache{ - FileHash: add.FileHash, - StorageID: stgID, - CreateTime: time.Now(), - Priority: 0, - }) - } - } - if err := db.Cache().BatchCreate(ctx, caches); err != nil { - return nil, fmt.Errorf("batch create caches: %w", err) - } - - return affectedObjs, nil -} - -func (db *ObjectDB) BatchDelete(ctx SQLContext, ids []cdssdk.ObjectID) error { - if len(ids) == 0 { - return nil - } - - return ctx.Table("Object").Where("ObjectID IN ?", ids).Delete(&cdssdk.Object{}).Error -} - -func (db *ObjectDB) DeleteInPackage(ctx SQLContext, packageID cdssdk.PackageID) error { - return ctx.Table("Object").Where("PackageID = ?", packageID).Delete(&cdssdk.Object{}).Error -} - -func (db *ObjectDB) DeleteByPath(ctx SQLContext, packageID cdssdk.PackageID, path string) error { - return ctx.Table("Object").Where("PackageID = ? AND Path = ?", packageID, path).Delete(&cdssdk.Object{}).Error -} - -func (db *ObjectDB) MoveByPrefix(ctx SQLContext, oldPkgID cdssdk.PackageID, oldPrefix string, newPkgID cdssdk.PackageID, newPrefix string) error { - return ctx.Table("Object").Where("PackageID = ? AND Path LIKE ?", oldPkgID, escapeLike("", "%", oldPrefix)). - Updates(map[string]any{ - "PackageID": newPkgID, - "Path": gorm.Expr("concat(?, substring(Path, ?))", newPrefix, len(oldPrefix)+1), - }).Error -} diff --git a/common/pkgs/db2/object_access_stat.go b/common/pkgs/db2/object_access_stat.go deleted file mode 100644 index b305582..0000000 --- a/common/pkgs/db2/object_access_stat.go +++ /dev/null @@ -1,116 +0,0 @@ -package db2 - -import ( - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - stgmod "gitlink.org.cn/cloudream/storage2/common/models" - coormq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/coordinator" - "gorm.io/gorm" - "gorm.io/gorm/clause" -) - -type ObjectAccessStatDB struct { - *DB -} - -func (db *DB) ObjectAccessStat() *ObjectAccessStatDB { - return &ObjectAccessStatDB{db} -} - -func (*ObjectAccessStatDB) Get(ctx SQLContext, objID cdssdk.ObjectID, stgID cdssdk.StorageID) (stgmod.ObjectAccessStat, error) { - var ret stgmod.ObjectAccessStat - err := ctx.Table("ObjectAccessStat"). - Where("ObjectID = ? AND StorageID = ?", objID, stgID). - First(&ret).Error - return ret, err -} - -func (*ObjectAccessStatDB) GetByObjectID(ctx SQLContext, objID cdssdk.ObjectID) ([]stgmod.ObjectAccessStat, error) { - var ret []stgmod.ObjectAccessStat - err := ctx.Table("ObjectAccessStat"). - Where("ObjectID = ?", objID). - Find(&ret).Error - return ret, err -} - -func (*ObjectAccessStatDB) BatchGetByObjectID(ctx SQLContext, objIDs []cdssdk.ObjectID) ([]stgmod.ObjectAccessStat, error) { - if len(objIDs) == 0 { - return nil, nil - } - - var ret []stgmod.ObjectAccessStat - err := ctx.Table("ObjectAccessStat"). - Where("ObjectID IN ?", objIDs). - Find(&ret).Error - return ret, err -} - -func (*ObjectAccessStatDB) BatchGetByObjectIDOnStorage(ctx SQLContext, objIDs []cdssdk.ObjectID, stgID cdssdk.StorageID) ([]stgmod.ObjectAccessStat, error) { - if len(objIDs) == 0 { - return nil, nil - } - - var ret []stgmod.ObjectAccessStat - err := ctx.Table("ObjectAccessStat"). - Where("ObjectID IN ? AND StorageID = ?", objIDs, stgID). - Find(&ret).Error - return ret, err -} - -func (*ObjectAccessStatDB) BatchAddCounter(ctx SQLContext, entries []coormq.AddAccessStatEntry) error { - if len(entries) == 0 { - return nil - } - - for _, entry := range entries { - acc := stgmod.ObjectAccessStat{ - ObjectID: entry.ObjectID, - StorageID: entry.StorageID, - Counter: entry.Counter, - } - - err := ctx.Table("ObjectAccessStat"). - Clauses(clause.OnConflict{ - Columns: []clause.Column{{Name: "ObjectID"}, {Name: "StorageID"}}, - DoUpdates: clause.Assignments(map[string]any{ - "Counter": gorm.Expr("Counter + values(Counter)"), - }), - }).Create(&acc).Error - if err != nil { - return err - } - } - return nil -} - -func (*ObjectAccessStatDB) BatchUpdateAmountInPackage(ctx SQLContext, pkgIDs []cdssdk.PackageID, historyWeight float64) error { - if len(pkgIDs) == 0 { - return nil - } - - err := ctx.Exec("UPDATE ObjectAccessStat AS o INNER JOIN Object AS obj ON o.ObjectID = obj.ObjectID SET o.Amount = o.Amount * ? + o.Counter * (1 - ?), o.Counter = 0 WHERE obj.PackageID IN ?", historyWeight, historyWeight, pkgIDs).Error - return err -} - -func (*ObjectAccessStatDB) UpdateAllAmount(ctx SQLContext, historyWeight float64) error { - err := ctx.Exec("UPDATE ObjectAccessStat SET Amount = Amount * ? + Counter * (1 - ?), Counter = 0", historyWeight, historyWeight).Error - return err -} - -func (*ObjectAccessStatDB) DeleteByObjectID(ctx SQLContext, objID cdssdk.ObjectID) error { - err := ctx.Table("ObjectAccessStat").Where("ObjectID = ?", objID).Delete(nil).Error - return err -} - -func (*ObjectAccessStatDB) BatchDeleteByObjectID(ctx SQLContext, objIDs []cdssdk.ObjectID) error { - if len(objIDs) == 0 { - return nil - } - - err := ctx.Table("ObjectAccessStat").Where("ObjectID IN ?", objIDs).Delete(nil).Error - return err -} - -func (*ObjectAccessStatDB) DeleteInPackage(ctx SQLContext, packageID cdssdk.PackageID) error { - err := ctx.Exec("DELETE o FROM ObjectAccessStat o INNER JOIN Object obj ON o.ObjectID = obj.ObjectID WHERE obj.PackageID = ?", packageID).Error - return err -} diff --git a/common/pkgs/db2/object_block.go b/common/pkgs/db2/object_block.go deleted file mode 100644 index 2d2729c..0000000 --- a/common/pkgs/db2/object_block.go +++ /dev/null @@ -1,122 +0,0 @@ -package db2 - -import ( - "strconv" - "strings" - - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - stgmod "gitlink.org.cn/cloudream/storage2/common/models" - "gorm.io/gorm/clause" -) - -type ObjectBlockDB struct { - *DB -} - -func (db *DB) ObjectBlock() *ObjectBlockDB { - return &ObjectBlockDB{DB: db} -} - -func (db *ObjectBlockDB) GetByStorageID(ctx SQLContext, stgID cdssdk.StorageID) ([]stgmod.ObjectBlock, error) { - var rets []stgmod.ObjectBlock - err := ctx.Table("ObjectBlock").Where("StorageID = ?", stgID).Find(&rets).Error - return rets, err -} - -func (db *ObjectBlockDB) BatchGetByObjectID(ctx SQLContext, objectIDs []cdssdk.ObjectID) ([]stgmod.ObjectBlock, error) { - if len(objectIDs) == 0 { - return nil, nil - } - - var blocks []stgmod.ObjectBlock - err := ctx.Table("ObjectBlock").Where("ObjectID IN (?)", objectIDs).Order("ObjectID, `Index` ASC").Find(&blocks).Error - return blocks, err -} - -func (*ObjectBlockDB) GetInPackageID(ctx SQLContext, packageID cdssdk.PackageID) ([]stgmod.ObjectBlock, error) { - var rets []stgmod.ObjectBlock - err := ctx.Table("ObjectBlock"). - Joins("INNER JOIN Object ON ObjectBlock.ObjectID = Object.ObjectID"). - Where("Object.PackageID = ?", packageID). - Order("ObjectBlock.ObjectID, ObjectBlock.`Index` ASC"). - Find(&rets).Error - return rets, err -} - -func (db *ObjectBlockDB) Create(ctx SQLContext, objectID cdssdk.ObjectID, index int, stgID cdssdk.StorageID, fileHash cdssdk.FileHash, size int64) error { - block := stgmod.ObjectBlock{ObjectID: objectID, Index: index, StorageID: stgID, FileHash: fileHash, Size: size} - return ctx.Table("ObjectBlock").Create(&block).Error -} - -func (db *ObjectBlockDB) BatchCreate(ctx SQLContext, blocks []stgmod.ObjectBlock) error { - if len(blocks) == 0 { - return nil - } - - return ctx.Clauses(clause.Insert{Modifier: "ignore"}).Create(&blocks).Error -} - -func (db *ObjectBlockDB) DeleteByObjectID(ctx SQLContext, objectID cdssdk.ObjectID) error { - return ctx.Table("ObjectBlock").Where("ObjectID = ?", objectID).Delete(&stgmod.ObjectBlock{}).Error -} - -func (db *ObjectBlockDB) DeleteByObjectIDIndex(ctx SQLContext, objectID cdssdk.ObjectID, index int) error { - return ctx.Table("ObjectBlock").Where("ObjectID = ? AND `Index` = ?", objectID, index).Delete(&stgmod.ObjectBlock{}).Error -} - -func (db *ObjectBlockDB) BatchDeleteByObjectID(ctx SQLContext, objectIDs []cdssdk.ObjectID) error { - if len(objectIDs) == 0 { - return nil - } - - return ctx.Table("ObjectBlock").Where("ObjectID IN (?)", objectIDs).Delete(&stgmod.ObjectBlock{}).Error -} - -func (db *ObjectBlockDB) DeleteInPackage(ctx SQLContext, packageID cdssdk.PackageID) error { - return ctx.Table("ObjectBlock").Where("ObjectID IN (SELECT ObjectID FROM Object WHERE PackageID = ?)", packageID).Delete(&stgmod.ObjectBlock{}).Error -} - -func (db *ObjectBlockDB) StorageBatchDelete(ctx SQLContext, stgID cdssdk.StorageID, fileHashes []cdssdk.FileHash) error { - if len(fileHashes) == 0 { - return nil - } - - return ctx.Table("ObjectBlock").Where("StorageID = ? AND FileHash IN (?)", stgID, fileHashes).Delete(&stgmod.ObjectBlock{}).Error -} - -func (db *ObjectBlockDB) CountBlockWithHash(ctx SQLContext, fileHash string) (int, error) { - var cnt int64 - err := ctx.Table("ObjectBlock"). - Select("COUNT(FileHash)"). - Joins("INNER JOIN Object ON ObjectBlock.ObjectID = Object.ObjectID"). - Joins("INNER JOIN Package ON Object.PackageID = Package.PackageID"). - Where("FileHash = ? AND Package.State = ?", fileHash, cdssdk.PackageStateNormal). - Scan(&cnt).Error - - if err != nil { - return 0, err - } - - return int(cnt), nil -} - -// 按逗号切割字符串,并将每一个部分解析为一个int64的ID。 -// 注:需要外部保证分隔的每一个部分都是正确的10进制数字格式 -func splitConcatedHubID(idStr string) []cdssdk.HubID { - idStrs := strings.Split(idStr, ",") - ids := make([]cdssdk.HubID, 0, len(idStrs)) - - for _, str := range idStrs { - // 假设传入的ID是正确的数字格式 - id, _ := strconv.ParseInt(str, 10, 64) - ids = append(ids, cdssdk.HubID(id)) - } - - return ids -} - -// 按逗号切割字符串 -func splitConcatedFileHash(idStr string) []string { - idStrs := strings.Split(idStr, ",") - return idStrs -} diff --git a/common/pkgs/db2/package.go b/common/pkgs/db2/package.go deleted file mode 100644 index c6af549..0000000 --- a/common/pkgs/db2/package.go +++ /dev/null @@ -1,209 +0,0 @@ -package db2 - -import ( - "fmt" - "time" - - "gorm.io/gorm" - - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - "gitlink.org.cn/cloudream/storage2/common/pkgs/db2/model" -) - -type PackageDB struct { - *DB -} - -func (db *DB) Package() *PackageDB { - return &PackageDB{DB: db} -} - -func (db *PackageDB) GetByID(ctx SQLContext, packageID cdssdk.PackageID) (model.Package, error) { - var ret model.Package - err := ctx.Table("Package").Where("PackageID = ?", packageID).First(&ret).Error - return ret, err -} - -func (db *PackageDB) GetByName(ctx SQLContext, bucketID cdssdk.BucketID, name string) (model.Package, error) { - var ret model.Package - err := ctx.Table("Package").Where("BucketID = ? AND Name = ?", bucketID, name).First(&ret).Error - return ret, err -} - -func (db *PackageDB) BatchTestPackageID(ctx SQLContext, pkgIDs []cdssdk.PackageID) (map[cdssdk.PackageID]bool, error) { - if len(pkgIDs) == 0 { - return make(map[cdssdk.PackageID]bool), nil - } - - var avaiIDs []cdssdk.PackageID - err := ctx.Table("Package"). - Select("PackageID"). - Where("PackageID IN ?", pkgIDs). - Find(&avaiIDs).Error - if err != nil { - return nil, err - } - - avaiIDMap := make(map[cdssdk.PackageID]bool) - for _, pkgID := range avaiIDs { - avaiIDMap[pkgID] = true - } - - return avaiIDMap, nil -} - -func (*PackageDB) BatchGetAllPackageIDs(ctx SQLContext, start int, count int) ([]cdssdk.PackageID, error) { - var ret []cdssdk.PackageID - err := ctx.Table("Package").Select("PackageID").Limit(count).Offset(start).Find(&ret).Error - return ret, err -} - -func (db *PackageDB) GetUserBucketPackages(ctx SQLContext, userID cdssdk.UserID, bucketID cdssdk.BucketID) ([]model.Package, error) { - var ret []model.Package - err := ctx.Table("UserBucket"). - Select("Package.*"). - Joins("JOIN Package ON UserBucket.BucketID = Package.BucketID"). - Where("UserBucket.UserID = ? AND UserBucket.BucketID = ?", userID, bucketID). - Find(&ret).Error - return ret, err -} - -func (db *PackageDB) GetBucketPackages(ctx SQLContext, bucketID cdssdk.BucketID) ([]model.Package, error) { - var ret []model.Package - err := ctx.Table("Package"). - Select("Package.*"). - Where("BucketID = ?", bucketID). - Find(&ret).Error - return ret, err -} - -func (db *PackageDB) GetBucketPackagesByName(ctx SQLContext, bucketName string) ([]model.Package, error) { - var ret []model.Package - err := ctx.Table("Package"). - Select("Package.*"). - Joins("JOIN Bucket ON Package.BucketID = Bucket.BucketID"). - Where("Bucket.Name = ?", bucketName). - Find(&ret).Error - return ret, err -} - -// IsAvailable 判断一个用户是否拥有指定对象 -func (db *PackageDB) IsAvailable(ctx SQLContext, userID cdssdk.UserID, packageID cdssdk.PackageID) (bool, error) { - var pkgID cdssdk.PackageID - err := ctx.Table("Package"). - Select("Package.PackageID"). - Joins("JOIN UserBucket ON Package.BucketID = UserBucket.BucketID"). - Where("Package.PackageID = ? AND UserBucket.UserID = ?", packageID, userID). - Scan(&pkgID).Error - - if err == gorm.ErrRecordNotFound { - return false, nil - } - - if err != nil { - return false, fmt.Errorf("find package failed, err: %w", err) - } - - return true, nil -} - -// GetUserPackage 获得Package,如果用户没有权限访问,则不会获得结果 -func (db *PackageDB) GetUserPackage(ctx SQLContext, userID cdssdk.UserID, packageID cdssdk.PackageID) (model.Package, error) { - var ret model.Package - err := ctx.Table("Package"). - Select("Package.*"). - Joins("JOIN UserBucket ON Package.BucketID = UserBucket.BucketID"). - Where("Package.PackageID = ? AND UserBucket.UserID = ?", packageID, userID). - First(&ret).Error - return ret, err -} - -// 在指定名称的Bucket中查找指定名称的Package -func (*PackageDB) GetUserPackageByName(ctx SQLContext, userID cdssdk.UserID, bucketName string, packageName string) (model.Package, error) { - var ret model.Package - err := ctx.Table("Package"). - Select("Package.*"). - Joins("JOIN Bucket ON Package.BucketID = Bucket.BucketID"). - Joins("JOIN UserBucket ON Bucket.BucketID = UserBucket.BucketID"). - Where("Package.Name = ? AND Bucket.Name = ? AND UserBucket.UserID = ?", packageName, bucketName, userID). - First(&ret).Error - return ret, err -} - -func (db *PackageDB) Create(ctx SQLContext, bucketID cdssdk.BucketID, name string) (cdssdk.Package, error) { - var packageID int64 - err := ctx.Table("Package"). - Select("PackageID"). - Where("Name = ? AND BucketID = ?", name, bucketID). - Scan(&packageID).Error - - if err != nil { - return cdssdk.Package{}, err - } - if packageID != 0 { - return cdssdk.Package{}, gorm.ErrDuplicatedKey - } - - newPackage := cdssdk.Package{Name: name, BucketID: bucketID, CreateTime: time.Now(), State: cdssdk.PackageStateNormal} - if err := ctx.Create(&newPackage).Error; err != nil { - return cdssdk.Package{}, fmt.Errorf("insert package failed, err: %w", err) - } - - return newPackage, nil -} - -func (*PackageDB) Delete(ctx SQLContext, packageID cdssdk.PackageID) error { - err := ctx.Delete(&model.Package{}, "PackageID = ?", packageID).Error - return err -} - -// 删除与Package相关的所有数据 -func (db *PackageDB) DeleteComplete(ctx SQLContext, packageID cdssdk.PackageID) error { - if err := db.Package().Delete(ctx, packageID); err != nil { - return fmt.Errorf("delete package state: %w", err) - } - - if err := db.ObjectAccessStat().DeleteInPackage(ctx, packageID); err != nil { - return fmt.Errorf("delete from object access stat: %w", err) - } - - if err := db.ObjectBlock().DeleteInPackage(ctx, packageID); err != nil { - return fmt.Errorf("delete from object block failed, err: %w", err) - } - - if err := db.PinnedObject().DeleteInPackage(ctx, packageID); err != nil { - return fmt.Errorf("deleting pinned objects in package: %w", err) - } - - if err := db.Object().DeleteInPackage(ctx, packageID); err != nil { - return fmt.Errorf("deleting objects in package: %w", err) - } - - if err := db.PackageAccessStat().DeleteByPackageID(ctx, packageID); err != nil { - return fmt.Errorf("deleting package access stat: %w", err) - } - - return nil -} - -func (*PackageDB) ChangeState(ctx SQLContext, packageID cdssdk.PackageID, state string) error { - err := ctx.Exec("UPDATE Package SET State = ? WHERE PackageID = ?", state, packageID).Error - return err -} - -func (*PackageDB) HasPackageIn(ctx SQLContext, bucketID cdssdk.BucketID) (bool, error) { - var pkg cdssdk.Package - err := ctx.Table("Package").Where("BucketID = ?", bucketID).First(&pkg).Error - if err == gorm.ErrRecordNotFound { - return false, nil - } - if err != nil { - return false, err - } - return true, nil -} - -func (*PackageDB) Move(ctx SQLContext, packageID cdssdk.PackageID, newBktID cdssdk.BucketID, newName string) error { - err := ctx.Table("Package").Where("PackageID = ?", packageID).Update("BucketID", newBktID).Update("Name", newName).Error - return err -} diff --git a/common/pkgs/db2/package_access_stat.go b/common/pkgs/db2/package_access_stat.go deleted file mode 100644 index 9032645..0000000 --- a/common/pkgs/db2/package_access_stat.go +++ /dev/null @@ -1,79 +0,0 @@ -package db2 - -import ( - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - stgmod "gitlink.org.cn/cloudream/storage2/common/models" - coormq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/coordinator" - "gorm.io/gorm" - "gorm.io/gorm/clause" -) - -type PackageAccessStatDB struct { - *DB -} - -func (db *DB) PackageAccessStat() *PackageAccessStatDB { - return &PackageAccessStatDB{db} -} - -func (*PackageAccessStatDB) Get(ctx SQLContext, pkgID cdssdk.PackageID, stgID cdssdk.StorageID) (stgmod.PackageAccessStat, error) { - var ret stgmod.PackageAccessStat - err := ctx.Table("PackageAccessStat").Where("PackageID = ? AND StorageID = ?", pkgID, stgID).First(&ret).Error - return ret, err -} - -func (*PackageAccessStatDB) GetByPackageID(ctx SQLContext, pkgID cdssdk.PackageID) ([]stgmod.PackageAccessStat, error) { - var ret []stgmod.PackageAccessStat - err := ctx.Table("PackageAccessStat").Where("PackageID = ?", pkgID).Find(&ret).Error - return ret, err -} - -func (*PackageAccessStatDB) BatchGetByPackageID(ctx SQLContext, pkgIDs []cdssdk.PackageID) ([]stgmod.PackageAccessStat, error) { - if len(pkgIDs) == 0 { - return nil, nil - } - - var ret []stgmod.PackageAccessStat - err := ctx.Table("PackageAccessStat").Where("PackageID IN (?)", pkgIDs).Find(&ret).Error - return ret, err -} - -func (*PackageAccessStatDB) BatchAddCounter(ctx SQLContext, entries []coormq.AddAccessStatEntry) error { - if len(entries) == 0 { - return nil - } - - accs := make([]stgmod.PackageAccessStat, len(entries)) - for i, e := range entries { - accs[i] = stgmod.PackageAccessStat{ - PackageID: e.PackageID, - StorageID: e.StorageID, - Counter: e.Counter, - } - } - - return ctx.Clauses(clause.OnConflict{ - Columns: []clause.Column{{Name: "PackageID"}, {Name: "StorageID"}}, - DoUpdates: clause.Assignments(map[string]any{ - "Counter": gorm.Expr("Counter + values(Counter)"), - }), - }).Table("PackageAccessStat").Create(&accs).Error -} - -func (*PackageAccessStatDB) BatchUpdateAmount(ctx SQLContext, pkgIDs []cdssdk.PackageID, historyWeight float64) error { - if len(pkgIDs) == 0 { - return nil - } - - sql := "UPDATE PackageAccessStat SET Amount = Amount * ? + Counter * (1 - ?), Counter = 0 WHERE PackageID IN (?)" - return ctx.Exec(sql, historyWeight, historyWeight, pkgIDs).Error -} - -func (*PackageAccessStatDB) UpdateAllAmount(ctx SQLContext, historyWeight float64) error { - sql := "UPDATE PackageAccessStat SET Amount = Amount * ? + Counter * (1 - ?), Counter = 0" - return ctx.Exec(sql, historyWeight, historyWeight).Error -} - -func (*PackageAccessStatDB) DeleteByPackageID(ctx SQLContext, pkgID cdssdk.PackageID) error { - return ctx.Table("PackageAccessStat").Where("PackageID = ?", pkgID).Delete(&stgmod.PackageAccessStat{}).Error -} diff --git a/common/pkgs/db2/pinned_object.go b/common/pkgs/db2/pinned_object.go deleted file mode 100644 index a9de444..0000000 --- a/common/pkgs/db2/pinned_object.go +++ /dev/null @@ -1,122 +0,0 @@ -package db2 - -import ( - "time" - - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - "gorm.io/gorm/clause" -) - -type PinnedObjectDB struct { - *DB -} - -func (db *DB) PinnedObject() *PinnedObjectDB { - return &PinnedObjectDB{DB: db} -} - -func (*PinnedObjectDB) GetByStorageID(ctx SQLContext, stgID cdssdk.StorageID) ([]cdssdk.PinnedObject, error) { - var ret []cdssdk.PinnedObject - err := ctx.Table("PinnedObject").Find(&ret, "StorageID = ?", stgID).Error - return ret, err -} - -func (*PinnedObjectDB) GetObjectsByStorageID(ctx SQLContext, stgID cdssdk.StorageID) ([]cdssdk.Object, error) { - var ret []cdssdk.Object - err := ctx.Table("Object").Joins("inner join PinnedObject on Object.ObjectID = PinnedObject.ObjectID").Where("StorageID = ?", stgID).Find(&ret).Error - return ret, err -} - -func (*PinnedObjectDB) Create(ctx SQLContext, stgID cdssdk.StorageID, objectID cdssdk.ObjectID, createTime time.Time) error { - return ctx.Table("PinnedObject").Create(&cdssdk.PinnedObject{StorageID: stgID, ObjectID: objectID, CreateTime: createTime}).Error -} - -func (*PinnedObjectDB) BatchGetByObjectID(ctx SQLContext, objectIDs []cdssdk.ObjectID) ([]cdssdk.PinnedObject, error) { - if len(objectIDs) == 0 { - return nil, nil - } - - var pinneds []cdssdk.PinnedObject - err := ctx.Table("PinnedObject").Where("ObjectID in (?)", objectIDs).Order("ObjectID asc").Find(&pinneds).Error - return pinneds, err -} - -func (*PinnedObjectDB) TryCreate(ctx SQLContext, stgID cdssdk.StorageID, objectID cdssdk.ObjectID, createTime time.Time) error { - return ctx.Clauses(clause.OnConflict{ - Columns: []clause.Column{{Name: "ObjectID"}, {Name: "StorageID"}}, - DoUpdates: clause.AssignmentColumns([]string{"CreateTime"}), - }).Create(&cdssdk.PinnedObject{StorageID: stgID, ObjectID: objectID, CreateTime: createTime}).Error -} - -func (*PinnedObjectDB) BatchTryCreate(ctx SQLContext, pinneds []cdssdk.PinnedObject) error { - if len(pinneds) == 0 { - return nil - } - - return ctx.Clauses(clause.OnConflict{ - Columns: []clause.Column{{Name: "ObjectID"}, {Name: "StorageID"}}, - DoUpdates: clause.AssignmentColumns([]string{"CreateTime"}), - }).Create(&pinneds).Error -} - -func (*PinnedObjectDB) CreateFromPackage(ctx SQLContext, packageID cdssdk.PackageID, stgID cdssdk.StorageID) error { - err := ctx.Exec( - "insert ignore into PinnedObject(StorageID, ObjectID, CreateTime) select ? as StorageID, ObjectID, ? as CreateTime from Object where PackageID = ?", - stgID, - time.Now(), - packageID, - ).Error - return err -} - -func (db *PinnedObjectDB) ObjectBatchCreate(ctx SQLContext, objectID cdssdk.ObjectID, stgIDs []cdssdk.StorageID) error { - if len(stgIDs) == 0 { - return nil - } - - for _, id := range stgIDs { - err := db.TryCreate(ctx, id, objectID, time.Now()) - if err != nil { - return err - } - } - return nil -} - -func (*PinnedObjectDB) Delete(ctx SQLContext, stgID cdssdk.StorageID, objectID cdssdk.ObjectID) error { - err := ctx.Exec("delete from PinnedObject where StorageID = ? and ObjectID = ?", stgID, objectID).Error - return err -} - -func (*PinnedObjectDB) DeleteByObjectID(ctx SQLContext, objectID cdssdk.ObjectID) error { - err := ctx.Exec("delete from PinnedObject where ObjectID = ?", objectID).Error - return err -} - -func (*PinnedObjectDB) BatchDeleteByObjectID(ctx SQLContext, objectIDs []cdssdk.ObjectID) error { - if len(objectIDs) == 0 { - return nil - } - - err := ctx.Table("PinnedObject").Where("ObjectID in (?)", objectIDs).Delete(&cdssdk.PinnedObject{}).Error - return err -} - -func (*PinnedObjectDB) DeleteInPackage(ctx SQLContext, packageID cdssdk.PackageID) error { - err := ctx.Table("PinnedObject").Where("ObjectID in (select ObjectID from Object where PackageID = ?)", packageID).Delete(&cdssdk.PinnedObject{}).Error - return err -} - -func (*PinnedObjectDB) DeleteInPackageAtStorage(ctx SQLContext, packageID cdssdk.PackageID, stgID cdssdk.StorageID) error { - err := ctx.Exec("delete PinnedObject from PinnedObject inner join Object on PinnedObject.ObjectID = Object.ObjectID where PackageID = ? and StorageID = ?", packageID, stgID).Error - return err -} - -func (*PinnedObjectDB) StorageBatchDelete(ctx SQLContext, stgID cdssdk.StorageID, objectIDs []cdssdk.ObjectID) error { - if len(objectIDs) == 0 { - return nil - } - - err := ctx.Table("PinnedObject").Where("StorageID = ? and ObjectID in (?)", stgID, objectIDs).Delete(&cdssdk.PinnedObject{}).Error - return err -} diff --git a/common/pkgs/db2/storage.go b/common/pkgs/db2/storage.go deleted file mode 100644 index a558e37..0000000 --- a/common/pkgs/db2/storage.go +++ /dev/null @@ -1,120 +0,0 @@ -package db2 - -import ( - "fmt" - - "gitlink.org.cn/cloudream/common/pkgs/logger" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - stgmod "gitlink.org.cn/cloudream/storage2/common/models" - "gitlink.org.cn/cloudream/storage2/common/pkgs/db2/model" - "gorm.io/gorm" -) - -type StorageDB struct { - *DB -} - -func (db *DB) Storage() *StorageDB { - return &StorageDB{DB: db} -} - -func (db *StorageDB) GetByID(ctx SQLContext, stgID cdssdk.StorageID) (model.Storage, error) { - var stg model.Storage - err := ctx.Table("Storage").First(&stg, stgID).Error - return stg, err -} - -func (StorageDB) GetAllIDs(ctx SQLContext) ([]cdssdk.StorageID, error) { - var stgs []cdssdk.StorageID - err := ctx.Table("Storage").Select("StorageID").Find(&stgs).Error - return stgs, err -} - -func (db *StorageDB) BatchGetByID(ctx SQLContext, stgIDs []cdssdk.StorageID) ([]model.Storage, error) { - var stgs []model.Storage - err := ctx.Table("Storage").Find(&stgs, "StorageID IN (?)", stgIDs).Error - return stgs, err -} - -func (db *StorageDB) GetUserStorages(ctx SQLContext, userID cdssdk.UserID) ([]model.Storage, error) { - var stgs []model.Storage - err := ctx.Table("Storage").Select("Storage.*"). - Joins("inner join UserStorage on Storage.StorageID = UserStorage.StorageID"). - Where("UserID = ?", userID).Find(&stgs).Error - return stgs, err -} - -func (db *StorageDB) BatchGetAllStorageIDs(ctx SQLContext, start int, count int) ([]cdssdk.StorageID, error) { - var ret []cdssdk.StorageID - err := ctx.Table("Storage").Select("StorageID").Find(&ret).Limit(count).Offset(start).Error - return ret, err -} - -func (db *StorageDB) IsAvailable(ctx SQLContext, userID cdssdk.UserID, storageID cdssdk.StorageID) (bool, error) { - rows, err := ctx.Table("Storage").Select("Storage.StorageID"). - Joins("inner join UserStorage on Storage.StorageID = UserStorage.StorageID"). - Where("UserID = ? and Storage.StorageID = ?", userID, storageID).Rows() - if err != nil { - return false, fmt.Errorf("execute sql: %w", err) - } - defer rows.Close() - - return rows.Next(), nil -} - -func (db *StorageDB) GetUserStorage(ctx SQLContext, userID cdssdk.UserID, storageID cdssdk.StorageID) (model.Storage, error) { - var stg model.Storage - err := ctx.Table("Storage").Select("Storage.*"). - Joins("inner join UserStorage on Storage.StorageID = UserStorage.StorageID"). - Where("UserID = ? and Storage.StorageID = ?", userID, storageID).First(&stg).Error - - return stg, err -} - -func (db *StorageDB) GetUserStorageByName(ctx SQLContext, userID cdssdk.UserID, name string) (model.Storage, error) { - var stg model.Storage - err := ctx.Table("Storage").Select("Storage.*"). - Joins("inner join UserStorage on Storage.StorageID = UserStorage.StorageID"). - Where("UserID = ? and Name = ?", userID, name).First(&stg).Error - - return stg, err -} - -func (db *StorageDB) GetHubStorages(ctx SQLContext, hubID cdssdk.HubID) ([]model.Storage, error) { - var stgs []model.Storage - err := ctx.Table("Storage").Select("Storage.*").Find(&stgs, "MasterHub = ?", hubID).Error - return stgs, err -} - -func (db *StorageDB) FillDetails(ctx SQLContext, details []stgmod.StorageDetail) error { - stgsMp := make(map[cdssdk.StorageID]*stgmod.StorageDetail) - var masterHubIDs []cdssdk.HubID - for i := range details { - stgsMp[details[i].Storage.StorageID] = &details[i] - masterHubIDs = append(masterHubIDs, details[i].Storage.MasterHub) - } - - // 获取监护Hub信息 - masterHubs, err := db.Hub().BatchGetByID(ctx, masterHubIDs) - if err != nil && err != gorm.ErrRecordNotFound { - return fmt.Errorf("getting master hub: %w", err) - } - masterHubMap := make(map[cdssdk.HubID]cdssdk.Hub) - for _, hub := range masterHubs { - masterHubMap[hub.HubID] = hub - } - for _, stg := range stgsMp { - if stg.Storage.MasterHub != 0 { - hub, ok := masterHubMap[stg.Storage.MasterHub] - if !ok { - logger.Warnf("master hub %v of storage %v not found, this storage will not be add to result", stg.Storage.MasterHub, stg.Storage) - delete(stgsMp, stg.Storage.StorageID) - continue - } - - stg.MasterHub = &hub - } - } - - return nil -} diff --git a/common/pkgs/db2/union_serializer.go b/common/pkgs/db2/union_serializer.go deleted file mode 100644 index 8c0183c..0000000 --- a/common/pkgs/db2/union_serializer.go +++ /dev/null @@ -1,44 +0,0 @@ -package db2 - -import ( - "context" - "fmt" - "reflect" - - "gitlink.org.cn/cloudream/common/utils/serder" - "gorm.io/gorm/schema" -) - -type UnionSerializer struct { -} - -func (UnionSerializer) Scan(ctx context.Context, field *schema.Field, dst reflect.Value, dbValue interface{}) error { - fieldValue := reflect.New(field.FieldType) - if dbValue != nil { - var data []byte - switch v := dbValue.(type) { - case []byte: - data = v - case string: - data = []byte(v) - default: - return fmt.Errorf("failed to unmarshal JSONB value: %#v", dbValue) - } - - err := serder.JSONToObjectExRaw(data, fieldValue.Interface()) - if err != nil { - return err - } - } - - field.ReflectValueOf(ctx, dst).Set(fieldValue.Elem()) - return nil -} - -func (UnionSerializer) Value(ctx context.Context, field *schema.Field, dst reflect.Value, fieldValue interface{}) (interface{}, error) { - return serder.ObjectToJSONEx(fieldValue) -} - -func init() { - schema.RegisterSerializer("union", UnionSerializer{}) -} diff --git a/common/pkgs/db2/user.go b/common/pkgs/db2/user.go deleted file mode 100644 index b94e73b..0000000 --- a/common/pkgs/db2/user.go +++ /dev/null @@ -1,44 +0,0 @@ -package db2 - -import ( - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - "gorm.io/gorm" -) - -type UserDB struct { - *DB -} - -func (db *DB) User() *UserDB { - return &UserDB{DB: db} -} - -func (db *UserDB) GetByID(ctx SQLContext, userID cdssdk.UserID) (cdssdk.User, error) { - var ret cdssdk.User - err := ctx.Table("User").Where("UserID = ?", userID).First(&ret).Error - return ret, err -} - -func (db *UserDB) GetByName(ctx SQLContext, name string) (cdssdk.User, error) { - var ret cdssdk.User - err := ctx.Table("User").Where("Name = ?", name).First(&ret).Error - return ret, err -} - -func (db *UserDB) Create(ctx SQLContext, name string) (cdssdk.User, error) { - _, err := db.GetByName(ctx, name) - if err == nil { - return cdssdk.User{}, gorm.ErrDuplicatedKey - } - if err != gorm.ErrRecordNotFound { - return cdssdk.User{}, err - } - - user := cdssdk.User{Name: name} - err = ctx.Table("User").Create(&user).Error - return user, err -} - -func (*UserDB) Delete(ctx SQLContext, userID cdssdk.UserID) error { - return ctx.Table("User").Delete(&cdssdk.User{UserID: userID}).Error -} diff --git a/common/pkgs/db2/user_bucket.go b/common/pkgs/db2/user_bucket.go deleted file mode 100644 index 70b1152..0000000 --- a/common/pkgs/db2/user_bucket.go +++ /dev/null @@ -1,36 +0,0 @@ -package db2 - -import ( - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - "gitlink.org.cn/cloudream/storage2/common/pkgs/db2/model" -) - -type UserBucketDB struct { - *DB -} - -func (db *DB) UserBucket() *UserBucketDB { - return &UserBucketDB{DB: db} -} - -func (*UserBucketDB) GetByUserID(ctx SQLContext, userID cdssdk.UserID) ([]model.UserBucket, error) { - var userBuckets []model.UserBucket - err := ctx.Table("UserBucket").Where("UserID = ?", userID).Find(&userBuckets).Error - return userBuckets, err -} - -func (*UserBucketDB) Create(ctx SQLContext, userID cdssdk.UserID, bucketID cdssdk.BucketID) error { - userBucket := model.UserBucket{ - UserID: userID, - BucketID: bucketID, - } - return ctx.Table("UserBucket").Create(&userBucket).Error -} - -func (*UserBucketDB) DeleteByBucketID(ctx SQLContext, bucketID cdssdk.BucketID) error { - return ctx.Table("UserBucket").Where("BucketID = ?", bucketID).Delete(&model.UserBucket{}).Error -} - -func (*UserBucketDB) DeleteByUserID(ctx SQLContext, userID cdssdk.UserID) error { - return ctx.Table("UserBucket").Where("UserID = ?", userID).Delete(&model.UserBucket{}).Error -} diff --git a/common/pkgs/db2/user_hub.go b/common/pkgs/db2/user_hub.go deleted file mode 100644 index e68a57e..0000000 --- a/common/pkgs/db2/user_hub.go +++ /dev/null @@ -1,34 +0,0 @@ -package db2 - -import ( - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - "gitlink.org.cn/cloudream/storage2/common/pkgs/db2/model" -) - -type UserHubDB struct { - *DB -} - -func (db *DB) UserHub() *UserHubDB { - return &UserHubDB{db} -} - -func (*UserHubDB) GetByUserID(ctx SQLContext, userID cdssdk.UserID) ([]model.UserHub, error) { - var userHubs []model.UserHub - if err := ctx.Table("UserHub").Where("UserID = ?", userID).Find(&userHubs).Error; err != nil { - return nil, err - } - - return userHubs, nil -} - -func (*UserHubDB) Create(ctx SQLContext, userID cdssdk.UserID, hubID cdssdk.HubID) error { - return ctx.Table("UserHub").Create(&model.UserHub{ - UserID: userID, - HubID: hubID, - }).Error -} - -func (*UserHubDB) DeleteByUserID(ctx SQLContext, userID cdssdk.UserID) error { - return ctx.Table("UserHub").Delete(&model.UserHub{}, "UserID = ?", userID).Error -} diff --git a/common/pkgs/db2/user_storage.go b/common/pkgs/db2/user_storage.go deleted file mode 100644 index 2cccdbe..0000000 --- a/common/pkgs/db2/user_storage.go +++ /dev/null @@ -1,34 +0,0 @@ -package db2 - -import ( - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - "gitlink.org.cn/cloudream/storage2/common/pkgs/db2/model" -) - -type UserStorageDB struct { - *DB -} - -func (db *DB) UserStorage() *UserStorageDB { - return &UserStorageDB{db} -} - -func (*UserStorageDB) GetByUserID(ctx SQLContext, userID cdssdk.UserID) ([]model.UserStorage, error) { - var userStgs []model.UserStorage - if err := ctx.Table("UserStorage").Where("UserID = ?", userID).Find(&userStgs).Error; err != nil { - return nil, err - } - - return userStgs, nil -} - -func (*UserStorageDB) Create(ctx SQLContext, userID cdssdk.UserID, stgID cdssdk.StorageID) error { - return ctx.Table("UserStorage").Create(&model.UserStorage{ - UserID: userID, - StorageID: stgID, - }).Error -} - -func (*UserStorageDB) DeleteByUserID(ctx SQLContext, userID cdssdk.UserID) error { - return ctx.Table("UserStorage").Delete(&model.UserStorage{}, "UserID = ?", userID).Error -} diff --git a/common/pkgs/db2/utils.go b/common/pkgs/db2/utils.go deleted file mode 100644 index deeda45..0000000 --- a/common/pkgs/db2/utils.go +++ /dev/null @@ -1,81 +0,0 @@ -package db2 - -import ( - "strings" - - "gorm.io/gorm" -) - -const ( - maxPlaceholderCount = 65535 -) - -func BatchNamedExec[T any](ctx SQLContext, sql string, argCnt int, arr []T, callback func(result *gorm.DB) bool) error { - if argCnt == 0 { - result := ctx.Exec(sql, toInterfaceSlice(arr)...) - if result.Error != nil { - return result.Error - } - - if callback != nil { - callback(result) - } - - return nil - } - - batchSize := maxPlaceholderCount / argCnt - for len(arr) > 0 { - curBatchSize := min(batchSize, len(arr)) - - result := ctx.Exec(sql, toInterfaceSlice(arr[:curBatchSize])...) - if result.Error != nil { - return result.Error - } - if callback != nil && !callback(result) { - return nil - } - - arr = arr[curBatchSize:] - } - - return nil -} - -// 将 []T 转换为 []interface{} -func toInterfaceSlice[T any](arr []T) []interface{} { - interfaceSlice := make([]interface{}, len(arr)) - for i, v := range arr { - interfaceSlice[i] = v - } - return interfaceSlice -} - -func min(a, b int) int { - if a < b { - return a - } - return b -} - -func escapeLike(left, right, word string) string { - var n int - for i := range word { - if c := word[i]; c == '%' || c == '_' || c == '\\' { - n++ - } - } - // No characters to escape. - if n == 0 { - return left + word + right - } - var b strings.Builder - b.Grow(len(word) + n) - for _, c := range word { - if c == '%' || c == '_' || c == '\\' { - b.WriteByte('\\') - } - b.WriteRune(c) - } - return left + b.String() + right -} diff --git a/common/pkgs/mq/coordinator/agent.go b/common/pkgs/mq/coordinator/agent.go deleted file mode 100644 index 4ff5080..0000000 --- a/common/pkgs/mq/coordinator/agent.go +++ /dev/null @@ -1,4 +0,0 @@ -package coordinator - -type AgentService interface { -} diff --git a/common/pkgs/mq/coordinator/bucket.go b/common/pkgs/mq/coordinator/bucket.go deleted file mode 100644 index 238eb61..0000000 --- a/common/pkgs/mq/coordinator/bucket.go +++ /dev/null @@ -1,154 +0,0 @@ -package coordinator - -import ( - "gitlink.org.cn/cloudream/common/pkgs/mq" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - "gitlink.org.cn/cloudream/storage2/common/pkgs/db2/model" -) - -type BucketService interface { - GetBucketByName(msg *GetBucketByName) (*GetBucketByNameResp, *mq.CodeMessage) - - GetUserBuckets(msg *GetUserBuckets) (*GetUserBucketsResp, *mq.CodeMessage) - - GetBucketPackages(msg *GetBucketPackages) (*GetBucketPackagesResp, *mq.CodeMessage) - - CreateBucket(msg *CreateBucket) (*CreateBucketResp, *mq.CodeMessage) - - DeleteBucket(msg *DeleteBucket) (*DeleteBucketResp, *mq.CodeMessage) -} - -// 根据桶名获取桶 -var _ = Register(Service.GetBucketByName) - -type GetBucketByName struct { - mq.MessageBodyBase - UserID cdssdk.UserID `json:"userID"` - Name string `json:"name"` -} -type GetBucketByNameResp struct { - mq.MessageBodyBase - Bucket cdssdk.Bucket `json:"bucket"` -} - -func ReqGetBucketByName(userID cdssdk.UserID, name string) *GetBucketByName { - return &GetBucketByName{ - UserID: userID, - Name: name, - } -} -func RespGetBucketByName(bucket cdssdk.Bucket) *GetBucketByNameResp { - return &GetBucketByNameResp{ - Bucket: bucket, - } -} -func (client *Client) GetBucketByName(msg *GetBucketByName) (*GetBucketByNameResp, error) { - return mq.Request(Service.GetBucketByName, client.rabbitCli, msg) -} - -// 获取用户所有的桶 -var _ = Register(Service.GetUserBuckets) - -type GetUserBuckets struct { - mq.MessageBodyBase - UserID cdssdk.UserID `json:"userID"` -} -type GetUserBucketsResp struct { - mq.MessageBodyBase - Buckets []model.Bucket `json:"buckets"` -} - -func NewGetUserBuckets(userID cdssdk.UserID) *GetUserBuckets { - return &GetUserBuckets{ - UserID: userID, - } -} -func NewGetUserBucketsResp(buckets []model.Bucket) *GetUserBucketsResp { - return &GetUserBucketsResp{ - Buckets: buckets, - } -} -func (client *Client) GetUserBuckets(msg *GetUserBuckets) (*GetUserBucketsResp, error) { - return mq.Request(Service.GetUserBuckets, client.rabbitCli, msg) -} - -// 获取桶中的所有Package -var _ = Register(Service.GetBucketPackages) - -type GetBucketPackages struct { - mq.MessageBodyBase - UserID cdssdk.UserID `json:"userID"` - BucketID cdssdk.BucketID `json:"bucketID"` -} -type GetBucketPackagesResp struct { - mq.MessageBodyBase - Packages []model.Package `json:"packages"` -} - -func NewGetBucketPackages(userID cdssdk.UserID, bucketID cdssdk.BucketID) *GetBucketPackages { - return &GetBucketPackages{ - UserID: userID, - BucketID: bucketID, - } -} -func NewGetBucketPackagesResp(packages []model.Package) *GetBucketPackagesResp { - return &GetBucketPackagesResp{ - Packages: packages, - } -} -func (client *Client) GetBucketPackages(msg *GetBucketPackages) (*GetBucketPackagesResp, error) { - return mq.Request(Service.GetBucketPackages, client.rabbitCli, msg) -} - -// 创建桶 -var _ = Register(Service.CreateBucket) - -type CreateBucket struct { - mq.MessageBodyBase - UserID cdssdk.UserID `json:"userID"` - BucketName string `json:"bucketName"` -} -type CreateBucketResp struct { - mq.MessageBodyBase - Bucket cdssdk.Bucket `json:"bucket"` -} - -func NewCreateBucket(userID cdssdk.UserID, bucketName string) *CreateBucket { - return &CreateBucket{ - UserID: userID, - BucketName: bucketName, - } -} -func NewCreateBucketResp(bucket cdssdk.Bucket) *CreateBucketResp { - return &CreateBucketResp{ - Bucket: bucket, - } -} -func (client *Client) CreateBucket(msg *CreateBucket) (*CreateBucketResp, error) { - return mq.Request(Service.CreateBucket, client.rabbitCli, msg) -} - -// 删除桶 -var _ = Register(Service.DeleteBucket) - -type DeleteBucket struct { - mq.MessageBodyBase - UserID cdssdk.UserID `json:"userID"` - BucketID cdssdk.BucketID `json:"bucketID"` -} -type DeleteBucketResp struct { - mq.MessageBodyBase -} - -func NewDeleteBucket(userID cdssdk.UserID, bucketID cdssdk.BucketID) *DeleteBucket { - return &DeleteBucket{ - UserID: userID, - BucketID: bucketID, - } -} -func NewDeleteBucketResp() *DeleteBucketResp { - return &DeleteBucketResp{} -} -func (client *Client) DeleteBucket(msg *DeleteBucket) (*DeleteBucketResp, error) { - return mq.Request(Service.DeleteBucket, client.rabbitCli, msg) -} diff --git a/common/pkgs/mq/coordinator/cache.go b/common/pkgs/mq/coordinator/cache.go deleted file mode 100644 index 8d09499..0000000 --- a/common/pkgs/mq/coordinator/cache.go +++ /dev/null @@ -1,62 +0,0 @@ -package coordinator - -import ( - "gitlink.org.cn/cloudream/common/pkgs/mq" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" -) - -type CacheService interface { - CachePackageMoved(msg *CachePackageMoved) (*CachePackageMovedResp, *mq.CodeMessage) - - CacheRemovePackage(msg *CacheRemovePackage) (*CacheRemovePackageResp, *mq.CodeMessage) -} - -// Package的Object移动到了节点的Cache中 -var _ = Register(Service.CachePackageMoved) - -type CachePackageMoved struct { - mq.MessageBodyBase - PackageID cdssdk.PackageID `json:"packageID"` - StorageID cdssdk.StorageID `json:"storageID"` -} -type CachePackageMovedResp struct { - mq.MessageBodyBase -} - -func NewCachePackageMoved(packageID cdssdk.PackageID, stgID cdssdk.StorageID) *CachePackageMoved { - return &CachePackageMoved{ - PackageID: packageID, - StorageID: stgID, - } -} -func NewCachePackageMovedResp() *CachePackageMovedResp { - return &CachePackageMovedResp{} -} -func (client *Client) CachePackageMoved(msg *CachePackageMoved) (*CachePackageMovedResp, error) { - return mq.Request(Service.CachePackageMoved, client.rabbitCli, msg) -} - -// 删除移动到指定节点Cache中的Package -var _ = Register(Service.CacheRemovePackage) - -type CacheRemovePackage struct { - mq.MessageBodyBase - PackageID cdssdk.PackageID `json:"packageID"` - StorageID cdssdk.StorageID `json:"storageID"` -} -type CacheRemovePackageResp struct { - mq.MessageBodyBase -} - -func ReqCacheRemoveMovedPackage(packageID cdssdk.PackageID, stgID cdssdk.StorageID) *CacheRemovePackage { - return &CacheRemovePackage{ - PackageID: packageID, - StorageID: stgID, - } -} -func RespCacheRemovePackage() *CacheRemovePackageResp { - return &CacheRemovePackageResp{} -} -func (client *Client) CacheRemovePackage(msg *CacheRemovePackage) (*CacheRemovePackageResp, error) { - return mq.Request(Service.CacheRemovePackage, client.rabbitCli, msg) -} diff --git a/common/pkgs/mq/coordinator/hub.go b/common/pkgs/mq/coordinator/hub.go index e8dfe70..5c8505e 100644 --- a/common/pkgs/mq/coordinator/hub.go +++ b/common/pkgs/mq/coordinator/hub.go @@ -2,73 +2,13 @@ package coordinator import ( "gitlink.org.cn/cloudream/common/pkgs/mq" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - stgmod "gitlink.org.cn/cloudream/storage2/common/models" + cortypes "gitlink.org.cn/cloudream/storage2/coordinator/types" ) type HubService interface { - GetHubConfig(msg *GetHubConfig) (*GetHubConfigResp, *mq.CodeMessage) - - GetUserHubs(msg *GetUserHubs) (*GetUserHubsResp, *mq.CodeMessage) - GetHubs(msg *GetHubs) (*GetHubsResp, *mq.CodeMessage) GetHubConnectivities(msg *GetHubConnectivities) (*GetHubConnectivitiesResp, *mq.CodeMessage) - - UpdateHubConnectivities(msg *UpdateHubConnectivities) (*UpdateHubConnectivitiesResp, *mq.CodeMessage) -} - -var _ = Register(Service.GetHubConfig) - -type GetHubConfig struct { - mq.MessageBodyBase - HubID cdssdk.HubID `json:"hubID"` -} -type GetHubConfigResp struct { - mq.MessageBodyBase - Hub cdssdk.Hub `json:"hub"` - Storages []stgmod.StorageDetail `json:"storages"` -} - -func ReqGetHubConfig(hubID cdssdk.HubID) *GetHubConfig { - return &GetHubConfig{ - HubID: hubID, - } -} -func RespGetHubConfig(hub cdssdk.Hub, storages []stgmod.StorageDetail) *GetHubConfigResp { - return &GetHubConfigResp{ - Hub: hub, - Storages: storages, - } -} -func (client *Client) GetHubConfig(msg *GetHubConfig) (*GetHubConfigResp, error) { - return mq.Request(Service.GetHubConfig, client.rabbitCli, msg) -} - -// 查询用户可用的节点 -var _ = Register(Service.GetUserHubs) - -type GetUserHubs struct { - mq.MessageBodyBase - UserID cdssdk.UserID `json:"userID"` -} -type GetUserHubsResp struct { - mq.MessageBodyBase - Hubs []cdssdk.Hub `json:"hubs"` -} - -func NewGetUserHubs(userID cdssdk.UserID) *GetUserHubs { - return &GetUserHubs{ - UserID: userID, - } -} -func NewGetUserHubsResp(hubs []cdssdk.Hub) *GetUserHubsResp { - return &GetUserHubsResp{ - Hubs: hubs, - } -} -func (client *Client) GetUserHubs(msg *GetUserHubs) (*GetUserHubsResp, error) { - return mq.Request(Service.GetUserHubs, client.rabbitCli, msg) } // 获取指定节点的信息。如果HubIDs为nil,则返回所有Hub @@ -76,24 +16,24 @@ var _ = Register(Service.GetHubs) type GetHubs struct { mq.MessageBodyBase - HubIDs []cdssdk.HubID `json:"hubIDs"` + HubIDs []cortypes.HubID `json:"hubIDs"` } type GetHubsResp struct { mq.MessageBodyBase - Hubs []*cdssdk.Hub `json:"hubs"` + Hubs []*cortypes.Hub `json:"hubs"` } -func NewGetHubs(hubIDs []cdssdk.HubID) *GetHubs { +func NewGetHubs(hubIDs []cortypes.HubID) *GetHubs { return &GetHubs{ HubIDs: hubIDs, } } -func NewGetHubsResp(hubs []*cdssdk.Hub) *GetHubsResp { +func NewGetHubsResp(hubs []*cortypes.Hub) *GetHubsResp { return &GetHubsResp{ Hubs: hubs, } } -func (r *GetHubsResp) GetHub(id cdssdk.HubID) *cdssdk.Hub { +func (r *GetHubsResp) GetHub(id cortypes.HubID) *cortypes.Hub { for _, n := range r.Hubs { if n.HubID == id { return n @@ -111,19 +51,19 @@ var _ = Register(Service.GetHubConnectivities) type GetHubConnectivities struct { mq.MessageBodyBase - HubIDs []cdssdk.HubID `json:"hubIDs"` + HubIDs []cortypes.HubID `json:"hubIDs"` } type GetHubConnectivitiesResp struct { mq.MessageBodyBase - Connectivities []cdssdk.HubConnectivity `json:"hubs"` + Connectivities []cortypes.HubConnectivity `json:"hubs"` } -func ReqGetHubConnectivities(hubIDs []cdssdk.HubID) *GetHubConnectivities { +func ReqGetHubConnectivities(hubIDs []cortypes.HubID) *GetHubConnectivities { return &GetHubConnectivities{ HubIDs: hubIDs, } } -func RespGetHubConnectivities(cons []cdssdk.HubConnectivity) *GetHubConnectivitiesResp { +func RespGetHubConnectivities(cons []cortypes.HubConnectivity) *GetHubConnectivitiesResp { return &GetHubConnectivitiesResp{ Connectivities: cons, } @@ -131,26 +71,3 @@ func RespGetHubConnectivities(cons []cdssdk.HubConnectivity) *GetHubConnectiviti func (client *Client) GetHubConnectivities(msg *GetHubConnectivities) (*GetHubConnectivitiesResp, error) { return mq.Request(Service.GetHubConnectivities, client.rabbitCli, msg) } - -// 批量更新节点连通性信息 -var _ = Register(Service.UpdateHubConnectivities) - -type UpdateHubConnectivities struct { - mq.MessageBodyBase - Connectivities []cdssdk.HubConnectivity `json:"connectivities"` -} -type UpdateHubConnectivitiesResp struct { - mq.MessageBodyBase -} - -func ReqUpdateHubConnectivities(cons []cdssdk.HubConnectivity) *UpdateHubConnectivities { - return &UpdateHubConnectivities{ - Connectivities: cons, - } -} -func RespUpdateHubConnectivities() *UpdateHubConnectivitiesResp { - return &UpdateHubConnectivitiesResp{} -} -func (client *Client) UpdateHubConnectivities(msg *UpdateHubConnectivities) (*UpdateHubConnectivitiesResp, error) { - return mq.Request(Service.UpdateHubConnectivities, client.rabbitCli, msg) -} diff --git a/common/pkgs/mq/coordinator/object.go b/common/pkgs/mq/coordinator/object.go deleted file mode 100644 index 9c2fba4..0000000 --- a/common/pkgs/mq/coordinator/object.go +++ /dev/null @@ -1,400 +0,0 @@ -package coordinator - -import ( - "gitlink.org.cn/cloudream/common/pkgs/mq" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - "gitlink.org.cn/cloudream/common/sdks/storage/cdsapi" - - stgmod "gitlink.org.cn/cloudream/storage2/common/models" - "gitlink.org.cn/cloudream/storage2/common/pkgs/db2/model" -) - -type ObjectService interface { - GetObjects(msg *GetObjects) (*GetObjectsResp, *mq.CodeMessage) - - ListObjectsByPath(msg *ListObjectsByPath) (*ListObjectsByPathResp, *mq.CodeMessage) - - GetPackageObjects(msg *GetPackageObjects) (*GetPackageObjectsResp, *mq.CodeMessage) - - GetPackageObjectDetails(msg *GetPackageObjectDetails) (*GetPackageObjectDetailsResp, *mq.CodeMessage) - - GetObjectDetails(msg *GetObjectDetails) (*GetObjectDetailsResp, *mq.CodeMessage) - - UpdateObjectRedundancy(msg *UpdateObjectRedundancy) (*UpdateObjectRedundancyResp, *mq.CodeMessage) - - UpdateObjectInfos(msg *UpdateObjectInfos) (*UpdateObjectInfosResp, *mq.CodeMessage) - - MoveObjects(msg *MoveObjects) (*MoveObjectsResp, *mq.CodeMessage) - - DeleteObjects(msg *DeleteObjects) (*DeleteObjectsResp, *mq.CodeMessage) - - CloneObjects(msg *CloneObjects) (*CloneObjectsResp, *mq.CodeMessage) - - GetDatabaseAll(msg *GetDatabaseAll) (*GetDatabaseAllResp, *mq.CodeMessage) - - AddAccessStat(msg *AddAccessStat) - - NewMultipartUploadObject(msg *NewMultipartUploadObject) (*NewMultipartUploadObjectResp, *mq.CodeMessage) - - AddMultipartUploadPart(msg *AddMultipartUploadPart) (*AddMultipartUploadPartResp, *mq.CodeMessage) -} - -var _ = Register(Service.GetObjects) - -type GetObjects struct { - mq.MessageBodyBase - UserID cdssdk.UserID `json:"userID"` - ObjectIDs []cdssdk.ObjectID `json:"objectIDs"` -} -type GetObjectsResp struct { - mq.MessageBodyBase - Objects []*cdssdk.Object `json:"objects"` -} - -func ReqGetObjects(userID cdssdk.UserID, objectIDs []cdssdk.ObjectID) *GetObjects { - return &GetObjects{ - UserID: userID, - ObjectIDs: objectIDs, - } -} -func RespGetObjects(objects []*cdssdk.Object) *GetObjectsResp { - return &GetObjectsResp{ - Objects: objects, - } -} -func (client *Client) GetObjects(msg *GetObjects) (*GetObjectsResp, error) { - return mq.Request(Service.GetObjects, client.rabbitCli, msg) -} - -// 查询指定前缀的Object,返回的Objects会按照ObjectID升序 -var _ = Register(Service.ListObjectsByPath) - -type ListObjectsByPath struct { - mq.MessageBodyBase - cdsapi.ObjectListByPath -} -type ListObjectsByPathResp struct { - mq.MessageBodyBase - cdsapi.ObjectListByPathResp -} - -func ReqListObjectsByPath(req cdsapi.ObjectListByPath) *ListObjectsByPath { - return &ListObjectsByPath{ - ObjectListByPath: req, - } -} -func RespListObjectsByPath(resp cdsapi.ObjectListByPathResp) *ListObjectsByPathResp { - return &ListObjectsByPathResp{ - ObjectListByPathResp: resp, - } -} -func (client *Client) ListObjectsByPath(msg *ListObjectsByPath) (*ListObjectsByPathResp, error) { - return mq.Request(Service.ListObjectsByPath, client.rabbitCli, msg) -} - -// 查询Package中的所有Object,返回的Objects会按照ObjectID升序 -var _ = Register(Service.GetPackageObjects) - -type GetPackageObjects struct { - mq.MessageBodyBase - UserID cdssdk.UserID `json:"userID"` - PackageID cdssdk.PackageID `json:"packageID"` -} -type GetPackageObjectsResp struct { - mq.MessageBodyBase - Objects []model.Object `json:"objects"` -} - -func ReqGetPackageObjects(userID cdssdk.UserID, packageID cdssdk.PackageID) *GetPackageObjects { - return &GetPackageObjects{ - UserID: userID, - PackageID: packageID, - } -} -func RespGetPackageObjects(objects []model.Object) *GetPackageObjectsResp { - return &GetPackageObjectsResp{ - Objects: objects, - } -} -func (client *Client) GetPackageObjects(msg *GetPackageObjects) (*GetPackageObjectsResp, error) { - return mq.Request(Service.GetPackageObjects, client.rabbitCli, msg) -} - -// 获取Package中所有Object以及它们的分块详细信息,返回的Objects会按照ObjectID升序 -var _ = Register(Service.GetPackageObjectDetails) - -type GetPackageObjectDetails struct { - mq.MessageBodyBase - PackageID cdssdk.PackageID `json:"packageID"` -} -type GetPackageObjectDetailsResp struct { - mq.MessageBodyBase - Objects []stgmod.ObjectDetail `json:"objects"` -} - -func ReqGetPackageObjectDetails(packageID cdssdk.PackageID) *GetPackageObjectDetails { - return &GetPackageObjectDetails{ - PackageID: packageID, - } -} -func RespPackageObjectDetails(objects []stgmod.ObjectDetail) *GetPackageObjectDetailsResp { - return &GetPackageObjectDetailsResp{ - Objects: objects, - } -} -func (client *Client) GetPackageObjectDetails(msg *GetPackageObjectDetails) (*GetPackageObjectDetailsResp, error) { - return mq.Request(Service.GetPackageObjectDetails, client.rabbitCli, msg) -} - -// 获取多个Object以及它们的分块详细信息,返回的Objects会按照ObjectID升序。 -var _ = Register(Service.GetObjectDetails) - -type GetObjectDetails struct { - mq.MessageBodyBase - ObjectIDs []cdssdk.ObjectID `json:"objectIDs"` -} -type GetObjectDetailsResp struct { - mq.MessageBodyBase - Objects []*stgmod.ObjectDetail `json:"objects"` // 如果没有查询到某个ID对应的信息,则此数组对应位置为nil -} - -func ReqGetObjectDetails(objectIDs []cdssdk.ObjectID) *GetObjectDetails { - return &GetObjectDetails{ - ObjectIDs: objectIDs, - } -} -func RespGetObjectDetails(objects []*stgmod.ObjectDetail) *GetObjectDetailsResp { - return &GetObjectDetailsResp{ - Objects: objects, - } -} -func (client *Client) GetObjectDetails(msg *GetObjectDetails) (*GetObjectDetailsResp, error) { - return mq.Request(Service.GetObjectDetails, client.rabbitCli, msg) -} - -// 更新Object的冗余方式 -var _ = Register(Service.UpdateObjectRedundancy) - -type UpdateObjectRedundancy struct { - mq.MessageBodyBase - Updatings []UpdatingObjectRedundancy `json:"updatings"` -} -type UpdateObjectRedundancyResp struct { - mq.MessageBodyBase -} -type UpdatingObjectRedundancy struct { - ObjectID cdssdk.ObjectID `json:"objectID"` - FileHash cdssdk.FileHash `json:"fileHash"` - Size int64 `json:"size"` - Redundancy cdssdk.Redundancy `json:"redundancy"` - PinnedAt []cdssdk.StorageID `json:"pinnedAt"` - Blocks []stgmod.ObjectBlock `json:"blocks"` -} - -func ReqUpdateObjectRedundancy(updatings []UpdatingObjectRedundancy) *UpdateObjectRedundancy { - return &UpdateObjectRedundancy{ - Updatings: updatings, - } -} -func RespUpdateObjectRedundancy() *UpdateObjectRedundancyResp { - return &UpdateObjectRedundancyResp{} -} -func (client *Client) UpdateObjectRedundancy(msg *UpdateObjectRedundancy) (*UpdateObjectRedundancyResp, error) { - return mq.Request(Service.UpdateObjectRedundancy, client.rabbitCli, msg) -} - -// 更新Object元数据 -var _ = Register(Service.UpdateObjectInfos) - -type UpdateObjectInfos struct { - mq.MessageBodyBase - UserID cdssdk.UserID `json:"userID"` - Updatings []cdsapi.UpdatingObject `json:"updatings"` -} - -type UpdateObjectInfosResp struct { - mq.MessageBodyBase - Successes []cdssdk.ObjectID `json:"successes"` -} - -func ReqUpdateObjectInfos(userID cdssdk.UserID, updatings []cdsapi.UpdatingObject) *UpdateObjectInfos { - return &UpdateObjectInfos{ - UserID: userID, - Updatings: updatings, - } -} -func RespUpdateObjectInfos(successes []cdssdk.ObjectID) *UpdateObjectInfosResp { - return &UpdateObjectInfosResp{ - Successes: successes, - } -} -func (client *Client) UpdateObjectInfos(msg *UpdateObjectInfos) (*UpdateObjectInfosResp, error) { - return mq.Request(Service.UpdateObjectInfos, client.rabbitCli, msg) -} - -// 移动Object -var _ = Register(Service.MoveObjects) - -type MoveObjects struct { - mq.MessageBodyBase - UserID cdssdk.UserID `json:"userID"` - Movings []cdsapi.MovingObject `json:"movings"` -} - -type MoveObjectsResp struct { - mq.MessageBodyBase - Successes []cdssdk.ObjectID `json:"successes"` -} - -func ReqMoveObjects(userID cdssdk.UserID, movings []cdsapi.MovingObject) *MoveObjects { - return &MoveObjects{ - UserID: userID, - Movings: movings, - } -} -func RespMoveObjects(successes []cdssdk.ObjectID) *MoveObjectsResp { - return &MoveObjectsResp{ - Successes: successes, - } -} -func (client *Client) MoveObjects(msg *MoveObjects) (*MoveObjectsResp, error) { - return mq.Request(Service.MoveObjects, client.rabbitCli, msg) -} - -// 删除Object -var _ = Register(Service.DeleteObjects) - -type DeleteObjects struct { - mq.MessageBodyBase - UserID cdssdk.UserID `json:"userID"` - ObjectIDs []cdssdk.ObjectID `json:"objectIDs"` -} - -type DeleteObjectsResp struct { - mq.MessageBodyBase - Successes []cdssdk.ObjectID `json:"successes"` -} - -func ReqDeleteObjects(userID cdssdk.UserID, objectIDs []cdssdk.ObjectID) *DeleteObjects { - return &DeleteObjects{ - UserID: userID, - ObjectIDs: objectIDs, - } -} -func RespDeleteObjects(sucs []cdssdk.ObjectID) *DeleteObjectsResp { - return &DeleteObjectsResp{ - Successes: sucs, - } -} -func (client *Client) DeleteObjects(msg *DeleteObjects) (*DeleteObjectsResp, error) { - return mq.Request(Service.DeleteObjects, client.rabbitCli, msg) -} - -// 克隆Object -var _ = Register(Service.CloneObjects) - -type CloneObjects struct { - mq.MessageBodyBase - UserID cdssdk.UserID `json:"userID"` - Clonings []cdsapi.CloningObject `json:"clonings"` -} -type CloneObjectsResp struct { - mq.MessageBodyBase - Objects []*cdssdk.Object `json:"objects"` -} - -func ReqCloneObjects(userID cdssdk.UserID, clonings []cdsapi.CloningObject) *CloneObjects { - return &CloneObjects{ - UserID: userID, - Clonings: clonings, - } -} -func RespCloneObjects(objects []*cdssdk.Object) *CloneObjectsResp { - return &CloneObjectsResp{ - Objects: objects, - } -} -func (client *Client) CloneObjects(msg *CloneObjects) (*CloneObjectsResp, error) { - return mq.Request(Service.CloneObjects, client.rabbitCli, msg) -} - -// 增加访问计数 -var _ = RegisterNoReply(Service.AddAccessStat) - -type AddAccessStat struct { - mq.MessageBodyBase - Entries []AddAccessStatEntry `json:"entries"` -} - -type AddAccessStatEntry struct { - ObjectID cdssdk.ObjectID `json:"objectID"` - PackageID cdssdk.PackageID `json:"packageID"` - StorageID cdssdk.StorageID `json:"storageID"` - Counter float64 `json:"counter"` -} - -func ReqAddAccessStat(entries []AddAccessStatEntry) *AddAccessStat { - return &AddAccessStat{ - Entries: entries, - } -} - -func (client *Client) AddAccessStat(msg *AddAccessStat) error { - return mq.Send(Service.AddAccessStat, client.rabbitCli, msg) -} - -var _ = Register(Service.NewMultipartUploadObject) - -type NewMultipartUploadObject struct { - mq.MessageBodyBase - UserID cdssdk.UserID `json:"userID"` - PackageID cdssdk.PackageID `json:"packageID"` - Path string `json:"path"` -} -type NewMultipartUploadObjectResp struct { - mq.MessageBodyBase - Object cdssdk.Object `json:"object"` -} - -func ReqNewMultipartUploadObject(userID cdssdk.UserID, packageID cdssdk.PackageID, path string) *NewMultipartUploadObject { - return &NewMultipartUploadObject{ - UserID: userID, - PackageID: packageID, - Path: path, - } -} -func RespNewMultipartUploadObject(object cdssdk.Object) *NewMultipartUploadObjectResp { - return &NewMultipartUploadObjectResp{ - Object: object, - } -} -func (client *Client) NewMultipartUploadObject(msg *NewMultipartUploadObject) (*NewMultipartUploadObjectResp, error) { - return mq.Request(Service.NewMultipartUploadObject, client.rabbitCli, msg) -} - -var _ = Register(Service.AddMultipartUploadPart) - -type AddMultipartUploadPart struct { - mq.MessageBodyBase - UserID cdssdk.UserID `json:"userID"` - ObjectID cdssdk.ObjectID `json:"objectID"` - Block stgmod.ObjectBlock `json:"block"` -} - -type AddMultipartUploadPartResp struct { - mq.MessageBodyBase -} - -func ReqAddMultipartUploadPart(userID cdssdk.UserID, objectID cdssdk.ObjectID, blk stgmod.ObjectBlock) *AddMultipartUploadPart { - return &AddMultipartUploadPart{ - UserID: userID, - ObjectID: objectID, - Block: blk, - } -} -func RespAddMultipartUploadPart() *AddMultipartUploadPartResp { - return &AddMultipartUploadPartResp{} -} -func (client *Client) AddMultipartUploadPart(msg *AddMultipartUploadPart) (*AddMultipartUploadPartResp, error) { - return mq.Request(Service.AddMultipartUploadPart, client.rabbitCli, msg) -} diff --git a/common/pkgs/mq/coordinator/package.go b/common/pkgs/mq/coordinator/package.go deleted file mode 100644 index f8dada3..0000000 --- a/common/pkgs/mq/coordinator/package.go +++ /dev/null @@ -1,258 +0,0 @@ -package coordinator - -import ( - "time" - - "gitlink.org.cn/cloudream/common/pkgs/mq" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - - "gitlink.org.cn/cloudream/storage2/common/pkgs/db2/model" -) - -type PackageService interface { - GetPackage(msg *GetPackage) (*GetPackageResp, *mq.CodeMessage) - - GetPackageByName(msg *GetPackageByName) (*GetPackageByNameResp, *mq.CodeMessage) - - CreatePackage(msg *CreatePackage) (*CreatePackageResp, *mq.CodeMessage) - - UpdatePackage(msg *UpdatePackage) (*UpdatePackageResp, *mq.CodeMessage) - - DeletePackage(msg *DeletePackage) (*DeletePackageResp, *mq.CodeMessage) - - ClonePackage(msg *ClonePackage) (*ClonePackageResp, *mq.CodeMessage) - - GetPackageCachedStorages(msg *GetPackageCachedStorages) (*GetPackageCachedStoragesResp, *mq.CodeMessage) -} - -// 获取Package基本信息 -var _ = Register(Service.GetPackage) - -type GetPackage struct { - mq.MessageBodyBase - UserID cdssdk.UserID `json:"userID"` - PackageID cdssdk.PackageID `json:"packageID"` -} -type GetPackageResp struct { - mq.MessageBodyBase - model.Package -} - -func NewGetPackage(userID cdssdk.UserID, packageID cdssdk.PackageID) *GetPackage { - return &GetPackage{ - UserID: userID, - PackageID: packageID, - } -} -func NewGetPackageResp(pkg model.Package) *GetPackageResp { - return &GetPackageResp{ - Package: pkg, - } -} - -func (client *Client) GetPackage(msg *GetPackage) (*GetPackageResp, error) { - return mq.Request(Service.GetPackage, client.rabbitCli, msg) -} - -// 根据名称获取Package -var _ = Register(Service.GetPackageByName) - -type GetPackageByName struct { - mq.MessageBodyBase - UserID cdssdk.UserID `json:"userID"` - BucketName string `json:"bucketName"` - PackageName string `json:"packageName"` -} -type GetPackageByNameResp struct { - mq.MessageBodyBase - Package cdssdk.Package `json:"package"` -} - -func ReqGetPackageByName(userID cdssdk.UserID, bucketName string, packageName string) *GetPackageByName { - return &GetPackageByName{ - UserID: userID, - BucketName: bucketName, - PackageName: packageName, - } -} -func NewGetPackageByNameResp(pkg cdssdk.Package) *GetPackageByNameResp { - return &GetPackageByNameResp{ - Package: pkg, - } -} -func (client *Client) GetPackageByName(msg *GetPackageByName) (*GetPackageByNameResp, error) { - return mq.Request(Service.GetPackageByName, client.rabbitCli, msg) -} - -// 创建一个Package -var _ = Register(Service.CreatePackage) - -type CreatePackage struct { - mq.MessageBodyBase - UserID cdssdk.UserID `json:"userID"` - BucketID cdssdk.BucketID `json:"bucketID"` - Name string `json:"name"` -} -type CreatePackageResp struct { - mq.MessageBodyBase - Package cdssdk.Package `json:"package"` -} - -func NewCreatePackage(userID cdssdk.UserID, bucketID cdssdk.BucketID, name string) *CreatePackage { - return &CreatePackage{ - UserID: userID, - BucketID: bucketID, - Name: name, - } -} -func NewCreatePackageResp(pkg cdssdk.Package) *CreatePackageResp { - return &CreatePackageResp{ - Package: pkg, - } -} - -func (client *Client) CreatePackage(msg *CreatePackage) (*CreatePackageResp, error) { - return mq.Request(Service.CreatePackage, client.rabbitCli, msg) -} - -// 更新Package -var _ = Register(Service.UpdatePackage) - -type UpdatePackage struct { - mq.MessageBodyBase - PackageID cdssdk.PackageID `json:"packageID"` - Adds []AddObjectEntry `json:"adds"` -} -type UpdatePackageResp struct { - mq.MessageBodyBase - Added []cdssdk.Object `json:"added"` -} -type AddObjectEntry struct { - Path string `json:"path"` - Size int64 `json:"size,string"` - FileHash cdssdk.FileHash `json:"fileHash"` - UploadTime time.Time `json:"uploadTime"` // 开始上传文件的时间 - StorageIDs []cdssdk.StorageID `json:"storageIDs"` -} - -func NewUpdatePackage(packageID cdssdk.PackageID, adds []AddObjectEntry) *UpdatePackage { - return &UpdatePackage{ - PackageID: packageID, - Adds: adds, - } -} -func NewUpdatePackageResp(added []cdssdk.Object) *UpdatePackageResp { - return &UpdatePackageResp{ - Added: added, - } -} -func NewAddObjectEntry(path string, size int64, fileHash cdssdk.FileHash, uploadTime time.Time, stgIDs []cdssdk.StorageID) AddObjectEntry { - return AddObjectEntry{ - Path: path, - Size: size, - FileHash: fileHash, - UploadTime: uploadTime, - StorageIDs: stgIDs, - } -} -func (client *Client) UpdatePackage(msg *UpdatePackage) (*UpdatePackageResp, error) { - return mq.Request(Service.UpdatePackage, client.rabbitCli, msg) -} - -// 删除对象 -var _ = Register(Service.DeletePackage) - -type DeletePackage struct { - mq.MessageBodyBase - UserID cdssdk.UserID `json:"userID"` - PackageID cdssdk.PackageID `json:"packageID"` -} -type DeletePackageResp struct { - mq.MessageBodyBase -} - -func NewDeletePackage(userID cdssdk.UserID, packageID cdssdk.PackageID) *DeletePackage { - return &DeletePackage{ - UserID: userID, - PackageID: packageID, - } -} -func NewDeletePackageResp() *DeletePackageResp { - return &DeletePackageResp{} -} -func (client *Client) DeletePackage(msg *DeletePackage) (*DeletePackageResp, error) { - return mq.Request(Service.DeletePackage, client.rabbitCli, msg) -} - -// 克隆Package -var _ = Register(Service.ClonePackage) - -type ClonePackage struct { - mq.MessageBodyBase - UserID cdssdk.UserID `json:"userID"` - PackageID cdssdk.PackageID `json:"packageID"` - BucketID cdssdk.BucketID `json:"bucketID"` - Name string `json:"name"` -} -type ClonePackageResp struct { - mq.MessageBodyBase - Package cdssdk.Package `json:"package"` -} - -func ReqClonePackage(userID cdssdk.UserID, packageID cdssdk.PackageID, bucketID cdssdk.BucketID, name string) *ClonePackage { - return &ClonePackage{ - UserID: userID, - PackageID: packageID, - BucketID: bucketID, - Name: name, - } -} -func RespClonePackage(pkg cdssdk.Package) *ClonePackageResp { - return &ClonePackageResp{ - Package: pkg, - } -} - -func (client *Client) ClonePackage(msg *ClonePackage) (*ClonePackageResp, error) { - return mq.Request(Service.ClonePackage, client.rabbitCli, msg) -} - -// 根据PackageID获取object分布情况 -var _ = Register(Service.GetPackageCachedStorages) - -type GetPackageCachedStorages struct { - mq.MessageBodyBase - UserID cdssdk.UserID `json:"userID"` - PackageID cdssdk.PackageID `json:"packageID"` -} - -type PackageCachedStorageInfo struct { - StorageID int64 `json:"storageID"` - FileSize int64 `json:"fileSize"` - ObjectCount int64 `json:"objectCount"` -} - -type GetPackageCachedStoragesResp struct { - mq.MessageBodyBase - cdssdk.PackageCachingInfo -} - -func ReqGetPackageCachedStorages(userID cdssdk.UserID, packageID cdssdk.PackageID) *GetPackageCachedStorages { - return &GetPackageCachedStorages{ - UserID: userID, - PackageID: packageID, - } -} - -func ReqGetPackageCachedStoragesResp(stgInfos []cdssdk.StoragePackageCachingInfo, packageSize int64) *GetPackageCachedStoragesResp { - return &GetPackageCachedStoragesResp{ - PackageCachingInfo: cdssdk.PackageCachingInfo{ - StorageInfos: stgInfos, - PackageSize: packageSize, - }, - } -} - -func (client *Client) GetPackageCachedStorages(msg *GetPackageCachedStorages) (*GetPackageCachedStoragesResp, error) { - return mq.Request(Service.GetPackageCachedStorages, client.rabbitCli, msg) -} diff --git a/common/pkgs/mq/coordinator/server.go b/common/pkgs/mq/coordinator/server.go index 933d9c8..3def4b1 100644 --- a/common/pkgs/mq/coordinator/server.go +++ b/common/pkgs/mq/coordinator/server.go @@ -8,21 +8,9 @@ import ( // Service 协调端接口 type Service interface { - AgentService - - BucketService - - CacheService - HubService - ObjectService - - PackageService - StorageService - - UserService } type Server struct { diff --git a/common/pkgs/mq/coordinator/storage.go b/common/pkgs/mq/coordinator/storage.go index 025a9d8..05f3a68 100644 --- a/common/pkgs/mq/coordinator/storage.go +++ b/common/pkgs/mq/coordinator/storage.go @@ -2,21 +2,11 @@ package coordinator import ( "gitlink.org.cn/cloudream/common/pkgs/mq" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - stgmod "gitlink.org.cn/cloudream/storage2/common/models" - "gitlink.org.cn/cloudream/storage2/common/pkgs/db2/model" + cortypes "gitlink.org.cn/cloudream/storage2/coordinator/types" ) type StorageService interface { GetStorage(msg *GetStorage) (*GetStorageResp, *mq.CodeMessage) - - GetStorageDetails(msg *GetStorageDetails) (*GetStorageDetailsResp, *mq.CodeMessage) - - GetUserStorageDetails(msg *GetUserStorageDetails) (*GetUserStorageDetailsResp, *mq.CodeMessage) - - GetStorageByName(msg *GetStorageByName) (*GetStorageByNameResp, *mq.CodeMessage) - - StoragePackageLoaded(msg *StoragePackageLoaded) (*StoragePackageLoadedResp, *mq.CodeMessage) } // 获取Storage信息 @@ -24,21 +14,19 @@ var _ = Register(Service.GetStorage) type GetStorage struct { mq.MessageBodyBase - UserID cdssdk.UserID `json:"userID"` - StorageID cdssdk.StorageID `json:"storageID"` + StorageID cortypes.StorageID `json:"storageID"` } type GetStorageResp struct { mq.MessageBodyBase - Storage model.Storage `json:"storage"` + Storage cortypes.Storage `json:"storage"` } -func ReqGetStorage(userID cdssdk.UserID, storageID cdssdk.StorageID) *GetStorage { +func ReqGetStorage(storageID cortypes.StorageID) *GetStorage { return &GetStorage{ - UserID: userID, StorageID: storageID, } } -func RespGetStorage(stg model.Storage) *GetStorageResp { +func RespGetStorage(stg cortypes.Storage) *GetStorageResp { return &GetStorageResp{ Storage: stg, } @@ -46,126 +34,3 @@ func RespGetStorage(stg model.Storage) *GetStorageResp { func (client *Client) GetStorage(msg *GetStorage) (*GetStorageResp, error) { return mq.Request(Service.GetStorage, client.rabbitCli, msg) } - -// 获取Storage的详细信息 -var _ = Register(Service.GetStorageDetails) - -type GetStorageDetails struct { - mq.MessageBodyBase - StorageIDs []cdssdk.StorageID `json:"storageIDs"` -} -type GetStorageDetailsResp struct { - mq.MessageBodyBase - Storages []*stgmod.StorageDetail `json:"storages"` -} - -func ReqGetStorageDetails(storageIDs []cdssdk.StorageID) *GetStorageDetails { - return &GetStorageDetails{ - StorageIDs: storageIDs, - } -} -func RespGetStorageDetails(stgs []*stgmod.StorageDetail) *GetStorageDetailsResp { - return &GetStorageDetailsResp{ - Storages: stgs, - } -} - -func (r *GetStorageDetailsResp) ToMap() map[cdssdk.StorageID]stgmod.StorageDetail { - m := make(map[cdssdk.StorageID]stgmod.StorageDetail) - for _, stg := range r.Storages { - if stg == nil { - continue - } - - m[stg.Storage.StorageID] = *stg - } - return m -} - -func (client *Client) GetStorageDetails(msg *GetStorageDetails) (*GetStorageDetailsResp, error) { - return mq.Request(Service.GetStorageDetails, client.rabbitCli, msg) -} - -var _ = Register(Service.GetStorageByName) - -type GetStorageByName struct { - mq.MessageBodyBase - UserID cdssdk.UserID `json:"userID"` - Name string `json:"name"` -} -type GetStorageByNameResp struct { - mq.MessageBodyBase - Storage model.Storage `json:"storage"` -} - -func ReqGetStorageByName(userID cdssdk.UserID, name string) *GetStorageByName { - return &GetStorageByName{ - UserID: userID, - Name: name, - } -} -func RespGetStorageByNameResp(storage model.Storage) *GetStorageByNameResp { - return &GetStorageByNameResp{ - Storage: storage, - } -} -func (client *Client) GetStorageByName(msg *GetStorageByName) (*GetStorageByNameResp, error) { - return mq.Request(Service.GetStorageByName, client.rabbitCli, msg) -} - -// 获取用户的Storage信息 -var _ = Register(Service.GetUserStorageDetails) - -type GetUserStorageDetails struct { - mq.MessageBodyBase - UserID cdssdk.UserID `json:"userID"` -} -type GetUserStorageDetailsResp struct { - mq.MessageBodyBase - Storages []stgmod.StorageDetail `json:"storages"` -} - -func ReqGetUserStorageDetails(userID cdssdk.UserID) *GetUserStorageDetails { - return &GetUserStorageDetails{ - UserID: userID, - } -} -func RespGetUserStorageDetails(stgs []stgmod.StorageDetail) *GetUserStorageDetailsResp { - return &GetUserStorageDetailsResp{ - Storages: stgs, - } -} -func (client *Client) GetUserStorageDetails(msg *GetUserStorageDetails) (*GetUserStorageDetailsResp, error) { - return mq.Request(Service.GetUserStorageDetails, client.rabbitCli, msg) -} - -// 提交调度记录 -var _ = Register(Service.StoragePackageLoaded) - -type StoragePackageLoaded struct { - mq.MessageBodyBase - UserID cdssdk.UserID `json:"userID"` - PackageID cdssdk.PackageID `json:"packageID"` - StorageID cdssdk.StorageID `json:"storageID"` - RootPath string `json:"rootPath"` - PinnedObjects []cdssdk.ObjectID `json:"pinnedObjects"` -} -type StoragePackageLoadedResp struct { - mq.MessageBodyBase -} - -func ReqStoragePackageLoaded(userID cdssdk.UserID, stgID cdssdk.StorageID, packageID cdssdk.PackageID, rootPath string, pinnedObjects []cdssdk.ObjectID) *StoragePackageLoaded { - return &StoragePackageLoaded{ - UserID: userID, - PackageID: packageID, - StorageID: stgID, - RootPath: rootPath, - PinnedObjects: pinnedObjects, - } -} -func RespStoragePackageLoaded() *StoragePackageLoadedResp { - return &StoragePackageLoadedResp{} -} -func (client *Client) StoragePackageLoaded(msg *StoragePackageLoaded) (*StoragePackageLoadedResp, error) { - return mq.Request(Service.StoragePackageLoaded, client.rabbitCli, msg) -} diff --git a/common/pkgs/mq/coordinator/temp.go b/common/pkgs/mq/coordinator/temp.go deleted file mode 100644 index 893cb74..0000000 --- a/common/pkgs/mq/coordinator/temp.go +++ /dev/null @@ -1,38 +0,0 @@ -package coordinator - -import ( - "gitlink.org.cn/cloudream/common/pkgs/mq" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - stgmod "gitlink.org.cn/cloudream/storage2/common/models" -) - -// 删除Object -var _ = Register(Service.GetDatabaseAll) - -type GetDatabaseAll struct { - mq.MessageBodyBase - UserID cdssdk.UserID `json:"userID"` -} - -type GetDatabaseAllResp struct { - mq.MessageBodyBase - Buckets []cdssdk.Bucket `json:"buckets"` - Packages []cdssdk.Package `json:"packages"` - Objects []stgmod.ObjectDetail `json:"objects"` -} - -func ReqGetDatabaseAll(userID cdssdk.UserID) *GetDatabaseAll { - return &GetDatabaseAll{ - UserID: userID, - } -} -func RespGetDatabaseAll(buckets []cdssdk.Bucket, packages []cdssdk.Package, objects []stgmod.ObjectDetail) *GetDatabaseAllResp { - return &GetDatabaseAllResp{ - Buckets: buckets, - Packages: packages, - Objects: objects, - } -} -func (client *Client) GetDatabaseAll(msg *GetDatabaseAll) (*GetDatabaseAllResp, error) { - return mq.Request(Service.GetDatabaseAll, client.rabbitCli, msg) -} diff --git a/common/pkgs/mq/coordinator/user.go b/common/pkgs/mq/coordinator/user.go deleted file mode 100644 index 9dcf294..0000000 --- a/common/pkgs/mq/coordinator/user.go +++ /dev/null @@ -1,67 +0,0 @@ -package coordinator - -import ( - "gitlink.org.cn/cloudream/common/pkgs/mq" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" -) - -type UserService interface { - CreateUser(msg *CreateUser) (*CreateUserResp, *mq.CodeMessage) - - DeleteUser(msg *DeleteUser) (*DeleteUserResp, *mq.CodeMessage) -} - -// 创建用户 -var _ = Register(Service.CreateUser) - -type CreateUser struct { - mq.MessageBodyBase - Name string `json:"name"` -} - -type CreateUserResp struct { - mq.MessageBodyBase - User cdssdk.User `json:"user"` -} - -func ReqCreateUser(name string) *CreateUser { - return &CreateUser{ - Name: name, - } -} - -func RespCreateUser(user cdssdk.User) *CreateUserResp { - return &CreateUserResp{ - User: user, - } -} - -func (c *Client) CreateUser(msg *CreateUser) (*CreateUserResp, error) { - return mq.Request(Service.CreateUser, c.rabbitCli, msg) -} - -// 删除用户 -var _ = Register(Service.DeleteUser) - -type DeleteUser struct { - mq.MessageBodyBase - UserID cdssdk.UserID `json:"userID"` -} - -type DeleteUserResp struct { - mq.MessageBodyBase -} - -func ReqDeleteUser(userID cdssdk.UserID) *DeleteUser { - return &DeleteUser{ - UserID: userID, - } -} - -func RespDeleteUser() *DeleteUserResp { - return &DeleteUserResp{} -} - -func (c *Client) DeleteUser(msg *DeleteUser) (*DeleteUserResp, error) { - return mq.Request(Service.DeleteUser, c.rabbitCli, msg) -}