Browse Source

优化RepWrite操作数据的代码

gitlink
Gitea 2 years ago
parent
commit
5408e5f1b5
2 changed files with 70 additions and 71 deletions
  1. +59
    -70
      command_service.go
  2. +11
    -1
      main.go

+ 59
- 70
command_service.go View File

@@ -3,14 +3,23 @@ package main
import (
"fmt"

mydb "gitlink.org.cn/cloudream/db"
ramsg "gitlink.org.cn/cloudream/rabbitmq/message"
"gitlink.org.cn/cloudream/utils"
"gitlink.org.cn/cloudream/utils/consts/errorcode"
)

type CommandService struct {
db *mydb.DB
}

func NewCommandService(db *mydb.DB) *CommandService {
return &CommandService{
db: db,
}
}

// TODO 需要考虑数据库操作的事务性
func (service *CommandService) Move(msg *ramsg.MoveCommand) ramsg.MoveResp {
//查询数据库,获取冗余类型,冗余参数
//jh:使用command中的bucketname和objectname查询对象表,获得redundancy,EcName,fileSizeInBytes
@@ -21,22 +30,42 @@ func (service *CommandService) Move(msg *ramsg.MoveCommand) ramsg.MoveResp {
//--查询缓存表,获得每个hash的nodeIps、TempOrPins、Times
//--查询节点延迟表,得到command.destination与各个nodeIps的的延迟,存到一个map类型中(Delay)
//--kx:根据查出来的hash/hashs、nodeIps、TempOrPins、Times(移动/读取策略)、Delay确定hashs、ids
BucketID := Query_BucketID(msg.BucketName)
bucketID, err := service.db.QueryBucketID(msg.BucketName)
if err != nil {
// TODO 日志
return ramsg.NewCoorMoveRespFailed(errorcode.OPERATION_FAILED, "query BucketID failed")
}

//jh:使用command中的bucketid和objectname查询对象表,获得objectid,redundancy,EcName,fileSizeInBytes
ObjectID, fileSizeInBytes, redundancyy, ecName := Query_Object(msg.ObjectName, BucketID)
//-若redundancy是rep,查询对象副本表, 获得repHash
object, err := service.db.QueryObject(msg.ObjectName, bucketID)
if err != nil {
// TODO 日志
return ramsg.NewCoorMoveRespFailed(errorcode.OPERATION_FAILED, "query Object failed")
}

//-若redundancy是rep,查询对象副本表, 获得repHash
var hashs []string
ids := []int{0}
redundancy := "rep"
if redundancyy { //rep
repHash := Query_ObjectRep(ObjectID)
hashs = append(hashs, repHash)
if object.Redundancy { //rep
objectRep, err := service.db.QueryObjectRep(object.ObjectID)
if err != nil {
// TODO 日志
return ramsg.NewCoorMoveRespFailed(errorcode.OPERATION_FAILED, "query ObjectRep failed")
}

hashs = append(hashs, objectRep.RepHash)

} else { //ec
redundancy = "ec"
blockHashs := Query_ObjectBlock(ObjectID)
blockHashs, err := service.db.QueryObjectBlock(object.ObjectID)
if err != nil {
// TODO 日志
return ramsg.NewCoorMoveRespFailed(errorcode.OPERATION_FAILED, "query ObjectBlock failed")
}

ecPolicies := *utils.GetEcPolicy()
ecPolicy := ecPolicies[ecName]
ecPolicy := ecPolicies[object.ECName]
ecN := ecPolicy.GetN()
ecK := ecPolicy.GetK()
ids = make([]int, ecK)
@@ -65,69 +94,43 @@ func (service *CommandService) Move(msg *ramsg.MoveCommand) ramsg.MoveResp {
//--kx:根据查出来的hash/hashs、nodeIps、TempOrPins、Times(移动/读取策略)、Delay确定hashs、ids
}*/
}
//--ids :={0}
//--hashs := {repHash}

//-若redundancy是ec,查询对象编码块表,获得blockHashs, ids(innerID)
// var objectblock []ObjectBlock
//type ObjectBlock struct {InnerID int, BlockHash string}
/*redundancy := "rep"
ecName := "ecName"
hashs := []string{"QmPeaWD8vrtnd1CE4WLnKBtYnMmAeVmUhqzhMnJ3QvLHsZ", "block2.json"}
ids := []int{0, 1}
fileSizeInBytes := 21

res := rabbitmq.MoveRes{
Redundancy: redundancy,
EcName: ecName,
Hashs: hashs,
Ids: ids,
FileSizeInBytes: int64(fileSizeInBytes),
}
c, _ := json.Marshal(res)
rabbitSend(c, command.UserId)*/
/*ec Move测试
redundancy := "ec"
ecName := "rs_3_2"
hashs := []string{"QmZD9YwEXpYAw5zncYzYv7LN9K6bw29FcbYsdL4whiRAyz", "QmSzxPyMJfw8fgwWHmEEMBpcdg8vqHtQuRAjUbzbv3KrEa"}
ids := []int{1, 2}
fileSizeInBytes := 21*/

return ramsg.NewCoorMoveRespOK(
redundancy,
ecName,
object.ECName,
hashs,
ids,
fileSizeInBytes,
object.FileSizeInBytes,
)
}

func (service *CommandService) RepWrite(msg *ramsg.RepWriteCommand) ramsg.WriteResp {
//查询用户可用的节点IP
nodeip := Query_UserNode(msg.UserID) //nodeip格式:[]string
nodeIPs, err := service.db.QueryUserAvailableNodeIPs(msg.UserID)
if err != nil {
// TODO2 日志
return ramsg.NewCoorWriteRespFailed(errorcode.OPERATION_FAILED, "query user available node ip failed")
}

if len(nodeIPs) < msg.ReplicateNumber {
// TODO2 日志
return ramsg.NewCoorWriteRespFailed(errorcode.OPERATION_FAILED, "user nodes are not enough")
}

numRep := msg.ReplicateNumber
/*
TODO xh:错误判断
1.判断用户可用节点是否小于numRep
2.判断用户要写入的对象是否已存在
如果存在上述错误,直接通过消息队列返回错误代码给客户端(需在rabbitmq/commands.go中给writeRes加上错误代码字段)
*/
ips := make([]string, numRep)
//随机选取numRep个nodeIp
start := utils.GetRandInt(len(nodeip))
start := utils.GetRandInt(len(nodeIPs))
for i := 0; i < numRep; i++ {
ips[i] = nodeip[(start+i)%len(nodeip)]
ips[i] = nodeIPs[(start+i)%len(nodeIPs)]
}

_, err = service.db.CreateRepObject(msg.ObjectName, msg.BucketName, msg.FileSizeInBytes, msg.ReplicateNumber)
if err != nil {
// TODO2 日志
return ramsg.NewCoorWriteRespFailed(errorcode.OPERATION_FAILED, "create object failed")
}
//TODO xh: Query_BucketID和Insert_RepObject合成一个命令(两个动作可以一条sql语句搞定),保留Query_BucketID函数,
//但这里不使用它,而是把command.BucketName也传入Insert_RepObject,让其完成查询和插入操作,
//TODO xh: 元数据插入失败,需返回错误码
BucketID := Query_BucketID(msg.BucketName)
//对象表插入
ObjectID := Insert_RepObject(msg.ObjectName, BucketID, msg.FileSizeInBytes, msg.ReplicateNumber)
//对象副本表的插入
Insert_ObjectRep(ObjectID)

//返回消息
return ramsg.NewCoorWriteRespOK(ips)
}

@@ -252,21 +255,7 @@ func (service *CommandService) Read(msg *ramsg.ReadCommand) ramsg.ReadResp {
}

func (service *CommandService) TempCacheReport(msg *ramsg.TempCacheReport) {
//返回消息
for i := 0; i < len(msg.Hashes); i++ {
cache := Update_Cache(msg.Hashes[i], msg.IP)
//若要插入缓存不存在,将hashs中的hash,IP插入缓存表中,TempOrPin字段为true,Time为插入时的时间戳
if cache == (Cache{}) {
a := []string{}
b := []string{}
a = append(a, msg.Hashes[i])
b = append(b, msg.IP)
Insert_Cache(a, b, true)
}
}
//jh:将hashs中的hash,IP插入缓存表中,TempOrPin字段为true,Time为插入时的时间戳
//-如果要插入的hash、IP在表中已存在且所对应的TempOrPin字段为false,则不做任何操作
//-如果要插入的hash、IP在表中已存在且所对应的TempOrPin字段为true,则更新Time
service.db.BatchInsertOrUpdateCache(msg.Hashes, msg.IP)
}

func (service *CommandService) AgentStatusReport(msg *ramsg.AgentStatusReport) {


+ 11
- 1
main.go View File

@@ -1,15 +1,25 @@
package main

import (
mydb "gitlink.org.cn/cloudream/db"
rasvr "gitlink.org.cn/cloudream/rabbitmq/server"
)

const (
s_DATABASE_SOURCE_NAME = "root:123456@tcp(127.0.0.1:3306)/kx?charset=utf8mb4&parseTime=true"
)

//TODO xh: 读取配置文件,初始化变量

func main() {
//TODO xh:解析配置文件
db, err := mydb.NewDB(s_DATABASE_SOURCE_NAME)
if err != nil {
// TODO 错误处理
return
}

cmdSvr, err := rasvr.NewCoordinatorServer(&CommandService{})
cmdSvr, err := rasvr.NewCoordinatorServer(NewCommandService(db))
if err != nil {
// TODO 错误日志
return


Loading…
Cancel
Save