You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

package.go 11 kB

2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago

  1. package services
  2. import (
  3. "database/sql"
  4. "fmt"
  5. "sort"
  6. "github.com/jmoiron/sqlx"
  7. "gitlink.org.cn/cloudream/common/consts/errorcode"
  8. "gitlink.org.cn/cloudream/common/models"
  9. "gitlink.org.cn/cloudream/common/pkgs/logger"
  10. "gitlink.org.cn/cloudream/common/pkgs/mq"
  11. coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator"
  12. scevt "gitlink.org.cn/cloudream/storage-common/pkgs/mq/scanner/event"
  13. )
  14. func (svc *Service) GetPackage(msg *coormq.GetPackage) (*coormq.GetPackageResp, *mq.CodeMessage) {
  15. pkg, err := svc.db.Package().GetByID(svc.db.SQLCtx(), msg.PackageID)
  16. if err != nil {
  17. logger.WithField("PackageID", msg.PackageID).
  18. Warnf("get package: %s", err.Error())
  19. return nil, mq.Failed(errorcode.OperationFailed, "get package failed")
  20. }
  21. return mq.ReplyOK(coormq.NewGetPackageResp(pkg))
  22. }
  23. func (svc *Service) GetPackageObjects(msg *coormq.GetPackageObjects) (*coormq.GetPackageObjectsResp, *mq.CodeMessage) {
  24. // TODO 检查用户是否有权限
  25. objs, err := svc.db.Object().GetPackageObjects(svc.db.SQLCtx(), msg.PackageID)
  26. if err != nil {
  27. logger.WithField("PackageID", msg.PackageID).
  28. Warnf("get package objects: %s", err.Error())
  29. return nil, mq.Failed(errorcode.OperationFailed, "get package objects failed")
  30. }
  31. return mq.ReplyOK(coormq.NewGetPackageObjectsResp(objs))
  32. }
  33. func (svc *Service) CreatePackage(msg *coormq.CreatePackage) (*coormq.CreatePackageResp, *mq.CodeMessage) {
  34. var pkgID int64
  35. err := svc.db.DoTx(sql.LevelDefault, func(tx *sqlx.Tx) error {
  36. var err error
  37. pkgID, err = svc.db.Package().Create(svc.db.SQLCtx(), msg.BucketID, msg.Name, msg.Redundancy)
  38. return err
  39. })
  40. if err != nil {
  41. logger.WithField("BucketID", msg.BucketID).
  42. WithField("Name", msg.Name).
  43. Warnf("creating package: %s", err.Error())
  44. return nil, mq.Failed(errorcode.OperationFailed, "creating package failed")
  45. }
  46. return mq.ReplyOK(coormq.NewCreatePackageResp(pkgID))
  47. }
  48. func (svc *Service) UpdateRepPackage(msg *coormq.UpdateRepPackage) (*coormq.UpdateRepPackageResp, *mq.CodeMessage) {
  49. _, err := svc.db.Package().GetByID(svc.db.SQLCtx(), msg.PackageID)
  50. if err != nil {
  51. logger.WithField("PackageID", msg.PackageID).
  52. Warnf("get package: %s", err.Error())
  53. return nil, mq.Failed(errorcode.OperationFailed, "get package failed")
  54. }
  55. err = svc.db.DoTx(sql.LevelDefault, func(tx *sqlx.Tx) error {
  56. // 先执行删除操作
  57. if len(msg.Deletes) > 0 {
  58. if err := svc.db.Object().BatchDelete(tx, msg.Deletes); err != nil {
  59. return fmt.Errorf("deleting objects: %w", err)
  60. }
  61. }
  62. // 再执行添加操作
  63. if len(msg.Adds) > 0 {
  64. if _, err := svc.db.Object().BatchAddRep(tx, msg.PackageID, msg.Adds); err != nil {
  65. return fmt.Errorf("adding objects: %w", err)
  66. }
  67. }
  68. return nil
  69. })
  70. if err != nil {
  71. logger.Warn(err.Error())
  72. return nil, mq.Failed(errorcode.OperationFailed, "update rep package failed")
  73. }
  74. // 紧急任务
  75. var affectFileHashes []string
  76. for _, add := range msg.Adds {
  77. affectFileHashes = append(affectFileHashes, add.FileHash)
  78. }
  79. err = svc.scanner.PostEvent(scevt.NewCheckRepCount(affectFileHashes), true, true)
  80. if err != nil {
  81. logger.Warnf("post event to scanner failed, but this will not affect creating, err: %s", err.Error())
  82. }
  83. return mq.ReplyOK(coormq.NewUpdateRepPackageResp())
  84. }
  85. func (svc *Service) UpdateECPackage(msg *coormq.UpdateECPackage) (*coormq.UpdateECPackageResp, *mq.CodeMessage) {
  86. _, err := svc.db.Package().GetByID(svc.db.SQLCtx(), msg.PackageID)
  87. if err != nil {
  88. logger.WithField("PackageID", msg.PackageID).
  89. Warnf("get package: %s", err.Error())
  90. return nil, mq.Failed(errorcode.OperationFailed, "get package failed")
  91. }
  92. err = svc.db.DoTx(sql.LevelDefault, func(tx *sqlx.Tx) error {
  93. // 先执行删除操作
  94. if len(msg.Deletes) > 0 {
  95. if err := svc.db.Object().BatchDelete(tx, msg.Deletes); err != nil {
  96. return fmt.Errorf("deleting objects: %w", err)
  97. }
  98. }
  99. // 再执行添加操作
  100. if len(msg.Adds) > 0 {
  101. if _, err := svc.db.Object().BatchAddEC(tx, msg.PackageID, msg.Adds); err != nil {
  102. return fmt.Errorf("adding objects: %w", err)
  103. }
  104. }
  105. return nil
  106. })
  107. if err != nil {
  108. logger.Warn(err.Error())
  109. return nil, mq.Failed(errorcode.OperationFailed, "update ec package failed")
  110. }
  111. return mq.ReplyOK(coormq.NewUpdateECPackageResp())
  112. }
  113. func (svc *Service) DeletePackage(msg *coormq.DeletePackage) (*coormq.DeletePackageResp, *mq.CodeMessage) {
  114. isAva, err := svc.db.Package().IsAvailable(svc.db.SQLCtx(), msg.UserID, msg.PackageID)
  115. if err != nil {
  116. logger.WithField("UserID", msg.UserID).
  117. WithField("PackageID", msg.PackageID).
  118. Warnf("check package available failed, err: %s", err.Error())
  119. return mq.ReplyFailed[coormq.DeletePackageResp](errorcode.OperationFailed, "check package available failed")
  120. }
  121. if !isAva {
  122. logger.WithField("UserID", msg.UserID).
  123. WithField("PackageID", msg.PackageID).
  124. Warnf("package is not available to the user")
  125. return mq.ReplyFailed[coormq.DeletePackageResp](errorcode.OperationFailed, "package is not available to the user")
  126. }
  127. err = svc.db.DoTx(sql.LevelDefault, func(tx *sqlx.Tx) error {
  128. return svc.db.Package().SoftDelete(tx, msg.PackageID)
  129. })
  130. if err != nil {
  131. logger.WithField("UserID", msg.UserID).
  132. WithField("PackageID", msg.PackageID).
  133. Warnf("set package deleted failed, err: %s", err.Error())
  134. return mq.ReplyFailed[coormq.DeletePackageResp](errorcode.OperationFailed, "set package deleted failed")
  135. }
  136. stgs, err := svc.db.StoragePackage().FindPackageStorages(svc.db.SQLCtx(), msg.PackageID)
  137. if err != nil {
  138. logger.Warnf("find package storages failed, but this will not affect the deleting, err: %s", err.Error())
  139. return mq.ReplyOK(coormq.NewDeletePackageResp())
  140. }
  141. // 不追求及时、准确
  142. if len(stgs) == 0 {
  143. // 如果没有被引用,直接投递CheckPackage的任务
  144. err := svc.scanner.PostEvent(scevt.NewCheckPackage([]int64{msg.PackageID}), false, false)
  145. if err != nil {
  146. logger.Warnf("post event to scanner failed, but this will not affect deleting, err: %s", err.Error())
  147. }
  148. logger.Debugf("post check package event")
  149. } else {
  150. // 有引用则让Agent去检查StoragePackage
  151. for _, stg := range stgs {
  152. err := svc.scanner.PostEvent(scevt.NewAgentCheckStorage(stg.StorageID, []int64{msg.PackageID}), false, false)
  153. if err != nil {
  154. logger.Warnf("post event to scanner failed, but this will not affect deleting, err: %s", err.Error())
  155. }
  156. }
  157. logger.Debugf("post agent check storage event")
  158. }
  159. return mq.ReplyOK(coormq.NewDeletePackageResp())
  160. }
  161. func (svc *Service) GetPackageCachedNodes(msg *coormq.GetPackageCachedNodes) (*coormq.GetPackageCachedNodesResp, *mq.CodeMessage) {
  162. isAva, err := svc.db.Package().IsAvailable(svc.db.SQLCtx(), msg.UserID, msg.PackageID)
  163. if err != nil {
  164. logger.WithField("UserID", msg.UserID).
  165. WithField("PackageID", msg.PackageID).
  166. Warnf("check package available failed, err: %s", err.Error())
  167. return mq.ReplyFailed[coormq.GetPackageCachedNodesResp](errorcode.OperationFailed, "check package available failed")
  168. }
  169. if !isAva {
  170. logger.WithField("UserID", msg.UserID).
  171. WithField("PackageID", msg.PackageID).
  172. Warnf("package is not available to the user")
  173. return mq.ReplyFailed[coormq.GetPackageCachedNodesResp](errorcode.OperationFailed, "package is not available to the user")
  174. }
  175. pkg, err := svc.db.Package().GetByID(svc.db.SQLCtx(), msg.PackageID)
  176. if err != nil {
  177. logger.WithField("PackageID", msg.PackageID).
  178. Warnf("get package: %s", err.Error())
  179. return nil, mq.Failed(errorcode.OperationFailed, "get package failed")
  180. }
  181. var packageSize int64
  182. nodeInfoMap := make(map[int64]*models.NodePackageCachingInfo)
  183. if pkg.Redundancy.IsRepInfo() {
  184. // 备份方式为rep
  185. objectRepDatas, err := svc.db.ObjectRep().GetWithNodeIDInPackage(svc.db.SQLCtx(), msg.PackageID)
  186. if err != nil {
  187. logger.WithField("PackageID", msg.PackageID).
  188. Warnf("get objectRepDatas by packageID failed, err: %s", err.Error())
  189. return mq.ReplyFailed[coormq.GetPackageCachedNodesResp](errorcode.OperationFailed, "get objectRepDatas by packageID failed")
  190. }
  191. for _, data := range objectRepDatas {
  192. packageSize += data.Object.Size
  193. for _, nodeID := range data.NodeIDs {
  194. nodeInfo, exists := nodeInfoMap[nodeID]
  195. if !exists {
  196. nodeInfo = &models.NodePackageCachingInfo{
  197. NodeID: nodeID,
  198. FileSize: data.Object.Size,
  199. ObjectCount: 1,
  200. }
  201. } else {
  202. nodeInfo.FileSize += data.Object.Size
  203. nodeInfo.ObjectCount++
  204. }
  205. nodeInfoMap[nodeID] = nodeInfo
  206. }
  207. }
  208. } else if pkg.Redundancy.IsECInfo() {
  209. // 备份方式为ec
  210. objectECDatas, err := svc.db.ObjectBlock().GetWithNodeIDInPackage(svc.db.SQLCtx(), msg.PackageID)
  211. if err != nil {
  212. logger.WithField("PackageID", msg.PackageID).
  213. Warnf("get objectECDatas by packageID failed, err: %s", err.Error())
  214. return mq.ReplyFailed[coormq.GetPackageCachedNodesResp](errorcode.OperationFailed, "get objectECDatas by packageID failed")
  215. }
  216. for _, ecData := range objectECDatas {
  217. packageSize += ecData.Object.Size
  218. for _, block := range ecData.Blocks {
  219. for _, nodeID := range block.NodeIDs {
  220. nodeInfo, exists := nodeInfoMap[nodeID]
  221. if !exists {
  222. nodeInfo = &models.NodePackageCachingInfo{
  223. NodeID: nodeID,
  224. FileSize: ecData.Object.Size,
  225. ObjectCount: 1,
  226. }
  227. } else {
  228. nodeInfo.FileSize += ecData.Object.Size
  229. nodeInfo.ObjectCount++
  230. }
  231. nodeInfoMap[nodeID] = nodeInfo
  232. }
  233. }
  234. }
  235. } else {
  236. logger.WithField("PackageID", msg.PackageID).
  237. Warnf("Redundancy type %s is wrong", pkg.Redundancy.Type)
  238. return mq.ReplyFailed[coormq.GetPackageCachedNodesResp](errorcode.OperationFailed, "redundancy type is wrong")
  239. }
  240. var nodeInfos []models.NodePackageCachingInfo
  241. for _, nodeInfo := range nodeInfoMap {
  242. nodeInfos = append(nodeInfos, *nodeInfo)
  243. }
  244. sort.Slice(nodeInfos, func(i, j int) bool {
  245. return nodeInfos[i].NodeID < nodeInfos[j].NodeID
  246. })
  247. return mq.ReplyOK(coormq.NewGetPackageCachedNodesResp(nodeInfos, packageSize, pkg.Redundancy.Type))
  248. }
  249. func (svc *Service) GetPackageLoadedNodes(msg *coormq.GetPackageLoadedNodes) (*coormq.GetPackageLoadedNodesResp, *mq.CodeMessage) {
  250. storages, err := svc.db.StoragePackage().FindPackageStorages(svc.db.SQLCtx(), msg.PackageID)
  251. if err != nil {
  252. logger.WithField("PackageID", msg.PackageID).
  253. Warnf("get storages by packageID failed, err: %s", err.Error())
  254. return mq.ReplyFailed[coormq.GetPackageLoadedNodesResp](errorcode.OperationFailed, "get storages by packageID failed")
  255. }
  256. uniqueNodeIDs := make(map[int64]bool)
  257. var nodeIDs []int64
  258. for _, stg := range storages {
  259. if !uniqueNodeIDs[stg.NodeID] {
  260. uniqueNodeIDs[stg.NodeID] = true
  261. nodeIDs = append(nodeIDs, stg.NodeID)
  262. }
  263. }
  264. return mq.ReplyOK(coormq.NewGetPackageLoadedNodesResp(nodeIDs))
  265. }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。