You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

object.go 11 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383
  1. package db2
  2. import (
  3. "fmt"
  4. "time"
  5. "gorm.io/gorm/clause"
  6. cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
  7. stgmod "gitlink.org.cn/cloudream/storage/common/models"
  8. "gitlink.org.cn/cloudream/storage/common/pkgs/db2/model"
  9. coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"
  10. )
  11. type ObjectDB struct {
  12. *DB
  13. }
  14. func (db *DB) Object() *ObjectDB {
  15. return &ObjectDB{DB: db}
  16. }
  17. func (db *ObjectDB) GetByID(ctx SQLContext, objectID cdssdk.ObjectID) (model.Object, error) {
  18. var ret cdssdk.Object
  19. err := ctx.Table("Object").Where("ObjectID = ?", objectID).First(&ret).Error
  20. return ret, err
  21. }
  22. func (db *ObjectDB) BatchTestObjectID(ctx SQLContext, objectIDs []cdssdk.ObjectID) (map[cdssdk.ObjectID]bool, error) {
  23. if len(objectIDs) == 0 {
  24. return make(map[cdssdk.ObjectID]bool), nil
  25. }
  26. var avaiIDs []cdssdk.ObjectID
  27. err := ctx.Table("Object").Where("ObjectID IN ?", objectIDs).Pluck("ObjectID", &avaiIDs).Error
  28. if err != nil {
  29. return nil, err
  30. }
  31. avaiIDMap := make(map[cdssdk.ObjectID]bool)
  32. for _, pkgID := range avaiIDs {
  33. avaiIDMap[pkgID] = true
  34. }
  35. return avaiIDMap, nil
  36. }
  37. func (db *ObjectDB) BatchGet(ctx SQLContext, objectIDs []cdssdk.ObjectID) ([]model.Object, error) {
  38. if len(objectIDs) == 0 {
  39. return nil, nil
  40. }
  41. var objs []cdssdk.Object
  42. err := ctx.Table("Object").Where("ObjectID IN ?", objectIDs).Order("ObjectID ASC").Find(&objs).Error
  43. if err != nil {
  44. return nil, err
  45. }
  46. return objs, nil
  47. }
  48. func (db *ObjectDB) BatchGetByPackagePath(ctx SQLContext, pkgID cdssdk.PackageID, pathes []string) ([]cdssdk.Object, error) {
  49. if len(pathes) == 0 {
  50. return nil, nil
  51. }
  52. var objs []cdssdk.Object
  53. err := ctx.Table("Object").Where("PackageID = ? AND Path IN ?", pkgID, pathes).Find(&objs).Error
  54. if err != nil {
  55. return nil, err
  56. }
  57. return objs, nil
  58. }
  59. func (db *ObjectDB) Create(ctx SQLContext, obj cdssdk.Object) (cdssdk.ObjectID, error) {
  60. err := ctx.Table("Object").Create(&obj).Error
  61. if err != nil {
  62. return 0, fmt.Errorf("insert object failed, err: %w", err)
  63. }
  64. return obj.ObjectID, nil
  65. }
  66. // 批量创建对象,创建完成后会填充ObjectID。
  67. func (db *ObjectDB) BatchCreate(ctx SQLContext, objs *[]cdssdk.Object) error {
  68. if len(*objs) == 0 {
  69. return nil
  70. }
  71. return ctx.Table("Object").Create(objs).Error
  72. }
  73. // 批量更新对象所有属性,objs中的对象必须包含ObjectID
  74. func (db *ObjectDB) BatchUpdate(ctx SQLContext, objs []cdssdk.Object) error {
  75. if len(objs) == 0 {
  76. return nil
  77. }
  78. return ctx.Clauses(clause.OnConflict{
  79. Columns: []clause.Column{{Name: "ObjectID"}},
  80. UpdateAll: true,
  81. }).Create(objs).Error
  82. }
  83. // 批量更新对象指定属性,objs中的对象只需设置需要更新的属性即可,但:
  84. // 1. 必须包含ObjectID
  85. // 2. 日期类型属性不能设置为0值
  86. func (db *ObjectDB) BatchUpdateColumns(ctx SQLContext, objs []cdssdk.Object, columns []string) error {
  87. if len(objs) == 0 {
  88. return nil
  89. }
  90. return ctx.Clauses(clause.OnConflict{
  91. Columns: []clause.Column{{Name: "ObjectID"}},
  92. DoUpdates: clause.AssignmentColumns(columns),
  93. }).Create(objs).Error
  94. }
  95. func (db *ObjectDB) GetPackageObjects(ctx SQLContext, packageID cdssdk.PackageID) ([]model.Object, error) {
  96. var ret []cdssdk.Object
  97. err := ctx.Table("Object").Where("PackageID = ?", packageID).Order("ObjectID ASC").Find(&ret).Error
  98. return ret, err
  99. }
  100. func (db *ObjectDB) GetPackageObjectDetails(ctx SQLContext, packageID cdssdk.PackageID) ([]stgmod.ObjectDetail, error) {
  101. var objs []cdssdk.Object
  102. err := ctx.Table("Object").Where("PackageID = ?", packageID).Order("ObjectID ASC").Find(&objs).Error
  103. if err != nil {
  104. return nil, fmt.Errorf("getting objects: %w", err)
  105. }
  106. // 获取所有的 ObjectBlock
  107. var allBlocks []stgmod.ObjectBlock
  108. err = ctx.Table("ObjectBlock").
  109. Select("ObjectBlock.*").
  110. Joins("JOIN Object ON ObjectBlock.ObjectID = Object.ObjectID").
  111. Where("Object.PackageID = ?", packageID).
  112. Order("ObjectBlock.ObjectID, `Index` ASC").
  113. Find(&allBlocks).Error
  114. if err != nil {
  115. return nil, fmt.Errorf("getting all object blocks: %w", err)
  116. }
  117. // 获取所有的 PinnedObject
  118. var allPinnedObjs []cdssdk.PinnedObject
  119. err = ctx.Table("PinnedObject").
  120. Select("PinnedObject.*").
  121. Joins("JOIN Object ON PinnedObject.ObjectID = Object.ObjectID").
  122. Where("Object.PackageID = ?", packageID).
  123. Order("PinnedObject.ObjectID").
  124. Find(&allPinnedObjs).Error
  125. if err != nil {
  126. return nil, fmt.Errorf("getting all pinned objects: %w", err)
  127. }
  128. details := make([]stgmod.ObjectDetail, len(objs))
  129. for i, obj := range objs {
  130. details[i] = stgmod.ObjectDetail{
  131. Object: obj,
  132. }
  133. }
  134. stgmod.DetailsFillObjectBlocks(details, allBlocks)
  135. stgmod.DetailsFillPinnedAt(details, allPinnedObjs)
  136. return details, nil
  137. }
  138. func (db *ObjectDB) GetObjectsIfAnyBlockOnStorage(ctx SQLContext, stgID cdssdk.StorageID) ([]cdssdk.Object, error) {
  139. var objs []cdssdk.Object
  140. err := ctx.Table("Object").Where("ObjectID IN (SELECT ObjectID FROM ObjectBlock WHERE StorageID = ?)", stgID).Order("ObjectID ASC").Find(&objs).Error
  141. if err != nil {
  142. return nil, fmt.Errorf("getting objects: %w", err)
  143. }
  144. return objs, nil
  145. }
  146. func (db *ObjectDB) BatchAdd(ctx SQLContext, packageID cdssdk.PackageID, adds []coormq.AddObjectEntry) ([]cdssdk.Object, error) {
  147. if len(adds) == 0 {
  148. return nil, nil
  149. }
  150. // 收集所有路径
  151. pathes := make([]string, 0, len(adds))
  152. for _, add := range adds {
  153. pathes = append(pathes, add.Path)
  154. }
  155. // 先查询要更新的对象,不存在也没关系
  156. existsObjs, err := db.BatchGetByPackagePath(ctx, packageID, pathes)
  157. if err != nil {
  158. return nil, fmt.Errorf("batch get object by path: %w", err)
  159. }
  160. existsObjsMap := make(map[string]cdssdk.Object)
  161. for _, obj := range existsObjs {
  162. existsObjsMap[obj.Path] = obj
  163. }
  164. var updatingObjs []cdssdk.Object
  165. var addingObjs []cdssdk.Object
  166. for i := range adds {
  167. o := cdssdk.Object{
  168. PackageID: packageID,
  169. Path: adds[i].Path,
  170. Size: adds[i].Size,
  171. FileHash: adds[i].FileHash,
  172. Redundancy: cdssdk.NewNoneRedundancy(), // 首次上传默认使用不分块的none模式
  173. CreateTime: adds[i].UploadTime,
  174. UpdateTime: adds[i].UploadTime,
  175. }
  176. e, ok := existsObjsMap[adds[i].Path]
  177. if ok {
  178. o.ObjectID = e.ObjectID
  179. o.CreateTime = e.CreateTime
  180. updatingObjs = append(updatingObjs, o)
  181. } else {
  182. addingObjs = append(addingObjs, o)
  183. }
  184. }
  185. // 先进行更新
  186. err = db.BatchUpdate(ctx, updatingObjs)
  187. if err != nil {
  188. return nil, fmt.Errorf("batch update objects: %w", err)
  189. }
  190. // 再执行插入,Create函数插入后会填充ObjectID
  191. err = db.BatchCreate(ctx, &addingObjs)
  192. if err != nil {
  193. return nil, fmt.Errorf("batch create objects: %w", err)
  194. }
  195. // 按照add参数的顺序返回结果
  196. affectedObjsMp := make(map[string]cdssdk.Object)
  197. for _, o := range updatingObjs {
  198. affectedObjsMp[o.Path] = o
  199. }
  200. for _, o := range addingObjs {
  201. affectedObjsMp[o.Path] = o
  202. }
  203. affectedObjs := make([]cdssdk.Object, 0, len(affectedObjsMp))
  204. affectedObjIDs := make([]cdssdk.ObjectID, 0, len(affectedObjsMp))
  205. for i := range adds {
  206. obj := affectedObjsMp[adds[i].Path]
  207. affectedObjs = append(affectedObjs, obj)
  208. affectedObjIDs = append(affectedObjIDs, obj.ObjectID)
  209. }
  210. if len(affectedObjIDs) > 0 {
  211. // 批量删除 ObjectBlock
  212. if err := ctx.Table("ObjectBlock").Where("ObjectID IN ?", affectedObjIDs).Delete(&stgmod.ObjectBlock{}).Error; err != nil {
  213. return nil, fmt.Errorf("batch delete object blocks: %w", err)
  214. }
  215. // 批量删除 PinnedObject
  216. if err := ctx.Table("PinnedObject").Where("ObjectID IN ?", affectedObjIDs).Delete(&cdssdk.PinnedObject{}).Error; err != nil {
  217. return nil, fmt.Errorf("batch delete pinned objects: %w", err)
  218. }
  219. }
  220. // 创建 ObjectBlock
  221. objBlocks := make([]stgmod.ObjectBlock, len(adds))
  222. for i, add := range adds {
  223. objBlocks[i] = stgmod.ObjectBlock{
  224. ObjectID: affectedObjIDs[i],
  225. Index: 0,
  226. StorageID: add.StorageID,
  227. FileHash: add.FileHash,
  228. }
  229. }
  230. if err := db.ObjectBlock().BatchCreate(ctx, objBlocks); err != nil {
  231. return nil, fmt.Errorf("batch create object blocks: %w", err)
  232. }
  233. // 创建 Cache
  234. caches := make([]model.Cache, len(adds))
  235. for i, add := range adds {
  236. caches[i] = model.Cache{
  237. FileHash: add.FileHash,
  238. StorageID: add.StorageID,
  239. CreateTime: time.Now(),
  240. Priority: 0,
  241. }
  242. }
  243. if err := db.Cache().BatchCreate(ctx, caches); err != nil {
  244. return nil, fmt.Errorf("batch create caches: %w", err)
  245. }
  246. return affectedObjs, nil
  247. }
  248. func (db *ObjectDB) BatchUpdateRedundancy(ctx SQLContext, objs []coormq.UpdatingObjectRedundancy) error {
  249. if len(objs) == 0 {
  250. return nil
  251. }
  252. nowTime := time.Now()
  253. objIDs := make([]cdssdk.ObjectID, 0, len(objs))
  254. dummyObjs := make([]cdssdk.Object, 0, len(objs))
  255. for _, obj := range objs {
  256. objIDs = append(objIDs, obj.ObjectID)
  257. dummyObjs = append(dummyObjs, cdssdk.Object{
  258. ObjectID: obj.ObjectID,
  259. Redundancy: obj.Redundancy,
  260. CreateTime: nowTime, // 实际不会更新,只因为不能是0值
  261. UpdateTime: nowTime,
  262. })
  263. }
  264. err := db.Object().BatchUpdateColumns(ctx, dummyObjs, []string{"Redundancy", "UpdateTime"})
  265. if err != nil {
  266. return fmt.Errorf("batch update object redundancy: %w", err)
  267. }
  268. // 删除原本所有的编码块记录,重新添加
  269. err = db.ObjectBlock().BatchDeleteByObjectID(ctx, objIDs)
  270. if err != nil {
  271. return fmt.Errorf("batch delete object blocks: %w", err)
  272. }
  273. // 删除原本Pin住的Object。暂不考虑FileHash没有变化的情况
  274. err = db.PinnedObject().BatchDeleteByObjectID(ctx, objIDs)
  275. if err != nil {
  276. return fmt.Errorf("batch delete pinned object: %w", err)
  277. }
  278. blocks := make([]stgmod.ObjectBlock, 0, len(objs))
  279. for _, obj := range objs {
  280. blocks = append(blocks, obj.Blocks...)
  281. }
  282. err = db.ObjectBlock().BatchCreate(ctx, blocks)
  283. if err != nil {
  284. return fmt.Errorf("batch create object blocks: %w", err)
  285. }
  286. caches := make([]model.Cache, 0, len(objs))
  287. for _, obj := range objs {
  288. for _, blk := range obj.Blocks {
  289. caches = append(caches, model.Cache{
  290. FileHash: blk.FileHash,
  291. StorageID: blk.StorageID,
  292. CreateTime: nowTime,
  293. Priority: 0,
  294. })
  295. }
  296. }
  297. err = db.Cache().BatchCreate(ctx, caches)
  298. if err != nil {
  299. return fmt.Errorf("batch create object caches: %w", err)
  300. }
  301. pinneds := make([]cdssdk.PinnedObject, 0, len(objs))
  302. for _, obj := range objs {
  303. for _, p := range obj.PinnedAt {
  304. pinneds = append(pinneds, cdssdk.PinnedObject{
  305. ObjectID: obj.ObjectID,
  306. StorageID: p,
  307. CreateTime: nowTime,
  308. })
  309. }
  310. }
  311. err = db.PinnedObject().BatchTryCreate(ctx, pinneds)
  312. if err != nil {
  313. return fmt.Errorf("batch create pinned objects: %w", err)
  314. }
  315. return nil
  316. }
  317. func (db *ObjectDB) BatchDelete(ctx SQLContext, ids []cdssdk.ObjectID) error {
  318. if len(ids) == 0 {
  319. return nil
  320. }
  321. return ctx.Table("Object").Where("ObjectID IN ?", ids).Delete(&cdssdk.Object{}).Error
  322. }
  323. func (db *ObjectDB) DeleteInPackage(ctx SQLContext, packageID cdssdk.PackageID) error {
  324. return ctx.Table("Object").Where("PackageID = ?", packageID).Delete(&cdssdk.Object{}).Error
  325. }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。