You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

clientCommand.go 19 kB

2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715
  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "io"
  7. "os"
  8. "path/filepath"
  9. agentcaller "proto"
  10. "rabbitmq"
  11. "strconv"
  12. "sync"
  13. "time"
  14. //"reflect"
  15. //"github.com/pborman/uuid"
  16. //"github.com/streadway/amqp"
  17. "ec"
  18. "utils"
  19. "google.golang.org/grpc"
  20. _ "google.golang.org/grpc/balancer/grpclb"
  21. )
  // Transfer settings shared by the read and write paths in this file.
  const (
  	// port is appended to an agent IP to form the gRPC dial target.
  	port = ":5010"
  	// packetSizeInBytes is the length of one packet buffer. It is left
  	// untyped so it can be mixed with int64 sizes and used as a make length.
  	packetSizeInBytes = 10
  )
  26. func Move(bucketName string, objectName string, destination string) {
  27. //将bucketName, objectName, destination发给协调端
  28. fmt.Println("move " + bucketName + "/" + objectName + " to " + destination)
  29. //获取块hash,ip,序号,编码参数等
  30. //发送写请求,分配写入节点Ip
  31. userId := 0
  32. command1 := rabbitmq.MoveCommand{
  33. BucketName: bucketName,
  34. ObjectName: objectName,
  35. UserId: userId,
  36. Destination: destination,
  37. }
  38. c1, _ := json.Marshal(command1)
  39. b1 := append([]byte("05"), c1...)
  40. fmt.Println(string(b1))
  41. rabbit1 := rabbitmq.NewRabbitMQSimple("coorQueue")
  42. rabbit1.PublishSimple(b1)
  43. //接收消息,赋值给ip, repHash, fileSizeInBytes
  44. var res1 rabbitmq.MoveRes
  45. var redundancy string
  46. var hashs []string
  47. var fileSizeInBytes int64
  48. var ecName string
  49. var ids []int
  50. queueName := "coorClientQueue" + strconv.Itoa(userId)
  51. rabbit2 := rabbitmq.NewRabbitMQSimple(queueName)
  52. msgs := rabbit2.ConsumeSimple(time.Millisecond, true)
  53. wg := sync.WaitGroup{}
  54. wg.Add(1)
  55. go func() {
  56. for d := range msgs {
  57. _ = json.Unmarshal(d.Body, &res1)
  58. redundancy = res1.Redundancy
  59. ids = res1.Ids
  60. hashs = res1.Hashs
  61. fileSizeInBytes = res1.FileSizeInBytes
  62. ecName = res1.EcName
  63. wg.Done()
  64. }
  65. }()
  66. wg.Wait()
  67. fmt.Println(redundancy)
  68. fmt.Println(hashs)
  69. fmt.Println(ids)
  70. fmt.Println(fileSizeInBytes)
  71. fmt.Println(ecName)
  72. //根据redundancy调用repMove和ecMove
  73. rabbit3 := rabbitmq.NewRabbitMQSimple("agentQueue" + destination)
  74. var b2 []byte
  75. switch redundancy {
  76. case "rep":
  77. command2 := rabbitmq.RepMoveCommand{
  78. Hashs: hashs,
  79. BucketName: bucketName,
  80. ObjectName: objectName,
  81. UserId: userId,
  82. FileSizeInBytes: fileSizeInBytes,
  83. }
  84. c2, _ := json.Marshal(command2)
  85. b2 = append([]byte("00"), c2...)
  86. case "ec":
  87. command2 := rabbitmq.EcMoveCommand{
  88. Hashs: hashs,
  89. Ids: ids,
  90. EcName: ecName,
  91. BucketName: bucketName,
  92. ObjectName: objectName,
  93. UserId: userId,
  94. FileSizeInBytes: fileSizeInBytes,
  95. }
  96. c2, _ := json.Marshal(command2)
  97. b2 = append([]byte("01"), c2...)
  98. }
  99. fmt.Println(b2)
  100. rabbit3.PublishSimple(b2)
  101. //接受调度成功与否的消息
  102. //接受第二轮通讯结果
  103. var res2 rabbitmq.AgentMoveRes
  104. queueName = "agentClientQueue" + strconv.Itoa(userId)
  105. rabbit4 := rabbitmq.NewRabbitMQSimple(queueName)
  106. msgs = rabbit4.ConsumeSimple(time.Millisecond, true)
  107. wg.Add(1)
  108. go func() {
  109. for d := range msgs {
  110. _ = json.Unmarshal(d.Body, &res2)
  111. if res2.MoveCode == 0 {
  112. wg.Done()
  113. fmt.Println("Move Success")
  114. }
  115. }
  116. }()
  117. wg.Wait()
  118. rabbit1.Destroy()
  119. rabbit2.Destroy()
  120. rabbit3.Destroy()
  121. rabbit4.Destroy()
  122. }
  123. func Read(localFilePath string, bucketName string, objectName string) {
  124. fmt.Println("read " + bucketName + "/" + objectName + " to " + localFilePath)
  125. //获取块hash,ip,序号,编码参数等
  126. //发送写请求,分配写入节点Ip
  127. userId := 0
  128. command1 := rabbitmq.ReadCommand{
  129. BucketName: bucketName,
  130. ObjectName: objectName,
  131. UserId: userId,
  132. }
  133. c1, _ := json.Marshal(command1)
  134. b1 := append([]byte("02"), c1...)
  135. fmt.Println(b1)
  136. rabbit1 := rabbitmq.NewRabbitMQSimple("coorQueue")
  137. rabbit1.PublishSimple(b1)
  138. //接收消息,赋值给ip, repHash, fileSizeInBytes
  139. var res1 rabbitmq.ReadRes
  140. var hashs []string
  141. var ips []string
  142. var fileSizeInBytes int64
  143. var ecName string
  144. var ids []int
  145. var redundancy string
  146. queueName := "coorClientQueue" + strconv.Itoa(userId)
  147. rabbit2 := rabbitmq.NewRabbitMQSimple(queueName)
  148. msgs := rabbit2.ConsumeSimple(time.Millisecond, true)
  149. wg := sync.WaitGroup{}
  150. wg.Add(1)
  151. go func() {
  152. for d := range msgs {
  153. _ = json.Unmarshal(d.Body, &res1)
  154. ips = res1.Ips
  155. hashs = res1.Hashs
  156. ids = res1.BlockIds
  157. ecName = res1.EcName
  158. fileSizeInBytes = res1.FileSizeInBytes
  159. redundancy = res1.Redundancy
  160. wg.Done()
  161. }
  162. }()
  163. wg.Wait()
  164. fmt.Println(redundancy)
  165. fmt.Println(ips)
  166. fmt.Println(hashs)
  167. fmt.Println(ids)
  168. fmt.Println(ecName)
  169. fmt.Println(fileSizeInBytes)
  170. rabbit1.Destroy()
  171. rabbit2.Destroy()
  172. switch redundancy {
  173. case "rep":
  174. repRead(fileSizeInBytes, ips[0], hashs[0], localFilePath)
  175. case "ec":
  176. ecRead(fileSizeInBytes, ips, hashs, ids, ecName, localFilePath)
  177. }
  178. }
  179. func repRead(fileSizeInBytes int64, ip string, repHash string, localFilePath string) {
  180. numPacket := (fileSizeInBytes + packetSizeInBytes - 1) / (packetSizeInBytes)
  181. fmt.Println(numPacket)
  182. //rpc相关
  183. conn, err := grpc.Dial(ip+port, grpc.WithInsecure())
  184. if err != nil {
  185. panic(err)
  186. }
  187. client := agentcaller.NewTranBlockOrReplicaClient(conn)
  188. fDir, err := os.Executable()
  189. if err != nil {
  190. panic(err)
  191. }
  192. fURL := filepath.Join(filepath.Dir(fDir), "assets")
  193. _, err = os.Stat(fURL)
  194. if os.IsNotExist(err) {
  195. os.MkdirAll(fURL, os.ModePerm)
  196. }
  197. file, err := os.Create(filepath.Join(fURL, localFilePath))
  198. if err != nil {
  199. return
  200. }
  201. /*
  202. TO DO: 判断本地有没有ipfs daemon、能否获取相应对象的cid
  203. 如果本地有ipfs daemon且能获取相应对象的cid,则获取对象cid对应的ipfsblock的cid,通过ipfs网络获取这些ipfsblock
  204. 否则,像目前一样,使用grpc向指定节点获取
  205. */
  206. stream, _ := client.GetBlockOrReplica(context.Background(), &agentcaller.GetReq{
  207. BlockOrReplicaHash: repHash,
  208. })
  209. fmt.Println(numPacket)
  210. for i := 0; int64(i) < numPacket; i++ {
  211. fmt.Println(i)
  212. res, _ := stream.Recv()
  213. fmt.Println(res.BlockOrReplicaData)
  214. file.Write(res.BlockOrReplicaData)
  215. }
  216. file.Close()
  217. conn.Close()
  218. }
  219. func RepWrite(localFilePath string, bucketName string, objectName string, numRep int) {
  220. userId := 0
  221. //获取文件大小
  222. fileInfo, _ := os.Stat(localFilePath)
  223. fileSizeInBytes := fileInfo.Size()
  224. fmt.Println(fileSizeInBytes)
  225. //写入对象的packet数
  226. numWholePacket := fileSizeInBytes / packetSizeInBytes
  227. lastPacketInBytes := fileSizeInBytes % packetSizeInBytes
  228. numPacket := numWholePacket
  229. if lastPacketInBytes > 0 {
  230. numPacket++
  231. }
  232. //发送写请求,请求Coor分配写入节点Ip
  233. //TO DO: 加入两个字段,本机IP和当前进程号
  234. command1 := rabbitmq.RepWriteCommand{
  235. BucketName: bucketName,
  236. ObjectName: objectName,
  237. FileSizeInBytes: fileSizeInBytes,
  238. NumRep: numRep,
  239. UserId: userId,
  240. }
  241. c1, _ := json.Marshal(command1)
  242. b1 := append([]byte("03"), c1...)
  243. fmt.Println(b1)
  244. rabbit1 := rabbitmq.NewRabbitMQSimple("coorQueue")
  245. rabbit1.PublishSimple(b1)
  246. var res1 rabbitmq.WriteRes
  247. var ips []string
  248. /*
  249. TODO xh: 判断writeRes里的状态码
  250. 如果有错,就报错返回,结束程序
  251. 如果没错,就把得到的IP值赋给ips
  252. */
  253. //TODO xh: queueName调整:coorClientQueue+"_"+"本机Ip"+"_"+"进程号"
  254. queueName := "coorClientQueue" + strconv.Itoa(userId)
  255. rabbit2 := rabbitmq.NewRabbitMQSimple(queueName)
  256. msgs := rabbit2.ConsumeSimple(time.Millisecond, true)
  257. wg := sync.WaitGroup{}
  258. wg.Add(1)
  259. go func() {
  260. for d := range msgs {
  261. _ = json.Unmarshal(d.Body, &res1)
  262. ips = res1.Ips
  263. }
  264. wg.Done()
  265. }()
  266. wg.Wait()
  267. //创建channel
  268. loadDistributeBufs := make([]chan []byte, numRep)
  269. for i := 0; i < numRep; i++ {
  270. loadDistributeBufs[i] = make(chan []byte)
  271. }
  272. //正式开始写入
  273. hashs := make([]string, numRep)
  274. go loadDistribute(localFilePath, loadDistributeBufs[:], numWholePacket, lastPacketInBytes) //从本地文件系统加载数据
  275. wg.Add(numRep)
  276. for i := 0; i < numRep; i++ {
  277. //TODO xh: send的第一个参数不需要了
  278. go send("rep.json"+strconv.Itoa(i), ips[i], loadDistributeBufs[i], numPacket, &wg, hashs, i) //"block1.json"这样参数不需要
  279. }
  280. wg.Wait()
  281. //第二轮通讯:插入元数据hashs
  282. //TODO xh: 加入pid字段
  283. command2 := rabbitmq.WriteHashCommand{
  284. BucketName: bucketName,
  285. ObjectName: objectName,
  286. Hashs: hashs,
  287. Ips: ips,
  288. UserId: userId,
  289. }
  290. c1, _ = json.Marshal(command2)
  291. b1 = append([]byte("04"), c1...)
  292. rabbit1.PublishSimple(b1)
  293. //接受第二轮通讯结果
  294. var res2 rabbitmq.WriteHashRes
  295. msgs = rabbit2.ConsumeSimple(time.Millisecond, true)
  296. wg.Add(1)
  297. go func() {
  298. for d := range msgs {
  299. _ = json.Unmarshal(d.Body, &res2)
  300. if res2.MetaCode == 0 {
  301. wg.Done()
  302. }
  303. //TODO xh: MetaCode不为零,代表插入出错,需输出错误
  304. }
  305. }()
  306. wg.Wait()
  307. rabbit1.Destroy()
  308. rabbit2.Destroy()
  309. //
  310. }
  311. func ecRead(fileSizeInBytes int64, ips []string, blockHashs []string, blockIds []int, ecName string, localFilePath string) {
  312. //根据ecName获得以下参数
  313. wg := sync.WaitGroup{}
  314. ecPolicies := *utils.GetEcPolicy()
  315. ecPolicy := ecPolicies[ecName]
  316. fmt.Println(ecPolicy)
  317. ecK := ecPolicy.GetK()
  318. ecN := ecPolicy.GetN()
  319. var coefs = [][]int64{{1, 1, 1}, {1, 2, 3}} //2应替换为ecK,3应替换为ecN
  320. numPacket := (fileSizeInBytes + int64(ecK)*packetSizeInBytes - 1) / (int64(ecK) * packetSizeInBytes)
  321. fmt.Println(numPacket)
  322. //创建channel
  323. getBufs := make([]chan []byte, ecN)
  324. decodeBufs := make([]chan []byte, ecK)
  325. for i := 0; i < ecN; i++ {
  326. getBufs[i] = make(chan []byte)
  327. }
  328. for i := 0; i < ecK; i++ {
  329. decodeBufs[i] = make(chan []byte)
  330. }
  331. //从协调端获取有哪些编码块
  332. //var blockSeq = []int{0,1}
  333. blockSeq := blockIds
  334. wg.Add(1)
  335. for i := 0; i < len(blockSeq); i++ {
  336. go get(blockHashs[i], ips[i], getBufs[blockSeq[i]], numPacket)
  337. }
  338. go decode(getBufs[:], decodeBufs[:], blockSeq, ecK, coefs, numPacket)
  339. go persist(decodeBufs[:], numPacket, localFilePath, &wg)
  340. wg.Wait()
  341. }
  342. func EcWrite(localFilePath string, bucketName string, objectName string, ecName string) {
  343. fmt.Println("write " + localFilePath + " as " + bucketName + "/" + objectName)
  344. //获取文件大小
  345. fileInfo, _ := os.Stat(localFilePath)
  346. fileSizeInBytes := fileInfo.Size()
  347. fmt.Println(fileSizeInBytes)
  348. //调用纠删码库,获取编码参数及生成矩阵
  349. ecPolicies := *utils.GetEcPolicy()
  350. ecPolicy := ecPolicies[ecName]
  351. ipss := utils.GetAgentIps()
  352. fmt.Println(ipss)
  353. print("@!@!@!@!@!@!")
  354. //var policy utils.EcConfig
  355. //policy = ecPolicy[0]
  356. ecK := ecPolicy.GetK()
  357. ecN := ecPolicy.GetN()
  358. //const ecK int = ecPolicy.GetK()
  359. //const ecN int = ecPolicy.GetN()
  360. var coefs = [][]int64{{1, 1, 1}, {1, 2, 3}} //2应替换为ecK,3应替换为ecN
  361. //计算每个块的packet数
  362. numPacket := (fileSizeInBytes + int64(ecK)*packetSizeInBytes - 1) / (int64(ecK) * packetSizeInBytes)
  363. fmt.Println(numPacket)
  364. //发送写请求,分配写入节点
  365. userId := 0
  366. //发送写请求,分配写入节点Ip
  367. command1 := rabbitmq.EcWriteCommand{
  368. BucketName: bucketName,
  369. ObjectName: objectName,
  370. FileSizeInBytes: fileSizeInBytes,
  371. EcName: ecName,
  372. UserId: userId,
  373. } //
  374. c1, _ := json.Marshal(command1)
  375. b1 := append([]byte("00"), c1...) //
  376. fmt.Println(b1)
  377. rabbit1 := rabbitmq.NewRabbitMQSimple("coorQueue")
  378. rabbit1.PublishSimple(b1)
  379. //接收消息,赋值给ips
  380. var res1 rabbitmq.WriteRes
  381. var ips []string
  382. queueName := "coorClientQueue" + strconv.Itoa(userId)
  383. rabbit2 := rabbitmq.NewRabbitMQSimple(queueName)
  384. msgs := rabbit2.ConsumeSimple(time.Millisecond, true)
  385. wg := sync.WaitGroup{}
  386. wg.Add(1)
  387. go func() {
  388. for d := range msgs {
  389. _ = json.Unmarshal(d.Body, &res1)
  390. ips = res1.Ips
  391. wg.Done()
  392. }
  393. }()
  394. wg.Wait()
  395. //创建channel
  396. loadBufs := make([]chan []byte, ecN)
  397. encodeBufs := make([]chan []byte, ecN)
  398. for i := 0; i < ecN; i++ {
  399. loadBufs[i] = make(chan []byte)
  400. }
  401. for i := 0; i < ecN; i++ {
  402. encodeBufs[i] = make(chan []byte)
  403. }
  404. blockNames := make([]string, ecN)
  405. for i := 0; i < ecN; i++ {
  406. blockNames[i] = (bucketName + "_" + objectName + "_" + strconv.Itoa(i))
  407. print(blockNames[i])
  408. print("miemiemie")
  409. }
  410. hashs := make([]string, ecN)
  411. //正式开始写入
  412. go load(localFilePath, loadBufs[:ecN], ecK, numPacket*int64(ecK), fileSizeInBytes) //从本地文件系统加载数据
  413. go encode(loadBufs[:ecN], encodeBufs[:ecN], ecK, coefs, numPacket)
  414. wg.Add(ecN)
  415. for i := 0; i < ecN; i++ {
  416. go send(blockNames[i], ips[i], encodeBufs[i], numPacket, &wg, hashs, i)
  417. }
  418. wg.Wait()
  419. //fmt.Println(hashs)
  420. //第二轮通讯:插入元数据hashs
  421. command2 := rabbitmq.WriteHashCommand{
  422. BucketName: bucketName,
  423. ObjectName: objectName,
  424. Hashs: hashs,
  425. Ips: ips,
  426. UserId: userId,
  427. }
  428. c1, _ = json.Marshal(command2)
  429. b1 = append([]byte("01"), c1...)
  430. rabbit1.PublishSimple(b1)
  431. //接受第二轮通讯结果
  432. var res2 rabbitmq.WriteHashRes
  433. msgs = rabbit2.ConsumeSimple(time.Millisecond, true)
  434. wg.Add(1)
  435. go func() {
  436. for d := range msgs {
  437. _ = json.Unmarshal(d.Body, &res2)
  438. if res2.MetaCode == 0 {
  439. wg.Done()
  440. }
  441. }
  442. }()
  443. wg.Wait()
  444. rabbit1.Destroy()
  445. rabbit2.Destroy()
  446. //
  447. }
  // repMove is a placeholder that currently does nothing with ip and hash.
  func repMove(ip string, hash string) {
  	// TODO: send the scheduling command through the message queue.
  }
  // ecMove is a placeholder that currently does nothing with its arguments.
  func ecMove(ip string, hashs []string, ids []int, ecName string) {
  	// TODO: send the scheduling command through the message queue.
  }
  454. func loadDistribute(localFilePath string, loadDistributeBufs []chan []byte, numWholePacket int64, lastPacketInBytes int64) {
  455. fmt.Println("loadDistribute " + localFilePath)
  456. file, _ := os.Open(localFilePath)
  457. for i := 0; int64(i) < numWholePacket; i++ {
  458. buf := make([]byte, packetSizeInBytes)
  459. _, err := file.Read(buf)
  460. if err != nil && err != io.EOF {
  461. break
  462. }
  463. for j := 0; j < len(loadDistributeBufs); j++ {
  464. loadDistributeBufs[j] <- buf
  465. }
  466. }
  467. if lastPacketInBytes > 0 {
  468. buf := make([]byte, lastPacketInBytes)
  469. file.Read(buf)
  470. for j := 0; j < len(loadDistributeBufs); j++ {
  471. loadDistributeBufs[j] <- buf
  472. }
  473. }
  474. fmt.Println("load over")
  475. for i := 0; i < len(loadDistributeBufs); i++ {
  476. close(loadDistributeBufs[i])
  477. }
  478. file.Close()
  479. }
  480. func load(localFilePath string, loadBufs []chan []byte, ecK int, totalNumPacket int64, fileSizeInBytes int64) {
  481. fmt.Println("load " + localFilePath)
  482. file, _ := os.Open(localFilePath)
  483. for i := 0; int64(i) < totalNumPacket; i++ {
  484. print(totalNumPacket)
  485. buf := make([]byte, packetSizeInBytes)
  486. idx := i % ecK
  487. print(len(loadBufs))
  488. _, err := file.Read(buf)
  489. loadBufs[idx] <- buf
  490. if idx == ecK-1 {
  491. print("***")
  492. for j := ecK; j < len(loadBufs); j++ {
  493. print(j)
  494. zeroPkt := make([]byte, packetSizeInBytes)
  495. fmt.Printf("%v", zeroPkt)
  496. loadBufs[j] <- zeroPkt
  497. }
  498. }
  499. if err != nil && err != io.EOF {
  500. break
  501. }
  502. }
  503. fmt.Println("load over")
  504. for i := 0; i < len(loadBufs); i++ {
  505. print(i)
  506. close(loadBufs[i])
  507. }
  508. file.Close()
  509. }
  510. func encode(inBufs []chan []byte, outBufs []chan []byte, ecK int, coefs [][]int64, numPacket int64) {
  511. fmt.Println("encode ")
  512. var tmpIn [][]byte
  513. tmpIn = make([][]byte, len(outBufs))
  514. enc := ec.NewRsEnc(ecK, len(outBufs))
  515. for i := 0; int64(i) < numPacket; i++ {
  516. for j := 0; j < len(outBufs); j++ { //3
  517. tmpIn[j] = <-inBufs[j]
  518. //print(i)
  519. //fmt.Printf("%v",tmpIn[j])
  520. //print("@#$")
  521. }
  522. enc.Encode(tmpIn)
  523. fmt.Printf("%v", tmpIn)
  524. print("$$$$$$$$$$$$$$$$$$")
  525. for j := 0; j < len(outBufs); j++ { //1,2,3//示意,需要调用纠删码编解码引擎: tmp[k] = tmp[k]+(tmpIn[w][k]*coefs[w][j])
  526. outBufs[j] <- tmpIn[j]
  527. }
  528. }
  529. fmt.Println("encode over")
  530. for i := 0; i < len(outBufs); i++ {
  531. close(outBufs[i])
  532. }
  533. }
  534. func decode(inBufs []chan []byte, outBufs []chan []byte, blockSeq []int, ecK int, coefs [][]int64, numPacket int64) {
  535. fmt.Println("decode ")
  536. var tmpIn [][]byte
  537. var zeroPkt []byte
  538. tmpIn = make([][]byte, len(inBufs))
  539. hasBlock := map[int]bool{}
  540. for j := 0; j < len(blockSeq); j++ {
  541. hasBlock[blockSeq[j]] = true
  542. }
  543. needRepair := false //检测是否传入了所有数据块
  544. for j := 0; j < len(outBufs); j++ {
  545. if blockSeq[j] != j {
  546. needRepair = true
  547. }
  548. }
  549. enc := ec.NewRsEnc(ecK, len(inBufs))
  550. for i := 0; int64(i) < numPacket; i++ {
  551. for j := 0; j < len(inBufs); j++ { //3
  552. if hasBlock[j] {
  553. tmpIn[j] = <-inBufs[j]
  554. } else {
  555. tmpIn[j] = zeroPkt
  556. }
  557. }
  558. fmt.Printf("%v", tmpIn)
  559. if needRepair {
  560. err := enc.Repair(tmpIn)
  561. print("&&&&&")
  562. if err != nil {
  563. fmt.Fprintf(os.Stderr, "Decode Repair Error: %s", err.Error())
  564. }
  565. }
  566. //fmt.Printf("%v",tmpIn)
  567. for j := 0; j < len(outBufs); j++ { //1,2,3//示意,需要调用纠删码编解码引擎: tmp[k] = tmp[k]+(tmpIn[w][k]*coefs[w][j])
  568. outBufs[j] <- tmpIn[j]
  569. }
  570. }
  571. fmt.Println("decode over")
  572. for i := 0; i < len(outBufs); i++ {
  573. close(outBufs[i])
  574. }
  575. }
  576. func send(blockName string, ip string, inBuf chan []byte, numPacket int64, wg *sync.WaitGroup, hashs []string, idx int) {
  577. fmt.Println("send " + blockName)
  578. /*
  579. TO DO ss: 判断本地有没有ipfs daemon、能否与目标agent的ipfs daemon连通、本地ipfs目录空间是否充足
  580. 如果本地有ipfs daemon、能与目标agent的ipfs daemon连通、本地ipfs目录空间充足,将所有内容写入本地ipfs目录,得到对象的cid,发送cid给目标agent让其pin相应的对象
  581. 否则,像目前一样,使用grpc向指定节点获取
  582. */
  583. //rpc相关
  584. conn, err := grpc.Dial(ip+port, grpc.WithInsecure())
  585. if err != nil {
  586. panic(err)
  587. }
  588. client := agentcaller.NewTranBlockOrReplicaClient(conn)
  589. stream, err := client.SendBlockOrReplica(context.Background())
  590. if err != nil {
  591. panic(err)
  592. }
  593. for i := 0; int64(i) < numPacket; i++ {
  594. buf := <-inBuf
  595. fmt.Println(buf)
  596. err := stream.Send(&agentcaller.BlockOrReplica{
  597. BlockOrReplicaName: blockName,
  598. BlockOrReplicaHash: blockName,
  599. BlockOrReplicaData: buf,
  600. })
  601. if err != nil && err != io.EOF {
  602. panic(err)
  603. }
  604. }
  605. res, err := stream.CloseAndRecv()
  606. fmt.Println(res)
  607. hashs[idx] = res.BlockOrReplicaHash
  608. conn.Close()
  609. wg.Done()
  610. return
  611. }
  612. func get(blockHash string, ip string, getBuf chan []byte, numPacket int64) {
  613. //rpc相关
  614. print("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@")
  615. conn, err := grpc.Dial(ip+port, grpc.WithInsecure())
  616. if err != nil {
  617. panic(err)
  618. }
  619. /*
  620. TO DO: 判断本地有没有ipfs daemon、能否获取相应对象的cid
  621. 如果本地有ipfs daemon且能获取相应编码块的cid,则获取编码块cid对应的ipfsblock的cid,通过ipfs网络获取这些ipfsblock
  622. 否则,像目前一样,使用grpc向指定节点获取
  623. */
  624. client := agentcaller.NewTranBlockOrReplicaClient(conn)
  625. //rpc get
  626. stream, _ := client.GetBlockOrReplica(context.Background(), &agentcaller.GetReq{
  627. BlockOrReplicaHash: blockHash,
  628. })
  629. fmt.Println(numPacket)
  630. for i := 0; int64(i) < numPacket; i++ {
  631. fmt.Println(i)
  632. res, _ := stream.Recv()
  633. fmt.Println(res.BlockOrReplicaData)
  634. getBuf <- res.BlockOrReplicaData
  635. }
  636. close(getBuf)
  637. conn.Close()
  638. }
  // persist writes the packets arriving on inBuf to
  // <executable dir>/assets/<localFilePath>.
  //
  // For each of numPacket rounds it takes one packet from every channel, in
  // slice order, and appends it to the file. wg.Done is called when the
  // function exits on every path, so a waiting caller is always released.
  //
  // If the directory or file cannot be created, or a write fails, the error is
  // reported on stderr and writing stops. The channels are still drained so
  // the goroutines feeding them can finish. An os.Executable failure panics, as
  // before.
  func persist(inBuf []chan []byte, numPacket int64, localFilePath string, wg *sync.WaitGroup) {
  	defer wg.Done()

  	fDir, err := os.Executable()
  	if err != nil {
  		panic(err)
  	}
  	fURL := filepath.Join(filepath.Dir(fDir), "assets")

  	var file *os.File
  	// MkdirAll is a no-op when the directory already exists.
  	if err = os.MkdirAll(fURL, os.ModePerm); err == nil {
  		file, err = os.Create(filepath.Join(fURL, localFilePath))
  	}
  	if err != nil {
  		fmt.Fprintln(os.Stderr, "persist: open output file:", err)
  	}

  	for i := int64(0); i < numPacket; i++ {
  		for j := 0; j < len(inBuf); j++ {
  			tmp := <-inBuf[j]
  			if err != nil {
  				// Already failed: keep consuming, stop writing.
  				continue
  			}
  			fmt.Println(tmp)
  			if _, err = file.Write(tmp); err != nil {
  				fmt.Fprintln(os.Stderr, "persist: write output file:", err)
  			}
  		}
  	}

  	if file != nil {
  		if cerr := file.Close(); cerr != nil {
  			fmt.Fprintln(os.Stderr, "persist: close output file:", cerr)
  		}
  	}
  }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。

Contributors (1)