You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

command_service_ec.go 3.9 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145
  1. package main
  2. import (
  3. "fmt"
  4. "os"
  5. "path/filepath"
  6. "sync"
  7. "gitlink.org.cn/cloudream/ec"
  8. agtmsg "gitlink.org.cn/cloudream/rabbitmq/message/agent"
  9. )
  // ECMove handles an ECMoveCommand. It is not implemented yet: the only live
  // statement is a panic, so every call panics before anything is returned.
  //
  // The block comment in the body holds a disabled draft of the handler. As
  // written, that draft starts one get goroutine per block, one decode
  // goroutine and one persist goroutine, waits on a WaitGroup, and then reports
  // the hashes to the coordinator.
  // NOTE(review): the draft refers to packages (strconv, utils, config, racli,
  // ramsg, errorcode) that do not appear in the import block visible here -
  // confirm the imports before re-enabling it.
  func (service *CommandService) ECMove(msg *agtmsg.ECMoveCommand) *agtmsg.AgentMoveResp {
  	panic("not implement yet!")
  	/*
  		wg := sync.WaitGroup{}
  		fmt.Println("EcMove")
  		fmt.Println(msg.Hashs)
  		hashs := msg.Hashs
  		fileSizeInBytes := msg.FileSizeInBytes
  		blockIds := msg.IDs
  		ecName := msg.ECName
  		goalName := msg.BucketName + ":" + msg.ObjectName + ":" + strconv.Itoa(msg.UserID)
  		ecPolicies := *utils.GetEcPolicy()
  		ecPolicy := ecPolicies[ecName]
  		ecK := ecPolicy.GetK()
  		ecN := ecPolicy.GetN()
  		numPacket := (fileSizeInBytes + int64(ecK)*int64(config.Cfg().GRCPPacketSize) - 1) / (int64(ecK) * int64(config.Cfg().GRCPPacketSize))
  		getBufs := make([]chan []byte, ecN)
  		decodeBufs := make([]chan []byte, ecK)
  		for i := 0; i < ecN; i++ {
  			getBufs[i] = make(chan []byte)
  		}
  		for i := 0; i < ecK; i++ {
  			decodeBufs[i] = make(chan []byte)
  		}
  		wg.Add(1)

  		// Run the scheduling pipeline.
  		// TODO: this part needs to be rewritten to work with streaming reads from IPFS.
  		for i := 0; i < len(blockIds); i++ {
  			go service.get(hashs[i], getBufs[blockIds[i]], numPacket)
  		}
  		go decode(getBufs[:], decodeBufs[:], blockIds, ecK, numPacket)
  		// TODO: the output file path must include the Directory field from msg; see RepMove.
  		go persist(decodeBufs[:], numPacket, goalName, &wg)
  		wg.Wait()

  		// Report the temporarily cached hashes to the coordinator.
  		coorClient, err := racli.NewCoordinatorClient()
  		if err != nil {
  			// TODO: logging
  			return ramsg.NewAgentMoveRespFailed(errorcode.OPERATION_FAILED, fmt.Sprintf("create coordinator client failed"))
  		}
  		defer coorClient.Close()
  		coorClient.TempCacheReport(NodeID, hashs)
  		return ramsg.NewAgentMoveRespOK()
  	*/
  }
  55. func decode(inBufs []chan []byte, outBufs []chan []byte, blockSeq []int, ecK int, numPacket int64) {
  56. fmt.Println("decode ")
  57. var tmpIn [][]byte
  58. var zeroPkt []byte
  59. tmpIn = make([][]byte, len(inBufs))
  60. hasBlock := map[int]bool{}
  61. for j := 0; j < len(blockSeq); j++ {
  62. hasBlock[blockSeq[j]] = true
  63. }
  64. needRepair := false //检测是否传入了所有数据块
  65. for j := 0; j < len(outBufs); j++ {
  66. if blockSeq[j] != j {
  67. needRepair = true
  68. }
  69. }
  70. enc := ec.NewRsEnc(ecK, len(inBufs))
  71. for i := 0; int64(i) < numPacket; i++ {
  72. for j := 0; j < len(inBufs); j++ { //3
  73. if hasBlock[j] {
  74. tmpIn[j] = <-inBufs[j]
  75. } else {
  76. tmpIn[j] = zeroPkt
  77. }
  78. }
  79. fmt.Printf("%v", tmpIn)
  80. if needRepair {
  81. err := enc.Repair(tmpIn)
  82. print("&&&&&")
  83. if err != nil {
  84. fmt.Fprintf(os.Stderr, "Decode Repair Error: %s", err.Error())
  85. }
  86. }
  87. //fmt.Printf("%v",tmpIn)
  88. for j := 0; j < len(outBufs); j++ { //1,2,3//示意,需要调用纠删码编解码引擎: tmp[k] = tmp[k]+(tmpIn[w][k]*coefs[w][j])
  89. outBufs[j] <- tmpIn[j]
  90. }
  91. }
  92. fmt.Println("decode over")
  93. for i := 0; i < len(outBufs); i++ {
  94. close(outBufs[i])
  95. }
  96. }
  // get is currently a no-op: its entire body is commented out, so it returns
  // immediately, never sends anything on getBuf and never closes it. None of
  // its parameters are used.
  //
  // The disabled draft below reads a block through CatIPFS, slices it into
  // numPacket packets of config.Cfg().GRCPPacketSize bytes, sends each packet
  // on getBuf and finally closes getBuf.
  // NOTE(review): any goroutine receiving from getBuf will wait forever while
  // this body stays disabled - confirm before wiring up callers.
  func (service *CommandService) get(blockHash string, getBuf chan []byte, numPacket int64) {
  	/*
  		data := CatIPFS(blockHash)
  		for i := 0; int64(i) < numPacket; i++ {
  			buf := []byte(data[i*config.Cfg().GRCPPacketSize : i*config.Cfg().GRCPPacketSize+config.Cfg().GRCPPacketSize])
  			getBuf <- buf
  		}
  		close(getBuf)
  	*/
  }
  // persist writes the packets received from inBuf to a file named
  // localFilePath inside the "assets3" directory next to the running
  // executable, creating that directory if needed.
  //
  // Parameters:
  //   - inBuf:         input channels; in each of the numPacket rounds one packet
  //     is received from every channel, in slice order, and appended to the file.
  //   - numPacket:     number of rounds to consume.
  //   - localFilePath: name of the file to create, relative to the assets3
  //     directory.
  //   - wg:            wg.Done is called exactly once when persist returns, on
  //     every exit path, so a caller blocked in wg.Wait is always released.
  //
  // If the directory or file cannot be created, or a write fails, the error is
  // reported on stderr and the remaining packets are still received and
  // discarded. Draining keeps the producer, which may be sending on unbuffered
  // channels, from blocking forever. A failure of os.Executable still panics.
  func persist(inBuf []chan []byte, numPacket int64, localFilePath string, wg *sync.WaitGroup) {
  	defer wg.Done()

  	exePath, err := os.Executable()
  	if err != nil {
  		panic(err)
  	}
  	outDir := filepath.Join(filepath.Dir(exePath), "assets3")

  	// MkdirAll succeeds when the directory already exists, so no Stat is needed.
  	var file *os.File
  	err = os.MkdirAll(outDir, os.ModePerm)
  	if err == nil {
  		file, err = os.Create(filepath.Join(outDir, localFilePath))
  	}
  	if err != nil {
  		fmt.Fprintf(os.Stderr, "Persist Error: %s\n", err.Error())
  	}

  	for i := int64(0); i < numPacket; i++ {
  		for j := range inBuf {
  			pkt := <-inBuf[j]
  			if err != nil {
  				// Already failed: keep draining, but write nothing more.
  				continue
  			}
  			if _, err = file.Write(pkt); err != nil {
  				fmt.Fprintf(os.Stderr, "Persist Error: %s\n", err.Error())
  			}
  		}
  	}

  	if file != nil {
  		if cerr := file.Close(); cerr != nil {
  			fmt.Fprintf(os.Stderr, "Persist Error: %s\n", cerr.Error())
  		}
  	}
  }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。