Browse Source

注释EC相关的函数

gitlink
Sydonian 2 years ago
parent
commit
f041d47d20
2 changed files with 142 additions and 141 deletions
  1. +1
    -1
      client_command.go
  2. +141
    -140
      client_command_ec.go

+ 1
- 1
client_command.go View File

@@ -104,7 +104,7 @@ func Read(localFilePath string, bucketName string, objectName string) error {

case consts.REDUNDANCY_EC:
// TODO EC部分的代码要考虑重构
//ecRead(readResp.FileSizeInBytes, readResp.NodeIPs, readResp.Hashes, readResp.BlockIDs, readResp.ECName, localFilePath)
ecRead(readResp.FileSizeInBytes, readResp.NodeIPs, readResp.Hashes, readResp.BlockIDs, readResp.ECName, localFilePath)
}

return nil


+ 141
- 140
client_command_ec.go View File

@@ -1,8 +1,6 @@
package main

/*
import (
"context"
"fmt"
"io"
"os"
@@ -11,94 +9,92 @@ import (

"gitlink.org.cn/cloudream/client/config"
"gitlink.org.cn/cloudream/ec"
agentcaller "gitlink.org.cn/cloudream/proto"
racli "gitlink.org.cn/cloudream/rabbitmq/client"
"gitlink.org.cn/cloudream/utils"
"gitlink.org.cn/cloudream/utils/consts/errorcode"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

func EcWrite(localFilePath string, bucketName string, objectName string, ecName string) error {
fmt.Println("write " + localFilePath + " as " + bucketName + "/" + objectName)
panic("not implement yet!")
/*
fmt.Println("write " + localFilePath + " as " + bucketName + "/" + objectName)

// TODO 需要参考RepWrite函数的代码逻辑,做好错误处理
// TODO 需要参考RepWrite函数的代码逻辑,做好错误处理

//获取文件大小
fileInfo, err := os.Stat(localFilePath)
if err != nil {
return fmt.Errorf("get file %s state failed, err: %w", localFilePath, err)
}
fileSizeInBytes := fileInfo.Size()
//调用纠删码库,获取编码参数及生成矩阵
ecPolicies := *utils.GetEcPolicy()
ecPolicy := ecPolicies[ecName]
ipss := utils.GetAgentIps()
fmt.Println(ipss)
print("@!@!@!@!@!@!")
//var policy utils.EcConfig
//policy = ecPolicy[0]
ecK := ecPolicy.GetK()
ecN := ecPolicy.GetN()
//const ecK int = ecPolicy.GetK()
//const ecN int = ecPolicy.GetN()
var coefs = [][]int64{{1, 1, 1}, {1, 2, 3}} //2应替换为ecK,3应替换为ecN
//计算每个块的packet数
numPacket := (fileSizeInBytes + int64(ecK)*config.Cfg().GRCPPacketSize - 1) / (int64(ecK) * config.Cfg().GRCPPacketSize)
fmt.Println(numPacket)
userId := 0
coorClient, err := racli.NewCoordinatorClient()
if err != nil {
return fmt.Errorf("create coordinator client failed, err: %w", err)
}
defer coorClient.Close()
//获取文件大小
fileInfo, err := os.Stat(localFilePath)
if err != nil {
return fmt.Errorf("get file %s state failed, err: %w", localFilePath, err)
}
fileSizeInBytes := fileInfo.Size()
//调用纠删码库,获取编码参数及生成矩阵
ecPolicies := *utils.GetEcPolicy()
ecPolicy := ecPolicies[ecName]
ipss := utils.GetAgentIps()
fmt.Println(ipss)
print("@!@!@!@!@!@!")
//var policy utils.EcConfig
//policy = ecPolicy[0]
ecK := ecPolicy.GetK()
ecN := ecPolicy.GetN()
//const ecK int = ecPolicy.GetK()
//const ecN int = ecPolicy.GetN()
var coefs = [][]int64{{1, 1, 1}, {1, 2, 3}} //2应替换为ecK,3应替换为ecN
//计算每个块的packet数
numPacket := (fileSizeInBytes + int64(ecK)*config.Cfg().GRCPPacketSize - 1) / (int64(ecK) * config.Cfg().GRCPPacketSize)
fmt.Println(numPacket)
userId := 0
coorClient, err := racli.NewCoordinatorClient()
if err != nil {
return fmt.Errorf("create coordinator client failed, err: %w", err)
}
defer coorClient.Close()

//发送写请求,请求Coor分配写入节点Ip
ecWriteResp, err := coorClient.ECWrite(bucketName, objectName, fileSizeInBytes, ecName, userId)
if err != nil {
return fmt.Errorf("request to coordinator failed, err: %w", err)
}
if ecWriteResp.ErrorCode != errorcode.OK {
return fmt.Errorf("coordinator ECWrite failed, err: %w", err)
}
//发送写请求,请求Coor分配写入节点Ip
ecWriteResp, err := coorClient.ECWrite(bucketName, objectName, fileSizeInBytes, ecName, userId)
if err != nil {
return fmt.Errorf("request to coordinator failed, err: %w", err)
}
if ecWriteResp.ErrorCode != errorcode.OK {
return fmt.Errorf("coordinator ECWrite failed, err: %w", err)
}

//创建channel
loadBufs := make([]chan []byte, ecN)
encodeBufs := make([]chan []byte, ecN)
for i := 0; i < ecN; i++ {
loadBufs[i] = make(chan []byte)
}
for i := 0; i < ecN; i++ {
encodeBufs[i] = make(chan []byte)
}
hashs := make([]string, ecN)
//正式开始写入
go load(localFilePath, loadBufs[:ecN], ecK, numPacket*int64(ecK), fileSizeInBytes) //从本地文件系统加载数据
go encode(loadBufs[:ecN], encodeBufs[:ecN], ecK, coefs, numPacket)
//创建channel
loadBufs := make([]chan []byte, ecN)
encodeBufs := make([]chan []byte, ecN)
for i := 0; i < ecN; i++ {
loadBufs[i] = make(chan []byte)
}
for i := 0; i < ecN; i++ {
encodeBufs[i] = make(chan []byte)
}
hashs := make([]string, ecN)
//正式开始写入
go load(localFilePath, loadBufs[:ecN], ecK, numPacket*int64(ecK), fileSizeInBytes) //从本地文件系统加载数据
go encode(loadBufs[:ecN], encodeBufs[:ecN], ecK, coefs, numPacket)

var wg sync.WaitGroup
wg.Add(ecN)
var wg sync.WaitGroup
wg.Add(ecN)

for i := 0; i < ecN; i++ {
go send(ecWriteResp.NodeIPs[i], encodeBufs[i], numPacket, &wg, hashs, i)
}
wg.Wait()
for i := 0; i < ecN; i++ {
go send(ecWriteResp.NodeIPs[i], encodeBufs[i], numPacket, &wg, hashs, i)
}
wg.Wait()

//第二轮通讯:插入元数据hashs
writeECHashResp, err := coorClient.WriteECHash(bucketName, objectName, hashs, ecWriteResp.NodeIPs, userId)
if err != nil {
return fmt.Errorf("request to coordinator failed, err: %w", err)
}
if writeECHashResp.ErrorCode != errorcode.OK {
return fmt.Errorf("coordinator WriteECHash failed, err: %w", err)
}
//第二轮通讯:插入元数据hashs
writeECHashResp, err := coorClient.WriteECHash(bucketName, objectName, hashs, ecWriteResp.NodeIPs, userId)
if err != nil {
return fmt.Errorf("request to coordinator failed, err: %w", err)
}
if writeECHashResp.ErrorCode != errorcode.OK {
return fmt.Errorf("coordinator WriteECHash failed, err: %w", err)
}

return nil
return nil
*/
}

func load(localFilePath string, loadBufs []chan []byte, ecK int, totalNumPacket int64, fileSizeInBytes int64) {
@@ -205,86 +201,92 @@ func decode(inBufs []chan []byte, outBufs []chan []byte, blockSeq []int, ecK int
}

func send(ip string, inBuf chan []byte, numPacket int64, wg *sync.WaitGroup, hashs []string, idx int) error {
panic("not implement yet!")

// TO DO ss: 判断本地有没有ipfs daemon、能否与目标agent的ipfs daemon连通、本地ipfs目录空间是否充足
// 如果本地有ipfs daemon、能与目标agent的ipfs daemon连通、本地ipfs目录空间充足,将所有内容写入本地ipfs目录,得到对象的cid,发送cid给目标agent让其pin相应的对象
// 否则,像目前一样,使用grpc向指定节点获取
/*
// TO DO ss: 判断本地有没有ipfs daemon、能否与目标agent的ipfs daemon连通、本地ipfs目录空间是否充足
// 如果本地有ipfs daemon、能与目标agent的ipfs daemon连通、本地ipfs目录空间充足,将所有内容写入本地ipfs目录,得到对象的cid,发送cid给目标agent让其pin相应的对象
// 否则,像目前一样,使用grpc向指定节点获取

// TODO 如果发生错误,需要考虑将错误传递出去
defer wg.Done()
// TODO 如果发生错误,需要考虑将错误传递出去
defer wg.Done()

grpcAddr := fmt.Sprintf("%s:%d", ip, config.Cfg().GRPCPort)
conn, err := grpc.Dial(grpcAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return fmt.Errorf("connect to grpc server at %s failed, err: %w", grpcAddr, err)
}
defer conn.Close()
grpcAddr := fmt.Sprintf("%s:%d", ip, config.Cfg().GRPCPort)
conn, err := grpc.Dial(grpcAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return fmt.Errorf("connect to grpc server at %s failed, err: %w", grpcAddr, err)
}
defer conn.Close()

client := agentcaller.NewFileTransportClient(conn)
stream, err := client.SendFile(context.Background())
if err != nil {
return fmt.Errorf("request to send file failed, err: %w", err)
}
client := agentcaller.NewFileTransportClient(conn)
stream, err := client.SendFile(context.Background())
if err != nil {
return fmt.Errorf("request to send file failed, err: %w", err)
}

for i := 0; int64(i) < numPacket; i++ {
buf := <-inBuf
for i := 0; int64(i) < numPacket; i++ {
buf := <-inBuf

err := stream.Send(&agentcaller.FileDataPacket{
Code: agentcaller.FileDataPacket_OK,
Data: buf,
err := stream.Send(&agentcaller.FileDataPacket{
Code: agentcaller.FileDataPacket_OK,
Data: buf,
})

if err != nil {
stream.CloseSend()
return fmt.Errorf("send file data failed, err: %w", err)
}
}

err = stream.Send(&agentcaller.FileDataPacket{
Code: agentcaller.FileDataPacket_EOF,
})

if err != nil {
stream.CloseSend()
return fmt.Errorf("send file data failed, err: %w", err)
}
}

err = stream.Send(&agentcaller.FileDataPacket{
Code: agentcaller.FileDataPacket_EOF,
})

if err != nil {
stream.CloseSend()
return fmt.Errorf("send file data failed, err: %w", err)
}

resp, err := stream.CloseAndRecv()
if err != nil {
return fmt.Errorf("receive response failed, err: %w", err)
}
resp, err := stream.CloseAndRecv()
if err != nil {
return fmt.Errorf("receive response failed, err: %w", err)
}

hashs[idx] = resp.FileHash
return nil
hashs[idx] = resp.FileHash
return nil
*/
}

func get(blockHash string, nodeIP string, getBuf chan []byte, numPacket int64) error {
grpcAddr := fmt.Sprintf("%s:%d", nodeIP, config.Cfg().GRPCPort)
conn, err := grpc.Dial(grpcAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return fmt.Errorf("connect to grpc server at %s failed, err: %w", grpcAddr, err)
}
defer conn.Close()
// TO DO: 判断本地有没有ipfs daemon、能否获取相应对象的cid
// 如果本地有ipfs daemon且能获取相应编码块的cid,则获取编码块cid对应的ipfsblock的cid,通过ipfs网络获取这些ipfsblock
// 否则,像目前一样,使用grpc向指定节点获取
client := agentcaller.NewFileTransportClient(conn)
//rpc get
// TODO 要考虑读取失败后,如何中断后续解码过程
stream, err := client.GetFile(context.Background(), &agentcaller.GetReq{
FileHash: blockHash,
})
panic("not implement yet!")
/*
grpcAddr := fmt.Sprintf("%s:%d", nodeIP, config.Cfg().GRPCPort)
conn, err := grpc.Dial(grpcAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return fmt.Errorf("connect to grpc server at %s failed, err: %w", grpcAddr, err)
}
defer conn.Close()
// TO DO: 判断本地有没有ipfs daemon、能否获取相应对象的cid
// 如果本地有ipfs daemon且能获取相应编码块的cid,则获取编码块cid对应的ipfsblock的cid,通过ipfs网络获取这些ipfsblock
// 否则,像目前一样,使用grpc向指定节点获取
client := agentcaller.NewFileTransportClient(conn)
//rpc get
// TODO 要考虑读取失败后,如何中断后续解码过程
stream, err := client.GetFile(context.Background(), &agentcaller.GetReq{
FileHash: blockHash,
})

for i := 0; int64(i) < numPacket; i++ {
fmt.Println(i)
// TODO 同上
res, _ := stream.Recv()
fmt.Println(res.BlockOrReplicaData)
getBuf <- res.BlockOrReplicaData
}
for i := 0; int64(i) < numPacket; i++ {
fmt.Println(i)
// TODO 同上
res, _ := stream.Recv()
fmt.Println(res.BlockOrReplicaData)
getBuf <- res.BlockOrReplicaData
}

close(getBuf)
return nil
close(getBuf)
return nil
*/
}

func persist(inBuf []chan []byte, numPacket int64, localFilePath string, wg *sync.WaitGroup) {
@@ -346,4 +348,3 @@ func ecRead(fileSizeInBytes int64, nodeIPs []string, blockHashs []string, blockI
go persist(decodeBufs[:], numPacket, localFilePath, &wg)
wg.Wait()
}
*/

Loading…
Cancel
Save