Browse Source

完成RepWrite中的客户端,协调端通讯

gitlink
17BaoH 2 years ago
commit
8fe88e4d39
5 changed files with 175 additions and 0 deletions
  1. +68
    -0
      commandServer.go
  2. BIN
      coordinator
  3. +14
    -0
      go.mod
  4. +2
    -0
      go.sum
  5. +91
    -0
      main.go

+ 68
- 0
commandServer.go View File

@@ -0,0 +1,68 @@
package main

import (
//"context"
//"io"
"fmt"
//"path/filepath"
//"sync"
"encoding/json"
"strconv"
"rabbitmq"

//agentcaller "proto"

//"github.com/pborman/uuid"
//"github.com/streadway/amqp"

//"google.golang.org/grpc"

)

func CoorEcWrite(command rabbitmq.EcWriteCommand) {
}

func CoorEcWriteHash(command rabbitmq.WriteHashCommand) {

}

func CoorEcRead(command rabbitmq.EcReadCommand) {

}

func CoorRepWrite(command rabbitmq.RepWriteCommand) {
fmt.Println(command.BucketName)
//返回消息
res:= rabbitmq.WriteRes{
Ips: []string{"localhost","localhost","localhost"},
}
c2,_:=json.Marshal(res)
queueName := "clientQueue"+strconv.Itoa(command.UserId)
rabbit := rabbitmq.NewRabbitMQSimple(queueName)
fmt.Println(string(c2))
rabbit.PublishSimple(c2)
rabbit.Destroy()
}

func CoorRepWriteHash(command rabbitmq.WriteHashCommand) {
fmt.Println(command.BucketName)
//返回消息
res:= rabbitmq.WriteHashRes{
MetaCode: 0,
}
c2,_:=json.Marshal(res)
queueName := "clientQueue"+strconv.Itoa(command.UserId)
rabbit := rabbitmq.NewRabbitMQSimple(queueName)
fmt.Println(string(c2))
rabbit.PublishSimple(c2)
rabbit.Destroy()
}

func CoorRepRead(command rabbitmq.RepReadCommand) {

}

BIN
coordinator View File


+ 14
- 0
go.mod View File

@@ -0,0 +1,14 @@
module coordinator

go 1.18

require proto v0.0.0

require (
github.com/streadway/amqp v1.0.0
rabbitmq v0.0.0
)

replace proto => ../proto

replace rabbitmq => ../rabbitmq

+ 2
- 0
go.sum View File

@@ -0,0 +1,2 @@
github.com/streadway/amqp v1.0.0 h1:kuuDrUJFZL1QYL9hUNuCxNObNzB0bV/ZG5jV3RWAQgo=
github.com/streadway/amqp v1.0.0/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=

+ 91
- 0
main.go View File

@@ -0,0 +1,91 @@
package main

import (
//"context"
//"io"
"fmt"
//"path/filepath"
//"sync"
"rabbitmq"
"time"
"encoding/json"
//agentcaller "proto"

//"github.com/pborman/uuid"
//"github.com/streadway/amqp"

//"google.golang.org/grpc"

)

func main() {
rabbit := rabbitmq.NewRabbitMQSimple("coorQueue")
msgs:=rabbit.ConsumeSimpleQos(time.Millisecond * 500)
forever := make(chan bool)
// 启用协程处理消息
go func() {
for d := range msgs {
// 实现我们要处理的逻辑函数
b1:=d.Body
commandType := string(b1[0:2])
fmt.Println(commandType)
c1:=b1[2:]
switch commandType {
case "00":
var command rabbitmq.EcWriteCommand
err := json.Unmarshal(c1, &command)
if err != nil {
fmt.Printf("json.Unmarshal failed, err:%v\n", err)
}
CoorEcWrite(command)
case "01":
var command rabbitmq.WriteHashCommand
err := json.Unmarshal(c1, &command)
if err != nil {
fmt.Printf("json.Unmarshal failed, err:%v\n", err)
}
CoorEcWriteHash(command)
case "02":
var command rabbitmq.EcReadCommand
err := json.Unmarshal(c1, &command)
if err != nil {
fmt.Printf("json.Unmarshal failed, err:%v\n", err)
}
CoorEcRead(command)
case "03":
var command rabbitmq.RepWriteCommand
err := json.Unmarshal(c1, &command)
if err != nil {
fmt.Printf("json.Unmarshal failed, err:%v\n", err)
}
CoorRepWrite(command)
case "04":
var command rabbitmq.WriteHashCommand
err := json.Unmarshal(c1, &command)
if err != nil {
fmt.Printf("json.Unmarshal failed, err:%v\n", err)
}
CoorRepWriteHash(command)
case "05":
var command rabbitmq.RepReadCommand
err := json.Unmarshal(c1, &command)
if err != nil {
fmt.Printf("json.Unmarshal failed, err:%v\n", err)
}
CoorRepRead(command)
}
//log.Printf("Received a message: %s", d.Body)
//time.Sleep(duration)
//如果为true表示确认所有未确认的消息
//如果为false表示确认当前消息
//执行完业务逻辑成功之后我们再手动ack告诉服务器你可以删除这个消息啦! 这样就保障了数据的绝对的安全不丢失!
err := d.Ack(false)
if err != nil {
println(err)
}
}
}()
//log.Printf("[*] waiting for messages, [退出请按]To exit press CTRL+C")
<-forever

}

Loading…
Cancel
Save