You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

object.go 14 kB

2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357
  1. package services
  2. import (
  3. "database/sql"
  4. "errors"
  5. "github.com/jmoiron/sqlx"
  6. "github.com/samber/lo"
  7. "gitlink.org.cn/cloudream/common/consts/errorcode"
  8. "gitlink.org.cn/cloudream/common/models"
  9. "gitlink.org.cn/cloudream/common/pkg/logger"
  10. "gitlink.org.cn/cloudream/db/model"
  11. ramsg "gitlink.org.cn/cloudream/rabbitmq/message"
  12. coormsg "gitlink.org.cn/cloudream/rabbitmq/message/coordinator"
  13. scevt "gitlink.org.cn/cloudream/rabbitmq/message/scanner/event"
  14. )
  15. func (svc *Service) GetObjectsByDirName(msg *coormsg.GetObjectsByDirName) (*coormsg.GetObjectsResp, *ramsg.CodeMessage) {
  16. //查询dirName下所有文件
  17. objects, err := svc.db.Object().GetByDirName(svc.db.SQLCtx(), msg.DirName)
  18. if err != nil {
  19. logger.WithField("DirName", msg.DirName).
  20. Warnf("query dirname failed, err: %s", err.Error())
  21. return ramsg.ReplyFailed[coormsg.GetObjectsResp](errorcode.OperationFailed, "get objects failed")
  22. }
  23. return ramsg.ReplyOK(coormsg.NewGetObjectsResp(objects))
  24. }
  25. func (svc *Service) PreDownloadObject(msg *coormsg.PreDownloadObject) (*coormsg.PreDownloadObjectResp, *ramsg.CodeMessage) {
  26. // 查询文件对象
  27. object, err := svc.db.Object().GetUserObject(svc.db.SQLCtx(), msg.UserID, msg.ObjectID)
  28. if err != nil {
  29. logger.WithField("ObjectID", msg.ObjectID).
  30. Warnf("query Object failed, err: %s", err.Error())
  31. return ramsg.ReplyFailed[coormsg.PreDownloadObjectResp](errorcode.OperationFailed, "query Object failed")
  32. }
  33. // 查询客户端所属节点
  34. foundBelongNode := true
  35. belongNode, err := svc.db.Node().GetByExternalIP(svc.db.SQLCtx(), msg.ClientExternalIP)
  36. if err == sql.ErrNoRows {
  37. foundBelongNode = false
  38. } else if err != nil {
  39. logger.WithField("ClientExternalIP", msg.ClientExternalIP).
  40. Warnf("query client belong node failed, err: %s", err.Error())
  41. return ramsg.ReplyFailed[coormsg.PreDownloadObjectResp](errorcode.OperationFailed, "query client belong node failed")
  42. }
  43. logger.Debugf("client address %s is at location %d", msg.ClientExternalIP, belongNode.LocationID)
  44. //-若redundancy是rep,查询对象副本表, 获得FileHash
  45. if object.Redundancy == models.RedundancyRep {
  46. objectRep, err := svc.db.ObjectRep().GetByID(svc.db.SQLCtx(), object.ObjectID)
  47. if err != nil {
  48. logger.WithField("ObjectID", object.ObjectID).
  49. Warnf("get ObjectRep failed, err: %s", err.Error())
  50. return ramsg.ReplyFailed[coormsg.PreDownloadObjectResp](errorcode.OperationFailed, "query ObjectRep failed")
  51. }
  52. // 注:由于采用了IPFS存储,因此每个备份文件的FileHash都是一样的
  53. nodes, err := svc.db.Cache().FindCachingFileUserNodes(svc.db.SQLCtx(), msg.UserID, objectRep.FileHash)
  54. if err != nil {
  55. logger.WithField("FileHash", objectRep.FileHash).
  56. Warnf("query Cache failed, err: %s", err.Error())
  57. return ramsg.ReplyFailed[coormsg.PreDownloadObjectResp](errorcode.OperationFailed, "query Cache failed")
  58. }
  59. var respNodes []ramsg.RespNode
  60. for _, node := range nodes {
  61. respNodes = append(respNodes, ramsg.NewRespNode(
  62. node.NodeID,
  63. node.ExternalIP,
  64. node.LocalIP,
  65. // LocationID 相同则认为是在同一个地域
  66. foundBelongNode && belongNode.LocationID == node.LocationID,
  67. ))
  68. }
  69. return ramsg.ReplyOK(coormsg.NewPreDownloadObjectResp(
  70. object.FileSize,
  71. ramsg.NewRespRepRedundancyData(objectRep.FileHash, respNodes),
  72. ))
  73. } else {
  74. // TODO 参考上面进行重写
  75. ecName := object.Redundancy
  76. blocks, err := svc.db.QueryObjectBlock(object.ObjectID)
  77. if err != nil {
  78. logger.WithField("ObjectID", object.ObjectID).
  79. Warnf("query Blocks failed, err: %s", err.Error())
  80. return ramsg.ReplyFailed[coormsg.PreDownloadObjectResp](errorcode.OperationFailed, "query Blocks failed")
  81. }
  82. logger.Debugf(blocks[4].BlockHash)
  83. //查询纠删码参数
  84. ec, err := svc.db.Ec().GetEc(svc.db.SQLCtx(), ecName)
  85. ecc := ramsg.NewEc(ec.EcID, ec.Name, ec.EcK, ec.EcN)
  86. //查询每个编码块存放的所有节点
  87. respNodes := make([][]ramsg.RespNode, len(blocks))
  88. for i := 0; i < len(blocks); i++ {
  89. nodes, err := svc.db.Cache().FindCachingFileUserNodes(svc.db.SQLCtx(), msg.UserID, blocks[i].BlockHash)
  90. if err != nil {
  91. logger.WithField("FileHash", blocks[i].BlockHash).
  92. Warnf("query Cache failed, err: %s", err.Error())
  93. return ramsg.ReplyFailed[coormsg.PreDownloadObjectResp](errorcode.OperationFailed, "query Cache failed")
  94. }
  95. var nd []ramsg.RespNode
  96. for _, node := range nodes {
  97. nd = append(nd, ramsg.NewRespNode(
  98. node.NodeID,
  99. node.ExternalIP,
  100. node.LocalIP,
  101. // LocationID 相同则认为是在同一个地域
  102. foundBelongNode && belongNode.LocationID == node.LocationID,
  103. ))
  104. }
  105. respNodes[i] = nd
  106. logger.Debugf("##%d\n", i)
  107. }
  108. var blockss []ramsg.RespObjectBlock
  109. for i := 0; i < len(blocks); i++ {
  110. blockss = append(blockss, ramsg.NewRespObjectBlock(
  111. blocks[i].InnerID,
  112. blocks[i].BlockHash,
  113. ))
  114. }
  115. return ramsg.ReplyOK(coormsg.NewPreDownloadObjectResp(
  116. object.FileSize,
  117. ramsg.NewRespEcRedundancyData(ecc, blockss, respNodes),
  118. ))
  119. }
  120. }
  121. func (svc *Service) PreUploadRepObject(msg *coormsg.PreUploadRepObject) (*coormsg.PreUploadResp, *ramsg.CodeMessage) {
  122. // 判断同名对象是否存在。等到UploadRepObject时再判断一次。
  123. // 此次的判断只作为参考,具体是否成功还是看UploadRepObject的结果
  124. isBucketAvai, err := svc.db.Bucket().IsAvailable(svc.db.SQLCtx(), msg.BucketID, msg.UserID)
  125. if err != nil {
  126. logger.WithField("BucketID", msg.BucketID).
  127. Warnf("check bucket available failed, err: %s", err.Error())
  128. return ramsg.ReplyFailed[coormsg.PreUploadResp](errorcode.OperationFailed, "check bucket available failed")
  129. }
  130. if !isBucketAvai {
  131. logger.WithField("BucketID", msg.BucketID).
  132. Warnf("bucket is not available to user")
  133. return ramsg.ReplyFailed[coormsg.PreUploadResp](errorcode.OperationFailed, "bucket is not available to user")
  134. }
  135. _, err = svc.db.Object().GetByName(svc.db.SQLCtx(), msg.BucketID, msg.ObjectName)
  136. if err == nil {
  137. logger.WithField("BucketID", msg.BucketID).
  138. WithField("ObjectName", msg.ObjectName).
  139. Warnf("object with given Name and BucketID already exists")
  140. return ramsg.ReplyFailed[coormsg.PreUploadResp](errorcode.OperationFailed, "object with given Name and BucketID already exists")
  141. }
  142. if !errors.Is(err, sql.ErrNoRows) {
  143. logger.WithField("BucketID", msg.BucketID).
  144. WithField("ObjectName", msg.ObjectName).
  145. Warnf("get object by name failed, err: %s", err.Error())
  146. return ramsg.ReplyFailed[coormsg.PreUploadResp](errorcode.OperationFailed, "get object by name failed")
  147. }
  148. //查询用户可用的节点IP
  149. nodes, err := svc.db.Node().GetUserNodes(svc.db.SQLCtx(), msg.UserID)
  150. if err != nil {
  151. logger.WithField("UserID", msg.UserID).
  152. Warnf("query user nodes failed, err: %s", err.Error())
  153. return ramsg.ReplyFailed[coormsg.PreUploadResp](errorcode.OperationFailed, "query user nodes failed")
  154. }
  155. // 查询客户端所属节点
  156. foundBelongNode := true
  157. belongNode, err := svc.db.Node().GetByExternalIP(svc.db.SQLCtx(), msg.ClientExternalIP)
  158. if err == sql.ErrNoRows {
  159. foundBelongNode = false
  160. } else if err != nil {
  161. logger.WithField("ClientExternalIP", msg.ClientExternalIP).
  162. Warnf("query client belong node failed, err: %s", err.Error())
  163. return ramsg.ReplyFailed[coormsg.PreUploadResp](errorcode.OperationFailed, "query client belong node failed")
  164. }
  165. var respNodes []ramsg.RespNode
  166. for _, node := range nodes {
  167. respNodes = append(respNodes, ramsg.NewRespNode(
  168. node.NodeID,
  169. node.ExternalIP,
  170. node.LocalIP,
  171. // LocationID 相同则认为是在同一个地域
  172. foundBelongNode && belongNode.LocationID == node.LocationID,
  173. ))
  174. }
  175. return ramsg.ReplyOK(coormsg.NewPreUploadResp(respNodes))
  176. }
  177. func (svc *Service) CreateRepObject(msg *coormsg.CreateRepObject) (*coormsg.CreateObjectResp, *ramsg.CodeMessage) {
  178. var objID int64
  179. err := svc.db.DoTx(sql.LevelDefault, func(tx *sqlx.Tx) error {
  180. var err error
  181. objID, err = svc.db.Object().CreateRepObject(tx, msg.BucketID, msg.ObjectName, msg.FileSize, msg.RepCount, msg.NodeIDs, msg.FileHash, msg.DirName)
  182. return err
  183. })
  184. if err != nil {
  185. logger.WithField("BucketName", msg.BucketID).
  186. WithField("ObjectName", msg.ObjectName).
  187. Warnf("create rep object failed, err: %s", err.Error())
  188. return ramsg.ReplyFailed[coormsg.CreateObjectResp](errorcode.OperationFailed, "create rep object failed")
  189. }
  190. // 紧急任务
  191. err = svc.scanner.PostEvent(scevt.NewCheckRepCount([]string{msg.FileHash}), true, true)
  192. if err != nil {
  193. logger.Warnf("post event to scanner failed, but this will not affect creating, err: %s", err.Error())
  194. }
  195. return ramsg.ReplyOK(coormsg.NewCreateObjectResp(objID))
  196. }
  197. func (svc *Service) PreUpdateRepObject(msg *coormsg.PreUpdateRepObject) (*coormsg.PreUpdateRepObjectResp, *ramsg.CodeMessage) {
  198. // TODO 检查用户是否有Object的权限
  199. // 获取对象信息
  200. obj, err := svc.db.Object().GetByID(svc.db.SQLCtx(), msg.ObjectID)
  201. if err != nil {
  202. logger.WithField("ObjectID", msg.ObjectID).
  203. Warnf("get object failed, err: %s", err.Error())
  204. return ramsg.ReplyFailed[coormsg.PreUpdateRepObjectResp](errorcode.OperationFailed, "get object failed")
  205. }
  206. if obj.Redundancy != models.RedundancyRep {
  207. logger.WithField("ObjectID", msg.ObjectID).
  208. Warnf("this object is not a rep object")
  209. return ramsg.ReplyFailed[coormsg.PreUpdateRepObjectResp](errorcode.OperationFailed, "this object is not a rep object")
  210. }
  211. // 获取对象Rep信息
  212. objRep, err := svc.db.ObjectRep().GetByID(svc.db.SQLCtx(), msg.ObjectID)
  213. if err != nil {
  214. logger.WithField("ObjectID", msg.ObjectID).
  215. Warnf("get object rep failed, err: %s", err.Error())
  216. return ramsg.ReplyFailed[coormsg.PreUpdateRepObjectResp](errorcode.OperationFailed, "get object rep failed")
  217. }
  218. //查询用户可用的节点IP
  219. nodes, err := svc.db.Node().GetUserNodes(svc.db.SQLCtx(), msg.UserID)
  220. if err != nil {
  221. logger.WithField("UserID", msg.UserID).
  222. Warnf("query user nodes failed, err: %s", err.Error())
  223. return ramsg.ReplyFailed[coormsg.PreUpdateRepObjectResp](errorcode.OperationFailed, "query user nodes failed")
  224. }
  225. // 查询客户端所属节点
  226. foundBelongNode := true
  227. belongNode, err := svc.db.Node().GetByExternalIP(svc.db.SQLCtx(), msg.ClientExternalIP)
  228. if err == sql.ErrNoRows {
  229. foundBelongNode = false
  230. } else if err != nil {
  231. logger.WithField("ClientExternalIP", msg.ClientExternalIP).
  232. Warnf("query client belong node failed, err: %s", err.Error())
  233. return ramsg.ReplyFailed[coormsg.PreUpdateRepObjectResp](errorcode.OperationFailed, "query client belong node failed")
  234. }
  235. // 查询保存了旧文件的节点信息
  236. cachingNodes, err := svc.db.Cache().FindCachingFileUserNodes(svc.db.SQLCtx(), msg.UserID, objRep.FileHash)
  237. if err != nil {
  238. logger.Warnf("find caching file user nodes failed, err: %s", err.Error())
  239. return ramsg.ReplyFailed[coormsg.PreUpdateRepObjectResp](errorcode.OperationFailed, "find caching file user nodes failed")
  240. }
  241. var retNodes []coormsg.PreUpdateRepObjectRespNode
  242. for _, node := range nodes {
  243. retNodes = append(retNodes, coormsg.NewPreUpdateRepObjectRespNode(
  244. node.NodeID,
  245. node.ExternalIP,
  246. node.LocalIP,
  247. // LocationID 相同则认为是在同一个地域
  248. foundBelongNode && belongNode.LocationID == node.LocationID,
  249. // 此节点存储了对象旧文件
  250. lo.ContainsBy(cachingNodes, func(n model.Node) bool { return n.NodeID == node.NodeID }),
  251. ))
  252. }
  253. return ramsg.ReplyOK(coormsg.NewPreUpdateRepObjectResp(retNodes))
  254. }
  255. func (svc *Service) UpdateRepObject(msg *coormsg.UpdateRepObject) (*coormsg.UpdateRepObjectResp, *ramsg.CodeMessage) {
  256. err := svc.db.DoTx(sql.LevelDefault, func(tx *sqlx.Tx) error {
  257. return svc.db.Object().UpdateRepObject(tx, msg.ObjectID, msg.FileSize, msg.NodeIDs, msg.FileHash)
  258. })
  259. if err != nil {
  260. logger.WithField("ObjectID", msg.ObjectID).
  261. Warnf("update rep object failed, err: %s", err.Error())
  262. return ramsg.ReplyFailed[coormsg.UpdateRepObjectResp](errorcode.OperationFailed, "update rep object failed")
  263. }
  264. // 紧急任务
  265. err = svc.scanner.PostEvent(scevt.NewCheckRepCount([]string{msg.FileHash}), true, true)
  266. if err != nil {
  267. logger.Warnf("post event to scanner failed, but this will not affect updating, err: %s", err.Error())
  268. }
  269. return ramsg.ReplyOK(coormsg.NewUpdateRepObjectResp())
  270. }
  271. func (svc *Service) DeleteObject(msg *coormsg.DeleteObject) (*coormsg.DeleteObjectResp, *ramsg.CodeMessage) {
  272. isAva, err := svc.db.Object().IsAvailable(svc.db.SQLCtx(), msg.UserID, msg.ObjectID)
  273. if err != nil {
  274. logger.WithField("UserID", msg.UserID).
  275. WithField("ObjectID", msg.ObjectID).
  276. Warnf("check object available failed, err: %s", err.Error())
  277. return ramsg.ReplyFailed[coormsg.DeleteObjectResp](errorcode.OperationFailed, "check object available failed")
  278. }
  279. if !isAva {
  280. logger.WithField("UserID", msg.UserID).
  281. WithField("ObjectID", msg.ObjectID).
  282. Warnf("object is not available to the user")
  283. return ramsg.ReplyFailed[coormsg.DeleteObjectResp](errorcode.OperationFailed, "object is not available to the user")
  284. }
  285. err = svc.db.DoTx(sql.LevelDefault, func(tx *sqlx.Tx) error {
  286. return svc.db.Object().SoftDelete(tx, msg.ObjectID)
  287. })
  288. if err != nil {
  289. logger.WithField("UserID", msg.UserID).
  290. WithField("ObjectID", msg.ObjectID).
  291. Warnf("set object deleted failed, err: %s", err.Error())
  292. return ramsg.ReplyFailed[coormsg.DeleteObjectResp](errorcode.OperationFailed, "set object deleted failed")
  293. }
  294. stgs, err := svc.db.StorageObject().FindObjectStorages(svc.db.SQLCtx(), msg.ObjectID)
  295. if err != nil {
  296. logger.Warnf("find object storages failed, but this will not affect the deleting, err: %s", err.Error())
  297. return ramsg.ReplyOK(coormsg.NewDeleteObjectResp())
  298. }
  299. // 不追求及时、准确
  300. if len(stgs) == 0 {
  301. // 如果没有被引用,直接投递CheckObject的任务
  302. err := svc.scanner.PostEvent(scevt.NewCheckObject([]int64{msg.ObjectID}), false, false)
  303. if err != nil {
  304. logger.Warnf("post event to scanner failed, but this will not affect deleting, err: %s", err.Error())
  305. }
  306. logger.Debugf("post check object event")
  307. } else {
  308. // 有引用则让Agent去检查StorageObject
  309. for _, stg := range stgs {
  310. err := svc.scanner.PostEvent(scevt.NewAgentCheckStorage(stg.StorageID, []int64{msg.ObjectID}), false, false)
  311. if err != nil {
  312. logger.Warnf("post event to scanner failed, but this will not affect deleting, err: %s", err.Error())
  313. }
  314. }
  315. logger.Debugf("post agent check storage event")
  316. }
  317. return ramsg.ReplyOK(coormsg.NewDeleteObjectResp())
  318. }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。