You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

storage.go 6.6 kB

2 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220
  1. package cmd
  2. import (
  3. "fmt"
  4. "io"
  5. "os"
  6. "path/filepath"
  7. "sync"
  8. "gitlink.org.cn/cloudream/agent/internal/config"
  9. "gitlink.org.cn/cloudream/common/consts"
  10. "gitlink.org.cn/cloudream/ec"
  11. "gitlink.org.cn/cloudream/utils"
  12. log "gitlink.org.cn/cloudream/utils/logger"
  13. "gitlink.org.cn/cloudream/utils/serder"
  14. "gitlink.org.cn/cloudream/common/consts/errorcode"
  15. ramsg "gitlink.org.cn/cloudream/rabbitmq/message"
  16. agtmsg "gitlink.org.cn/cloudream/rabbitmq/message/agent"
  17. )
  18. func (service *Service) MoveObjectToStorage(msg *agtmsg.MoveObjectToStorage) *agtmsg.MoveObjectToStorageResp {
  19. outFileName := utils.MakeMoveOperationFileName(msg.Body.ObjectID, msg.Body.UserID)
  20. outFileDir := filepath.Join(config.Cfg().StorageBaseDir, msg.Body.Directory)
  21. outFilePath := filepath.Join(outFileDir, outFileName)
  22. err := os.MkdirAll(outFileDir, 0644)
  23. if err != nil {
  24. log.Warnf("create file directory %s failed, err: %s", outFileDir, err.Error())
  25. return ramsg.ReplyFailed[agtmsg.MoveObjectToStorageResp](errorcode.OPERATION_FAILED, "create local file directory failed")
  26. }
  27. outFile, err := os.Create(outFilePath)
  28. if err != nil {
  29. log.Warnf("create file %s failed, err: %s", outFilePath, err.Error())
  30. return ramsg.ReplyFailed[agtmsg.MoveObjectToStorageResp](errorcode.OPERATION_FAILED, "create local file failed")
  31. }
  32. defer outFile.Close()
  33. if msg.Body.Redundancy == consts.REDUNDANCY_REP {
  34. err = service.moveRepObject(msg, outFile)
  35. if err != nil {
  36. log.Warnf("move rep object failed, err: %s", outFilePath, err.Error())
  37. return ramsg.ReplyFailed[agtmsg.MoveObjectToStorageResp](errorcode.OPERATION_FAILED, "move rep object failed")
  38. }
  39. } else {
  40. return ramsg.ReplyFailed[agtmsg.MoveObjectToStorageResp](errorcode.OPERATION_FAILED, "not implement yet!")
  41. }
  42. /*
  43. //向coor报告临时缓存hash
  44. coorClient, err := coorcli.NewCoordinatorClient(&config.Cfg().RabbitMQ)
  45. if err != nil {
  46. return fmt.Errorf("new coordinator client failed, err: %s", err)
  47. }
  48. defer coorClient.Close()
  49. // TODO 这里更新失败残留下的文件是否要删除?
  50. // TODO 参考数据修复功能需求里描述的流程进行上报
  51. coorClient.TempCacheReport(coormsg.NewTempCacheReportBody(config.Cfg().ID, hashs))
  52. */
  53. return ramsg.ReplyOK(agtmsg.NewMoveObjectToStorageRespBody())
  54. }
  55. func (svc *Service) moveRepObject(msg *agtmsg.MoveObjectToStorage, outFile io.WriteCloser) error {
  56. var repInfo ramsg.ObjectRepInfo
  57. err := serder.MapToObject(msg.Body.RedundancyData.(map[string]any), &repInfo)
  58. if err != nil {
  59. return fmt.Errorf("redundancy data to rep info failed, err: %w", err)
  60. }
  61. ipfsRd, err := svc.ipfs.OpenRead(repInfo.FileHash)
  62. if err != nil {
  63. return fmt.Errorf("read ipfs file failed, err: %w", err)
  64. }
  65. defer ipfsRd.Close()
  66. _, err = io.Copy(outFile, ipfsRd)
  67. if err != nil {
  68. return fmt.Errorf("copy ipfs file data to local file failed, err: %s", err)
  69. }
  70. return nil
  71. }
  72. /*
  73. func (service *Service) ECMove(msg *agtmsg.ECMoveCommand) *agtmsg.MoveObjectToStorageResp {
  74. panic("not implement yet!")
  75. wg := sync.WaitGroup{}
  76. fmt.Println("EcMove")
  77. fmt.Println(msg.Hashs)
  78. hashs := msg.Hashs
  79. fileSize := msg.FileSize
  80. blockIds := msg.IDs
  81. ecName := msg.ECName
  82. goalName := msg.BucketName + ":" + msg.ObjectName + ":" + strconv.Itoa(msg.UserID)
  83. ecPolicies := *utils.GetEcPolicy()
  84. ecPolicy := ecPolicies[ecName]
  85. ecK := ecPolicy.GetK()
  86. ecN := ecPolicy.GetN()
  87. numPacket := (fileSize + int64(ecK)*int64(config.Cfg().GRCPPacketSize) - 1) / (int64(ecK) * int64(config.Cfg().GRCPPacketSize))
  88. getBufs := make([]chan []byte, ecN)
  89. decodeBufs := make([]chan []byte, ecK)
  90. for i := 0; i < ecN; i++ {
  91. getBufs[i] = make(chan []byte)
  92. }
  93. for i := 0; i < ecK; i++ {
  94. decodeBufs[i] = make(chan []byte)
  95. }
  96. wg.Add(1)
  97. //执行调度操作
  98. // TODO 这一块需要改写以适配IPFS流式读取
  99. for i := 0; i < len(blockIds); i++ {
  100. go service.get(hashs[i], getBufs[blockIds[i]], numPacket)
  101. }
  102. go decode(getBufs[:], decodeBufs[:], blockIds, ecK, numPacket)
  103. // TODO 写入的文件路径需要带上msg中的Directory字段,参考RepMove
  104. go persist(decodeBufs[:], numPacket, goalName, &wg)
  105. wg.Wait()
  106. //向coor报告临时缓存hash
  107. coorClient, err := racli.NewCoordinatorClient()
  108. if err != nil {
  109. // TODO 日志
  110. return ramsg.NewAgentMoveRespFailed(errorcode.OPERATION_FAILED, fmt.Sprintf("create coordinator client failed"))
  111. }
  112. defer coorClient.Close()
  113. coorClient.TempCacheReport(NodeID, hashs)
  114. return ramsg.NewAgentMoveRespOK()
  115. }
  116. */
  117. func decode(inBufs []chan []byte, outBufs []chan []byte, blockSeq []int, ecK int, numPacket int64) {
  118. fmt.Println("decode ")
  119. var tmpIn [][]byte
  120. var zeroPkt []byte
  121. tmpIn = make([][]byte, len(inBufs))
  122. hasBlock := map[int]bool{}
  123. for j := 0; j < len(blockSeq); j++ {
  124. hasBlock[blockSeq[j]] = true
  125. }
  126. needRepair := false //检测是否传入了所有数据块
  127. for j := 0; j < len(outBufs); j++ {
  128. if blockSeq[j] != j {
  129. needRepair = true
  130. }
  131. }
  132. enc := ec.NewRsEnc(ecK, len(inBufs))
  133. for i := 0; int64(i) < numPacket; i++ {
  134. for j := 0; j < len(inBufs); j++ { //3
  135. if hasBlock[j] {
  136. tmpIn[j] = <-inBufs[j]
  137. } else {
  138. tmpIn[j] = zeroPkt
  139. }
  140. }
  141. fmt.Printf("%v", tmpIn)
  142. if needRepair {
  143. err := enc.Repair(tmpIn)
  144. print("&&&&&")
  145. if err != nil {
  146. fmt.Fprintf(os.Stderr, "Decode Repair Error: %s", err.Error())
  147. }
  148. }
  149. //fmt.Printf("%v",tmpIn)
  150. for j := 0; j < len(outBufs); j++ { //1,2,3//示意,需要调用纠删码编解码引擎: tmp[k] = tmp[k]+(tmpIn[w][k]*coefs[w][j])
  151. outBufs[j] <- tmpIn[j]
  152. }
  153. }
  154. fmt.Println("decode over")
  155. for i := 0; i < len(outBufs); i++ {
  156. close(outBufs[i])
  157. }
  158. }
  159. func (service *Service) get(blockHash string, getBuf chan []byte, numPacket int64) {
  160. /*
  161. data := CatIPFS(blockHash)
  162. for i := 0; int64(i) < numPacket; i++ {
  163. buf := []byte(data[i*config.Cfg().GRCPPacketSize : i*config.Cfg().GRCPPacketSize+config.Cfg().GRCPPacketSize])
  164. getBuf <- buf
  165. }
  166. close(getBuf)
  167. */
  168. }
  169. func persist(inBuf []chan []byte, numPacket int64, localFilePath string, wg *sync.WaitGroup) {
  170. //这里的localFilePath应该是要写入的filename
  171. fDir, err := os.Executable()
  172. if err != nil {
  173. panic(err)
  174. }
  175. fURL := filepath.Join(filepath.Dir(fDir), "assets3")
  176. _, err = os.Stat(fURL)
  177. if os.IsNotExist(err) {
  178. os.MkdirAll(fURL, os.ModePerm)
  179. }
  180. file, err := os.Create(filepath.Join(fURL, localFilePath))
  181. if err != nil {
  182. return
  183. }
  184. for i := 0; int64(i) < numPacket; i++ {
  185. for j := 0; j < len(inBuf); j++ {
  186. tmp := <-inBuf[j]
  187. fmt.Println(tmp)
  188. file.Write(tmp)
  189. }
  190. }
  191. file.Close()
  192. wg.Done()
  193. }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。