Browse Source

调整代码结构

gitlink
Sydonian 11 months ago
parent
commit
23afe5c43a
31 changed files with 15 additions and 3294 deletions
  1. +0
    -149
      common/pkgs/db2/bucket.go
  2. +0
    -105
      common/pkgs/db2/cache.go
  3. +0
    -21
      common/pkgs/db2/config/config.go
  4. +0
    -38
      common/pkgs/db2/db2.go
  5. +0
    -60
      common/pkgs/db2/hub.go
  6. +0
    -37
      common/pkgs/db2/hub_connectivity.go
  7. +0
    -36
      common/pkgs/db2/location.go
  8. +0
    -68
      common/pkgs/db2/model/model.go
  9. +0
    -511
      common/pkgs/db2/object.go
  10. +0
    -116
      common/pkgs/db2/object_access_stat.go
  11. +0
    -122
      common/pkgs/db2/object_block.go
  12. +0
    -209
      common/pkgs/db2/package.go
  13. +0
    -79
      common/pkgs/db2/package_access_stat.go
  14. +0
    -122
      common/pkgs/db2/pinned_object.go
  15. +0
    -120
      common/pkgs/db2/storage.go
  16. +0
    -44
      common/pkgs/db2/union_serializer.go
  17. +0
    -44
      common/pkgs/db2/user.go
  18. +0
    -36
      common/pkgs/db2/user_bucket.go
  19. +0
    -34
      common/pkgs/db2/user_hub.go
  20. +0
    -34
      common/pkgs/db2/user_storage.go
  21. +0
    -81
      common/pkgs/db2/utils.go
  22. +0
    -4
      common/pkgs/mq/coordinator/agent.go
  23. +0
    -154
      common/pkgs/mq/coordinator/bucket.go
  24. +0
    -62
      common/pkgs/mq/coordinator/cache.go
  25. +10
    -93
      common/pkgs/mq/coordinator/hub.go
  26. +0
    -400
      common/pkgs/mq/coordinator/object.go
  27. +0
    -258
      common/pkgs/mq/coordinator/package.go
  28. +0
    -12
      common/pkgs/mq/coordinator/server.go
  29. +5
    -140
      common/pkgs/mq/coordinator/storage.go
  30. +0
    -38
      common/pkgs/mq/coordinator/temp.go
  31. +0
    -67
      common/pkgs/mq/coordinator/user.go

+ 0
- 149
common/pkgs/db2/bucket.go View File

@@ -1,149 +0,0 @@
package db2

import (
"errors"
"fmt"
"time"

"gorm.io/gorm"

cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/storage2/common/pkgs/db2/model"
)

type BucketDB struct {
*DB
}

func (db *DB) Bucket() *BucketDB {
return &BucketDB{DB: db}
}

func (db *BucketDB) GetByID(ctx SQLContext, bucketID cdssdk.BucketID) (cdssdk.Bucket, error) {
var ret cdssdk.Bucket
err := ctx.Table("Bucket").Where("BucketID = ?", bucketID).First(&ret).Error
return ret, err
}

func (db *BucketDB) GetByName(ctx SQLContext, bucketName string) (cdssdk.Bucket, error) {
var ret cdssdk.Bucket
err := ctx.Table("Bucket").Where("Name = ?", bucketName).First(&ret).Error
return ret, err
}

// GetIDByName 根据BucketName查询BucketID
func (db *BucketDB) GetIDByName(ctx SQLContext, bucketName string) (int64, error) {
var result struct {
BucketID int64 `gorm:"column:BucketID"`
BucketName string `gorm:"column:BucketName"`
}

err := ctx.Table("Bucket").Select("BucketID, BucketName").Where("BucketName = ?", bucketName).Scan(&result).Error
if err != nil {
return 0, err
}

return result.BucketID, nil
}

func (*BucketDB) GetAll(ctx SQLContext) ([]cdssdk.Bucket, error) {
var ret []cdssdk.Bucket
err := ctx.Table("Bucket").Find(&ret).Error
return ret, err
}

// IsAvailable 判断用户是否有指定Bucekt的权限
func (db *BucketDB) IsAvailable(ctx SQLContext, bucketID cdssdk.BucketID, userID cdssdk.UserID) (bool, error) {
_, err := db.GetUserBucket(ctx, userID, bucketID)
if errors.Is(err, gorm.ErrRecordNotFound) {
return false, nil
}

if err != nil {
return false, fmt.Errorf("find bucket failed, err: %w", err)
}

return true, nil
}

func (*BucketDB) GetUserBucket(ctx SQLContext, userID cdssdk.UserID, bucketID cdssdk.BucketID) (model.Bucket, error) {
var ret model.Bucket
err := ctx.Table("UserBucket").
Select("Bucket.*").
Joins("JOIN Bucket ON UserBucket.BucketID = Bucket.BucketID").
Where("UserBucket.UserID = ? AND Bucket.BucketID = ?", userID, bucketID).
First(&ret).Error
return ret, err
}

func (*BucketDB) GetUserBucketByName(ctx SQLContext, userID cdssdk.UserID, bucketName string) (model.Bucket, error) {
var ret model.Bucket
err := ctx.Table("UserBucket").
Select("Bucket.*").
Joins("JOIN Bucket ON UserBucket.BucketID = Bucket.BucketID").
Where("UserBucket.UserID = ? AND Bucket.Name = ?", userID, bucketName).
First(&ret).Error
return ret, err
}

func (*BucketDB) GetUserBuckets(ctx SQLContext, userID cdssdk.UserID) ([]model.Bucket, error) {
var ret []model.Bucket
err := ctx.Table("UserBucket").
Select("Bucket.*").
Joins("JOIN Bucket ON UserBucket.BucketID = Bucket.BucketID").
Where("UserBucket.UserID = ?", userID).
Find(&ret).Error
return ret, err
}

func (db *BucketDB) Create(ctx SQLContext, userID cdssdk.UserID, bucketName string, createTime time.Time) (cdssdk.Bucket, error) {
var bucketID int64
err := ctx.Table("UserBucket").
Select("Bucket.BucketID").
Joins("JOIN Bucket ON UserBucket.BucketID = Bucket.BucketID").
Where("UserBucket.UserID = ? AND Bucket.Name = ?", userID, bucketName).
Scan(&bucketID).Error

if err != nil {
return cdssdk.Bucket{}, err
}

if bucketID > 0 {
return cdssdk.Bucket{}, gorm.ErrDuplicatedKey
}

newBucket := cdssdk.Bucket{Name: bucketName, CreateTime: createTime, CreatorID: userID}
if err := ctx.Table("Bucket").Create(&newBucket).Error; err != nil {
return cdssdk.Bucket{}, fmt.Errorf("insert bucket failed, err: %w", err)
}

err = ctx.Table("UserBucket").Create(&model.UserBucket{UserID: userID, BucketID: newBucket.BucketID}).Error
if err != nil {
return cdssdk.Bucket{}, fmt.Errorf("insert user bucket: %w", err)
}

return newBucket, nil
}

func (db *BucketDB) Rename(ctx SQLContext, bucketID cdssdk.BucketID, bucketName string) error {
return ctx.Table("Bucket").Where("BucketID = ?", bucketID).Update("Name", bucketName).Error
}

func (db *BucketDB) Delete(ctx SQLContext, bucketID cdssdk.BucketID) error {
return ctx.Delete(&cdssdk.Bucket{}, "BucketID = ?", bucketID).Error
}

func (db *BucketDB) DeleteComplete(tx SQLContext, bucketID cdssdk.BucketID) error {
pkgs, err := db.Package().GetBucketPackages(tx, bucketID)
if err != nil {
return err
}

for _, pkg := range pkgs {
err := db.Package().DeleteComplete(tx, pkg.PackageID)
if err != nil {
return err
}
}
return db.Bucket().Delete(tx, bucketID)
}

+ 0
- 105
common/pkgs/db2/cache.go View File

@@ -1,105 +0,0 @@
package db2

import (
"time"

cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/storage2/common/pkgs/db2/model"
"gorm.io/gorm/clause"
)

type CacheDB struct {
*DB
}

func (db *DB) Cache() *CacheDB {
return &CacheDB{DB: db}
}

func (*CacheDB) Get(ctx SQLContext, fileHash cdssdk.FileHash, stgID cdssdk.StorageID) (model.Cache, error) {
var ret model.Cache
err := ctx.Table("Cache").Where("FileHash = ? AND StorageID = ?", fileHash, stgID).First(&ret).Error
return ret, err
}

func (*CacheDB) BatchGetAllFileHashes(ctx SQLContext, start int, count int) ([]string, error) {
var ret []string
err := ctx.Table("Cache").Distinct("FileHash").Offset(start).Limit(count).Pluck("FileHash", &ret).Error
return ret, err
}

func (*CacheDB) GetByStorageID(ctx SQLContext, stgID cdssdk.StorageID) ([]model.Cache, error) {
var ret []model.Cache
err := ctx.Table("Cache").Where("StorageID = ?", stgID).Find(&ret).Error
return ret, err
}

// Create 创建一条缓存记录,如果已有则不进行操作
func (*CacheDB) Create(ctx SQLContext, fileHash cdssdk.FileHash, stgID cdssdk.StorageID, priority int) error {
cache := model.Cache{FileHash: fileHash, StorageID: stgID, CreateTime: time.Now(), Priority: priority}
return ctx.Where(cache).Attrs(cache).FirstOrCreate(&cache).Error
}

// 批量创建缓存记录
func (*CacheDB) BatchCreate(ctx SQLContext, caches []model.Cache) error {
if len(caches) == 0 {
return nil
}

return ctx.Clauses(clause.OnConflict{
Columns: []clause.Column{{Name: "FileHash"}, {Name: "StorageID"}},
DoUpdates: clause.AssignmentColumns([]string{"CreateTime", "Priority"}),
}).Create(&caches).Error
}

func (db *CacheDB) BatchCreateOnSameStorage(ctx SQLContext, fileHashes []cdssdk.FileHash, stgID cdssdk.StorageID, priority int) error {
if len(fileHashes) == 0 {
return nil
}

var caches []model.Cache
var nowTime = time.Now()
for _, hash := range fileHashes {
caches = append(caches, model.Cache{
FileHash: hash,
StorageID: stgID,
CreateTime: nowTime,
Priority: priority,
})
}

return db.BatchCreate(ctx, caches)
}

func (*CacheDB) StorageBatchDelete(ctx SQLContext, stgID cdssdk.StorageID, fileHashes []cdssdk.FileHash) error {
if len(fileHashes) == 0 {
return nil
}

return ctx.Table("Cache").Where("StorageID = ? AND FileHash IN (?)", stgID, fileHashes).Delete(&model.Cache{}).Error
}

// GetCachingFileStorages 查找缓存了指定文件的存储服务
func (*CacheDB) GetCachingFileStorages(ctx SQLContext, fileHash cdssdk.FileHash) ([]cdssdk.Storage, error) {
var stgs []cdssdk.Storage
err := ctx.Table("Cache").Select("Storage.*").
Joins("JOIN Storage ON Cache.StorageID = Storage.StorageID").
Where("Cache.FileHash = ?", fileHash).
Find(&stgs).Error
return stgs, err
}

// DeleteStorageAll 删除一个存储服务所有的记录
func (*CacheDB) DeleteStorageAll(ctx SQLContext, StorageID cdssdk.StorageID) error {
return ctx.Where("StorageID = ?", StorageID).Delete(&model.Cache{}).Error
}

// FindCachingFileUserStorages 在缓存表中查询指定数据所在的节点
func (*CacheDB) FindCachingFileUserStorages(ctx SQLContext, userID cdssdk.UserID, fileHash string) ([]cdssdk.Storage, error) {
var stgs []cdssdk.Storage
err := ctx.Table("Cache").Select("Storage.*").
Joins("JOIN UserStorage ON Cache.StorageID = UserStorage.StorageID").
Where("Cache.FileHash = ? AND UserStorage.UserID = ?", fileHash, userID).
Find(&stgs).Error
return stgs, err
}

+ 0
- 21
common/pkgs/db2/config/config.go View File

@@ -1,21 +0,0 @@
package config

import "fmt"

type Config struct {
Address string `json:"address"`
Account string `json:"account"`
Password string `json:"password"`
DatabaseName string `json:"databaseName"`
}

func (cfg *Config) MakeSourceString() string {
return fmt.Sprintf(
"%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=true&loc=%s",
cfg.Account,
cfg.Password,
cfg.Address,
cfg.DatabaseName,
"Asia%2FShanghai",
)
}

+ 0
- 38
common/pkgs/db2/db2.go View File

@@ -1,38 +0,0 @@
package db2

import (
_ "github.com/go-sql-driver/mysql"
"github.com/sirupsen/logrus"
"gitlink.org.cn/cloudream/storage2/common/pkgs/db2/config"
"gorm.io/driver/mysql"
"gorm.io/gorm"
)

type DB struct {
db *gorm.DB
}

func NewDB(cfg *config.Config) (*DB, error) {
mydb, err := gorm.Open(mysql.Open(cfg.MakeSourceString()), &gorm.Config{})
if err != nil {
logrus.Fatalf("failed to connect to database: %v", err)
}

return &DB{
db: mydb,
}, nil
}

func (db *DB) DoTx(do func(tx SQLContext) error) error {
return db.db.Transaction(func(tx *gorm.DB) error {
return do(SQLContext{tx})
})
}

type SQLContext struct {
*gorm.DB
}

func (db *DB) DefCtx() SQLContext {
return SQLContext{db.db}
}

+ 0
- 60
common/pkgs/db2/hub.go View File

@@ -1,60 +0,0 @@
package db2

import (
"time"

cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
)

type HubDB struct {
*DB
}

func (db *DB) Hub() *HubDB {
return &HubDB{DB: db}
}

func (*HubDB) GetAllHubs(ctx SQLContext) ([]cdssdk.Hub, error) {
var ret []cdssdk.Hub

err := ctx.Table("Hub").Find(&ret).Error
return ret, err
}

func (*HubDB) GetByID(ctx SQLContext, hubID cdssdk.HubID) (cdssdk.Hub, error) {
var ret cdssdk.Hub
err := ctx.Table("Hub").Where("HubID = ?", hubID).Find(&ret).Error

return ret, err
}

func (*HubDB) BatchGetByID(ctx SQLContext, hubIDs []cdssdk.HubID) ([]cdssdk.Hub, error) {
var ret []cdssdk.Hub
err := ctx.Table("Hub").Where("HubID IN (?)", hubIDs).Find(&ret).Error

return ret, err
}

// GetUserHubs 根据用户id查询可用hub
func (*HubDB) GetUserHubs(ctx SQLContext, userID cdssdk.UserID) ([]cdssdk.Hub, error) {
var hubs []cdssdk.Hub
err := ctx.
Table("Hub").
Select("Hub.*").
Joins("JOIN UserHub ON UserHub.HubID = Hub.HubID").
Where("UserHub.UserID = ?", userID).
Find(&hubs).Error
return hubs, err
}

// UpdateState 更新状态,并且设置上次上报时间为现在
func (*HubDB) UpdateState(ctx SQLContext, hubID cdssdk.HubID, state string) error {
err := ctx.
Model(&cdssdk.Hub{}).
Where("HubID = ?", hubID).
Updates(map[string]interface{}{
"State": state,
"LastReportTime": time.Now(),
}).Error
return err
}

+ 0
- 37
common/pkgs/db2/hub_connectivity.go View File

@@ -1,37 +0,0 @@
package db2

import (
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/storage2/common/pkgs/db2/model"
"gorm.io/gorm/clause"
)

type HubConnectivityDB struct {
*DB
}

func (db *DB) HubConnectivity() *HubConnectivityDB {
return &HubConnectivityDB{DB: db}
}

func (db *HubConnectivityDB) BatchGetByFromHub(ctx SQLContext, fromHubIDs []cdssdk.HubID) ([]model.HubConnectivity, error) {
if len(fromHubIDs) == 0 {
return nil, nil
}

var ret []model.HubConnectivity

err := ctx.Table("HubConnectivity").Where("FromHubID IN (?)", fromHubIDs).Find(&ret).Error
return ret, err
}

func (db *HubConnectivityDB) BatchUpdateOrCreate(ctx SQLContext, cons []model.HubConnectivity) error {
if len(cons) == 0 {
return nil
}

// 使用 GORM 的批量插入或更新
return ctx.Table("HubConnectivity").Clauses(clause.OnConflict{
UpdateAll: true,
}).Create(&cons).Error
}

+ 0
- 36
common/pkgs/db2/location.go View File

@@ -1,36 +0,0 @@
package db2

import (
"fmt"

"gitlink.org.cn/cloudream/storage2/common/pkgs/db2/model"
)

type LocationDB struct {
*DB
}

func (db *DB) Location() *LocationDB {
return &LocationDB{DB: db}
}

func (*LocationDB) GetByID(ctx SQLContext, id int64) (model.Location, error) {
var ret model.Location
err := ctx.First(&ret, id).Error
return ret, err
}

func (db *LocationDB) FindLocationByExternalIP(ctx SQLContext, ip string) (model.Location, error) {
var locID int64
err := ctx.Table("Hub").Select("LocationID").Where("ExternalIP = ?", ip).Scan(&locID).Error
if err != nil {
return model.Location{}, fmt.Errorf("finding hub by external ip: %w", err)
}

loc, err := db.GetByID(ctx, locID)
if err != nil {
return model.Location{}, fmt.Errorf("getting location by id: %w", err)
}

return loc, nil
}

+ 0
- 68
common/pkgs/db2/model/model.go View File

@@ -1,68 +0,0 @@
package model

import (
"time"

cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
stgmod "gitlink.org.cn/cloudream/storage2/common/models"
)

// TODO 可以考虑逐步迁移到cdssdk中。迁移思路:数据对象应该包含的字段都迁移到cdssdk中,内部使用的一些特殊字段则留在这里
type Storage = cdssdk.Storage

type UserBucket struct {
UserID cdssdk.UserID `gorm:"column:UserID; primaryKey; type:bigint" json:"userID"`
BucketID cdssdk.BucketID `gorm:"column:BucketID; primaryKey; type:bigint" json:"bucketID"`
}

func (UserBucket) TableName() string {
return "UserBucket"
}

type UserHub struct {
UserID cdssdk.UserID `gorm:"column:UserID; primaryKey; type:bigint" json:"userID"`
HubID cdssdk.HubID `gorm:"column:HubID; primaryKey; type:bigint" json:"hubID"`
}

func (UserHub) TableName() string {
return "UserHub"
}

type UserStorage struct {
UserID cdssdk.UserID `gorm:"column:UserID; primaryKey; type:bigint" json:"userID"`
StorageID cdssdk.StorageID `gorm:"column:StorageID; primaryKey; type:bigint" json:"storageID"`
}

func (UserStorage) TableName() string {
return "UserStorage"
}

type Bucket = cdssdk.Bucket

type Package = cdssdk.Package

type Object = cdssdk.Object

type HubConnectivity = cdssdk.HubConnectivity

type ObjectBlock = stgmod.ObjectBlock

type Cache struct {
FileHash cdssdk.FileHash `gorm:"column:FileHash; primaryKey; type: char(68)" json:"fileHash"`
StorageID cdssdk.StorageID `gorm:"column:StorageID; primaryKey; type: bigint" json:"storageID"`
CreateTime time.Time `gorm:"column:CreateTime; type:datetime" json:"createTime"`
Priority int `gorm:"column:Priority; type:int" json:"priority"`
}

func (Cache) TableName() string {
return "Cache"
}

type Location struct {
LocationID cdssdk.LocationID `gorm:"column:LocationID; primaryKey; type:bigint; autoIncrement" json:"locationID"`
Name string `gorm:"column:Name; type:varchar(255); not null" json:"name"`
}

func (Location) TableName() string {
return "Location"
}

+ 0
- 511
common/pkgs/db2/object.go View File

@@ -1,511 +0,0 @@
package db2

import (
"fmt"
"strings"
"time"

"gorm.io/gorm"
"gorm.io/gorm/clause"

cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
stgmod "gitlink.org.cn/cloudream/storage2/common/models"
"gitlink.org.cn/cloudream/storage2/common/pkgs/db2/model"
coormq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/coordinator"
)

type ObjectDB struct {
*DB
}

func (db *DB) Object() *ObjectDB {
return &ObjectDB{DB: db}
}

func (db *ObjectDB) GetByID(ctx SQLContext, objectID cdssdk.ObjectID) (cdssdk.Object, error) {
var ret cdssdk.Object
err := ctx.Table("Object").Where("ObjectID = ?", objectID).First(&ret).Error
return ret, err
}

func (db *ObjectDB) GetByPath(ctx SQLContext, packageID cdssdk.PackageID, path string) (cdssdk.Object, error) {
var ret cdssdk.Object
err := ctx.Table("Object").Where("PackageID = ? AND Path = ?", packageID, path).First(&ret).Error
return ret, err
}

func (db *ObjectDB) GetByFullPath(ctx SQLContext, bktName string, pkgName string, path string) (cdssdk.Object, error) {
var ret cdssdk.Object
err := ctx.Table("Object").
Joins("join Package on Package.PackageID = Object.PackageID and Package.Name = ?", pkgName).
Joins("join Bucket on Bucket.BucketID = Package.BucketID and Bucket.Name = ?", bktName).
Where("Object.Path = ?", path).First(&ret).Error
return ret, err
}

func (db *ObjectDB) GetWithPathPrefix(ctx SQLContext, packageID cdssdk.PackageID, pathPrefix string) ([]cdssdk.Object, error) {
var ret []cdssdk.Object
err := ctx.Table("Object").Where("PackageID = ? AND Path LIKE ?", packageID, escapeLike("", "%", pathPrefix)).Order("ObjectID ASC").Find(&ret).Error
return ret, err
}

// 查询结果将按照Path升序,而不是ObjectID升序
func (db *ObjectDB) GetWithPathPrefixPaged(ctx SQLContext, packageID cdssdk.PackageID, pathPrefix string, startPath string, limit int) ([]cdssdk.Object, error) {
var ret []cdssdk.Object
err := ctx.Table("Object").Where("PackageID = ? AND Path > ? AND Path LIKE ?", packageID, startPath, pathPrefix+"%").Order("Path ASC").Limit(limit).Find(&ret).Error
return ret, err
}

func (db *ObjectDB) GetByPrefixGrouped(ctx SQLContext, packageID cdssdk.PackageID, pathPrefix string) (objs []cdssdk.Object, commonPrefixes []string, err error) {
type ObjectOrDir struct {
cdssdk.Object
IsObject bool `gorm:"IsObject"`
Prefix string `gorm:"Prefix"`
}

sepCnt := strings.Count(pathPrefix, cdssdk.ObjectPathSeparator) + 1

prefixStatm := fmt.Sprintf("Substring_Index(Path, '%s', %d)", cdssdk.ObjectPathSeparator, sepCnt)
grouping := ctx.Table("Object").
Select(fmt.Sprintf("%s as Prefix, Max(ObjectID) as ObjectID, %s = Path as IsObject", prefixStatm, prefixStatm)).
Where("PackageID = ?", packageID).
Where("Path like ?", pathPrefix+"%").
Group("Prefix, IsObject").
Order("Prefix ASC")

var ret []ObjectOrDir
err = ctx.Table("Object").
Select("Grouped.IsObject, Grouped.Prefix, Object.*").
Joins("right join (?) as Grouped on Object.ObjectID = Grouped.ObjectID and Grouped.IsObject = 1", grouping).
Find(&ret).Error
if err != nil {
return
}

for _, o := range ret {
if o.IsObject {
objs = append(objs, o.Object)
} else {
commonPrefixes = append(commonPrefixes, o.Prefix+cdssdk.ObjectPathSeparator)
}
}

return
}

func (db *ObjectDB) GetByPrefixGroupedPaged(ctx SQLContext, packageID cdssdk.PackageID, pathPrefix string, startPath string, limit int) (objs []cdssdk.Object, commonPrefixes []string, nextStartPath string, err error) {
type ObjectOrDir struct {
cdssdk.Object
IsObject bool `gorm:"IsObject"`
Prefix string `gorm:"Prefix"`
}

sepCnt := strings.Count(pathPrefix, cdssdk.ObjectPathSeparator) + 1

prefixStatm := fmt.Sprintf("Substring_Index(Path, '%s', %d)", cdssdk.ObjectPathSeparator, sepCnt)
grouping := ctx.Table("Object").
Select(fmt.Sprintf("%s as Prefix, Max(ObjectID) as ObjectID, %s = Path as IsObject", prefixStatm, prefixStatm)).
Where("PackageID = ?", packageID).
Where("Path like ?", pathPrefix+"%").
Group("Prefix, IsObject").
Having("Prefix > ?", startPath).
Limit(limit).
Order("Prefix ASC")

var ret []ObjectOrDir
err = ctx.Table("Object").
Select("Grouped.IsObject, Grouped.Prefix, Object.*").
Joins("right join (?) as Grouped on Object.ObjectID = Grouped.ObjectID and Grouped.IsObject = 1", grouping).
Find(&ret).Error
if err != nil {
return
}

for _, o := range ret {
if o.IsObject {
objs = append(objs, o.Object)
} else {
commonPrefixes = append(commonPrefixes, o.Prefix+cdssdk.ObjectPathSeparator)
}
nextStartPath = o.Prefix
}

return
}

func (db *ObjectDB) HasObjectWithPrefix(ctx SQLContext, packageID cdssdk.PackageID, pathPrefix string) (bool, error) {
var obj cdssdk.Object
err := ctx.Table("Object").Where("PackageID = ? AND Path LIKE ?", packageID, escapeLike("", "%", pathPrefix)).First(&obj).Error
if err == nil {
return true, nil
}

if err == gorm.ErrRecordNotFound {
return false, nil
}

return false, err
}

func (db *ObjectDB) BatchTestObjectID(ctx SQLContext, objectIDs []cdssdk.ObjectID) (map[cdssdk.ObjectID]bool, error) {
if len(objectIDs) == 0 {
return make(map[cdssdk.ObjectID]bool), nil
}

var avaiIDs []cdssdk.ObjectID
err := ctx.Table("Object").Where("ObjectID IN ?", objectIDs).Pluck("ObjectID", &avaiIDs).Error
if err != nil {
return nil, err
}

avaiIDMap := make(map[cdssdk.ObjectID]bool)
for _, pkgID := range avaiIDs {
avaiIDMap[pkgID] = true
}

return avaiIDMap, nil
}

func (db *ObjectDB) BatchGet(ctx SQLContext, objectIDs []cdssdk.ObjectID) ([]cdssdk.Object, error) {
if len(objectIDs) == 0 {
return nil, nil
}

var objs []cdssdk.Object
err := ctx.Table("Object").Where("ObjectID IN ?", objectIDs).Order("ObjectID ASC").Find(&objs).Error
if err != nil {
return nil, err
}

return objs, nil
}

func (db *ObjectDB) BatchGetByPackagePath(ctx SQLContext, pkgID cdssdk.PackageID, pathes []string) ([]cdssdk.Object, error) {
if len(pathes) == 0 {
return nil, nil
}

var objs []cdssdk.Object
err := ctx.Table("Object").Where("PackageID = ? AND Path IN ?", pkgID, pathes).Find(&objs).Error
if err != nil {
return nil, err
}

return objs, nil
}

func (db *ObjectDB) GetDetail(ctx SQLContext, objectID cdssdk.ObjectID) (stgmod.ObjectDetail, error) {
var obj cdssdk.Object
err := ctx.Table("Object").Where("ObjectID = ?", objectID).First(&obj).Error
if err != nil {
return stgmod.ObjectDetail{}, fmt.Errorf("getting object: %w", err)
}

// 获取所有的 ObjectBlock
var allBlocks []stgmod.ObjectBlock
err = ctx.Table("ObjectBlock").Where("ObjectID = ?", objectID).Order("`Index` ASC").Find(&allBlocks).Error
if err != nil {
return stgmod.ObjectDetail{}, fmt.Errorf("getting all object blocks: %w", err)
}

// 获取所有的 PinnedObject
var allPinnedObjs []cdssdk.PinnedObject
err = ctx.Table("PinnedObject").Where("ObjectID = ?", objectID).Order("ObjectID ASC").Find(&allPinnedObjs).Error
if err != nil {
return stgmod.ObjectDetail{}, fmt.Errorf("getting all pinned objects: %w", err)
}

pinnedAt := make([]cdssdk.StorageID, len(allPinnedObjs))
for i, po := range allPinnedObjs {
pinnedAt[i] = po.StorageID
}

return stgmod.ObjectDetail{
Object: obj,
Blocks: allBlocks,
PinnedAt: pinnedAt,
}, nil
}

// 仅返回查询到的对象
func (db *ObjectDB) BatchGetDetails(ctx SQLContext, objectIDs []cdssdk.ObjectID) ([]stgmod.ObjectDetail, error) {
var objs []cdssdk.Object

err := ctx.Table("Object").Where("ObjectID IN ?", objectIDs).Order("ObjectID ASC").Find(&objs).Error
if err != nil {
return nil, err
}

// 获取所有的 ObjectBlock
var allBlocks []stgmod.ObjectBlock
err = ctx.Table("ObjectBlock").Where("ObjectID IN ?", objectIDs).Order("ObjectID, `Index` ASC").Find(&allBlocks).Error
if err != nil {
return nil, err
}

// 获取所有的 PinnedObject
var allPinnedObjs []cdssdk.PinnedObject
err = ctx.Table("PinnedObject").Where("ObjectID IN ?", objectIDs).Order("ObjectID ASC").Find(&allPinnedObjs).Error
if err != nil {
return nil, err
}

details := make([]stgmod.ObjectDetail, len(objs))
for i, obj := range objs {
details[i] = stgmod.ObjectDetail{
Object: obj,
}
}

stgmod.DetailsFillObjectBlocks(details, allBlocks)
stgmod.DetailsFillPinnedAt(details, allPinnedObjs)
return details, nil
}

func (db *ObjectDB) Create(ctx SQLContext, obj cdssdk.Object) (cdssdk.ObjectID, error) {
err := ctx.Table("Object").Create(&obj).Error
if err != nil {
return 0, fmt.Errorf("insert object failed, err: %w", err)
}
return obj.ObjectID, nil
}

// 批量创建对象,创建完成后会填充ObjectID。
func (db *ObjectDB) BatchCreate(ctx SQLContext, objs *[]cdssdk.Object) error {
if len(*objs) == 0 {
return nil
}

return ctx.Table("Object").Create(objs).Error
}

// 批量更新对象所有属性,objs中的对象必须包含ObjectID
func (db *ObjectDB) BatchUpdate(ctx SQLContext, objs []cdssdk.Object) error {
if len(objs) == 0 {
return nil
}

return ctx.Clauses(clause.OnConflict{
Columns: []clause.Column{{Name: "ObjectID"}},
UpdateAll: true,
}).Create(objs).Error
}

// 批量更新对象指定属性,objs中的对象只需设置需要更新的属性即可,但:
// 1. 必须包含ObjectID
// 2. 日期类型属性不能设置为0值
func (db *ObjectDB) BatchUpdateColumns(ctx SQLContext, objs []cdssdk.Object, columns []string) error {
if len(objs) == 0 {
return nil
}

return ctx.Clauses(clause.OnConflict{
Columns: []clause.Column{{Name: "ObjectID"}},
DoUpdates: clause.AssignmentColumns(columns),
}).Create(objs).Error
}

func (db *ObjectDB) GetPackageObjects(ctx SQLContext, packageID cdssdk.PackageID) ([]cdssdk.Object, error) {
var ret []cdssdk.Object
err := ctx.Table("Object").Where("PackageID = ?", packageID).Order("ObjectID ASC").Find(&ret).Error
return ret, err
}

func (db *ObjectDB) GetPackageObjectDetails(ctx SQLContext, packageID cdssdk.PackageID) ([]stgmod.ObjectDetail, error) {
var objs []cdssdk.Object
err := ctx.Table("Object").Where("PackageID = ?", packageID).Order("ObjectID ASC").Find(&objs).Error
if err != nil {
return nil, fmt.Errorf("getting objects: %w", err)
}

// 获取所有的 ObjectBlock
var allBlocks []stgmod.ObjectBlock
err = ctx.Table("ObjectBlock").
Select("ObjectBlock.*").
Joins("JOIN Object ON ObjectBlock.ObjectID = Object.ObjectID").
Where("Object.PackageID = ?", packageID).
Order("ObjectBlock.ObjectID, `Index` ASC").
Find(&allBlocks).Error
if err != nil {
return nil, fmt.Errorf("getting all object blocks: %w", err)
}

// 获取所有的 PinnedObject
var allPinnedObjs []cdssdk.PinnedObject
err = ctx.Table("PinnedObject").
Select("PinnedObject.*").
Joins("JOIN Object ON PinnedObject.ObjectID = Object.ObjectID").
Where("Object.PackageID = ?", packageID).
Order("PinnedObject.ObjectID").
Find(&allPinnedObjs).Error
if err != nil {
return nil, fmt.Errorf("getting all pinned objects: %w", err)
}

details := make([]stgmod.ObjectDetail, len(objs))
for i, obj := range objs {
details[i] = stgmod.ObjectDetail{
Object: obj,
}
}

stgmod.DetailsFillObjectBlocks(details, allBlocks)
stgmod.DetailsFillPinnedAt(details, allPinnedObjs)
return details, nil
}

func (db *ObjectDB) GetObjectsIfAnyBlockOnStorage(ctx SQLContext, stgID cdssdk.StorageID) ([]cdssdk.Object, error) {
var objs []cdssdk.Object
err := ctx.Table("Object").Where("ObjectID IN (SELECT ObjectID FROM ObjectBlock WHERE StorageID = ?)", stgID).Order("ObjectID ASC").Find(&objs).Error
if err != nil {
return nil, fmt.Errorf("getting objects: %w", err)
}

return objs, nil
}

func (db *ObjectDB) BatchAdd(ctx SQLContext, packageID cdssdk.PackageID, adds []coormq.AddObjectEntry) ([]cdssdk.Object, error) {
if len(adds) == 0 {
return nil, nil
}

// 收集所有路径
pathes := make([]string, 0, len(adds))
for _, add := range adds {
pathes = append(pathes, add.Path)
}

// 先查询要更新的对象,不存在也没关系
existsObjs, err := db.BatchGetByPackagePath(ctx, packageID, pathes)
if err != nil {
return nil, fmt.Errorf("batch get object by path: %w", err)
}

existsObjsMap := make(map[string]cdssdk.Object)
for _, obj := range existsObjs {
existsObjsMap[obj.Path] = obj
}

var updatingObjs []cdssdk.Object
var addingObjs []cdssdk.Object
for i := range adds {
o := cdssdk.Object{
PackageID: packageID,
Path: adds[i].Path,
Size: adds[i].Size,
FileHash: adds[i].FileHash,
Redundancy: cdssdk.NewNoneRedundancy(), // 首次上传默认使用不分块的none模式
CreateTime: adds[i].UploadTime,
UpdateTime: adds[i].UploadTime,
}

e, ok := existsObjsMap[adds[i].Path]
if ok {
o.ObjectID = e.ObjectID
o.CreateTime = e.CreateTime
updatingObjs = append(updatingObjs, o)

} else {
addingObjs = append(addingObjs, o)
}
}

// 先进行更新
err = db.BatchUpdate(ctx, updatingObjs)
if err != nil {
return nil, fmt.Errorf("batch update objects: %w", err)
}

// 再执行插入,Create函数插入后会填充ObjectID
err = db.BatchCreate(ctx, &addingObjs)
if err != nil {
return nil, fmt.Errorf("batch create objects: %w", err)
}

// 按照add参数的顺序返回结果
affectedObjsMp := make(map[string]cdssdk.Object)
for _, o := range updatingObjs {
affectedObjsMp[o.Path] = o
}
for _, o := range addingObjs {
affectedObjsMp[o.Path] = o
}
affectedObjs := make([]cdssdk.Object, 0, len(affectedObjsMp))
affectedObjIDs := make([]cdssdk.ObjectID, 0, len(affectedObjsMp))
for i := range adds {
obj := affectedObjsMp[adds[i].Path]
affectedObjs = append(affectedObjs, obj)
affectedObjIDs = append(affectedObjIDs, obj.ObjectID)
}

if len(affectedObjIDs) > 0 {
// 批量删除 ObjectBlock
if err := db.ObjectBlock().BatchDeleteByObjectID(ctx, affectedObjIDs); err != nil {
return nil, fmt.Errorf("batch delete object blocks: %w", err)
}

// 批量删除 PinnedObject
if err := db.PinnedObject().BatchDeleteByObjectID(ctx, affectedObjIDs); err != nil {
return nil, fmt.Errorf("batch delete pinned objects: %w", err)
}
}

// 创建 ObjectBlock
objBlocks := make([]stgmod.ObjectBlock, 0, len(adds))
for i, add := range adds {
for _, stgID := range add.StorageIDs {
objBlocks = append(objBlocks, stgmod.ObjectBlock{
ObjectID: affectedObjIDs[i],
Index: 0,
StorageID: stgID,
FileHash: add.FileHash,
Size: add.Size,
})
}
}
if err := db.ObjectBlock().BatchCreate(ctx, objBlocks); err != nil {
return nil, fmt.Errorf("batch create object blocks: %w", err)
}

// 创建 Cache
caches := make([]model.Cache, 0, len(adds))
for _, add := range adds {
for _, stgID := range add.StorageIDs {
caches = append(caches, model.Cache{
FileHash: add.FileHash,
StorageID: stgID,
CreateTime: time.Now(),
Priority: 0,
})
}
}
if err := db.Cache().BatchCreate(ctx, caches); err != nil {
return nil, fmt.Errorf("batch create caches: %w", err)
}

return affectedObjs, nil
}

func (db *ObjectDB) BatchDelete(ctx SQLContext, ids []cdssdk.ObjectID) error {
if len(ids) == 0 {
return nil
}

return ctx.Table("Object").Where("ObjectID IN ?", ids).Delete(&cdssdk.Object{}).Error
}

func (db *ObjectDB) DeleteInPackage(ctx SQLContext, packageID cdssdk.PackageID) error {
return ctx.Table("Object").Where("PackageID = ?", packageID).Delete(&cdssdk.Object{}).Error
}

func (db *ObjectDB) DeleteByPath(ctx SQLContext, packageID cdssdk.PackageID, path string) error {
return ctx.Table("Object").Where("PackageID = ? AND Path = ?", packageID, path).Delete(&cdssdk.Object{}).Error
}

func (db *ObjectDB) MoveByPrefix(ctx SQLContext, oldPkgID cdssdk.PackageID, oldPrefix string, newPkgID cdssdk.PackageID, newPrefix string) error {
return ctx.Table("Object").Where("PackageID = ? AND Path LIKE ?", oldPkgID, escapeLike("", "%", oldPrefix)).
Updates(map[string]any{
"PackageID": newPkgID,
"Path": gorm.Expr("concat(?, substring(Path, ?))", newPrefix, len(oldPrefix)+1),
}).Error
}

+ 0
- 116
common/pkgs/db2/object_access_stat.go View File

@@ -1,116 +0,0 @@
package db2

import (
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
stgmod "gitlink.org.cn/cloudream/storage2/common/models"
coormq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/coordinator"
"gorm.io/gorm"
"gorm.io/gorm/clause"
)

type ObjectAccessStatDB struct {
*DB
}

func (db *DB) ObjectAccessStat() *ObjectAccessStatDB {
return &ObjectAccessStatDB{db}
}

func (*ObjectAccessStatDB) Get(ctx SQLContext, objID cdssdk.ObjectID, stgID cdssdk.StorageID) (stgmod.ObjectAccessStat, error) {
var ret stgmod.ObjectAccessStat
err := ctx.Table("ObjectAccessStat").
Where("ObjectID = ? AND StorageID = ?", objID, stgID).
First(&ret).Error
return ret, err
}

func (*ObjectAccessStatDB) GetByObjectID(ctx SQLContext, objID cdssdk.ObjectID) ([]stgmod.ObjectAccessStat, error) {
var ret []stgmod.ObjectAccessStat
err := ctx.Table("ObjectAccessStat").
Where("ObjectID = ?", objID).
Find(&ret).Error
return ret, err
}

func (*ObjectAccessStatDB) BatchGetByObjectID(ctx SQLContext, objIDs []cdssdk.ObjectID) ([]stgmod.ObjectAccessStat, error) {
if len(objIDs) == 0 {
return nil, nil
}

var ret []stgmod.ObjectAccessStat
err := ctx.Table("ObjectAccessStat").
Where("ObjectID IN ?", objIDs).
Find(&ret).Error
return ret, err
}

func (*ObjectAccessStatDB) BatchGetByObjectIDOnStorage(ctx SQLContext, objIDs []cdssdk.ObjectID, stgID cdssdk.StorageID) ([]stgmod.ObjectAccessStat, error) {
if len(objIDs) == 0 {
return nil, nil
}

var ret []stgmod.ObjectAccessStat
err := ctx.Table("ObjectAccessStat").
Where("ObjectID IN ? AND StorageID = ?", objIDs, stgID).
Find(&ret).Error
return ret, err
}

func (*ObjectAccessStatDB) BatchAddCounter(ctx SQLContext, entries []coormq.AddAccessStatEntry) error {
if len(entries) == 0 {
return nil
}

for _, entry := range entries {
acc := stgmod.ObjectAccessStat{
ObjectID: entry.ObjectID,
StorageID: entry.StorageID,
Counter: entry.Counter,
}

err := ctx.Table("ObjectAccessStat").
Clauses(clause.OnConflict{
Columns: []clause.Column{{Name: "ObjectID"}, {Name: "StorageID"}},
DoUpdates: clause.Assignments(map[string]any{
"Counter": gorm.Expr("Counter + values(Counter)"),
}),
}).Create(&acc).Error
if err != nil {
return err
}
}
return nil
}

func (*ObjectAccessStatDB) BatchUpdateAmountInPackage(ctx SQLContext, pkgIDs []cdssdk.PackageID, historyWeight float64) error {
if len(pkgIDs) == 0 {
return nil
}

err := ctx.Exec("UPDATE ObjectAccessStat AS o INNER JOIN Object AS obj ON o.ObjectID = obj.ObjectID SET o.Amount = o.Amount * ? + o.Counter * (1 - ?), o.Counter = 0 WHERE obj.PackageID IN ?", historyWeight, historyWeight, pkgIDs).Error
return err
}

func (*ObjectAccessStatDB) UpdateAllAmount(ctx SQLContext, historyWeight float64) error {
err := ctx.Exec("UPDATE ObjectAccessStat SET Amount = Amount * ? + Counter * (1 - ?), Counter = 0", historyWeight, historyWeight).Error
return err
}

func (*ObjectAccessStatDB) DeleteByObjectID(ctx SQLContext, objID cdssdk.ObjectID) error {
err := ctx.Table("ObjectAccessStat").Where("ObjectID = ?", objID).Delete(nil).Error
return err
}

func (*ObjectAccessStatDB) BatchDeleteByObjectID(ctx SQLContext, objIDs []cdssdk.ObjectID) error {
if len(objIDs) == 0 {
return nil
}

err := ctx.Table("ObjectAccessStat").Where("ObjectID IN ?", objIDs).Delete(nil).Error
return err
}

func (*ObjectAccessStatDB) DeleteInPackage(ctx SQLContext, packageID cdssdk.PackageID) error {
err := ctx.Exec("DELETE o FROM ObjectAccessStat o INNER JOIN Object obj ON o.ObjectID = obj.ObjectID WHERE obj.PackageID = ?", packageID).Error
return err
}

+ 0
- 122
common/pkgs/db2/object_block.go View File

@@ -1,122 +0,0 @@
package db2

import (
"strconv"
"strings"

cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
stgmod "gitlink.org.cn/cloudream/storage2/common/models"
"gorm.io/gorm/clause"
)

type ObjectBlockDB struct {
*DB
}

func (db *DB) ObjectBlock() *ObjectBlockDB {
return &ObjectBlockDB{DB: db}
}

func (db *ObjectBlockDB) GetByStorageID(ctx SQLContext, stgID cdssdk.StorageID) ([]stgmod.ObjectBlock, error) {
var rets []stgmod.ObjectBlock
err := ctx.Table("ObjectBlock").Where("StorageID = ?", stgID).Find(&rets).Error
return rets, err
}

func (db *ObjectBlockDB) BatchGetByObjectID(ctx SQLContext, objectIDs []cdssdk.ObjectID) ([]stgmod.ObjectBlock, error) {
if len(objectIDs) == 0 {
return nil, nil
}

var blocks []stgmod.ObjectBlock
err := ctx.Table("ObjectBlock").Where("ObjectID IN (?)", objectIDs).Order("ObjectID, `Index` ASC").Find(&blocks).Error
return blocks, err
}

func (*ObjectBlockDB) GetInPackageID(ctx SQLContext, packageID cdssdk.PackageID) ([]stgmod.ObjectBlock, error) {
var rets []stgmod.ObjectBlock
err := ctx.Table("ObjectBlock").
Joins("INNER JOIN Object ON ObjectBlock.ObjectID = Object.ObjectID").
Where("Object.PackageID = ?", packageID).
Order("ObjectBlock.ObjectID, ObjectBlock.`Index` ASC").
Find(&rets).Error
return rets, err
}

func (db *ObjectBlockDB) Create(ctx SQLContext, objectID cdssdk.ObjectID, index int, stgID cdssdk.StorageID, fileHash cdssdk.FileHash, size int64) error {
block := stgmod.ObjectBlock{ObjectID: objectID, Index: index, StorageID: stgID, FileHash: fileHash, Size: size}
return ctx.Table("ObjectBlock").Create(&block).Error
}

func (db *ObjectBlockDB) BatchCreate(ctx SQLContext, blocks []stgmod.ObjectBlock) error {
if len(blocks) == 0 {
return nil
}

return ctx.Clauses(clause.Insert{Modifier: "ignore"}).Create(&blocks).Error
}

func (db *ObjectBlockDB) DeleteByObjectID(ctx SQLContext, objectID cdssdk.ObjectID) error {
return ctx.Table("ObjectBlock").Where("ObjectID = ?", objectID).Delete(&stgmod.ObjectBlock{}).Error
}

func (db *ObjectBlockDB) DeleteByObjectIDIndex(ctx SQLContext, objectID cdssdk.ObjectID, index int) error {
return ctx.Table("ObjectBlock").Where("ObjectID = ? AND `Index` = ?", objectID, index).Delete(&stgmod.ObjectBlock{}).Error
}

func (db *ObjectBlockDB) BatchDeleteByObjectID(ctx SQLContext, objectIDs []cdssdk.ObjectID) error {
if len(objectIDs) == 0 {
return nil
}

return ctx.Table("ObjectBlock").Where("ObjectID IN (?)", objectIDs).Delete(&stgmod.ObjectBlock{}).Error
}

func (db *ObjectBlockDB) DeleteInPackage(ctx SQLContext, packageID cdssdk.PackageID) error {
return ctx.Table("ObjectBlock").Where("ObjectID IN (SELECT ObjectID FROM Object WHERE PackageID = ?)", packageID).Delete(&stgmod.ObjectBlock{}).Error
}

func (db *ObjectBlockDB) StorageBatchDelete(ctx SQLContext, stgID cdssdk.StorageID, fileHashes []cdssdk.FileHash) error {
if len(fileHashes) == 0 {
return nil
}

return ctx.Table("ObjectBlock").Where("StorageID = ? AND FileHash IN (?)", stgID, fileHashes).Delete(&stgmod.ObjectBlock{}).Error
}

func (db *ObjectBlockDB) CountBlockWithHash(ctx SQLContext, fileHash string) (int, error) {
var cnt int64
err := ctx.Table("ObjectBlock").
Select("COUNT(FileHash)").
Joins("INNER JOIN Object ON ObjectBlock.ObjectID = Object.ObjectID").
Joins("INNER JOIN Package ON Object.PackageID = Package.PackageID").
Where("FileHash = ? AND Package.State = ?", fileHash, cdssdk.PackageStateNormal).
Scan(&cnt).Error

if err != nil {
return 0, err
}

return int(cnt), nil
}

// 按逗号切割字符串,并将每一个部分解析为一个int64的ID。
// 注:需要外部保证分隔的每一个部分都是正确的10进制数字格式
func splitConcatedHubID(idStr string) []cdssdk.HubID {
idStrs := strings.Split(idStr, ",")
ids := make([]cdssdk.HubID, 0, len(idStrs))

for _, str := range idStrs {
// 假设传入的ID是正确的数字格式
id, _ := strconv.ParseInt(str, 10, 64)
ids = append(ids, cdssdk.HubID(id))
}

return ids
}

// 按逗号切割字符串
func splitConcatedFileHash(idStr string) []string {
idStrs := strings.Split(idStr, ",")
return idStrs
}

+ 0
- 209
common/pkgs/db2/package.go View File

@@ -1,209 +0,0 @@
package db2

import (
"fmt"
"time"

"gorm.io/gorm"

cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/storage2/common/pkgs/db2/model"
)

type PackageDB struct {
*DB
}

func (db *DB) Package() *PackageDB {
return &PackageDB{DB: db}
}

func (db *PackageDB) GetByID(ctx SQLContext, packageID cdssdk.PackageID) (model.Package, error) {
var ret model.Package
err := ctx.Table("Package").Where("PackageID = ?", packageID).First(&ret).Error
return ret, err
}

func (db *PackageDB) GetByName(ctx SQLContext, bucketID cdssdk.BucketID, name string) (model.Package, error) {
var ret model.Package
err := ctx.Table("Package").Where("BucketID = ? AND Name = ?", bucketID, name).First(&ret).Error
return ret, err
}

func (db *PackageDB) BatchTestPackageID(ctx SQLContext, pkgIDs []cdssdk.PackageID) (map[cdssdk.PackageID]bool, error) {
if len(pkgIDs) == 0 {
return make(map[cdssdk.PackageID]bool), nil
}

var avaiIDs []cdssdk.PackageID
err := ctx.Table("Package").
Select("PackageID").
Where("PackageID IN ?", pkgIDs).
Find(&avaiIDs).Error
if err != nil {
return nil, err
}

avaiIDMap := make(map[cdssdk.PackageID]bool)
for _, pkgID := range avaiIDs {
avaiIDMap[pkgID] = true
}

return avaiIDMap, nil
}

func (*PackageDB) BatchGetAllPackageIDs(ctx SQLContext, start int, count int) ([]cdssdk.PackageID, error) {
var ret []cdssdk.PackageID
err := ctx.Table("Package").Select("PackageID").Limit(count).Offset(start).Find(&ret).Error
return ret, err
}

func (db *PackageDB) GetUserBucketPackages(ctx SQLContext, userID cdssdk.UserID, bucketID cdssdk.BucketID) ([]model.Package, error) {
var ret []model.Package
err := ctx.Table("UserBucket").
Select("Package.*").
Joins("JOIN Package ON UserBucket.BucketID = Package.BucketID").
Where("UserBucket.UserID = ? AND UserBucket.BucketID = ?", userID, bucketID).
Find(&ret).Error
return ret, err
}

func (db *PackageDB) GetBucketPackages(ctx SQLContext, bucketID cdssdk.BucketID) ([]model.Package, error) {
var ret []model.Package
err := ctx.Table("Package").
Select("Package.*").
Where("BucketID = ?", bucketID).
Find(&ret).Error
return ret, err
}

func (db *PackageDB) GetBucketPackagesByName(ctx SQLContext, bucketName string) ([]model.Package, error) {
var ret []model.Package
err := ctx.Table("Package").
Select("Package.*").
Joins("JOIN Bucket ON Package.BucketID = Bucket.BucketID").
Where("Bucket.Name = ?", bucketName).
Find(&ret).Error
return ret, err
}

// IsAvailable 判断一个用户是否拥有指定对象
func (db *PackageDB) IsAvailable(ctx SQLContext, userID cdssdk.UserID, packageID cdssdk.PackageID) (bool, error) {
var pkgID cdssdk.PackageID
err := ctx.Table("Package").
Select("Package.PackageID").
Joins("JOIN UserBucket ON Package.BucketID = UserBucket.BucketID").
Where("Package.PackageID = ? AND UserBucket.UserID = ?", packageID, userID).
Scan(&pkgID).Error

if err == gorm.ErrRecordNotFound {
return false, nil
}

if err != nil {
return false, fmt.Errorf("find package failed, err: %w", err)
}

return true, nil
}

// GetUserPackage 获得Package,如果用户没有权限访问,则不会获得结果
func (db *PackageDB) GetUserPackage(ctx SQLContext, userID cdssdk.UserID, packageID cdssdk.PackageID) (model.Package, error) {
var ret model.Package
err := ctx.Table("Package").
Select("Package.*").
Joins("JOIN UserBucket ON Package.BucketID = UserBucket.BucketID").
Where("Package.PackageID = ? AND UserBucket.UserID = ?", packageID, userID).
First(&ret).Error
return ret, err
}

// 在指定名称的Bucket中查找指定名称的Package
func (*PackageDB) GetUserPackageByName(ctx SQLContext, userID cdssdk.UserID, bucketName string, packageName string) (model.Package, error) {
var ret model.Package
err := ctx.Table("Package").
Select("Package.*").
Joins("JOIN Bucket ON Package.BucketID = Bucket.BucketID").
Joins("JOIN UserBucket ON Bucket.BucketID = UserBucket.BucketID").
Where("Package.Name = ? AND Bucket.Name = ? AND UserBucket.UserID = ?", packageName, bucketName, userID).
First(&ret).Error
return ret, err
}

func (db *PackageDB) Create(ctx SQLContext, bucketID cdssdk.BucketID, name string) (cdssdk.Package, error) {
var packageID int64
err := ctx.Table("Package").
Select("PackageID").
Where("Name = ? AND BucketID = ?", name, bucketID).
Scan(&packageID).Error

if err != nil {
return cdssdk.Package{}, err
}
if packageID != 0 {
return cdssdk.Package{}, gorm.ErrDuplicatedKey
}

newPackage := cdssdk.Package{Name: name, BucketID: bucketID, CreateTime: time.Now(), State: cdssdk.PackageStateNormal}
if err := ctx.Create(&newPackage).Error; err != nil {
return cdssdk.Package{}, fmt.Errorf("insert package failed, err: %w", err)
}

return newPackage, nil
}

func (*PackageDB) Delete(ctx SQLContext, packageID cdssdk.PackageID) error {
err := ctx.Delete(&model.Package{}, "PackageID = ?", packageID).Error
return err
}

// 删除与Package相关的所有数据
func (db *PackageDB) DeleteComplete(ctx SQLContext, packageID cdssdk.PackageID) error {
if err := db.Package().Delete(ctx, packageID); err != nil {
return fmt.Errorf("delete package state: %w", err)
}

if err := db.ObjectAccessStat().DeleteInPackage(ctx, packageID); err != nil {
return fmt.Errorf("delete from object access stat: %w", err)
}

if err := db.ObjectBlock().DeleteInPackage(ctx, packageID); err != nil {
return fmt.Errorf("delete from object block failed, err: %w", err)
}

if err := db.PinnedObject().DeleteInPackage(ctx, packageID); err != nil {
return fmt.Errorf("deleting pinned objects in package: %w", err)
}

if err := db.Object().DeleteInPackage(ctx, packageID); err != nil {
return fmt.Errorf("deleting objects in package: %w", err)
}

if err := db.PackageAccessStat().DeleteByPackageID(ctx, packageID); err != nil {
return fmt.Errorf("deleting package access stat: %w", err)
}

return nil
}

func (*PackageDB) ChangeState(ctx SQLContext, packageID cdssdk.PackageID, state string) error {
err := ctx.Exec("UPDATE Package SET State = ? WHERE PackageID = ?", state, packageID).Error
return err
}

func (*PackageDB) HasPackageIn(ctx SQLContext, bucketID cdssdk.BucketID) (bool, error) {
var pkg cdssdk.Package
err := ctx.Table("Package").Where("BucketID = ?", bucketID).First(&pkg).Error
if err == gorm.ErrRecordNotFound {
return false, nil
}
if err != nil {
return false, err
}
return true, nil
}

func (*PackageDB) Move(ctx SQLContext, packageID cdssdk.PackageID, newBktID cdssdk.BucketID, newName string) error {
err := ctx.Table("Package").Where("PackageID = ?", packageID).Update("BucketID", newBktID).Update("Name", newName).Error
return err
}

+ 0
- 79
common/pkgs/db2/package_access_stat.go View File

@@ -1,79 +0,0 @@
package db2

import (
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
stgmod "gitlink.org.cn/cloudream/storage2/common/models"
coormq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/coordinator"
"gorm.io/gorm"
"gorm.io/gorm/clause"
)

type PackageAccessStatDB struct {
*DB
}

func (db *DB) PackageAccessStat() *PackageAccessStatDB {
return &PackageAccessStatDB{db}
}

func (*PackageAccessStatDB) Get(ctx SQLContext, pkgID cdssdk.PackageID, stgID cdssdk.StorageID) (stgmod.PackageAccessStat, error) {
var ret stgmod.PackageAccessStat
err := ctx.Table("PackageAccessStat").Where("PackageID = ? AND StorageID = ?", pkgID, stgID).First(&ret).Error
return ret, err
}

func (*PackageAccessStatDB) GetByPackageID(ctx SQLContext, pkgID cdssdk.PackageID) ([]stgmod.PackageAccessStat, error) {
var ret []stgmod.PackageAccessStat
err := ctx.Table("PackageAccessStat").Where("PackageID = ?", pkgID).Find(&ret).Error
return ret, err
}

func (*PackageAccessStatDB) BatchGetByPackageID(ctx SQLContext, pkgIDs []cdssdk.PackageID) ([]stgmod.PackageAccessStat, error) {
if len(pkgIDs) == 0 {
return nil, nil
}

var ret []stgmod.PackageAccessStat
err := ctx.Table("PackageAccessStat").Where("PackageID IN (?)", pkgIDs).Find(&ret).Error
return ret, err
}

func (*PackageAccessStatDB) BatchAddCounter(ctx SQLContext, entries []coormq.AddAccessStatEntry) error {
if len(entries) == 0 {
return nil
}

accs := make([]stgmod.PackageAccessStat, len(entries))
for i, e := range entries {
accs[i] = stgmod.PackageAccessStat{
PackageID: e.PackageID,
StorageID: e.StorageID,
Counter: e.Counter,
}
}

return ctx.Clauses(clause.OnConflict{
Columns: []clause.Column{{Name: "PackageID"}, {Name: "StorageID"}},
DoUpdates: clause.Assignments(map[string]any{
"Counter": gorm.Expr("Counter + values(Counter)"),
}),
}).Table("PackageAccessStat").Create(&accs).Error
}

func (*PackageAccessStatDB) BatchUpdateAmount(ctx SQLContext, pkgIDs []cdssdk.PackageID, historyWeight float64) error {
if len(pkgIDs) == 0 {
return nil
}

sql := "UPDATE PackageAccessStat SET Amount = Amount * ? + Counter * (1 - ?), Counter = 0 WHERE PackageID IN (?)"
return ctx.Exec(sql, historyWeight, historyWeight, pkgIDs).Error
}

func (*PackageAccessStatDB) UpdateAllAmount(ctx SQLContext, historyWeight float64) error {
sql := "UPDATE PackageAccessStat SET Amount = Amount * ? + Counter * (1 - ?), Counter = 0"
return ctx.Exec(sql, historyWeight, historyWeight).Error
}

func (*PackageAccessStatDB) DeleteByPackageID(ctx SQLContext, pkgID cdssdk.PackageID) error {
return ctx.Table("PackageAccessStat").Where("PackageID = ?", pkgID).Delete(&stgmod.PackageAccessStat{}).Error
}

+ 0
- 122
common/pkgs/db2/pinned_object.go View File

@@ -1,122 +0,0 @@
package db2

import (
"time"

cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gorm.io/gorm/clause"
)

type PinnedObjectDB struct {
*DB
}

func (db *DB) PinnedObject() *PinnedObjectDB {
return &PinnedObjectDB{DB: db}
}

func (*PinnedObjectDB) GetByStorageID(ctx SQLContext, stgID cdssdk.StorageID) ([]cdssdk.PinnedObject, error) {
var ret []cdssdk.PinnedObject
err := ctx.Table("PinnedObject").Find(&ret, "StorageID = ?", stgID).Error
return ret, err
}

func (*PinnedObjectDB) GetObjectsByStorageID(ctx SQLContext, stgID cdssdk.StorageID) ([]cdssdk.Object, error) {
var ret []cdssdk.Object
err := ctx.Table("Object").Joins("inner join PinnedObject on Object.ObjectID = PinnedObject.ObjectID").Where("StorageID = ?", stgID).Find(&ret).Error
return ret, err
}

func (*PinnedObjectDB) Create(ctx SQLContext, stgID cdssdk.StorageID, objectID cdssdk.ObjectID, createTime time.Time) error {
return ctx.Table("PinnedObject").Create(&cdssdk.PinnedObject{StorageID: stgID, ObjectID: objectID, CreateTime: createTime}).Error
}

func (*PinnedObjectDB) BatchGetByObjectID(ctx SQLContext, objectIDs []cdssdk.ObjectID) ([]cdssdk.PinnedObject, error) {
if len(objectIDs) == 0 {
return nil, nil
}

var pinneds []cdssdk.PinnedObject
err := ctx.Table("PinnedObject").Where("ObjectID in (?)", objectIDs).Order("ObjectID asc").Find(&pinneds).Error
return pinneds, err
}

func (*PinnedObjectDB) TryCreate(ctx SQLContext, stgID cdssdk.StorageID, objectID cdssdk.ObjectID, createTime time.Time) error {
return ctx.Clauses(clause.OnConflict{
Columns: []clause.Column{{Name: "ObjectID"}, {Name: "StorageID"}},
DoUpdates: clause.AssignmentColumns([]string{"CreateTime"}),
}).Create(&cdssdk.PinnedObject{StorageID: stgID, ObjectID: objectID, CreateTime: createTime}).Error
}

func (*PinnedObjectDB) BatchTryCreate(ctx SQLContext, pinneds []cdssdk.PinnedObject) error {
if len(pinneds) == 0 {
return nil
}

return ctx.Clauses(clause.OnConflict{
Columns: []clause.Column{{Name: "ObjectID"}, {Name: "StorageID"}},
DoUpdates: clause.AssignmentColumns([]string{"CreateTime"}),
}).Create(&pinneds).Error
}

func (*PinnedObjectDB) CreateFromPackage(ctx SQLContext, packageID cdssdk.PackageID, stgID cdssdk.StorageID) error {
err := ctx.Exec(
"insert ignore into PinnedObject(StorageID, ObjectID, CreateTime) select ? as StorageID, ObjectID, ? as CreateTime from Object where PackageID = ?",
stgID,
time.Now(),
packageID,
).Error
return err
}

func (db *PinnedObjectDB) ObjectBatchCreate(ctx SQLContext, objectID cdssdk.ObjectID, stgIDs []cdssdk.StorageID) error {
if len(stgIDs) == 0 {
return nil
}

for _, id := range stgIDs {
err := db.TryCreate(ctx, id, objectID, time.Now())
if err != nil {
return err
}
}
return nil
}

func (*PinnedObjectDB) Delete(ctx SQLContext, stgID cdssdk.StorageID, objectID cdssdk.ObjectID) error {
err := ctx.Exec("delete from PinnedObject where StorageID = ? and ObjectID = ?", stgID, objectID).Error
return err
}

func (*PinnedObjectDB) DeleteByObjectID(ctx SQLContext, objectID cdssdk.ObjectID) error {
err := ctx.Exec("delete from PinnedObject where ObjectID = ?", objectID).Error
return err
}

func (*PinnedObjectDB) BatchDeleteByObjectID(ctx SQLContext, objectIDs []cdssdk.ObjectID) error {
if len(objectIDs) == 0 {
return nil
}

err := ctx.Table("PinnedObject").Where("ObjectID in (?)", objectIDs).Delete(&cdssdk.PinnedObject{}).Error
return err
}

func (*PinnedObjectDB) DeleteInPackage(ctx SQLContext, packageID cdssdk.PackageID) error {
err := ctx.Table("PinnedObject").Where("ObjectID in (select ObjectID from Object where PackageID = ?)", packageID).Delete(&cdssdk.PinnedObject{}).Error
return err
}

func (*PinnedObjectDB) DeleteInPackageAtStorage(ctx SQLContext, packageID cdssdk.PackageID, stgID cdssdk.StorageID) error {
err := ctx.Exec("delete PinnedObject from PinnedObject inner join Object on PinnedObject.ObjectID = Object.ObjectID where PackageID = ? and StorageID = ?", packageID, stgID).Error
return err
}

func (*PinnedObjectDB) StorageBatchDelete(ctx SQLContext, stgID cdssdk.StorageID, objectIDs []cdssdk.ObjectID) error {
if len(objectIDs) == 0 {
return nil
}

err := ctx.Table("PinnedObject").Where("StorageID = ? and ObjectID in (?)", stgID, objectIDs).Delete(&cdssdk.PinnedObject{}).Error
return err
}

+ 0
- 120
common/pkgs/db2/storage.go View File

@@ -1,120 +0,0 @@
package db2

import (
"fmt"

"gitlink.org.cn/cloudream/common/pkgs/logger"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
stgmod "gitlink.org.cn/cloudream/storage2/common/models"
"gitlink.org.cn/cloudream/storage2/common/pkgs/db2/model"
"gorm.io/gorm"
)

type StorageDB struct {
*DB
}

func (db *DB) Storage() *StorageDB {
return &StorageDB{DB: db}
}

func (db *StorageDB) GetByID(ctx SQLContext, stgID cdssdk.StorageID) (model.Storage, error) {
var stg model.Storage
err := ctx.Table("Storage").First(&stg, stgID).Error
return stg, err
}

func (StorageDB) GetAllIDs(ctx SQLContext) ([]cdssdk.StorageID, error) {
var stgs []cdssdk.StorageID
err := ctx.Table("Storage").Select("StorageID").Find(&stgs).Error
return stgs, err
}

func (db *StorageDB) BatchGetByID(ctx SQLContext, stgIDs []cdssdk.StorageID) ([]model.Storage, error) {
var stgs []model.Storage
err := ctx.Table("Storage").Find(&stgs, "StorageID IN (?)", stgIDs).Error
return stgs, err
}

func (db *StorageDB) GetUserStorages(ctx SQLContext, userID cdssdk.UserID) ([]model.Storage, error) {
var stgs []model.Storage
err := ctx.Table("Storage").Select("Storage.*").
Joins("inner join UserStorage on Storage.StorageID = UserStorage.StorageID").
Where("UserID = ?", userID).Find(&stgs).Error
return stgs, err
}

func (db *StorageDB) BatchGetAllStorageIDs(ctx SQLContext, start int, count int) ([]cdssdk.StorageID, error) {
var ret []cdssdk.StorageID
err := ctx.Table("Storage").Select("StorageID").Find(&ret).Limit(count).Offset(start).Error
return ret, err
}

func (db *StorageDB) IsAvailable(ctx SQLContext, userID cdssdk.UserID, storageID cdssdk.StorageID) (bool, error) {
rows, err := ctx.Table("Storage").Select("Storage.StorageID").
Joins("inner join UserStorage on Storage.StorageID = UserStorage.StorageID").
Where("UserID = ? and Storage.StorageID = ?", userID, storageID).Rows()
if err != nil {
return false, fmt.Errorf("execute sql: %w", err)
}
defer rows.Close()

return rows.Next(), nil
}

func (db *StorageDB) GetUserStorage(ctx SQLContext, userID cdssdk.UserID, storageID cdssdk.StorageID) (model.Storage, error) {
var stg model.Storage
err := ctx.Table("Storage").Select("Storage.*").
Joins("inner join UserStorage on Storage.StorageID = UserStorage.StorageID").
Where("UserID = ? and Storage.StorageID = ?", userID, storageID).First(&stg).Error

return stg, err
}

func (db *StorageDB) GetUserStorageByName(ctx SQLContext, userID cdssdk.UserID, name string) (model.Storage, error) {
var stg model.Storage
err := ctx.Table("Storage").Select("Storage.*").
Joins("inner join UserStorage on Storage.StorageID = UserStorage.StorageID").
Where("UserID = ? and Name = ?", userID, name).First(&stg).Error

return stg, err
}

func (db *StorageDB) GetHubStorages(ctx SQLContext, hubID cdssdk.HubID) ([]model.Storage, error) {
var stgs []model.Storage
err := ctx.Table("Storage").Select("Storage.*").Find(&stgs, "MasterHub = ?", hubID).Error
return stgs, err
}

func (db *StorageDB) FillDetails(ctx SQLContext, details []stgmod.StorageDetail) error {
stgsMp := make(map[cdssdk.StorageID]*stgmod.StorageDetail)
var masterHubIDs []cdssdk.HubID
for i := range details {
stgsMp[details[i].Storage.StorageID] = &details[i]
masterHubIDs = append(masterHubIDs, details[i].Storage.MasterHub)
}

// 获取监护Hub信息
masterHubs, err := db.Hub().BatchGetByID(ctx, masterHubIDs)
if err != nil && err != gorm.ErrRecordNotFound {
return fmt.Errorf("getting master hub: %w", err)
}
masterHubMap := make(map[cdssdk.HubID]cdssdk.Hub)
for _, hub := range masterHubs {
masterHubMap[hub.HubID] = hub
}
for _, stg := range stgsMp {
if stg.Storage.MasterHub != 0 {
hub, ok := masterHubMap[stg.Storage.MasterHub]
if !ok {
logger.Warnf("master hub %v of storage %v not found, this storage will not be add to result", stg.Storage.MasterHub, stg.Storage)
delete(stgsMp, stg.Storage.StorageID)
continue
}

stg.MasterHub = &hub
}
}

return nil
}

+ 0
- 44
common/pkgs/db2/union_serializer.go View File

@@ -1,44 +0,0 @@
package db2

import (
"context"
"fmt"
"reflect"

"gitlink.org.cn/cloudream/common/utils/serder"
"gorm.io/gorm/schema"
)

type UnionSerializer struct {
}

func (UnionSerializer) Scan(ctx context.Context, field *schema.Field, dst reflect.Value, dbValue interface{}) error {
fieldValue := reflect.New(field.FieldType)
if dbValue != nil {
var data []byte
switch v := dbValue.(type) {
case []byte:
data = v
case string:
data = []byte(v)
default:
return fmt.Errorf("failed to unmarshal JSONB value: %#v", dbValue)
}

err := serder.JSONToObjectExRaw(data, fieldValue.Interface())
if err != nil {
return err
}
}

field.ReflectValueOf(ctx, dst).Set(fieldValue.Elem())
return nil
}

func (UnionSerializer) Value(ctx context.Context, field *schema.Field, dst reflect.Value, fieldValue interface{}) (interface{}, error) {
return serder.ObjectToJSONEx(fieldValue)
}

func init() {
schema.RegisterSerializer("union", UnionSerializer{})
}

+ 0
- 44
common/pkgs/db2/user.go View File

@@ -1,44 +0,0 @@
package db2

import (
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gorm.io/gorm"
)

type UserDB struct {
*DB
}

func (db *DB) User() *UserDB {
return &UserDB{DB: db}
}

func (db *UserDB) GetByID(ctx SQLContext, userID cdssdk.UserID) (cdssdk.User, error) {
var ret cdssdk.User
err := ctx.Table("User").Where("UserID = ?", userID).First(&ret).Error
return ret, err
}

func (db *UserDB) GetByName(ctx SQLContext, name string) (cdssdk.User, error) {
var ret cdssdk.User
err := ctx.Table("User").Where("Name = ?", name).First(&ret).Error
return ret, err
}

func (db *UserDB) Create(ctx SQLContext, name string) (cdssdk.User, error) {
_, err := db.GetByName(ctx, name)
if err == nil {
return cdssdk.User{}, gorm.ErrDuplicatedKey
}
if err != gorm.ErrRecordNotFound {
return cdssdk.User{}, err
}

user := cdssdk.User{Name: name}
err = ctx.Table("User").Create(&user).Error
return user, err
}

func (*UserDB) Delete(ctx SQLContext, userID cdssdk.UserID) error {
return ctx.Table("User").Delete(&cdssdk.User{UserID: userID}).Error
}

+ 0
- 36
common/pkgs/db2/user_bucket.go View File

@@ -1,36 +0,0 @@
package db2

import (
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/storage2/common/pkgs/db2/model"
)

type UserBucketDB struct {
*DB
}

func (db *DB) UserBucket() *UserBucketDB {
return &UserBucketDB{DB: db}
}

func (*UserBucketDB) GetByUserID(ctx SQLContext, userID cdssdk.UserID) ([]model.UserBucket, error) {
var userBuckets []model.UserBucket
err := ctx.Table("UserBucket").Where("UserID = ?", userID).Find(&userBuckets).Error
return userBuckets, err
}

func (*UserBucketDB) Create(ctx SQLContext, userID cdssdk.UserID, bucketID cdssdk.BucketID) error {
userBucket := model.UserBucket{
UserID: userID,
BucketID: bucketID,
}
return ctx.Table("UserBucket").Create(&userBucket).Error
}

func (*UserBucketDB) DeleteByBucketID(ctx SQLContext, bucketID cdssdk.BucketID) error {
return ctx.Table("UserBucket").Where("BucketID = ?", bucketID).Delete(&model.UserBucket{}).Error
}

func (*UserBucketDB) DeleteByUserID(ctx SQLContext, userID cdssdk.UserID) error {
return ctx.Table("UserBucket").Where("UserID = ?", userID).Delete(&model.UserBucket{}).Error
}

+ 0
- 34
common/pkgs/db2/user_hub.go View File

@@ -1,34 +0,0 @@
package db2

import (
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/storage2/common/pkgs/db2/model"
)

type UserHubDB struct {
*DB
}

func (db *DB) UserHub() *UserHubDB {
return &UserHubDB{db}
}

func (*UserHubDB) GetByUserID(ctx SQLContext, userID cdssdk.UserID) ([]model.UserHub, error) {
var userHubs []model.UserHub
if err := ctx.Table("UserHub").Where("UserID = ?", userID).Find(&userHubs).Error; err != nil {
return nil, err
}

return userHubs, nil
}

func (*UserHubDB) Create(ctx SQLContext, userID cdssdk.UserID, hubID cdssdk.HubID) error {
return ctx.Table("UserHub").Create(&model.UserHub{
UserID: userID,
HubID: hubID,
}).Error
}

func (*UserHubDB) DeleteByUserID(ctx SQLContext, userID cdssdk.UserID) error {
return ctx.Table("UserHub").Delete(&model.UserHub{}, "UserID = ?", userID).Error
}

+ 0
- 34
common/pkgs/db2/user_storage.go View File

@@ -1,34 +0,0 @@
package db2

import (
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/storage2/common/pkgs/db2/model"
)

type UserStorageDB struct {
*DB
}

func (db *DB) UserStorage() *UserStorageDB {
return &UserStorageDB{db}
}

func (*UserStorageDB) GetByUserID(ctx SQLContext, userID cdssdk.UserID) ([]model.UserStorage, error) {
var userStgs []model.UserStorage
if err := ctx.Table("UserStorage").Where("UserID = ?", userID).Find(&userStgs).Error; err != nil {
return nil, err
}

return userStgs, nil
}

func (*UserStorageDB) Create(ctx SQLContext, userID cdssdk.UserID, stgID cdssdk.StorageID) error {
return ctx.Table("UserStorage").Create(&model.UserStorage{
UserID: userID,
StorageID: stgID,
}).Error
}

func (*UserStorageDB) DeleteByUserID(ctx SQLContext, userID cdssdk.UserID) error {
return ctx.Table("UserStorage").Delete(&model.UserStorage{}, "UserID = ?", userID).Error
}

+ 0
- 81
common/pkgs/db2/utils.go View File

@@ -1,81 +0,0 @@
package db2

import (
"strings"

"gorm.io/gorm"
)

const (
maxPlaceholderCount = 65535
)

func BatchNamedExec[T any](ctx SQLContext, sql string, argCnt int, arr []T, callback func(result *gorm.DB) bool) error {
if argCnt == 0 {
result := ctx.Exec(sql, toInterfaceSlice(arr)...)
if result.Error != nil {
return result.Error
}

if callback != nil {
callback(result)
}

return nil
}

batchSize := maxPlaceholderCount / argCnt
for len(arr) > 0 {
curBatchSize := min(batchSize, len(arr))

result := ctx.Exec(sql, toInterfaceSlice(arr[:curBatchSize])...)
if result.Error != nil {
return result.Error
}
if callback != nil && !callback(result) {
return nil
}

arr = arr[curBatchSize:]
}

return nil
}

// 将 []T 转换为 []interface{}
func toInterfaceSlice[T any](arr []T) []interface{} {
interfaceSlice := make([]interface{}, len(arr))
for i, v := range arr {
interfaceSlice[i] = v
}
return interfaceSlice
}

func min(a, b int) int {
if a < b {
return a
}
return b
}

func escapeLike(left, right, word string) string {
var n int
for i := range word {
if c := word[i]; c == '%' || c == '_' || c == '\\' {
n++
}
}
// No characters to escape.
if n == 0 {
return left + word + right
}
var b strings.Builder
b.Grow(len(word) + n)
for _, c := range word {
if c == '%' || c == '_' || c == '\\' {
b.WriteByte('\\')
}
b.WriteRune(c)
}
return left + b.String() + right
}

+ 0
- 4
common/pkgs/mq/coordinator/agent.go View File

@@ -1,4 +0,0 @@
package coordinator

type AgentService interface {
}

+ 0
- 154
common/pkgs/mq/coordinator/bucket.go View File

@@ -1,154 +0,0 @@
package coordinator

import (
"gitlink.org.cn/cloudream/common/pkgs/mq"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/storage2/common/pkgs/db2/model"
)

type BucketService interface {
GetBucketByName(msg *GetBucketByName) (*GetBucketByNameResp, *mq.CodeMessage)

GetUserBuckets(msg *GetUserBuckets) (*GetUserBucketsResp, *mq.CodeMessage)

GetBucketPackages(msg *GetBucketPackages) (*GetBucketPackagesResp, *mq.CodeMessage)

CreateBucket(msg *CreateBucket) (*CreateBucketResp, *mq.CodeMessage)

DeleteBucket(msg *DeleteBucket) (*DeleteBucketResp, *mq.CodeMessage)
}

// 根据桶名获取桶
var _ = Register(Service.GetBucketByName)

type GetBucketByName struct {
mq.MessageBodyBase
UserID cdssdk.UserID `json:"userID"`
Name string `json:"name"`
}
type GetBucketByNameResp struct {
mq.MessageBodyBase
Bucket cdssdk.Bucket `json:"bucket"`
}

func ReqGetBucketByName(userID cdssdk.UserID, name string) *GetBucketByName {
return &GetBucketByName{
UserID: userID,
Name: name,
}
}
func RespGetBucketByName(bucket cdssdk.Bucket) *GetBucketByNameResp {
return &GetBucketByNameResp{
Bucket: bucket,
}
}
func (client *Client) GetBucketByName(msg *GetBucketByName) (*GetBucketByNameResp, error) {
return mq.Request(Service.GetBucketByName, client.rabbitCli, msg)
}

// 获取用户所有的桶
var _ = Register(Service.GetUserBuckets)

type GetUserBuckets struct {
mq.MessageBodyBase
UserID cdssdk.UserID `json:"userID"`
}
type GetUserBucketsResp struct {
mq.MessageBodyBase
Buckets []model.Bucket `json:"buckets"`
}

func NewGetUserBuckets(userID cdssdk.UserID) *GetUserBuckets {
return &GetUserBuckets{
UserID: userID,
}
}
func NewGetUserBucketsResp(buckets []model.Bucket) *GetUserBucketsResp {
return &GetUserBucketsResp{
Buckets: buckets,
}
}
func (client *Client) GetUserBuckets(msg *GetUserBuckets) (*GetUserBucketsResp, error) {
return mq.Request(Service.GetUserBuckets, client.rabbitCli, msg)
}

// 获取桶中的所有Package
var _ = Register(Service.GetBucketPackages)

type GetBucketPackages struct {
mq.MessageBodyBase
UserID cdssdk.UserID `json:"userID"`
BucketID cdssdk.BucketID `json:"bucketID"`
}
type GetBucketPackagesResp struct {
mq.MessageBodyBase
Packages []model.Package `json:"packages"`
}

func NewGetBucketPackages(userID cdssdk.UserID, bucketID cdssdk.BucketID) *GetBucketPackages {
return &GetBucketPackages{
UserID: userID,
BucketID: bucketID,
}
}
func NewGetBucketPackagesResp(packages []model.Package) *GetBucketPackagesResp {
return &GetBucketPackagesResp{
Packages: packages,
}
}
func (client *Client) GetBucketPackages(msg *GetBucketPackages) (*GetBucketPackagesResp, error) {
return mq.Request(Service.GetBucketPackages, client.rabbitCli, msg)
}

// 创建桶
var _ = Register(Service.CreateBucket)

type CreateBucket struct {
mq.MessageBodyBase
UserID cdssdk.UserID `json:"userID"`
BucketName string `json:"bucketName"`
}
type CreateBucketResp struct {
mq.MessageBodyBase
Bucket cdssdk.Bucket `json:"bucket"`
}

func NewCreateBucket(userID cdssdk.UserID, bucketName string) *CreateBucket {
return &CreateBucket{
UserID: userID,
BucketName: bucketName,
}
}
func NewCreateBucketResp(bucket cdssdk.Bucket) *CreateBucketResp {
return &CreateBucketResp{
Bucket: bucket,
}
}
func (client *Client) CreateBucket(msg *CreateBucket) (*CreateBucketResp, error) {
return mq.Request(Service.CreateBucket, client.rabbitCli, msg)
}

// 删除桶
var _ = Register(Service.DeleteBucket)

type DeleteBucket struct {
mq.MessageBodyBase
UserID cdssdk.UserID `json:"userID"`
BucketID cdssdk.BucketID `json:"bucketID"`
}
type DeleteBucketResp struct {
mq.MessageBodyBase
}

func NewDeleteBucket(userID cdssdk.UserID, bucketID cdssdk.BucketID) *DeleteBucket {
return &DeleteBucket{
UserID: userID,
BucketID: bucketID,
}
}
func NewDeleteBucketResp() *DeleteBucketResp {
return &DeleteBucketResp{}
}
func (client *Client) DeleteBucket(msg *DeleteBucket) (*DeleteBucketResp, error) {
return mq.Request(Service.DeleteBucket, client.rabbitCli, msg)
}

+ 0
- 62
common/pkgs/mq/coordinator/cache.go View File

@@ -1,62 +0,0 @@
package coordinator

import (
"gitlink.org.cn/cloudream/common/pkgs/mq"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
)

type CacheService interface {
CachePackageMoved(msg *CachePackageMoved) (*CachePackageMovedResp, *mq.CodeMessage)

CacheRemovePackage(msg *CacheRemovePackage) (*CacheRemovePackageResp, *mq.CodeMessage)
}

// Package的Object移动到了节点的Cache中
var _ = Register(Service.CachePackageMoved)

type CachePackageMoved struct {
mq.MessageBodyBase
PackageID cdssdk.PackageID `json:"packageID"`
StorageID cdssdk.StorageID `json:"storageID"`
}
type CachePackageMovedResp struct {
mq.MessageBodyBase
}

func NewCachePackageMoved(packageID cdssdk.PackageID, stgID cdssdk.StorageID) *CachePackageMoved {
return &CachePackageMoved{
PackageID: packageID,
StorageID: stgID,
}
}
func NewCachePackageMovedResp() *CachePackageMovedResp {
return &CachePackageMovedResp{}
}
func (client *Client) CachePackageMoved(msg *CachePackageMoved) (*CachePackageMovedResp, error) {
return mq.Request(Service.CachePackageMoved, client.rabbitCli, msg)
}

// 删除移动到指定节点Cache中的Package
var _ = Register(Service.CacheRemovePackage)

type CacheRemovePackage struct {
mq.MessageBodyBase
PackageID cdssdk.PackageID `json:"packageID"`
StorageID cdssdk.StorageID `json:"storageID"`
}
type CacheRemovePackageResp struct {
mq.MessageBodyBase
}

func ReqCacheRemoveMovedPackage(packageID cdssdk.PackageID, stgID cdssdk.StorageID) *CacheRemovePackage {
return &CacheRemovePackage{
PackageID: packageID,
StorageID: stgID,
}
}
func RespCacheRemovePackage() *CacheRemovePackageResp {
return &CacheRemovePackageResp{}
}
func (client *Client) CacheRemovePackage(msg *CacheRemovePackage) (*CacheRemovePackageResp, error) {
return mq.Request(Service.CacheRemovePackage, client.rabbitCli, msg)
}

+ 10
- 93
common/pkgs/mq/coordinator/hub.go View File

@@ -2,73 +2,13 @@ package coordinator

import (
"gitlink.org.cn/cloudream/common/pkgs/mq"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
stgmod "gitlink.org.cn/cloudream/storage2/common/models"
cortypes "gitlink.org.cn/cloudream/storage2/coordinator/types"
)

type HubService interface {
GetHubConfig(msg *GetHubConfig) (*GetHubConfigResp, *mq.CodeMessage)

GetUserHubs(msg *GetUserHubs) (*GetUserHubsResp, *mq.CodeMessage)

GetHubs(msg *GetHubs) (*GetHubsResp, *mq.CodeMessage)

GetHubConnectivities(msg *GetHubConnectivities) (*GetHubConnectivitiesResp, *mq.CodeMessage)

UpdateHubConnectivities(msg *UpdateHubConnectivities) (*UpdateHubConnectivitiesResp, *mq.CodeMessage)
}

var _ = Register(Service.GetHubConfig)

type GetHubConfig struct {
mq.MessageBodyBase
HubID cdssdk.HubID `json:"hubID"`
}
type GetHubConfigResp struct {
mq.MessageBodyBase
Hub cdssdk.Hub `json:"hub"`
Storages []stgmod.StorageDetail `json:"storages"`
}

func ReqGetHubConfig(hubID cdssdk.HubID) *GetHubConfig {
return &GetHubConfig{
HubID: hubID,
}
}
func RespGetHubConfig(hub cdssdk.Hub, storages []stgmod.StorageDetail) *GetHubConfigResp {
return &GetHubConfigResp{
Hub: hub,
Storages: storages,
}
}
func (client *Client) GetHubConfig(msg *GetHubConfig) (*GetHubConfigResp, error) {
return mq.Request(Service.GetHubConfig, client.rabbitCli, msg)
}

// 查询用户可用的节点
var _ = Register(Service.GetUserHubs)

type GetUserHubs struct {
mq.MessageBodyBase
UserID cdssdk.UserID `json:"userID"`
}
type GetUserHubsResp struct {
mq.MessageBodyBase
Hubs []cdssdk.Hub `json:"hubs"`
}

func NewGetUserHubs(userID cdssdk.UserID) *GetUserHubs {
return &GetUserHubs{
UserID: userID,
}
}
func NewGetUserHubsResp(hubs []cdssdk.Hub) *GetUserHubsResp {
return &GetUserHubsResp{
Hubs: hubs,
}
}
func (client *Client) GetUserHubs(msg *GetUserHubs) (*GetUserHubsResp, error) {
return mq.Request(Service.GetUserHubs, client.rabbitCli, msg)
}

// 获取指定节点的信息。如果HubIDs为nil,则返回所有Hub
@@ -76,24 +16,24 @@ var _ = Register(Service.GetHubs)

type GetHubs struct {
mq.MessageBodyBase
HubIDs []cdssdk.HubID `json:"hubIDs"`
HubIDs []cortypes.HubID `json:"hubIDs"`
}
type GetHubsResp struct {
mq.MessageBodyBase
Hubs []*cdssdk.Hub `json:"hubs"`
Hubs []*cortypes.Hub `json:"hubs"`
}

func NewGetHubs(hubIDs []cdssdk.HubID) *GetHubs {
func NewGetHubs(hubIDs []cortypes.HubID) *GetHubs {
return &GetHubs{
HubIDs: hubIDs,
}
}
func NewGetHubsResp(hubs []*cdssdk.Hub) *GetHubsResp {
func NewGetHubsResp(hubs []*cortypes.Hub) *GetHubsResp {
return &GetHubsResp{
Hubs: hubs,
}
}
func (r *GetHubsResp) GetHub(id cdssdk.HubID) *cdssdk.Hub {
func (r *GetHubsResp) GetHub(id cortypes.HubID) *cortypes.Hub {
for _, n := range r.Hubs {
if n.HubID == id {
return n
@@ -111,19 +51,19 @@ var _ = Register(Service.GetHubConnectivities)

type GetHubConnectivities struct {
mq.MessageBodyBase
HubIDs []cdssdk.HubID `json:"hubIDs"`
HubIDs []cortypes.HubID `json:"hubIDs"`
}
type GetHubConnectivitiesResp struct {
mq.MessageBodyBase
Connectivities []cdssdk.HubConnectivity `json:"hubs"`
Connectivities []cortypes.HubConnectivity `json:"hubs"`
}

func ReqGetHubConnectivities(hubIDs []cdssdk.HubID) *GetHubConnectivities {
func ReqGetHubConnectivities(hubIDs []cortypes.HubID) *GetHubConnectivities {
return &GetHubConnectivities{
HubIDs: hubIDs,
}
}
func RespGetHubConnectivities(cons []cdssdk.HubConnectivity) *GetHubConnectivitiesResp {
func RespGetHubConnectivities(cons []cortypes.HubConnectivity) *GetHubConnectivitiesResp {
return &GetHubConnectivitiesResp{
Connectivities: cons,
}
@@ -131,26 +71,3 @@ func RespGetHubConnectivities(cons []cdssdk.HubConnectivity) *GetHubConnectiviti
func (client *Client) GetHubConnectivities(msg *GetHubConnectivities) (*GetHubConnectivitiesResp, error) {
return mq.Request(Service.GetHubConnectivities, client.rabbitCli, msg)
}

// 批量更新节点连通性信息
var _ = Register(Service.UpdateHubConnectivities)

type UpdateHubConnectivities struct {
mq.MessageBodyBase
Connectivities []cdssdk.HubConnectivity `json:"connectivities"`
}
type UpdateHubConnectivitiesResp struct {
mq.MessageBodyBase
}

func ReqUpdateHubConnectivities(cons []cdssdk.HubConnectivity) *UpdateHubConnectivities {
return &UpdateHubConnectivities{
Connectivities: cons,
}
}
func RespUpdateHubConnectivities() *UpdateHubConnectivitiesResp {
return &UpdateHubConnectivitiesResp{}
}
func (client *Client) UpdateHubConnectivities(msg *UpdateHubConnectivities) (*UpdateHubConnectivitiesResp, error) {
return mq.Request(Service.UpdateHubConnectivities, client.rabbitCli, msg)
}

+ 0
- 400
common/pkgs/mq/coordinator/object.go View File

@@ -1,400 +0,0 @@
package coordinator

import (
"gitlink.org.cn/cloudream/common/pkgs/mq"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/common/sdks/storage/cdsapi"

stgmod "gitlink.org.cn/cloudream/storage2/common/models"
"gitlink.org.cn/cloudream/storage2/common/pkgs/db2/model"
)

type ObjectService interface {
GetObjects(msg *GetObjects) (*GetObjectsResp, *mq.CodeMessage)

ListObjectsByPath(msg *ListObjectsByPath) (*ListObjectsByPathResp, *mq.CodeMessage)

GetPackageObjects(msg *GetPackageObjects) (*GetPackageObjectsResp, *mq.CodeMessage)

GetPackageObjectDetails(msg *GetPackageObjectDetails) (*GetPackageObjectDetailsResp, *mq.CodeMessage)

GetObjectDetails(msg *GetObjectDetails) (*GetObjectDetailsResp, *mq.CodeMessage)

UpdateObjectRedundancy(msg *UpdateObjectRedundancy) (*UpdateObjectRedundancyResp, *mq.CodeMessage)

UpdateObjectInfos(msg *UpdateObjectInfos) (*UpdateObjectInfosResp, *mq.CodeMessage)

MoveObjects(msg *MoveObjects) (*MoveObjectsResp, *mq.CodeMessage)

DeleteObjects(msg *DeleteObjects) (*DeleteObjectsResp, *mq.CodeMessage)

CloneObjects(msg *CloneObjects) (*CloneObjectsResp, *mq.CodeMessage)

GetDatabaseAll(msg *GetDatabaseAll) (*GetDatabaseAllResp, *mq.CodeMessage)

AddAccessStat(msg *AddAccessStat)

NewMultipartUploadObject(msg *NewMultipartUploadObject) (*NewMultipartUploadObjectResp, *mq.CodeMessage)

AddMultipartUploadPart(msg *AddMultipartUploadPart) (*AddMultipartUploadPartResp, *mq.CodeMessage)
}

var _ = Register(Service.GetObjects)

type GetObjects struct {
mq.MessageBodyBase
UserID cdssdk.UserID `json:"userID"`
ObjectIDs []cdssdk.ObjectID `json:"objectIDs"`
}
type GetObjectsResp struct {
mq.MessageBodyBase
Objects []*cdssdk.Object `json:"objects"`
}

func ReqGetObjects(userID cdssdk.UserID, objectIDs []cdssdk.ObjectID) *GetObjects {
return &GetObjects{
UserID: userID,
ObjectIDs: objectIDs,
}
}
func RespGetObjects(objects []*cdssdk.Object) *GetObjectsResp {
return &GetObjectsResp{
Objects: objects,
}
}
func (client *Client) GetObjects(msg *GetObjects) (*GetObjectsResp, error) {
return mq.Request(Service.GetObjects, client.rabbitCli, msg)
}

// 查询指定前缀的Object,返回的Objects会按照ObjectID升序
var _ = Register(Service.ListObjectsByPath)

type ListObjectsByPath struct {
mq.MessageBodyBase
cdsapi.ObjectListByPath
}
type ListObjectsByPathResp struct {
mq.MessageBodyBase
cdsapi.ObjectListByPathResp
}

func ReqListObjectsByPath(req cdsapi.ObjectListByPath) *ListObjectsByPath {
return &ListObjectsByPath{
ObjectListByPath: req,
}
}
func RespListObjectsByPath(resp cdsapi.ObjectListByPathResp) *ListObjectsByPathResp {
return &ListObjectsByPathResp{
ObjectListByPathResp: resp,
}
}
func (client *Client) ListObjectsByPath(msg *ListObjectsByPath) (*ListObjectsByPathResp, error) {
return mq.Request(Service.ListObjectsByPath, client.rabbitCli, msg)
}

// 查询Package中的所有Object,返回的Objects会按照ObjectID升序
var _ = Register(Service.GetPackageObjects)

type GetPackageObjects struct {
mq.MessageBodyBase
UserID cdssdk.UserID `json:"userID"`
PackageID cdssdk.PackageID `json:"packageID"`
}
type GetPackageObjectsResp struct {
mq.MessageBodyBase
Objects []model.Object `json:"objects"`
}

func ReqGetPackageObjects(userID cdssdk.UserID, packageID cdssdk.PackageID) *GetPackageObjects {
return &GetPackageObjects{
UserID: userID,
PackageID: packageID,
}
}
func RespGetPackageObjects(objects []model.Object) *GetPackageObjectsResp {
return &GetPackageObjectsResp{
Objects: objects,
}
}
func (client *Client) GetPackageObjects(msg *GetPackageObjects) (*GetPackageObjectsResp, error) {
return mq.Request(Service.GetPackageObjects, client.rabbitCli, msg)
}

// 获取Package中所有Object以及它们的分块详细信息,返回的Objects会按照ObjectID升序
var _ = Register(Service.GetPackageObjectDetails)

type GetPackageObjectDetails struct {
mq.MessageBodyBase
PackageID cdssdk.PackageID `json:"packageID"`
}
type GetPackageObjectDetailsResp struct {
mq.MessageBodyBase
Objects []stgmod.ObjectDetail `json:"objects"`
}

func ReqGetPackageObjectDetails(packageID cdssdk.PackageID) *GetPackageObjectDetails {
return &GetPackageObjectDetails{
PackageID: packageID,
}
}
func RespPackageObjectDetails(objects []stgmod.ObjectDetail) *GetPackageObjectDetailsResp {
return &GetPackageObjectDetailsResp{
Objects: objects,
}
}
func (client *Client) GetPackageObjectDetails(msg *GetPackageObjectDetails) (*GetPackageObjectDetailsResp, error) {
return mq.Request(Service.GetPackageObjectDetails, client.rabbitCli, msg)
}

// 获取多个Object以及它们的分块详细信息,返回的Objects会按照ObjectID升序。
var _ = Register(Service.GetObjectDetails)

type GetObjectDetails struct {
mq.MessageBodyBase
ObjectIDs []cdssdk.ObjectID `json:"objectIDs"`
}
type GetObjectDetailsResp struct {
mq.MessageBodyBase
Objects []*stgmod.ObjectDetail `json:"objects"` // 如果没有查询到某个ID对应的信息,则此数组对应位置为nil
}

func ReqGetObjectDetails(objectIDs []cdssdk.ObjectID) *GetObjectDetails {
return &GetObjectDetails{
ObjectIDs: objectIDs,
}
}
func RespGetObjectDetails(objects []*stgmod.ObjectDetail) *GetObjectDetailsResp {
return &GetObjectDetailsResp{
Objects: objects,
}
}
func (client *Client) GetObjectDetails(msg *GetObjectDetails) (*GetObjectDetailsResp, error) {
return mq.Request(Service.GetObjectDetails, client.rabbitCli, msg)
}

// 更新Object的冗余方式
var _ = Register(Service.UpdateObjectRedundancy)

type UpdateObjectRedundancy struct {
mq.MessageBodyBase
Updatings []UpdatingObjectRedundancy `json:"updatings"`
}
type UpdateObjectRedundancyResp struct {
mq.MessageBodyBase
}
type UpdatingObjectRedundancy struct {
ObjectID cdssdk.ObjectID `json:"objectID"`
FileHash cdssdk.FileHash `json:"fileHash"`
Size int64 `json:"size"`
Redundancy cdssdk.Redundancy `json:"redundancy"`
PinnedAt []cdssdk.StorageID `json:"pinnedAt"`
Blocks []stgmod.ObjectBlock `json:"blocks"`
}

func ReqUpdateObjectRedundancy(updatings []UpdatingObjectRedundancy) *UpdateObjectRedundancy {
return &UpdateObjectRedundancy{
Updatings: updatings,
}
}
func RespUpdateObjectRedundancy() *UpdateObjectRedundancyResp {
return &UpdateObjectRedundancyResp{}
}
func (client *Client) UpdateObjectRedundancy(msg *UpdateObjectRedundancy) (*UpdateObjectRedundancyResp, error) {
return mq.Request(Service.UpdateObjectRedundancy, client.rabbitCli, msg)
}

// 更新Object元数据
var _ = Register(Service.UpdateObjectInfos)

type UpdateObjectInfos struct {
mq.MessageBodyBase
UserID cdssdk.UserID `json:"userID"`
Updatings []cdsapi.UpdatingObject `json:"updatings"`
}

type UpdateObjectInfosResp struct {
mq.MessageBodyBase
Successes []cdssdk.ObjectID `json:"successes"`
}

func ReqUpdateObjectInfos(userID cdssdk.UserID, updatings []cdsapi.UpdatingObject) *UpdateObjectInfos {
return &UpdateObjectInfos{
UserID: userID,
Updatings: updatings,
}
}
func RespUpdateObjectInfos(successes []cdssdk.ObjectID) *UpdateObjectInfosResp {
return &UpdateObjectInfosResp{
Successes: successes,
}
}
func (client *Client) UpdateObjectInfos(msg *UpdateObjectInfos) (*UpdateObjectInfosResp, error) {
return mq.Request(Service.UpdateObjectInfos, client.rabbitCli, msg)
}

// 移动Object
var _ = Register(Service.MoveObjects)

type MoveObjects struct {
mq.MessageBodyBase
UserID cdssdk.UserID `json:"userID"`
Movings []cdsapi.MovingObject `json:"movings"`
}

type MoveObjectsResp struct {
mq.MessageBodyBase
Successes []cdssdk.ObjectID `json:"successes"`
}

func ReqMoveObjects(userID cdssdk.UserID, movings []cdsapi.MovingObject) *MoveObjects {
return &MoveObjects{
UserID: userID,
Movings: movings,
}
}
func RespMoveObjects(successes []cdssdk.ObjectID) *MoveObjectsResp {
return &MoveObjectsResp{
Successes: successes,
}
}
func (client *Client) MoveObjects(msg *MoveObjects) (*MoveObjectsResp, error) {
return mq.Request(Service.MoveObjects, client.rabbitCli, msg)
}

// 删除Object
var _ = Register(Service.DeleteObjects)

type DeleteObjects struct {
mq.MessageBodyBase
UserID cdssdk.UserID `json:"userID"`
ObjectIDs []cdssdk.ObjectID `json:"objectIDs"`
}

type DeleteObjectsResp struct {
mq.MessageBodyBase
Successes []cdssdk.ObjectID `json:"successes"`
}

func ReqDeleteObjects(userID cdssdk.UserID, objectIDs []cdssdk.ObjectID) *DeleteObjects {
return &DeleteObjects{
UserID: userID,
ObjectIDs: objectIDs,
}
}
func RespDeleteObjects(sucs []cdssdk.ObjectID) *DeleteObjectsResp {
return &DeleteObjectsResp{
Successes: sucs,
}
}
func (client *Client) DeleteObjects(msg *DeleteObjects) (*DeleteObjectsResp, error) {
return mq.Request(Service.DeleteObjects, client.rabbitCli, msg)
}

// 克隆Object
var _ = Register(Service.CloneObjects)

type CloneObjects struct {
mq.MessageBodyBase
UserID cdssdk.UserID `json:"userID"`
Clonings []cdsapi.CloningObject `json:"clonings"`
}
type CloneObjectsResp struct {
mq.MessageBodyBase
Objects []*cdssdk.Object `json:"objects"`
}

func ReqCloneObjects(userID cdssdk.UserID, clonings []cdsapi.CloningObject) *CloneObjects {
return &CloneObjects{
UserID: userID,
Clonings: clonings,
}
}
func RespCloneObjects(objects []*cdssdk.Object) *CloneObjectsResp {
return &CloneObjectsResp{
Objects: objects,
}
}
func (client *Client) CloneObjects(msg *CloneObjects) (*CloneObjectsResp, error) {
return mq.Request(Service.CloneObjects, client.rabbitCli, msg)
}

// 增加访问计数
var _ = RegisterNoReply(Service.AddAccessStat)

type AddAccessStat struct {
mq.MessageBodyBase
Entries []AddAccessStatEntry `json:"entries"`
}

type AddAccessStatEntry struct {
ObjectID cdssdk.ObjectID `json:"objectID"`
PackageID cdssdk.PackageID `json:"packageID"`
StorageID cdssdk.StorageID `json:"storageID"`
Counter float64 `json:"counter"`
}

func ReqAddAccessStat(entries []AddAccessStatEntry) *AddAccessStat {
return &AddAccessStat{
Entries: entries,
}
}

func (client *Client) AddAccessStat(msg *AddAccessStat) error {
return mq.Send(Service.AddAccessStat, client.rabbitCli, msg)
}

var _ = Register(Service.NewMultipartUploadObject)

type NewMultipartUploadObject struct {
mq.MessageBodyBase
UserID cdssdk.UserID `json:"userID"`
PackageID cdssdk.PackageID `json:"packageID"`
Path string `json:"path"`
}
type NewMultipartUploadObjectResp struct {
mq.MessageBodyBase
Object cdssdk.Object `json:"object"`
}

func ReqNewMultipartUploadObject(userID cdssdk.UserID, packageID cdssdk.PackageID, path string) *NewMultipartUploadObject {
return &NewMultipartUploadObject{
UserID: userID,
PackageID: packageID,
Path: path,
}
}
func RespNewMultipartUploadObject(object cdssdk.Object) *NewMultipartUploadObjectResp {
return &NewMultipartUploadObjectResp{
Object: object,
}
}
func (client *Client) NewMultipartUploadObject(msg *NewMultipartUploadObject) (*NewMultipartUploadObjectResp, error) {
return mq.Request(Service.NewMultipartUploadObject, client.rabbitCli, msg)
}

var _ = Register(Service.AddMultipartUploadPart)

type AddMultipartUploadPart struct {
mq.MessageBodyBase
UserID cdssdk.UserID `json:"userID"`
ObjectID cdssdk.ObjectID `json:"objectID"`
Block stgmod.ObjectBlock `json:"block"`
}

type AddMultipartUploadPartResp struct {
mq.MessageBodyBase
}

func ReqAddMultipartUploadPart(userID cdssdk.UserID, objectID cdssdk.ObjectID, blk stgmod.ObjectBlock) *AddMultipartUploadPart {
return &AddMultipartUploadPart{
UserID: userID,
ObjectID: objectID,
Block: blk,
}
}
func RespAddMultipartUploadPart() *AddMultipartUploadPartResp {
return &AddMultipartUploadPartResp{}
}
func (client *Client) AddMultipartUploadPart(msg *AddMultipartUploadPart) (*AddMultipartUploadPartResp, error) {
return mq.Request(Service.AddMultipartUploadPart, client.rabbitCli, msg)
}

+ 0
- 258
common/pkgs/mq/coordinator/package.go View File

@@ -1,258 +0,0 @@
package coordinator

import (
"time"

"gitlink.org.cn/cloudream/common/pkgs/mq"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"

"gitlink.org.cn/cloudream/storage2/common/pkgs/db2/model"
)

type PackageService interface {
GetPackage(msg *GetPackage) (*GetPackageResp, *mq.CodeMessage)

GetPackageByName(msg *GetPackageByName) (*GetPackageByNameResp, *mq.CodeMessage)

CreatePackage(msg *CreatePackage) (*CreatePackageResp, *mq.CodeMessage)

UpdatePackage(msg *UpdatePackage) (*UpdatePackageResp, *mq.CodeMessage)

DeletePackage(msg *DeletePackage) (*DeletePackageResp, *mq.CodeMessage)

ClonePackage(msg *ClonePackage) (*ClonePackageResp, *mq.CodeMessage)

GetPackageCachedStorages(msg *GetPackageCachedStorages) (*GetPackageCachedStoragesResp, *mq.CodeMessage)
}

// 获取Package基本信息
var _ = Register(Service.GetPackage)

type GetPackage struct {
mq.MessageBodyBase
UserID cdssdk.UserID `json:"userID"`
PackageID cdssdk.PackageID `json:"packageID"`
}
type GetPackageResp struct {
mq.MessageBodyBase
model.Package
}

func NewGetPackage(userID cdssdk.UserID, packageID cdssdk.PackageID) *GetPackage {
return &GetPackage{
UserID: userID,
PackageID: packageID,
}
}
func NewGetPackageResp(pkg model.Package) *GetPackageResp {
return &GetPackageResp{
Package: pkg,
}
}

func (client *Client) GetPackage(msg *GetPackage) (*GetPackageResp, error) {
return mq.Request(Service.GetPackage, client.rabbitCli, msg)
}

// 根据名称获取Package
var _ = Register(Service.GetPackageByName)

type GetPackageByName struct {
mq.MessageBodyBase
UserID cdssdk.UserID `json:"userID"`
BucketName string `json:"bucketName"`
PackageName string `json:"packageName"`
}
type GetPackageByNameResp struct {
mq.MessageBodyBase
Package cdssdk.Package `json:"package"`
}

func ReqGetPackageByName(userID cdssdk.UserID, bucketName string, packageName string) *GetPackageByName {
return &GetPackageByName{
UserID: userID,
BucketName: bucketName,
PackageName: packageName,
}
}
func NewGetPackageByNameResp(pkg cdssdk.Package) *GetPackageByNameResp {
return &GetPackageByNameResp{
Package: pkg,
}
}
func (client *Client) GetPackageByName(msg *GetPackageByName) (*GetPackageByNameResp, error) {
return mq.Request(Service.GetPackageByName, client.rabbitCli, msg)
}

// 创建一个Package
var _ = Register(Service.CreatePackage)

type CreatePackage struct {
mq.MessageBodyBase
UserID cdssdk.UserID `json:"userID"`
BucketID cdssdk.BucketID `json:"bucketID"`
Name string `json:"name"`
}
type CreatePackageResp struct {
mq.MessageBodyBase
Package cdssdk.Package `json:"package"`
}

func NewCreatePackage(userID cdssdk.UserID, bucketID cdssdk.BucketID, name string) *CreatePackage {
return &CreatePackage{
UserID: userID,
BucketID: bucketID,
Name: name,
}
}
func NewCreatePackageResp(pkg cdssdk.Package) *CreatePackageResp {
return &CreatePackageResp{
Package: pkg,
}
}

func (client *Client) CreatePackage(msg *CreatePackage) (*CreatePackageResp, error) {
return mq.Request(Service.CreatePackage, client.rabbitCli, msg)
}

// 更新Package
var _ = Register(Service.UpdatePackage)

type UpdatePackage struct {
mq.MessageBodyBase
PackageID cdssdk.PackageID `json:"packageID"`
Adds []AddObjectEntry `json:"adds"`
}
type UpdatePackageResp struct {
mq.MessageBodyBase
Added []cdssdk.Object `json:"added"`
}
type AddObjectEntry struct {
Path string `json:"path"`
Size int64 `json:"size,string"`
FileHash cdssdk.FileHash `json:"fileHash"`
UploadTime time.Time `json:"uploadTime"` // 开始上传文件的时间
StorageIDs []cdssdk.StorageID `json:"storageIDs"`
}

func NewUpdatePackage(packageID cdssdk.PackageID, adds []AddObjectEntry) *UpdatePackage {
return &UpdatePackage{
PackageID: packageID,
Adds: adds,
}
}
func NewUpdatePackageResp(added []cdssdk.Object) *UpdatePackageResp {
return &UpdatePackageResp{
Added: added,
}
}
func NewAddObjectEntry(path string, size int64, fileHash cdssdk.FileHash, uploadTime time.Time, stgIDs []cdssdk.StorageID) AddObjectEntry {
return AddObjectEntry{
Path: path,
Size: size,
FileHash: fileHash,
UploadTime: uploadTime,
StorageIDs: stgIDs,
}
}
func (client *Client) UpdatePackage(msg *UpdatePackage) (*UpdatePackageResp, error) {
return mq.Request(Service.UpdatePackage, client.rabbitCli, msg)
}

// 删除对象
var _ = Register(Service.DeletePackage)

type DeletePackage struct {
mq.MessageBodyBase
UserID cdssdk.UserID `json:"userID"`
PackageID cdssdk.PackageID `json:"packageID"`
}
type DeletePackageResp struct {
mq.MessageBodyBase
}

func NewDeletePackage(userID cdssdk.UserID, packageID cdssdk.PackageID) *DeletePackage {
return &DeletePackage{
UserID: userID,
PackageID: packageID,
}
}
func NewDeletePackageResp() *DeletePackageResp {
return &DeletePackageResp{}
}
func (client *Client) DeletePackage(msg *DeletePackage) (*DeletePackageResp, error) {
return mq.Request(Service.DeletePackage, client.rabbitCli, msg)
}

// 克隆Package
var _ = Register(Service.ClonePackage)

type ClonePackage struct {
mq.MessageBodyBase
UserID cdssdk.UserID `json:"userID"`
PackageID cdssdk.PackageID `json:"packageID"`
BucketID cdssdk.BucketID `json:"bucketID"`
Name string `json:"name"`
}
type ClonePackageResp struct {
mq.MessageBodyBase
Package cdssdk.Package `json:"package"`
}

func ReqClonePackage(userID cdssdk.UserID, packageID cdssdk.PackageID, bucketID cdssdk.BucketID, name string) *ClonePackage {
return &ClonePackage{
UserID: userID,
PackageID: packageID,
BucketID: bucketID,
Name: name,
}
}
func RespClonePackage(pkg cdssdk.Package) *ClonePackageResp {
return &ClonePackageResp{
Package: pkg,
}
}

func (client *Client) ClonePackage(msg *ClonePackage) (*ClonePackageResp, error) {
return mq.Request(Service.ClonePackage, client.rabbitCli, msg)
}

// 根据PackageID获取object分布情况
var _ = Register(Service.GetPackageCachedStorages)

type GetPackageCachedStorages struct {
mq.MessageBodyBase
UserID cdssdk.UserID `json:"userID"`
PackageID cdssdk.PackageID `json:"packageID"`
}

type PackageCachedStorageInfo struct {
StorageID int64 `json:"storageID"`
FileSize int64 `json:"fileSize"`
ObjectCount int64 `json:"objectCount"`
}

type GetPackageCachedStoragesResp struct {
mq.MessageBodyBase
cdssdk.PackageCachingInfo
}

func ReqGetPackageCachedStorages(userID cdssdk.UserID, packageID cdssdk.PackageID) *GetPackageCachedStorages {
return &GetPackageCachedStorages{
UserID: userID,
PackageID: packageID,
}
}

func ReqGetPackageCachedStoragesResp(stgInfos []cdssdk.StoragePackageCachingInfo, packageSize int64) *GetPackageCachedStoragesResp {
return &GetPackageCachedStoragesResp{
PackageCachingInfo: cdssdk.PackageCachingInfo{
StorageInfos: stgInfos,
PackageSize: packageSize,
},
}
}

func (client *Client) GetPackageCachedStorages(msg *GetPackageCachedStorages) (*GetPackageCachedStoragesResp, error) {
return mq.Request(Service.GetPackageCachedStorages, client.rabbitCli, msg)
}

+ 0
- 12
common/pkgs/mq/coordinator/server.go View File

@@ -8,21 +8,9 @@ import (

// Service 协调端接口
type Service interface {
AgentService

BucketService

CacheService

HubService

ObjectService

PackageService

StorageService

UserService
}

type Server struct {


+ 5
- 140
common/pkgs/mq/coordinator/storage.go View File

@@ -2,21 +2,11 @@ package coordinator

import (
"gitlink.org.cn/cloudream/common/pkgs/mq"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
stgmod "gitlink.org.cn/cloudream/storage2/common/models"
"gitlink.org.cn/cloudream/storage2/common/pkgs/db2/model"
cortypes "gitlink.org.cn/cloudream/storage2/coordinator/types"
)

type StorageService interface {
GetStorage(msg *GetStorage) (*GetStorageResp, *mq.CodeMessage)

GetStorageDetails(msg *GetStorageDetails) (*GetStorageDetailsResp, *mq.CodeMessage)

GetUserStorageDetails(msg *GetUserStorageDetails) (*GetUserStorageDetailsResp, *mq.CodeMessage)

GetStorageByName(msg *GetStorageByName) (*GetStorageByNameResp, *mq.CodeMessage)

StoragePackageLoaded(msg *StoragePackageLoaded) (*StoragePackageLoadedResp, *mq.CodeMessage)
}

// 获取Storage信息
@@ -24,21 +14,19 @@ var _ = Register(Service.GetStorage)

type GetStorage struct {
mq.MessageBodyBase
UserID cdssdk.UserID `json:"userID"`
StorageID cdssdk.StorageID `json:"storageID"`
StorageID cortypes.StorageID `json:"storageID"`
}
type GetStorageResp struct {
mq.MessageBodyBase
Storage model.Storage `json:"storage"`
Storage cortypes.Storage `json:"storage"`
}

func ReqGetStorage(userID cdssdk.UserID, storageID cdssdk.StorageID) *GetStorage {
func ReqGetStorage(storageID cortypes.StorageID) *GetStorage {
return &GetStorage{
UserID: userID,
StorageID: storageID,
}
}
func RespGetStorage(stg model.Storage) *GetStorageResp {
func RespGetStorage(stg cortypes.Storage) *GetStorageResp {
return &GetStorageResp{
Storage: stg,
}
@@ -46,126 +34,3 @@ func RespGetStorage(stg model.Storage) *GetStorageResp {
func (client *Client) GetStorage(msg *GetStorage) (*GetStorageResp, error) {
return mq.Request(Service.GetStorage, client.rabbitCli, msg)
}

// 获取Storage的详细信息
var _ = Register(Service.GetStorageDetails)

type GetStorageDetails struct {
mq.MessageBodyBase
StorageIDs []cdssdk.StorageID `json:"storageIDs"`
}
type GetStorageDetailsResp struct {
mq.MessageBodyBase
Storages []*stgmod.StorageDetail `json:"storages"`
}

func ReqGetStorageDetails(storageIDs []cdssdk.StorageID) *GetStorageDetails {
return &GetStorageDetails{
StorageIDs: storageIDs,
}
}
func RespGetStorageDetails(stgs []*stgmod.StorageDetail) *GetStorageDetailsResp {
return &GetStorageDetailsResp{
Storages: stgs,
}
}

func (r *GetStorageDetailsResp) ToMap() map[cdssdk.StorageID]stgmod.StorageDetail {
m := make(map[cdssdk.StorageID]stgmod.StorageDetail)
for _, stg := range r.Storages {
if stg == nil {
continue
}

m[stg.Storage.StorageID] = *stg
}
return m
}

func (client *Client) GetStorageDetails(msg *GetStorageDetails) (*GetStorageDetailsResp, error) {
return mq.Request(Service.GetStorageDetails, client.rabbitCli, msg)
}

var _ = Register(Service.GetStorageByName)

type GetStorageByName struct {
mq.MessageBodyBase
UserID cdssdk.UserID `json:"userID"`
Name string `json:"name"`
}
type GetStorageByNameResp struct {
mq.MessageBodyBase
Storage model.Storage `json:"storage"`
}

func ReqGetStorageByName(userID cdssdk.UserID, name string) *GetStorageByName {
return &GetStorageByName{
UserID: userID,
Name: name,
}
}
func RespGetStorageByNameResp(storage model.Storage) *GetStorageByNameResp {
return &GetStorageByNameResp{
Storage: storage,
}
}
func (client *Client) GetStorageByName(msg *GetStorageByName) (*GetStorageByNameResp, error) {
return mq.Request(Service.GetStorageByName, client.rabbitCli, msg)
}

// 获取用户的Storage信息
var _ = Register(Service.GetUserStorageDetails)

type GetUserStorageDetails struct {
mq.MessageBodyBase
UserID cdssdk.UserID `json:"userID"`
}
type GetUserStorageDetailsResp struct {
mq.MessageBodyBase
Storages []stgmod.StorageDetail `json:"storages"`
}

func ReqGetUserStorageDetails(userID cdssdk.UserID) *GetUserStorageDetails {
return &GetUserStorageDetails{
UserID: userID,
}
}
func RespGetUserStorageDetails(stgs []stgmod.StorageDetail) *GetUserStorageDetailsResp {
return &GetUserStorageDetailsResp{
Storages: stgs,
}
}
func (client *Client) GetUserStorageDetails(msg *GetUserStorageDetails) (*GetUserStorageDetailsResp, error) {
return mq.Request(Service.GetUserStorageDetails, client.rabbitCli, msg)
}

// 提交调度记录
var _ = Register(Service.StoragePackageLoaded)

type StoragePackageLoaded struct {
mq.MessageBodyBase
UserID cdssdk.UserID `json:"userID"`
PackageID cdssdk.PackageID `json:"packageID"`
StorageID cdssdk.StorageID `json:"storageID"`
RootPath string `json:"rootPath"`
PinnedObjects []cdssdk.ObjectID `json:"pinnedObjects"`
}
type StoragePackageLoadedResp struct {
mq.MessageBodyBase
}

func ReqStoragePackageLoaded(userID cdssdk.UserID, stgID cdssdk.StorageID, packageID cdssdk.PackageID, rootPath string, pinnedObjects []cdssdk.ObjectID) *StoragePackageLoaded {
return &StoragePackageLoaded{
UserID: userID,
PackageID: packageID,
StorageID: stgID,
RootPath: rootPath,
PinnedObjects: pinnedObjects,
}
}
func RespStoragePackageLoaded() *StoragePackageLoadedResp {
return &StoragePackageLoadedResp{}
}
func (client *Client) StoragePackageLoaded(msg *StoragePackageLoaded) (*StoragePackageLoadedResp, error) {
return mq.Request(Service.StoragePackageLoaded, client.rabbitCli, msg)
}

+ 0
- 38
common/pkgs/mq/coordinator/temp.go View File

@@ -1,38 +0,0 @@
package coordinator

import (
"gitlink.org.cn/cloudream/common/pkgs/mq"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
stgmod "gitlink.org.cn/cloudream/storage2/common/models"
)

// 删除Object
var _ = Register(Service.GetDatabaseAll)

type GetDatabaseAll struct {
mq.MessageBodyBase
UserID cdssdk.UserID `json:"userID"`
}

type GetDatabaseAllResp struct {
mq.MessageBodyBase
Buckets []cdssdk.Bucket `json:"buckets"`
Packages []cdssdk.Package `json:"packages"`
Objects []stgmod.ObjectDetail `json:"objects"`
}

func ReqGetDatabaseAll(userID cdssdk.UserID) *GetDatabaseAll {
return &GetDatabaseAll{
UserID: userID,
}
}
func RespGetDatabaseAll(buckets []cdssdk.Bucket, packages []cdssdk.Package, objects []stgmod.ObjectDetail) *GetDatabaseAllResp {
return &GetDatabaseAllResp{
Buckets: buckets,
Packages: packages,
Objects: objects,
}
}
func (client *Client) GetDatabaseAll(msg *GetDatabaseAll) (*GetDatabaseAllResp, error) {
return mq.Request(Service.GetDatabaseAll, client.rabbitCli, msg)
}

+ 0
- 67
common/pkgs/mq/coordinator/user.go View File

@@ -1,67 +0,0 @@
package coordinator

import (
"gitlink.org.cn/cloudream/common/pkgs/mq"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
)

type UserService interface {
CreateUser(msg *CreateUser) (*CreateUserResp, *mq.CodeMessage)

DeleteUser(msg *DeleteUser) (*DeleteUserResp, *mq.CodeMessage)
}

// 创建用户
var _ = Register(Service.CreateUser)

type CreateUser struct {
mq.MessageBodyBase
Name string `json:"name"`
}

type CreateUserResp struct {
mq.MessageBodyBase
User cdssdk.User `json:"user"`
}

func ReqCreateUser(name string) *CreateUser {
return &CreateUser{
Name: name,
}
}

func RespCreateUser(user cdssdk.User) *CreateUserResp {
return &CreateUserResp{
User: user,
}
}

func (c *Client) CreateUser(msg *CreateUser) (*CreateUserResp, error) {
return mq.Request(Service.CreateUser, c.rabbitCli, msg)
}

// 删除用户
var _ = Register(Service.DeleteUser)

type DeleteUser struct {
mq.MessageBodyBase
UserID cdssdk.UserID `json:"userID"`
}

type DeleteUserResp struct {
mq.MessageBodyBase
}

func ReqDeleteUser(userID cdssdk.UserID) *DeleteUser {
return &DeleteUser{
UserID: userID,
}
}

func RespDeleteUser() *DeleteUserResp {
return &DeleteUserResp{}
}

func (c *Client) DeleteUser(msg *DeleteUser) (*DeleteUserResp, error) {
return mq.Request(Service.DeleteUser, c.rabbitCli, msg)
}

Loading…
Cancel
Save