diff --git a/command_service.go b/command_service.go index 7fad841..d3856b9 100644 --- a/command_service.go +++ b/command_service.go @@ -3,14 +3,23 @@ package main import ( "fmt" + mydb "gitlink.org.cn/cloudream/db" ramsg "gitlink.org.cn/cloudream/rabbitmq/message" "gitlink.org.cn/cloudream/utils" "gitlink.org.cn/cloudream/utils/consts/errorcode" ) type CommandService struct { + db *mydb.DB } +func NewCommandService(db *mydb.DB) *CommandService { + return &CommandService{ + db: db, + } +} + +// TODO 需要考虑数据库操作的事务性 func (service *CommandService) Move(msg *ramsg.MoveCommand) ramsg.MoveResp { //查询数据库,获取冗余类型,冗余参数 //jh:使用command中的bucketname和objectname查询对象表,获得redundancy,EcName,fileSizeInBytes @@ -21,22 +30,42 @@ func (service *CommandService) Move(msg *ramsg.MoveCommand) ramsg.MoveResp { //--查询缓存表,获得每个hash的nodeIps、TempOrPins、Times //--查询节点延迟表,得到command.destination与各个nodeIps的的延迟,存到一个map类型中(Delay) //--kx:根据查出来的hash/hashs、nodeIps、TempOrPins、Times(移动/读取策略)、Delay确定hashs、ids - BucketID := Query_BucketID(msg.BucketName) + bucketID, err := service.db.QueryBucketID(msg.BucketName) + if err != nil { + // TODO 日志 + return ramsg.NewCoorMoveRespFailed(errorcode.OPERATION_FAILED, "query BucketID failed") + } + //jh:使用command中的bucketid和objectname查询对象表,获得objectid,redundancy,EcName,fileSizeInBytes - ObjectID, fileSizeInBytes, redundancyy, ecName := Query_Object(msg.ObjectName, BucketID) - //-若redundancy是rep,查询对象副本表, 获得repHash + object, err := service.db.QueryObject(msg.ObjectName, bucketID) + if err != nil { + // TODO 日志 + return ramsg.NewCoorMoveRespFailed(errorcode.OPERATION_FAILED, "query Object failed") + } + //-若redundancy是rep,查询对象副本表, 获得repHash var hashs []string ids := []int{0} redundancy := "rep" - if redundancyy { //rep - repHash := Query_ObjectRep(ObjectID) - hashs = append(hashs, repHash) + if object.Redundancy { //rep + objectRep, err := service.db.QueryObjectRep(object.ObjectID) + if err != nil { + // TODO 日志 + return ramsg.NewCoorMoveRespFailed(errorcode.OPERATION_FAILED, "query ObjectRep failed") + } + + hashs = append(hashs, objectRep.RepHash) + } else { //ec redundancy = "ec" - blockHashs := Query_ObjectBlock(ObjectID) + blockHashs, err := service.db.QueryObjectBlock(object.ObjectID) + if err != nil { + // TODO 日志 + return ramsg.NewCoorMoveRespFailed(errorcode.OPERATION_FAILED, "query ObjectBlock failed") + } + ecPolicies := *utils.GetEcPolicy() - ecPolicy := ecPolicies[ecName] + ecPolicy := ecPolicies[object.ECName] ecN := ecPolicy.GetN() ecK := ecPolicy.GetK() ids = make([]int, ecK) @@ -65,69 +94,43 @@ func (service *CommandService) Move(msg *ramsg.MoveCommand) ramsg.MoveResp { //--kx:根据查出来的hash/hashs、nodeIps、TempOrPins、Times(移动/读取策略)、Delay确定hashs、ids }*/ } - //--ids :={0} - //--hashs := {repHash} - - //-若redundancy是ec,查询对象编码块表,获得blockHashs, ids(innerID) - // var objectblock []ObjectBlock - //type ObjectBlock struct {InnerID int, BlockHash string} - /*redundancy := "rep" - ecName := "ecName" - hashs := []string{"QmPeaWD8vrtnd1CE4WLnKBtYnMmAeVmUhqzhMnJ3QvLHsZ", "block2.json"} - ids := []int{0, 1} - fileSizeInBytes := 21 - - res := rabbitmq.MoveRes{ - Redundancy: redundancy, - EcName: ecName, - Hashs: hashs, - Ids: ids, - FileSizeInBytes: int64(fileSizeInBytes), - } - c, _ := json.Marshal(res) - rabbitSend(c, command.UserId)*/ - /*ec Move测试 - redundancy := "ec" - ecName := "rs_3_2" - hashs := []string{"QmZD9YwEXpYAw5zncYzYv7LN9K6bw29FcbYsdL4whiRAyz", "QmSzxPyMJfw8fgwWHmEEMBpcdg8vqHtQuRAjUbzbv3KrEa"} - ids := []int{1, 2} - fileSizeInBytes := 21*/ return ramsg.NewCoorMoveRespOK( redundancy, - ecName, + object.ECName, hashs, ids, - fileSizeInBytes, + object.FileSizeInBytes, ) } func (service *CommandService) RepWrite(msg *ramsg.RepWriteCommand) ramsg.WriteResp { //查询用户可用的节点IP - nodeip := Query_UserNode(msg.UserID) //nodeip格式:[]string + nodeIPs, err := service.db.QueryUserAvailableNodeIPs(msg.UserID) + if err != nil { + // TODO2 日志 + return ramsg.NewCoorWriteRespFailed(errorcode.OPERATION_FAILED, "query user available node ip failed") + } + + if len(nodeIPs) < msg.ReplicateNumber { + // TODO2 日志 + return ramsg.NewCoorWriteRespFailed(errorcode.OPERATION_FAILED, "user nodes are not enough") + } + numRep := msg.ReplicateNumber - /* - TODO xh:错误判断 - 1.判断用户可用节点是否小于numRep - 2.判断用户要写入的对象是否已存在 - 如果存在上述错误,直接通过消息队列返回错误代码给客户端(需在rabbitmq/commands.go中给writeRes加上错误代码字段) - */ ips := make([]string, numRep) //随机选取numRep个nodeIp - start := utils.GetRandInt(len(nodeip)) + start := utils.GetRandInt(len(nodeIPs)) for i := 0; i < numRep; i++ { - ips[i] = nodeip[(start+i)%len(nodeip)] + ips[i] = nodeIPs[(start+i)%len(nodeIPs)] + } + + _, err = service.db.CreateRepObject(msg.ObjectName, msg.BucketName, msg.FileSizeInBytes, msg.ReplicateNumber) + if err != nil { + // TODO2 日志 + return ramsg.NewCoorWriteRespFailed(errorcode.OPERATION_FAILED, "create object failed") } - //TODO xh: Query_BucketID和Insert_RepObject合成一个命令(两个动作可以一条sql语句搞定),保留Query_BucketID函数, - //但这里不使用它,而是把command.BucketName也传入Insert_RepObject,让其完成查询和插入操作, - //TODO xh: 元数据插入失败,需返回错误码 - BucketID := Query_BucketID(msg.BucketName) - //对象表插入 - ObjectID := Insert_RepObject(msg.ObjectName, BucketID, msg.FileSizeInBytes, msg.ReplicateNumber) - //对象副本表的插入 - Insert_ObjectRep(ObjectID) - //返回消息 return ramsg.NewCoorWriteRespOK(ips) } @@ -252,21 +255,7 @@ func (service *CommandService) Read(msg *ramsg.ReadCommand) ramsg.ReadResp { } func (service *CommandService) TempCacheReport(msg *ramsg.TempCacheReport) { - //返回消息 - for i := 0; i < len(msg.Hashes); i++ { - cache := Update_Cache(msg.Hashes[i], msg.IP) - //若要插入缓存不存在,将hashs中的hash,IP插入缓存表中,TempOrPin字段为true,Time为插入时的时间戳 - if cache == (Cache{}) { - a := []string{} - b := []string{} - a = append(a, msg.Hashes[i]) - b = append(b, msg.IP) - Insert_Cache(a, b, true) - } - } - //jh:将hashs中的hash,IP插入缓存表中,TempOrPin字段为true,Time为插入时的时间戳 - //-如果要插入的hash、IP在表中已存在且所对应的TempOrPin字段为false,则不做任何操作 - //-如果要插入的hash、IP在表中已存在且所对应的TempOrPin字段为true,则更新Time + service.db.BatchInsertOrUpdateCache(msg.Hashes, msg.IP) } func (service *CommandService) AgentStatusReport(msg *ramsg.AgentStatusReport) { diff --git a/main.go b/main.go index 99c3a3d..7ee1303 100644 --- a/main.go +++ b/main.go @@ -1,15 +1,25 @@ package main import ( + mydb "gitlink.org.cn/cloudream/db" rasvr "gitlink.org.cn/cloudream/rabbitmq/server" ) +const ( + s_DATABASE_SOURCE_NAME = "root:123456@tcp(127.0.0.1:3306)/kx?charset=utf8mb4&parseTime=true" +) + //TODO xh: 读取配置文件,初始化变量 func main() { //TODO xh:解析配置文件 + db, err := mydb.NewDB(s_DATABASE_SOURCE_NAME) + if err != nil { + // TODO 错误处理 + return + } - cmdSvr, err := rasvr.NewCoordinatorServer(&CommandService{}) + cmdSvr, err := rasvr.NewCoordinatorServer(NewCommandService(db)) if err != nil { // TODO 错误日志 return