You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

command_service.go 7.3 kB

2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264
  1. package main
  2. import (
  3. "fmt"
  4. "os"
  5. "path/filepath"
  6. "strconv"
  7. "sync"
  8. "gitlink.org.cn/cloudream/agent/config"
  9. "gitlink.org.cn/cloudream/ec"
  10. "gitlink.org.cn/cloudream/utils"
  11. racli "gitlink.org.cn/cloudream/rabbitmq/client"
  12. ramsg "gitlink.org.cn/cloudream/rabbitmq/message"
  13. "gitlink.org.cn/cloudream/utils/consts/errorcode"
  14. )
  15. type CommandService struct {
  16. }
  17. func (service *CommandService) RepMove(msg *ramsg.RepMoveCommand) ramsg.AgentMoveResp {
  18. /*fmt.Println("RepMove")
  19. fmt.Println(command.Hashs)
  20. hashs := command.Hashs
  21. //执行调度操作
  22. ipfsDir := "assets"
  23. goalDir := "assets2"
  24. goalName := command.BucketName + ":" + command.ObjectName + ":" + strconv.Itoa(command.UserId)
  25. //目标文件
  26. fDir, err := os.Executable()
  27. if err != nil {
  28. panic(err)
  29. }
  30. fURL := filepath.Join(filepath.Dir(fDir), goalDir)
  31. _, err = os.Stat(fURL)
  32. if os.IsNotExist(err) {
  33. os.MkdirAll(fURL, os.ModePerm)
  34. }
  35. fURL = filepath.Join(fURL, goalName)
  36. outFile, err := os.Create(fURL)
  37. fmt.Println(fURL)
  38. //源文件
  39. fURL = filepath.Join(filepath.Dir(fDir), ipfsDir)
  40. fURL = filepath.Join(fURL, hashs[0])
  41. inFile, _ := os.Open(fURL)
  42. fmt.Println(fURL)
  43. fileInfo, _ := inFile.Stat()
  44. fileSizeInBytes := fileInfo.Size()
  45. numWholePacket := fileSizeInBytes / config.Cfg().GRCPPacketSize
  46. lastPacketInBytes := fileSizeInBytes % config.Cfg().GRCPPacketSize
  47. fmt.Println(fileSizeInBytes)
  48. fmt.Println(numWholePacket)
  49. fmt.Println(lastPacketInBytes)
  50. for i := 0; int64(i) < numWholePacket; i++ {
  51. buf := make([]byte, config.Cfg().GRCPPacketSize)
  52. inFile.Read(buf)
  53. outFile.Write(buf)
  54. }
  55. if lastPacketInBytes > 0 {
  56. buf := make([]byte, lastPacketInBytes)
  57. inFile.Read(buf)
  58. outFile.Write(buf)
  59. }
  60. inFile.Close()
  61. outFile.Close()
  62. //返回消息
  63. res := rabbitmq.AgentMoveRes{
  64. MoveCode: 0,
  65. }
  66. c, _ := json.Marshal(res)
  67. rabbitSend(c, command.UserId)
  68. //向coor报告临时缓存hash
  69. command1 := rabbitmq.TempCacheReport{
  70. Ip: LocalIp,
  71. Hashs: hashs,
  72. }
  73. c, _ = json.Marshal(command1)
  74. b := append([]byte("06"), c...)
  75. fmt.Println(b)
  76. rabbit := rabbitmq.NewRabbitMQSimple("coorQueue")
  77. rabbit.PublishSimple(b)
  78. rabbit.Destroy()*/
  79. fmt.Println("RepMove")
  80. fmt.Println(msg.Hashs)
  81. hashs := msg.Hashs
  82. fileSizeInBytes := msg.FileSizeInBytes
  83. //执行调度操作
  84. //TODO xh: 调度到BackID对应的dir中,即goalDir改为传过来的agentMoveResp.dir
  85. goalDir := "assets2"
  86. goalName := msg.BucketName + ":" + msg.ObjectName + ":" + strconv.Itoa(msg.UserID)
  87. //目标文件
  88. fDir, err := os.Executable()
  89. if err != nil {
  90. panic(err)
  91. }
  92. fURL := filepath.Join(filepath.Dir(fDir), goalDir)
  93. _, err = os.Stat(fURL)
  94. if os.IsNotExist(err) {
  95. os.MkdirAll(fURL, os.ModePerm)
  96. }
  97. fURL = filepath.Join(fURL, goalName)
  98. outFile, err := os.Create(fURL)
  99. fmt.Println(fURL)
  100. //源文件
  101. data := CatIPFS(hashs[0])
  102. numWholePacket := fileSizeInBytes / int64(config.Cfg().GRCPPacketSize)
  103. lastPacketInBytes := fileSizeInBytes % int64(config.Cfg().GRCPPacketSize)
  104. fmt.Println(fileSizeInBytes)
  105. fmt.Println(numWholePacket)
  106. fmt.Println(lastPacketInBytes)
  107. for i := 0; int64(i) < numWholePacket; i++ {
  108. buf := []byte(data[i*config.Cfg().GRCPPacketSize : i*config.Cfg().GRCPPacketSize+config.Cfg().GRCPPacketSize])
  109. outFile.Write(buf)
  110. }
  111. if lastPacketInBytes > 0 {
  112. buf := []byte(data[numWholePacket*int64(config.Cfg().GRCPPacketSize) : numWholePacket*int64(config.Cfg().GRCPPacketSize)+lastPacketInBytes])
  113. outFile.Write(buf)
  114. }
  115. outFile.Close()
  116. //向coor报告临时缓存hash
  117. coorClient, err := racli.NewCoordinatorClient()
  118. if err != nil {
  119. // TODO 日志
  120. return ramsg.NewAgentMoveRespFailed(errorcode.OPERATION_FAILED, fmt.Sprintf("create coordinator client failed"))
  121. }
  122. defer coorClient.Close()
  123. coorClient.TempCacheReport(config.Cfg().LocalIP, hashs)
  124. return ramsg.NewAgentMoveRespOK()
  125. }
  126. func (service *CommandService) ECMove(msg *ramsg.ECMoveCommand) ramsg.AgentMoveResp {
  127. wg := sync.WaitGroup{}
  128. fmt.Println("EcMove")
  129. fmt.Println(msg.Hashs)
  130. hashs := msg.Hashs
  131. fileSizeInBytes := msg.FileSizeInBytes
  132. blockIds := msg.IDs
  133. ecName := msg.ECName
  134. goalName := msg.BucketName + ":" + msg.ObjectName + ":" + strconv.Itoa(msg.UserID)
  135. ecPolicies := *utils.GetEcPolicy()
  136. ecPolicy := ecPolicies[ecName]
  137. ecK := ecPolicy.GetK()
  138. ecN := ecPolicy.GetN()
  139. numPacket := (fileSizeInBytes + int64(ecK)*int64(config.Cfg().GRCPPacketSize) - 1) / (int64(ecK) * int64(config.Cfg().GRCPPacketSize))
  140. getBufs := make([]chan []byte, ecN)
  141. decodeBufs := make([]chan []byte, ecK)
  142. for i := 0; i < ecN; i++ {
  143. getBufs[i] = make(chan []byte)
  144. }
  145. for i := 0; i < ecK; i++ {
  146. decodeBufs[i] = make(chan []byte)
  147. }
  148. wg.Add(1)
  149. //执行调度操作
  150. for i := 0; i < len(blockIds); i++ {
  151. go get(hashs[i], getBufs[blockIds[i]], numPacket)
  152. }
  153. go decode(getBufs[:], decodeBufs[:], blockIds, ecK, numPacket)
  154. go persist(decodeBufs[:], numPacket, goalName, &wg)
  155. wg.Wait()
  156. //向coor报告临时缓存hash
  157. coorClient, err := racli.NewCoordinatorClient()
  158. if err != nil {
  159. // TODO 日志
  160. return ramsg.NewAgentMoveRespFailed(errorcode.OPERATION_FAILED, fmt.Sprintf("create coordinator client failed"))
  161. }
  162. defer coorClient.Close()
  163. coorClient.TempCacheReport(config.Cfg().LocalIP, hashs)
  164. return ramsg.NewAgentMoveRespOK()
  165. }
  166. func decode(inBufs []chan []byte, outBufs []chan []byte, blockSeq []int, ecK int, numPacket int64) {
  167. fmt.Println("decode ")
  168. var tmpIn [][]byte
  169. var zeroPkt []byte
  170. tmpIn = make([][]byte, len(inBufs))
  171. hasBlock := map[int]bool{}
  172. for j := 0; j < len(blockSeq); j++ {
  173. hasBlock[blockSeq[j]] = true
  174. }
  175. needRepair := false //检测是否传入了所有数据块
  176. for j := 0; j < len(outBufs); j++ {
  177. if blockSeq[j] != j {
  178. needRepair = true
  179. }
  180. }
  181. enc := ec.NewRsEnc(ecK, len(inBufs))
  182. for i := 0; int64(i) < numPacket; i++ {
  183. for j := 0; j < len(inBufs); j++ { //3
  184. if hasBlock[j] {
  185. tmpIn[j] = <-inBufs[j]
  186. } else {
  187. tmpIn[j] = zeroPkt
  188. }
  189. }
  190. fmt.Printf("%v", tmpIn)
  191. if needRepair {
  192. err := enc.Repair(tmpIn)
  193. print("&&&&&")
  194. if err != nil {
  195. fmt.Fprintf(os.Stderr, "Decode Repair Error: %s", err.Error())
  196. }
  197. }
  198. //fmt.Printf("%v",tmpIn)
  199. for j := 0; j < len(outBufs); j++ { //1,2,3//示意,需要调用纠删码编解码引擎: tmp[k] = tmp[k]+(tmpIn[w][k]*coefs[w][j])
  200. outBufs[j] <- tmpIn[j]
  201. }
  202. }
  203. fmt.Println("decode over")
  204. for i := 0; i < len(outBufs); i++ {
  205. close(outBufs[i])
  206. }
  207. }
  208. func get(blockHash string, getBuf chan []byte, numPacket int64) {
  209. data := CatIPFS(blockHash)
  210. for i := 0; int64(i) < numPacket; i++ {
  211. buf := []byte(data[i*config.Cfg().GRCPPacketSize : i*config.Cfg().GRCPPacketSize+config.Cfg().GRCPPacketSize])
  212. getBuf <- buf
  213. }
  214. close(getBuf)
  215. }
  216. func persist(inBuf []chan []byte, numPacket int64, localFilePath string, wg *sync.WaitGroup) {
  217. //这里的localFilePath应该是要写入的filename
  218. fDir, err := os.Executable()
  219. if err != nil {
  220. panic(err)
  221. }
  222. fURL := filepath.Join(filepath.Dir(fDir), "assets3")
  223. _, err = os.Stat(fURL)
  224. if os.IsNotExist(err) {
  225. os.MkdirAll(fURL, os.ModePerm)
  226. }
  227. file, err := os.Create(filepath.Join(fURL, localFilePath))
  228. if err != nil {
  229. return
  230. }
  231. for i := 0; int64(i) < numPacket; i++ {
  232. for j := 0; j < len(inBuf); j++ {
  233. tmp := <-inBuf[j]
  234. fmt.Println(tmp)
  235. file.Write(tmp)
  236. }
  237. }
  238. file.Close()
  239. wg.Done()
  240. }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。