You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

object_rep.go 4.3 kB

2 years ago
2 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154
  1. package db
  2. import (
  3. "database/sql"
  4. "fmt"
  5. "strconv"
  6. "strings"
  7. "github.com/jmoiron/sqlx"
  8. stgsdk "gitlink.org.cn/cloudream/common/sdks/storage"
  9. "gitlink.org.cn/cloudream/storage/common/consts"
  10. stgmod "gitlink.org.cn/cloudream/storage/common/models"
  11. "gitlink.org.cn/cloudream/storage/common/pkgs/db/model"
  12. )
  13. type ObjectRepDB struct {
  14. *DB
  15. }
  16. func (db *DB) ObjectRep() *ObjectRepDB {
  17. return &ObjectRepDB{DB: db}
  18. }
  19. // GetObjectRep 查询对象副本表
  20. func (db *ObjectRepDB) GetByID(ctx SQLContext, objectID int64) (model.ObjectRep, error) {
  21. var ret model.ObjectRep
  22. err := sqlx.Get(ctx, &ret, "select * from ObjectRep where ObjectID = ?", objectID)
  23. return ret, err
  24. }
  25. func (db *ObjectRepDB) Create(ctx SQLContext, objectID int64, fileHash string) error {
  26. _, err := ctx.Exec("insert into ObjectRep(ObjectID, FileHash) values(?,?)", objectID, fileHash)
  27. return err
  28. }
  29. func (db *ObjectRepDB) Update(ctx SQLContext, objectID int64, fileHash string) (int64, error) {
  30. ret, err := ctx.Exec("update ObjectRep set FileHash = ? where ObjectID = ?", fileHash, objectID)
  31. if err != nil {
  32. return 0, err
  33. }
  34. cnt, err := ret.RowsAffected()
  35. if err != nil {
  36. return 0, fmt.Errorf("get affected rows failed, err: %w", err)
  37. }
  38. return cnt, nil
  39. }
  40. func (db *ObjectRepDB) Delete(ctx SQLContext, objectID int64) error {
  41. _, err := ctx.Exec("delete from ObjectRep where ObjectID = ?", objectID)
  42. return err
  43. }
  44. func (db *ObjectRepDB) DeleteInPackage(ctx SQLContext, packageID int64) error {
  45. _, err := ctx.Exec("delete ObjectRep from ObjectRep inner join Object on ObjectRep.ObjectID = Object.ObjectID where PackageID = ?", packageID)
  46. return err
  47. }
  48. func (db *ObjectRepDB) GetFileMaxRepCount(ctx SQLContext, fileHash string) (int, error) {
  49. var maxRepCnt *int
  50. err := sqlx.Get(ctx, &maxRepCnt,
  51. "select json_extract(Redundancy, '$.info.repCount') from ObjectRep, Object, Package where FileHash = ? and"+
  52. " ObjectRep.ObjectID = Object.ObjectID and"+
  53. " Object.PackageID = Package.PackageID and"+
  54. " Package.State = ?", fileHash, consts.PackageStateNormal)
  55. if err == sql.ErrNoRows {
  56. return 0, nil
  57. }
  58. if err != nil {
  59. return 0, err
  60. }
  61. if maxRepCnt == nil {
  62. return 0, nil
  63. }
  64. return *maxRepCnt, err
  65. }
  66. func (db *ObjectRepDB) GetWithNodeIDInPackage(ctx SQLContext, packageID int64) ([]stgmod.ObjectRepData, error) {
  67. var tmpRets []struct {
  68. model.Object
  69. FileHash *string `db:"FileHash"`
  70. NodeIDs *string `db:"NodeIDs"`
  71. }
  72. err := sqlx.Select(ctx,
  73. &tmpRets,
  74. "select Object.*, ObjectRep.FileHash, group_concat(NodeID) as NodeIDs from Object"+
  75. " left join ObjectRep on Object.ObjectID = ObjectRep.ObjectID"+
  76. " left join Cache on ObjectRep.FileHash = Cache.FileHash"+
  77. " where PackageID = ? group by Object.ObjectID order by Object.ObjectID asc",
  78. packageID,
  79. )
  80. if err != nil {
  81. return nil, err
  82. }
  83. rets := make([]stgmod.ObjectRepData, 0, len(tmpRets))
  84. for _, tmp := range tmpRets {
  85. var repData stgmod.ObjectRepData
  86. repData.Object = tmp.Object
  87. if tmp.FileHash != nil {
  88. repData.FileHash = *tmp.FileHash
  89. }
  90. if tmp.NodeIDs != nil {
  91. repData.NodeIDs = splitIDStringUnsafe(*tmp.NodeIDs)
  92. }
  93. rets = append(rets, repData)
  94. }
  95. return rets, nil
  96. }
  97. func (db *ObjectRepDB) GetPackageObjectCacheInfos(ctx SQLContext, packageID int64) ([]stgsdk.ObjectCacheInfo, error) {
  98. var tmpRet []struct {
  99. stgsdk.Object
  100. FileHash string `db:"FileHash"`
  101. }
  102. err := sqlx.Select(ctx, &tmpRet, "select Object.*, ObjectRep.FileHash from Object"+
  103. " left join ObjectRep on Object.ObjectID = ObjectRep.ObjectID"+
  104. " where Object.PackageID = ? order by Object.ObjectID asc", packageID)
  105. if err != nil {
  106. return nil, err
  107. }
  108. ret := make([]stgsdk.ObjectCacheInfo, len(tmpRet))
  109. for i, r := range tmpRet {
  110. ret[i] = stgsdk.NewObjectCacheInfo(r.Object, r.FileHash)
  111. }
  112. return ret, nil
  113. }
  114. // 按逗号切割字符串,并将每一个部分解析为一个int64的ID。
  115. // 注:需要外部保证分隔的每一个部分都是正确的10进制数字格式
  116. func splitIDStringUnsafe(idStr string) []int64 {
  117. idStrs := strings.Split(idStr, ",")
  118. ids := make([]int64, 0, len(idStrs))
  119. for _, str := range idStrs {
  120. // 假设传入的ID是正确的数字格式
  121. id, _ := strconv.ParseInt(str, 10, 64)
  122. ids = append(ids, id)
  123. }
  124. return ids
  125. }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。