From f041d47d2017ff84a46c002480dafc3f299b58d6 Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Thu, 20 Apr 2023 10:05:06 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B3=A8=E9=87=8AEC=E7=9B=B8=E5=85=B3=E7=9A=84?= =?UTF-8?q?=E5=87=BD=E6=95=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- client_command.go | 2 +- client_command_ec.go | 281 ++++++++++++++++++++++--------------------- 2 files changed, 142 insertions(+), 141 deletions(-) diff --git a/client_command.go b/client_command.go index 7278b01..d19f952 100644 --- a/client_command.go +++ b/client_command.go @@ -104,7 +104,7 @@ func Read(localFilePath string, bucketName string, objectName string) error { case consts.REDUNDANCY_EC: // TODO EC部分的代码要考虑重构 - //ecRead(readResp.FileSizeInBytes, readResp.NodeIPs, readResp.Hashes, readResp.BlockIDs, readResp.ECName, localFilePath) + ecRead(readResp.FileSizeInBytes, readResp.NodeIPs, readResp.Hashes, readResp.BlockIDs, readResp.ECName, localFilePath) } return nil diff --git a/client_command_ec.go b/client_command_ec.go index 89c96a9..896d4ef 100644 --- a/client_command_ec.go +++ b/client_command_ec.go @@ -1,8 +1,6 @@ package main -/* import ( - "context" "fmt" "io" "os" @@ -11,94 +9,92 @@ import ( "gitlink.org.cn/cloudream/client/config" "gitlink.org.cn/cloudream/ec" - agentcaller "gitlink.org.cn/cloudream/proto" - racli "gitlink.org.cn/cloudream/rabbitmq/client" "gitlink.org.cn/cloudream/utils" - "gitlink.org.cn/cloudream/utils/consts/errorcode" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" ) func EcWrite(localFilePath string, bucketName string, objectName string, ecName string) error { - fmt.Println("write " + localFilePath + " as " + bucketName + "/" + objectName) + panic("not implement yet!") + /* + fmt.Println("write " + localFilePath + " as " + bucketName + "/" + objectName) - // TODO 需要参考RepWrite函数的代码逻辑,做好错误处理 + // TODO 需要参考RepWrite函数的代码逻辑,做好错误处理 - //获取文件大小 - fileInfo, err := os.Stat(localFilePath) - if err != nil { - return fmt.Errorf("get file %s state failed, err: %w", localFilePath, err) - } - fileSizeInBytes := fileInfo.Size() - - //调用纠删码库,获取编码参数及生成矩阵 - ecPolicies := *utils.GetEcPolicy() - ecPolicy := ecPolicies[ecName] - - ipss := utils.GetAgentIps() - fmt.Println(ipss) - print("@!@!@!@!@!@!") - - //var policy utils.EcConfig - //policy = ecPolicy[0] - ecK := ecPolicy.GetK() - ecN := ecPolicy.GetN() - //const ecK int = ecPolicy.GetK() - //const ecN int = ecPolicy.GetN() - var coefs = [][]int64{{1, 1, 1}, {1, 2, 3}} //2应替换为ecK,3应替换为ecN - - //计算每个块的packet数 - numPacket := (fileSizeInBytes + int64(ecK)*config.Cfg().GRCPPacketSize - 1) / (int64(ecK) * config.Cfg().GRCPPacketSize) - fmt.Println(numPacket) - - userId := 0 - coorClient, err := racli.NewCoordinatorClient() - if err != nil { - return fmt.Errorf("create coordinator client failed, err: %w", err) - } - defer coorClient.Close() + //获取文件大小 + fileInfo, err := os.Stat(localFilePath) + if err != nil { + return fmt.Errorf("get file %s state failed, err: %w", localFilePath, err) + } + fileSizeInBytes := fileInfo.Size() + + //调用纠删码库,获取编码参数及生成矩阵 + ecPolicies := *utils.GetEcPolicy() + ecPolicy := ecPolicies[ecName] + + ipss := utils.GetAgentIps() + fmt.Println(ipss) + print("@!@!@!@!@!@!") + + //var policy utils.EcConfig + //policy = ecPolicy[0] + ecK := ecPolicy.GetK() + ecN := ecPolicy.GetN() + //const ecK int = ecPolicy.GetK() + //const ecN int = ecPolicy.GetN() + var coefs = [][]int64{{1, 1, 1}, {1, 2, 3}} //2应替换为ecK,3应替换为ecN + + //计算每个块的packet数 + numPacket := (fileSizeInBytes + int64(ecK)*config.Cfg().GRCPPacketSize - 1) / (int64(ecK) * config.Cfg().GRCPPacketSize) + fmt.Println(numPacket) + + userId := 0 + coorClient, err := racli.NewCoordinatorClient() + if err != nil { + return fmt.Errorf("create coordinator client failed, err: %w", err) + } + defer coorClient.Close() - //发送写请求,请求Coor分配写入节点Ip - ecWriteResp, err := coorClient.ECWrite(bucketName, objectName, fileSizeInBytes, ecName, userId) - if err != nil { - return fmt.Errorf("request to coordinator failed, err: %w", err) - } - if ecWriteResp.ErrorCode != errorcode.OK { - return fmt.Errorf("coordinator ECWrite failed, err: %w", err) - } + //发送写请求,请求Coor分配写入节点Ip + ecWriteResp, err := coorClient.ECWrite(bucketName, objectName, fileSizeInBytes, ecName, userId) + if err != nil { + return fmt.Errorf("request to coordinator failed, err: %w", err) + } + if ecWriteResp.ErrorCode != errorcode.OK { + return fmt.Errorf("coordinator ECWrite failed, err: %w", err) + } - //创建channel - loadBufs := make([]chan []byte, ecN) - encodeBufs := make([]chan []byte, ecN) - for i := 0; i < ecN; i++ { - loadBufs[i] = make(chan []byte) - } - for i := 0; i < ecN; i++ { - encodeBufs[i] = make(chan []byte) - } - hashs := make([]string, ecN) - //正式开始写入 - go load(localFilePath, loadBufs[:ecN], ecK, numPacket*int64(ecK), fileSizeInBytes) //从本地文件系统加载数据 - go encode(loadBufs[:ecN], encodeBufs[:ecN], ecK, coefs, numPacket) + //创建channel + loadBufs := make([]chan []byte, ecN) + encodeBufs := make([]chan []byte, ecN) + for i := 0; i < ecN; i++ { + loadBufs[i] = make(chan []byte) + } + for i := 0; i < ecN; i++ { + encodeBufs[i] = make(chan []byte) + } + hashs := make([]string, ecN) + //正式开始写入 + go load(localFilePath, loadBufs[:ecN], ecK, numPacket*int64(ecK), fileSizeInBytes) //从本地文件系统加载数据 + go encode(loadBufs[:ecN], encodeBufs[:ecN], ecK, coefs, numPacket) - var wg sync.WaitGroup - wg.Add(ecN) + var wg sync.WaitGroup + wg.Add(ecN) - for i := 0; i < ecN; i++ { - go send(ecWriteResp.NodeIPs[i], encodeBufs[i], numPacket, &wg, hashs, i) - } - wg.Wait() + for i := 0; i < ecN; i++ { + go send(ecWriteResp.NodeIPs[i], encodeBufs[i], numPacket, &wg, hashs, i) + } + wg.Wait() - //第二轮通讯:插入元数据hashs - writeECHashResp, err := coorClient.WriteECHash(bucketName, objectName, hashs, ecWriteResp.NodeIPs, userId) - if err != nil { - return fmt.Errorf("request to coordinator failed, err: %w", err) - } - if writeECHashResp.ErrorCode != errorcode.OK { - return fmt.Errorf("coordinator WriteECHash failed, err: %w", err) - } + //第二轮通讯:插入元数据hashs + writeECHashResp, err := coorClient.WriteECHash(bucketName, objectName, hashs, ecWriteResp.NodeIPs, userId) + if err != nil { + return fmt.Errorf("request to coordinator failed, err: %w", err) + } + if writeECHashResp.ErrorCode != errorcode.OK { + return fmt.Errorf("coordinator WriteECHash failed, err: %w", err) + } - return nil + return nil + */ } func load(localFilePath string, loadBufs []chan []byte, ecK int, totalNumPacket int64, fileSizeInBytes int64) { @@ -205,86 +201,92 @@ func decode(inBufs []chan []byte, outBufs []chan []byte, blockSeq []int, ecK int } func send(ip string, inBuf chan []byte, numPacket int64, wg *sync.WaitGroup, hashs []string, idx int) error { + panic("not implement yet!") - // TO DO ss: 判断本地有没有ipfs daemon、能否与目标agent的ipfs daemon连通、本地ipfs目录空间是否充足 - // 如果本地有ipfs daemon、能与目标agent的ipfs daemon连通、本地ipfs目录空间充足,将所有内容写入本地ipfs目录,得到对象的cid,发送cid给目标agent让其pin相应的对象 - // 否则,像目前一样,使用grpc向指定节点获取 + /* + // TO DO ss: 判断本地有没有ipfs daemon、能否与目标agent的ipfs daemon连通、本地ipfs目录空间是否充足 + // 如果本地有ipfs daemon、能与目标agent的ipfs daemon连通、本地ipfs目录空间充足,将所有内容写入本地ipfs目录,得到对象的cid,发送cid给目标agent让其pin相应的对象 + // 否则,像目前一样,使用grpc向指定节点获取 - // TODO 如果发生错误,需要考虑将错误传递出去 - defer wg.Done() + // TODO 如果发生错误,需要考虑将错误传递出去 + defer wg.Done() - grpcAddr := fmt.Sprintf("%s:%d", ip, config.Cfg().GRPCPort) - conn, err := grpc.Dial(grpcAddr, grpc.WithTransportCredentials(insecure.NewCredentials())) - if err != nil { - return fmt.Errorf("connect to grpc server at %s failed, err: %w", grpcAddr, err) - } - defer conn.Close() + grpcAddr := fmt.Sprintf("%s:%d", ip, config.Cfg().GRPCPort) + conn, err := grpc.Dial(grpcAddr, grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + return fmt.Errorf("connect to grpc server at %s failed, err: %w", grpcAddr, err) + } + defer conn.Close() - client := agentcaller.NewFileTransportClient(conn) - stream, err := client.SendFile(context.Background()) - if err != nil { - return fmt.Errorf("request to send file failed, err: %w", err) - } + client := agentcaller.NewFileTransportClient(conn) + stream, err := client.SendFile(context.Background()) + if err != nil { + return fmt.Errorf("request to send file failed, err: %w", err) + } - for i := 0; int64(i) < numPacket; i++ { - buf := <-inBuf + for i := 0; int64(i) < numPacket; i++ { + buf := <-inBuf - err := stream.Send(&agentcaller.FileDataPacket{ - Code: agentcaller.FileDataPacket_OK, - Data: buf, + err := stream.Send(&agentcaller.FileDataPacket{ + Code: agentcaller.FileDataPacket_OK, + Data: buf, + }) + + if err != nil { + stream.CloseSend() + return fmt.Errorf("send file data failed, err: %w", err) + } + } + + err = stream.Send(&agentcaller.FileDataPacket{ + Code: agentcaller.FileDataPacket_EOF, }) if err != nil { stream.CloseSend() return fmt.Errorf("send file data failed, err: %w", err) } - } - - err = stream.Send(&agentcaller.FileDataPacket{ - Code: agentcaller.FileDataPacket_EOF, - }) - if err != nil { - stream.CloseSend() - return fmt.Errorf("send file data failed, err: %w", err) - } - - resp, err := stream.CloseAndRecv() - if err != nil { - return fmt.Errorf("receive response failed, err: %w", err) - } + resp, err := stream.CloseAndRecv() + if err != nil { + return fmt.Errorf("receive response failed, err: %w", err) + } - hashs[idx] = resp.FileHash - return nil + hashs[idx] = resp.FileHash + return nil + */ } func get(blockHash string, nodeIP string, getBuf chan []byte, numPacket int64) error { - grpcAddr := fmt.Sprintf("%s:%d", nodeIP, config.Cfg().GRPCPort) - conn, err := grpc.Dial(grpcAddr, grpc.WithTransportCredentials(insecure.NewCredentials())) - if err != nil { - return fmt.Errorf("connect to grpc server at %s failed, err: %w", grpcAddr, err) - } - defer conn.Close() - // TO DO: 判断本地有没有ipfs daemon、能否获取相应对象的cid - // 如果本地有ipfs daemon且能获取相应编码块的cid,则获取编码块cid对应的ipfsblock的cid,通过ipfs网络获取这些ipfsblock - // 否则,像目前一样,使用grpc向指定节点获取 - client := agentcaller.NewFileTransportClient(conn) - //rpc get - // TODO 要考虑读取失败后,如何中断后续解码过程 - stream, err := client.GetFile(context.Background(), &agentcaller.GetReq{ - FileHash: blockHash, - }) + panic("not implement yet!") + /* + grpcAddr := fmt.Sprintf("%s:%d", nodeIP, config.Cfg().GRPCPort) + conn, err := grpc.Dial(grpcAddr, grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + return fmt.Errorf("connect to grpc server at %s failed, err: %w", grpcAddr, err) + } + defer conn.Close() + // TO DO: 判断本地有没有ipfs daemon、能否获取相应对象的cid + // 如果本地有ipfs daemon且能获取相应编码块的cid,则获取编码块cid对应的ipfsblock的cid,通过ipfs网络获取这些ipfsblock + // 否则,像目前一样,使用grpc向指定节点获取 + client := agentcaller.NewFileTransportClient(conn) + //rpc get + // TODO 要考虑读取失败后,如何中断后续解码过程 + stream, err := client.GetFile(context.Background(), &agentcaller.GetReq{ + FileHash: blockHash, + }) - for i := 0; int64(i) < numPacket; i++ { - fmt.Println(i) - // TODO 同上 - res, _ := stream.Recv() - fmt.Println(res.BlockOrReplicaData) - getBuf <- res.BlockOrReplicaData - } + for i := 0; int64(i) < numPacket; i++ { + fmt.Println(i) + // TODO 同上 + res, _ := stream.Recv() + fmt.Println(res.BlockOrReplicaData) + getBuf <- res.BlockOrReplicaData + } - close(getBuf) - return nil + close(getBuf) + return nil + */ } func persist(inBuf []chan []byte, numPacket int64, localFilePath string, wg *sync.WaitGroup) { @@ -346,4 +348,3 @@ func ecRead(fileSizeInBytes int64, nodeIPs []string, blockHashs []string, blockI go persist(decodeBufs[:], numPacket, localFilePath, &wg) wg.Wait() } -*/