diff --git a/assets/config/config.json b/assets/config/config.json index 294676a..2b4d786 100644 --- a/assets/config/config.json +++ b/assets/config/config.json @@ -8,5 +8,10 @@ "outputFileName": "cloud-agent", "outputDirectory": "log", "level": "debug" + }, + "rabbitMQ": { + "address": "127.0.0.1:5672", + "account": "guest", + "password": "guest" } } \ No newline at end of file diff --git a/command_service.go b/command_service.go index 3298cfa..d3bce9d 100644 --- a/command_service.go +++ b/command_service.go @@ -9,7 +9,7 @@ import ( "gitlink.org.cn/cloudream/agent/internal/config" "gitlink.org.cn/cloudream/utils" - racli "gitlink.org.cn/cloudream/rabbitmq/client" + coorcli "gitlink.org.cn/cloudream/rabbitmq/client/coordinator" ramsg "gitlink.org.cn/cloudream/rabbitmq/message" agtmsg "gitlink.org.cn/cloudream/rabbitmq/message/agent" coormsg "gitlink.org.cn/cloudream/rabbitmq/message/coordinator" @@ -59,6 +59,14 @@ func (service *CommandService) RepMove(msg *agtmsg.RepMoveCommand) *agtmsg.Agent for { readCnt, err := ipfsRd.Read(buf) + if readCnt > 0 { + err = myio.WriteAll(outFile, buf[:readCnt]) + if err != nil { + log.Warnf("write data to file %s failed, err: %s", outFilePath, err.Error()) + return ramsg.ReplyFailed[agtmsg.AgentMoveResp](errorcode.OPERATION_FAILED, "write data to file failed") + } + } + // 文件读取完毕 if err == io.EOF { break @@ -68,16 +76,10 @@ func (service *CommandService) RepMove(msg *agtmsg.RepMoveCommand) *agtmsg.Agent log.Warnf("read ipfs file %s data failed, err: %s", fileHash, err.Error()) return ramsg.ReplyFailed[agtmsg.AgentMoveResp](errorcode.OPERATION_FAILED, "read ipfs file data failed") } - - err = myio.WriteAll(outFile, buf[:readCnt]) - if err != nil { - log.Warnf("write data to file %s failed, err: %s", outFilePath, err.Error()) - return ramsg.ReplyFailed[agtmsg.AgentMoveResp](errorcode.OPERATION_FAILED, "write data to file failed") - } } //向coor报告临时缓存hash - coorClient, err := racli.NewCoordinatorClient() + coorClient, err := coorcli.NewCoordinatorClient(&config.Cfg().RabbitMQ) if err != nil { log.Warnf("new coordinator client failed, err: %s", err.Error()) return ramsg.ReplyFailed[agtmsg.AgentMoveResp](errorcode.OPERATION_FAILED, "new coordinator client failed") diff --git a/grpc_service.go b/grpc_service.go index 77496e4..f887212 100644 --- a/grpc_service.go +++ b/grpc_service.go @@ -91,11 +91,26 @@ func (s *GRPCService) GetFile(req *agentserver.GetReq, server agentserver.FileTr defer reader.Close() buf := make([]byte, 1024) + readAllCnt := 0 for { readCnt, err := reader.Read(buf) + if readCnt > 0 { + readAllCnt += readCnt + err = server.Send(&agentserver.FileDataPacket{ + Type: agentserver.FileDataPacketType_Data, + Data: buf[:readCnt], + }) + if err != nil { + log.WithField("FileHash", req.FileHash). + Warnf("send file data failed, err: %s", err.Error()) + return fmt.Errorf("send file data failed, err: %w", err) + } + } + // 文件读取完毕 if err == io.EOF { + log.WithField("FileHash", req.FileHash).Debugf("send data size %d", readAllCnt) // 发送EOF消息 server.Send(&agentserver.FileDataPacket{ Type: agentserver.FileDataPacketType_EOF, @@ -108,15 +123,5 @@ func (s *GRPCService) GetFile(req *agentserver.GetReq, server agentserver.FileTr log.Warnf("read file %s data failed, err: %s", req.FileHash, err.Error()) return fmt.Errorf("read file data failed, err: %w", err) } - - err = server.Send(&agentserver.FileDataPacket{ - Type: agentserver.FileDataPacketType_Data, - Data: buf[:readCnt], - }) - if err != nil { - log.WithField("FileHash", req.FileHash). - Warnf("send file data failed, err: %s", err.Error()) - return fmt.Errorf("send file data failed, err: %w", err) - } } } diff --git a/internal/config/config.go b/internal/config/config.go index 7802e11..8b84a89 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -1,17 +1,19 @@ package config import ( + racfg "gitlink.org.cn/cloudream/rabbitmq/config" c "gitlink.org.cn/cloudream/utils/config" log "gitlink.org.cn/cloudream/utils/logger" ) type Config struct { - ID int `json:"id"` - GRPCListenAddress string `json:"grpcListenAddress"` - LocalIP string `json:"localIP"` - IPFSPort int `json:"ipfsPort"` - StorageBaseDir string `json:"storageBaseDir"` - Logger log.Config `json:"logger"` + ID int `json:"id"` + GRPCListenAddress string `json:"grpcListenAddress"` + LocalIP string `json:"localIP"` + IPFSPort int `json:"ipfsPort"` + StorageBaseDir string `json:"storageBaseDir"` + Logger log.Config `json:"logger"` + RabbitMQ racfg.Config `json:"rabbitMQ"` } var cfg Config diff --git a/main.go b/main.go index ee0ca5f..4cffcaf 100644 --- a/main.go +++ b/main.go @@ -47,7 +47,7 @@ func main() { // 启动命令服务器 // TODO 需要设计AgentID持久化机制 - agtSvr, err := rasvr.NewAgentServer(NewCommandService(ipfs), config.Cfg().ID) + agtSvr, err := rasvr.NewAgentServer(NewCommandService(ipfs), config.Cfg().ID, &config.Cfg().RabbitMQ) if err != nil { log.Fatalf("new agent server failed, err: %s", err.Error()) } diff --git a/status_report.go b/status_report.go index 470c84e..b6daf86 100644 --- a/status_report.go +++ b/status_report.go @@ -6,14 +6,14 @@ import ( log "github.com/sirupsen/logrus" "gitlink.org.cn/cloudream/agent/internal/config" - racli "gitlink.org.cn/cloudream/rabbitmq/client" + coorcli "gitlink.org.cn/cloudream/rabbitmq/client/coordinator" coormsg "gitlink.org.cn/cloudream/rabbitmq/message/coordinator" "gitlink.org.cn/cloudream/utils" "gitlink.org.cn/cloudream/utils/consts" ) func reportStatus(wg *sync.WaitGroup) { - coorCli, err := racli.NewCoordinatorClient() + coorCli, err := coorcli.NewCoordinatorClient(&config.Cfg().RabbitMQ) if err != nil { wg.Done() log.Error("new coordinator client failed, err: %w", err)