You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

object.go 12 kB

1 year ago
1 year ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396
  1. package db2
  2. import (
  3. "fmt"
  4. "strings"
  5. "time"
  6. "gorm.io/gorm/clause"
  7. cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
  8. stgmod "gitlink.org.cn/cloudream/storage/common/models"
  9. "gitlink.org.cn/cloudream/storage/common/pkgs/db2/model"
  10. coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"
  11. )
  12. type ObjectDB struct {
  13. *DB
  14. }
  15. func (db *DB) Object() *ObjectDB {
  16. return &ObjectDB{DB: db}
  17. }
  18. func (db *ObjectDB) GetByID(ctx SQLContext, objectID cdssdk.ObjectID) (cdssdk.Object, error) {
  19. var ret cdssdk.Object
  20. err := ctx.Table("Object").Where("ObjectID = ?", objectID).First(&ret).Error
  21. return ret, err
  22. }
  23. func (db *ObjectDB) GetByPath(ctx SQLContext, packageID cdssdk.PackageID, path string) ([]cdssdk.Object, error) {
  24. var ret []cdssdk.Object
  25. err := ctx.Table("Object").Where("PackageID = ? AND Path = ?", packageID, path).Find(&ret).Error
  26. return ret, err
  27. }
  28. func (db *ObjectDB) GetWithPathPrefix(ctx SQLContext, packageID cdssdk.PackageID, pathPrefix string) ([]cdssdk.Object, error) {
  29. var ret []cdssdk.Object
  30. err := ctx.Table("Object").Where("PackageID = ? AND Path LIKE ?", packageID, pathPrefix+"%").Order("ObjectID ASC").Find(&ret).Error
  31. return ret, err
  32. }
  33. func (db *ObjectDB) GetCommonPrefixes(ctx SQLContext, packageID cdssdk.PackageID, pathPrefix string) ([]string, error) {
  34. var ret []string
  35. sepCnt := strings.Count(pathPrefix, cdssdk.ObjectPathSeparator) + 1
  36. prefixStatm := fmt.Sprintf("Substring_Index(Path, '%s', %d)", cdssdk.ObjectPathSeparator, sepCnt)
  37. err := ctx.Table("Object").Select(prefixStatm+" as Prefix").
  38. Where("PackageID = ?", packageID).
  39. Where("Path like ?", pathPrefix+"%").
  40. Where(prefixStatm + " <> Path").
  41. Group("Prefix").Find(&ret).Error
  42. if err != nil {
  43. return nil, err
  44. }
  45. for i := range ret {
  46. ret[i] = ret[i] + cdssdk.ObjectPathSeparator
  47. }
  48. return ret, nil
  49. }
  50. func (db *ObjectDB) GetDirectChildren(ctx SQLContext, packageID cdssdk.PackageID, pathPrefix string) ([]cdssdk.Object, error) {
  51. var ret []cdssdk.Object
  52. sepCnt := strings.Count(pathPrefix, cdssdk.ObjectPathSeparator) + 1
  53. prefixStatm := fmt.Sprintf("Substring_Index(Path, '%s', %d)", cdssdk.ObjectPathSeparator, sepCnt)
  54. err := ctx.Table("Object").
  55. Where("PackageID = ?", packageID).
  56. Where("Path like ?", pathPrefix+"%").
  57. Where(prefixStatm + " = Path").
  58. Find(&ret).Error
  59. return ret, err
  60. }
  61. func (db *ObjectDB) BatchTestObjectID(ctx SQLContext, objectIDs []cdssdk.ObjectID) (map[cdssdk.ObjectID]bool, error) {
  62. if len(objectIDs) == 0 {
  63. return make(map[cdssdk.ObjectID]bool), nil
  64. }
  65. var avaiIDs []cdssdk.ObjectID
  66. err := ctx.Table("Object").Where("ObjectID IN ?", objectIDs).Pluck("ObjectID", &avaiIDs).Error
  67. if err != nil {
  68. return nil, err
  69. }
  70. avaiIDMap := make(map[cdssdk.ObjectID]bool)
  71. for _, pkgID := range avaiIDs {
  72. avaiIDMap[pkgID] = true
  73. }
  74. return avaiIDMap, nil
  75. }
  76. func (db *ObjectDB) BatchGet(ctx SQLContext, objectIDs []cdssdk.ObjectID) ([]cdssdk.Object, error) {
  77. if len(objectIDs) == 0 {
  78. return nil, nil
  79. }
  80. var objs []cdssdk.Object
  81. err := ctx.Table("Object").Where("ObjectID IN ?", objectIDs).Order("ObjectID ASC").Find(&objs).Error
  82. if err != nil {
  83. return nil, err
  84. }
  85. return objs, nil
  86. }
  87. func (db *ObjectDB) BatchGetByPackagePath(ctx SQLContext, pkgID cdssdk.PackageID, pathes []string) ([]cdssdk.Object, error) {
  88. if len(pathes) == 0 {
  89. return nil, nil
  90. }
  91. var objs []cdssdk.Object
  92. err := ctx.Table("Object").Where("PackageID = ? AND Path IN ?", pkgID, pathes).Find(&objs).Error
  93. if err != nil {
  94. return nil, err
  95. }
  96. return objs, nil
  97. }
  98. // 仅返回查询到的对象
  99. func (db *ObjectDB) BatchGetDetails(ctx SQLContext, objectIDs []cdssdk.ObjectID) ([]stgmod.ObjectDetail, error) {
  100. var objs []cdssdk.Object
  101. err := ctx.Table("Object").Where("ObjectID IN ?", objectIDs).Order("ObjectID ASC").Find(&objs).Error
  102. if err != nil {
  103. return nil, err
  104. }
  105. // 获取所有的 ObjectBlock
  106. var allBlocks []stgmod.ObjectBlock
  107. err = ctx.Table("ObjectBlock").Where("ObjectID IN ?", objectIDs).Order("ObjectID, `Index` ASC").Find(&allBlocks).Error
  108. if err != nil {
  109. return nil, err
  110. }
  111. // 获取所有的 PinnedObject
  112. var allPinnedObjs []cdssdk.PinnedObject
  113. err = ctx.Table("PinnedObject").Where("ObjectID IN ?", objectIDs).Order("ObjectID ASC").Find(&allPinnedObjs).Error
  114. if err != nil {
  115. return nil, err
  116. }
  117. details := make([]stgmod.ObjectDetail, len(objs))
  118. for i, obj := range objs {
  119. details[i] = stgmod.ObjectDetail{
  120. Object: obj,
  121. }
  122. }
  123. stgmod.DetailsFillObjectBlocks(details, allBlocks)
  124. stgmod.DetailsFillPinnedAt(details, allPinnedObjs)
  125. return details, nil
  126. }
  127. func (db *ObjectDB) Create(ctx SQLContext, obj cdssdk.Object) (cdssdk.ObjectID, error) {
  128. err := ctx.Table("Object").Create(&obj).Error
  129. if err != nil {
  130. return 0, fmt.Errorf("insert object failed, err: %w", err)
  131. }
  132. return obj.ObjectID, nil
  133. }
  134. // 批量创建对象,创建完成后会填充ObjectID。
  135. func (db *ObjectDB) BatchCreate(ctx SQLContext, objs *[]cdssdk.Object) error {
  136. if len(*objs) == 0 {
  137. return nil
  138. }
  139. return ctx.Table("Object").Create(objs).Error
  140. }
  141. // 批量更新对象所有属性,objs中的对象必须包含ObjectID
  142. func (db *ObjectDB) BatchUpdate(ctx SQLContext, objs []cdssdk.Object) error {
  143. if len(objs) == 0 {
  144. return nil
  145. }
  146. return ctx.Clauses(clause.OnConflict{
  147. Columns: []clause.Column{{Name: "ObjectID"}},
  148. UpdateAll: true,
  149. }).Create(objs).Error
  150. }
  151. // 批量更新对象指定属性,objs中的对象只需设置需要更新的属性即可,但:
  152. // 1. 必须包含ObjectID
  153. // 2. 日期类型属性不能设置为0值
  154. func (db *ObjectDB) BatchUpdateColumns(ctx SQLContext, objs []cdssdk.Object, columns []string) error {
  155. if len(objs) == 0 {
  156. return nil
  157. }
  158. return ctx.Clauses(clause.OnConflict{
  159. Columns: []clause.Column{{Name: "ObjectID"}},
  160. DoUpdates: clause.AssignmentColumns(columns),
  161. }).Create(objs).Error
  162. }
  163. func (db *ObjectDB) GetPackageObjects(ctx SQLContext, packageID cdssdk.PackageID) ([]cdssdk.Object, error) {
  164. var ret []cdssdk.Object
  165. err := ctx.Table("Object").Where("PackageID = ?", packageID).Order("ObjectID ASC").Find(&ret).Error
  166. return ret, err
  167. }
  168. func (db *ObjectDB) GetPackageObjectDetails(ctx SQLContext, packageID cdssdk.PackageID) ([]stgmod.ObjectDetail, error) {
  169. var objs []cdssdk.Object
  170. err := ctx.Table("Object").Where("PackageID = ?", packageID).Order("ObjectID ASC").Find(&objs).Error
  171. if err != nil {
  172. return nil, fmt.Errorf("getting objects: %w", err)
  173. }
  174. // 获取所有的 ObjectBlock
  175. var allBlocks []stgmod.ObjectBlock
  176. err = ctx.Table("ObjectBlock").
  177. Select("ObjectBlock.*").
  178. Joins("JOIN Object ON ObjectBlock.ObjectID = Object.ObjectID").
  179. Where("Object.PackageID = ?", packageID).
  180. Order("ObjectBlock.ObjectID, `Index` ASC").
  181. Find(&allBlocks).Error
  182. if err != nil {
  183. return nil, fmt.Errorf("getting all object blocks: %w", err)
  184. }
  185. // 获取所有的 PinnedObject
  186. var allPinnedObjs []cdssdk.PinnedObject
  187. err = ctx.Table("PinnedObject").
  188. Select("PinnedObject.*").
  189. Joins("JOIN Object ON PinnedObject.ObjectID = Object.ObjectID").
  190. Where("Object.PackageID = ?", packageID).
  191. Order("PinnedObject.ObjectID").
  192. Find(&allPinnedObjs).Error
  193. if err != nil {
  194. return nil, fmt.Errorf("getting all pinned objects: %w", err)
  195. }
  196. details := make([]stgmod.ObjectDetail, len(objs))
  197. for i, obj := range objs {
  198. details[i] = stgmod.ObjectDetail{
  199. Object: obj,
  200. }
  201. }
  202. stgmod.DetailsFillObjectBlocks(details, allBlocks)
  203. stgmod.DetailsFillPinnedAt(details, allPinnedObjs)
  204. return details, nil
  205. }
  206. func (db *ObjectDB) GetObjectsIfAnyBlockOnStorage(ctx SQLContext, stgID cdssdk.StorageID) ([]cdssdk.Object, error) {
  207. var objs []cdssdk.Object
  208. err := ctx.Table("Object").Where("ObjectID IN (SELECT ObjectID FROM ObjectBlock WHERE StorageID = ?)", stgID).Order("ObjectID ASC").Find(&objs).Error
  209. if err != nil {
  210. return nil, fmt.Errorf("getting objects: %w", err)
  211. }
  212. return objs, nil
  213. }
  214. func (db *ObjectDB) BatchAdd(ctx SQLContext, packageID cdssdk.PackageID, adds []coormq.AddObjectEntry) ([]cdssdk.Object, error) {
  215. if len(adds) == 0 {
  216. return nil, nil
  217. }
  218. // 收集所有路径
  219. pathes := make([]string, 0, len(adds))
  220. for _, add := range adds {
  221. pathes = append(pathes, add.Path)
  222. }
  223. // 先查询要更新的对象,不存在也没关系
  224. existsObjs, err := db.BatchGetByPackagePath(ctx, packageID, pathes)
  225. if err != nil {
  226. return nil, fmt.Errorf("batch get object by path: %w", err)
  227. }
  228. existsObjsMap := make(map[string]cdssdk.Object)
  229. for _, obj := range existsObjs {
  230. existsObjsMap[obj.Path] = obj
  231. }
  232. var updatingObjs []cdssdk.Object
  233. var addingObjs []cdssdk.Object
  234. for i := range adds {
  235. o := cdssdk.Object{
  236. PackageID: packageID,
  237. Path: adds[i].Path,
  238. Size: adds[i].Size,
  239. FileHash: adds[i].FileHash,
  240. Redundancy: cdssdk.NewNoneRedundancy(), // 首次上传默认使用不分块的none模式
  241. CreateTime: adds[i].UploadTime,
  242. UpdateTime: adds[i].UploadTime,
  243. }
  244. e, ok := existsObjsMap[adds[i].Path]
  245. if ok {
  246. o.ObjectID = e.ObjectID
  247. o.CreateTime = e.CreateTime
  248. updatingObjs = append(updatingObjs, o)
  249. } else {
  250. addingObjs = append(addingObjs, o)
  251. }
  252. }
  253. // 先进行更新
  254. err = db.BatchUpdate(ctx, updatingObjs)
  255. if err != nil {
  256. return nil, fmt.Errorf("batch update objects: %w", err)
  257. }
  258. // 再执行插入,Create函数插入后会填充ObjectID
  259. err = db.BatchCreate(ctx, &addingObjs)
  260. if err != nil {
  261. return nil, fmt.Errorf("batch create objects: %w", err)
  262. }
  263. // 按照add参数的顺序返回结果
  264. affectedObjsMp := make(map[string]cdssdk.Object)
  265. for _, o := range updatingObjs {
  266. affectedObjsMp[o.Path] = o
  267. }
  268. for _, o := range addingObjs {
  269. affectedObjsMp[o.Path] = o
  270. }
  271. affectedObjs := make([]cdssdk.Object, 0, len(affectedObjsMp))
  272. affectedObjIDs := make([]cdssdk.ObjectID, 0, len(affectedObjsMp))
  273. for i := range adds {
  274. obj := affectedObjsMp[adds[i].Path]
  275. affectedObjs = append(affectedObjs, obj)
  276. affectedObjIDs = append(affectedObjIDs, obj.ObjectID)
  277. }
  278. if len(affectedObjIDs) > 0 {
  279. // 批量删除 ObjectBlock
  280. if err := db.ObjectBlock().BatchDeleteByObjectID(ctx, affectedObjIDs); err != nil {
  281. return nil, fmt.Errorf("batch delete object blocks: %w", err)
  282. }
  283. // 批量删除 PinnedObject
  284. if err := db.PinnedObject().BatchDeleteByObjectID(ctx, affectedObjIDs); err != nil {
  285. return nil, fmt.Errorf("batch delete pinned objects: %w", err)
  286. }
  287. }
  288. // 创建 ObjectBlock
  289. objBlocks := make([]stgmod.ObjectBlock, 0, len(adds))
  290. for i, add := range adds {
  291. for _, stgID := range add.StorageIDs {
  292. objBlocks = append(objBlocks, stgmod.ObjectBlock{
  293. ObjectID: affectedObjIDs[i],
  294. Index: 0,
  295. StorageID: stgID,
  296. FileHash: add.FileHash,
  297. Size: add.Size,
  298. })
  299. }
  300. }
  301. if err := db.ObjectBlock().BatchCreate(ctx, objBlocks); err != nil {
  302. return nil, fmt.Errorf("batch create object blocks: %w", err)
  303. }
  304. // 创建 Cache
  305. caches := make([]model.Cache, 0, len(adds))
  306. for _, add := range adds {
  307. for _, stgID := range add.StorageIDs {
  308. caches = append(caches, model.Cache{
  309. FileHash: add.FileHash,
  310. StorageID: stgID,
  311. CreateTime: time.Now(),
  312. Priority: 0,
  313. })
  314. }
  315. }
  316. if err := db.Cache().BatchCreate(ctx, caches); err != nil {
  317. return nil, fmt.Errorf("batch create caches: %w", err)
  318. }
  319. return affectedObjs, nil
  320. }
  321. func (db *ObjectDB) BatchDelete(ctx SQLContext, ids []cdssdk.ObjectID) error {
  322. if len(ids) == 0 {
  323. return nil
  324. }
  325. return ctx.Table("Object").Where("ObjectID IN ?", ids).Delete(&cdssdk.Object{}).Error
  326. }
  327. func (db *ObjectDB) DeleteInPackage(ctx SQLContext, packageID cdssdk.PackageID) error {
  328. return ctx.Table("Object").Where("PackageID = ?", packageID).Delete(&cdssdk.Object{}).Error
  329. }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。