You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

clientCommand.go 18 kB

2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615
  1. package main
  2. import (
  3. "context"
  4. "io"
  5. "os"
  6. "fmt"
  7. "encoding/json"
  8. "path/filepath"
  9. "sync"
  10. "strconv"
  11. "rabbitmq"
  12. "time"
  13. agentcaller "proto"
  14. //"github.com/pborman/uuid"
  15. //"github.com/streadway/amqp"
  16. "google.golang.org/grpc"
  17. _ "google.golang.org/grpc/balancer/grpclb"
  18. )
  // Transfer settings shared by the client commands in this file.
  const (
  	// port is the suffix appended to an agent IP when dialing it over gRPC.
  	port = ":5010"

  	// packetSizeInBytes is the number of bytes carried by one streamed packet.
  	packetSizeInBytes = 10
  )
  23. func Move(bucketName string, objectName string, destination string){
  24. //将bucketName, objectName, destination发给协调端
  25. fmt.Println("move "+bucketName+"/"+objectName+" to "+destination)
  26. //获取块hash,ip,序号,编码参数等
  27. //发送写请求,分配写入节点Ip
  28. userId:=0
  29. command1:= rabbitmq.MoveCommand{
  30. BucketName: bucketName,
  31. ObjectName: objectName,
  32. UserId: userId,
  33. Destination: destination,
  34. }
  35. c1,_:=json.Marshal(command1)
  36. b1:=append([]byte("05"),c1...)
  37. fmt.Println(string(b1))
  38. rabbit1 := rabbitmq.NewRabbitMQSimple("coorQueue")
  39. rabbit1.PublishSimple(b1)
  40. //接收消息,赋值给ip, repHash, fileSizeInBytes
  41. var res1 rabbitmq.MoveRes
  42. var redundancy string
  43. var hashs []string
  44. var fileSizeInBytes int64
  45. var ecName string
  46. var ids []int
  47. queueName := "coorClientQueue"+strconv.Itoa(userId)
  48. rabbit2 := rabbitmq.NewRabbitMQSimple(queueName)
  49. msgs:=rabbit2.ConsumeSimple(time.Millisecond, true)
  50. wg := sync.WaitGroup{}
  51. wg.Add(1)
  52. go func() {
  53. for d := range msgs {
  54. _ = json.Unmarshal(d.Body, &res1)
  55. redundancy=res1.Redundancy
  56. ids=res1.Ids
  57. hashs=res1.Hashs
  58. fileSizeInBytes=res1.FileSizeInBytes
  59. ecName=res1.EcName
  60. wg.Done()
  61. }
  62. }()
  63. wg.Wait()
  64. fmt.Println(redundancy)
  65. fmt.Println(hashs)
  66. fmt.Println(ids)
  67. fmt.Println(fileSizeInBytes)
  68. fmt.Println(ecName)
  69. //根据redundancy调用repMove和ecMove
  70. rabbit3 := rabbitmq.NewRabbitMQSimple("agentQueue"+destination)
  71. var b2 []byte
  72. switch redundancy {
  73. case "rep":
  74. command2:= rabbitmq.RepMoveCommand{
  75. Hashs: hashs,
  76. BucketName: bucketName,
  77. ObjectName: objectName,
  78. UserId: userId,
  79. }
  80. c2,_:=json.Marshal(command2)
  81. b2=append([]byte("00"),c2...)
  82. case "ec":
  83. command2:= rabbitmq.EcMoveCommand{
  84. Hashs: hashs,
  85. Ids: ids,
  86. EcName: ecName,
  87. BucketName: bucketName,
  88. ObjectName: objectName,
  89. UserId: userId,
  90. }
  91. c2,_:=json.Marshal(command2)
  92. b2=append([]byte("01"),c2...)
  93. }
  94. fmt.Println(b2)
  95. rabbit3.PublishSimple(b2)
  96. //接受调度成功与否的消息
  97. //接受第二轮通讯结果
  98. var res2 rabbitmq.AgentMoveRes
  99. queueName = "agentClientQueue"+strconv.Itoa(userId)
  100. rabbit4 := rabbitmq.NewRabbitMQSimple(queueName)
  101. msgs=rabbit4.ConsumeSimple(time.Millisecond, true)
  102. wg.Add(1)
  103. go func() {
  104. for d := range msgs {
  105. _ = json.Unmarshal(d.Body, &res2)
  106. if(res2.MoveCode==0){
  107. wg.Done()
  108. fmt.Println("Move Success")
  109. }
  110. }
  111. }()
  112. wg.Wait()
  113. rabbit1.Destroy()
  114. rabbit2.Destroy()
  115. rabbit3.Destroy()
  116. rabbit4.Destroy()
  117. }
  118. func Read(localFilePath string, bucketName string, objectName string){
  119. fmt.Println("read "+bucketName+"/"+objectName+" to "+localFilePath)
  120. //获取块hash,ip,序号,编码参数等
  121. //发送写请求,分配写入节点Ip
  122. userId:=0
  123. command1:= rabbitmq.ReadCommand{
  124. BucketName: bucketName,
  125. ObjectName: objectName,
  126. UserId: userId,
  127. }
  128. c1,_:=json.Marshal(command1)
  129. b1:=append([]byte("02"),c1...)
  130. fmt.Println(b1)
  131. rabbit1 := rabbitmq.NewRabbitMQSimple("coorQueue")
  132. rabbit1.PublishSimple(b1)
  133. //接收消息,赋值给ip, repHash, fileSizeInBytes
  134. var res1 rabbitmq.ReadRes
  135. var hashs []string
  136. var ips []string
  137. var fileSizeInBytes int64
  138. var ecName string
  139. var ids []int
  140. var redundancy string
  141. queueName := "coorClientQueue"+strconv.Itoa(userId)
  142. rabbit2 := rabbitmq.NewRabbitMQSimple(queueName)
  143. msgs:=rabbit2.ConsumeSimple(time.Millisecond, true)
  144. wg := sync.WaitGroup{}
  145. wg.Add(1)
  146. go func() {
  147. for d := range msgs {
  148. _ = json.Unmarshal(d.Body, &res1)
  149. ips=res1.Ips
  150. hashs=res1.Hashs
  151. ids=res1.BlockIds
  152. ecName=res1.EcName
  153. fileSizeInBytes=res1.FileSizeInBytes
  154. redundancy=res1.Redundancy
  155. wg.Done()
  156. }
  157. }()
  158. wg.Wait()
  159. fmt.Println(redundancy)
  160. fmt.Println(ips)
  161. fmt.Println(hashs)
  162. fmt.Println(ids)
  163. fmt.Println(ecName)
  164. fmt.Println(fileSizeInBytes)
  165. rabbit1.Destroy()
  166. rabbit2.Destroy()
  167. switch redundancy {
  168. case "rep":
  169. repRead(fileSizeInBytes, ips[0], hashs[0], localFilePath)
  170. case "ec":
  171. ecRead(fileSizeInBytes, ips, hashs, ids, ecName, localFilePath)
  172. }
  173. }
  174. func repRead(fileSizeInBytes int64, ip string, repHash string, localFilePath string){
  175. numPacket := (fileSizeInBytes+packetSizeInBytes-1)/(packetSizeInBytes)
  176. fmt.Println(numPacket)
  177. //rpc相关
  178. conn, err := grpc.Dial(ip+port, grpc.WithInsecure())
  179. if err != nil {
  180. panic(err)
  181. }
  182. client := agentcaller.NewTranBlockOrReplicaClient(conn)
  183. fDir, err := os.Executable()
  184. if err != nil {
  185. panic(err)
  186. }
  187. fURL := filepath.Join(filepath.Dir(fDir), "assets")
  188. _, err = os.Stat(fURL)
  189. if os.IsNotExist(err) {
  190. os.MkdirAll(fURL, os.ModePerm)
  191. }
  192. file, err := os.Create(filepath.Join(fURL, localFilePath))
  193. if err != nil {
  194. return
  195. }
  196. stream, _ := client.GetBlockOrReplica(context.Background(), &agentcaller.GetReq{
  197. BlockOrReplicaHash: repHash,
  198. })
  199. fmt.Println(numPacket)
  200. for i:=0;int64(i)<numPacket;i++{
  201. fmt.Println(i)
  202. res, _:= stream.Recv()
  203. fmt.Println(res.BlockOrReplicaData)
  204. file.Write(res.BlockOrReplicaData)
  205. }
  206. file.Close()
  207. conn.Close()
  208. }
  209. func RepWrite(localFilePath string, bucketName string, objectName string, numRep int){
  210. userId:=0
  211. //获取文件大小
  212. fileInfo,_ := os.Stat(localFilePath)
  213. fileSizeInBytes := fileInfo.Size()
  214. fmt.Println(fileSizeInBytes)
  215. //计算每个块的packet数
  216. numWholePacket := fileSizeInBytes/packetSizeInBytes
  217. lastPacketInBytes:=fileSizeInBytes%packetSizeInBytes
  218. numPacket:=numWholePacket
  219. if lastPacketInBytes>0 {
  220. numPacket++
  221. }
  222. //发送写请求,分配写入节点Ip
  223. command1:= rabbitmq.RepWriteCommand{
  224. BucketName: bucketName,
  225. ObjectName: objectName,
  226. FileSizeInBytes: fileSizeInBytes,
  227. NumRep: numRep,
  228. UserId: userId,
  229. }
  230. c1,_:=json.Marshal(command1)
  231. b1:=append([]byte("03"),c1...)
  232. fmt.Println(b1)
  233. rabbit1 := rabbitmq.NewRabbitMQSimple("coorQueue")
  234. rabbit1.PublishSimple(b1)
  235. //接收消息,赋值给ips
  236. var res1 rabbitmq.WriteRes
  237. var ips []string
  238. queueName := "coorClientQueue"+strconv.Itoa(userId)
  239. rabbit2 := rabbitmq.NewRabbitMQSimple(queueName)
  240. msgs:=rabbit2.ConsumeSimple(time.Millisecond, true)
  241. wg := sync.WaitGroup{}
  242. wg.Add(1)
  243. go func() {
  244. for d := range msgs {
  245. _ = json.Unmarshal(d.Body, &res1)
  246. ips=res1.Ips
  247. wg.Done()
  248. }
  249. }()
  250. wg.Wait()
  251. //创建channel
  252. loadDistributeBufs:=make([]chan []byte,numRep)
  253. for i := 0; i < numRep; i++ {
  254. loadDistributeBufs[i] = make(chan []byte)
  255. }
  256. //正式开始写入
  257. hashs:= make([]string, numRep)
  258. go loadDistribute(localFilePath, loadDistributeBufs[:], numWholePacket, lastPacketInBytes)//从本地文件系统加载数据
  259. wg.Add(numRep)
  260. for i:=0;i<numRep;i++ {
  261. go send("rep.json"+strconv.Itoa(i), ips[i], loadDistributeBufs[i], numPacket, &wg, hashs, i)//"block1.json"这样参数不需要
  262. }
  263. wg.Wait()
  264. //第二轮通讯:插入元数据hashs
  265. command2:= rabbitmq.WriteHashCommand{
  266. BucketName: bucketName,
  267. ObjectName: objectName,
  268. Hashs: hashs,
  269. Ips: ips,
  270. UserId: userId,
  271. }
  272. c1,_=json.Marshal(command2)
  273. b1=append([]byte("04"),c1...)
  274. rabbit1.PublishSimple(b1)
  275. //接受第二轮通讯结果
  276. var res2 rabbitmq.WriteHashRes
  277. msgs=rabbit2.ConsumeSimple(time.Millisecond, true)
  278. wg.Add(1)
  279. go func() {
  280. for d := range msgs {
  281. _ = json.Unmarshal(d.Body, &res2)
  282. if(res2.MetaCode==0){
  283. wg.Done()
  284. }
  285. }
  286. }()
  287. wg.Wait()
  288. rabbit1.Destroy()
  289. rabbit2.Destroy()
  290. //
  291. }
  292. func ecRead(fileSizeInBytes int64, ips []string, blockHashs []string, blockIds []int, ecName string, localFilePath string){
  293. //根据ecName获得以下参数
  294. wg := sync.WaitGroup{}
  295. const ecK int = 2
  296. const ecN int = 3
  297. var coefs = [][]int64 {{1,1,1},{1,2,3}}//2应替换为ecK,3应替换为ecN
  298. numPacket := (fileSizeInBytes+int64(ecK)*packetSizeInBytes-1)/(int64(ecK)*packetSizeInBytes)
  299. fmt.Println(numPacket)
  300. //创建channel
  301. var getBufs [ecK]chan []byte
  302. var decodeBufs [ecK]chan []byte
  303. for i := 0; i < ecK; i++ {
  304. getBufs[i] = make(chan []byte)
  305. }
  306. for i := 0; i < ecK; i++ {
  307. decodeBufs[i] = make(chan []byte)
  308. }
  309. wg.Add(1)
  310. go get(blockHashs[0], ips[0], getBufs[0], numPacket)
  311. go get(blockHashs[1], ips[1], getBufs[1], numPacket)
  312. go encode(getBufs[:], decodeBufs[:], coefs, numPacket)
  313. go persist(decodeBufs[:], numPacket, localFilePath, &wg)
  314. wg.Wait()
  315. }
  316. func EcWrite(localFilePath string, bucketName string, objectName string, ecName string){
  317. fmt.Println("write "+localFilePath+" as "+bucketName+"/"+objectName)
  318. //获取文件大小
  319. fileInfo,_ := os.Stat(localFilePath)
  320. fileSizeInBytes := fileInfo.Size()
  321. fmt.Println(fileSizeInBytes)
  322. //调用纠删码库,获取编码参数及生成矩阵
  323. const ecK int = 2
  324. const ecN int = 3
  325. var coefs = [][]int64 {{1,1,1},{1,2,3}}//2应替换为ecK,3应替换为ecN
  326. //计算每个块的packet数
  327. numPacket := (fileSizeInBytes+int64(ecK)*packetSizeInBytes-1)/(int64(ecK)*packetSizeInBytes)
  328. fmt.Println(numPacket)
  329. //发送写请求,分配写入节点
  330. userId :=0
  331. //发送写请求,分配写入节点Ip
  332. command1:= rabbitmq.EcWriteCommand{
  333. BucketName: bucketName,
  334. ObjectName: objectName,
  335. FileSizeInBytes: fileSizeInBytes,
  336. EcName: ecName,
  337. UserId: userId,
  338. }//
  339. c1,_:=json.Marshal(command1)
  340. b1:=append([]byte("00"),c1...)//
  341. fmt.Println(b1)
  342. rabbit1 := rabbitmq.NewRabbitMQSimple("coorQueue")
  343. rabbit1.PublishSimple(b1)
  344. //接收消息,赋值给ips
  345. var res1 rabbitmq.WriteRes
  346. var ips []string
  347. queueName := "coorClientQueue"+strconv.Itoa(userId)
  348. rabbit2 := rabbitmq.NewRabbitMQSimple(queueName)
  349. msgs:=rabbit2.ConsumeSimple(time.Millisecond, true)
  350. wg := sync.WaitGroup{}
  351. wg.Add(1)
  352. go func() {
  353. for d := range msgs {
  354. _ = json.Unmarshal(d.Body, &res1)
  355. ips=res1.Ips
  356. wg.Done()
  357. }
  358. }()
  359. wg.Wait()
  360. //创建channel
  361. var loadBufs [ecK]chan []byte
  362. var encodeBufs [ecN]chan []byte
  363. for i := 0; i < ecK; i++ {
  364. loadBufs[i] = make(chan []byte)
  365. }
  366. for i := 0; i < ecN; i++ {
  367. encodeBufs[i] = make(chan []byte)
  368. }
  369. //正式开始写入
  370. go load(localFilePath, loadBufs[:], numPacket*int64(ecK), fileSizeInBytes)//从本地文件系统加载数据
  371. go encode(loadBufs[:], encodeBufs[:], coefs, numPacket)
  372. wg.Add(3)
  373. hashs:= make([]string, 3)
  374. go send("block1.json", ips[0], encodeBufs[0], numPacket, &wg, hashs, 0)//"block1.json"这样参数不需要
  375. go send("block2.json", ips[1], encodeBufs[1], numPacket, &wg, hashs, 1)
  376. go send("block3.json", ips[2], encodeBufs[2], numPacket, &wg, hashs, 2)
  377. wg.Wait()
  378. //第二轮通讯:插入元数据hashs
  379. command2:= rabbitmq.WriteHashCommand{
  380. BucketName: bucketName,
  381. ObjectName: objectName,
  382. Hashs: hashs,
  383. Ips: ips,
  384. UserId: userId,
  385. }
  386. c1,_=json.Marshal(command2)
  387. b1=append([]byte("01"),c1...)
  388. rabbit1.PublishSimple(b1)
  389. //接受第二轮通讯结果
  390. var res2 rabbitmq.WriteHashRes
  391. msgs=rabbit2.ConsumeSimple(time.Millisecond, true)
  392. wg.Add(1)
  393. go func() {
  394. for d := range msgs {
  395. _ = json.Unmarshal(d.Body, &res2)
  396. if(res2.MetaCode==0){
  397. wg.Done()
  398. }
  399. }
  400. }()
  401. wg.Wait()
  402. rabbit1.Destroy()
  403. rabbit2.Destroy()
  404. //
  405. }
  // repMove is a placeholder for sending a replica move command over the
  // message queue. It currently does nothing with ip or hash.
  func repMove(ip string, hash string) {
  	// TODO: send the scheduling command through the message queue.
  }
  // ecMove is a placeholder for sending an erasure-coded move command over the
  // message queue. It currently does nothing with its arguments.
  func ecMove(ip string, hashs []string, ids []int, ecName string) {
  	// TODO: send the scheduling command through the message queue.
  }
  412. func loadDistribute(localFilePath string, loadDistributeBufs []chan []byte, numWholePacket int64, lastPacketInBytes int64){
  413. fmt.Println("loadDistribute "+ localFilePath)
  414. file, _ := os.Open(localFilePath)
  415. for i:=0;int64(i)<numWholePacket;i++ {
  416. buf := make([]byte, packetSizeInBytes)
  417. _, err := file.Read(buf)
  418. if err != nil && err != io.EOF {
  419. break
  420. }
  421. for j:=0;j<len(loadDistributeBufs);j++ {
  422. loadDistributeBufs[j]<-buf
  423. }
  424. }
  425. if lastPacketInBytes>0 {
  426. buf := make([]byte, lastPacketInBytes)
  427. file.Read(buf)
  428. for j:=0;j<len(loadDistributeBufs);j++ {
  429. loadDistributeBufs[j]<-buf
  430. }
  431. }
  432. fmt.Println("load over")
  433. for i:=0;i<len(loadDistributeBufs);i++{
  434. close(loadDistributeBufs[i])
  435. }
  436. file.Close()
  437. }
  438. func load(localFilePath string, loadBufs []chan []byte, totalNumPacket int64, fileSizeInBytes int64){
  439. fmt.Println("load "+ localFilePath)
  440. file, _ := os.Open(localFilePath)
  441. for i:=0;int64(i)<totalNumPacket;i++ {
  442. buf := make([]byte, packetSizeInBytes)
  443. _, err := file.Read(buf)
  444. if err != nil && err != io.EOF {
  445. break
  446. }
  447. idx:=i%len(loadBufs)
  448. loadBufs[idx]<-buf
  449. }
  450. fmt.Println("load over")
  451. for i:=0;i<len(loadBufs);i++{
  452. close(loadBufs[i])
  453. }
  454. file.Close()
  455. }
  456. func encode(inBufs []chan []byte, outBufs []chan []byte, coefs [][]int64, numPacket int64){
  457. fmt.Println("encode ")
  458. var tmpIn [][]byte
  459. tmpIn = make([][]byte, len(inBufs))
  460. for i := 0; int64(i) < numPacket; i++ {
  461. for j :=0; j < len(inBufs); j++ {//2
  462. tmpIn[j]=<-inBufs[j]
  463. //fmt.Println(tmpIn[j])
  464. }
  465. for j := 0; j < len(outBufs); j++{
  466. tmp := make([]byte, packetSizeInBytes)
  467. for k := 0; k < packetSizeInBytes; k++{
  468. for w := 0; w < len(inBufs); w++{
  469. //示意,需要调用纠删码编解码引擎: tmp[k] = tmp[k]+(tmpIn[w][k]*coefs[w][j])
  470. /*fmt.Println(w)
  471. fmt.Println(k)
  472. fmt.Println(i)
  473. fmt.Println(tmpIn[w])
  474. fmt.Println(tmpIn[w][k])
  475. fmt.Println("-----")*/
  476. tmp[k] = tmp[k]+tmpIn[w][k]
  477. }
  478. }
  479. outBufs[j]<-tmp
  480. }
  481. }
  482. fmt.Println("encode over")
  483. for i:=0;i<len(outBufs);i++{
  484. close(outBufs[i])
  485. }
  486. }
  487. func send(blockhash string, ip string, inBuf chan []byte, numPacket int64, wg *sync.WaitGroup, hashs []string, idx int){
  488. fmt.Println("send "+blockhash)
  489. //rpc相关
  490. conn, err := grpc.Dial(ip+port, grpc.WithInsecure())
  491. if err != nil {
  492. panic(err)
  493. }
  494. client := agentcaller.NewTranBlockOrReplicaClient(conn)
  495. stream, err := client.SendBlockOrReplica(context.Background())
  496. if err != nil {
  497. panic(err)
  498. }
  499. for i:=0;int64(i)<numPacket;i++{
  500. buf:=<-inBuf
  501. fmt.Println(buf)
  502. err:=stream.Send(&agentcaller.BlockOrReplica{
  503. BlockOrReplicaName: blockhash,
  504. BlockOrReplicaHash: blockhash,
  505. BlockOrReplicaData: buf,
  506. })
  507. if err != nil && err != io.EOF{
  508. panic(err)
  509. }
  510. }
  511. res, err := stream.CloseAndRecv()
  512. fmt.Println(res)
  513. hashs[idx]=res.BlockOrReplicaHash
  514. conn.Close()
  515. wg.Done()
  516. return
  517. }
  518. func get(blockHash string, ip string, getBuf chan []byte, numPacket int64){
  519. //rpc相关
  520. conn, err := grpc.Dial(ip+port, grpc.WithInsecure())
  521. if err != nil {
  522. panic(err)
  523. }
  524. client := agentcaller.NewTranBlockOrReplicaClient(conn)
  525. //rpc get
  526. stream, _ := client.GetBlockOrReplica(context.Background(), &agentcaller.GetReq{
  527. BlockOrReplicaHash: blockHash,
  528. })
  529. fmt.Println(numPacket)
  530. for i:=0;int64(i)<numPacket;i++{
  531. fmt.Println(i)
  532. res, _:= stream.Recv()
  533. fmt.Println(res.BlockOrReplicaData)
  534. getBuf<-res.BlockOrReplicaData
  535. }
  536. close(getBuf)
  537. conn.Close()
  538. }
  // persist writes packets from inBuf to "assets/<localFilePath>" next to the
  // running executable.
  //
  // For each of the numPacket rounds it takes one packet from every channel in
  // inBuf, in slice order, and appends it to the file. wg.Done is called on
  // every return path so a waiting caller is always released. After a write
  // error the remaining packets are still received, but discarded, so upstream
  // senders are not left blocked.
  func persist(inBuf []chan []byte, numPacket int64, localFilePath string, wg *sync.WaitGroup) {
  	defer wg.Done()

  	exePath, err := os.Executable()
  	if err != nil {
  		panic(err)
  	}
  	assetsDir := filepath.Join(filepath.Dir(exePath), "assets")
  	// MkdirAll is a no-op when the directory already exists.
  	if err := os.MkdirAll(assetsDir, os.ModePerm); err != nil {
  		fmt.Println("persist: creating assets directory:", err)
  		return
  	}
  	file, err := os.Create(filepath.Join(assetsDir, localFilePath))
  	if err != nil {
  		fmt.Println("persist: creating output file:", err)
  		return
  	}
  	defer file.Close() // runs before wg.Done, so the file is closed first

  	var writeErr error
  	for i := int64(0); i < numPacket; i++ {
  		for _, ch := range inBuf {
  			tmp := <-ch
  			fmt.Println(tmp)
  			if writeErr != nil {
  				continue // keep draining, stop writing
  			}
  			if _, writeErr = file.Write(tmp); writeErr != nil {
  				fmt.Println("persist: writing output file:", writeErr)
  			}
  		}
  	}
  }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。

Contributors (1)