From bcf50f1c52639e303e69fb78f8a3ccf56c987aec Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Wed, 19 Apr 2023 17:20:03 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96grpc=E4=B8=8A=E4=BC=A0?= =?UTF-8?q?=E4=B8=8B=E8=BD=BD=E7=9A=84=E4=BB=A3=E7=A0=81=EF=BC=9B=E4=BC=98?= =?UTF-8?q?=E5=8C=96Rep=E8=AF=BB=E5=86=99=E8=B0=83=E5=BA=A6=E8=BF=87?= =?UTF-8?q?=E7=A8=8B=E7=9A=84=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- client_command.go | 376 ++++++++++++++++++++++++++++++++-------------- 1 file changed, 264 insertions(+), 112 deletions(-) diff --git a/client_command.go b/client_command.go index c9cfedb..2b008b0 100644 --- a/client_command.go +++ b/client_command.go @@ -6,7 +6,6 @@ import ( "io" "os" "path/filepath" - "strconv" "sync" "gitlink.org.cn/cloudream/client/config" @@ -23,6 +22,7 @@ import ( "google.golang.org/grpc" _ "google.golang.org/grpc/balancer/grpclb" + "google.golang.org/grpc/credentials/insecure" ) func Move(bucketName string, objectName string, stgID int) error { @@ -75,9 +75,7 @@ func Move(bucketName string, objectName string, stgID int) error { } func Read(localFilePath string, bucketName string, objectName string) error { - fmt.Println("read " + bucketName + "/" + objectName + " to " + localFilePath) - //获取块hash,ip,序号,编码参数等 - //发送写请求,分配写入节点Ip + // TODO 此处是写死的常量 userId := 0 // 先向协调端请求文件相关的数据 @@ -96,30 +94,35 @@ func Read(localFilePath string, bucketName string, objectName string) error { } switch readResp.Redundancy { - case "rep": - err = repRead(readResp.FileSizeInBytes, readResp.IPs[0], readResp.Hashes[0], localFilePath) + case consts.REDUNDANCY_REP: + if len(readResp.NodeIPs) == 0 { + return fmt.Errorf("no node has this file") + } + + // 随便选第一个节点下载文件 + err = repRead(readResp.FileSizeInBytes, readResp.NodeIPs[0], readResp.Hashes[0], localFilePath) if err != nil { return fmt.Errorf("rep read failed, err: %w", err) } - case "ec": + case consts.REDUNDANCY_EC: // TODO EC部分的代码要考虑重构 - ecRead(readResp.FileSizeInBytes, readResp.IPs, readResp.Hashes, readResp.BlockIDs, readResp.ECName, localFilePath) + ecRead(readResp.FileSizeInBytes, readResp.NodeIPs, readResp.Hashes, readResp.BlockIDs, readResp.ECName, localFilePath) } return nil } -func repRead(fileSizeInBytes int64, ip string, repHash string, localFilePath string) error { - grpcAddr := fmt.Sprintf("%s:%d", ip, config.Cfg().GRPCPort) - conn, err := grpc.Dial(grpcAddr, grpc.WithInsecure()) +func repRead(fileSizeInBytes int64, nodeIP string, repHash string, localFilePath string) error { + // 连接grpc + grpcAddr := fmt.Sprintf("%s:%d", nodeIP, config.Cfg().GRPCPort) + conn, err := grpc.Dial(grpcAddr, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { return fmt.Errorf("connect to grpc server at %s failed, err: %w", grpcAddr, err) } defer conn.Close() - client := agentcaller.NewTranBlockOrReplicaClient(conn) - + // 创建本地文件 curExecPath, err := os.Executable() if err != nil { return fmt.Errorf("get executable directory failed, err: %w", err) @@ -144,30 +147,36 @@ func repRead(fileSizeInBytes int64, ip string, repHash string, localFilePath str 如果本地有ipfs daemon且能获取相应对象的cid,则获取对象cid对应的ipfsblock的cid,通过ipfs网络获取这些ipfsblock 否则,像目前一样,使用grpc向指定节点获取 */ - stream, err := client.GetBlockOrReplica(context.Background(), &agentcaller.GetReq{ - BlockOrReplicaHash: repHash, + // 下载文件 + client := agentcaller.NewFileTransportClient(conn) + stream, err := client.GetFile(context.Background(), &agentcaller.GetReq{ + FileHash: repHash, }) if err != nil { return fmt.Errorf("request grpc failed, err: %w", err) } + defer stream.CloseSend() - numPacket := (fileSizeInBytes + config.Cfg().GRCPPacketSize - 1) / config.Cfg().GRCPPacketSize - for i := int64(0); i < numPacket; i++ { + for { resp, err := stream.Recv() if err != nil { return fmt.Errorf("read file data on grpc stream failed, err: %w", err) } - err = myio.WriteAll(outputFile, resp.BlockOrReplicaData) - if err != nil { - return fmt.Errorf("write file data to local file failed, err: %w", err) + if resp.Type == agentcaller.FileDataPacketType_Data { + err = myio.WriteAll(outputFile, resp.Data) + // TODO 写入到文件失败,是否要考虑删除这个不完整的文件? + if err != nil { + return fmt.Errorf("write file data to local file failed, err: %w", err) + } + + } else if resp.Type == agentcaller.FileDataPacketType_EOF { + return nil } } - - return nil } -func ecRead(fileSizeInBytes int64, ips []string, blockHashs []string, blockIds []int, ecName string, localFilePath string) { +func ecRead(fileSizeInBytes int64, nodeIPs []string, blockHashs []string, blockIds []int, ecName string, localFilePath string) { //根据ecName获得以下参数 wg := sync.WaitGroup{} ecPolicies := *utils.GetEcPolicy() @@ -193,15 +202,25 @@ func ecRead(fileSizeInBytes int64, ips []string, blockHashs []string, blockIds [ blockSeq := blockIds wg.Add(1) for i := 0; i < len(blockSeq); i++ { - go get(blockHashs[i], ips[i], getBufs[blockSeq[i]], numPacket) + go get(blockHashs[i], nodeIPs[i], getBufs[blockSeq[i]], numPacket) } go decode(getBufs[:], decodeBufs[:], blockSeq, ecK, coefs, numPacket) go persist(decodeBufs[:], numPacket, localFilePath, &wg) wg.Wait() } +type fileSender struct { + grpcCon *grpc.ClientConn + stream agentcaller.FileTransport_SendFileClient + nodeIP string + fileHash string + err error +} + func RepWrite(localFilePath string, bucketName string, objectName string, numRep int) error { + // TODO 此处是写死的常量 userId := 0 + //获取文件大小 fileInfo, err := os.Stat(localFilePath) if err != nil { @@ -209,14 +228,6 @@ func RepWrite(localFilePath string, bucketName string, objectName string, numRep } fileSizeInBytes := fileInfo.Size() - //写入对象的packet数 - numWholePacket := fileSizeInBytes / config.Cfg().GRCPPacketSize - lastPacketInBytes := fileSizeInBytes % config.Cfg().GRCPPacketSize - numPacket := numWholePacket - if lastPacketInBytes > 0 { - numPacket++ - } - coorClient, err := racli.NewCoordinatorClient() if err != nil { return fmt.Errorf("create coordinator client failed, err: %w", err) @@ -232,26 +243,47 @@ func RepWrite(localFilePath string, bucketName string, objectName string, numRep return fmt.Errorf("coordinator RepWrite failed, err: %w", err) } - //创建channel - loadDistributeBufs := make([]chan []byte, numRep) - for i := 0; i < numRep; i++ { - loadDistributeBufs[i] = make(chan []byte) + file, err := os.Open(localFilePath) + if err != nil { + return fmt.Errorf("open file %s failed, err: %w", localFilePath, err) } + defer file.Close() - //正式开始写入 - hashs := make([]string, numRep) - go loadDistribute(localFilePath, loadDistributeBufs[:], numWholePacket, lastPacketInBytes) //从本地文件系统加载数据 - var wg sync.WaitGroup - wg.Add(numRep) + /* + TO DO ss: 判断本地有没有ipfs daemon、能否与目标agent的ipfs daemon连通、本地ipfs目录空间是否充足 + 如果本地有ipfs daemon、能与目标agent的ipfs daemon连通、本地ipfs目录空间充足,将所有内容写入本地ipfs目录,得到对象的cid,发送cid给目标agent让其pin相应的对象 + 否则,像目前一样,使用grpc向指定节点获取 + */ + + senders := make([]fileSender, numRep) + + // 建立grpc连接,发送请求 + startSendFile(numRep, senders, repWriteResp.NodeIPs) + + // 向每个节点发送数据 + err = sendFileData(file, numRep, senders) + if err != nil { + return err + } + + // 发送EOF消息,并获得FileHash + sendFinish(numRep, senders) + + // 收集发送成功的节点以及返回的hash + var sucNodeIPs []string + var sucFileHashes []string for i := 0; i < numRep; i++ { - //TODO xh: send的第一个参数不需要了 - // TODO2 见上 - go send("rep.json"+strconv.Itoa(i), repWriteResp.IPs[i], loadDistributeBufs[i], numPacket, &wg, hashs, i) //"block1.json"这样参数不需要 + sender := &senders[i] + + if sender.err == nil { + sucNodeIPs = append(sucNodeIPs, sender.nodeIP) + sucFileHashes = append(sucFileHashes, sender.fileHash) + } } - wg.Wait() // 记录写入的文件的Hash - writeRepHashResp, err := coorClient.WriteRepHash(bucketName, objectName, hashs, repWriteResp.IPs, userId) + // TODO 如果一个都没有写成功,那么是否要发送这个请求? + writeRepHashResp, err := coorClient.WriteRepHash(bucketName, objectName, sucFileHashes, sucNodeIPs, userId) if err != nil { return fmt.Errorf("request to coordinator failed, err: %w", err) } @@ -262,9 +294,142 @@ func RepWrite(localFilePath string, bucketName string, objectName string, numRep return nil } +func startSendFile(numRep int, senders []fileSender, nodeIPs []string) { + for i := 0; i < numRep; i++ { + sender := &senders[i] + + sender.nodeIP = nodeIPs[i] + + grpcAddr := fmt.Sprintf("%s:%d", nodeIPs[i], config.Cfg().GRPCPort) + conn, err := grpc.Dial(grpcAddr, grpc.WithTransportCredentials(insecure.NewCredentials())) + + if err != nil { + sender.err = fmt.Errorf("connect to grpc server at %s failed, err: %w", grpcAddr, err) + continue + } + + client := agentcaller.NewFileTransportClient(conn) + stream, err := client.SendFile(context.Background()) + if err != nil { + conn.Close() + sender.err = fmt.Errorf("request to send file failed, err: %w", err) + continue + } + + sender.grpcCon = conn + sender.stream = stream + } +} + +func sendFileData(file *os.File, numRep int, senders []fileSender) error { + + // 共用的发送数据缓冲区 + buf := make([]byte, 2048) + + for { + // 读取文件数据 + readCnt, err := file.Read(buf) + + // 文件读取完毕 + if err == io.EOF { + break + } + + if err != nil { + // 读取失败则断开所有连接 + for i := 0; i < numRep; i++ { + sender := &senders[i] + + if sender.err != nil { + continue + } + + sender.stream.CloseSend() + sender.grpcCon.Close() + sender.err = fmt.Errorf("read file data failed, err: %w", err) + } + + return fmt.Errorf("read file data failed, err: %w", err) + } + + // 并行的向每个节点发送数据 + hasSender := false + var sendWg sync.WaitGroup + for i := 0; i < numRep; i++ { + sender := &senders[i] + + // 发生了错误的跳过 + if sender.err != nil { + continue + } + + hasSender = true + + sendWg.Add(1) + go func() { + err := sender.stream.Send(&agentcaller.FileDataPacket{ + Type: agentcaller.FileDataPacketType_Data, + Data: buf[:readCnt], + }) + + // 发生错误则关闭连接 + if err != nil { + sender.stream.CloseSend() + sender.grpcCon.Close() + sender.err = fmt.Errorf("send file data failed, err: %w", err) + } + + sendWg.Done() + }() + } + + // 等待向每个节点发送数据结束 + sendWg.Wait() + + // 如果所有节点都发送失败,则不要再继续读取文件数据了 + if !hasSender { + break + } + } + return nil +} + +func sendFinish(numRep int, senders []fileSender) { + for i := 0; i < numRep; i++ { + sender := &senders[i] + + // 发生了错误的跳过 + if sender.err != nil { + continue + } + + err := sender.stream.Send(&agentcaller.FileDataPacket{ + Type: agentcaller.FileDataPacketType_EOF, + }) + if err != nil { + sender.stream.CloseSend() + sender.grpcCon.Close() + sender.err = fmt.Errorf("send file data failed, err: %w", err) + continue + } + + resp, err := sender.stream.CloseAndRecv() + if err != nil { + sender.err = fmt.Errorf("receive response failed, err: %w", err) + sender.grpcCon.Close() + continue + } + + sender.fileHash = resp.FileHash + sender.grpcCon.Close() + } +} + func EcWrite(localFilePath string, bucketName string, objectName string, ecName string) error { fmt.Println("write " + localFilePath + " as " + bucketName + "/" + objectName) + // TODO 需要参考RepWrite函数的代码逻辑,做好错误处理 + //获取文件大小 fileInfo, err := os.Stat(localFilePath) if err != nil { @@ -317,12 +482,6 @@ func EcWrite(localFilePath string, bucketName string, objectName string, ecName for i := 0; i < ecN; i++ { encodeBufs[i] = make(chan []byte) } - blockNames := make([]string, ecN) - for i := 0; i < ecN; i++ { - blockNames[i] = (bucketName + "_" + objectName + "_" + strconv.Itoa(i)) - print(blockNames[i]) - print("miemiemie") - } hashs := make([]string, ecN) //正式开始写入 go load(localFilePath, loadBufs[:ecN], ecK, numPacket*int64(ecK), fileSizeInBytes) //从本地文件系统加载数据 @@ -332,12 +491,12 @@ func EcWrite(localFilePath string, bucketName string, objectName string, ecName wg.Add(ecN) for i := 0; i < ecN; i++ { - go send(blockNames[i], ecWriteResp.IPs[i], encodeBufs[i], numPacket, &wg, hashs, i) + go send(ecWriteResp.NodeIPs[i], encodeBufs[i], numPacket, &wg, hashs, i) } wg.Wait() //第二轮通讯:插入元数据hashs - writeRepHashResp, err := coorClient.WriteECHash(bucketName, objectName, hashs, ecWriteResp.IPs, userId) + writeRepHashResp, err := coorClient.WriteECHash(bucketName, objectName, hashs, ecWriteResp.NodeIPs, userId) if err != nil { return fmt.Errorf("request to coordinator failed, err: %w", err) } @@ -348,33 +507,6 @@ func EcWrite(localFilePath string, bucketName string, objectName string, ecName return nil } -func loadDistribute(localFilePath string, loadDistributeBufs []chan []byte, numWholePacket int64, lastPacketInBytes int64) { - fmt.Println("loadDistribute " + localFilePath) - file, _ := os.Open(localFilePath) - for i := 0; int64(i) < numWholePacket; i++ { - buf := make([]byte, config.Cfg().GRCPPacketSize) - _, err := file.Read(buf) - if err != nil && err != io.EOF { - break - } - for j := 0; j < len(loadDistributeBufs); j++ { - loadDistributeBufs[j] <- buf - } - } - if lastPacketInBytes > 0 { - buf := make([]byte, lastPacketInBytes) - file.Read(buf) - for j := 0; j < len(loadDistributeBufs); j++ { - loadDistributeBufs[j] <- buf - } - } - fmt.Println("load over") - for i := 0; i < len(loadDistributeBufs); i++ { - close(loadDistributeBufs[i]) - } - file.Close() -} - func load(localFilePath string, loadBufs []chan []byte, ecK int, totalNumPacket int64, fileSizeInBytes int64) { fmt.Println("load " + localFilePath) file, _ := os.Open(localFilePath) @@ -478,70 +610,90 @@ func decode(inBufs []chan []byte, outBufs []chan []byte, blockSeq []int, ecK int } } -func send(blockName string, ip string, inBuf chan []byte, numPacket int64, wg *sync.WaitGroup, hashs []string, idx int) { - fmt.Println("send " + blockName) +func send(ip string, inBuf chan []byte, numPacket int64, wg *sync.WaitGroup, hashs []string, idx int) error { /* TO DO ss: 判断本地有没有ipfs daemon、能否与目标agent的ipfs daemon连通、本地ipfs目录空间是否充足 如果本地有ipfs daemon、能与目标agent的ipfs daemon连通、本地ipfs目录空间充足,将所有内容写入本地ipfs目录,得到对象的cid,发送cid给目标agent让其pin相应的对象 否则,像目前一样,使用grpc向指定节点获取 */ - //rpc相关 - conn, err := grpc.Dial(fmt.Sprintf("%s:%d", ip, config.Cfg().GRPCPort), grpc.WithInsecure()) + + // TODO 如果发生错误,需要考虑将错误传递出去 + defer wg.Done() + + grpcAddr := fmt.Sprintf("%s:%d", ip, config.Cfg().GRPCPort) + conn, err := grpc.Dial(grpcAddr, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { - panic(err) + return fmt.Errorf("connect to grpc server at %s failed, err: %w", grpcAddr, err) } + defer conn.Close() - client := agentcaller.NewTranBlockOrReplicaClient(conn) - stream, err := client.SendBlockOrReplica(context.Background()) + client := agentcaller.NewFileTransportClient(conn) + stream, err := client.SendFile(context.Background()) if err != nil { - panic(err) + return fmt.Errorf("request to send file failed, err: %w", err) } + for i := 0; int64(i) < numPacket; i++ { buf := <-inBuf - fmt.Println(buf) - err := stream.Send(&agentcaller.BlockOrReplica{ - BlockOrReplicaName: blockName, - BlockOrReplicaHash: blockName, - BlockOrReplicaData: buf, + + err := stream.Send(&agentcaller.FileDataPacket{ + Code: agentcaller.FileDataPacket_OK, + Data: buf, }) - if err != nil && err != io.EOF { - panic(err) + + if err != nil { + stream.CloseSend() + return fmt.Errorf("send file data failed, err: %w", err) } } - res, err := stream.CloseAndRecv() - fmt.Println(res) - hashs[idx] = res.BlockOrReplicaHash - conn.Close() - wg.Done() - return + + err = stream.Send(&agentcaller.FileDataPacket{ + Code: agentcaller.FileDataPacket_EOF, + }) + + if err != nil { + stream.CloseSend() + return fmt.Errorf("send file data failed, err: %w", err) + } + + resp, err := stream.CloseAndRecv() + if err != nil { + return fmt.Errorf("receive response failed, err: %w", err) + } + + hashs[idx] = resp.FileHash + return nil } -func get(blockHash string, ip string, getBuf chan []byte, numPacket int64) { - //rpc相关 - print("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@") - conn, err := grpc.Dial(fmt.Sprintf("%s:%d", ip, config.Cfg().GRPCPort), grpc.WithInsecure()) +func get(blockHash string, nodeIP string, getBuf chan []byte, numPacket int64) error { + grpcAddr := fmt.Sprintf("%s:%d", nodeIP, config.Cfg().GRPCPort) + conn, err := grpc.Dial(grpcAddr, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { - panic(err) + return fmt.Errorf("connect to grpc server at %s failed, err: %w", grpcAddr, err) } + defer conn.Close() /* TO DO: 判断本地有没有ipfs daemon、能否获取相应对象的cid 如果本地有ipfs daemon且能获取相应编码块的cid,则获取编码块cid对应的ipfsblock的cid,通过ipfs网络获取这些ipfsblock 否则,像目前一样,使用grpc向指定节点获取 */ - client := agentcaller.NewTranBlockOrReplicaClient(conn) + client := agentcaller.NewFileTransportClient(conn) //rpc get - stream, _ := client.GetBlockOrReplica(context.Background(), &agentcaller.GetReq{ - BlockOrReplicaHash: blockHash, + // TODO 要考虑读取失败后,如何中断后续解码过程 + stream, err := client.GetFile(context.Background(), &agentcaller.GetReq{ + FileHash: blockHash, }) - fmt.Println(numPacket) + for i := 0; int64(i) < numPacket; i++ { fmt.Println(i) + // TODO 同上 res, _ := stream.Recv() fmt.Println(res.BlockOrReplicaData) getBuf <- res.BlockOrReplicaData } + close(getBuf) - conn.Close() + return nil } func persist(inBuf []chan []byte, numPacket int64, localFilePath string, wg *sync.WaitGroup) {