You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

object.go 11 kB

2 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349
  1. package db
  2. import (
  3. "fmt"
  4. "time"
  5. "github.com/jmoiron/sqlx"
  6. "github.com/samber/lo"
  7. cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
  8. stgmod "gitlink.org.cn/cloudream/storage/common/models"
  9. "gitlink.org.cn/cloudream/storage/common/pkgs/db/model"
  10. coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"
  11. )
  12. type ObjectDB struct {
  13. *DB
  14. }
  15. func (db *DB) Object() *ObjectDB {
  16. return &ObjectDB{DB: db}
  17. }
  18. func (db *ObjectDB) GetByID(ctx SQLContext, objectID cdssdk.ObjectID) (model.Object, error) {
  19. var ret model.TempObject
  20. err := sqlx.Get(ctx, &ret, "select * from Object where ObjectID = ?", objectID)
  21. return ret.ToObject(), err
  22. }
  23. func (db *ObjectDB) BatchGet(ctx SQLContext, objectIDs []cdssdk.ObjectID) ([]model.Object, error) {
  24. if len(objectIDs) == 0 {
  25. return nil, nil
  26. }
  27. // TODO In语句
  28. stmt, args, err := sqlx.In("select * from Object where ObjectID in (?) order by ObjectID asc", objectIDs)
  29. if err != nil {
  30. return nil, err
  31. }
  32. stmt = ctx.Rebind(stmt)
  33. objs := make([]model.TempObject, 0, len(objectIDs))
  34. err = sqlx.Select(ctx, &objs, stmt, args...)
  35. if err != nil {
  36. return nil, err
  37. }
  38. return lo.Map(objs, func(o model.TempObject, idx int) cdssdk.Object { return o.ToObject() }), nil
  39. }
  40. func (db *ObjectDB) BatchGetByPackagePath(ctx SQLContext, pkgID cdssdk.PackageID, pathes []string) ([]cdssdk.Object, error) {
  41. if len(pathes) == 0 {
  42. return nil, nil
  43. }
  44. // TODO In语句
  45. stmt, args, err := sqlx.In("select * from Object force index(PackagePath) where PackageID=? and Path in (?)", pkgID, pathes)
  46. if err != nil {
  47. return nil, err
  48. }
  49. stmt = ctx.Rebind(stmt)
  50. objs := make([]model.TempObject, 0, len(pathes))
  51. err = sqlx.Select(ctx, &objs, stmt, args...)
  52. if err != nil {
  53. return nil, err
  54. }
  55. return lo.Map(objs, func(o model.TempObject, idx int) cdssdk.Object { return o.ToObject() }), nil
  56. }
  57. func (db *ObjectDB) Create(ctx SQLContext, obj cdssdk.Object) (cdssdk.ObjectID, error) {
  58. sql := "insert into Object(PackageID, Path, Size, FileHash, Redundancy, CreateTime, UpdateTime) values(?,?,?,?,?,?,?)"
  59. ret, err := ctx.Exec(sql, obj.PackageID, obj.Path, obj.Size, obj.FileHash, obj.Redundancy, obj.UpdateTime, obj.UpdateTime)
  60. if err != nil {
  61. return 0, fmt.Errorf("insert object failed, err: %w", err)
  62. }
  63. objectID, err := ret.LastInsertId()
  64. if err != nil {
  65. return 0, fmt.Errorf("get id of inserted object failed, err: %w", err)
  66. }
  67. return cdssdk.ObjectID(objectID), nil
  68. }
  69. // 可以用于批量创建或者更新记录。
  70. // 用于创建时,需要额外检查PackageID+Path的唯一性。
  71. // 用于更新时,需要额外检查现存的PackageID+Path对应的ObjectID是否与待更新的ObjectID相同。不会更新CreateTime。
  72. func (db *ObjectDB) BatchUpsertByPackagePath(ctx SQLContext, objs []cdssdk.Object) error {
  73. if len(objs) == 0 {
  74. return nil
  75. }
  76. sql := "insert into Object(PackageID, Path, Size, FileHash, Redundancy, CreateTime ,UpdateTime)" +
  77. " values(:PackageID,:Path,:Size,:FileHash,:Redundancy, :CreateTime, :UpdateTime) as new" +
  78. " on duplicate key update Size = new.Size, FileHash = new.FileHash, Redundancy = new.Redundancy, UpdateTime = new.UpdateTime"
  79. return BatchNamedExec(ctx, sql, 7, objs, nil)
  80. }
  81. func (db *ObjectDB) BatchUpert(ctx SQLContext, objs []cdssdk.Object) error {
  82. if len(objs) == 0 {
  83. return nil
  84. }
  85. sql := "insert into Object(ObjectID, PackageID, Path, Size, FileHash, Redundancy, CreateTime ,UpdateTime)" +
  86. " values(:ObjectID, :PackageID,:Path,:Size,:FileHash,:Redundancy, :CreateTime, :UpdateTime) as new" +
  87. " on duplicate key update PackageID = new.PackageID, Path = new.Path, Size = new.Size, FileHash = new.FileHash, Redundancy = new.Redundancy, UpdateTime = new.UpdateTime"
  88. return BatchNamedExec(ctx, sql, 8, objs, nil)
  89. }
  90. func (*ObjectDB) GetPackageObjects(ctx SQLContext, packageID cdssdk.PackageID) ([]model.Object, error) {
  91. var ret []model.TempObject
  92. err := sqlx.Select(ctx, &ret, "select * from Object where PackageID = ? order by ObjectID asc", packageID)
  93. return lo.Map(ret, func(o model.TempObject, idx int) model.Object { return o.ToObject() }), err
  94. }
  95. func (db *ObjectDB) GetPackageObjectDetails(ctx SQLContext, packageID cdssdk.PackageID) ([]stgmod.ObjectDetail, error) {
  96. var objs []model.TempObject
  97. err := sqlx.Select(ctx, &objs, "select * from Object where PackageID = ? order by ObjectID asc", packageID)
  98. if err != nil {
  99. return nil, fmt.Errorf("getting objects: %w", err)
  100. }
  101. rets := make([]stgmod.ObjectDetail, 0, len(objs))
  102. var allBlocks []stgmod.ObjectBlock
  103. err = sqlx.Select(ctx, &allBlocks, "select ObjectBlock.* from ObjectBlock, Object where PackageID = ? and ObjectBlock.ObjectID = Object.ObjectID order by ObjectBlock.ObjectID, `Index` asc", packageID)
  104. if err != nil {
  105. return nil, fmt.Errorf("getting all object blocks: %w", err)
  106. }
  107. var allPinnedObjs []cdssdk.PinnedObject
  108. err = sqlx.Select(ctx, &allPinnedObjs, "select PinnedObject.* from PinnedObject, Object where PackageID = ? and PinnedObject.ObjectID = Object.ObjectID order by PinnedObject.ObjectID", packageID)
  109. if err != nil {
  110. return nil, fmt.Errorf("getting all pinned objects: %w", err)
  111. }
  112. blksCur := 0
  113. pinnedsCur := 0
  114. for _, temp := range objs {
  115. detail := stgmod.ObjectDetail{
  116. Object: temp.ToObject(),
  117. }
  118. // 1. 查询Object和ObjectBlock时均按照ObjectID升序排序
  119. // 2. ObjectBlock结果集中的不同ObjectID数只会比Object结果集的少
  120. // 因此在两个结果集上同时从头开始遍历时,如果两边的ObjectID字段不同,那么一定是ObjectBlock这边的ObjectID > Object的ObjectID,
  121. // 此时让Object的遍历游标前进,直到两边的ObjectID再次相等
  122. for ; blksCur < len(allBlocks); blksCur++ {
  123. if allBlocks[blksCur].ObjectID != temp.ObjectID {
  124. break
  125. }
  126. detail.Blocks = append(detail.Blocks, allBlocks[blksCur])
  127. }
  128. for ; pinnedsCur < len(allPinnedObjs); pinnedsCur++ {
  129. if allPinnedObjs[pinnedsCur].ObjectID != temp.ObjectID {
  130. break
  131. }
  132. detail.PinnedAt = append(detail.PinnedAt, allPinnedObjs[pinnedsCur].NodeID)
  133. }
  134. rets = append(rets, detail)
  135. }
  136. return rets, nil
  137. }
  138. func (db *ObjectDB) BatchAdd(ctx SQLContext, packageID cdssdk.PackageID, adds []coormq.AddObjectEntry) ([]cdssdk.Object, error) {
  139. if len(adds) == 0 {
  140. return nil, nil
  141. }
  142. objs := make([]cdssdk.Object, 0, len(adds))
  143. for _, add := range adds {
  144. objs = append(objs, cdssdk.Object{
  145. PackageID: packageID,
  146. Path: add.Path,
  147. Size: add.Size,
  148. FileHash: add.FileHash,
  149. Redundancy: cdssdk.NewNoneRedundancy(), // 首次上传默认使用不分块的none模式
  150. CreateTime: add.UploadTime,
  151. UpdateTime: add.UploadTime,
  152. })
  153. }
  154. err := db.BatchUpsertByPackagePath(ctx, objs)
  155. if err != nil {
  156. return nil, fmt.Errorf("batch create or update objects: %w", err)
  157. }
  158. pathes := make([]string, 0, len(adds))
  159. for _, add := range adds {
  160. pathes = append(pathes, add.Path)
  161. }
  162. // 这里可以不用检查查询结果是否与pathes的数量相同
  163. addedObjs, err := db.BatchGetByPackagePath(ctx, packageID, pathes)
  164. if err != nil {
  165. return nil, fmt.Errorf("batch get object ids: %w", err)
  166. }
  167. addedObjIDs := make([]cdssdk.ObjectID, len(addedObjs))
  168. for i := range addedObjs {
  169. addedObjIDs[i] = addedObjs[i].ObjectID
  170. }
  171. err = db.ObjectBlock().BatchDeleteByObjectID(ctx, addedObjIDs)
  172. if err != nil {
  173. return nil, fmt.Errorf("batch delete object blocks: %w", err)
  174. }
  175. err = db.PinnedObject().BatchDeleteByObjectID(ctx, addedObjIDs)
  176. if err != nil {
  177. return nil, fmt.Errorf("batch delete pinned objects: %w", err)
  178. }
  179. objBlocks := make([]stgmod.ObjectBlock, 0, len(adds))
  180. for i, add := range adds {
  181. objBlocks = append(objBlocks, stgmod.ObjectBlock{
  182. ObjectID: addedObjIDs[i],
  183. Index: 0,
  184. NodeID: add.NodeID,
  185. FileHash: add.FileHash,
  186. })
  187. }
  188. err = db.ObjectBlock().BatchCreate(ctx, objBlocks)
  189. if err != nil {
  190. return nil, fmt.Errorf("batch create object blocks: %w", err)
  191. }
  192. caches := make([]model.Cache, 0, len(adds))
  193. for _, add := range adds {
  194. caches = append(caches, model.Cache{
  195. FileHash: add.FileHash,
  196. NodeID: add.NodeID,
  197. CreateTime: time.Now(),
  198. Priority: 0,
  199. })
  200. }
  201. err = db.Cache().BatchCreate(ctx, caches)
  202. if err != nil {
  203. return nil, fmt.Errorf("batch create caches: %w", err)
  204. }
  205. return addedObjs, nil
  206. }
  207. func (db *ObjectDB) BatchUpdateRedundancy(ctx SQLContext, objs []coormq.UpdatingObjectRedundancy) error {
  208. if len(objs) == 0 {
  209. return nil
  210. }
  211. nowTime := time.Now()
  212. objIDs := make([]cdssdk.ObjectID, 0, len(objs))
  213. dummyObjs := make([]cdssdk.Object, 0, len(objs))
  214. for _, obj := range objs {
  215. objIDs = append(objIDs, obj.ObjectID)
  216. dummyObjs = append(dummyObjs, cdssdk.Object{
  217. ObjectID: obj.ObjectID,
  218. Redundancy: obj.Redundancy,
  219. CreateTime: nowTime,
  220. UpdateTime: nowTime,
  221. })
  222. }
  223. // 目前只能使用这种方式来同时更新大量数据
  224. err := BatchNamedExec(ctx,
  225. "insert into Object(ObjectID, PackageID, Path, Size, FileHash, Redundancy, CreateTime, UpdateTime)"+
  226. " values(:ObjectID, :PackageID, :Path, :Size, :FileHash, :Redundancy, :CreateTime, :UpdateTime) as new"+
  227. " on duplicate key update Redundancy=new.Redundancy", 8, dummyObjs, nil)
  228. if err != nil {
  229. return fmt.Errorf("batch update object redundancy: %w", err)
  230. }
  231. // 删除原本所有的编码块记录,重新添加
  232. err = db.ObjectBlock().BatchDeleteByObjectID(ctx, objIDs)
  233. if err != nil {
  234. return fmt.Errorf("batch delete object blocks: %w", err)
  235. }
  236. // 删除原本Pin住的Object。暂不考虑FileHash没有变化的情况
  237. err = db.PinnedObject().BatchDeleteByObjectID(ctx, objIDs)
  238. if err != nil {
  239. return fmt.Errorf("batch delete pinned object: %w", err)
  240. }
  241. blocks := make([]stgmod.ObjectBlock, 0, len(objs))
  242. for _, obj := range objs {
  243. blocks = append(blocks, obj.Blocks...)
  244. }
  245. err = db.ObjectBlock().BatchCreate(ctx, blocks)
  246. if err != nil {
  247. return fmt.Errorf("batch create object blocks: %w", err)
  248. }
  249. caches := make([]model.Cache, 0, len(objs))
  250. for _, obj := range objs {
  251. for _, blk := range obj.Blocks {
  252. caches = append(caches, model.Cache{
  253. FileHash: blk.FileHash,
  254. NodeID: blk.NodeID,
  255. CreateTime: time.Now(),
  256. Priority: 0,
  257. })
  258. }
  259. }
  260. err = db.Cache().BatchCreate(ctx, caches)
  261. if err != nil {
  262. return fmt.Errorf("batch create object caches: %w", err)
  263. }
  264. pinneds := make([]cdssdk.PinnedObject, 0, len(objs))
  265. for _, obj := range objs {
  266. for _, p := range obj.PinnedAt {
  267. pinneds = append(pinneds, cdssdk.PinnedObject{
  268. ObjectID: obj.ObjectID,
  269. NodeID: p,
  270. CreateTime: time.Now(),
  271. })
  272. }
  273. }
  274. err = db.PinnedObject().BatchTryCreate(ctx, pinneds)
  275. if err != nil {
  276. return fmt.Errorf("batch create pinned objects: %w", err)
  277. }
  278. return nil
  279. }
  280. func (*ObjectDB) BatchDelete(ctx SQLContext, ids []cdssdk.ObjectID) error {
  281. if len(ids) == 0 {
  282. return nil
  283. }
  284. query, args, err := sqlx.In("delete from Object where ObjectID in (?)", ids)
  285. if err != nil {
  286. return err
  287. }
  288. _, err = ctx.Exec(query, args...)
  289. return err
  290. }
  291. func (*ObjectDB) DeleteInPackage(ctx SQLContext, packageID cdssdk.PackageID) error {
  292. _, err := ctx.Exec("delete from Object where PackageID = ?", packageID)
  293. return err
  294. }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。