| @@ -27,6 +27,48 @@ func NewObjectDetail(object cdssdk.Object, pinnedAt []cdssdk.NodeID, blocks []Ob | |||
| } | |||
| } | |||
| func DetailsFromObjects(objects []cdssdk.Object) []ObjectDetail { | |||
| details := make([]ObjectDetail, len(objects)) | |||
| for i, object := range objects { | |||
| details[i] = ObjectDetail{ | |||
| Object: object, | |||
| } | |||
| } | |||
| return details | |||
| } | |||
| // 将blocks放到对应的object中。要求objs和blocks都按ObjectID升序 | |||
| func DetailsFillObjectBlocks(objs []ObjectDetail, blocks []ObjectBlock) { | |||
| blksCur := 0 | |||
| for i := range objs { | |||
| obj := &objs[i] | |||
| // 1. 查询Object和ObjectBlock时均按照ObjectID升序排序 | |||
| // 2. ObjectBlock结果集中的不同ObjectID数只会比Object结果集的少 | |||
| // 因此在两个结果集上同时从头开始遍历时,如果两边的ObjectID字段不同,那么一定是ObjectBlock这边的ObjectID > Object的ObjectID, | |||
| // 此时让Object的遍历游标前进,直到两边的ObjectID再次相等 | |||
| for ; blksCur < len(blocks); blksCur++ { | |||
| if blocks[blksCur].ObjectID != obj.Object.ObjectID { | |||
| break | |||
| } | |||
| obj.Blocks = append(obj.Blocks, blocks[blksCur]) | |||
| } | |||
| } | |||
| } | |||
| // 将pinnedAt放到对应的object中。要求objs和pinnedAt都按ObjectID升序 | |||
| func DetailsFillPinnedAt(objs []ObjectDetail, pinnedAt []cdssdk.PinnedObject) { | |||
| pinnedCur := 0 | |||
| for i := range objs { | |||
| obj := &objs[i] | |||
| for ; pinnedCur < len(pinnedAt); pinnedCur++ { | |||
| if pinnedAt[pinnedCur].ObjectID != obj.Object.ObjectID { | |||
| break | |||
| } | |||
| obj.PinnedAt = append(obj.PinnedAt, pinnedAt[pinnedCur].NodeID) | |||
| } | |||
| } | |||
| } | |||
| type GrouppedObjectBlock struct { | |||
| ObjectID cdssdk.ObjectID | |||
| Index int | |||
| @@ -58,3 +100,17 @@ type LocalMachineInfo struct { | |||
| LocalIP string `json:"localIP"` | |||
| LocationID cdssdk.LocationID `json:"locationID"` | |||
| } | |||
| type PackageAccessStat struct { | |||
| PackageID cdssdk.PackageID `db:"PackageID" json:"packageID"` | |||
| NodeID cdssdk.NodeID `db:"NodeID" json:"nodeID"` | |||
| Amount float64 `db:"Amount" json:"Amount"` // 前一日的读取量的滑动平均值 | |||
| Counter float64 `db:"Counter" json:"counter"` // 当日的读取量 | |||
| } | |||
| type ObjectAccessStat struct { | |||
| ObjectID cdssdk.ObjectID `db:"ObjectID" json:"objectID"` | |||
| NodeID cdssdk.NodeID `db:"NodeID" json:"nodeID"` | |||
| Amount float64 `db:"Amount" json:"Amount"` // 前一日的读取量的滑动平均值 | |||
| Counter float64 `db:"Counter" json:"counter"` // 当日的读取量 | |||
| } | |||
| @@ -100,17 +100,3 @@ type Location struct { | |||
| LocationID cdssdk.LocationID `db:"LocationID" json:"locationID"` | |||
| Name string `db:"Name" json:"name"` | |||
| } | |||
| type PackageAccessStat struct { | |||
| PackageID cdssdk.PackageID `db:"PackageID" json:"packageID"` | |||
| NodeID cdssdk.NodeID `db:"NodeID" json:"nodeID"` | |||
| Amount float64 `db:"Amount" json:"Amount"` // 前一日的读取量的滑动平均值 | |||
| Counter float64 `db:"Counter" json:"counter"` // 当日的读取量 | |||
| } | |||
| type ObjectAccessStat struct { | |||
| ObjectID cdssdk.ObjectID `db:"ObjectID" json:"objectID"` | |||
| NodeID cdssdk.NodeID `db:"NodeID" json:"nodeID"` | |||
| Amount float64 `db:"Amount" json:"Amount"` // 前一日的读取量的滑动平均值 | |||
| Counter float64 `db:"Counter" json:"counter"` // 当日的读取量 | |||
| } | |||
| @@ -164,35 +164,31 @@ func (db *ObjectDB) GetPackageObjectDetails(ctx SQLContext, packageID cdssdk.Pac | |||
| return nil, fmt.Errorf("getting all pinned objects: %w", err) | |||
| } | |||
| blksCur := 0 | |||
| pinnedsCur := 0 | |||
| for _, temp := range objs { | |||
| detail := stgmod.ObjectDetail{ | |||
| Object: temp.ToObject(), | |||
| details := make([]stgmod.ObjectDetail, len(objs)) | |||
| for i, obj := range objs { | |||
| details[i] = stgmod.ObjectDetail{ | |||
| Object: obj.ToObject(), | |||
| } | |||
| } | |||
| // 1. 查询Object和ObjectBlock时均按照ObjectID升序排序 | |||
| // 2. ObjectBlock结果集中的不同ObjectID数只会比Object结果集的少 | |||
| // 因此在两个结果集上同时从头开始遍历时,如果两边的ObjectID字段不同,那么一定是ObjectBlock这边的ObjectID > Object的ObjectID, | |||
| // 此时让Object的遍历游标前进,直到两边的ObjectID再次相等 | |||
| for ; blksCur < len(allBlocks); blksCur++ { | |||
| if allBlocks[blksCur].ObjectID != temp.ObjectID { | |||
| break | |||
| } | |||
| detail.Blocks = append(detail.Blocks, allBlocks[blksCur]) | |||
| } | |||
| stgmod.DetailsFillObjectBlocks(details, allBlocks) | |||
| stgmod.DetailsFillPinnedAt(details, allPinnedObjs) | |||
| return rets, nil | |||
| } | |||
| for ; pinnedsCur < len(allPinnedObjs); pinnedsCur++ { | |||
| if allPinnedObjs[pinnedsCur].ObjectID != temp.ObjectID { | |||
| break | |||
| } | |||
| detail.PinnedAt = append(detail.PinnedAt, allPinnedObjs[pinnedsCur].NodeID) | |||
| } | |||
| func (*ObjectDB) GetObjectsIfAnyBlockOnNode(ctx SQLContext, nodeID cdssdk.NodeID) ([]cdssdk.Object, error) { | |||
| var temps []model.TempObject | |||
| err := sqlx.Select(ctx, &temps, "select * from Object where ObjectID in (select ObjectID from ObjectBlock where NodeID = ?) order by ObjectID asc", nodeID) | |||
| if err != nil { | |||
| return nil, fmt.Errorf("getting objects: %w", err) | |||
| } | |||
| rets = append(rets, detail) | |||
| objs := make([]cdssdk.Object, len(temps)) | |||
| for i := range temps { | |||
| objs[i] = temps[i].ToObject() | |||
| } | |||
| return rets, nil | |||
| return objs, nil | |||
| } | |||
| func (db *ObjectDB) BatchAdd(ctx SQLContext, packageID cdssdk.PackageID, adds []coormq.AddObjectEntry) ([]cdssdk.Object, error) { | |||
| @@ -3,7 +3,7 @@ package db | |||
| import ( | |||
| "github.com/jmoiron/sqlx" | |||
| cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" | |||
| "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" | |||
| stgmod "gitlink.org.cn/cloudream/storage/common/models" | |||
| coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" | |||
| ) | |||
| @@ -15,24 +15,24 @@ func (db *DB) ObjectAccessStat() *ObjectAccessStatDB { | |||
| return &ObjectAccessStatDB{db} | |||
| } | |||
| func (*ObjectAccessStatDB) Get(ctx SQLContext, objID cdssdk.ObjectID, nodeID cdssdk.NodeID) (model.ObjectAccessStat, error) { | |||
| var ret model.ObjectAccessStat | |||
| func (*ObjectAccessStatDB) Get(ctx SQLContext, objID cdssdk.ObjectID, nodeID cdssdk.NodeID) (stgmod.ObjectAccessStat, error) { | |||
| var ret stgmod.ObjectAccessStat | |||
| err := sqlx.Get(ctx, &ret, "select * from ObjectAccessStat where ObjectID=? and NodeID=?", objID, nodeID) | |||
| return ret, err | |||
| } | |||
| func (*ObjectAccessStatDB) GetByObjectID(ctx SQLContext, objID cdssdk.ObjectID) ([]model.ObjectAccessStat, error) { | |||
| var ret []model.ObjectAccessStat | |||
| func (*ObjectAccessStatDB) GetByObjectID(ctx SQLContext, objID cdssdk.ObjectID) ([]stgmod.ObjectAccessStat, error) { | |||
| var ret []stgmod.ObjectAccessStat | |||
| err := sqlx.Select(ctx, &ret, "select * from ObjectAccessStat where ObjectID=?", objID) | |||
| return ret, err | |||
| } | |||
| func (*ObjectAccessStatDB) BatchGetByObjectID(ctx SQLContext, objIDs []cdssdk.ObjectID) ([]model.ObjectAccessStat, error) { | |||
| func (*ObjectAccessStatDB) BatchGetByObjectID(ctx SQLContext, objIDs []cdssdk.ObjectID) ([]stgmod.ObjectAccessStat, error) { | |||
| if len(objIDs) == 0 { | |||
| return nil, nil | |||
| } | |||
| var ret []model.ObjectAccessStat | |||
| var ret []stgmod.ObjectAccessStat | |||
| stmt, args, err := sqlx.In("select * from ObjectAccessStat where ObjectID in (?)", objIDs) | |||
| if err != nil { | |||
| return ret, err | |||
| @@ -42,6 +42,21 @@ func (*ObjectAccessStatDB) BatchGetByObjectID(ctx SQLContext, objIDs []cdssdk.Ob | |||
| return ret, err | |||
| } | |||
| func (*ObjectAccessStatDB) BatchGetByObjectIDOnNode(ctx SQLContext, objIDs []cdssdk.ObjectID, nodeID cdssdk.NodeID) ([]stgmod.ObjectAccessStat, error) { | |||
| if len(objIDs) == 0 { | |||
| return nil, nil | |||
| } | |||
| var ret []stgmod.ObjectAccessStat | |||
| stmt, args, err := sqlx.In("select * from ObjectAccessStat where ObjectID in (?) and NodeID=?", objIDs, nodeID) | |||
| if err != nil { | |||
| return ret, err | |||
| } | |||
| err = sqlx.Select(ctx, &ret, stmt, args...) | |||
| return ret, err | |||
| } | |||
| func (*ObjectAccessStatDB) BatchAddCounter(ctx SQLContext, entries []coormq.AddAccessStatEntry) error { | |||
| if len(entries) == 0 { | |||
| return nil | |||
| @@ -3,7 +3,7 @@ package db | |||
| import ( | |||
| "github.com/jmoiron/sqlx" | |||
| cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" | |||
| "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" | |||
| stgmod "gitlink.org.cn/cloudream/storage/common/models" | |||
| coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" | |||
| ) | |||
| @@ -15,24 +15,24 @@ func (db *DB) PackageAccessStat() *PackageAccessStatDB { | |||
| return &PackageAccessStatDB{db} | |||
| } | |||
| func (*PackageAccessStatDB) Get(ctx SQLContext, pkgID cdssdk.PackageID, nodeID cdssdk.NodeID) (model.PackageAccessStat, error) { | |||
| var ret model.PackageAccessStat | |||
| func (*PackageAccessStatDB) Get(ctx SQLContext, pkgID cdssdk.PackageID, nodeID cdssdk.NodeID) (stgmod.PackageAccessStat, error) { | |||
| var ret stgmod.PackageAccessStat | |||
| err := sqlx.Get(ctx, &ret, "select * from PackageAccessStat where PackageID=? and NodeID=?", pkgID, nodeID) | |||
| return ret, err | |||
| } | |||
| func (*PackageAccessStatDB) GetByPackageID(ctx SQLContext, pkgID cdssdk.PackageID) ([]model.PackageAccessStat, error) { | |||
| var ret []model.PackageAccessStat | |||
| func (*PackageAccessStatDB) GetByPackageID(ctx SQLContext, pkgID cdssdk.PackageID) ([]stgmod.PackageAccessStat, error) { | |||
| var ret []stgmod.PackageAccessStat | |||
| err := sqlx.Select(ctx, &ret, "select * from PackageAccessStat where PackageID=?", pkgID) | |||
| return ret, err | |||
| } | |||
| func (*PackageAccessStatDB) BatchGetByPackageID(ctx SQLContext, pkgIDs []cdssdk.PackageID) ([]model.PackageAccessStat, error) { | |||
| func (*PackageAccessStatDB) BatchGetByPackageID(ctx SQLContext, pkgIDs []cdssdk.PackageID) ([]stgmod.PackageAccessStat, error) { | |||
| if len(pkgIDs) == 0 { | |||
| return nil, nil | |||
| } | |||
| var ret []model.PackageAccessStat | |||
| var ret []stgmod.PackageAccessStat | |||
| stmt, args, err := sqlx.In("select * from PackageAccessStat where PackageID in (?)", pkgIDs) | |||
| if err != nil { | |||
| return nil, err | |||
| @@ -8,6 +8,8 @@ import ( | |||
| type Service = distlock.Service | |||
| type Mutex = distlock.Mutex | |||
| func NewService(cfg *distlock.Config) (*distlock.Service, error) { | |||
| srv, err := distlock.NewService(cfg, initProviders()) | |||
| if err != nil { | |||
| @@ -26,6 +26,9 @@ func (e *UpdateAllPackageAccessStatAmount) Execute(ctx ExecuteContext) { | |||
| e.todayUpdated = false | |||
| return | |||
| } | |||
| if e.todayUpdated { | |||
| return | |||
| } | |||
| e.todayUpdated = true | |||
| ctx.Args.EventExecutor.Post(evt.NewUpdatePackageAccessStatAmount(event.NewUpdatePackageAccessStatAmount(nil))) | |||