package db import ( "fmt" "github.com/jmoiron/sqlx" "github.com/samber/lo" "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" ) type ObjectDB struct { *DB } func (db *DB) Object() *ObjectDB { return &ObjectDB{DB: db} } func (db *ObjectDB) GetByID(ctx SQLContext, objectID int64) (model.Object, error) { var ret model.Object err := sqlx.Get(ctx, &ret, "select * from Object where ObjectID = ?", objectID) return ret, err } func (db *ObjectDB) Create(ctx SQLContext, packageID int64, path string, size int64) (int64, error) { sql := "insert into Object(PackageID, Path, Size) values(?,?,?)" ret, err := ctx.Exec(sql, packageID, path, size) if err != nil { return 0, fmt.Errorf("insert object failed, err: %w", err) } objectID, err := ret.LastInsertId() if err != nil { return 0, fmt.Errorf("get id of inserted object failed, err: %w", err) } return objectID, nil } // 创建或者更新记录,返回值true代表是创建,false代表是更新 func (db *ObjectDB) CreateOrUpdate(ctx SQLContext, packageID int64, path string, size int64) (int64, bool, error) { sql := "insert into Object(PackageID, Path, Size) values(?,?,?) on duplicate key update Size = ?" ret, err := ctx.Exec(sql, packageID, path, size, size) if err != nil { return 0, false, fmt.Errorf("insert object failed, err: %w", err) } affs, err := ret.RowsAffected() if err != nil { return 0, false, fmt.Errorf("getting affected rows: %w", err) } // 影响行数为1时是插入,为2时是更新 if affs == 1 { objectID, err := ret.LastInsertId() if err != nil { return 0, false, fmt.Errorf("get id of inserted object failed, err: %w", err) } return objectID, true, nil } var objID int64 if err = sqlx.Get(ctx, &objID, "select ObjectID from Object where PackageID = ? and Path = ?", packageID, path); err != nil { return 0, false, fmt.Errorf("getting object id: %w", err) } return objID, false, nil } func (db *ObjectDB) UpdateRepObject(ctx SQLContext, objectID int64, fileSize int64, nodeIDs []int64, fileHash string) error { _, err := db.UpdateFileInfo(ctx, objectID, fileSize) if err != nil { if err != nil { return fmt.Errorf("update rep object failed, err: %w", err) } } objRep, err := db.ObjectRep().GetByID(ctx, objectID) if err != nil { return fmt.Errorf("get object rep failed, err: %w", err) } // 如果新文件与旧文件的Hash不同,则需要更新关联的FileHash,重新插入Cache记录 if objRep.FileHash != fileHash { _, err := db.ObjectRep().Update(ctx, objectID, fileHash) if err != nil { return fmt.Errorf("update rep object file hash failed, err: %w", err) } for _, nodeID := range nodeIDs { err := db.Cache().CreatePinned(ctx, fileHash, nodeID, 0) //priority = 0 if err != nil { return fmt.Errorf("create cache failed, err: %w", err) } } } else { // 如果相同,则只增加Cache中不存在的记录 cachedNodes, err := db.Cache().GetCachingFileNodes(ctx, fileHash) if err != nil { return fmt.Errorf("find caching file nodes failed, err: %w", err) } // 筛选出不在cachedNodes中的id newNodeIDs := lo.Filter(nodeIDs, func(id int64, index int) bool { return lo.NoneBy(cachedNodes, func(node model.Node) bool { return node.NodeID == id }) }) for _, nodeID := range newNodeIDs { err := db.Cache().CreatePinned(ctx, fileHash, nodeID, 0) //priority if err != nil { return fmt.Errorf("create cache failed, err: %w", err) } } } return nil } func (*ObjectDB) BatchGetAllEcObjectIDs(ctx SQLContext, start int, count int) ([]int64, error) { var ret []int64 rep := "rep" err := sqlx.Select(ctx, &ret, "SELECT ObjectID FROM object where Redundancy != ? limit ?, ?", rep, start, count) return ret, err } func (*ObjectDB) UpdateFileInfo(ctx SQLContext, objectID int64, fileSize int64) (bool, error) { ret, err := ctx.Exec("update Object set FileSize = ? where ObjectID = ?", fileSize, objectID) if err != nil { return false, err } cnt, err := ret.RowsAffected() if err != nil { return false, fmt.Errorf("get affected rows failed, err: %w", err) } return cnt > 0, nil } func (*ObjectDB) GetPackageObjects(ctx SQLContext, packageID int64) ([]model.Object, error) { var ret []model.Object err := sqlx.Select(ctx, &ret, "select * from Object where PackageID = ? order by ObjectID asc", packageID) return ret, err } func (db *ObjectDB) BatchAddRep(ctx SQLContext, packageID int64, objs []coormq.AddRepObjectInfo) ([]int64, error) { var objIDs []int64 for _, obj := range objs { // 创建对象的记录 objID, isCreate, err := db.CreateOrUpdate(ctx, packageID, obj.Path, obj.Size) if err != nil { return nil, fmt.Errorf("creating object: %w", err) } objIDs = append(objIDs, objID) if isCreate { if err := db.createRep(ctx, objID, obj); err != nil { return nil, err } } else { if err := db.updateRep(ctx, objID, obj); err != nil { return nil, err } } } return objIDs, nil } func (db *ObjectDB) createRep(ctx SQLContext, objID int64, obj coormq.AddRepObjectInfo) error { // 创建对象副本的记录 if err := db.ObjectRep().Create(ctx, objID, obj.FileHash); err != nil { return fmt.Errorf("creating object rep: %w", err) } // 创建缓存记录 priority := 0 //优先级暂时设置为0 for _, nodeID := range obj.NodeIDs { if err := db.Cache().CreatePinned(ctx, obj.FileHash, nodeID, priority); err != nil { return fmt.Errorf("creating cache: %w", err) } } return nil } func (db *ObjectDB) updateRep(ctx SQLContext, objID int64, obj coormq.AddRepObjectInfo) error { objRep, err := db.ObjectRep().GetByID(ctx, objID) if err != nil { return fmt.Errorf("getting object rep: %w", err) } // 如果新文件与旧文件的Hash不同,则需要更新关联的FileHash,重新插入Cache记录 if objRep.FileHash != obj.FileHash { _, err := db.ObjectRep().Update(ctx, objID, obj.FileHash) if err != nil { return fmt.Errorf("updating rep object file hash: %w", err) } for _, nodeID := range obj.NodeIDs { if err := db.Cache().CreatePinned(ctx, obj.FileHash, nodeID, 0); err != nil { return fmt.Errorf("creating cache: %w", err) } } } else { // 如果相同,则只增加Cache中不存在的记录 cachedNodes, err := db.Cache().GetCachingFileNodes(ctx, obj.FileHash) if err != nil { return fmt.Errorf("finding caching file nodes: %w", err) } // 筛选出不在cachedNodes中的id newNodeIDs := lo.Filter(obj.NodeIDs, func(id int64, index int) bool { return lo.NoneBy(cachedNodes, func(node model.Node) bool { return node.NodeID == id }) }) for _, nodeID := range newNodeIDs { if err := db.Cache().CreatePinned(ctx, obj.FileHash, nodeID, 0); err != nil { return fmt.Errorf("creating cache: %w", err) } } } return nil } func (db *ObjectDB) BatchAddEC(ctx SQLContext, packageID int64, objs []coormq.AddECObjectInfo) ([]int64, error) { objIDs := make([]int64, 0, len(objs)) for _, obj := range objs { // 创建对象的记录 objID, isCreate, err := db.CreateOrUpdate(ctx, packageID, obj.Path, obj.Size) if err != nil { return nil, fmt.Errorf("creating object: %w", err) } objIDs = append(objIDs, objID) if !isCreate { // 删除原本所有的编码块记录,重新添加 if err = db.ObjectBlock().DeleteObjectAll(ctx, objID); err != nil { return nil, fmt.Errorf("deleting all object block: %w", err) } } // 创建编码块的记录 for i := 0; i < len(obj.FileHashes); i++ { err := db.ObjectBlock().Create(ctx, objID, i, obj.FileHashes[i]) if err != nil { return nil, fmt.Errorf("creating object block: %w", err) } } // 创建缓存记录 priority := 0 //优先级暂时设置为0 for i, nodeID := range obj.NodeIDs { err = db.Cache().CreatePinned(ctx, obj.FileHashes[i], nodeID, priority) if err != nil { return nil, fmt.Errorf("creating cache: %w", err) } } } return objIDs, nil } func (*ObjectDB) BatchDelete(ctx SQLContext, ids []int64) error { _, err := ctx.Exec("delete from Object where ObjectID in (?)", ids) return err } func (*ObjectDB) DeleteInPackage(ctx SQLContext, packageID int64) error { _, err := ctx.Exec("delete from Object where PackageID = ?", packageID) return err }