Browse Source

存储系统日志分析处理

gitlink
songjc 1 year ago
parent
commit
59b5082bc6
17 changed files with 435 additions and 0 deletions
  1. +24
    -0
      common/assets/confs/datamap.config.json
  2. +1
    -0
      common/pkgs/mq/consts.go
  3. +4
    -0
      common/pkgs/mq/datamap/hub.go
  4. +4
    -0
      common/pkgs/mq/datamap/object.go
  5. +72
    -0
      common/pkgs/mq/datamap/server.go
  6. +28
    -0
      datamap/internal/cmd/serve.go
  7. +26
    -0
      datamap/internal/config/config.go
  8. +1
    -0
      datamap/internal/db/blockdistribution.go
  9. +21
    -0
      datamap/internal/db/config/config.go
  10. +37
    -0
      datamap/internal/db/db.go
  11. +20
    -0
      datamap/internal/db/hub.go
  12. +1
    -0
      datamap/internal/db/hubrequest.go
  13. +1
    -0
      datamap/internal/db/object.go
  14. +1
    -0
      datamap/internal/db/storage.go
  15. +1
    -0
      datamap/internal/db/storagetransfercount.go
  16. +177
    -0
      datamap/internal/models/models.go
  17. +16
    -0
      datamap/main.go

+ 24
- 0
common/assets/confs/datamap.config.json View File

@@ -0,0 +1,24 @@
{
"logger": {
"output": "file",
"outputFileName": "datamap",
"outputDirectory": "log",
"level": "debug"
},
"db": {
"address": "106.75.6.194:3306",
"account": "root",
"password": "cloudream123456",
"databaseName": "cloudream"
},
"rabbitMQ": {
"address": "106.75.6.194:5672",
"account": "cloudream",
"password": "123456",
"vhost": "/",
"param": {
"retryNum": 5,
"retryInterval": 5000
}
}
}

+ 1
- 0
common/pkgs/mq/consts.go View File

@@ -5,6 +5,7 @@ import "fmt"
const (
COORDINATOR_QUEUE_NAME = "Coordinator"
SCANNER_QUEUE_NAME = "Scanner"
DATAMAP_QUEUE_NAME = "DataMap"
)

func MakeAgentQueueName(id int64) string {


+ 4
- 0
common/pkgs/mq/datamap/hub.go View File

@@ -0,0 +1,4 @@
package datamap

type HubService interface {
}

+ 4
- 0
common/pkgs/mq/datamap/object.go View File

@@ -0,0 +1,4 @@
package datamap

type ObjectService interface {
}

+ 72
- 0
common/pkgs/mq/datamap/server.go View File

@@ -0,0 +1,72 @@
package datamap

import (
"gitlink.org.cn/cloudream/common/pkgs/mq"
"gitlink.org.cn/cloudream/common/utils/sync2"
mymq "gitlink.org.cn/cloudream/storage/common/pkgs/mq"
)

type Service interface {
HubService
ObjectService
}

type Server struct {
service Service
rabbitSvr mq.RabbitMQServer
}

func NewServer(svc Service, cfg *mymq.Config) (*Server, error) {
srv := &Server{
service: svc,
}

rabbitSvr, err := mq.NewRabbitMQServer(
cfg.MakeConnectingURL(),
mymq.DATAMAP_QUEUE_NAME,
func(msg *mq.Message) (*mq.Message, error) {
return msgDispatcher.Handle(srv.service, msg)
},
cfg.Param,
)
if err != nil {
return nil, err
}

srv.rabbitSvr = *rabbitSvr

return srv, nil
}

func (s *Server) Stop() {
s.rabbitSvr.Close()
}

func (s *Server) Start(cfg mymq.Config) *sync2.UnboundChannel[mq.RabbitMQServerEvent] {
return s.rabbitSvr.Start()
}

func (s *Server) OnError(callback func(error)) {
s.rabbitSvr.OnError = callback
}

var msgDispatcher mq.MessageDispatcher = mq.NewMessageDispatcher()

// Register 将Service中的一个接口函数作为指定类型消息的处理函数,同时会注册请求和响应的消息类型
// TODO 需要约束:Service实现了TSvc接口
func Register[TReq mq.MessageBody, TResp mq.MessageBody](svcFn func(svc Service, msg TReq) (TResp, *mq.CodeMessage)) any {
mq.AddServiceFn(&msgDispatcher, svcFn)
mq.RegisterMessage[TReq]()
mq.RegisterMessage[TResp]()

return nil
}

// RegisterNoReply 将Service中的一个*没有返回值的*接口函数作为指定类型消息的处理函数,同时会注册请求和响应的消息类型
// TODO 需要约束:Service实现了TSvc接口
func RegisterNoReply[TReq mq.MessageBody](svcFn func(svc Service, msg TReq)) any {
mq.AddNoRespServiceFn(&msgDispatcher, svcFn)
mq.RegisterMessage[TReq]()

return nil
}

+ 28
- 0
datamap/internal/cmd/serve.go View File

@@ -0,0 +1,28 @@
package cmd

import (
"fmt"
"os"

"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/storage/datamap/internal/config"
)

func serve() {
err := config.Init()
if err != nil {
fmt.Printf("init config failed, err: %s", err.Error())
os.Exit(1)
}

err = logger.Init(&config.Cfg().Logger)
if err != nil {
fmt.Printf("init logger failed, err: %s", err.Error())
os.Exit(1)
}

// dataSvr, err := datamq.NewServer(mymq.NewService(db2), &config.Cfg().RabbitMQ)
// if err != nil {
// logger.Fatalf("new coordinator server failed, err: %s", err.Error())
// }
}

+ 26
- 0
datamap/internal/config/config.go View File

@@ -0,0 +1,26 @@
package config

import (
"gitlink.org.cn/cloudream/common/pkgs/distlock"
log "gitlink.org.cn/cloudream/common/pkgs/logger"
c "gitlink.org.cn/cloudream/common/utils/config"
db "gitlink.org.cn/cloudream/storage/common/pkgs/db2/config"
stgmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq"
)

type Config struct {
Logger log.Config `json:"logger"`
DB db.Config `json:"db"`
RabbitMQ stgmq.Config `json:"rabbitMQ"`
DistLock distlock.Config `json:"distlock"`
}

var cfg Config

func Init() error {
return c.DefaultLoad("datamap", &cfg)
}

func Cfg() *Config {
return &cfg
}

+ 1
- 0
datamap/internal/db/blockdistribution.go View File

@@ -0,0 +1 @@
package db

+ 21
- 0
datamap/internal/db/config/config.go View File

@@ -0,0 +1,21 @@
package config

import "fmt"

type Config struct {
Address string `json:"address"`
Account string `json:"account"`
Password string `json:"password"`
DatabaseName string `json:"databaseName"`
}

func (cfg *Config) MakeSourceString() string {
return fmt.Sprintf(
"%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=true&loc=%s",
cfg.Account,
cfg.Password,
cfg.Address,
cfg.DatabaseName,
"Asia%2FShanghai",
)
}

+ 37
- 0
datamap/internal/db/db.go View File

@@ -0,0 +1,37 @@
package db

import (
"github.com/sirupsen/logrus"
"gitlink.org.cn/cloudream/storage/datamap/internal/db/config"
"gorm.io/driver/mysql"
"gorm.io/gorm"
)

type DB struct {
db *gorm.DB
}

func NewDB(cfg *config.Config) (*DB, error) {
mydb, err := gorm.Open(mysql.Open(cfg.MakeSourceString()), &gorm.Config{})
if err != nil {
logrus.Fatalf("failed to connect to database: %v", err)
}

return &DB{
db: mydb,
}, nil
}

func (db *DB) DoTx(do func(tx SQLContext) error) error {
return db.db.Transaction(func(tx *gorm.DB) error {
return do(SQLContext{tx})
})
}

type SQLContext struct {
*gorm.DB
}

func (db *DB) DefCtx() SQLContext {
return SQLContext{db.db}
}

+ 20
- 0
datamap/internal/db/hub.go View File

@@ -0,0 +1,20 @@
package db

import (
"gitlink.org.cn/cloudream/storage/datamap/internal/models"
)

type HubDB struct {
*DB
}

func (db *DB) Hub() *HubDB {
return &HubDB{DB: db}
}

func (*HubDB) GetAllHubs(ctx SQLContext) ([]models.Hub, error) {
var ret []models.Hub

err := ctx.Table("Hub").Find(&ret).Error
return ret, err
}

+ 1
- 0
datamap/internal/db/hubrequest.go View File

@@ -0,0 +1 @@
package db

+ 1
- 0
datamap/internal/db/object.go View File

@@ -0,0 +1 @@
package db

+ 1
- 0
datamap/internal/db/storage.go View File

@@ -0,0 +1 @@
package db

+ 1
- 0
datamap/internal/db/storagetransfercount.go View File

@@ -0,0 +1 @@
package db

+ 177
- 0
datamap/internal/models/models.go View File

@@ -0,0 +1,177 @@
package models

import (
"time"

cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
)

type RequestID int64

type BlockID int64

type RelationshipID int64

type Status int

const (
StatusNow Status = iota // =0 表示当前实时状态
StatusYesterdayAfter // =1 表示前一天调整后的状态
StatusYesterdayBefore // =2 表示前一天调整前的状态
StatusTodayBeforeYesterday // =3 表示前两天调整后的状态
)

// 节点间关系图
type HubRelationship struct {
HubID cdssdk.HubID `json:"hubID"` // 节点ID
HubName string `json:"hubName"` // 名称
HubAddress cdssdk.HubAddressInfo `json:"hubAddress"` // 地址
Storages []StorageInfo `json:"storages"` // 对应的中心信息
Requests []Request `json:"requests"` // 与各节点的连接信息(仅记录发出的请求)
}

type StorageInfo struct {
StorageID cdssdk.StorageID `json:"storageID"` // 中心ID
DataCount int64 `json:"dataCount"` // 总数据量(文件分块数)
NewDataCount int64 `json:"newdataCount"` // 新增数据量(文件分块数)
Timestamp time.Time `json:"timestamp"` // 时间戳
}

type Request struct {
SourceHubID cdssdk.HubID `json:"sourceHubID"` // 源节点ID
TargetHubID cdssdk.HubID `json:"targetHubID"` // 目标节点ID
DataTransferCount int64 `json:"dataTransferCount"` // 数据传输量
RequestCount int64 `json:"requestCount"` // 请求数
FailedRequestCount int64 `json:"failedRequestCount"` // 失败请求数
AvgTransferCount int64 `json:"avgTransferCount"` // 平均数据传输量
MaxTransferCount int64 `json:"maxTransferCount"` // 最大数据传输量
MinTransferCount int64 `json:"minTransferCount"` // 最小数据传输量
StartTimestamp time.Time `json:"startTimestamp"` // 起始时间戳
EndTimestamp time.Time `json:"endTimestamp"` // 结束时间戳
}

// 对象块分布结构
type ObjectDistribution struct {
ObjectID cdssdk.ObjectID `json:"objectID"` // 对象 ID
PackageID cdssdk.PackageID `json:"packageID"` // 包 ID
Path string `json:"path"` // 路径
Size int64 `json:"size"` // 大小
FileHash string `json:"fileHash"` // 文件哈希
States []State `json:"states"` // 各阶段状态信息(只需要传1、2、3阶段)
Relationships []Relationship `json:"relationships"` // 节点间传输量
Timestamp time.Time `json:"timestamp"` // 请求中的时间戳
}

type State struct {
Timestamp time.Time `json:"timestamp"` // 时间戳
Status string `json:"status"` // 状态
FaultTolerance string `json:"faultTolerance"` // 容灾度(仅布局调整后)
Redundancy string `json:"redundancy"` // 冗余度(仅布局调整后)
AvgAccessCost float64 `json:"avgAccessCost"` // 平均访问开销(仅布局调整前)
BlockDistributions []BlockDist `json:"blockDistributions"` // 块分布情况
}

type BlockDist struct {
StorageID cdssdk.StorageID `json:"storageID"` // 中心ID
Blocks []Block `json:"blocks"` // 该中心的所有块
}

type Block struct {
Type string `json:"type"` // 块类型
Index string `json:"index"` // 块编号
ID string `json:"id"` // 块ID
}

type Relationship struct {
Status Status `json:"status"` // 连线左侧的状态
SourceStorageID string `json:"sourceStorageID"` // 源存储节点 ID
TargetStorageID string `json:"targetStorageID"` // 目标存储节点 ID
DataTransferCount string `json:"dataTransferCount"` // 数据传输量
Timestamp time.Time `json:"timestamp"` // 变化结束时间戳
}

//数据库结构定义

type Hub struct {
HubID cdssdk.HubID `gorm:"column:HubID; primaryKey; type:bigint; autoIncrement" json:"hubID"`
Name string `gorm:"column:Name; type:varchar(255); not null" json:"name"`
Address cdssdk.HubAddressInfo `gorm:"column:Address; type:json; serializer:union" json:"address"`
}

func (Hub) TableName() string {
return "Hub"
}

type Storage struct {
StorageID cdssdk.StorageID `gorm:"column:StorageID; primaryKey; type:bigint; autoIncrement" json:"storageID"`
HubID cdssdk.HubID `gorm:"column:HubID; type:bigint; not null" json:"hubID"`
DataCount int64 `gorm:"column:DataCount; type:bigint; not null" json:"dataCount"`
NewDataCount int64 `gorm:"column:NewDataCount; type:bigint; not null" json:"newDataCount"`
Timestamp time.Time `gorm:"column:Timestamp; type:datatime; not null" json:"timestamp"`
}

func (Storage) TableName() string {
return "Storage"
}

type HubRequest struct {
RequestID int64 `gorm:"column:HubID; primaryKey; type:bigint; autoIncrement" json:"hubID"`
SourceHubID cdssdk.HubID `gorm:"column:SourceHubID; type:bigint; not null" json:"sourceHubID"`
TargetHubID cdssdk.HubID `gorm:"column:TargetHubID; type:bigint; not null" json:"targetHubID"`
DataTransfer int64 `gorm:"column:DataTransfer; type:bigint; not null" json:"dataTransfer"`
RequestCount int64 `gorm:"column:RequestCount; type:bigint; not null" json:"requestCount"`
FailedRequest int64 `gorm:"column:FailedRequest; type:bigint; not null" json:"failedRequest"`
AvgTransferCount int64 `gorm:"column:AvgTransferCount; type:bigint; not null" json:"avgTransferCount"`
MaxTransferCount int64 `gorm:"column:MaxTransferCount; type:bigint; not null" json:"maxTransferCount"`
MinTransferCount int64 `gorm:"column:MinTransferCount; type:bigint; not null" json:"minTransferCount"`
StartTimestamp time.Time `gorm:"column:StartTimestamp; type:datatime; not null" json:"startTimestamp"`
EndTimestamp time.Time `gorm:"column:EndTimestamp; type:datatime; not null" json:"endTimestamp"`
}

func (HubRequest) TableName() string {
return "HubRequest"
}

type Object struct {
ObjectID cdssdk.ObjectID `gorm:"column:ObjectID; primaryKey; type:bigint; autoIncrement" json:"objectID"`
PackageID cdssdk.PackageID `gorm:"column:PackageID; type:bigint; not null" json:"packageID"`
Path string `gorm:"column:Path; type:varchar(1024); not null" json:"path"`
Size int64 `gorm:"column:Size; type:bigint; not null" json:"size"`
FileHash string `gorm:"column:FileHash; type:varchar(255); not null" json:"fileHash"`
Status Status `gorm:"column:Status; type:tinyint; not null" json:"status"`
FaultTolerance float64 `gorm:"column:faultTolerance; type:float; not null" json:"faultTolerance"`
Redundancy float64 `gorm:"column:redundancy; type:float; not null" json:"redundancy"`
AvgAccessCost float64 `gorm:"column:avgAccessCost; type:float; not null" json:"avgAccessCost"`
Timestamp time.Time `gorm:"column:Timestamp; type:datatime; not null" json:"timestamp"`
}

func (Object) TableName() string {
return "Object"
}

type BlockDistribution struct {
BlockID int64 `gorm:"column:BlockID; primaryKey; type:bigint; autoIncrement" json:"blockID"`
ObjectID cdssdk.ObjectID `gorm:"column:ObjectID; type:bigint; not null" json:"objectID"`
Type string `gorm:"column:Type; type:varchar(1024); not null" json:"type"`
Index int64 `gorm:"column:Index; type:bigint; not null" json:"index"`
StorageID cdssdk.StorageID `gorm:"column:StorageID; type:bigint; not null" json:"storageID"`
Status Status `gorm:"column:Status; type:tinyint; not null" json:"status"`
Timestamp time.Time `gorm:"column:Timestamp; type:datatime; not null" json:"timestamp"`
}

func (BlockDistribution) TableName() string {
return "BlockDistribution"
}

type StorageTransferCount struct {
RelationshipID RelationshipID `gorm:"column:RelationshipID; primaryKey; type:bigint; autoIncrement" json:"relationshipID"`
Status Status `gorm:"column:Status; type:tinyint; not null" json:"status"`
SourceStorageID cdssdk.StorageID `gorm:"column:SourceStorageID; type:bigint; not null" json:"sourceStorageID"`
TargetStorageID cdssdk.StorageID `gorm:"column:TargetStorageID; type:bigint; not null" json:"targetStorageID"`
DataTransferCount int64 `gorm:"column:DataTransferCount; type:bigint; not null" json:"dataTransferCount"`
Timestamp time.Time `gorm:"column:Timestamp; type:datatime; not null" json:"timestamp"`
}

func (StorageTransferCount) TableName() string {
return "StorageTransferCount"
}

+ 16
- 0
datamap/main.go View File

@@ -0,0 +1,16 @@
package main

import (
"fmt"
"os"

"gitlink.org.cn/cloudream/storage/datamap/internal/config"
)

func main() {
err := config.Init()
if err != nil {
fmt.Printf("init config failed, err: %s", err.Error())
os.Exit(1)
}
}

Loading…
Cancel
Save