Browse Source

迁移代码

gitlink
Sydonian 8 months ago
parent
commit
f38fa2d73f
2 changed files with 77 additions and 232 deletions
  1. +77
    -76
      common/models/datamap/datamap.go
  2. +0
    -156
      common/models/models.go

common/models/datamap.go → common/models/datamap/datamap.go View File

@@ -1,12 +1,13 @@
package stgmod
package datamap

import (
"fmt"
"time"

"gitlink.org.cn/cloudream/common/pkgs/types"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/common/utils/serder"
clitypes "gitlink.org.cn/cloudream/storage2/client/types"
cortypes "gitlink.org.cn/cloudream/storage2/coordinator/types"
)

// 系统事件
@@ -67,9 +68,9 @@ func (s *SourceScanner) String() string {

type SourceHub struct {
serder.Metadata `union:"Hub"`
Type string `json:"type"`
HubID cdssdk.HubID `json:"hubID"`
HubName string `json:"hubName"`
Type string `json:"type"`
HubID cortypes.HubID `json:"hubID"`
HubName string `json:"hubName"`
}

func (s *SourceHub) GetSourceType() string {
@@ -119,8 +120,8 @@ var _ = serder.UseTypeUnionInternallyTagged(types.Ref(types.NewTypeUnion[SysEven
// 新增Hub的事件
type BodyNewHub struct {
serder.Metadata `union:"NewHub"`
Type string `json:"type"`
Info cdssdk.Hub `json:"info"`
Type string `json:"type"`
Info cortypes.Hub `json:"info"`
}

func (b *BodyNewHub) GetBodyType() string {
@@ -134,8 +135,8 @@ func (b *BodyNewHub) OnUnionSerializing() {
// Hub信息更新的事件
type BodyHubUpdated struct {
serder.Metadata `union:"HubUpdated"`
Type string `json:"type"`
Info cdssdk.Hub `json:"info"`
Type string `json:"type"`
Info cortypes.Hub `json:"info"`
}

func (b *BodyHubUpdated) GetBodyType() string {
@@ -149,8 +150,8 @@ func (b *BodyHubUpdated) OnUnionSerializing() {
// Hub删除的事件
type BodyHubDeleted struct {
serder.Metadata `union:"HubDeleted"`
Type string `json:"type"`
HubID cdssdk.HubID `json:"hubID"`
Type string `json:"type"`
HubID cortypes.HubID `json:"hubID"`
}

func (b *BodyHubDeleted) GetBodyType() string {
@@ -164,8 +165,8 @@ func (b *BodyHubDeleted) OnUnionSerializing() {
// 新增Storage的事件
type BodyNewStorage struct {
serder.Metadata `union:"NewStorage"`
Info cdssdk.Storage `json:"info"`
Type string `json:"type"`
Info clitypes.Storage `json:"info"`
Type string `json:"type"`
}

func (b *BodyNewStorage) GetBodyType() string {
@@ -179,8 +180,8 @@ func (b *BodyNewStorage) OnUnionSerializing() {
// Storage信息更新的事件
type BodyStorageUpdated struct {
serder.Metadata `union:"StorageUpdated"`
Type string `json:"type"`
Info cdssdk.Storage `json:"info"`
Type string `json:"type"`
Info clitypes.Storage `json:"info"`
}

func (b *BodyStorageUpdated) GetBodyType() string {
@@ -194,8 +195,8 @@ func (b *BodyStorageUpdated) OnUnionSerializing() {
// Storage删除的事件
type BodyStorageDeleted struct {
serder.Metadata `union:"StorageDeleted"`
Type string `json:"type"`
StorageID cdssdk.StorageID `json:"storageID"`
Type string `json:"type"`
StorageID clitypes.StorageID `json:"storageID"`
}

func (b *BodyStorageDeleted) GetBodyType() string {
@@ -209,9 +210,9 @@ func (b *BodyStorageDeleted) OnUnionSerializing() {
// Storage统计信息的事件
type BodyStorageStats struct {
serder.Metadata `union:"StorageStats"`
Type string `json:"type"`
StorageID cdssdk.StorageID `json:"storageID"`
DataCount int64 `json:"dataCount"`
Type string `json:"type"`
StorageID clitypes.StorageID `json:"storageID"`
DataCount int64 `json:"dataCount"`
}

func (b *BodyStorageStats) GetBodyType() string {
@@ -225,12 +226,12 @@ func (b *BodyStorageStats) OnUnionSerializing() {
// Hub数据传输统计信息的事件
type BodyHubTransferStats struct {
serder.Metadata `union:"HubTransferStats"`
Type string `json:"type"`
SourceHubID cdssdk.HubID `json:"sourceHubID"`
TargetHubID cdssdk.HubID `json:"targetHubID"`
Send DataTrans `json:"send"`
StartTimestamp time.Time `json:"startTimestamp"`
EndTimestamp time.Time `json:"endTimestamp"`
Type string `json:"type"`
SourceHubID cortypes.HubID `json:"sourceHubID"`
TargetHubID cortypes.HubID `json:"targetHubID"`
Send DataTrans `json:"send"`
StartTimestamp time.Time `json:"startTimestamp"`
EndTimestamp time.Time `json:"endTimestamp"`
}

func (b *BodyHubTransferStats) GetBodyType() string {
@@ -253,13 +254,13 @@ type DataTrans struct {
// Hub和Storage数据传输统计信息的事件
type BodyHubStorageTransferStats struct {
serder.Metadata `union:"HubStorageTransferStats"`
Type string `json:"type"`
HubID cdssdk.HubID `json:"hubID"`
StorageID cdssdk.StorageID `json:"storageID"`
Send DataTrans `json:"send"`
Receive DataTrans `json:"receive"`
StartTimestamp time.Time `json:"startTimestamp"`
EndTimestamp time.Time `json:"endTimestamp"`
Type string `json:"type"`
HubID cortypes.HubID `json:"hubID"`
StorageID clitypes.StorageID `json:"storageID"`
Send DataTrans `json:"send"`
Receive DataTrans `json:"receive"`
StartTimestamp time.Time `json:"startTimestamp"`
EndTimestamp time.Time `json:"endTimestamp"`
}

func (b *BodyHubStorageTransferStats) GetBodyType() string {
@@ -273,10 +274,10 @@ func (b *BodyHubStorageTransferStats) OnUnionSerializing() {
// 块传输的事件
type BodyBlockTransfer struct {
serder.Metadata `union:"BlockTransfer"`
Type string `json:"type"`
ObjectID cdssdk.ObjectID `json:"objectID"`
PackageID cdssdk.PackageID `json:"packageID"`
BlockChanges []BlockChange `json:"blockChanges"`
Type string `json:"type"`
ObjectID clitypes.ObjectID `json:"objectID"`
PackageID clitypes.PackageID `json:"packageID"`
BlockChanges []BlockChange `json:"blockChanges"`
}

func (b *BodyBlockTransfer) GetBodyType() string {
@@ -305,24 +306,24 @@ const (
)

type Block struct {
BlockType string `json:"blockType"`
Index int `json:"index"`
StorageID cdssdk.StorageID `json:"storageID"`
BlockType string `json:"blockType"`
Index int `json:"index"`
StorageID clitypes.StorageID `json:"storageID"`
}
type DataTransfer struct {
SourceStorageID cdssdk.StorageID `json:"sourceStorageID"`
TargetStorageID cdssdk.StorageID `json:"targetStorageID"`
TransferBytes int64 `json:"transferBytes"`
SourceStorageID clitypes.StorageID `json:"sourceStorageID"`
TargetStorageID clitypes.StorageID `json:"targetStorageID"`
TransferBytes int64 `json:"transferBytes"`
}

type BlockChangeClone struct {
serder.Metadata `union:"Clone"`
Type string `json:"type"`
BlockType string `json:"blockType"`
Index int `json:"index"`
SourceStorageID cdssdk.StorageID `json:"sourceStorageID"`
TargetStorageID cdssdk.StorageID `json:"targetStorageID"`
TransferBytes int64 `json:"transferBytes"`
Type string `json:"type"`
BlockType string `json:"blockType"`
Index int `json:"index"`
SourceStorageID clitypes.StorageID `json:"sourceStorageID"`
TargetStorageID clitypes.StorageID `json:"targetStorageID"`
TransferBytes int64 `json:"transferBytes"`
}

func (b *BlockChangeClone) GetBlockChangeType() string {
@@ -335,9 +336,9 @@ func (b *BlockChangeClone) OnUnionSerializing() {

type BlockChangeDeleted struct {
serder.Metadata `union:"Deleted"`
Type string `json:"type"`
Index int `json:"index"`
StorageID cdssdk.StorageID `json:"storageID"`
Type string `json:"type"`
Index int `json:"index"`
StorageID clitypes.StorageID `json:"storageID"`
}

func (b *BlockChangeDeleted) GetBlockChangeType() string {
@@ -368,11 +369,11 @@ func (b *BlockChangeEnDecode) OnUnionSerializing() {
type BodyBlockDistribution struct {
serder.Metadata `union:"BlockDistribution"`
Type string `json:"type"`
ObjectID cdssdk.ObjectID `json:"objectID"`
PackageID cdssdk.PackageID `json:"packageID"`
ObjectID clitypes.ObjectID `json:"objectID"`
PackageID clitypes.PackageID `json:"packageID"`
Path string `json:"path"`
Size int64 `json:"size"`
FileHash cdssdk.FileHash `json:"fileHash"`
FileHash clitypes.FileHash `json:"fileHash"`
FaultTolerance float64 `json:"faultTolerance"`
Redundancy float64 `json:"redundancy"`
AvgAccessCost float64 `json:"avgAccessCost"`
@@ -389,16 +390,16 @@ func (b *BodyBlockDistribution) OnUnionSerializing() {
}

type BlockDistributionObjectInfo struct {
BlockType string `json:"type"`
Index int `json:"index"`
StorageID cdssdk.StorageID `json:"storageID"`
BlockType string `json:"type"`
Index int `json:"index"`
StorageID clitypes.StorageID `json:"storageID"`
}

// 新增或者重新上传Object的事件
type BodyNewOrUpdateObject struct {
serder.Metadata `union:"NewOrUpdateObject"`
Type string `json:"type"`
Info cdssdk.Object `json:"info"`
Info clitypes.Object `json:"info"`
BlockDistribution []BlockDistributionObjectInfo `json:"blockDistribution"`
}

@@ -413,8 +414,8 @@ func (b *BodyNewOrUpdateObject) OnUnionSerializing() {
// Object的基本信息更新的事件
type BodyObjectInfoUpdated struct {
serder.Metadata `union:"ObjectInfoUpdated"`
Type string `json:"type"`
Object cdssdk.Object `json:"object"`
Type string `json:"type"`
Object clitypes.Object `json:"object"`
}

func (b *BodyObjectInfoUpdated) GetBodyType() string {
@@ -428,8 +429,8 @@ func (b *BodyObjectInfoUpdated) OnUnionSerializing() {
// Object删除的事件
type BodyObjectDeleted struct {
serder.Metadata `union:"ObjectDeleted"`
Type string `json:"type"`
ObjectID cdssdk.ObjectID `json:"objectID"`
Type string `json:"type"`
ObjectID clitypes.ObjectID `json:"objectID"`
}

func (b *BodyObjectDeleted) GetBodyType() string {
@@ -443,8 +444,8 @@ func (b *BodyObjectDeleted) OnUnionSerializing() {
// 新增Package的事件
type BodyNewPackage struct {
serder.Metadata `union:"NewPackage"`
Type string `json:"type"`
Info cdssdk.Package `json:"info"`
Type string `json:"type"`
Info clitypes.Package `json:"info"`
}

func (b *BodyNewPackage) GetBodyType() string {
@@ -458,11 +459,11 @@ func (b *BodyNewPackage) OnUnionSerializing() {
// Package克隆的事件
type BodyPackageCloned struct {
serder.Metadata `union:"PackageCloned"`
Type string `json:"type"`
SourcePackageID cdssdk.PackageID `json:"sourcePackageID"`
NewPackage cdssdk.Package `json:"newPackage"`
SourceObjectIDs []cdssdk.ObjectID `json:"sourceObjectIDs"` // 原本的ObjectID
NewObjectIDs []cdssdk.ObjectID `json:"newObjectIDs"` // 复制后的新ObjectID,与SourceObjectIDs一一对应
Type string `json:"type"`
SourcePackageID clitypes.PackageID `json:"sourcePackageID"`
NewPackage clitypes.Package `json:"newPackage"`
SourceObjectIDs []clitypes.ObjectID `json:"sourceObjectIDs"` // 原本的ObjectID
NewObjectIDs []clitypes.ObjectID `json:"newObjectIDs"` // 复制后的新ObjectID,与SourceObjectIDs一一对应
}

func (b *BodyPackageCloned) GetBodyType() string {
@@ -476,8 +477,8 @@ func (b *BodyPackageCloned) OnUnionSerializing() {
// Package删除的事件
type BodyPackageDeleted struct {
serder.Metadata `union:"PackageDeleted"`
Type string `json:"type"`
PackageID cdssdk.PackageID `json:"packageID"`
Type string `json:"type"`
PackageID clitypes.PackageID `json:"packageID"`
}

func (b *BodyPackageDeleted) GetBodyType() string {
@@ -491,8 +492,8 @@ func (b *BodyPackageDeleted) OnUnionSerializing() {
// 新增Bucket的事件
type BodyNewBucket struct {
serder.Metadata `union:"NewBucket"`
Type string `json:"type"`
Info cdssdk.Bucket `json:"info"`
Type string `json:"type"`
Info clitypes.Bucket `json:"info"`
}

func (b *BodyNewBucket) GetBodyType() string {
@@ -506,8 +507,8 @@ func (b *BodyNewBucket) OnUnionSerializing() {
// Bucket删除的事件
type BodyBucketDeleted struct {
serder.Metadata `union:"BucketDeleted"`
Type string `json:"type"`
BucketID cdssdk.BucketID `json:"bucketID"`
Type string `json:"type"`
BucketID clitypes.BucketID `json:"bucketID"`
}

func (b *BodyBucketDeleted) GetBodyType() string {

+ 0
- 156
common/models/models.go View File

@@ -1,156 +0,0 @@
package stgmod

import (
"github.com/samber/lo"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/common/utils/sort2"
)

type ObjectBlock struct {
ObjectID cdssdk.ObjectID `gorm:"column:ObjectID; primaryKey; type:bigint" json:"objectID"`
Index int `gorm:"column:Index; primaryKey; type:int" json:"index"`
StorageID cdssdk.StorageID `gorm:"column:StorageID; primaryKey; type:bigint" json:"storageID"` // 这个块应该在哪个节点上
FileHash cdssdk.FileHash `gorm:"column:FileHash; type:char(68); not null" json:"fileHash"`
Size int64 `gorm:"column:Size; type:bigint" json:"size"`
}

func (ObjectBlock) TableName() string {
return "ObjectBlock"
}

type ObjectDetail struct {
Object cdssdk.Object `json:"object"`
PinnedAt []cdssdk.StorageID `json:"pinnedAt"`
Blocks []ObjectBlock `json:"blocks"`
}

func NewObjectDetail(object cdssdk.Object, pinnedAt []cdssdk.StorageID, blocks []ObjectBlock) ObjectDetail {
return ObjectDetail{
Object: object,
PinnedAt: pinnedAt,
Blocks: blocks,
}
}

func DetailsFromObjects(objects []cdssdk.Object) []ObjectDetail {
details := make([]ObjectDetail, len(objects))
for i, object := range objects {
details[i] = ObjectDetail{
Object: object,
}
}
return details
}

// 将blocks放到对应的object中。要求objs和blocks都按ObjectID升序
func DetailsFillObjectBlocks(objs []ObjectDetail, blocks []ObjectBlock) {
blksCur := 0
for i := range objs {
obj := &objs[i]
// 1. 查询Object和ObjectBlock时均按照ObjectID升序排序
// 2. ObjectBlock结果集中的不同ObjectID数只会比Object结果集的少
// 因此在两个结果集上同时从头开始遍历时,如果两边的ObjectID字段不同,那么一定是ObjectBlock这边的ObjectID > Object的ObjectID,
// 此时让Object的遍历游标前进,直到两边的ObjectID再次相等
for ; blksCur < len(blocks); blksCur++ {
if blocks[blksCur].ObjectID != obj.Object.ObjectID {
break
}
obj.Blocks = append(obj.Blocks, blocks[blksCur])
}
}
}

// 将pinnedAt放到对应的object中。要求objs和pinnedAt都按ObjectID升序
func DetailsFillPinnedAt(objs []ObjectDetail, pinnedAt []cdssdk.PinnedObject) {
pinnedCur := 0
for i := range objs {
obj := &objs[i]
for ; pinnedCur < len(pinnedAt); pinnedCur++ {
if pinnedAt[pinnedCur].ObjectID != obj.Object.ObjectID {
break
}
obj.PinnedAt = append(obj.PinnedAt, pinnedAt[pinnedCur].StorageID)
}
}
}

type GrouppedObjectBlock struct {
ObjectID cdssdk.ObjectID
Index int
FileHash cdssdk.FileHash
Size int64
StorageIDs []cdssdk.StorageID
}

func (o *ObjectDetail) GroupBlocks() []GrouppedObjectBlock {
grps := make(map[int]GrouppedObjectBlock)
for _, block := range o.Blocks {
grp, ok := grps[block.Index]
if !ok {
grp = GrouppedObjectBlock{
ObjectID: block.ObjectID,
Index: block.Index,
FileHash: block.FileHash,
Size: block.Size,
}
}
grp.StorageIDs = append(grp.StorageIDs, block.StorageID)
grps[block.Index] = grp
}

return sort2.Sort(lo.Values(grps), func(l, r GrouppedObjectBlock) int { return l.Index - r.Index })
}

type LocalMachineInfo struct {
HubID *cdssdk.HubID `json:"hubID"`
ExternalIP string `json:"externalIP"`
LocalIP string `json:"localIP"`
LocationID cdssdk.LocationID `json:"locationID"`
}

type PackageAccessStat struct {
PackageID cdssdk.PackageID `gorm:"column:PackageID; primaryKey; type:bigint" json:"packageID"`
StorageID cdssdk.StorageID `gorm:"column:StorageID; primaryKey; type:bigint" json:"storageID"`
Amount float64 `gorm:"column:Amount; type:double" json:"amount"` // 前一日的读取量的滑动平均值
Counter float64 `gorm:"column:Counter; type:double" json:"counter"` // 当日的读取量
}

func (PackageAccessStat) TableName() string {
return "PackageAccessStat"
}

type ObjectAccessStat struct {
ObjectID cdssdk.ObjectID `gorm:"column:ObjectID; primaryKey; type:bigint" json:"objectID"`
StorageID cdssdk.StorageID `gorm:"column:StorageID; primaryKey; type:bigint" json:"storageID"`
Amount float64 `gorm:"column:Amount; type:float; not null" json:"amount"` // 前一日的读取量的滑动平均值
Counter float64 `gorm:"column:Counter; type:float; not null" json:"counter"` // 当日的读取量
}

func (ObjectAccessStat) TableName() string {
return "ObjectAccessStat"
}

type StorageDetail struct {
Storage cdssdk.Storage `json:"storage"`
MasterHub *cdssdk.Hub `json:"masterHub"`
}

type ObjectStorage struct {
Manufacturer string `json:"manufacturer"`
Region string `json:"region"`
AK string `json:"access_key_id"`
SK string `json:"secret_access_key"`
Endpoint string `json:"endpoint"`
Bucket string `json:"bucket"`
}

const (
HuaweiCloud = "HuaweiCloud"
AliCloud = "AliCloud"
SugonCloud = "SugonCloud"
)

type LoadedPackageID struct {
UserID cdssdk.UserID
PackageID cdssdk.PackageID
}

Loading…
Cancel
Save