diff --git a/commandServer.go b/commandServer.go index b321083..417cb76 100644 --- a/commandServer.go +++ b/commandServer.go @@ -1,145 +1,143 @@ package main import ( - //"context" - //"io" - "fmt" - //"path/filepath" - //"sync" - "encoding/json" - "strconv" - "rabbitmq" - - //agentcaller "proto" - - //"github.com/pborman/uuid" - //"github.com/streadway/amqp" - - //"google.golang.org/grpc" - + //"context" + //"io" + "fmt" + //"path/filepath" + //"sync" + "encoding/json" + "rabbitmq" + "strconv" + //agentcaller "proto" + //"github.com/pborman/uuid" + //"github.com/streadway/amqp" + //"google.golang.org/grpc" ) -func rabbitSend(c []byte, userId int){ - queueName := "coorClientQueue"+strconv.Itoa(userId) - rabbit := rabbitmq.NewRabbitMQSimple(queueName) - fmt.Println(string(c)) - rabbit.PublishSimple(c) - rabbit.Destroy() +func rabbitSend(c []byte, userId int) { + queueName := "coorClientQueue" + strconv.Itoa(userId) + rabbit := rabbitmq.NewRabbitMQSimple(queueName) + fmt.Println(string(c)) + rabbit.PublishSimple(c) + rabbit.Destroy() } - func TempCacheReport(command rabbitmq.TempCacheReport) { - fmt.Println("TempCacheReport") - fmt.Println(command.Hashs) - fmt.Println(command.Ip) - //返回消息 - //jh:将hashs中的hash,IP插入缓存表中,TempOrPin字段为true,Time为插入时的时间戳 - //如果要插入的hash、IP在表中已存在且所对应的TempOrPin字段为false,则不做任何操作 - //如果要插入的hash、IP在表中已存在且所对应的TempOrPin字段为true,则更新Time + fmt.Println("TempCacheReport") + fmt.Println(command.Hashs) + fmt.Println(command.Ip) + //返回消息 + //jh:将hashs中的hash,IP插入缓存表中,TempOrPin字段为true,Time为插入时的时间戳 + //-如果要插入的hash、IP在表中已存在且所对应的TempOrPin字段为false,则不做任何操作 + //-如果要插入的hash、IP在表中已存在且所对应的TempOrPin字段为true,则更新Time } func CoorMove(command rabbitmq.MoveCommand) { - fmt.Println("CoorMove") - fmt.Println(command.BucketName) - //查询数据库,获取冗余类型,冗余参数 - //jh:使用command中的bucketname和objectname查询对象表,获得redundancy,EcName,fileSizeInBytes - //若redundancy是rep,查询对象副本表, 获得repHash - //ids :={0} - //hashs := {repHash} - //若redundancy是ec,查询对象编码块表,获得blockHashs, ids(innerID), - //查询缓存表,获得每个hash的nodeIps、TempOrPins、Times - //查询节点延迟表,得到command.destination与各个nodeIps的的延迟,存到一个map类型中(Delay) - //kx:根据查出来的hash/hashs、nodeIps、TempOrPins、Times(移动/读取策略)、Delay确定hashs、ids - redundancy := "rep" - ecName := "ecName" - hashs := []string{"block1.json","block2.json"} - ids := []int{0,1} - fileSizeInBytes :=21 - - res:= rabbitmq.MoveRes{ - Redundancy: redundancy, - EcName : ecName, - Hashs : hashs, - Ids : ids, - FileSizeInBytes: int64(fileSizeInBytes), - } - c,_:=json.Marshal(res) - rabbitSend(c, command.UserId) -} + fmt.Println("CoorMove") + fmt.Println(command.BucketName) + //查询数据库,获取冗余类型,冗余参数 + //jh:使用command中的bucketname和objectname查询对象表,获得redundancy,EcName,fileSizeInBytes + //-若redundancy是rep,查询对象副本表, 获得repHash + //--ids :={0} + //--hashs := {repHash} + //-若redundancy是ec,查询对象编码块表,获得blockHashs, ids(innerID), + //--查询缓存表,获得每个hash的nodeIps、TempOrPins、Times + //--查询节点延迟表,得到command.destination与各个nodeIps的的延迟,存到一个map类型中(Delay) + //--kx:根据查出来的hash/hashs、nodeIps、TempOrPins、Times(移动/读取策略)、Delay确定hashs、ids + redundancy := "rep" + ecName := "ecName" + hashs := []string{"block1.json", "block2.json"} + ids := []int{0, 1} + fileSizeInBytes := 21 + res := rabbitmq.MoveRes{ + Redundancy: redundancy, + EcName: ecName, + Hashs: hashs, + Ids: ids, + FileSizeInBytes: int64(fileSizeInBytes), + } + c, _ := json.Marshal(res) + rabbitSend(c, command.UserId) +} func CoorEcWrite(command rabbitmq.EcWriteCommand) { - fmt.Println("CoorEcWrite") - fmt.Println(command.BucketName) - //jh:根据command中的UserId查询用户节点权限表,返回用户可用的NodeIp - //kx:根据command中的ecName,得到ecN,然后从jh查到的NodeIp中选择ecN个,赋值给Ips - //jh:完成对象表、对象编码块表的插入(对象编码块表的Hash字段先不插入) - //返回消息 - res:= rabbitmq.WriteRes{ - Ips: []string{"localhost","localhost","localhost"}, - } - c,_:=json.Marshal(res) - rabbitSend(c, command.UserId) - + fmt.Println("CoorEcWrite") + fmt.Println(command.BucketName) + //jh:根据command中的UserId查询用户节点权限表,返回用户可用的NodeIp + //kx:根据command中的ecName,得到ecN,然后从jh查到的NodeIp中选择ecN个,赋值给Ips + //jh:完成对象表、对象编码块表的插入(对象编码块表的Hash字段先不插入) + //返回消息 + res := rabbitmq.WriteRes{ + Ips: []string{"localhost", "localhost", "localhost"}, + } + c, _ := json.Marshal(res) + rabbitSend(c, command.UserId) + } func CoorEcWriteHash(command rabbitmq.WriteHashCommand) { - fmt.Println("CoorEcWriteHash") - fmt.Println(command.BucketName) - //jh:根据command中的信息,插入对象编码块表中的Hash字段,并完成缓存表的插入 - //返回消息 - res:= rabbitmq.WriteHashRes{ - MetaCode: 0, - } - c,_:=json.Marshal(res) - rabbitSend(c, command.UserId) + fmt.Println("CoorEcWriteHash") + fmt.Println(command.BucketName) + //jh:根据command中的信息,插入对象编码块表中的Hash字段,并完成缓存表的插入 + //返回消息 + res := rabbitmq.WriteHashRes{ + MetaCode: 0, + } + c, _ := json.Marshal(res) + rabbitSend(c, command.UserId) } func CoorRead(command rabbitmq.ReadCommand) { - fmt.Println("CoorRead") - fmt.Println(command.BucketName) - //jh:使用command中的bucketname和objectname查询对象表,获得redundancy,EcName,fileSizeInBytes - //若redundancy是rep,查询对象副本表, 获得repHash - //ids :={0} - //hashs := {repHash} - //若redundancy是ec,查询对象编码块表,获得blockHashs, ids(innerID), - //查询缓存表,获得每个hash的nodeIps、TempOrPins、Times - //kx:根据查出来的hash/hashs、nodeIps、TempOrPins、Times(移动/读取策略)确定hashs、ids - //返回消息 - res:= rabbitmq.ReadRes{ - Redundancy: "rep", - Ips: []string{"localhost","localhost"}, - Hashs: []string{"block1.json","block2.json"}, - BlockIds: []int{0,1}, - EcName: "ecName", - FileSizeInBytes: 21, - } - c,_:=json.Marshal(res) - rabbitSend(c, command.UserId) + fmt.Println("CoorRead") + fmt.Println(command.BucketName) + //jh:使用command中的bucketname和objectname查询对象表,获得redundancy,EcName,fileSizeInBytes + //-若redundancy是rep,查询对象副本表, 获得repHash + //--ids :={0} + //--hashs := {repHash} + //-若redundancy是ec,查询对象编码块表,获得blockHashs, ids(innerID), + //--查询缓存表,获得每个hash的nodeIps、TempOrPins、Times + //--kx:根据查出来的hash/hashs、nodeIps、TempOrPins、Times(移动/读取策略)确定hashs、ids + //返回消息 + res := rabbitmq.ReadRes{ + Redundancy: "rep", + Ips: []string{"localhost", "localhost"}, + Hashs: []string{"block1.json", "block2.json"}, + BlockIds: []int{0, 1}, + EcName: "ecName", + FileSizeInBytes: 21, + } + c, _ := json.Marshal(res) + rabbitSend(c, command.UserId) } func CoorRepWriteHash(command rabbitmq.WriteHashCommand) { - fmt.Println("CoorRepWriteHash") - fmt.Println(command.BucketName) - //jh:根据command中的信息,插入对象副本表中的Hash字段,并完成缓存表的插入 - //返回消息 - res:= rabbitmq.WriteHashRes{ - MetaCode: 0, - } - c,_:=json.Marshal(res) - rabbitSend(c, command.UserId) + fmt.Println("CoorRepWriteHash") + fmt.Println(command.BucketName) + //jh:根据command中的信息,插入对象副本表中的Hash字段,并完成缓存表的插入 + //返回消息 + res := rabbitmq.WriteHashRes{ + MetaCode: 0, + } + c, _ := json.Marshal(res) + rabbitSend(c, command.UserId) } func CoorRepWrite(command rabbitmq.RepWriteCommand) { - fmt.Println("CoorRepWrite") - fmt.Println(command.BucketName) - //jh:根据command中的UserId查询用户节点权限表,返回用户可用的NodeIp; - //kx:根据command中的ecName,得到ecN,然后从jh查到的NodeIp中选择numRep个,赋值给Ips - //jh:完成对象表、对象副本表的插入(对象副本表的Hash字段先不插入) - //返回消息 - res:= rabbitmq.WriteRes{ - Ips: []string{"localhost","localhost","localhost"}, - } - c,_:=json.Marshal(res) - rabbitSend(c, command.UserId) -} \ No newline at end of file + fmt.Println("CoorRepWrite") + fmt.Println(command.BucketName) + //jh:根据command中的UserId查询用户节点权限表,返回用户可用的NodeIp; + //kx:根据command中的ecName,得到ecN,然后从jh查到的NodeIp中选择numRep个,赋值给Ips + //jh:完成对象表、对象副本表的插入(对象副本表的Hash字段先不插入) + //返回消息 + res := rabbitmq.WriteRes{ + Ips: []string{"localhost", "localhost", "localhost"}, + } + c, _ := json.Marshal(res) + rabbitSend(c, command.UserId) +} + +func HeartReport(command rabbitmq.HeartReport) { + //jh:根据command中的Ip,插入节点延迟表,和节点表的NodeStatus +} diff --git a/coordinator b/coordinator index e099575..9caf0b0 100755 Binary files a/coordinator and b/coordinator differ diff --git a/main.go b/main.go index 16d92c9..1dbe5c5 100644 --- a/main.go +++ b/main.go @@ -1,98 +1,96 @@ package main import ( - //"context" - //"io" - "fmt" - //"path/filepath" - //"sync" - "rabbitmq" - "time" - "encoding/json" - //agentcaller "proto" - - //"github.com/pborman/uuid" - //"github.com/streadway/amqp" - - //"google.golang.org/grpc" - + //"context" + //"io" + "fmt" + //"path/filepath" + //"sync" + "encoding/json" + "rabbitmq" + "time" + //agentcaller "proto" + //"github.com/pborman/uuid" + //"github.com/streadway/amqp" + //"google.golang.org/grpc" ) func main() { - rabbit := rabbitmq.NewRabbitMQSimple("coorQueue") - msgs:=rabbit.ConsumeSimpleQos(time.Millisecond * 500) - forever := make(chan bool) - // 启用协程处理消息 - go func() { - for d := range msgs { - // 实现我们要处理的逻辑函数 - b1:=d.Body - commandType := string(b1[0:2]) - fmt.Println(commandType) - c1:=b1[2:] - switch commandType { - case "00": - var command rabbitmq.EcWriteCommand - err := json.Unmarshal(c1, &command) - if err != nil { - fmt.Printf("json.Unmarshal failed, err:%v\n", err) - } - CoorEcWrite(command) - case "01": - var command rabbitmq.WriteHashCommand - err := json.Unmarshal(c1, &command) - if err != nil { - fmt.Printf("json.Unmarshal failed, err:%v\n", err) - } - CoorEcWriteHash(command) - case "02": - var command rabbitmq.ReadCommand - err := json.Unmarshal(c1, &command) - if err != nil { - fmt.Printf("json.Unmarshal failed, err:%v\n", err) - } - CoorRead(command) - case "03": - var command rabbitmq.RepWriteCommand - err := json.Unmarshal(c1, &command) - if err != nil { - fmt.Printf("json.Unmarshal failed, err:%v\n", err) - } - CoorRepWrite(command) - case "04": - var command rabbitmq.WriteHashCommand - err := json.Unmarshal(c1, &command) - if err != nil { - fmt.Printf("json.Unmarshal failed, err:%v\n", err) - } - CoorRepWriteHash(command) - case "05": - var command rabbitmq.MoveCommand - err := json.Unmarshal(c1, &command) - if err != nil { - fmt.Printf("json.Unmarshal failed, err:%v\n", err) - } - CoorMove(command) - case "06": - var command rabbitmq.TempCacheReport - err := json.Unmarshal(c1, &command) - if err != nil { - fmt.Printf("json.Unmarshal failed, err:%v\n", err) - } - TempCacheReport(command) - } - //log.Printf("Received a message: %s", d.Body) - //time.Sleep(duration) - //如果为true表示确认所有未确认的消息 - //如果为false表示确认当前消息 - //执行完业务逻辑成功之后我们再手动ack告诉服务器你可以删除这个消息啦! 这样就保障了数据的绝对的安全不丢失! - err := d.Ack(false) - if err != nil { - println(err) - } - } - }() - //log.Printf("[*] waiting for messages, [退出请按]To exit press CTRL+C") - <-forever + rabbit := rabbitmq.NewRabbitMQSimple("coorQueue") + msgs := rabbit.ConsumeSimpleQos(time.Millisecond * 500) + forever := make(chan bool) + // 启用协程处理消息 + go func() { + for d := range msgs { + // 实现我们要处理的逻辑函数 + b1 := d.Body + commandType := string(b1[0:2]) + fmt.Println(commandType) + c1 := b1[2:] + switch commandType { + case "00": + var command rabbitmq.EcWriteCommand + err := json.Unmarshal(c1, &command) + if err != nil { + fmt.Printf("json.Unmarshal failed, err:%v\n", err) + } + CoorEcWrite(command) + case "01": + var command rabbitmq.WriteHashCommand + err := json.Unmarshal(c1, &command) + if err != nil { + fmt.Printf("json.Unmarshal failed, err:%v\n", err) + } + CoorEcWriteHash(command) + case "02": + var command rabbitmq.ReadCommand + err := json.Unmarshal(c1, &command) + if err != nil { + fmt.Printf("json.Unmarshal failed, err:%v\n", err) + } + CoorRead(command) + case "03": + var command rabbitmq.RepWriteCommand + err := json.Unmarshal(c1, &command) + if err != nil { + fmt.Printf("json.Unmarshal failed, err:%v\n", err) + } + CoorRepWrite(command) + case "04": + var command rabbitmq.WriteHashCommand + err := json.Unmarshal(c1, &command) + if err != nil { + fmt.Printf("json.Unmarshal failed, err:%v\n", err) + } + CoorRepWriteHash(command) + case "05": + var command rabbitmq.MoveCommand + err := json.Unmarshal(c1, &command) + if err != nil { + fmt.Printf("json.Unmarshal failed, err:%v\n", err) + } + CoorMove(command) + case "06": + var command rabbitmq.TempCacheReport + err := json.Unmarshal(c1, &command) + if err != nil { + fmt.Printf("json.Unmarshal failed, err:%v\n", err) + } + TempCacheReport(command) + case "07": + var command rabbitmq.HeartReport + err := json.Unmarshal(c1, &command) + if err != nil { + fmt.Printf("json.Unmarshal failed, err:%v\n", err) + } + HeartReport(command) + } + err := d.Ack(false) + if err != nil { + println(err) + } + } + }() + <-forever -} \ No newline at end of file +}