Browse Source

优化grpc上传下载的代码;优化Rep读写调度过程的逻辑

gitlink
Sydonian 2 years ago
parent
commit
bcf50f1c52
1 changed files with 264 additions and 112 deletions
  1. +264
    -112
      client_command.go

+ 264
- 112
client_command.go View File

@@ -6,7 +6,6 @@ import (
"io"
"os"
"path/filepath"
"strconv"
"sync"

"gitlink.org.cn/cloudream/client/config"
@@ -23,6 +22,7 @@ import (
"google.golang.org/grpc"

_ "google.golang.org/grpc/balancer/grpclb"
"google.golang.org/grpc/credentials/insecure"
)

func Move(bucketName string, objectName string, stgID int) error {
@@ -75,9 +75,7 @@ func Move(bucketName string, objectName string, stgID int) error {
}

func Read(localFilePath string, bucketName string, objectName string) error {
fmt.Println("read " + bucketName + "/" + objectName + " to " + localFilePath)
//获取块hash,ip,序号,编码参数等
//发送写请求,分配写入节点Ip
// TODO 此处是写死的常量
userId := 0

// 先向协调端请求文件相关的数据
@@ -96,30 +94,35 @@ func Read(localFilePath string, bucketName string, objectName string) error {
}

switch readResp.Redundancy {
case "rep":
err = repRead(readResp.FileSizeInBytes, readResp.IPs[0], readResp.Hashes[0], localFilePath)
case consts.REDUNDANCY_REP:
if len(readResp.NodeIPs) == 0 {
return fmt.Errorf("no node has this file")
}

// 随便选第一个节点下载文件
err = repRead(readResp.FileSizeInBytes, readResp.NodeIPs[0], readResp.Hashes[0], localFilePath)
if err != nil {
return fmt.Errorf("rep read failed, err: %w", err)
}

case "ec":
case consts.REDUNDANCY_EC:
// TODO EC部分的代码要考虑重构
ecRead(readResp.FileSizeInBytes, readResp.IPs, readResp.Hashes, readResp.BlockIDs, readResp.ECName, localFilePath)
ecRead(readResp.FileSizeInBytes, readResp.NodeIPs, readResp.Hashes, readResp.BlockIDs, readResp.ECName, localFilePath)
}

return nil
}

func repRead(fileSizeInBytes int64, ip string, repHash string, localFilePath string) error {
grpcAddr := fmt.Sprintf("%s:%d", ip, config.Cfg().GRPCPort)
conn, err := grpc.Dial(grpcAddr, grpc.WithInsecure())
func repRead(fileSizeInBytes int64, nodeIP string, repHash string, localFilePath string) error {
// 连接grpc
grpcAddr := fmt.Sprintf("%s:%d", nodeIP, config.Cfg().GRPCPort)
conn, err := grpc.Dial(grpcAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return fmt.Errorf("connect to grpc server at %s failed, err: %w", grpcAddr, err)
}
defer conn.Close()

client := agentcaller.NewTranBlockOrReplicaClient(conn)

// 创建本地文件
curExecPath, err := os.Executable()
if err != nil {
return fmt.Errorf("get executable directory failed, err: %w", err)
@@ -144,30 +147,36 @@ func repRead(fileSizeInBytes int64, ip string, repHash string, localFilePath str
如果本地有ipfs daemon且能获取相应对象的cid,则获取对象cid对应的ipfsblock的cid,通过ipfs网络获取这些ipfsblock
否则,像目前一样,使用grpc向指定节点获取
*/
stream, err := client.GetBlockOrReplica(context.Background(), &agentcaller.GetReq{
BlockOrReplicaHash: repHash,
// 下载文件
client := agentcaller.NewFileTransportClient(conn)
stream, err := client.GetFile(context.Background(), &agentcaller.GetReq{
FileHash: repHash,
})
if err != nil {
return fmt.Errorf("request grpc failed, err: %w", err)
}
defer stream.CloseSend()

numPacket := (fileSizeInBytes + config.Cfg().GRCPPacketSize - 1) / config.Cfg().GRCPPacketSize
for i := int64(0); i < numPacket; i++ {
for {
resp, err := stream.Recv()
if err != nil {
return fmt.Errorf("read file data on grpc stream failed, err: %w", err)
}

err = myio.WriteAll(outputFile, resp.BlockOrReplicaData)
if err != nil {
return fmt.Errorf("write file data to local file failed, err: %w", err)
if resp.Type == agentcaller.FileDataPacketType_Data {
err = myio.WriteAll(outputFile, resp.Data)
// TODO 写入到文件失败,是否要考虑删除这个不完整的文件?
if err != nil {
return fmt.Errorf("write file data to local file failed, err: %w", err)
}

} else if resp.Type == agentcaller.FileDataPacketType_EOF {
return nil
}
}

return nil
}

func ecRead(fileSizeInBytes int64, ips []string, blockHashs []string, blockIds []int, ecName string, localFilePath string) {
func ecRead(fileSizeInBytes int64, nodeIPs []string, blockHashs []string, blockIds []int, ecName string, localFilePath string) {
//根据ecName获得以下参数
wg := sync.WaitGroup{}
ecPolicies := *utils.GetEcPolicy()
@@ -193,15 +202,25 @@ func ecRead(fileSizeInBytes int64, ips []string, blockHashs []string, blockIds [
blockSeq := blockIds
wg.Add(1)
for i := 0; i < len(blockSeq); i++ {
go get(blockHashs[i], ips[i], getBufs[blockSeq[i]], numPacket)
go get(blockHashs[i], nodeIPs[i], getBufs[blockSeq[i]], numPacket)
}
go decode(getBufs[:], decodeBufs[:], blockSeq, ecK, coefs, numPacket)
go persist(decodeBufs[:], numPacket, localFilePath, &wg)
wg.Wait()
}

// fileSender tracks the state of one replica upload: the grpc connection,
// the open send stream, the target node, and the outcome (file hash on
// success, or the first error encountered).
type fileSender struct {
	grpcCon  *grpc.ClientConn                         // open grpc connection to the node; closed when sending finishes or fails
	stream   agentcaller.FileTransport_SendFileClient // client-streaming handle used to push FileDataPackets
	nodeIP   string                                   // target node address (host part; port comes from config)
	fileHash string                                   // hash returned by the node after a successful upload
	err      error                                    // first error encountered; non-nil marks this sender as dead and it is skipped afterwards
}

func RepWrite(localFilePath string, bucketName string, objectName string, numRep int) error {
// TODO 此处是写死的常量
userId := 0

//获取文件大小
fileInfo, err := os.Stat(localFilePath)
if err != nil {
@@ -209,14 +228,6 @@ func RepWrite(localFilePath string, bucketName string, objectName string, numRep
}
fileSizeInBytes := fileInfo.Size()

//写入对象的packet数
numWholePacket := fileSizeInBytes / config.Cfg().GRCPPacketSize
lastPacketInBytes := fileSizeInBytes % config.Cfg().GRCPPacketSize
numPacket := numWholePacket
if lastPacketInBytes > 0 {
numPacket++
}

coorClient, err := racli.NewCoordinatorClient()
if err != nil {
return fmt.Errorf("create coordinator client failed, err: %w", err)
@@ -232,26 +243,47 @@ func RepWrite(localFilePath string, bucketName string, objectName string, numRep
return fmt.Errorf("coordinator RepWrite failed, err: %w", err)
}

//创建channel
loadDistributeBufs := make([]chan []byte, numRep)
for i := 0; i < numRep; i++ {
loadDistributeBufs[i] = make(chan []byte)
file, err := os.Open(localFilePath)
if err != nil {
return fmt.Errorf("open file %s failed, err: %w", localFilePath, err)
}
defer file.Close()

//正式开始写入
hashs := make([]string, numRep)
go loadDistribute(localFilePath, loadDistributeBufs[:], numWholePacket, lastPacketInBytes) //从本地文件系统加载数据
var wg sync.WaitGroup
wg.Add(numRep)
/*
TO DO ss: 判断本地有没有ipfs daemon、能否与目标agent的ipfs daemon连通、本地ipfs目录空间是否充足
如果本地有ipfs daemon、能与目标agent的ipfs daemon连通、本地ipfs目录空间充足,将所有内容写入本地ipfs目录,得到对象的cid,发送cid给目标agent让其pin相应的对象
否则,像目前一样,使用grpc向指定节点获取
*/

senders := make([]fileSender, numRep)

// 建立grpc连接,发送请求
startSendFile(numRep, senders, repWriteResp.NodeIPs)

// 向每个节点发送数据
err = sendFileData(file, numRep, senders)
if err != nil {
return err
}

// 发送EOF消息,并获得FileHash
sendFinish(numRep, senders)

// 收集发送成功的节点以及返回的hash
var sucNodeIPs []string
var sucFileHashes []string
for i := 0; i < numRep; i++ {
//TODO xh: send的第一个参数不需要了
// TODO2 见上
go send("rep.json"+strconv.Itoa(i), repWriteResp.IPs[i], loadDistributeBufs[i], numPacket, &wg, hashs, i) //"block1.json"这样参数不需要
sender := &senders[i]

if sender.err == nil {
sucNodeIPs = append(sucNodeIPs, sender.nodeIP)
sucFileHashes = append(sucFileHashes, sender.fileHash)
}
}
wg.Wait()

// 记录写入的文件的Hash
writeRepHashResp, err := coorClient.WriteRepHash(bucketName, objectName, hashs, repWriteResp.IPs, userId)
// TODO 如果一个都没有写成功,那么是否要发送这个请求?
writeRepHashResp, err := coorClient.WriteRepHash(bucketName, objectName, sucFileHashes, sucNodeIPs, userId)
if err != nil {
return fmt.Errorf("request to coordinator failed, err: %w", err)
}
@@ -262,9 +294,142 @@ func RepWrite(localFilePath string, bucketName string, objectName string, numRep
return nil
}

// startSendFile dials each target node over grpc and opens a SendFile
// stream on it, recording the connection and stream in the matching
// senders slot. A node that cannot be dialed or whose stream cannot be
// opened gets its err field set instead; later stages skip such senders.
func startSendFile(numRep int, senders []fileSender, nodeIPs []string) {
	for i := 0; i < numRep; i++ {
		openSendStream(&senders[i], nodeIPs[i])
	}
}

// openSendStream establishes the grpc connection and SendFile stream for a
// single sender. On failure it records the error in s.err and leaves the
// connection closed.
func openSendStream(s *fileSender, nodeIP string) {
	s.nodeIP = nodeIP

	addr := fmt.Sprintf("%s:%d", nodeIP, config.Cfg().GRPCPort)
	conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		s.err = fmt.Errorf("connect to grpc server at %s failed, err: %w", addr, err)
		return
	}

	stream, err := agentcaller.NewFileTransportClient(conn).SendFile(context.Background())
	if err != nil {
		conn.Close()
		s.err = fmt.Errorf("request to send file failed, err: %w", err)
		return
	}

	s.grpcCon = conn
	s.stream = stream
}

// sendFileData streams the contents of file, in fixed-size chunks, to every
// sender that is still alive. Senders that fail mid-transfer are marked via
// their err field and skipped from then on; a non-nil return value only
// means the LOCAL file could not be read (remote failures are per-sender).
func sendFileData(file *os.File, numRep int, senders []fileSender) error {
	// Shared read buffer, reused every iteration. Reuse is safe because we
	// wait for all in-flight sends to finish before the next Read overwrites it.
	buf := make([]byte, 2048)

	for {
		readCnt, readErr := file.Read(buf)

		// Per the io.Reader contract a Read may return n > 0 together with an
		// error (including io.EOF), so forward the bytes we got BEFORE acting
		// on the error. (The previous version checked io.EOF first and would
		// silently drop a final partial chunk.)
		if readCnt > 0 {
			if !sendChunkToAll(buf[:readCnt], numRep, senders) {
				// Every sender has failed; no point reading further.
				return nil
			}
		}

		if readErr == io.EOF {
			// File fully consumed.
			return nil
		}

		if readErr != nil {
			// The local read failed: tear down every still-alive sender so the
			// remote side does not wait on a stream that will never complete.
			err := fmt.Errorf("read file data failed, err: %w", readErr)
			for i := 0; i < numRep; i++ {
				sender := &senders[i]
				if sender.err != nil {
					continue
				}
				sender.stream.CloseSend()
				sender.grpcCon.Close()
				sender.err = err
			}
			return err
		}
	}
}

// sendChunkToAll pushes one data packet to every sender that has not failed
// yet, in parallel, and waits for all of them. A sender whose Send fails is
// closed and marked dead. It reports whether at least one sender was alive
// when the chunk was dispatched.
func sendChunkToAll(chunk []byte, numRep int, senders []fileSender) bool {
	hasSender := false
	var sendWg sync.WaitGroup

	for i := 0; i < numRep; i++ {
		sender := &senders[i]

		// Skip senders that already failed at an earlier stage.
		if sender.err != nil {
			continue
		}
		hasSender = true

		sendWg.Add(1)
		go func() {
			defer sendWg.Done()

			err := sender.stream.Send(&agentcaller.FileDataPacket{
				Type: agentcaller.FileDataPacketType_Data,
				Data: chunk,
			})
			// On error, close the connection and record the failure.
			if err != nil {
				sender.stream.CloseSend()
				sender.grpcCon.Close()
				sender.err = fmt.Errorf("send file data failed, err: %w", err)
			}
		}()
	}

	// Wait for every in-flight send before the caller reuses the buffer.
	sendWg.Wait()
	return hasSender
}

// sendFinish terminates every still-alive upload stream: it sends the EOF
// packet, closes the stream, and records the file hash the remote node
// replies with. Failures are recorded per sender in err; connections are
// always closed before returning.
func sendFinish(numRep int, senders []fileSender) {
	for i := 0; i < numRep; i++ {
		finishSender(&senders[i])
	}
}

// finishSender completes a single upload. On success it stores the returned
// hash in sender.fileHash; on failure it records the error in sender.err.
func finishSender(sender *fileSender) {
	// Senders that already failed at an earlier stage are left untouched.
	if sender.err != nil {
		return
	}

	eofPacket := agentcaller.FileDataPacket{
		Type: agentcaller.FileDataPacketType_EOF,
	}
	if err := sender.stream.Send(&eofPacket); err != nil {
		sender.stream.CloseSend()
		sender.grpcCon.Close()
		sender.err = fmt.Errorf("send file data failed, err: %w", err)
		return
	}

	resp, err := sender.stream.CloseAndRecv()
	if err != nil {
		sender.err = fmt.Errorf("receive response failed, err: %w", err)
		sender.grpcCon.Close()
		return
	}

	sender.fileHash = resp.FileHash
	sender.grpcCon.Close()
}

func EcWrite(localFilePath string, bucketName string, objectName string, ecName string) error {
fmt.Println("write " + localFilePath + " as " + bucketName + "/" + objectName)

// TODO 需要参考RepWrite函数的代码逻辑,做好错误处理

//获取文件大小
fileInfo, err := os.Stat(localFilePath)
if err != nil {
@@ -317,12 +482,6 @@ func EcWrite(localFilePath string, bucketName string, objectName string, ecName
for i := 0; i < ecN; i++ {
encodeBufs[i] = make(chan []byte)
}
blockNames := make([]string, ecN)
for i := 0; i < ecN; i++ {
blockNames[i] = (bucketName + "_" + objectName + "_" + strconv.Itoa(i))
print(blockNames[i])
print("miemiemie")
}
hashs := make([]string, ecN)
//正式开始写入
go load(localFilePath, loadBufs[:ecN], ecK, numPacket*int64(ecK), fileSizeInBytes) //从本地文件系统加载数据
@@ -332,12 +491,12 @@ func EcWrite(localFilePath string, bucketName string, objectName string, ecName
wg.Add(ecN)

for i := 0; i < ecN; i++ {
go send(blockNames[i], ecWriteResp.IPs[i], encodeBufs[i], numPacket, &wg, hashs, i)
go send(ecWriteResp.NodeIPs[i], encodeBufs[i], numPacket, &wg, hashs, i)
}
wg.Wait()

//第二轮通讯:插入元数据hashs
writeRepHashResp, err := coorClient.WriteECHash(bucketName, objectName, hashs, ecWriteResp.IPs, userId)
writeRepHashResp, err := coorClient.WriteECHash(bucketName, objectName, hashs, ecWriteResp.NodeIPs, userId)
if err != nil {
return fmt.Errorf("request to coordinator failed, err: %w", err)
}
@@ -348,33 +507,6 @@ func EcWrite(localFilePath string, bucketName string, objectName string, ecName
return nil
}

// loadDistribute reads the local file packet by packet and fans each packet
// out to every channel in loadDistributeBufs (one channel per replica).
// It sends numWholePacket full-size packets, then one short packet of
// lastPacketInBytes bytes when the file size is not a multiple of the packet
// size, and finally closes every channel so downstream senders terminate.
// NOTE(review): the Open error and the final Read's error are ignored here,
// so a failed read silently truncates the transfer — confirm this is an
// accepted best-effort behavior.
func loadDistribute(localFilePath string, loadDistributeBufs []chan []byte, numWholePacket int64, lastPacketInBytes int64) {
	fmt.Println("loadDistribute " + localFilePath)
	file, _ := os.Open(localFilePath)
	for i := 0; int64(i) < numWholePacket; i++ {
		buf := make([]byte, config.Cfg().GRCPPacketSize)
		_, err := file.Read(buf)
		if err != nil && err != io.EOF {
			break
		}
		// fan the same buffer out to every replica's channel
		for j := 0; j < len(loadDistributeBufs); j++ {
			loadDistributeBufs[j] <- buf
		}
	}
	if lastPacketInBytes > 0 {
		// trailing partial packet
		buf := make([]byte, lastPacketInBytes)
		file.Read(buf)
		for j := 0; j < len(loadDistributeBufs); j++ {
			loadDistributeBufs[j] <- buf
		}
	}
	fmt.Println("load over")
	// closing the channels signals the consumer goroutines to finish
	for i := 0; i < len(loadDistributeBufs); i++ {
		close(loadDistributeBufs[i])
	}
	file.Close()
}

func load(localFilePath string, loadBufs []chan []byte, ecK int, totalNumPacket int64, fileSizeInBytes int64) {
fmt.Println("load " + localFilePath)
file, _ := os.Open(localFilePath)
@@ -478,70 +610,90 @@ func decode(inBufs []chan []byte, outBufs []chan []byte, blockSeq []int, ecK int
}
}

func send(blockName string, ip string, inBuf chan []byte, numPacket int64, wg *sync.WaitGroup, hashs []string, idx int) {
fmt.Println("send " + blockName)
func send(ip string, inBuf chan []byte, numPacket int64, wg *sync.WaitGroup, hashs []string, idx int) error {
/*
TO DO ss: 判断本地有没有ipfs daemon、能否与目标agent的ipfs daemon连通、本地ipfs目录空间是否充足
如果本地有ipfs daemon、能与目标agent的ipfs daemon连通、本地ipfs目录空间充足,将所有内容写入本地ipfs目录,得到对象的cid,发送cid给目标agent让其pin相应的对象
否则,像目前一样,使用grpc向指定节点获取
*/
//rpc相关
conn, err := grpc.Dial(fmt.Sprintf("%s:%d", ip, config.Cfg().GRPCPort), grpc.WithInsecure())

// TODO 如果发生错误,需要考虑将错误传递出去
defer wg.Done()

grpcAddr := fmt.Sprintf("%s:%d", ip, config.Cfg().GRPCPort)
conn, err := grpc.Dial(grpcAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
panic(err)
return fmt.Errorf("connect to grpc server at %s failed, err: %w", grpcAddr, err)
}
defer conn.Close()

client := agentcaller.NewTranBlockOrReplicaClient(conn)
stream, err := client.SendBlockOrReplica(context.Background())
client := agentcaller.NewFileTransportClient(conn)
stream, err := client.SendFile(context.Background())
if err != nil {
panic(err)
return fmt.Errorf("request to send file failed, err: %w", err)
}

for i := 0; int64(i) < numPacket; i++ {
buf := <-inBuf
fmt.Println(buf)
err := stream.Send(&agentcaller.BlockOrReplica{
BlockOrReplicaName: blockName,
BlockOrReplicaHash: blockName,
BlockOrReplicaData: buf,

err := stream.Send(&agentcaller.FileDataPacket{
Code: agentcaller.FileDataPacket_OK,
Data: buf,
})
if err != nil && err != io.EOF {
panic(err)

if err != nil {
stream.CloseSend()
return fmt.Errorf("send file data failed, err: %w", err)
}
}
res, err := stream.CloseAndRecv()
fmt.Println(res)
hashs[idx] = res.BlockOrReplicaHash
conn.Close()
wg.Done()
return

err = stream.Send(&agentcaller.FileDataPacket{
Code: agentcaller.FileDataPacket_EOF,
})

if err != nil {
stream.CloseSend()
return fmt.Errorf("send file data failed, err: %w", err)
}

resp, err := stream.CloseAndRecv()
if err != nil {
return fmt.Errorf("receive response failed, err: %w", err)
}

hashs[idx] = resp.FileHash
return nil
}

func get(blockHash string, ip string, getBuf chan []byte, numPacket int64) {
//rpc相关
print("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@")
conn, err := grpc.Dial(fmt.Sprintf("%s:%d", ip, config.Cfg().GRPCPort), grpc.WithInsecure())
func get(blockHash string, nodeIP string, getBuf chan []byte, numPacket int64) error {
grpcAddr := fmt.Sprintf("%s:%d", nodeIP, config.Cfg().GRPCPort)
conn, err := grpc.Dial(grpcAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
panic(err)
return fmt.Errorf("connect to grpc server at %s failed, err: %w", grpcAddr, err)
}
defer conn.Close()
/*
TO DO: 判断本地有没有ipfs daemon、能否获取相应对象的cid
如果本地有ipfs daemon且能获取相应编码块的cid,则获取编码块cid对应的ipfsblock的cid,通过ipfs网络获取这些ipfsblock
否则,像目前一样,使用grpc向指定节点获取
*/
client := agentcaller.NewTranBlockOrReplicaClient(conn)
client := agentcaller.NewFileTransportClient(conn)
//rpc get
stream, _ := client.GetBlockOrReplica(context.Background(), &agentcaller.GetReq{
BlockOrReplicaHash: blockHash,
// TODO 要考虑读取失败后,如何中断后续解码过程
stream, err := client.GetFile(context.Background(), &agentcaller.GetReq{
FileHash: blockHash,
})
fmt.Println(numPacket)
for i := 0; int64(i) < numPacket; i++ {
fmt.Println(i)
// TODO 同上
res, _ := stream.Recv()
fmt.Println(res.BlockOrReplicaData)
getBuf <- res.BlockOrReplicaData
}

close(getBuf)
conn.Close()
return nil
}

func persist(inBuf []chan []byte, numPacket int64, localFilePath string, wg *sync.WaitGroup) {


Loading…
Cancel
Save