diff --git a/assets/test b/assets/test deleted file mode 100644 index 58d0564..0000000 Binary files a/assets/test and /dev/null differ diff --git a/assets/test1 b/assets/test1 deleted file mode 100644 index c2f5519..0000000 Binary files a/assets/test1 and /dev/null differ diff --git a/assets/test2 b/assets/test2 deleted file mode 100644 index 7d3913e..0000000 --- a/assets/test2 +++ /dev/null @@ -1 +0,0 @@ -tbdfhjlnprtbdfhjlnpr \ No newline at end of file diff --git a/assets/test3 b/assets/test3 deleted file mode 100644 index d61f7d2..0000000 --- a/assets/test3 +++ /dev/null @@ -1 +0,0 @@ -122345678901234567890 \ No newline at end of file diff --git a/assets/testin b/assets/testin deleted file mode 100644 index e69de29..0000000 diff --git a/assets/testin2 b/assets/testin2 deleted file mode 100644 index e69de29..0000000 diff --git a/client b/client deleted file mode 100755 index af43397..0000000 Binary files a/client and /dev/null differ diff --git a/clientCommand.go b/clientCommand.go index 795caab..8382dc7 100644 --- a/clientCommand.go +++ b/clientCommand.go @@ -213,7 +213,11 @@ func repRead(fileSizeInBytes int64, ip string, repHash string, localFilePath str if err != nil { return } - + /* + TO DO: 判断本地有没有ipfs daemon、能否获取相应对象的cid + 如果本地有ipfs daemon且能获取相应对象的cid,则获取对象cid对应的ipfsblock的cid,通过ipfs网络获取这些ipfsblock + 否则,像目前一样,使用grpc向指定节点获取 + */ stream, _ := client.GetBlockOrReplica(context.Background(), &agentcaller.GetReq{ BlockOrReplicaHash: repHash, }) @@ -235,7 +239,7 @@ func RepWrite(localFilePath string, bucketName string, objectName string, numRep fileSizeInBytes := fileInfo.Size() fmt.Println(fileSizeInBytes) - //计算每个块的packet数 + //写入对象的packet数 numWholePacket := fileSizeInBytes / packetSizeInBytes lastPacketInBytes := fileSizeInBytes % packetSizeInBytes numPacket := numWholePacket @@ -243,7 +247,8 @@ func RepWrite(localFilePath string, bucketName string, objectName string, numRep numPacket++ } - //发送写请求,分配写入节点Ip + //发送写请求,请求Coor分配写入节点Ip + //TO DO: 加入两个字段,本机IP和当前进程号 command1 := rabbitmq.RepWriteCommand{ BucketName: bucketName, ObjectName: objectName, @@ -257,9 +262,15 @@ func RepWrite(localFilePath string, bucketName string, objectName string, numRep rabbit1 := rabbitmq.NewRabbitMQSimple("coorQueue") rabbit1.PublishSimple(b1) - //接收消息,赋值给ips var res1 rabbitmq.WriteRes var ips []string + /* + TODO xh: 判断writeRes里的状态码 + 如果有错,就报错返回,结束程序 + 如果没错,就把得到的IP值赋给ips + */ + + //TODO xh: queueName调整:coorClientQueue+"_"+"本机Ip"+"_"+"进程号" queueName := "coorClientQueue" + strconv.Itoa(userId) rabbit2 := rabbitmq.NewRabbitMQSimple(queueName) msgs := rabbit2.ConsumeSimple(time.Millisecond, true) @@ -269,8 +280,8 @@ func RepWrite(localFilePath string, bucketName string, objectName string, numRep for d := range msgs { _ = json.Unmarshal(d.Body, &res1) ips = res1.Ips - wg.Done() } + wg.Done() }() wg.Wait() @@ -285,11 +296,13 @@ func RepWrite(localFilePath string, bucketName string, objectName string, numRep go loadDistribute(localFilePath, loadDistributeBufs[:], numWholePacket, lastPacketInBytes) //从本地文件系统加载数据 wg.Add(numRep) for i := 0; i < numRep; i++ { + //TODO xh: send的第一个参数不需要了 go send("rep.json"+strconv.Itoa(i), ips[i], loadDistributeBufs[i], numPacket, &wg, hashs, i) //"block1.json"这样参数不需要 } wg.Wait() //第二轮通讯:插入元数据hashs + //TODO xh: 加入pid字段 command2 := rabbitmq.WriteHashCommand{ BucketName: bucketName, ObjectName: objectName, @@ -311,6 +324,7 @@ func RepWrite(localFilePath string, bucketName string, objectName string, numRep if res2.MetaCode == 0 { wg.Done() } + //TODO xh: MetaCode不为零,代表插入出错,需输出错误 } }() wg.Wait() @@ -470,11 +484,11 @@ func EcWrite(localFilePath string, bucketName string, objectName string, ecName } func repMove(ip string, hash string) { - //通过消息队列发送调度命令 + //TO DO: 通过消息队列发送调度命令 } func ecMove(ip string, hashs []string, ids []int, ecName string) { - //通过消息队列发送调度命令 + //TO DO: 通过消息队列发送调度命令 } func loadDistribute(localFilePath string, loadDistributeBufs []chan []byte, numWholePacket int64, lastPacketInBytes int64) { @@ -609,6 +623,11 @@ func decode(inBufs []chan []byte, outBufs []chan []byte, blockSeq []int, ecK int func send(blockName string, ip string, inBuf chan []byte, numPacket int64, wg *sync.WaitGroup, hashs []string, idx int) { fmt.Println("send " + blockName) + /* + TO DO ss: 判断本地有没有ipfs daemon、能否与目标agent的ipfs daemon连通、本地ipfs目录空间是否充足 + 如果本地有ipfs daemon、能与目标agent的ipfs daemon连通、本地ipfs目录空间充足,将所有内容写入本地ipfs目录,得到对象的cid,发送cid给目标agent让其pin相应的对象 + 否则,像目前一样,使用grpc向指定节点获取 + */ //rpc相关 conn, err := grpc.Dial(ip+port, grpc.WithInsecure()) if err != nil { @@ -647,6 +666,11 @@ func get(blockHash string, ip string, getBuf chan []byte, numPacket int64) { if err != nil { panic(err) } + /* + TO DO: 判断本地有没有ipfs daemon、能否获取相应对象的cid + 如果本地有ipfs daemon且能获取相应编码块的cid,则获取编码块cid对应的ipfsblock的cid,通过ipfs网络获取这些ipfsblock + 否则,像目前一样,使用grpc向指定节点获取 + */ client := agentcaller.NewTranBlockOrReplicaClient(conn) //rpc get stream, _ := client.GetBlockOrReplica(context.Background(), &agentcaller.GetReq{ diff --git a/go.mod b/go.mod index 1448a8b..aa58fe3 100644 --- a/go.mod +++ b/go.mod @@ -2,34 +2,12 @@ module client go 1.18 -require proto v0.0.0 - -replace proto => ../proto - -replace rabbitmq => ../rabbitmq - -replace ec => ../ec - -replace utils => ../utils - -require ( - utils v0.0.0-00010101000000-000000000000 - ec v0.0.0-00010101000000-000000000000 - google.golang.org/grpc v1.53.0 - rabbitmq v0.0.0 -) +require google.golang.org/grpc v1.53.0 require ( - github.com/beevik/etree v1.1.0 // indirect - github.com/go-ping/ping v1.1.0 // indirect github.com/golang/protobuf v1.5.2 // indirect github.com/google/go-cmp v0.5.9 // indirect - github.com/google/uuid v1.3.0 // indirect - github.com/klauspost/cpuid/v2 v2.1.1 // indirect - github.com/klauspost/reedsolomon v1.11.7 // indirect - github.com/streadway/amqp v1.0.0 // indirect golang.org/x/net v0.5.0 // indirect - golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect golang.org/x/sys v0.4.0 // indirect golang.org/x/text v0.6.0 // indirect google.golang.org/genproto v0.0.0-20230110181048-76db0878b65f // indirect diff --git a/go.sum b/go.sum index 212a1b2..c5ea9bc 100644 --- a/go.sum +++ b/go.sum @@ -1,37 +1,15 @@ -github.com/beevik/etree v1.1.0 h1:T0xke/WvNtMoCqgzPhkX2r4rjY3GDZFi+FjpRZY2Jbs= -github.com/beevik/etree v1.1.0/go.mod h1:r8Aw8JqVegEf0w2fDnATrX9VpkMcyFeM0FhwO62wh+A= -github.com/go-ping/ping v1.1.0 h1:3MCGhVX4fyEUuhsfwPrsEdQw6xspHkv5zHsiSoDFZYw= -github.com/go-ping/ping v1.1.0/go.mod h1:xIFjORFzTxqIV/tDVGO4eDy/bLuSyawEeojSm3GfRGk= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw= github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= -github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= -github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/klauspost/cpuid/v2 v2.1.1 h1:t0wUqjowdm8ezddV5k0tLWVklVuvLJpoHeb4WBdydm0= -github.com/klauspost/cpuid/v2 v2.1.1/go.mod h1:RVVoqg1df56z8g3pUjL/3lE5UfnlrJX8tyFgg4nqhuY= -github.com/klauspost/reedsolomon v1.11.7 h1:9uaHU0slncktTEEg4+7Vl7q7XUNMBUOK4R9gnKhMjAU= -github.com/klauspost/reedsolomon v1.11.7/go.mod h1:4bXRN+cVzMdml6ti7qLouuYi32KHJ5MGv0Qd8a47h6A= -github.com/streadway/amqp v1.0.0 h1:kuuDrUJFZL1QYL9hUNuCxNObNzB0bV/ZG5jV3RWAQgo= -github.com/streadway/amqp v1.0.0/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= -golang.org/x/net v0.0.0-20210316092652-d523dce5a7f4/go.mod h1:RBQZq4jEuRlivfhVLdyRGr576XBO4/greRjx4P4O3yc= golang.org/x/net v0.5.0 h1:GyT4nK/YDHSqa1c4753ouYCDajOYKTja9Xb/OHtgvSw= golang.org/x/net v0.5.0/go.mod h1:DivGGAXEgPSlEBzxGzZI+ZLohi+xUj054jfeKui00ws= -golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ= -golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210315160823-c6e025ad8005/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20220704084225-05e143d24a9e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.4.0 h1:Zr2JFtRQNX3BCZ8YtxRE9hNJYC8J6I1MVbMg6owUp18= golang.org/x/sys v0.4.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= -golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.6.0 h1:3XmdazWV+ubf7QgHSTWeykHOci5oeekaGJBLkrkaw4k= golang.org/x/text v0.6.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= -golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/genproto v0.0.0-20230110181048-76db0878b65f h1:BWUVssLB0HVOSY78gIdvk1dTVYtT1y8SBWtPYuTJ/6w= google.golang.org/genproto v0.0.0-20230110181048-76db0878b65f/go.mod h1:RGgjbofJ8xD9Sq1VVhDM1Vok1vRONV+rg+CjzG4SZKM= diff --git a/main.go b/main.go index b5f6e81..679653e 100644 --- a/main.go +++ b/main.go @@ -20,26 +20,36 @@ import ( ) func main() { + //TODO xh:加载配置文件,获取packetSizeInBytes、grpc port、ipfs port、最大副本数、本机公网Ip等信息,参照src/utils/config.go + args := os.Args arg_num := len(os.Args) for i := 0; i < arg_num; i++ { fmt.Println(args[i]) } - + //TODO: 改为交互式client,输入用户名及秘钥后进入交互界面 switch args[1] { case "ecWrite": EcWrite(args[2], args[3], args[4], args[5]) + //TODO: 写入对象时,Coor判断对象是否已存在,如果存在,则直接返回 case "write": numRep, _ := strconv.Atoi(args[5]) - if numRep <= 0 || numRep > 10 { - print("write::InputError!") + if numRep <= 0 || numRep > 10 { //TODO xh:10改为从配置文件中读出的最大副本数 + print("write::InputError!") //TODO xh:优化提示语 } else { RepWrite(args[2], args[3], args[4], numRep) } - case "read": Read(args[2], args[3], args[4]) case "move": Move(args[2], args[3], args[4]) //bucket object destination } + /* + TO DO: + 1. ls命令,显示用户指定桶下的所有对象,及相关的元数据 + 2. rm命令,用户指定bucket和object名,执行删除操作 + 3. update命令,用户发起对象更新命令,查询元数据,判断对象的冗余方式,删除旧对象(unpin所有的副本或编码块),写入新对象 + 4. ipfsStat命令,查看本地有无ipfsdaemon,ipfs目录的使用率 + 5. ipfsFlush命令,unpin本地ipfs目录中的所有cid(block) + */ } diff --git a/test b/test deleted file mode 100644 index 8b56eea..0000000 --- a/test +++ /dev/null @@ -1 +0,0 @@ -zkxx4567890123456789qazwsxedcr diff --git a/test1 b/test1 deleted file mode 100644 index 601f891..0000000 --- a/test1 +++ /dev/null @@ -1 +0,0 @@ -In the flood of darkness, hope is the light. It brings comfort, faith, and confidence. It gives us guidance when we are lost, and gives support when we are afraid. And the moment we give up hope, we give up our lives. The world we live in is disintegrating into a place of malice and hatred, where we need hope and find it harder. In this world of fear, hope to find better, but easier said than done, the more meaningful life of faith will make life \ No newline at end of file