Browse Source

取消db模块的sql包;给CreateBucket接口增加上锁过程

gitlink
Sydonian 2 years ago
parent
commit
a5b132a9aa
6 changed files with 58 additions and 34 deletions
  1. +5
    -3
      internal/config/config.go
  2. +16
    -4
      internal/services/bucket.go
  3. +14
    -14
      internal/services/object.go
  4. +8
    -5
      internal/services/service.go
  5. +7
    -7
      internal/services/storage.go
  6. +8
    -1
      main.go

+ 5
- 3
internal/config/config.go View File

@@ -1,6 +1,7 @@
package config

import (
"gitlink.org.cn/cloudream/common/pkg/distlock"
log "gitlink.org.cn/cloudream/common/pkg/logger"
c "gitlink.org.cn/cloudream/common/utils/config"
db "gitlink.org.cn/cloudream/db/config"
@@ -8,9 +9,10 @@ import (
)

type Config struct {
Logger log.Config `json:"logger"`
DB db.Config `json:"db"`
RabbitMQ racfg.Config `json:"rabbitMQ"`
Logger log.Config `json:"logger"`
DB db.Config `json:"db"`
RabbitMQ racfg.Config `json:"rabbitMQ"`
DistLock distlock.Config `json:"distlock"`
}

var cfg Config


+ 16
- 4
internal/services/bucket.go View File

@@ -2,6 +2,7 @@ package services

import (
"gitlink.org.cn/cloudream/common/consts/errorcode"
"gitlink.org.cn/cloudream/common/pkg/distlock/reqbuilder"
log "gitlink.org.cn/cloudream/common/pkg/logger"
"gitlink.org.cn/cloudream/db/model"
ramsg "gitlink.org.cn/cloudream/rabbitmq/message"
@@ -14,7 +15,7 @@ func (svc *Service) GetBucket(userID int, bucketID int) (model.Bucket, error) {
}

func (svc *Service) GetUserBuckets(msg *coormsg.GetUserBuckets) *coormsg.GetUserBucketsResp {
buckets, err := svc.db.Bucket().GetUserBuckets(msg.Body.UserID)
buckets, err := svc.db.Bucket().GetUserBuckets(svc.db.SQLCtx(), msg.Body.UserID)

if err != nil {
log.WithField("UserID", msg.Body.UserID).
@@ -26,7 +27,7 @@ func (svc *Service) GetUserBuckets(msg *coormsg.GetUserBuckets) *coormsg.GetUser
}

func (svc *Service) GetBucketObjects(msg *coormsg.GetBucketObjects) *coormsg.GetBucketObjectsResp {
objects, err := svc.db.Object().GetBucketObjects(msg.Body.UserID, msg.Body.BucketID)
objects, err := svc.db.Object().GetBucketObjects(svc.db.SQLCtx(), msg.Body.UserID, msg.Body.BucketID)

if err != nil {
log.WithField("UserID", msg.Body.UserID).
@@ -39,7 +40,18 @@ func (svc *Service) GetBucketObjects(msg *coormsg.GetBucketObjects) *coormsg.Get
}

func (svc *Service) CreateBucket(msg *coormsg.CreateBucket) *coormsg.CreateBucketResp {
bucketID, err := svc.db.Bucket().Create(msg.Body.UserID, msg.Body.BucketName)
mutex, err := reqbuilder.NewBuilder().
Metadata().Bucket().CreateOne(msg.Body.UserID, msg.Body.BucketName).
// TODO 可以考虑二次加锁,加的更精确
UserBucket().CreateAny().
MutextLock(svc.distlock)
if err != nil {
log.Warnf("acquire locks failed, err: %s", err.Error())
return ramsg.ReplyFailed[coormsg.CreateBucketResp](errorcode.OPERATION_FAILED, "acquire locks failed")
}
defer mutex.Unlock()

bucketID, err := svc.db.Bucket().Create(svc.db.SQLCtx(), msg.Body.UserID, msg.Body.BucketName)

if err != nil {
log.WithField("UserID", msg.Body.UserID).
@@ -52,7 +64,7 @@ func (svc *Service) CreateBucket(msg *coormsg.CreateBucket) *coormsg.CreateBucke
}

func (svc *Service) DeleteBucket(msg *coormsg.DeleteBucket) *coormsg.DeleteBucketResp {
err := svc.db.Bucket().Delete(msg.Body.UserID, msg.Body.BucketID)
err := svc.db.Bucket().Delete(svc.db.SQLCtx(), msg.Body.UserID, msg.Body.BucketID)

if err != nil {
log.WithField("UserID", msg.Body.UserID).


+ 14
- 14
internal/services/object.go View File

@@ -17,7 +17,7 @@ import (
func (svc *Service) PreDownloadObject(msg *coormsg.PreDownloadObject) *coormsg.PreDownloadObjectResp {

// 查询文件对象
object, err := svc.db.Object().GetUserObject(msg.Body.UserID, msg.Body.ObjectID)
object, err := svc.db.Object().GetUserObject(svc.db.SQLCtx(), msg.Body.UserID, msg.Body.ObjectID)
if err != nil {
logger.WithField("ObjectID", msg.Body.ObjectID).
Warnf("query Object failed, err: %s", err.Error())
@@ -25,7 +25,7 @@ func (svc *Service) PreDownloadObject(msg *coormsg.PreDownloadObject) *coormsg.P
}

// 查询客户端所属节点
belongNode, err := svc.db.Node().GetByExternalIP(msg.Body.ClientExternalIP)
belongNode, err := svc.db.Node().GetByExternalIP(svc.db.SQLCtx(), msg.Body.ClientExternalIP)
if err != nil {
logger.WithField("ClientExternalIP", msg.Body.ClientExternalIP).
Warnf("query client belong node failed, err: %s", err.Error())
@@ -35,7 +35,7 @@ func (svc *Service) PreDownloadObject(msg *coormsg.PreDownloadObject) *coormsg.P

//-若redundancy是rep,查询对象副本表, 获得FileHash
if object.Redundancy == consts.REDUNDANCY_REP {
objectRep, err := svc.db.ObjectRep().GetObjectRep(object.ObjectID)
objectRep, err := svc.db.ObjectRep().GetByID(svc.db.SQLCtx(), object.ObjectID)
if err != nil {
logger.WithField("ObjectID", object.ObjectID).
Warnf("get ObjectRep failed, err: %s", err.Error())
@@ -116,7 +116,7 @@ func (svc *Service) PreUploadRepObject(msg *coormsg.PreUploadRepObject) *coormsg

// 判断同名对象是否存在。等到UploadRepObject时再判断一次。
// 此次的判断只作为参考,具体是否成功还是看UploadRepObject的结果
isBucketAvai, err := svc.db.Bucket().IsAvailable(msg.Body.BucketID, msg.Body.UserID)
isBucketAvai, err := svc.db.Bucket().IsAvailable(svc.db.SQLCtx(), msg.Body.BucketID, msg.Body.UserID)
if err != nil {
logger.WithField("BucketID", msg.Body.BucketID).
Warnf("check bucket available failed, err: %s", err.Error())
@@ -128,7 +128,7 @@ func (svc *Service) PreUploadRepObject(msg *coormsg.PreUploadRepObject) *coormsg
return ramsg.ReplyFailed[coormsg.PreUploadResp](errorcode.OPERATION_FAILED, "bucket is not available to user")
}

_, err = svc.db.Object().GetByName(msg.Body.BucketID, msg.Body.ObjectName)
_, err = svc.db.Object().GetByName(svc.db.SQLCtx(), msg.Body.BucketID, msg.Body.ObjectName)
if err == nil {
logger.WithField("BucketID", msg.Body.BucketID).
WithField("ObjectName", msg.Body.ObjectName).
@@ -143,7 +143,7 @@ func (svc *Service) PreUploadRepObject(msg *coormsg.PreUploadRepObject) *coormsg
}

//查询用户可用的节点IP
nodes, err := svc.db.Node().GetUserNodes(msg.Body.UserID)
nodes, err := svc.db.Node().GetUserNodes(svc.db.SQLCtx(), msg.Body.UserID)
if err != nil {
logger.WithField("UserID", msg.Body.UserID).
Warnf("query user nodes failed, err: %s", err.Error())
@@ -151,7 +151,7 @@ func (svc *Service) PreUploadRepObject(msg *coormsg.PreUploadRepObject) *coormsg
}

// 查询客户端所属节点
belongNode, err := svc.db.Node().GetByExternalIP(msg.Body.ClientExternalIP)
belongNode, err := svc.db.Node().GetByExternalIP(svc.db.SQLCtx(), msg.Body.ClientExternalIP)
if err != nil {
logger.WithField("ClientExternalIP", msg.Body.ClientExternalIP).
Warnf("query client belong node failed, err: %s", err.Error())
@@ -173,7 +173,7 @@ func (svc *Service) PreUploadRepObject(msg *coormsg.PreUploadRepObject) *coormsg
}

func (svc *Service) CreateRepObject(msg *coormsg.CreateRepObject) *coormsg.CreateObjectResp {
_, err := svc.db.Object().CreateRepObject(msg.Body.BucketID, msg.Body.ObjectName, msg.Body.FileSize, msg.Body.RepCount, msg.Body.NodeIDs, msg.Body.FileHash)
_, err := svc.db.Object().CreateRepObject(svc.db.SQLCtx(), msg.Body.BucketID, msg.Body.ObjectName, msg.Body.FileSize, msg.Body.RepCount, msg.Body.NodeIDs, msg.Body.FileHash)
if err != nil {
logger.WithField("BucketName", msg.Body.BucketID).
WithField("ObjectName", msg.Body.ObjectName).
@@ -193,7 +193,7 @@ func (svc *Service) CreateRepObject(msg *coormsg.CreateRepObject) *coormsg.Creat
func (svc *Service) PreUpdateRepObject(msg *coormsg.PreUpdateRepObject) *coormsg.PreUpdateRepObjectResp {
// TODO 检查用户是否有Object的权限
// 获取对象信息
obj, err := svc.db.Object().GetByID(msg.Body.ObjectID)
obj, err := svc.db.Object().GetByID(svc.db.SQLCtx(), msg.Body.ObjectID)
if err != nil {
logger.WithField("ObjectID", msg.Body.ObjectID).
Warnf("get object failed, err: %s", err.Error())
@@ -206,7 +206,7 @@ func (svc *Service) PreUpdateRepObject(msg *coormsg.PreUpdateRepObject) *coormsg
}

// 获取对象Rep信息
objRep, err := svc.db.ObjectRep().GetObjectRep(msg.Body.ObjectID)
objRep, err := svc.db.ObjectRep().GetByID(svc.db.SQLCtx(), msg.Body.ObjectID)
if err != nil {
logger.WithField("ObjectID", msg.Body.ObjectID).
Warnf("get object rep failed, err: %s", err.Error())
@@ -214,7 +214,7 @@ func (svc *Service) PreUpdateRepObject(msg *coormsg.PreUpdateRepObject) *coormsg
}

//查询用户可用的节点IP
nodes, err := svc.db.Node().GetUserNodes(msg.Body.UserID)
nodes, err := svc.db.Node().GetUserNodes(svc.db.SQLCtx(), msg.Body.UserID)
if err != nil {
logger.WithField("UserID", msg.Body.UserID).
Warnf("query user nodes failed, err: %s", err.Error())
@@ -222,7 +222,7 @@ func (svc *Service) PreUpdateRepObject(msg *coormsg.PreUpdateRepObject) *coormsg
}

// 查询客户端所属节点
belongNode, err := svc.db.Node().GetByExternalIP(msg.Body.ClientExternalIP)
belongNode, err := svc.db.Node().GetByExternalIP(svc.db.SQLCtx(), msg.Body.ClientExternalIP)
if err != nil {
logger.WithField("ClientExternalIP", msg.Body.ClientExternalIP).
Warnf("query client belong node failed, err: %s", err.Error())
@@ -253,7 +253,7 @@ func (svc *Service) PreUpdateRepObject(msg *coormsg.PreUpdateRepObject) *coormsg
}

func (svc *Service) UpdateRepObject(msg *coormsg.UpdateRepObject) *coormsg.UpdateRepObjectResp {
err := svc.db.Object().UpdateRepObject(msg.Body.ObjectID, msg.Body.FileSize, msg.Body.NodeIDs, msg.Body.FileHash)
err := svc.db.Object().UpdateRepObject(svc.db.SQLCtx(), msg.Body.ObjectID, msg.Body.FileSize, msg.Body.NodeIDs, msg.Body.FileHash)
if err != nil {
logger.WithField("ObjectID", msg.Body.ObjectID).
Warnf("update rep object failed, err: %s", err.Error())
@@ -270,7 +270,7 @@ func (svc *Service) UpdateRepObject(msg *coormsg.UpdateRepObject) *coormsg.Updat
}

func (svc *Service) DeleteObject(msg *coormsg.DeleteObject) *coormsg.DeleteObjectResp {
err := svc.db.Object().SoftDelete(msg.Body.UserID, msg.Body.ObjectID)
err := svc.db.Object().SoftDelete(svc.db.SQLCtx(), msg.Body.ObjectID)

if err != nil {
logger.WithField("UserID", msg.Body.UserID).


+ 8
- 5
internal/services/service.go View File

@@ -1,18 +1,21 @@
package services

import (
distlock "gitlink.org.cn/cloudream/common/pkg/distlock/service"
mydb "gitlink.org.cn/cloudream/db"
sccli "gitlink.org.cn/cloudream/rabbitmq/client/scanner"
)

type Service struct {
db *mydb.DB
scanner *sccli.Client
db *mydb.DB
scanner *sccli.Client
distlock *distlock.Service
}

func NewService(db *mydb.DB, scanner *sccli.Client) *Service {
func NewService(db *mydb.DB, scanner *sccli.Client, distlock *distlock.Service) *Service {
return &Service{
db: db,
scanner: scanner,
db: db,
scanner: scanner,
distlock: distlock,
}
}

+ 7
- 7
internal/services/storage.go View File

@@ -9,7 +9,7 @@ import (
coormsg "gitlink.org.cn/cloudream/rabbitmq/message/coordinator"
)

func (service *Service) PreMoveObjectToStorage(msg *coormsg.PreMoveObjectToStorage) *coormsg.PreMoveObjectToStorageResp {
func (svc *Service) PreMoveObjectToStorage(msg *coormsg.PreMoveObjectToStorage) *coormsg.PreMoveObjectToStorageResp {
//查询数据库,获取冗余类型,冗余参数
//jh:使用command中的bucketname和objectname查询对象表,获得redundancy,EcName,fileSize
//-若redundancy是rep,查询对象副本表, 获得repHash
@@ -21,7 +21,7 @@ func (service *Service) PreMoveObjectToStorage(msg *coormsg.PreMoveObjectToStora
//--kx:根据查出来的hash/hashs、nodeIps、TempOrPins、Times(移动/读取策略)、Delay确定hashs、ids

// 查询用户关联的存储服务
stg, err := service.db.Storage().GetUserStorage(msg.Body.UserID, msg.Body.StorageID)
stg, err := svc.db.Storage().GetUserStorage(svc.db.SQLCtx(), msg.Body.UserID, msg.Body.StorageID)
if err != nil {
log.WithField("UserID", msg.Body.UserID).
WithField("StorageID", msg.Body.StorageID).
@@ -30,7 +30,7 @@ func (service *Service) PreMoveObjectToStorage(msg *coormsg.PreMoveObjectToStora
}

// 查询文件对象
object, err := service.db.Object().GetUserObject(msg.Body.UserID, msg.Body.ObjectID)
object, err := svc.db.Object().GetUserObject(svc.db.SQLCtx(), msg.Body.UserID, msg.Body.ObjectID)
if err != nil {
log.WithField("ObjectID", msg.Body.ObjectID).
Warnf("get user Object failed, err: %s", err.Error())
@@ -39,7 +39,7 @@ func (service *Service) PreMoveObjectToStorage(msg *coormsg.PreMoveObjectToStora

//-若redundancy是rep,查询对象副本表, 获得FileHash
if object.Redundancy == consts.REDUNDANCY_REP {
objectRep, err := service.db.ObjectRep().GetObjectRep(object.ObjectID)
objectRep, err := svc.db.ObjectRep().GetByID(svc.db.SQLCtx(), object.ObjectID)
if err != nil {
log.Warnf("get ObjectRep failed, err: %s", err.Error())
return ramsg.ReplyFailed[coormsg.PreMoveObjectToStorageResp](errorcode.OPERATION_FAILED, "get ObjectRep failed")
@@ -58,7 +58,7 @@ func (service *Service) PreMoveObjectToStorage(msg *coormsg.PreMoveObjectToStora

var hashs []string
ids := []int{0}
blockHashs, err := service.db.QueryObjectBlock(object.ObjectID)
blockHashs, err := svc.db.QueryObjectBlock(object.ObjectID)
if err != nil {
log.Warnf("query ObjectBlock failed, err: %s", err.Error())
return ramsg.ReplyFailed[coormsg.PreMoveObjectToStorageResp](errorcode.OPERATION_FAILED, "query ObjectBlock failed")
@@ -97,8 +97,8 @@ func (service *Service) PreMoveObjectToStorage(msg *coormsg.PreMoveObjectToStora
}
}

func (service *Service) MoveObjectToStorage(msg *coormsg.MoveObjectToStorage) *coormsg.MoveObjectToStorageResp {
err := service.db.Storage().UserMoveObjectTo(msg.Body.UserID, msg.Body.ObjectID, msg.Body.StorageID)
func (svc *Service) MoveObjectToStorage(msg *coormsg.MoveObjectToStorage) *coormsg.MoveObjectToStorageResp {
err := svc.db.Storage().UserMoveObjectTo(svc.db.SQLCtx(), msg.Body.UserID, msg.Body.ObjectID, msg.Body.StorageID)
if err != nil {
log.WithField("UserID", msg.Body.UserID).
WithField("ObjectID", msg.Body.ObjectID).


+ 8
- 1
main.go View File

@@ -4,6 +4,7 @@ import (
"fmt"
"os"

distlocksvc "gitlink.org.cn/cloudream/common/pkg/distlock/service"
"gitlink.org.cn/cloudream/common/pkg/logger"
log "gitlink.org.cn/cloudream/common/pkg/logger"
"gitlink.org.cn/cloudream/coordinator/internal/config"
@@ -36,7 +37,13 @@ func main() {
log.Fatalf("new scanner client failed, err: %s", err.Error())
}

coorSvr, err := rasvr.NewServer(services.NewService(db, scanner), &config.Cfg().RabbitMQ)
distlockSvc, err := distlocksvc.NewService(&config.Cfg().DistLock)
if err != nil {
log.Warnf("new distlock service failed, err: %s", err.Error())
os.Exit(1)
}

coorSvr, err := rasvr.NewServer(services.NewService(db, scanner, distlockSvc), &config.Cfg().RabbitMQ)
if err != nil {
log.Fatalf("new coordinator server failed, err: %s", err.Error())
}


Loading…
Cancel
Save