commit 8fe88e4d3947e2b9135ee524b1e886aac469ae7e Author: 17BaoH <963030965@qq.com> Date: Tue Mar 21 22:21:47 2023 +0800 完成RepWrite中的客户端,协调端通讯 diff --git a/commandServer.go b/commandServer.go new file mode 100644 index 0000000..5f75bde --- /dev/null +++ b/commandServer.go @@ -0,0 +1,68 @@ +package main + +import ( + //"context" + //"io" + "fmt" + //"path/filepath" + //"sync" + "encoding/json" + "strconv" + "rabbitmq" + + //agentcaller "proto" + + //"github.com/pborman/uuid" + //"github.com/streadway/amqp" + + //"google.golang.org/grpc" + +) + +func CoorEcWrite(command rabbitmq.EcWriteCommand) { + +} + +func CoorEcWriteHash(command rabbitmq.WriteHashCommand) { + +} + +func CoorEcRead(command rabbitmq.EcReadCommand) { + +} + +func CoorRepWrite(command rabbitmq.RepWriteCommand) { + fmt.Println(command.BucketName) + + //返回消息 + res:= rabbitmq.WriteRes{ + Ips: []string{"localhost","localhost","localhost"}, + } + c2,_:=json.Marshal(res) + + queueName := "clientQueue"+strconv.Itoa(command.UserId) + rabbit := rabbitmq.NewRabbitMQSimple(queueName) + fmt.Println(string(c2)) + rabbit.PublishSimple(c2) + rabbit.Destroy() +} + +func CoorRepWriteHash(command rabbitmq.WriteHashCommand) { + fmt.Println(command.BucketName) + + //返回消息 + res:= rabbitmq.WriteHashRes{ + MetaCode: 0, + } + c2,_:=json.Marshal(res) + + queueName := "clientQueue"+strconv.Itoa(command.UserId) + rabbit := rabbitmq.NewRabbitMQSimple(queueName) + fmt.Println(string(c2)) + rabbit.PublishSimple(c2) + rabbit.Destroy() +} + +func CoorRepRead(command rabbitmq.RepReadCommand) { + +} \ No newline at end of file diff --git a/coordinator b/coordinator new file mode 100755 index 0000000..ff4698f Binary files /dev/null and b/coordinator differ diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..a7ed0f7 --- /dev/null +++ b/go.mod @@ -0,0 +1,14 @@ +module coordinator + +go 1.18 + +require proto v0.0.0 + +require ( + github.com/streadway/amqp v1.0.0 + rabbitmq v0.0.0 +) + +replace proto => ../proto + +replace rabbitmq => ../rabbitmq diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..75f2157 --- /dev/null +++ b/go.sum @@ -0,0 +1,2 @@ +github.com/streadway/amqp v1.0.0 h1:kuuDrUJFZL1QYL9hUNuCxNObNzB0bV/ZG5jV3RWAQgo= +github.com/streadway/amqp v1.0.0/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= diff --git a/main.go b/main.go new file mode 100644 index 0000000..d016993 --- /dev/null +++ b/main.go @@ -0,0 +1,91 @@ +package main + +import ( + //"context" + //"io" + "fmt" + //"path/filepath" + //"sync" + "rabbitmq" + "time" + "encoding/json" + //agentcaller "proto" + + //"github.com/pborman/uuid" + //"github.com/streadway/amqp" + + //"google.golang.org/grpc" + +) + +func main() { + rabbit := rabbitmq.NewRabbitMQSimple("coorQueue") + msgs:=rabbit.ConsumeSimpleQos(time.Millisecond * 500) + forever := make(chan bool) + // 启用协程处理消息 + go func() { + for d := range msgs { + // 实现我们要处理的逻辑函数 + b1:=d.Body + commandType := string(b1[0:2]) + fmt.Println(commandType) + c1:=b1[2:] + switch commandType { + case "00": + var command rabbitmq.EcWriteCommand + err := json.Unmarshal(c1, &command) + if err != nil { + fmt.Printf("json.Unmarshal failed, err:%v\n", err) + } + CoorEcWrite(command) + case "01": + var command rabbitmq.WriteHashCommand + err := json.Unmarshal(c1, &command) + if err != nil { + fmt.Printf("json.Unmarshal failed, err:%v\n", err) + } + CoorEcWriteHash(command) + case "02": + var command rabbitmq.EcReadCommand + err := json.Unmarshal(c1, &command) + if err != nil { + fmt.Printf("json.Unmarshal failed, err:%v\n", err) + } + CoorEcRead(command) + case "03": + var command rabbitmq.RepWriteCommand + err := json.Unmarshal(c1, &command) + if err != nil { + fmt.Printf("json.Unmarshal failed, err:%v\n", err) + } + CoorRepWrite(command) + case "04": + var command rabbitmq.WriteHashCommand + err := json.Unmarshal(c1, &command) + if err != nil { + fmt.Printf("json.Unmarshal failed, err:%v\n", err) + } + CoorRepWriteHash(command) + case "05": + var command rabbitmq.RepReadCommand + err := json.Unmarshal(c1, &command) + if err != nil { + fmt.Printf("json.Unmarshal failed, err:%v\n", err) + } + CoorRepRead(command) + } + //log.Printf("Received a message: %s", d.Body) + //time.Sleep(duration) + //如果为true表示确认所有未确认的消息 + //如果为false表示确认当前消息 + //执行完业务逻辑成功之后我们再手动ack告诉服务器你可以删除这个消息啦! 这样就保障了数据的绝对的安全不丢失! + err := d.Ack(false) + if err != nil { + println(err) + } + } + }() + //log.Printf("[*] waiting for messages, [退出请按]To exit press CTRL+C") + <-forever + +} \ No newline at end of file