diff --git a/agent b/agent index a672dbe..9149aa0 100755 Binary files a/agent and b/agent differ diff --git a/commandHandle.go b/commandHandle.go index 6397f74..ed4398b 100644 --- a/commandHandle.go +++ b/commandHandle.go @@ -1,63 +1,63 @@ package main + import ( - //"context" - //"log" - //"os" - //"io" - //"fmt" - //"path/filepath" - //agentserver "proto" - "sync" - "rabbitmq" - "time" - "fmt" - "encoding/json" - //"net" + //"context" + //"log" + //"os" + //"io" + //"fmt" + //"path/filepath" + //agentserver "proto" + "encoding/json" + "fmt" + "rabbitmq" + "sync" + "time" //"google.golang.org/grpc" ) - func commandHandle(wg *sync.WaitGroup) { - fmt.Println("commandHandle") - rabbit := rabbitmq.NewRabbitMQSimple("agentQueue"+LocalIp) - msgs:=rabbit.ConsumeSimpleQos(time.Millisecond * 500) - forever := make(chan bool) - // 启用协程处理消息 - go func() { - for d := range msgs { - // 实现我们要处理的逻辑函数 - b1:=d.Body - commandType := string(b1[0:2]) - fmt.Println(commandType) - c1:=b1[2:] - switch commandType { - case "00": - var command rabbitmq.RepMoveCommand - err := json.Unmarshal(c1, &command) - if err != nil { - fmt.Printf("json.Unmarshal failed, err:%v\n", err) - } - RepMove(command) - case "01": - var command rabbitmq.EcMoveCommand - err := json.Unmarshal(c1, &command) - if err != nil { - fmt.Printf("json.Unmarshal failed, err:%v\n", err) - } - EcMove(command) - } - //log.Printf("Received a message: %s", d.Body) - //time.Sleep(duration) - //如果为true表示确认所有未确认的消息 - //如果为false表示确认当前消息 - //执行完业务逻辑成功之后我们再手动ack告诉服务器你可以删除这个消息啦! 这样就保障了数据的绝对的安全不丢失! - err := d.Ack(false) - if err != nil { - println(err) - } - } - }() - //log.Printf("[*] waiting for messages, [退出请按]To exit press CTRL+C") - <-forever - wg.Done() -} \ No newline at end of file + fmt.Println("commandHandle") + rabbit := rabbitmq.NewRabbitMQSimple("agentQueue" + LocalIp) + msgs := rabbit.ConsumeSimpleQos(time.Millisecond * 500) + forever := make(chan bool) + // 启用协程处理消息 + go func() { + for d := range msgs { + // 实现我们要处理的逻辑函数 + b1 := d.Body + commandType := string(b1[0:2]) + fmt.Println(commandType) + c1 := b1[2:] + switch commandType { + case "00": + var command rabbitmq.RepMoveCommand + err := json.Unmarshal(c1, &command) + if err != nil { + fmt.Printf("json.Unmarshal failed, err:%v\n", err) + } + RepMove(command) + case "01": + var command rabbitmq.EcMoveCommand + err := json.Unmarshal(c1, &command) + if err != nil { + fmt.Printf("json.Unmarshal failed, err:%v\n", err) + } + EcMove(command) + } + + //log.Printf("Received a message: %s", d.Body) + //time.Sleep(duration) + //如果为true表示确认所有未确认的消息 + //如果为false表示确认当前消息 + //执行完业务逻辑成功之后我们再手动ack告诉服务器你可以删除这个消息啦! 这样就保障了数据的绝对的安全不丢失! + err := d.Ack(false) + if err != nil { + println(err) + } + } + }() + //log.Printf("[*] waiting for messages, [退出请按]To exit press CTRL+C") + <-forever + wg.Done() +} diff --git a/commandSever.go b/commandSever.go index 8654929..2ab5dc9 100644 --- a/commandSever.go +++ b/commandSever.go @@ -1,123 +1,118 @@ package main import ( - //"context" - - "fmt" - "path/filepath" - //"sync" - "encoding/json" - "strconv" - "rabbitmq" - "os" + //"context" - //agentcaller "proto" - - //"github.com/pborman/uuid" - //"github.com/streadway/amqp" - - //"google.golang.org/grpc" + "fmt" + "path/filepath" + //"sync" + "encoding/json" + "os" + "rabbitmq" + "strconv" + //agentcaller "proto" + //"github.com/pborman/uuid" + //"github.com/streadway/amqp" + //"google.golang.org/grpc" ) -func rabbitSend(c []byte, userId int){ - queueName := "agentClientQueue"+strconv.Itoa(userId) - rabbit := rabbitmq.NewRabbitMQSimple(queueName) - fmt.Println(string(c)) - rabbit.PublishSimple(c) - rabbit.Destroy() +func rabbitSend(c []byte, userId int) { + queueName := "agentClientQueue" + strconv.Itoa(userId) + rabbit := rabbitmq.NewRabbitMQSimple(queueName) + fmt.Println(string(c)) + rabbit.PublishSimple(c) + rabbit.Destroy() } func RepMove(command rabbitmq.RepMoveCommand) { - fmt.Println("RepMove") - fmt.Println(command.Hashs) - hashs:=command.Hashs - //执行调度操作 - ipfsDir := "assets" - goalDir := "assets2" - goalName := command.BucketName+":"+command.ObjectName+":"+strconv.Itoa(command.UserId) - //目标文件 - fDir, err := os.Executable() - if err != nil { - panic(err) - } - fURL := filepath.Join(filepath.Dir(fDir), goalDir) - - _, err = os.Stat(fURL) - if os.IsNotExist(err) { - os.MkdirAll(fURL, os.ModePerm) - } - fURL = filepath.Join(fURL, goalName) - outFile, err := os.Create(fURL) - - fmt.Println(fURL) - //源文件 - fURL = filepath.Join(filepath.Dir(fDir), ipfsDir) - fURL = filepath.Join(fURL, hashs[0]) - inFile, _ := os.Open(fURL) - fmt.Println(fURL) - fileInfo, _ := inFile.Stat() + fmt.Println("RepMove") + fmt.Println(command.Hashs) + hashs := command.Hashs + //执行调度操作 + ipfsDir := "assets" + goalDir := "assets2" + goalName := command.BucketName + ":" + command.ObjectName + ":" + strconv.Itoa(command.UserId) + //目标文件 + fDir, err := os.Executable() + if err != nil { + panic(err) + } + fURL := filepath.Join(filepath.Dir(fDir), goalDir) + + _, err = os.Stat(fURL) + if os.IsNotExist(err) { + os.MkdirAll(fURL, os.ModePerm) + } + fURL = filepath.Join(fURL, goalName) + outFile, err := os.Create(fURL) + + fmt.Println(fURL) + //源文件 + fURL = filepath.Join(filepath.Dir(fDir), ipfsDir) + fURL = filepath.Join(fURL, hashs[0]) + inFile, _ := os.Open(fURL) + fmt.Println(fURL) + fileInfo, _ := inFile.Stat() fileSizeInBytes := fileInfo.Size() - numWholePacket := fileSizeInBytes/packetSizeInBytes - lastPacketInBytes:=fileSizeInBytes%packetSizeInBytes - fmt.Println(fileSizeInBytes) - fmt.Println(numWholePacket) - fmt.Println(lastPacketInBytes) - for i:=0;int64(i)0 { - buf := make([]byte, lastPacketInBytes) - inFile.Read(buf) - outFile.Write(buf) - } - inFile.Close() - outFile.Close() - //返回消息 - res:= rabbitmq.AgentMoveRes{ - MoveCode: 0, - } - c,_:=json.Marshal(res) - rabbitSend(c, command.UserId) - //向coor报告临时缓存hash - command1 := rabbitmq.TempCacheReport{ - Ip : LocalIp, - Hashs: hashs, - } - c,_=json.Marshal(command1) - b:=append([]byte("06"),c...) - fmt.Println(b) - rabbit := rabbitmq.NewRabbitMQSimple("coorQueue") - rabbit.PublishSimple(b) - rabbit.Destroy() + numWholePacket := fileSizeInBytes / packetSizeInBytes + lastPacketInBytes := fileSizeInBytes % packetSizeInBytes + fmt.Println(fileSizeInBytes) + fmt.Println(numWholePacket) + fmt.Println(lastPacketInBytes) + for i := 0; int64(i) < numWholePacket; i++ { + buf := make([]byte, packetSizeInBytes) + inFile.Read(buf) + outFile.Write(buf) + } + if lastPacketInBytes > 0 { + buf := make([]byte, lastPacketInBytes) + inFile.Read(buf) + outFile.Write(buf) + } + inFile.Close() + outFile.Close() + //返回消息 + res := rabbitmq.AgentMoveRes{ + MoveCode: 0, + } + c, _ := json.Marshal(res) + rabbitSend(c, command.UserId) + //向coor报告临时缓存hash + command1 := rabbitmq.TempCacheReport{ + Ip: LocalIp, + Hashs: hashs, + } + c, _ = json.Marshal(command1) + b := append([]byte("06"), c...) + fmt.Println(b) + rabbit := rabbitmq.NewRabbitMQSimple("coorQueue") + rabbit.PublishSimple(b) + rabbit.Destroy() } - func EcMove(command rabbitmq.EcMoveCommand) { - fmt.Println("EcMove") - fmt.Println(command.Hashs) - hashs:=command.Hashs + fmt.Println("EcMove") + fmt.Println(command.Hashs) + hashs := command.Hashs - //执行调度操作 + //执行调度操作 - //返回消息 - res:= rabbitmq.WriteHashRes{ - MetaCode: 0, - } - c,_:=json.Marshal(res) - rabbitSend(c, command.UserId) - //向coor报告临时缓存hash - command1:= rabbitmq.TempCacheReport{ - Ip : LocalIp, - Hashs: hashs, - } - c,_=json.Marshal(command1) - b:=append([]byte("06"),c...) - fmt.Println(b) - rabbit := rabbitmq.NewRabbitMQSimple("coorQueue") - rabbit.PublishSimple(b) - rabbit.Destroy() + //返回消息 + res := rabbitmq.WriteHashRes{ + MetaCode: 0, + } + c, _ := json.Marshal(res) + rabbitSend(c, command.UserId) + //向coor报告临时缓存hash + command1 := rabbitmq.TempCacheReport{ + Ip: LocalIp, + Hashs: hashs, + } + c, _ = json.Marshal(command1) + b := append([]byte("06"), c...) + fmt.Println(b) + rabbit := rabbitmq.NewRabbitMQSimple("coorQueue") + rabbit.PublishSimple(b) + rabbit.Destroy() } - diff --git a/heartReporter.go b/heartReporter.go new file mode 100644 index 0000000..f4a010b --- /dev/null +++ b/heartReporter.go @@ -0,0 +1,46 @@ +package main + +import ( + //"context" + //"log" + //"os" + //"io" + //"fmt" + //"path/filepath" + //agentserver "proto" + + "encoding/json" + "fmt" + "rabbitmq" + "sync" + "time" + //"google.golang.org/grpc" +) + +func heartReport(wg *sync.WaitGroup) { + rabbit1 := rabbitmq.NewRabbitMQSimple("coorQueue") + + for { + //挨个ping其他agent(AgentIpList),记录延迟到AgentDelay + agentDelay := []int{10, 100, 200} + //访问ipfs,记录是否能正常访问,记录到ipfsStatus + ipfsStatus := true + //访问自身资源目录(obs,minio等),记录是否正常,记录到localDirStatus + localDirStatus := true + //发送心跳 + command := rabbitmq.HeartReport{ + Ip: "localhost", + AgentDelay: agentDelay, + IpfsStatus: ipfsStatus, + LocalDirStatus: localDirStatus, + } + c, _ := json.Marshal(command) + b := append([]byte("07"), c...) + fmt.Println(string(b)) + rabbit1.PublishSimple(b) + + time.Sleep(time.Minute * 5) + } + rabbit1.Destroy() + wg.Done() +} diff --git a/main.go b/main.go index 21c4dc1..9fe03a6 100644 --- a/main.go +++ b/main.go @@ -1,38 +1,42 @@ package main import ( - //"context" -// "log" -// "os" -// "io" -// "fmt" -// "path/filepath" - agentserver "proto" - "sync" + //"context" + // "log" + // "os" + // "io" + // "fmt" + // "path/filepath" "net" + agentserver "proto" + "sync" + "google.golang.org/grpc" ) const ( - Port = ":5010" - packetSizeInBytes=10 - LocalIp = "localhost" + Port = ":5010" + packetSizeInBytes = 10 + LocalIp = "localhost" ) +var AgentIpList []string -func main(){ - //处置协调端、客户端命令(可多建几个) - wg := sync.WaitGroup{} - wg.Add(1) - go commandHandle(&wg) - //面向客户端收发数据 - lis, err := net.Listen("tcp", Port) - if err != nil { - panic(err) - } - s := grpc.NewServer() - agentserver.RegisterTranBlockOrReplicaServer(s, &anyOne{}) - s.Serve(lis) - wg.Wait() - // -} \ No newline at end of file +func main() { + AgentIpList = []string{"pcm01", "pcm1", "pcm2"} + //处置协调端、客户端命令(可多建几个) + wg := sync.WaitGroup{} + wg.Add(2) + go commandHandle(&wg) + go heartReport(&wg) //网络延迟感知 + //面向客户端收发数据 + lis, err := net.Listen("tcp", Port) + if err != nil { + panic(err) + } + s := grpc.NewServer() + agentserver.RegisterTranBlockOrReplicaServer(s, &anyOne{}) + s.Serve(lis) + wg.Wait() + // +}