You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

storage.go 6.2 kB

2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148
  1. package services
  2. import (
  3. "database/sql"
  4. "fmt"
  5. "github.com/jmoiron/sqlx"
  6. "gitlink.org.cn/cloudream/common/consts/errorcode"
  7. "gitlink.org.cn/cloudream/common/models"
  8. "gitlink.org.cn/cloudream/common/pkg/logger"
  9. //"gitlink.org.cn/cloudream/common/utils"
  10. ramsg "gitlink.org.cn/cloudream/rabbitmq/message"
  11. coormsg "gitlink.org.cn/cloudream/rabbitmq/message/coordinator"
  12. )
  13. func (svc *Service) GetStorageInfo(msg *coormsg.GetStorageInfo) (*coormsg.GetStorageInfoResp, *ramsg.CodeMessage) {
  14. stg, err := svc.db.Storage().GetUserStorage(svc.db.SQLCtx(), msg.UserID, msg.StorageID)
  15. if err != nil {
  16. logger.Warnf("getting user storage: %s", err.Error())
  17. return nil, ramsg.Failed(errorcode.OperationFailed, "get user storage failed")
  18. }
  19. return ramsg.ReplyOK(coormsg.NewGetStorageInfoResp(stg.StorageID, stg.Name, stg.NodeID, stg.Directory, stg.State))
  20. }
  21. func (svc *Service) PreMoveObjectToStorage(msg *coormsg.PreMoveObjectToStorage) (*coormsg.PreMoveObjectToStorageResp, *ramsg.CodeMessage) {
  22. //查询数据库,获取冗余类型,冗余参数
  23. //jh:使用command中的bucketname和objectname查询对象表,获得redundancy,EcName,fileSize
  24. //-若redundancy是rep,查询对象副本表, 获得repHash
  25. //--ids :={0}
  26. //--hashs := {repHash}
  27. //-若redundancy是ec,查询对象编码块表,获得blockHashs, ids(innerID),
  28. //--查询缓存表,获得每个hash的nodeIps、TempOrPins、Times
  29. //--查询节点延迟表,得到command.destination与各个nodeIps的的延迟,存到一个map类型中(Delay)
  30. //--kx:根据查出来的hash/hashs、nodeIps、TempOrPins、Times(移动/读取策略)、Delay确定hashs、ids
  31. // 查询用户关联的存储服务
  32. stg, err := svc.db.Storage().GetUserStorage(svc.db.SQLCtx(), msg.UserID, msg.StorageID)
  33. if err != nil {
  34. logger.WithField("UserID", msg.UserID).
  35. WithField("StorageID", msg.StorageID).
  36. Warnf("get user Storage failed, err: %s", err.Error())
  37. return ramsg.ReplyFailed[coormsg.PreMoveObjectToStorageResp](errorcode.OperationFailed, "get user Storage failed")
  38. }
  39. // 查询文件对象
  40. object, err := svc.db.Object().GetUserObject(svc.db.SQLCtx(), msg.UserID, msg.ObjectID)
  41. if err != nil {
  42. logger.WithField("ObjectID", msg.ObjectID).
  43. Warnf("get user Object failed, err: %s", err.Error())
  44. return ramsg.ReplyFailed[coormsg.PreMoveObjectToStorageResp](errorcode.OperationFailed, "get user Object failed")
  45. }
  46. //-若redundancy是rep,查询对象副本表, 获得FileHash
  47. if object.Redundancy == models.RedundancyRep {
  48. objectRep, err := svc.db.ObjectRep().GetByID(svc.db.SQLCtx(), object.ObjectID)
  49. if err != nil {
  50. logger.Warnf("get ObjectRep failed, err: %s", err.Error())
  51. return ramsg.ReplyFailed[coormsg.PreMoveObjectToStorageResp](errorcode.OperationFailed, "get ObjectRep failed")
  52. }
  53. return ramsg.ReplyOK(coormsg.NewPreMoveObjectToStorageRespBody(
  54. stg.NodeID,
  55. stg.Directory,
  56. object,
  57. models.NewRedundancyRepData(objectRep.FileHash),
  58. ))
  59. } else {
  60. // TODO 以EC_开头的Redundancy才是EC策略
  61. ecName := object.Redundancy
  62. blocks, err := svc.db.QueryObjectBlock(object.ObjectID)
  63. if err != nil {
  64. logger.WithField("ObjectID", object.ObjectID).
  65. Warnf("query Blocks failed, err: %s", err.Error())
  66. return ramsg.ReplyFailed[coormsg.PreMoveObjectToStorageResp](errorcode.OperationFailed, "query Blocks failed")
  67. }
  68. //查询纠删码参数
  69. ec, err := svc.db.Ec().GetEc(svc.db.SQLCtx(), ecName)
  70. ecc := models.NewEc(ec.EcID, ec.Name, ec.EcK, ec.EcN)
  71. /*//查询每个编码块存放的所有节点
  72. respNodes := make([][]ramsg.RespNode, len(blocks))
  73. for i:=0; i<len(blocks); i++{
  74. nodes, err := svc.db.Cache().FindCachingFileUserNodes(svc.db.SQLCtx(), msg.UserID, blocks[i].BlockHash)
  75. if err != nil {
  76. logger.WithField("FileHash", blocks[i].BlockHash).
  77. Warnf("query Cache failed, err: %s", err.Error())
  78. return ramsg.ReplyFailed[coormsg.PreDownloadObjectResp](errorcode.OperationFailed, "query Cache failed")
  79. }
  80. var nd []ramsg.RespNode
  81. for _, node := range nodes {
  82. nd = append(nd, ramsg.NewRespNode(
  83. node.NodeID,
  84. node.ExternalIP,
  85. node.LocalIP,
  86. // LocationID 相同则认为是在同一个地域
  87. foundBelongNode && belongNode.LocationID == node.LocationID,
  88. ))
  89. }
  90. respNodes[i] = nd
  91. logger.Debugf("##%d\n", i)
  92. }
  93. */
  94. blockss := make([]models.ObjectBlock, len(blocks))
  95. for i:=0; i<len(blocks); i++{
  96. blockss[i] = models.NewObjectBlock(
  97. blocks[i].InnerID,
  98. blocks[i].BlockHash,
  99. )
  100. }
  101. fmt.Println(blockss)
  102. return ramsg.ReplyOK(coormsg.NewPreMoveObjectToStorageRespBody(
  103. stg.NodeID,
  104. stg.Directory,
  105. object.FileSize,
  106. models.NewRedundancyEcData(ecc,blockss),
  107. ))
  108. //--查询缓存表,获得每个hash的nodeIps、TempOrPins、Times
  109. /*for id,hash := range blockHashs{
  110. //type Cache struct {NodeIP string,TempOrPin bool,Cachetime string}
  111. Cache := Query_Cache(hash)
  112. //利用Time_trans()函数可将Cache[i].Cachetime转化为时间戳格式
  113. //--查询节点延迟表,得到command.Destination与各个nodeIps的延迟,存到一个map类型中(Delay)
  114. Delay := make(map[string]int) // 延迟集合
  115. for i:=0; i<len(Cache); i++{
  116. Delay[Cache[i].NodeIP] = Query_NodeDelay(Destination, Cache[i].NodeIP)
  117. }
  118. //--kx:根据查出来的hash/hashs、nodeIps、TempOrPins、Times(移动/读取策略)、Delay确定hashs、ids
  119. }*/
  120. }
  121. }
  122. func (svc *Service) MoveObjectToStorage(msg *coormsg.MoveObjectToStorage) (*coormsg.MoveObjectToStorageResp, *ramsg.CodeMessage) {
  123. // TODO: 对于的storage中已经存在的文件,直接覆盖已有文件
  124. err := svc.db.DoTx(sql.LevelDefault, func(tx *sqlx.Tx) error {
  125. return svc.db.StorageObject().MoveObjectTo(tx, msg.ObjectID, msg.StorageID, msg.UserID)
  126. })
  127. if err != nil {
  128. logger.WithField("UserID", msg.UserID).
  129. WithField("ObjectID", msg.ObjectID).
  130. WithField("StorageID", msg.StorageID).
  131. Warnf("user move object to storage failed, err: %s", err.Error())
  132. return ramsg.ReplyFailed[coormsg.MoveObjectToStorageResp](errorcode.OperationFailed, "user move object to storage failed")
  133. }
  134. return ramsg.ReplyOK(coormsg.NewMoveObjectToStorageResp())
  135. }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。