You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

command_service.go 8.8 kB


  1. package main
  2. import (
  3. log "github.com/sirupsen/logrus"
  4. mydb "gitlink.org.cn/cloudream/db"
  5. ramsg "gitlink.org.cn/cloudream/rabbitmq/message"
  6. "gitlink.org.cn/cloudream/utils"
  7. "gitlink.org.cn/cloudream/utils/consts"
  8. "gitlink.org.cn/cloudream/utils/consts/errorcode"
  9. )
  10. type CommandService struct {
  11. db *mydb.DB
  12. }
  13. func NewCommandService(db *mydb.DB) *CommandService {
  14. return &CommandService{
  15. db: db,
  16. }
  17. }
  18. func (service *CommandService) Read(msg *ramsg.ReadCommand) ramsg.ReadResp {
  19. var hashes []string
  20. blockIDs := []int{0}
  21. // 查询文件对象
  22. object, err := service.db.QueryObjectByID(msg.BucketID, msg.ObjectID)
  23. if err != nil {
  24. log.WithField("BucketID", msg.BucketID).
  25. WithField("ObjectID", msg.ObjectID).
  26. Warnf("query Object failed, err: %s", err.Error())
  27. return ramsg.NewCoorReadRespFailed(errorcode.OPERATION_FAILED, "query Object failed")
  28. }
  29. var nodeIPs []string
  30. //-若redundancy是rep,查询对象副本表, 获得repHash
  31. if object.Redundancy == consts.REDUNDANCY_REP {
  32. objectRep, err := service.db.QueryObjectRep(object.ObjectID)
  33. if err != nil {
  34. log.WithField("ObjectID", object.ObjectID).
  35. Warnf("query ObjectRep failed, err: %s", err.Error())
  36. return ramsg.NewCoorReadRespFailed(errorcode.OPERATION_FAILED, "query ObjectRep failed")
  37. }
  38. hashes = append(hashes, objectRep.RepHash)
  39. nodes, err := service.db.QueryCacheNodeByBlockHash(objectRep.RepHash)
  40. if err != nil {
  41. log.WithField("RepHash", objectRep.RepHash).
  42. Warnf("query Cache failed, err: %s", err.Error())
  43. return ramsg.NewCoorReadRespFailed(errorcode.OPERATION_FAILED, "query Cache failed")
  44. }
  45. for _, node := range nodes {
  46. nodeIPs = append(nodeIPs, node.IP)
  47. }
  48. } else {
  49. blocks, err := service.db.QueryObjectBlock(object.ObjectID)
  50. if err != nil {
  51. log.WithField("ObjectID", object.ObjectID).
  52. Warnf("query Object Block failed, err: %s", err.Error())
  53. return ramsg.NewCoorReadRespFailed(errorcode.OPERATION_FAILED, "query Object Block failed")
  54. }
  55. ecPolicies := *utils.GetEcPolicy()
  56. ecPolicy := ecPolicies[*object.ECName]
  57. ecN := ecPolicy.GetN()
  58. ecK := ecPolicy.GetK()
  59. nodeIPs = make([]string, ecN)
  60. hashes = make([]string, ecN)
  61. for _, tt := range blocks {
  62. id := tt.InnerID
  63. hash := tt.BlockHash
  64. hashes[id] = hash //这里有问题,采取的其实是直接顺序读的方式,等待加入自适应读模块
  65. nodes, err := service.db.QueryCacheNodeByBlockHash(hash)
  66. if err != nil {
  67. log.WithField("BlockHash", hash).
  68. Warnf("query Cache failed, err: %s", err.Error())
  69. return ramsg.NewCoorReadRespFailed(errorcode.OPERATION_FAILED, "query Cache failed")
  70. }
  71. if len(nodes) == 0 {
  72. log.WithField("BlockHash", hash).
  73. Warnf("No node cache the block data for the BlockHash")
  74. return ramsg.NewCoorReadRespFailed(errorcode.OPERATION_FAILED, "No node cache the block data for the BlockHash")
  75. }
  76. nodeIPs[id] = nodes[0].IP
  77. }
  78. //这里也有和上面一样的问题
  79. for i := 1; i < ecK; i++ {
  80. blockIDs = append(blockIDs, i)
  81. }
  82. }
  83. return ramsg.NewCoorReadRespOK(
  84. object.Redundancy,
  85. nodeIPs,
  86. hashes,
  87. blockIDs,
  88. object.ECName,
  89. object.FileSizeInBytes,
  90. )
  91. }
  92. func (service *CommandService) Move(msg *ramsg.MoveCommand) ramsg.MoveResp {
  93. //查询数据库,获取冗余类型,冗余参数
  94. //jh:使用command中的bucketname和objectname查询对象表,获得redundancy,EcName,fileSizeInBytes
  95. //-若redundancy是rep,查询对象副本表, 获得repHash
  96. //--ids :={0}
  97. //--hashs := {repHash}
  98. //-若redundancy是ec,查询对象编码块表,获得blockHashs, ids(innerID),
  99. //--查询缓存表,获得每个hash的nodeIps、TempOrPins、Times
  100. //--查询节点延迟表,得到command.destination与各个nodeIps的的延迟,存到一个map类型中(Delay)
  101. //--kx:根据查出来的hash/hashs、nodeIps、TempOrPins、Times(移动/读取策略)、Delay确定hashs、ids
  102. // 查询用户关联的存储服务
  103. stg, err := service.db.QueryUserStorage(msg.UserID, msg.StorageID)
  104. if err != nil {
  105. log.WithField("UserID", msg.UserID).
  106. WithField("StorageID", msg.StorageID).
  107. Warnf("query storage directory failed, err: %s", err.Error())
  108. return ramsg.NewCoorMoveRespFailed(errorcode.OPERATION_FAILED, "query storage directory failed")
  109. }
  110. // 查询文件对象
  111. object, err := service.db.QueryObjectByFullName(msg.BucketName, msg.ObjectName)
  112. if err != nil {
  113. log.WithField("BucketName", msg.BucketName).
  114. WithField("ObjectName", msg.ObjectName).
  115. Warnf("query Object failed, err: %s", err.Error())
  116. return ramsg.NewCoorMoveRespFailed(errorcode.OPERATION_FAILED, "query Object failed")
  117. }
  118. //-若redundancy是rep,查询对象副本表, 获得repHash
  119. var hashs []string
  120. ids := []int{0}
  121. if object.Redundancy == consts.REDUNDANCY_REP {
  122. objectRep, err := service.db.QueryObjectRep(object.ObjectID)
  123. if err != nil {
  124. log.Warnf("query ObjectRep failed, err: %s", err.Error())
  125. return ramsg.NewCoorMoveRespFailed(errorcode.OPERATION_FAILED, "query ObjectRep failed")
  126. }
  127. hashs = append(hashs, objectRep.RepHash)
  128. } else {
  129. blockHashs, err := service.db.QueryObjectBlock(object.ObjectID)
  130. if err != nil {
  131. log.Warnf("query ObjectBlock failed, err: %s", err.Error())
  132. return ramsg.NewCoorMoveRespFailed(errorcode.OPERATION_FAILED, "query ObjectBlock failed")
  133. }
  134. ecPolicies := *utils.GetEcPolicy()
  135. ecPolicy := ecPolicies[*object.ECName]
  136. ecN := ecPolicy.GetN()
  137. ecK := ecPolicy.GetK()
  138. ids = make([]int, ecK)
  139. for i := 0; i < ecN; i++ {
  140. hashs = append(hashs, "-1")
  141. }
  142. for i := 0; i < ecK; i++ {
  143. ids[i] = i
  144. }
  145. hashs = make([]string, ecN)
  146. for _, tt := range blockHashs {
  147. id := tt.InnerID
  148. hash := tt.BlockHash
  149. hashs[id] = hash
  150. }
  151. //--查询缓存表,获得每个hash的nodeIps、TempOrPins、Times
  152. /*for id,hash := range blockHashs{
  153. //type Cache struct {NodeIP string,TempOrPin bool,Cachetime string}
  154. Cache := Query_Cache(hash)
  155. //利用Time_trans()函数可将Cache[i].Cachetime转化为时间戳格式
  156. //--查询节点延迟表,得到command.Destination与各个nodeIps的延迟,存到一个map类型中(Delay)
  157. Delay := make(map[string]int) // 延迟集合
  158. for i:=0; i<len(Cache); i++{
  159. Delay[Cache[i].NodeIP] = Query_NodeDelay(Destination, Cache[i].NodeIP)
  160. }
  161. //--kx:根据查出来的hash/hashs、nodeIps、TempOrPins、Times(移动/读取策略)、Delay确定hashs、ids
  162. }*/
  163. }
  164. return ramsg.NewCoorMoveRespOK(
  165. stg.NodeID,
  166. stg.Directory,
  167. object.Redundancy,
  168. object.ECName,
  169. hashs,
  170. ids,
  171. object.FileSizeInBytes,
  172. )
  173. }
  174. func (service *CommandService) RepWrite(msg *ramsg.RepWriteCommand) ramsg.WriteResp {
  175. // TODO 需要在此处判断同名对象是否存在。等到WriteRepHash时再判断一次。
  176. // 此次的判断只作为参考,具体是否成功还是看WriteRepHash的结果
  177. //查询用户可用的节点IP
  178. nodes, err := service.db.QueryUserNodes(msg.UserID)
  179. if err != nil {
  180. log.Warnf("query user nodes failed, err: %s", err.Error())
  181. return ramsg.NewCoorWriteRespFailed(errorcode.OPERATION_FAILED, "query user nodes failed")
  182. }
  183. if len(nodes) < msg.ReplicateNumber {
  184. log.WithField("UserID", msg.UserID).
  185. WithField("ReplicateNumber", msg.ReplicateNumber).
  186. Warnf("user nodes are not enough")
  187. return ramsg.NewCoorWriteRespFailed(errorcode.OPERATION_FAILED, "user nodes are not enough")
  188. }
  189. numRep := msg.ReplicateNumber
  190. ids := make([]int, numRep)
  191. ips := make([]string, numRep)
  192. //随机选取numRep个nodeIp
  193. start := utils.GetRandInt(len(nodes))
  194. for i := 0; i < numRep; i++ {
  195. index := (start + i) % len(nodes)
  196. ids[i] = nodes[index].NodeID
  197. ips[i] = nodes[index].IP
  198. }
  199. return ramsg.NewCoorWriteRespOK(ids, ips)
  200. }
  201. func (service *CommandService) WriteRepHash(msg *ramsg.WriteRepHashCommand) ramsg.WriteHashResp {
  202. _, err := service.db.CreateRepObject(msg.BucketName, msg.ObjectName, msg.FileSizeInBytes, msg.ReplicateNumber, msg.NodeIDs, msg.Hashes)
  203. if err != nil {
  204. log.WithField("BucketName", msg.BucketName).
  205. WithField("ObjectName", msg.ObjectName).
  206. Warnf("create rep object failed, err: %s", err.Error())
  207. return ramsg.NewCoorWriteHashRespFailed(errorcode.OPERATION_FAILED, "create rep object failed")
  208. }
  209. return ramsg.NewCoorWriteHashRespOK()
  210. }
  211. func (service *CommandService) TempCacheReport(msg *ramsg.TempCacheReport) {
  212. service.db.BatchInsertOrUpdateCache(msg.Hashes, msg.NodeID)
  213. }
  214. func (service *CommandService) AgentStatusReport(msg *ramsg.AgentStatusReport) {
  215. //jh:根据command中的Ip,插入节点延迟表,和节点表的NodeStatus
  216. //根据command中的Ip,插入节点延迟表
  217. // TODO
  218. /*
  219. ips := utils.GetAgentIps()
  220. Insert_NodeDelay(msg.IP, ips, msg.AgentDelay)
  221. //从配置表里读取节点地域NodeLocation
  222. //插入节点表的NodeStatus
  223. Insert_Node(msg.IP, msg.IP, msg.IPFSStatus, msg.LocalDirStatus)
  224. */
  225. }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。