You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number; they can include dashes ('-') and can be up to 35 characters long.

storage.go 8.9 kB

2 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270
  1. package cmd
  2. import (
  3. "fmt"
  4. "io/fs"
  5. "io/ioutil"
  6. "os"
  7. "path/filepath"
  8. "time"
  9. "github.com/samber/lo"
  10. "gitlink.org.cn/cloudream/agent/internal/config"
  11. "gitlink.org.cn/cloudream/agent/internal/task"
  12. "gitlink.org.cn/cloudream/common/consts"
  13. "gitlink.org.cn/cloudream/common/models"
  14. "gitlink.org.cn/cloudream/common/pkg/logger"
  15. "gitlink.org.cn/cloudream/common/utils"
  16. "gitlink.org.cn/cloudream/ec"
  17. "gitlink.org.cn/cloudream/common/consts/errorcode"
  18. ramsg "gitlink.org.cn/cloudream/rabbitmq/message"
  19. agtmsg "gitlink.org.cn/cloudream/rabbitmq/message/agent"
  20. )
  21. func (service *Service) StartStorageMoveObject(msg *agtmsg.StartStorageMoveObject) (*agtmsg.StartStorageMoveObjectResp, *ramsg.CodeMessage) {
  22. // TODO 修改文件名,可用objectname
  23. outFileName := utils.MakeMoveOperationFileName(msg.ObjectID, msg.UserID)
  24. objectDir := filepath.Dir(msg.ObjectName)
  25. outFilePath := filepath.Join(config.Cfg().StorageBaseDir, msg.Directory, objectDir, outFileName)
  26. if repRed, ok := msg.Redundancy.(models.RepRedundancyData); ok {
  27. taskID, err := service.moveRepObject(repRed, outFilePath)
  28. if err != nil {
  29. logger.Warnf("move rep object as %s failed, err: %s", outFilePath, err.Error())
  30. return ramsg.ReplyFailed[agtmsg.StartStorageMoveObjectResp](errorcode.OperationFailed, "move rep object failed")
  31. }
  32. return ramsg.ReplyOK(agtmsg.NewStartStorageMoveObjectResp(taskID))
  33. } else if repRed, ok := msg.Redundancy.(models.ECRedundancyData); ok {
  34. taskID, err := service.moveEcObject(msg.ObjectID, msg.FileSize, repRed, outFilePath)
  35. if err != nil {
  36. logger.Warnf("move ec object as %s failed, err: %s", outFilePath, err.Error())
  37. return ramsg.ReplyFailed[agtmsg.StartStorageMoveObjectResp](errorcode.OperationFailed, "move ec object failed")
  38. }
  39. return ramsg.ReplyOK(agtmsg.NewStartStorageMoveObjectResp(taskID))
  40. }
  41. return ramsg.ReplyFailed[agtmsg.StartStorageMoveObjectResp](errorcode.OperationFailed, "not rep or ec object???")
  42. }
  43. func (svc *Service) moveRepObject(repData models.RepRedundancyData, outFilePath string) (string, error) {
  44. tsk := svc.taskManager.StartComparable(task.NewIPFSRead(repData.FileHash, outFilePath))
  45. return tsk.ID(), nil
  46. }
  47. func (svc *Service) moveEcObject(objID int64, fileSize int64, ecData models.ECRedundancyData, outFilePath string) (string, error) {
  48. ecK := ecData.Ec.EcK
  49. blockIDs := make([]int, ecK)
  50. hashs := make([]string, ecK)
  51. for i := 0; i < ecK; i++ {
  52. blockIDs[i] = i
  53. hashs[i] = ecData.Blocks[i].FileHash
  54. }
  55. tsk := svc.taskManager.StartComparable(task.NewEcRead(objID, fileSize, ecData.Ec, blockIDs, hashs, outFilePath))
  56. return tsk.ID(), nil
  57. }
  58. func (svc *Service) WaitStorageMoveObject(msg *agtmsg.WaitStorageMoveObject) (*agtmsg.WaitStorageMoveObjectResp, *ramsg.CodeMessage) {
  59. logger.WithField("TaskID", msg.TaskID).Debugf("wait moving object")
  60. tsk := svc.taskManager.FindByID(msg.TaskID)
  61. if tsk == nil {
  62. return ramsg.ReplyFailed[agtmsg.WaitStorageMoveObjectResp](errorcode.TaskNotFound, "task not found")
  63. }
  64. if msg.WaitTimeoutMs == 0 {
  65. tsk.Wait()
  66. errMsg := ""
  67. if tsk.Error() != nil {
  68. errMsg = tsk.Error().Error()
  69. }
  70. return ramsg.ReplyOK(agtmsg.NewWaitStorageMoveObjectResp(true, errMsg))
  71. } else {
  72. if tsk.WaitTimeout(time.Duration(msg.WaitTimeoutMs)) {
  73. errMsg := ""
  74. if tsk.Error() != nil {
  75. errMsg = tsk.Error().Error()
  76. }
  77. return ramsg.ReplyOK(agtmsg.NewWaitStorageMoveObjectResp(true, errMsg))
  78. }
  79. return ramsg.ReplyOK(agtmsg.NewWaitStorageMoveObjectResp(false, ""))
  80. }
  81. }
  82. func (svc *Service) StorageCheck(msg *agtmsg.StorageCheck) (*agtmsg.StorageCheckResp, *ramsg.CodeMessage) {
  83. dirFullPath := filepath.Join(config.Cfg().StorageBaseDir, msg.Directory)
  84. infos, err := ioutil.ReadDir(dirFullPath)
  85. if err != nil {
  86. logger.Warnf("list storage directory failed, err: %s", err.Error())
  87. return ramsg.ReplyOK(agtmsg.NewStorageCheckResp(
  88. err.Error(),
  89. nil,
  90. ))
  91. }
  92. fileInfos := lo.Filter(infos, func(info fs.FileInfo, index int) bool { return !info.IsDir() })
  93. if msg.IsComplete {
  94. return svc.checkStorageComplete(msg, fileInfos)
  95. } else {
  96. return svc.checkStorageIncrement(msg, fileInfos)
  97. }
  98. }
  99. func (svc *Service) checkStorageIncrement(msg *agtmsg.StorageCheck, fileInfos []fs.FileInfo) (*agtmsg.StorageCheckResp, *ramsg.CodeMessage) {
  100. infosMap := make(map[string]fs.FileInfo)
  101. for _, info := range fileInfos {
  102. infosMap[info.Name()] = info
  103. }
  104. var entries []agtmsg.StorageCheckRespEntry
  105. for _, obj := range msg.Objects {
  106. fileName := utils.MakeMoveOperationFileName(obj.ObjectID, obj.UserID)
  107. _, ok := infosMap[fileName]
  108. if ok {
  109. // 不需要做处理
  110. // 删除map中的记录,表示此记录已被检查过
  111. delete(infosMap, fileName)
  112. } else {
  113. // 只要文件不存在,就删除StorageObject表中的记录
  114. entries = append(entries, agtmsg.NewStorageCheckRespEntry(obj.ObjectID, obj.UserID, agtmsg.CHECK_STORAGE_RESP_OP_DELETE))
  115. }
  116. }
  117. // 增量情况下,不需要对infosMap中没检查的记录进行处理
  118. return ramsg.ReplyOK(agtmsg.NewStorageCheckResp(consts.StorageDirectoryStateOK, entries))
  119. }
  120. func (svc *Service) checkStorageComplete(msg *agtmsg.StorageCheck, fileInfos []fs.FileInfo) (*agtmsg.StorageCheckResp, *ramsg.CodeMessage) {
  121. infosMap := make(map[string]fs.FileInfo)
  122. for _, info := range fileInfos {
  123. infosMap[info.Name()] = info
  124. }
  125. var entries []agtmsg.StorageCheckRespEntry
  126. for _, obj := range msg.Objects {
  127. fileName := utils.MakeMoveOperationFileName(obj.ObjectID, obj.UserID)
  128. _, ok := infosMap[fileName]
  129. if ok {
  130. // 不需要做处理
  131. // 删除map中的记录,表示此记录已被检查过
  132. delete(infosMap, fileName)
  133. } else {
  134. // 只要文件不存在,就删除StorageObject表中的记录
  135. entries = append(entries, agtmsg.NewStorageCheckRespEntry(obj.ObjectID, obj.UserID, agtmsg.CHECK_STORAGE_RESP_OP_DELETE))
  136. }
  137. }
  138. return ramsg.ReplyOK(agtmsg.NewStorageCheckResp(consts.StorageDirectoryStateOK, entries))
  139. }
  140. func decode(inBufs []chan []byte, outBufs []chan []byte, blockSeq []int, ecK int, numPacket int64) {
  141. fmt.Println("decode ")
  142. var tmpIn [][]byte
  143. var zeroPkt []byte
  144. tmpIn = make([][]byte, len(inBufs))
  145. hasBlock := map[int]bool{}
  146. for j := 0; j < len(blockSeq); j++ {
  147. hasBlock[blockSeq[j]] = true
  148. }
  149. needRepair := false //检测是否传入了所有数据块
  150. for j := 0; j < len(outBufs); j++ {
  151. if blockSeq[j] != j {
  152. needRepair = true
  153. }
  154. }
  155. enc := ec.NewRsEnc(ecK, len(inBufs))
  156. for i := 0; int64(i) < numPacket; i++ {
  157. for j := 0; j < len(inBufs); j++ { //3
  158. if hasBlock[j] {
  159. tmpIn[j] = <-inBufs[j]
  160. } else {
  161. tmpIn[j] = zeroPkt
  162. }
  163. }
  164. if needRepair {
  165. err := enc.Repair(tmpIn)
  166. if err != nil {
  167. fmt.Fprintf(os.Stderr, "Decode Repair Error: %s", err.Error())
  168. }
  169. }
  170. for j := 0; j < len(outBufs); j++ { //1,2,3//示意,需要调用纠删码编解码引擎: tmp[k] = tmp[k]+(tmpIn[w][k]*coefs[w][j])
  171. outBufs[j] <- tmpIn[j]
  172. }
  173. }
  174. for i := 0; i < len(outBufs); i++ {
  175. close(outBufs[i])
  176. }
  177. }
  178. func (svc *Service) StartStorageUploadRepObject(msg *agtmsg.StartStorageUploadRepObject) (*agtmsg.StartStorageUploadRepObjectResp, *ramsg.CodeMessage) {
  179. fullPath := filepath.Join(config.Cfg().StorageBaseDir, msg.StorageDirectory, msg.FilePath)
  180. file, err := os.Open(fullPath)
  181. if err != nil {
  182. logger.Warnf("opening file %s: %s", fullPath, err.Error())
  183. return nil, ramsg.Failed(errorcode.OperationFailed, "open file failed")
  184. }
  185. fileInfo, err := file.Stat()
  186. if err != nil {
  187. file.Close()
  188. logger.Warnf("getting file %s state: %s", fullPath, err.Error())
  189. return nil, ramsg.Failed(errorcode.OperationFailed, "get file info failed")
  190. }
  191. fileSize := fileInfo.Size()
  192. uploadObject := task.UploadObject{
  193. ObjectName: msg.ObjectName,
  194. File: file,
  195. FileSize: fileSize,
  196. }
  197. uploadObjects := []task.UploadObject{uploadObject}
  198. // Task会关闭文件流
  199. tsk := svc.taskManager.StartNew(task.NewUploadRepObjects(msg.UserID, msg.BucketID, uploadObjects, msg.RepCount))
  200. return ramsg.ReplyOK(agtmsg.NewStartStorageUploadRepObjectResp(tsk.ID()))
  201. }
  202. func (svc *Service) WaitStorageUploadRepObject(msg *agtmsg.WaitStorageUploadRepObject) (*agtmsg.WaitStorageUploadRepObjectResp, *ramsg.CodeMessage) {
  203. tsk := svc.taskManager.FindByID(msg.TaskID)
  204. if tsk == nil {
  205. return nil, ramsg.Failed(errorcode.TaskNotFound, "task not found")
  206. }
  207. if msg.WaitTimeoutMs == 0 {
  208. tsk.Wait()
  209. } else if !tsk.WaitTimeout(time.Duration(msg.WaitTimeoutMs)) {
  210. return ramsg.ReplyOK(agtmsg.NewWaitStorageUploadRepObjectResp(false, "", 0, ""))
  211. }
  212. uploadTask := tsk.Body().(*task.UploadRepObjects)
  213. uploadRet := uploadTask.Results[0]
  214. errMsg := ""
  215. if tsk.Error() != nil {
  216. errMsg = tsk.Error().Error()
  217. }
  218. if uploadRet.Error != nil {
  219. errMsg = uploadRet.Error.Error()
  220. }
  221. return ramsg.ReplyOK(agtmsg.NewWaitStorageUploadRepObjectResp(true, errMsg, uploadRet.ObjectID, uploadRet.FileHash))
  222. }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。