Browse Source

优化grpc上传下载的代码;优化Rep读写调度过程的逻辑

gitlink
Sydonian 2 years ago
parent
commit
cf3a5762fb
3 changed files with 63 additions and 37 deletions
  1. +4
    -2
      command_service.go
  2. +52
    -33
      grpc_service.go
  3. +7
    -2
      main.go

+ 4
- 2
command_service.go View File

@@ -55,6 +55,7 @@ func (service *CommandService) RepMove(msg *ramsg.RepMoveCommand) ramsg.AgentMov
log.Warnf("read ipfs file %s failed, err: %s", fileHash, err.Error())
return ramsg.NewAgentMoveRespFailed(errorcode.OPERATION_FAILED, fmt.Sprintf("read ipfs file failed"))
}
defer ipfsRd.Close()

buf := make([]byte, 1024)
for {
@@ -85,7 +86,8 @@ func (service *CommandService) RepMove(msg *ramsg.RepMoveCommand) ramsg.AgentMov
}
defer coorClient.Close()

coorClient.TempCacheReport(config.Cfg().LocalIP, hashs)
// TODO 这里更新失败残留下的文件是否要删除?
coorClient.TempCacheReport(NodeID, hashs)

return ramsg.NewAgentMoveRespOK()
}
@@ -133,7 +135,7 @@ func (service *CommandService) ECMove(msg *ramsg.ECMoveCommand) ramsg.AgentMoveR
return ramsg.NewAgentMoveRespFailed(errorcode.OPERATION_FAILED, fmt.Sprintf("create coordinator client failed"))
}
defer coorClient.Close()
coorClient.TempCacheReport(config.Cfg().LocalIP, hashs)
coorClient.TempCacheReport(NodeID, hashs)

return ramsg.NewAgentMoveRespOK()
}


+ 52
- 33
grpc_service.go View File

@@ -12,7 +12,7 @@ import (
)

type GRPCService struct {
agentserver.TranBlockOrReplicaServer
agentserver.FileTransportServer
ipfs *ipfs.IPFS
}

@@ -22,18 +22,42 @@ func NewGPRCService(ipfs *ipfs.IPFS) *GRPCService {
}
}

func (s *GRPCService) SendBlockOrReplica(server agentserver.TranBlockOrReplica_SendBlockOrReplicaServer) error {
func (s *GRPCService) SendFile(server agentserver.FileTransport_SendFileServer) error {
writer, err := s.ipfs.CreateFile()
if err != nil {
log.Warnf("create file failed, err: %s", err.Error())
return fmt.Errorf("create file failed, err: %w", err)
}

// 然后读取文件数据
var recvSize int64
for {
msg, err := server.Recv()

// 客户端数据发送完毕,则停止文件写入,获得文件Hash
if err == io.EOF {
// 读取客户端数据失败
// 即使err是io.EOF,只要没有收到客户端包含EOF数据包就被断开了连接,就认为接收失败
if err != nil {
// 关闭文件写入,不需要返回的hash和error
// TODO 需要研究一下通过错误中断写入后,已发送的文件数据能不能自动删除
writer.Finish(io.ErrClosedPipe)
log.WithField("ReceiveSize", recvSize).
Warnf("recv message failed, err: %s", err.Error())
return fmt.Errorf("recv message failed, err: %w", err)
}

if msg.Type == agentserver.FileDataPacketType_Data {
err = myio.WriteAll(writer, msg.Data)
if err != nil {
// 关闭文件写入,不需要返回的hash和error
writer.Finish(io.ErrClosedPipe)
log.Warnf("write data to file failed, err: %s", err.Error())
return fmt.Errorf("write data to file failed, err: %w", err)
}

recvSize += int64(len(msg.Data))

} else if msg.Type == agentserver.FileDataPacketType_EOF {
// 客户端明确说明文件传输已经结束,那么结束写入,获得文件Hash
hash, err := writer.Finish(io.EOF)
if err != nil {
log.Warnf("finish writing failed, err: %s", err.Error())
@@ -41,36 +65,24 @@ func (s *GRPCService) SendBlockOrReplica(server agentserver.TranBlockOrReplica_S
}

// 并将结果返回到客户端
server.SendAndClose(&agentserver.SendRes{
BlockOrReplicaName: msg.BlockOrReplicaName,
BlockOrReplicaHash: hash,
err = server.SendAndClose(&agentserver.SendResp{
FileHash: hash,
})
return nil
}

// 读取客户端数据失败
if err != nil {
// 关闭文件写入,不需要返回的hash和error
writer.Finish(io.ErrClosedPipe)
log.Warnf("recv message failed, err: %s", err.Error())
return fmt.Errorf("recv message failed, err: %w", err)
}
if err != nil {
// TODO 文件已经完整写入,需要考虑是否删除此文件
log.Warnf("send response failed, err: %s", err.Error())
return fmt.Errorf("send response failed, err: %w", err)
}

// 写入到文件失败
err = myio.WriteAll(writer, msg.BlockOrReplicaData)
if err != nil {
// 关闭文件写入,不需要返回的hash和error
writer.Finish(io.ErrClosedPipe)
log.Warnf("write data to file failed, err: %s", err.Error())
return fmt.Errorf("write data to file failed, err: %w", err)
return nil
}
}
}

func (s *GRPCService) GetBlockOrReplica(req *agentserver.GetReq, server agentserver.TranBlockOrReplica_GetBlockOrReplicaServer) error {
reader, err := s.ipfs.OpenRead(req.BlockOrReplicaHash)
func (s *GRPCService) GetFile(req *agentserver.GetReq, server agentserver.FileTransport_GetFileServer) error {
reader, err := s.ipfs.OpenRead(req.FileHash)
if err != nil {
log.Warnf("open file %s to read failed, err: %s", req.BlockOrReplicaHash, err.Error())
log.Warnf("open file %s to read failed, err: %s", req.FileHash, err.Error())
return fmt.Errorf("open file to read failed, err: %w", err)
}
defer reader.Close()
@@ -81,20 +93,27 @@ func (s *GRPCService) GetBlockOrReplica(req *agentserver.GetReq, server agentser

// 文件读取完毕
if err == io.EOF {
// 发送EOF消息
server.Send(&agentserver.FileDataPacket{
Type: agentserver.FileDataPacketType_EOF,
})
return nil
}

// io.ErrUnexpectedEOF没有读满整个buf就遇到了EOF,此时正常发送剩余数据即可。除了这两个错误之外,其他错误都中断操作
if err != io.ErrUnexpectedEOF {
log.Warnf("read file %s data failed, err: %s", req.BlockOrReplicaHash, err.Error())
log.Warnf("read file %s data failed, err: %s", req.FileHash, err.Error())
return fmt.Errorf("read file data failed, err: %w", err)
}

server.Send(&agentserver.BlockOrReplica{
BlockOrReplicaName: "json",
BlockOrReplicaHash: "json",
BlockOrReplicaData: buf[:readCnt],
err = server.Send(&agentserver.FileDataPacket{
Type: agentserver.FileDataPacketType_Data,
Data: buf[:readCnt],
})
if err != nil {
log.WithField("FileHash", req.FileHash).
Warnf("send file data failed, err: %s", err.Error())
return fmt.Errorf("send file data failed, err: %w", err)
}
}
return nil
}

+ 7
- 2
main.go View File

@@ -20,6 +20,11 @@ import (
// TODO 此数据是否在运行时会发生变化?
var AgentIpList []string

const (
// TODO 考虑从保存的配置文件中读取
NodeID = 0
)

func main() {
// TODO 放到配置里读取
AgentIpList = []string{"pcm01", "pcm1", "pcm2"}
@@ -47,7 +52,7 @@ func main() {

// 启动命令服务器
// TODO 需要设计AgentID持久化机制
cmdSvr, err := rasvr.NewAgentServer(NewCommandService(ipfs), 0)
cmdSvr, err := rasvr.NewAgentServer(NewCommandService(ipfs), NodeID)
if err != nil {
log.Fatalf("new agent server failed, err: %s", err.Error())
}
@@ -63,7 +68,7 @@ func main() {
}

s := grpc.NewServer()
agentserver.RegisterTranBlockOrReplicaServer(s, NewGPRCService(ipfs))
agentserver.RegisterFileTransportServer(s, NewGPRCService(ipfs))
s.Serve(lis)

wg.Wait()


Loading…
Cancel
Save