You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

storage.go 11 kB


  1. package cmd
  2. import (
  3. "fmt"
  4. "io/fs"
  5. "io/ioutil"
  6. "os"
  7. "path/filepath"
  8. "sync"
  9. "time"
  10. "github.com/samber/lo"
  11. "gitlink.org.cn/cloudream/agent/internal/config"
  12. "gitlink.org.cn/cloudream/agent/internal/task"
  13. "gitlink.org.cn/cloudream/common/consts"
  14. "gitlink.org.cn/cloudream/common/models"
  15. "gitlink.org.cn/cloudream/common/pkg/logger"
  16. "gitlink.org.cn/cloudream/common/utils"
  17. "gitlink.org.cn/cloudream/ec"
  18. "gitlink.org.cn/cloudream/common/consts/errorcode"
  19. ramsg "gitlink.org.cn/cloudream/rabbitmq/message"
  20. agtmsg "gitlink.org.cn/cloudream/rabbitmq/message/agent"
  21. )
  22. func (service *Service) StartStorageMoveObject(msg *agtmsg.StartStorageMoveObject) (*agtmsg.StartStorageMoveObjectResp, *ramsg.CodeMessage) {
  23. // TODO 修改文件名,可用objectname
  24. outFileName := utils.MakeMoveOperationFileName(msg.ObjectID, msg.UserID)
  25. outFilePath := filepath.Join(config.Cfg().StorageBaseDir, msg.Directory, outFileName)
  26. if repRed, ok := msg.Redundancy.(models.RepRedundancyData); ok {
  27. taskID, err := service.moveRepObject(repRed, outFilePath)
  28. if err != nil {
  29. logger.Warnf("move rep object as %s failed, err: %s", outFilePath, err.Error())
  30. return ramsg.ReplyFailed[agtmsg.StartStorageMoveObjectResp](errorcode.OperationFailed, "move rep object failed")
  31. }
  32. return ramsg.ReplyOK(agtmsg.NewStartStorageMoveObjectResp(taskID))
  33. } else {
  34. // TODO 处理其他备份类型
  35. return ramsg.ReplyFailed[agtmsg.StartStorageMoveObjectResp](errorcode.OperationFailed, "not implement yet!")
  36. }
  37. }
  38. func (svc *Service) moveRepObject(repData models.RepRedundancyData, outFilePath string) (string, error) {
  39. tsk := svc.taskManager.StartComparable(task.NewIPFSRead(repData.FileHash, outFilePath))
  40. return tsk.ID(), nil
  41. }
  42. func (svc *Service) WaitStorageMoveObject(msg *agtmsg.WaitStorageMoveObject) (*agtmsg.WaitStorageMoveObjectResp, *ramsg.CodeMessage) {
  43. logger.WithField("TaskID", msg.TaskID).Debugf("wait moving object")
  44. tsk := svc.taskManager.FindByID(msg.TaskID)
  45. if tsk == nil {
  46. return ramsg.ReplyFailed[agtmsg.WaitStorageMoveObjectResp](errorcode.TaskNotFound, "task not found")
  47. }
  48. if msg.WaitTimeoutMs == 0 {
  49. tsk.Wait()
  50. errMsg := ""
  51. if tsk.Error() != nil {
  52. errMsg = tsk.Error().Error()
  53. }
  54. return ramsg.ReplyOK(agtmsg.NewWaitStorageMoveObjectResp(true, errMsg))
  55. } else {
  56. if tsk.WaitTimeout(time.Duration(msg.WaitTimeoutMs)) {
  57. errMsg := ""
  58. if tsk.Error() != nil {
  59. errMsg = tsk.Error().Error()
  60. }
  61. return ramsg.ReplyOK(agtmsg.NewWaitStorageMoveObjectResp(true, errMsg))
  62. }
  63. return ramsg.ReplyOK(agtmsg.NewWaitStorageMoveObjectResp(false, ""))
  64. }
  65. }
  66. func (svc *Service) StorageCheck(msg *agtmsg.StorageCheck) (*agtmsg.StorageCheckResp, *ramsg.CodeMessage) {
  67. dirFullPath := filepath.Join(config.Cfg().StorageBaseDir, msg.Directory)
  68. infos, err := ioutil.ReadDir(dirFullPath)
  69. if err != nil {
  70. logger.Warnf("list storage directory failed, err: %s", err.Error())
  71. return ramsg.ReplyOK(agtmsg.NewStorageCheckResp(
  72. err.Error(),
  73. nil,
  74. ))
  75. }
  76. fileInfos := lo.Filter(infos, func(info fs.FileInfo, index int) bool { return !info.IsDir() })
  77. if msg.IsComplete {
  78. return svc.checkStorageComplete(msg, fileInfos)
  79. } else {
  80. return svc.checkStorageIncrement(msg, fileInfos)
  81. }
  82. }
  83. func (svc *Service) checkStorageIncrement(msg *agtmsg.StorageCheck, fileInfos []fs.FileInfo) (*agtmsg.StorageCheckResp, *ramsg.CodeMessage) {
  84. infosMap := make(map[string]fs.FileInfo)
  85. for _, info := range fileInfos {
  86. infosMap[info.Name()] = info
  87. }
  88. var entries []agtmsg.StorageCheckRespEntry
  89. for _, obj := range msg.Objects {
  90. fileName := utils.MakeMoveOperationFileName(obj.ObjectID, obj.UserID)
  91. _, ok := infosMap[fileName]
  92. if ok {
  93. // 不需要做处理
  94. // 删除map中的记录,表示此记录已被检查过
  95. delete(infosMap, fileName)
  96. } else {
  97. // 只要文件不存在,就删除StorageObject表中的记录
  98. entries = append(entries, agtmsg.NewStorageCheckRespEntry(obj.ObjectID, obj.UserID, agtmsg.CHECK_STORAGE_RESP_OP_DELETE))
  99. }
  100. }
  101. // 增量情况下,不需要对infosMap中没检查的记录进行处理
  102. return ramsg.ReplyOK(agtmsg.NewStorageCheckResp(consts.StorageDirectoryStateOK, entries))
  103. }
  104. func (svc *Service) checkStorageComplete(msg *agtmsg.StorageCheck, fileInfos []fs.FileInfo) (*agtmsg.StorageCheckResp, *ramsg.CodeMessage) {
  105. infosMap := make(map[string]fs.FileInfo)
  106. for _, info := range fileInfos {
  107. infosMap[info.Name()] = info
  108. }
  109. var entries []agtmsg.StorageCheckRespEntry
  110. for _, obj := range msg.Objects {
  111. fileName := utils.MakeMoveOperationFileName(obj.ObjectID, obj.UserID)
  112. _, ok := infosMap[fileName]
  113. if ok {
  114. // 不需要做处理
  115. // 删除map中的记录,表示此记录已被检查过
  116. delete(infosMap, fileName)
  117. } else {
  118. // 只要文件不存在,就删除StorageObject表中的记录
  119. entries = append(entries, agtmsg.NewStorageCheckRespEntry(obj.ObjectID, obj.UserID, agtmsg.CHECK_STORAGE_RESP_OP_DELETE))
  120. }
  121. }
  122. // Storage中多出来的文件不做处理
  123. return ramsg.ReplyOK(agtmsg.NewStorageCheckResp(consts.StorageDirectoryStateOK, entries))
  124. }
  125. /*
  126. func (service *Service) ECMove(msg *agtmsg.ECMoveCommand) *agtmsg.StartStorageMoveObjectResp {
  127. panic("not implement yet!")
  128. wg := sync.WaitGroup{}
  129. fmt.Println("EcMove")
  130. fmt.Println(msg.Hashs)
  131. hashs := msg.Hashs
  132. fileSize := msg.FileSize
  133. blockIds := msg.IDs
  134. ecName := msg.ECName
  135. goalName := msg.BucketName + ":" + msg.ObjectName + ":" + strconv.Itoa(msg.UserID)
  136. ecPolicies := *utils.GetEcPolicy()
  137. ecPolicy := ecPolicies[ecName]
  138. ecK := ecPolicy.GetK()
  139. ecN := ecPolicy.GetN()
  140. numPacket := (fileSize + int64(ecK)*int64(config.Cfg().GRCPPacketSize) - 1) / (int64(ecK) * int64(config.Cfg().GRCPPacketSize))
  141. getBufs := make([]chan []byte, ecN)
  142. decodeBufs := make([]chan []byte, ecK)
  143. for i := 0; i < ecN; i++ {
  144. getBufs[i] = make(chan []byte)
  145. }
  146. for i := 0; i < ecK; i++ {
  147. decodeBufs[i] = make(chan []byte)
  148. }
  149. wg.Add(1)
  150. //执行调度操作
  151. // TODO 这一块需要改写以适配IPFS流式读取
  152. for i := 0; i < len(blockIds); i++ {
  153. go service.get(hashs[i], getBufs[blockIds[i]], numPacket)
  154. }
  155. go decode(getBufs[:], decodeBufs[:], blockIds, ecK, numPacket)
  156. // TODO 写入的文件路径需要带上msg中的Directory字段,参考RepMove
  157. go persist(decodeBufs[:], numPacket, goalName, &wg)
  158. wg.Wait()
  159. //向coor报告临时缓存hash
  160. coorClient, err := racli.NewCoordinatorClient()
  161. if err != nil {
  162. // TODO 日志
  163. return ramsg.NewAgentMoveRespFailed(errorcode.OPERATION_FAILED, fmt.Sprintf("create coordinator client failed"))
  164. }
  165. defer coorClient.Close()
  166. coorClient.TempCacheReport(NodeID, hashs)
  167. return ramsg.NewAgentMoveRespOK()
  168. }
  169. */
  170. func decode(inBufs []chan []byte, outBufs []chan []byte, blockSeq []int, ecK int, numPacket int64) {
  171. fmt.Println("decode ")
  172. var tmpIn [][]byte
  173. var zeroPkt []byte
  174. tmpIn = make([][]byte, len(inBufs))
  175. hasBlock := map[int]bool{}
  176. for j := 0; j < len(blockSeq); j++ {
  177. hasBlock[blockSeq[j]] = true
  178. }
  179. needRepair := false //检测是否传入了所有数据块
  180. for j := 0; j < len(outBufs); j++ {
  181. if blockSeq[j] != j {
  182. needRepair = true
  183. }
  184. }
  185. enc := ec.NewRsEnc(ecK, len(inBufs))
  186. for i := 0; int64(i) < numPacket; i++ {
  187. for j := 0; j < len(inBufs); j++ { //3
  188. if hasBlock[j] {
  189. tmpIn[j] = <-inBufs[j]
  190. } else {
  191. tmpIn[j] = zeroPkt
  192. }
  193. }
  194. fmt.Printf("%v", tmpIn)
  195. if needRepair {
  196. err := enc.Repair(tmpIn)
  197. print("&&&&&")
  198. if err != nil {
  199. fmt.Fprintf(os.Stderr, "Decode Repair Error: %s", err.Error())
  200. }
  201. }
  202. //fmt.Printf("%v",tmpIn)
  203. for j := 0; j < len(outBufs); j++ { //1,2,3//示意,需要调用纠删码编解码引擎: tmp[k] = tmp[k]+(tmpIn[w][k]*coefs[w][j])
  204. outBufs[j] <- tmpIn[j]
  205. }
  206. }
  207. fmt.Println("decode over")
  208. for i := 0; i < len(outBufs); i++ {
  209. close(outBufs[i])
  210. }
  211. }
  212. func (service *Service) get(blockHash string, getBuf chan []byte, numPacket int64) {
  213. /*
  214. data := CatIPFS(blockHash)
  215. for i := 0; int64(i) < numPacket; i++ {
  216. buf := []byte(data[i*config.Cfg().GRCPPacketSize : i*config.Cfg().GRCPPacketSize+config.Cfg().GRCPPacketSize])
  217. getBuf <- buf
  218. }
  219. close(getBuf)
  220. */
  221. }
  222. func persist(inBuf []chan []byte, numPacket int64, localFilePath string, wg *sync.WaitGroup) {
  223. //这里的localFilePath应该是要写入的filename
  224. fDir, err := os.Executable()
  225. if err != nil {
  226. panic(err)
  227. }
  228. fURL := filepath.Join(filepath.Dir(fDir), "assets3")
  229. _, err = os.Stat(fURL)
  230. if os.IsNotExist(err) {
  231. os.MkdirAll(fURL, os.ModePerm)
  232. }
  233. file, err := os.Create(filepath.Join(fURL, localFilePath))
  234. if err != nil {
  235. return
  236. }
  237. for i := 0; int64(i) < numPacket; i++ {
  238. for j := 0; j < len(inBuf); j++ {
  239. tmp := <-inBuf[j]
  240. fmt.Println(tmp)
  241. file.Write(tmp)
  242. }
  243. }
  244. file.Close()
  245. wg.Done()
  246. }
  247. func (svc *Service) StartStorageUploadRepObject(msg *agtmsg.StartStorageUploadRepObject) (*agtmsg.StartStorageUploadRepObjectResp, *ramsg.CodeMessage) {
  248. fullPath := filepath.Join(config.Cfg().StorageBaseDir, msg.StorageDirectory, msg.FilePath)
  249. file, err := os.Open(fullPath)
  250. if err != nil {
  251. logger.Warnf("opening file %s: %s", fullPath, err.Error())
  252. return nil, ramsg.Failed(errorcode.OperationFailed, "open file failed")
  253. }
  254. fileInfo, err := file.Stat()
  255. if err != nil {
  256. file.Close()
  257. logger.Warnf("getting file %s state: %s", fullPath, err.Error())
  258. return nil, ramsg.Failed(errorcode.OperationFailed, "get file info failed")
  259. }
  260. fileSize := fileInfo.Size()
  261. uploadObject := task.UploadObject{
  262. ObjectName: msg.ObjectName,
  263. File: file,
  264. FileSize: fileSize,
  265. }
  266. uploadObjects := []task.UploadObject{uploadObject}
  267. // Task会关闭文件流
  268. tsk := svc.taskManager.StartNew(task.NewUploadRepObjects(msg.UserID, msg.BucketID, uploadObjects, msg.RepCount))
  269. return ramsg.ReplyOK(agtmsg.NewStartStorageUploadRepObjectResp(tsk.ID()))
  270. }
  271. func (svc *Service) WaitStorageUploadRepObject(msg *agtmsg.WaitStorageUploadRepObject) (*agtmsg.WaitStorageUploadRepObjectResp, *ramsg.CodeMessage) {
  272. tsk := svc.taskManager.FindByID(msg.TaskID)
  273. if tsk == nil {
  274. return nil, ramsg.Failed(errorcode.TaskNotFound, "task not found")
  275. }
  276. if msg.WaitTimeoutMs == 0 {
  277. tsk.Wait()
  278. } else if !tsk.WaitTimeout(time.Duration(msg.WaitTimeoutMs)) {
  279. return ramsg.ReplyOK(agtmsg.NewWaitStorageUploadRepObjectResp(false, "", 0, ""))
  280. }
  281. uploadTask := tsk.Body().(*task.UploadRepObjects)
  282. uploadRet := uploadTask.Results[0]
  283. errMsg := ""
  284. if tsk.Error() != nil {
  285. errMsg = tsk.Error().Error()
  286. }
  287. if uploadRet.Error != nil {
  288. errMsg = uploadRet.Error.Error()
  289. }
  290. return ramsg.ReplyOK(agtmsg.NewWaitStorageUploadRepObjectResp(true, errMsg, uploadRet.ObjectID, uploadRet.FileHash))
  291. }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。