Browse Source

进一步优化代码,减少增加新消息类型所需的改动次数

gitlink
Sydonian 2 years ago
parent
commit
200dd5d801
4 changed files with 43 additions and 41 deletions
  1. +3
    -3
      internal/services/agent.go
  2. +20
    -19
      internal/services/bucket.go
  3. +8
    -8
      internal/services/command_service_ec.go
  4. +12
    -11
      internal/services/storage.go

+ 3
- 3
internal/services/agent.go View File

@@ -5,7 +5,7 @@ import (
)

func (service *Service) TempCacheReport(msg *coormsg.TempCacheReport) {
service.db.BatchInsertOrUpdateCache(msg.Hashes, msg.NodeID)
service.db.BatchInsertOrUpdateCache(msg.Body.Hashes, msg.Body.NodeID)
}

func (service *Service) AgentStatusReport(msg *coormsg.AgentStatusReport) {
@@ -15,10 +15,10 @@ func (service *Service) AgentStatusReport(msg *coormsg.AgentStatusReport) {
// TODO
/*
ips := utils.GetAgentIps()
Insert_NodeDelay(msg.IP, ips, msg.AgentDelay)
Insert_NodeDelay(msg.Body.IP, ips, msg.Body.AgentDelay)

//从配置表里读取节点地域NodeLocation
//插入节点表的NodeStatus
Insert_Node(msg.IP, msg.IP, msg.IPFSStatus, msg.LocalDirStatus)
Insert_Node(msg.Body.IP, msg.Body.IP, msg.Body.IPFSStatus, msg.Body.LocalDirStatus)
*/
}

+ 20
- 19
internal/services/bucket.go View File

@@ -3,6 +3,7 @@ package services
import (
log "github.com/sirupsen/logrus"
"gitlink.org.cn/cloudream/db/model"
ramsg "gitlink.org.cn/cloudream/rabbitmq/message"
coormsg "gitlink.org.cn/cloudream/rabbitmq/message/coordinator"
"gitlink.org.cn/cloudream/utils/consts/errorcode"
)
@@ -13,52 +14,52 @@ func (svc *Service) GetBucket(userID int, bucketID int) (model.Bucket, error) {
}

func (svc *Service) GetUserBuckets(msg *coormsg.GetUserBuckets) *coormsg.GetUserBucketsResp {
buckets, err := svc.db.GetUserBuckets(msg.UserID)
buckets, err := svc.db.GetUserBuckets(msg.Body.UserID)

if err != nil {
log.WithField("UserID", msg.UserID).
log.WithField("UserID", msg.Body.UserID).
Warnf("get user buckets failed, err: %s", err.Error())
return coormsg.NewGetUserBucketsRespFailed(errorcode.OPERATION_FAILED, "get all buckets failed")
return ramsg.ReplyFailed[coormsg.GetUserBucketsResp](errorcode.OPERATION_FAILED, "get all buckets failed")
}

return coormsg.NewGetUserBucketsRespOK(buckets)
return ramsg.ReplyOK(coormsg.NewGetUserBucketsRespBody(buckets))
}

func (svc *Service) GetBucketObjects(msg *coormsg.GetBucketObjects) *coormsg.GetBucketObjectsResp {
objects, err := svc.db.GetBucketObjects(msg.UserID, msg.BucketID)
objects, err := svc.db.GetBucketObjects(msg.Body.UserID, msg.Body.BucketID)

if err != nil {
log.WithField("UserID", msg.UserID).
WithField("BucketID", msg.BucketID).
log.WithField("UserID", msg.Body.UserID).
WithField("BucketID", msg.Body.BucketID).
Warnf("get bucket objects failed, err: %s", err.Error())
return coormsg.NewGetBucketObjectsRespFailed(errorcode.OPERATION_FAILED, "get all buckets failed")
return ramsg.ReplyFailed[coormsg.GetBucketObjectsResp](errorcode.OPERATION_FAILED, "get bucket objects failed")
}

return coormsg.NewGetBucketObjectsRespOK(objects)
return ramsg.ReplyOK(coormsg.NewGetBucketObjectsRespBody(objects))
}

func (svc *Service) CreateBucket(msg *coormsg.CreateBucket) *coormsg.CreateBucketResp {
bucketID, err := svc.db.CreateBucket(msg.UserID, msg.BucketName)
bucketID, err := svc.db.CreateBucket(msg.Body.UserID, msg.Body.BucketName)

if err != nil {
log.WithField("UserID", msg.UserID).
WithField("BucketName", msg.BucketName).
log.WithField("UserID", msg.Body.UserID).
WithField("BucketName", msg.Body.BucketName).
Warnf("create bucket failed, err: %s", err.Error())
return coormsg.NewCreateBucketRespFailed(errorcode.OPERATION_FAILED, "create bucket failed")
return ramsg.ReplyFailed[coormsg.CreateBucketResp](errorcode.OPERATION_FAILED, "create bucket failed")
}

return coormsg.NewCreateBucketRespOK(bucketID)
return ramsg.ReplyOK(coormsg.NewCreateBucketRespBody(bucketID))
}

func (svc *Service) DeleteBucket(msg *coormsg.DeleteBucket) *coormsg.DeleteBucketResp {
err := svc.db.DeleteBucket(msg.UserID, msg.BucketID)
err := svc.db.DeleteBucket(msg.Body.UserID, msg.Body.BucketID)

if err != nil {
log.WithField("UserID", msg.UserID).
WithField("BucketID", msg.BucketID).
log.WithField("UserID", msg.Body.UserID).
WithField("BucketID", msg.Body.BucketID).
Warnf("delete bucket failed, err: %s", err.Error())
return coormsg.NewDeleteBucketRespFailed(errorcode.OPERATION_FAILED, "delete bucket failed")
return ramsg.ReplyFailed[coormsg.DeleteBucketResp](errorcode.OPERATION_FAILED, "delete bucket failed")
}

return coormsg.NewDeleteBucketRespOK()
return ramsg.ReplyOK(coormsg.NewDeleteBucketRespBody())
}

+ 8
- 8
internal/services/command_service_ec.go View File

@@ -13,13 +13,13 @@ func (service *Service) ECWrite(msg *coormsg.ECWriteCommand) *coormsg.WriteResp
//jh:完成对象表、对象编码块表的插入(对象编码块表的Hash字段先不插入)
//返回消息
//查询用户可用的节点IP
nodes, err := service.db.QueryUserNodes(msg.UserID)
nodes, err := service.db.QueryUserNodes(msg.Body.UserID)
if err != nil {
log.Warnf("query user nodes failed, err: %s", err.Error())
return ramsg.NewCoorWriteRespFailed(errorcode.OPERATION_FAILED, "query user nodes failed")
}

ecid := msg.ECName
ecid := msg.Body.ECName
ecPolicies := *utils.GetEcPolicy()
ecPolicy := ecPolicies[ecid]
ecN := ecPolicy.GetN()
@@ -41,13 +41,13 @@ func (service *Service) ECWrite(msg *coormsg.ECWriteCommand) *coormsg.WriteResp

// TODO 参考RepWrite,将创建EC对象的逻辑移动到WriteECHash中,并合成成一个事务
//根据BucketName查询BucketID
BucketID := Query_BucketID(msg.BucketName)
BucketID := Query_BucketID(msg.Body.BucketName)
if BucketID == -1 {
// TODO 日志
return ramsg.NewCoorWriteRespFailed(errorcode.OPERATION_FAILED, fmt.Sprintf("bucket id not found for %s", msg.BucketName))
return ramsg.NewCoorWriteRespFailed(errorcode.OPERATION_FAILED, fmt.Sprintf("bucket id not found for %s", msg.Body.BucketName))
}
//对象表插入Insert_Cache
ObjectID := Insert_EcObject(msg.ObjectName, BucketID, msg.FileSizeInBytes, msg.ECName)
ObjectID := Insert_EcObject(msg.Body.ObjectName, BucketID, msg.Body.FileSizeInBytes, msg.Body.ECName)
//对象编码块表插入,hash暂时为空
for i := 0; i < ecN; i++ {
Insert_EcObjectBlock(ObjectID, i)
@@ -64,10 +64,10 @@ func (service *Service) WriteECHash(msg *coormsg.WriteECHashCommand) *coormsg.Wr
//返回消息
//插入对象编码块表中的Hash字段
// TODO 参考WriteRepHash的逻辑
ObjectId := Query_ObjectID(msg.ObjectName)
Insert_EcHash(ObjectId, msg.Hashes)
ObjectId := Query_ObjectID(msg.Body.ObjectName)
Insert_EcHash(ObjectId, msg.Body.Hashes)
//缓存表的插入
Insert_Cache(msg.Hashes, msg.NodeIDs, false)
Insert_Cache(msg.Body.Hashes, msg.Body.NodeIDs, false)

return ramsg.NewCoorWriteHashRespOK()
*/


+ 12
- 11
internal/services/storage.go View File

@@ -2,6 +2,7 @@ package services

import (
log "github.com/sirupsen/logrus"
ramsg "gitlink.org.cn/cloudream/rabbitmq/message"
coormsg "gitlink.org.cn/cloudream/rabbitmq/message/coordinator"
"gitlink.org.cn/cloudream/utils"
"gitlink.org.cn/cloudream/utils/consts"
@@ -22,20 +23,20 @@ func (service *Service) Move(msg *coormsg.MoveObjectToStorage) *coormsg.MoveObje
// TODO 需要在StorageData中增加记录

// 查询用户关联的存储服务
stg, err := service.db.QueryUserStorage(msg.UserID, msg.StorageID)
stg, err := service.db.QueryUserStorage(msg.Body.UserID, msg.Body.StorageID)
if err != nil {
log.WithField("UserID", msg.UserID).
WithField("StorageID", msg.StorageID).
log.WithField("UserID", msg.Body.UserID).
WithField("StorageID", msg.Body.StorageID).
Warnf("query storage directory failed, err: %s", err.Error())
return coormsg.NewCoorMoveRespFailed(errorcode.OPERATION_FAILED, "query storage directory failed")
return ramsg.ReplyFailed[coormsg.MoveObjectToStorageResp](errorcode.OPERATION_FAILED, "query storage directory failed")
}

// 查询文件对象
object, err := service.db.QueryObjectByID(msg.ObjectID)
object, err := service.db.QueryObjectByID(msg.Body.ObjectID)
if err != nil {
log.WithField("ObjectID", msg.ObjectID).
log.WithField("ObjectID", msg.Body.ObjectID).
Warnf("query Object failed, err: %s", err.Error())
return coormsg.NewCoorMoveRespFailed(errorcode.OPERATION_FAILED, "query Object failed")
return ramsg.ReplyFailed[coormsg.MoveObjectToStorageResp](errorcode.OPERATION_FAILED, "query Object failed")
}

//-若redundancy是rep,查询对象副本表, 获得repHash
@@ -45,7 +46,7 @@ func (service *Service) Move(msg *coormsg.MoveObjectToStorage) *coormsg.MoveObje
objectRep, err := service.db.QueryObjectRep(object.ObjectID)
if err != nil {
log.Warnf("query ObjectRep failed, err: %s", err.Error())
return coormsg.NewCoorMoveRespFailed(errorcode.OPERATION_FAILED, "query ObjectRep failed")
return ramsg.ReplyFailed[coormsg.MoveObjectToStorageResp](errorcode.OPERATION_FAILED, "query ObjectRep failed")
}

hashs = append(hashs, objectRep.RepHash)
@@ -54,7 +55,7 @@ func (service *Service) Move(msg *coormsg.MoveObjectToStorage) *coormsg.MoveObje
blockHashs, err := service.db.QueryObjectBlock(object.ObjectID)
if err != nil {
log.Warnf("query ObjectBlock failed, err: %s", err.Error())
return coormsg.NewCoorMoveRespFailed(errorcode.OPERATION_FAILED, "query ObjectBlock failed")
return ramsg.ReplyFailed[coormsg.MoveObjectToStorageResp](errorcode.OPERATION_FAILED, "query ObjectBlock failed")
}

ecPolicies := *utils.GetEcPolicy()
@@ -88,7 +89,7 @@ func (service *Service) Move(msg *coormsg.MoveObjectToStorage) *coormsg.MoveObje
}*/
}

return coormsg.NewCoorMoveRespOK(
return ramsg.ReplyOK(coormsg.NewMoveObjectToStorageRespBody(
stg.NodeID,
stg.Directory,
object.Redundancy,
@@ -96,5 +97,5 @@ func (service *Service) Move(msg *coormsg.MoveObjectToStorage) *coormsg.MoveObje
hashs,
ids,
object.FileSizeInBytes,
)
))
}

Loading…
Cancel
Save