Browse Source

增加scanner和心跳协议框架

gitlink
17BaoH 2 years ago
parent
commit
72ec2653bc
5 changed files with 235 additions and 190 deletions
  1. BIN
      agent
  2. +57
    -57
      commandHandle.go
  3. +101
    -106
      commandSever.go
  4. +46
    -0
      heartReporter.go
  5. +31
    -27
      main.go

BIN
agent View File


+ 57
- 57
commandHandle.go View File

@@ -1,63 +1,63 @@
package main

import (
//"context"
//"log"
//"os"
//"io"
//"fmt"
//"path/filepath"
//agentserver "proto"
"sync"
"rabbitmq"
"time"
"fmt"
"encoding/json"
//"net"
//"context"
//"log"
//"os"
//"io"
//"fmt"
//"path/filepath"
//agentserver "proto"
"encoding/json"
"fmt"
"rabbitmq"
"sync"
"time"
//"google.golang.org/grpc"
)


func commandHandle(wg *sync.WaitGroup) {
fmt.Println("commandHandle")
rabbit := rabbitmq.NewRabbitMQSimple("agentQueue"+LocalIp)
msgs:=rabbit.ConsumeSimpleQos(time.Millisecond * 500)
forever := make(chan bool)
// 启用协程处理消息
go func() {
for d := range msgs {
// 实现我们要处理的逻辑函数
b1:=d.Body
commandType := string(b1[0:2])
fmt.Println(commandType)
c1:=b1[2:]
switch commandType {
case "00":
var command rabbitmq.RepMoveCommand
err := json.Unmarshal(c1, &command)
if err != nil {
fmt.Printf("json.Unmarshal failed, err:%v\n", err)
}
RepMove(command)
case "01":
var command rabbitmq.EcMoveCommand
err := json.Unmarshal(c1, &command)
if err != nil {
fmt.Printf("json.Unmarshal failed, err:%v\n", err)
}
EcMove(command)
}
//log.Printf("Received a message: %s", d.Body)
//time.Sleep(duration)
//如果为true表示确认所有未确认的消息
//如果为false表示确认当前消息
//执行完业务逻辑成功之后我们再手动ack告诉服务器你可以删除这个消息啦! 这样就保障了数据的绝对的安全不丢失!
err := d.Ack(false)
if err != nil {
println(err)
}
}
}()
//log.Printf("[*] waiting for messages, [退出请按]To exit press CTRL+C")
<-forever
wg.Done()
}
fmt.Println("commandHandle")
rabbit := rabbitmq.NewRabbitMQSimple("agentQueue" + LocalIp)
msgs := rabbit.ConsumeSimpleQos(time.Millisecond * 500)
forever := make(chan bool)
// 启用协程处理消息
go func() {
for d := range msgs {
// 实现我们要处理的逻辑函数
b1 := d.Body
commandType := string(b1[0:2])
fmt.Println(commandType)
c1 := b1[2:]
switch commandType {
case "00":
var command rabbitmq.RepMoveCommand
err := json.Unmarshal(c1, &command)
if err != nil {
fmt.Printf("json.Unmarshal failed, err:%v\n", err)
}
RepMove(command)
case "01":
var command rabbitmq.EcMoveCommand
err := json.Unmarshal(c1, &command)
if err != nil {
fmt.Printf("json.Unmarshal failed, err:%v\n", err)
}
EcMove(command)
}

//log.Printf("Received a message: %s", d.Body)
//time.Sleep(duration)
//如果为true表示确认所有未确认的消息
//如果为false表示确认当前消息
//执行完业务逻辑成功之后我们再手动ack告诉服务器你可以删除这个消息啦! 这样就保障了数据的绝对的安全不丢失!
err := d.Ack(false)
if err != nil {
println(err)
}
}
}()
//log.Printf("[*] waiting for messages, [退出请按]To exit press CTRL+C")
<-forever
wg.Done()
}

+ 101
- 106
commandSever.go View File

@@ -1,123 +1,118 @@
package main

import (
//"context"
"fmt"
"path/filepath"
//"sync"
"encoding/json"
"strconv"
"rabbitmq"
"os"
//"context"

//agentcaller "proto"

//"github.com/pborman/uuid"
//"github.com/streadway/amqp"

//"google.golang.org/grpc"
"fmt"
"path/filepath"

//"sync"
"encoding/json"
"os"
"rabbitmq"
"strconv"
//agentcaller "proto"
//"github.com/pborman/uuid"
//"github.com/streadway/amqp"
//"google.golang.org/grpc"
)

func rabbitSend(c []byte, userId int){
queueName := "agentClientQueue"+strconv.Itoa(userId)
rabbit := rabbitmq.NewRabbitMQSimple(queueName)
fmt.Println(string(c))
rabbit.PublishSimple(c)
rabbit.Destroy()
func rabbitSend(c []byte, userId int) {
queueName := "agentClientQueue" + strconv.Itoa(userId)
rabbit := rabbitmq.NewRabbitMQSimple(queueName)
fmt.Println(string(c))
rabbit.PublishSimple(c)
rabbit.Destroy()
}

func RepMove(command rabbitmq.RepMoveCommand) {
fmt.Println("RepMove")
fmt.Println(command.Hashs)
hashs:=command.Hashs
//执行调度操作
ipfsDir := "assets"
goalDir := "assets2"
goalName := command.BucketName+":"+command.ObjectName+":"+strconv.Itoa(command.UserId)
//目标文件
fDir, err := os.Executable()
if err != nil {
panic(err)
}
fURL := filepath.Join(filepath.Dir(fDir), goalDir)
_, err = os.Stat(fURL)
if os.IsNotExist(err) {
os.MkdirAll(fURL, os.ModePerm)
}
fURL = filepath.Join(fURL, goalName)
outFile, err := os.Create(fURL)
fmt.Println(fURL)
//源文件
fURL = filepath.Join(filepath.Dir(fDir), ipfsDir)
fURL = filepath.Join(fURL, hashs[0])
inFile, _ := os.Open(fURL)
fmt.Println(fURL)
fileInfo, _ := inFile.Stat()
fmt.Println("RepMove")
fmt.Println(command.Hashs)
hashs := command.Hashs
//执行调度操作
ipfsDir := "assets"
goalDir := "assets2"
goalName := command.BucketName + ":" + command.ObjectName + ":" + strconv.Itoa(command.UserId)
//目标文件
fDir, err := os.Executable()
if err != nil {
panic(err)
}
fURL := filepath.Join(filepath.Dir(fDir), goalDir)
_, err = os.Stat(fURL)
if os.IsNotExist(err) {
os.MkdirAll(fURL, os.ModePerm)
}
fURL = filepath.Join(fURL, goalName)
outFile, err := os.Create(fURL)
fmt.Println(fURL)
//源文件
fURL = filepath.Join(filepath.Dir(fDir), ipfsDir)
fURL = filepath.Join(fURL, hashs[0])
inFile, _ := os.Open(fURL)
fmt.Println(fURL)
fileInfo, _ := inFile.Stat()
fileSizeInBytes := fileInfo.Size()
numWholePacket := fileSizeInBytes/packetSizeInBytes
lastPacketInBytes:=fileSizeInBytes%packetSizeInBytes
fmt.Println(fileSizeInBytes)
fmt.Println(numWholePacket)
fmt.Println(lastPacketInBytes)
for i:=0;int64(i)<numWholePacket;i++ {
buf := make([]byte, packetSizeInBytes)
inFile.Read(buf)
outFile.Write(buf)
}
if lastPacketInBytes>0 {
buf := make([]byte, lastPacketInBytes)
inFile.Read(buf)
outFile.Write(buf)
}
inFile.Close()
outFile.Close()
//返回消息
res:= rabbitmq.AgentMoveRes{
MoveCode: 0,
}
c,_:=json.Marshal(res)
rabbitSend(c, command.UserId)
//向coor报告临时缓存hash
command1 := rabbitmq.TempCacheReport{
Ip : LocalIp,
Hashs: hashs,
}
c,_=json.Marshal(command1)
b:=append([]byte("06"),c...)
fmt.Println(b)
rabbit := rabbitmq.NewRabbitMQSimple("coorQueue")
rabbit.PublishSimple(b)
rabbit.Destroy()
numWholePacket := fileSizeInBytes / packetSizeInBytes
lastPacketInBytes := fileSizeInBytes % packetSizeInBytes
fmt.Println(fileSizeInBytes)
fmt.Println(numWholePacket)
fmt.Println(lastPacketInBytes)
for i := 0; int64(i) < numWholePacket; i++ {
buf := make([]byte, packetSizeInBytes)
inFile.Read(buf)
outFile.Write(buf)
}
if lastPacketInBytes > 0 {
buf := make([]byte, lastPacketInBytes)
inFile.Read(buf)
outFile.Write(buf)
}
inFile.Close()
outFile.Close()
//返回消息
res := rabbitmq.AgentMoveRes{
MoveCode: 0,
}
c, _ := json.Marshal(res)
rabbitSend(c, command.UserId)
//向coor报告临时缓存hash
command1 := rabbitmq.TempCacheReport{
Ip: LocalIp,
Hashs: hashs,
}
c, _ = json.Marshal(command1)
b := append([]byte("06"), c...)
fmt.Println(b)
rabbit := rabbitmq.NewRabbitMQSimple("coorQueue")
rabbit.PublishSimple(b)
rabbit.Destroy()
}


func EcMove(command rabbitmq.EcMoveCommand) {
fmt.Println("EcMove")
fmt.Println(command.Hashs)
hashs:=command.Hashs
fmt.Println("EcMove")
fmt.Println(command.Hashs)
hashs := command.Hashs

//执行调度操作
//执行调度操作

//返回消息
res:= rabbitmq.WriteHashRes{
MetaCode: 0,
}
c,_:=json.Marshal(res)
rabbitSend(c, command.UserId)
//向coor报告临时缓存hash
command1:= rabbitmq.TempCacheReport{
Ip : LocalIp,
Hashs: hashs,
}
c,_=json.Marshal(command1)
b:=append([]byte("06"),c...)
fmt.Println(b)
rabbit := rabbitmq.NewRabbitMQSimple("coorQueue")
rabbit.PublishSimple(b)
rabbit.Destroy()
//返回消息
res := rabbitmq.WriteHashRes{
MetaCode: 0,
}
c, _ := json.Marshal(res)
rabbitSend(c, command.UserId)
//向coor报告临时缓存hash
command1 := rabbitmq.TempCacheReport{
Ip: LocalIp,
Hashs: hashs,
}
c, _ = json.Marshal(command1)
b := append([]byte("06"), c...)
fmt.Println(b)
rabbit := rabbitmq.NewRabbitMQSimple("coorQueue")
rabbit.PublishSimple(b)
rabbit.Destroy()
}


+ 46
- 0
heartReporter.go View File

@@ -0,0 +1,46 @@
package main

import (
//"context"
//"log"
//"os"
//"io"
//"fmt"
//"path/filepath"
//agentserver "proto"

"encoding/json"
"fmt"
"rabbitmq"
"sync"
"time"
//"google.golang.org/grpc"
)

func heartReport(wg *sync.WaitGroup) {
rabbit1 := rabbitmq.NewRabbitMQSimple("coorQueue")

for {
//挨个ping其他agent(AgentIpList),记录延迟到AgentDelay
agentDelay := []int{10, 100, 200}
//访问ipfs,记录是否能正常访问,记录到ipfsStatus
ipfsStatus := true
//访问自身资源目录(obs,minio等),记录是否正常,记录到localDirStatus
localDirStatus := true
//发送心跳
command := rabbitmq.HeartReport{
Ip: "localhost",
AgentDelay: agentDelay,
IpfsStatus: ipfsStatus,
LocalDirStatus: localDirStatus,
}
c, _ := json.Marshal(command)
b := append([]byte("07"), c...)
fmt.Println(string(b))
rabbit1.PublishSimple(b)

time.Sleep(time.Minute * 5)
}
rabbit1.Destroy()
wg.Done()
}

+ 31
- 27
main.go View File

@@ -1,38 +1,42 @@
package main

import (
//"context"
// "log"
// "os"
// "io"
// "fmt"
// "path/filepath"
agentserver "proto"
"sync"
//"context"
// "log"
// "os"
// "io"
// "fmt"
// "path/filepath"
"net"
agentserver "proto"
"sync"

"google.golang.org/grpc"
)

const (
Port = ":5010"
packetSizeInBytes=10
LocalIp = "localhost"
Port = ":5010"
packetSizeInBytes = 10
LocalIp = "localhost"
)

var AgentIpList []string

func main(){
//处置协调端、客户端命令(可多建几个)
wg := sync.WaitGroup{}
wg.Add(1)
go commandHandle(&wg)
//面向客户端收发数据
lis, err := net.Listen("tcp", Port)
if err != nil {
panic(err)
}
s := grpc.NewServer()
agentserver.RegisterTranBlockOrReplicaServer(s, &anyOne{})
s.Serve(lis)
wg.Wait()
//
}
func main() {
AgentIpList = []string{"pcm01", "pcm1", "pcm2"}
//处置协调端、客户端命令(可多建几个)
wg := sync.WaitGroup{}
wg.Add(2)
go commandHandle(&wg)
go heartReport(&wg) //网络延迟感知
//面向客户端收发数据
lis, err := net.Listen("tcp", Port)
if err != nil {
panic(err)
}
s := grpc.NewServer()
agentserver.RegisterTranBlockOrReplicaServer(s, &anyOne{})
s.Serve(lis)
wg.Wait()
//
}

Loading…
Cancel
Save