Browse Source

增加scanner和心跳协议框架

gitlink
17BaoH 2 years ago
parent
commit
742299253a
3 changed files with 206 additions and 210 deletions
  1. +117
    -119
      commandServer.go
  2. BIN
      coordinator
  3. +89
    -91
      main.go

+ 117
- 119
commandServer.go View File

@@ -1,145 +1,143 @@
package main

import (
//"context"
//"io"
"fmt"
//"path/filepath"
//"sync"
"encoding/json"
"strconv"
"rabbitmq"

//agentcaller "proto"

//"github.com/pborman/uuid"
//"github.com/streadway/amqp"

//"google.golang.org/grpc"

//"context"
//"io"
"fmt"
//"path/filepath"
//"sync"
"encoding/json"
"rabbitmq"
"strconv"
//agentcaller "proto"
//"github.com/pborman/uuid"
//"github.com/streadway/amqp"
//"google.golang.org/grpc"
)

func rabbitSend(c []byte, userId int){
queueName := "coorClientQueue"+strconv.Itoa(userId)
rabbit := rabbitmq.NewRabbitMQSimple(queueName)
fmt.Println(string(c))
rabbit.PublishSimple(c)
rabbit.Destroy()
func rabbitSend(c []byte, userId int) {
queueName := "coorClientQueue" + strconv.Itoa(userId)
rabbit := rabbitmq.NewRabbitMQSimple(queueName)
fmt.Println(string(c))
rabbit.PublishSimple(c)
rabbit.Destroy()
}


func TempCacheReport(command rabbitmq.TempCacheReport) {
fmt.Println("TempCacheReport")
fmt.Println(command.Hashs)
fmt.Println(command.Ip)
//返回消息
//jh:将hashs中的hash,IP插入缓存表中,TempOrPin字段为true,Time为插入时的时间戳
//如果要插入的hash、IP在表中已存在且所对应的TempOrPin字段为false,则不做任何操作
//如果要插入的hash、IP在表中已存在且所对应的TempOrPin字段为true,则更新Time
fmt.Println("TempCacheReport")
fmt.Println(command.Hashs)
fmt.Println(command.Ip)
//返回消息
//jh:将hashs中的hash,IP插入缓存表中,TempOrPin字段为true,Time为插入时的时间戳
//-如果要插入的hash、IP在表中已存在且所对应的TempOrPin字段为false,则不做任何操作
//-如果要插入的hash、IP在表中已存在且所对应的TempOrPin字段为true,则更新Time
}

func CoorMove(command rabbitmq.MoveCommand) {
fmt.Println("CoorMove")
fmt.Println(command.BucketName)
//查询数据库,获取冗余类型,冗余参数
//jh:使用command中的bucketname和objectname查询对象表,获得redundancy,EcName,fileSizeInBytes
//若redundancy是rep,查询对象副本表, 获得repHash
//ids :={0}
//hashs := {repHash}
//若redundancy是ec,查询对象编码块表,获得blockHashs, ids(innerID),
//查询缓存表,获得每个hash的nodeIps、TempOrPins、Times
//查询节点延迟表,得到command.destination与各个nodeIps的的延迟,存到一个map类型中(Delay)
//kx:根据查出来的hash/hashs、nodeIps、TempOrPins、Times(移动/读取策略)、Delay确定hashs、ids
redundancy := "rep"
ecName := "ecName"
hashs := []string{"block1.json","block2.json"}
ids := []int{0,1}
fileSizeInBytes :=21
res:= rabbitmq.MoveRes{
Redundancy: redundancy,
EcName : ecName,
Hashs : hashs,
Ids : ids,
FileSizeInBytes: int64(fileSizeInBytes),
}
c,_:=json.Marshal(res)
rabbitSend(c, command.UserId)
}
fmt.Println("CoorMove")
fmt.Println(command.BucketName)
//查询数据库,获取冗余类型,冗余参数
//jh:使用command中的bucketname和objectname查询对象表,获得redundancy,EcName,fileSizeInBytes
//-若redundancy是rep,查询对象副本表, 获得repHash
//--ids :={0}
//--hashs := {repHash}
//-若redundancy是ec,查询对象编码块表,获得blockHashs, ids(innerID),
//--查询缓存表,获得每个hash的nodeIps、TempOrPins、Times
//--查询节点延迟表,得到command.destination与各个nodeIps的的延迟,存到一个map类型中(Delay)
//--kx:根据查出来的hash/hashs、nodeIps、TempOrPins、Times(移动/读取策略)、Delay确定hashs、ids
redundancy := "rep"
ecName := "ecName"
hashs := []string{"block1.json", "block2.json"}
ids := []int{0, 1}
fileSizeInBytes := 21

res := rabbitmq.MoveRes{
Redundancy: redundancy,
EcName: ecName,
Hashs: hashs,
Ids: ids,
FileSizeInBytes: int64(fileSizeInBytes),
}
c, _ := json.Marshal(res)
rabbitSend(c, command.UserId)
}

func CoorEcWrite(command rabbitmq.EcWriteCommand) {
fmt.Println("CoorEcWrite")
fmt.Println(command.BucketName)
//jh:根据command中的UserId查询用户节点权限表,返回用户可用的NodeIp
//kx:根据command中的ecName,得到ecN,然后从jh查到的NodeIp中选择ecN个,赋值给Ips
//jh:完成对象表、对象编码块表的插入(对象编码块表的Hash字段先不插入)
//返回消息
res:= rabbitmq.WriteRes{
Ips: []string{"localhost","localhost","localhost"},
}
c,_:=json.Marshal(res)
rabbitSend(c, command.UserId)
fmt.Println("CoorEcWrite")
fmt.Println(command.BucketName)
//jh:根据command中的UserId查询用户节点权限表,返回用户可用的NodeIp
//kx:根据command中的ecName,得到ecN,然后从jh查到的NodeIp中选择ecN个,赋值给Ips
//jh:完成对象表、对象编码块表的插入(对象编码块表的Hash字段先不插入)
//返回消息
res := rabbitmq.WriteRes{
Ips: []string{"localhost", "localhost", "localhost"},
}
c, _ := json.Marshal(res)
rabbitSend(c, command.UserId)
}

func CoorEcWriteHash(command rabbitmq.WriteHashCommand) {
fmt.Println("CoorEcWriteHash")
fmt.Println(command.BucketName)
//jh:根据command中的信息,插入对象编码块表中的Hash字段,并完成缓存表的插入
//返回消息
res:= rabbitmq.WriteHashRes{
MetaCode: 0,
}
c,_:=json.Marshal(res)
rabbitSend(c, command.UserId)
fmt.Println("CoorEcWriteHash")
fmt.Println(command.BucketName)
//jh:根据command中的信息,插入对象编码块表中的Hash字段,并完成缓存表的插入
//返回消息
res := rabbitmq.WriteHashRes{
MetaCode: 0,
}
c, _ := json.Marshal(res)
rabbitSend(c, command.UserId)
}

func CoorRead(command rabbitmq.ReadCommand) {
fmt.Println("CoorRead")
fmt.Println(command.BucketName)
//jh:使用command中的bucketname和objectname查询对象表,获得redundancy,EcName,fileSizeInBytes
//若redundancy是rep,查询对象副本表, 获得repHash
//ids :={0}
//hashs := {repHash}
//若redundancy是ec,查询对象编码块表,获得blockHashs, ids(innerID),
//查询缓存表,获得每个hash的nodeIps、TempOrPins、Times
//kx:根据查出来的hash/hashs、nodeIps、TempOrPins、Times(移动/读取策略)确定hashs、ids
//返回消息
res:= rabbitmq.ReadRes{
Redundancy: "rep",
Ips: []string{"localhost","localhost"},
Hashs: []string{"block1.json","block2.json"},
BlockIds: []int{0,1},
EcName: "ecName",
FileSizeInBytes: 21,
}
c,_:=json.Marshal(res)
rabbitSend(c, command.UserId)
fmt.Println("CoorRead")
fmt.Println(command.BucketName)
//jh:使用command中的bucketname和objectname查询对象表,获得redundancy,EcName,fileSizeInBytes
//-若redundancy是rep,查询对象副本表, 获得repHash
//--ids :={0}
//--hashs := {repHash}
//-若redundancy是ec,查询对象编码块表,获得blockHashs, ids(innerID),
//--查询缓存表,获得每个hash的nodeIps、TempOrPins、Times
//--kx:根据查出来的hash/hashs、nodeIps、TempOrPins、Times(移动/读取策略)确定hashs、ids
//返回消息
res := rabbitmq.ReadRes{
Redundancy: "rep",
Ips: []string{"localhost", "localhost"},
Hashs: []string{"block1.json", "block2.json"},
BlockIds: []int{0, 1},
EcName: "ecName",
FileSizeInBytes: 21,
}
c, _ := json.Marshal(res)
rabbitSend(c, command.UserId)
}

func CoorRepWriteHash(command rabbitmq.WriteHashCommand) {
fmt.Println("CoorRepWriteHash")
fmt.Println(command.BucketName)
//jh:根据command中的信息,插入对象副本表中的Hash字段,并完成缓存表的插入
//返回消息
res:= rabbitmq.WriteHashRes{
MetaCode: 0,
}
c,_:=json.Marshal(res)
rabbitSend(c, command.UserId)
fmt.Println("CoorRepWriteHash")
fmt.Println(command.BucketName)
//jh:根据command中的信息,插入对象副本表中的Hash字段,并完成缓存表的插入
//返回消息
res := rabbitmq.WriteHashRes{
MetaCode: 0,
}
c, _ := json.Marshal(res)
rabbitSend(c, command.UserId)
}

func CoorRepWrite(command rabbitmq.RepWriteCommand) {
fmt.Println("CoorRepWrite")
fmt.Println(command.BucketName)
//jh:根据command中的UserId查询用户节点权限表,返回用户可用的NodeIp;
//kx:根据command中的ecName,得到ecN,然后从jh查到的NodeIp中选择numRep个,赋值给Ips
//jh:完成对象表、对象副本表的插入(对象副本表的Hash字段先不插入)
//返回消息
res:= rabbitmq.WriteRes{
Ips: []string{"localhost","localhost","localhost"},
}
c,_:=json.Marshal(res)
rabbitSend(c, command.UserId)
}
fmt.Println("CoorRepWrite")
fmt.Println(command.BucketName)
//jh:根据command中的UserId查询用户节点权限表,返回用户可用的NodeIp;
//kx:根据command中的ecName,得到ecN,然后从jh查到的NodeIp中选择numRep个,赋值给Ips
//jh:完成对象表、对象副本表的插入(对象副本表的Hash字段先不插入)
//返回消息
res := rabbitmq.WriteRes{
Ips: []string{"localhost", "localhost", "localhost"},
}
c, _ := json.Marshal(res)
rabbitSend(c, command.UserId)
}

func HeartReport(command rabbitmq.HeartReport) {
//jh:根据command中的Ip,插入节点延迟表,和节点表的NodeStatus
}

BIN
coordinator View File


+ 89
- 91
main.go View File

@@ -1,98 +1,96 @@
package main

import (
//"context"
//"io"
"fmt"
//"path/filepath"
//"sync"
"rabbitmq"
"time"
"encoding/json"
//agentcaller "proto"

//"github.com/pborman/uuid"
//"github.com/streadway/amqp"

//"google.golang.org/grpc"

//"context"
//"io"
"fmt"
//"path/filepath"
//"sync"
"encoding/json"
"rabbitmq"
"time"
//agentcaller "proto"
//"github.com/pborman/uuid"
//"github.com/streadway/amqp"
//"google.golang.org/grpc"
)

func main() {
rabbit := rabbitmq.NewRabbitMQSimple("coorQueue")
msgs:=rabbit.ConsumeSimpleQos(time.Millisecond * 500)
forever := make(chan bool)
// 启用协程处理消息
go func() {
for d := range msgs {
// 实现我们要处理的逻辑函数
b1:=d.Body
commandType := string(b1[0:2])
fmt.Println(commandType)
c1:=b1[2:]
switch commandType {
case "00":
var command rabbitmq.EcWriteCommand
err := json.Unmarshal(c1, &command)
if err != nil {
fmt.Printf("json.Unmarshal failed, err:%v\n", err)
}
CoorEcWrite(command)
case "01":
var command rabbitmq.WriteHashCommand
err := json.Unmarshal(c1, &command)
if err != nil {
fmt.Printf("json.Unmarshal failed, err:%v\n", err)
}
CoorEcWriteHash(command)
case "02":
var command rabbitmq.ReadCommand
err := json.Unmarshal(c1, &command)
if err != nil {
fmt.Printf("json.Unmarshal failed, err:%v\n", err)
}
CoorRead(command)
case "03":
var command rabbitmq.RepWriteCommand
err := json.Unmarshal(c1, &command)
if err != nil {
fmt.Printf("json.Unmarshal failed, err:%v\n", err)
}
CoorRepWrite(command)
case "04":
var command rabbitmq.WriteHashCommand
err := json.Unmarshal(c1, &command)
if err != nil {
fmt.Printf("json.Unmarshal failed, err:%v\n", err)
}
CoorRepWriteHash(command)
case "05":
var command rabbitmq.MoveCommand
err := json.Unmarshal(c1, &command)
if err != nil {
fmt.Printf("json.Unmarshal failed, err:%v\n", err)
}
CoorMove(command)
case "06":
var command rabbitmq.TempCacheReport
err := json.Unmarshal(c1, &command)
if err != nil {
fmt.Printf("json.Unmarshal failed, err:%v\n", err)
}
TempCacheReport(command)
}
//log.Printf("Received a message: %s", d.Body)
//time.Sleep(duration)
//如果为true表示确认所有未确认的消息
//如果为false表示确认当前消息
//执行完业务逻辑成功之后我们再手动ack告诉服务器你可以删除这个消息啦! 这样就保障了数据的绝对的安全不丢失!
err := d.Ack(false)
if err != nil {
println(err)
}
}
}()
//log.Printf("[*] waiting for messages, [退出请按]To exit press CTRL+C")
<-forever
rabbit := rabbitmq.NewRabbitMQSimple("coorQueue")
msgs := rabbit.ConsumeSimpleQos(time.Millisecond * 500)
forever := make(chan bool)
// 启用协程处理消息
go func() {
for d := range msgs {
// 实现我们要处理的逻辑函数
b1 := d.Body
commandType := string(b1[0:2])
fmt.Println(commandType)
c1 := b1[2:]
switch commandType {
case "00":
var command rabbitmq.EcWriteCommand
err := json.Unmarshal(c1, &command)
if err != nil {
fmt.Printf("json.Unmarshal failed, err:%v\n", err)
}
CoorEcWrite(command)
case "01":
var command rabbitmq.WriteHashCommand
err := json.Unmarshal(c1, &command)
if err != nil {
fmt.Printf("json.Unmarshal failed, err:%v\n", err)
}
CoorEcWriteHash(command)
case "02":
var command rabbitmq.ReadCommand
err := json.Unmarshal(c1, &command)
if err != nil {
fmt.Printf("json.Unmarshal failed, err:%v\n", err)
}
CoorRead(command)
case "03":
var command rabbitmq.RepWriteCommand
err := json.Unmarshal(c1, &command)
if err != nil {
fmt.Printf("json.Unmarshal failed, err:%v\n", err)
}
CoorRepWrite(command)
case "04":
var command rabbitmq.WriteHashCommand
err := json.Unmarshal(c1, &command)
if err != nil {
fmt.Printf("json.Unmarshal failed, err:%v\n", err)
}
CoorRepWriteHash(command)
case "05":
var command rabbitmq.MoveCommand
err := json.Unmarshal(c1, &command)
if err != nil {
fmt.Printf("json.Unmarshal failed, err:%v\n", err)
}
CoorMove(command)
case "06":
var command rabbitmq.TempCacheReport
err := json.Unmarshal(c1, &command)
if err != nil {
fmt.Printf("json.Unmarshal failed, err:%v\n", err)
}
TempCacheReport(command)
case "07":
var command rabbitmq.HeartReport
err := json.Unmarshal(c1, &command)
if err != nil {
fmt.Printf("json.Unmarshal failed, err:%v\n", err)
}
HeartReport(command)
}
err := d.Ack(false)
if err != nil {
println(err)
}
}
}()
<-forever

}
}

Loading…
Cancel
Save