From 5fe52ee0689a40e86d848dea71568c0198bdaf60 Mon Sep 17 00:00:00 2001 From: Jake <450705171@qq.com> Date: Wed, 15 Jan 2025 16:41:56 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AF=B9=E9=BD=90=E5=89=8D=E7=AB=AF=E5=8F=A3?= =?UTF-8?q?=E5=BE=84=20mq=E5=8F=A3=E5=BE=84=E9=83=A8=E5=88=86=E9=80=BB?= =?UTF-8?q?=E8=BE=91=E8=BF=98=E5=BE=85=E4=BF=AE=E5=A4=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- common/models/datamap.go | 222 ++++++++++------- datamap/internal/config/config.go | 2 +- datamap/internal/db/db.go | 25 +- datamap/internal/handlers/handlers.go | 163 +++++++++++-- datamap/internal/models/block.go | 57 ----- datamap/internal/models/blockdistribution.go | 178 ++++++++++++++ datamap/internal/models/blocktransfer.go | 227 ++++++++++++++++++ datamap/internal/models/hub.go | 53 ++++ datamap/internal/models/hubinfo.go | 75 ++++++ datamap/internal/models/hubrequest.go | 84 +++++++ datamap/internal/models/hubtrans.go | 53 ---- datamap/internal/models/models.go | 177 +++++++------- datamap/internal/models/object.go | 65 ++--- .../models/{hubs.go => storageinfo.go} | 50 ++-- datamap/internal/mq/mq.go | 67 ++++-- datamap/internal/server/server.go | 5 +- datamap/main.go | 3 + go.mod | 10 +- go.sum | 41 +++- 19 files changed, 1166 insertions(+), 391 deletions(-) delete mode 100644 datamap/internal/models/block.go create mode 100644 datamap/internal/models/blockdistribution.go create mode 100644 datamap/internal/models/blocktransfer.go create mode 100644 datamap/internal/models/hub.go create mode 100644 datamap/internal/models/hubinfo.go create mode 100644 datamap/internal/models/hubrequest.go delete mode 100644 datamap/internal/models/hubtrans.go rename datamap/internal/models/{hubs.go => storageinfo.go} (66%) diff --git a/common/models/datamap.go b/common/models/datamap.go index 581e8e5..38bf89b 100644 --- a/common/models/datamap.go +++ b/common/models/datamap.go @@ -1,55 +1,118 @@ package stgmod -import "time" +import ( + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "time" +) -// HubStat 中心信息 每天一次,各节点统计自身当前的总数据量 +// HubInfo 节点信息 + +type SourceHub struct { + Type string `json:"type"` +} + +type HubInfoBody struct { + HubID int `json:"hubID"` + HubInfo cdssdk.Hub `json:"hubInfo"` + Type string `json:"type"` +} + +type HubInfo struct { + Timestamp time.Time `json:"timestamp"` + Source SourceHub `json:"source"` + Category string `json:"category"` + Body HubInfoBody `json:"body"` +} + +//StorageInfo 节点信息 + +type SourceStorage struct { + Type string `json:"type"` +} + +type StorageInfoBody struct { + StorageID int64 `json:"storageID"` + StorageInfo cdssdk.Storage `json:"storageInfo"` + Type string `json:"type"` +} + +type StorageInfo struct { + Timestamp time.Time `json:"timestamp"` + Source SourceStorage `json:"source"` + Category string `json:"category"` + Body StorageInfoBody `json:"body"` +} + +// StorageStats 节点信息数据 type Source struct { Type string `json:"type"` HubID string `json:"hubID"` HubName string `json:"hubName"` } -type HubStatBody struct { - MasterHubID int `json:"masterHubID"` - MasterHubAddress string `json:"masterHubAddress"` - StorageID int `json:"storageID"` - DataCount int `json:"dataCount"` + +type StorageStatsBody struct { + StorageID int64 `json:"storageID"` + DataCount int64 `json:"dataCount"` } -type HubStat struct { - Timestamp time.Time `json:"timestamp"` - Source Source `json:"source"` - Category string `json:"category"` - Body HubStatBody `json:"body"` + +type StorageStats struct { + Timestamp time.Time `json:"timestamp"` + Source Source `json:"source"` + Category string `json:"category"` + Body StorageStatsBody `json:"body"` } -// HubTrans 节点传输信息 +// HubTransferStats 节点传输信息 // 每天一次,各节点统计自身当天向外部各个节点传输的总数据量 -type HubTransBody struct { - SourceHubID int64 `json:"sourceHubID"` - TargetHubID int64 `json:"targetHubID"` - DataTransferCount int64 `json:"dataTransferCount"` - RequestCount int64 `json:"requestCount"` - FailedRequestCount int64 `json:"failedRequestCount"` - AvgTransferCount int64 `json:"avgTransferCount"` - MaxTransferCount int64 `json:"maxTransferCount"` - MinTransferCount int64 `json:"minTransferCount"` - StartTimestamp time.Time `json:"startTimestamp"` - EndTimestamp time.Time `json:"endTimestamp"` -} -type HubTrans struct { - Timestamp time.Time `json:"timestamp"` - Source Source `json:"source"` - Category string `json:"category"` - Body HubTransBody `json:"body"` -} - -// BlockTransInfo 块传输信息 +type DataTrans struct { + TotalTransfer int64 `json:"totalTransfer"` + RequestCount int64 `json:"requestCount"` + FailedRequestCount int64 `json:"failedRequestCount"` + AvgTransfer int64 `json:"avgTransfer"` + MaxTransfer int64 `json:"maxTransfer"` + MinTransfer int64 `json:"minTransfer"` +} +type HubTransferStatsBody struct { + SourceHubID int64 `json:"sourceHubID"` + TargetHubID int64 `json:"targetHubID"` + Send DataTrans `json:"send"` + StartTimestamp time.Time `json:"startTimestamp"` + EndTimestamp time.Time `json:"endTimestamp"` +} +type HubTransferStats struct { + Timestamp time.Time `json:"timestamp"` + Source Source `json:"source"` + Category string `json:"category"` + Body HubTransferStatsBody `json:"body"` +} + +//hubStorageTransferStats 节点-中心间传输信息 + +type HubStorageTransferStatsBody struct { + HubID int64 `json:"hubID"` + StorageID int64 `json:"storageID"` + Send DataTrans `json:"send"` + Receive DataTrans `json:"receive"` + StartTimestamp time.Time `json:"startTimestamp"` + EndTimestamp time.Time `json:"endTimestamp"` +} + +type HubStorageTransferStats struct { + Timestamp time.Time `json:"timestamp"` + Source Source `json:"source"` + Category string `json:"category"` + Body HubStorageTransferStatsBody `json:"body"` +} + +// blockTransfer 块传输信息 /*实时日志,当对象的块信息发生变化时推送,共有4种变化类型: 拷贝 编解码(一变多、多变一、多变多) 删除 更新*/ + type Block struct { BlockType string `json:"blockType"` Index string `json:"index"` @@ -68,68 +131,63 @@ type BlockChange struct { SourceStorageID string `json:"sourceStorageID"` TargetStorageID string `json:"targetStorageID"` DataTransferCount string `json:"dataTransferCount"` - Timestamp string `json:"timestamp"` + Timestamp time.Time `json:"timestamp"` SourceBlocks []Block `json:"sourceBlocks,omitempty"` TargetBlocks []Block `json:"targetBlocks,omitempty"` DataTransfers []DataTransfer `json:"dataTransfers,omitempty"` Blocks []Block `json:"blocks,omitempty"` } -type BlockTransInfoBody struct { +type BlockTransferBody struct { Type string `json:"type"` ObjectID string `json:"objectID"` PackageID string `json:"packageID"` BlockChanges []BlockChange `json:"blockChanges"` } -type BlockTransInfo struct { - Timestamp time.Time `json:"timestamp"` - Source Source `json:"source"` - Category string `json:"category"` - Body BlockTransInfoBody `json:"body"` +type BlockTransfer struct { + Timestamp time.Time `json:"timestamp"` + Source Source `json:"source"` + Category string `json:"category"` + Body BlockTransferBody `json:"body"` } // BlockDistribution 块传输信息 // 每天一次,在调整完成后,将当天调整前后的布局情况一起推送 -type BlockDistribution struct { + +type BlockDistributionObjectInfo struct { Type string `json:"type"` Index string `json:"index"` StorageID string `json:"storageID"` } -type Object struct { - ObjectID string `json:"objectID"` - PackageID string `json:"packageID"` - Path string `json:"path"` - Size int64 `json:"size"` - FileHash string `json:"fileHash"` - FaultTolerance string `json:"faultTolerance"` - Redundancy string `json:"redundancy"` - AvgAccessCost string `json:"avgAccessCost"` - BlockDistribution []BlockDistribution `json:"blockDistribution"` - DataTransfers []DataTransfer `json:"dataTransfers"` +type BlockDistributionObject struct { + ObjectID int64 `json:"objectID"` + PackageID int64 `json:"packageID"` + Path string `json:"path"` + Size int64 `json:"size"` + FileHash string `json:"fileHash"` + FaultTolerance string `json:"faultTolerance"` + Redundancy string `json:"redundancy"` + AvgAccessCost string `json:"avgAccessCost"` + BlockDistribution []BlockDistributionObjectInfo `json:"blockDistribution"` + DataTransfers []DataTransfer `json:"dataTransfers"` } type BlockDistributionBody struct { - Timestamp time.Time `json:"timestamp"` - Type string `json:"type"` - ObjectID string `json:"objectID"` - PackageID string `json:"packageID"` - SourceBlocks []Block `json:"sourceBlocks,omitempty"` - TargetBlocks []Block `json:"targetBlocks,omitempty"` - DataTransfers []DataTransfer `json:"dataTransfers,omitempty"` - Blocks []Block `json:"blocks,omitempty"` + Timestamp time.Time `json:"timestamp"` + Object BlockDistributionObject `json:"object,omitempty"` } -type BlockDistributionInfo struct { +type BlockDistribution struct { Timestamp time.Time `json:"timestamp"` Source Source `json:"source"` Category string `json:"category"` Body BlockDistributionBody `json:"body"` } -// ObjectUpdateInfo Object变化信息 +// ObjectChange Object变化信息 -type ObjectUpdateInfoBody struct { +type ObjectChangeBody struct { Type string `json:"type"` ObjectID string `json:"objectID"` PackageID string `json:"packageID"` @@ -138,41 +196,41 @@ type ObjectUpdateInfoBody struct { BlockDistribution []BlockDistribution `json:"blockDistribution"` Timestamp string `json:"timestamp"` } -type ObjectUpdateInfo struct { - Timestamp time.Time `json:"timestamp"` - Source Source `json:"source"` - Category string `json:"category"` - Body ObjectUpdateInfoBody `json:"body"` +type ObjectChange struct { + Timestamp time.Time `json:"timestamp"` + Source Source `json:"source"` + Category string `json:"category"` + Body ObjectChangeBody `json:"body"` } -// PackageUpdateInfo package变化信息 +// PackageChange package变化信息 -type PackageUpdateInfoBody struct { +type PackageChangeBody struct { Type string `json:"type"` PackageID string `json:"packageID"` PackageName string `json:"packageName"` BucketID string `json:"bucketID"` Timestamp string `json:"timestamp"` } -type PackageUpdateInfo struct { - Timestamp time.Time `json:"timestamp"` - Source Source `json:"source"` - Category string `json:"category"` - Body PackageUpdateInfoBody `json:"body"` +type PackageChange struct { + Timestamp time.Time `json:"timestamp"` + Source Source `json:"source"` + Category string `json:"category"` + Body PackageChangeBody `json:"body"` } -// BucketUpdateInfo bucket变化信息 +// BucketChange bucket变化信息 -type BucketUpdateInfoBody struct { +type BucketChangeBody struct { Type string `json:"type"` BucketID string `json:"bucketID"` BucketName string `json:"bucketName"` Timestamp string `json:"timestamp"` } -type BucketUpdateInfo struct { - Timestamp time.Time `json:"timestamp"` - Source Source `json:"source"` - Category string `json:"category"` - Body BucketUpdateInfoBody `json:"body"` +type BucketChange struct { + Timestamp time.Time `json:"timestamp"` + Source Source `json:"source"` + Category string `json:"category"` + Body BucketChangeBody `json:"body"` } diff --git a/datamap/internal/config/config.go b/datamap/internal/config/config.go index 521ea76..069d877 100644 --- a/datamap/internal/config/config.go +++ b/datamap/internal/config/config.go @@ -30,7 +30,7 @@ type ServerConfig struct { } func LoadConfig() *Config { - viper.SetConfigFile(".env") + viper.SetConfigFile("C:\\Users\\Administrator\\workspace\\workspace\\storage\\datamap\\.env") viper.ReadInConfig() return &Config{ diff --git a/datamap/internal/db/db.go b/datamap/internal/db/db.go index d279f64..4f2d1d5 100644 --- a/datamap/internal/db/db.go +++ b/datamap/internal/db/db.go @@ -3,29 +3,30 @@ package db import ( "fmt" "gitlink.org.cn/cloudream/storage/datamap/internal/config" - "gitlink.org.cn/cloudream/storage/datamap/internal/models" "gorm.io/driver/mysql" "gorm.io/gorm" + "gorm.io/gorm/schema" ) -// 全局数据库连接实例 -var DB *gorm.DB - func InitDB(cfg config.DatabaseConfig) (*gorm.DB, error) { dsn := fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8mb4&parseTime=True&loc=Local", cfg.User, cfg.Password, cfg.Host, cfg.Port, cfg.DBName) - db, err := gorm.Open(mysql.Open(dsn), &gorm.Config{}) + db, err := gorm.Open(mysql.Open(dsn), &gorm.Config{ + NamingStrategy: schema.NamingStrategy{ + SingularTable: true, // 禁用自动复数化表名 + }, + }) if err != nil { return nil, err } - // 自动迁移表结构 - db.AutoMigrate( - &models.Hub{}, - &models.Storage{}, - &models.HubRequest{}, - &models.BlockDistribution{}, - ) + //// 自动迁移表结构 + //db.AutoMigrate( + // &models.Hub{}, + // &models.Storage{}, + // &models.HubRequest{}, + // &models.BlockDistribution{}, + //) return db, nil } diff --git a/datamap/internal/handlers/handlers.go b/datamap/internal/handlers/handlers.go index 2b909af..0fa46ba 100644 --- a/datamap/internal/handlers/handlers.go +++ b/datamap/internal/handlers/handlers.go @@ -2,29 +2,160 @@ package handlers import ( "github.com/gin-gonic/gin" - "gitlink.org.cn/cloudream/storage/datamap/internal/db" "gitlink.org.cn/cloudream/storage/datamap/internal/models" + "gorm.io/gorm" "net/http" + "strconv" ) -// GetStorageData 获取 Storage 表中的全部数据 -func GetStorageData(c *gin.Context) { - repo := models.NewStorageRepository(db.DB) - storages, err := repo.GetAllStorages() - if err != nil { - c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to fetch storage data"}) - return +// DB 全局数据库连接实例 +var DB *gorm.DB + +// SetDB 设置数据库连接实例 +func SetDB(db *gorm.DB) { + DB = db +} + +// GetHubInfo 获取 节点交互数据 +func GetHubInfo(c *gin.Context) { + + repoHub := models.NewHubRepository(DB) + repoStorage := models.NewStorageRepository(DB) + repoHubReq := models.NewHubRequestRepository(DB) + + nodes := make([]models.Node, 0) + edges := make([]models.Edge, 0) + + //添加所有节点信息 + hubs, _ := repoHub.GetAllHubs() + storages, _ := repoStorage.GetAllStorages() + for _, hub := range hubs { + node := models.Node{ + ID: int64(hub.HubID), + NodeType: "hub", + Name: hub.Name, + Address: hub.Address, + } + nodes = append(nodes, node) + } + for _, storage := range storages { + node := models.Node{ + ID: int64(storage.StorageID), + NodeType: "storage", + Name: storage.StorageName, + DataCount: storage.DataCount, + NewDataCount: storage.NewDataCount, + Timestamp: storage.Timestamp, + } + nodes = append(nodes, node) + } + + // 添加所有边信息 + hubReqs, _ := repoHubReq.GetAllHubRequests() + for _, hubReq := range hubReqs { + edge := models.Edge{ + SourceType: hubReq.SourceType, + SourceID: int64(hubReq.SourceID), + TargetType: hubReq.TargetType, + TargetID: int64(hubReq.TargetID), + DataTransferCount: hubReq.DataTransferCount, + RequestCount: hubReq.RequestCount, + FailedRequestCount: hubReq.FailedRequestCount, + AvgTransferCount: hubReq.AvgTransferCount, + MaxTransferCount: hubReq.MaxTransferCount, + MinTransferCount: hubReq.MinTransferCount, + StartTimestamp: hubReq.StartTimestamp, + EndTimestamp: hubReq.EndTimestamp, + } + edges = append(edges, edge) + } + hubRelationship := models.HubRelationship{ + Nodes: nodes, + Edges: edges, } - c.JSON(http.StatusOK, storages) + c.JSON(http.StatusOK, hubRelationship) } -// GetBlockDistributionData 获取 BlockDistribution 表中的全部数据 -func GetBlockDistributionData(c *gin.Context) { - repo := models.NewBlockDistributionRepository(db.DB) - blocks, err := repo.GetAllBlocks() - if err != nil { - c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to fetch block distribution data"}) +func containsCombo(combos []models.Combo, targetID string, targetComboType string) bool { + for _, combo := range combos { + if combo.ID == targetID && combo.ComboType == targetComboType { + return true + } + } + return false +} + +// GetDataTransfer 数据对象的节点间传输量 +func GetDataTransfer(c *gin.Context) { + + repoObject := models.NewObjectRepository(DB) + repoBlockDistribution := models.NewBlockDistributionRepository(DB) + repoStorageTrans := models.NewStorageTransferCountRepository(DB) + + //首先判断object是否存在 + objectIDStr := c.Param("objectID") + objectID, _ := strconv.ParseInt(objectIDStr, 10, 64) + object, _ := repoObject.GetObjectByID(objectID) + if object == nil { + c.JSON(http.StatusOK, []interface{}{}) return } - c.JSON(http.StatusOK, blocks) + + nodes := make([]models.DistNode, 0) + combos := make([]models.Combo, 0) + edges := make([]models.DistEdge, 0) + + //根据ObjectID查询出在所有storage中存储的块或副本 + blocks, _ := repoBlockDistribution.GetBlockDistributionByObjectID(objectID) + for _, block := range blocks { + //nodes --------- block + //添加node信息 + node := models.DistNode{ + //block id + ID: strconv.FormatInt(block.BlockID, 10), + //storage id + ComboID: strconv.FormatInt(block.StorageID, 10), + //block index + Label: strconv.FormatInt(block.Index, 10), + //block type + NodeType: block.Type, + } + nodes = append(nodes, node) + + //combos ------- state or storage + //添加storage combo信息 + if !containsCombo(combos, strconv.FormatInt(block.StorageID, 10), "storage") { + combo := models.Combo{ + ID: strconv.FormatInt(block.StorageID, 10), + Label: "存储中心" + strconv.FormatInt(block.StorageID, 10), + ParentId: strconv.Itoa(block.Status), + ComboType: "storage", + } + combos = append(combos, combo) + } + //添加state combo信息 + if !containsCombo(combos, strconv.Itoa(block.Status), "state") { + combo := models.Combo{ + ID: strconv.Itoa(block.Status), + Label: "存储状态" + string(block.Status), + ComboType: "state", + } + combos = append(combos, combo) + } + } + //edges data trans between storage and storage + relations, _ := repoStorageTrans.GetStorageTransferCountByObjectID(objectID) + for _, relation := range relations { + edge := models.DistEdge{ + Source: strconv.FormatInt(relation.SourceStorageID, 10), + Target: strconv.FormatInt(relation.TargetStorageID, 10), + } + edges = append(edges, edge) + } + result := models.ObjectDistribution{ + Nodes: nodes, + Combos: combos, + Edges: edges, + } + c.JSON(http.StatusOK, result) } diff --git a/datamap/internal/models/block.go b/datamap/internal/models/block.go deleted file mode 100644 index 9c15b5c..0000000 --- a/datamap/internal/models/block.go +++ /dev/null @@ -1,57 +0,0 @@ -package models - -import ( - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - stgmod "gitlink.org.cn/cloudream/storage/common/models" - "gorm.io/gorm" - "strconv" - "time" -) - -type BlockDistribution struct { - BlockID int64 `gorm:"column:BlockID; primaryKey; type:bigint; autoIncrement" json:"blockID"` - ObjectID cdssdk.ObjectID `gorm:"column:ObjectID; type:bigint; not null" json:"objectID"` - Type string `gorm:"column:Type; type:varchar(1024); not null" json:"type"` - Index int64 `gorm:"column:Index; type:bigint; not null" json:"index"` - StorageID cdssdk.StorageID `gorm:"column:StorageID; type:bigint; not null" json:"storageID"` - Status Status `gorm:"column:Status; type:tinyint; not null" json:"status"` - Timestamp time.Time `gorm:"column:Timestamp; type:datatime; not null" json:"timestamp"` -} - -type BlockDistributionRepository struct { - repo *GormRepository -} - -func NewBlockDistributionRepository(db *gorm.DB) *BlockDistributionRepository { - return &BlockDistributionRepository{repo: NewGormRepository(db)} -} - -func (r *BlockDistributionRepository) CreateBlockDistribution(block *BlockDistribution) error { - return r.repo.Create(block) -} - -func (r *BlockDistributionRepository) GetAllBlocks() ([]BlockDistribution, error) { - var blocks []BlockDistribution - err := r.repo.GetAll(&blocks) - if err != nil { - return nil, err - } - return blocks, nil -} - -func ProcessBlockDistributionInfo(data stgmod.BlockTransInfo) { - repo := NewBlockDistributionRepository(db) - - for _, change := range data.Body.BlockChanges { - objectID, _ := strconv.ParseInt(data.Body.ObjectID, 10, 64) - index, _ := strconv.ParseInt(change.Index, 10, 64) - targetStorageID, _ := strconv.ParseInt(change.TargetStorageID, 10, 64) - block := &BlockDistribution{ - ObjectID: cdssdk.ObjectID(objectID), - Type: change.Type, - Index: index, - StorageID: cdssdk.StorageID(targetStorageID), - } - repo.CreateBlockDistribution(block) - } -} diff --git a/datamap/internal/models/blockdistribution.go b/datamap/internal/models/blockdistribution.go new file mode 100644 index 0000000..a3e6e18 --- /dev/null +++ b/datamap/internal/models/blockdistribution.go @@ -0,0 +1,178 @@ +package models + +import ( + "errors" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + stgmod "gitlink.org.cn/cloudream/storage/common/models" + "gorm.io/gorm" + "log" + "strconv" + "time" +) + +type BlockDistribution struct { + BlockID int64 `gorm:"column:BlockID; primaryKey; type:bigint; autoIncrement" json:"blockID"` + ObjectID int64 `gorm:"column:ObjectID; type:bigint; not null" json:"objectID"` + Type string `gorm:"column:Type; type:varchar(1024); not null" json:"type"` + Index int64 `gorm:"column:Index; type:bigint; not null" json:"index"` + StorageID int64 `gorm:"column:StorageID; type:bigint; not null" json:"storageID"` + Status int `gorm:"column:Status; type:tinyint; not null" json:"status"` + Timestamp time.Time `gorm:"column:Timestamp; type:datatime; not null" json:"timestamp"` +} + +func (BlockDistribution) TableName() string { + return "blockdistribution" +} + +type BlockDistributionRepository struct { + repo *GormRepository +} + +func NewBlockDistributionRepository(db *gorm.DB) *BlockDistributionRepository { + return &BlockDistributionRepository{repo: NewGormRepository(db)} +} + +func (r *BlockDistributionRepository) CreateBlockDistribution(block *BlockDistribution) error { + return r.repo.Create(block) +} + +func (r *BlockDistributionRepository) UpdateBlockDistribution(block *BlockDistribution) error { + return r.repo.Update(block) +} + +func (r *BlockDistributionRepository) GetAllBlocks() ([]BlockDistribution, error) { + var blocks []BlockDistribution + err := r.repo.GetAll(&blocks) + if err != nil { + return nil, err + } + return blocks, nil +} + +func (r *BlockDistributionRepository) GetBlockDistributionByObjectID(objectID int64) ([]BlockDistribution, error) { + var blocks []BlockDistribution + query := "SELECT * FROM blockdistribution WHERE ObjectID = ?" + err := r.repo.db.Raw(query, objectID).Scan(&blocks).Error + if errors.Is(err, gorm.ErrRecordNotFound) { + return []BlockDistribution{}, errors.New("block not found") + } + return blocks, nil +} + +func (r *BlockDistributionRepository) GetStorageIDsByObjectID(objectID int64) ([]int64, error) { + var storageIDs []int64 + query := "SELECT distinct storageID FROM blockdistribution WHERE ObjectID = ?" + // 通过 ObjectID 查询 + err := r.repo.db.Raw(query, objectID).Scan(&storageIDs).Error + if errors.Is(err, gorm.ErrRecordNotFound) { + return []int64{}, errors.New("block not found") + } + return storageIDs, nil +} + +func (r *BlockDistributionRepository) GetBlockDistributionByIndex(objectID int64, index int64, storageID int64) (BlockDistribution, error) { + var block BlockDistribution + query := "SELECT * FROM blockdistribution WHERE ObjectID = ? AND `Index` = ? AND StorageID = ?" + // 通过 ObjectID 和 Index 联合查询 + err := r.repo.db.Exec(query, objectID, index, storageID).First(&block).Error + if errors.Is(err, gorm.ErrRecordNotFound) { + return BlockDistribution{}, errors.New("block not found") + } + return block, nil +} + +// DeleteBlockDistribution 删除 BlockDistribution 记录 (根据 ObjectID 和 Index) + +func (r *BlockDistributionRepository) DeleteBlockDistribution(objectID int64, index int64, storageID int64) error { + query := "DELETE FROM blockdistribution WHERE ObjectID = ? AND `Index` = ? AND StorageID = ?" + + return r.repo.db.Exec(query, objectID, index, storageID).Error +} + +// ProcessBlockDistribution mq推送各节点统计自身当前的总数据量时的处理逻辑 + +func ProcessBlockDistribution(data stgmod.BlockDistribution) { + repoObject := NewObjectRepository(DB) + repoBlock := NewBlockDistributionRepository(DB) + repoStorage := NewStorageTransferCountRepository(DB) + //更新object表中的状态 + object, err := repoObject.GetObjectByID(data.Body.Object.ObjectID) + faultTolerance, _ := strconv.ParseFloat(data.Body.Object.FaultTolerance, 64) + redundancy, _ := strconv.ParseFloat(data.Body.Object.Redundancy, 64) + avgAccessCost, _ := strconv.ParseFloat(data.Body.Object.AvgAccessCost, 64) + if errors.Is(err, gorm.ErrRecordNotFound) { + err := repoObject.CreateObject(&Object{ + ObjectID: cdssdk.ObjectID(data.Body.Object.ObjectID), + PackageID: cdssdk.PackageID(data.Body.Object.PackageID), + Path: data.Body.Object.Path, + Size: data.Body.Object.Size, + FileHash: data.Body.Object.FileHash, + Status: StatusYesterdayAfter, + FaultTolerance: faultTolerance, + Redundancy: redundancy, + AvgAccessCost: avgAccessCost, + Timestamp: time.Now(), + }) + if err != nil { + log.Printf("Error create object: %v", err) + } + } else { + object.Status = StatusYesterdayAfter + err = repoObject.UpdateObject(object) + if err != nil { + log.Printf("Error update object: %v", err) + } + } + + //更新block表中的状态 + for _, blockDistribution := range data.Body.Object.BlockDistribution { + blockIndex, _ := strconv.ParseInt(blockDistribution.Index, 10, 64) + blockStorageID, _ := strconv.ParseInt(blockDistribution.StorageID, 10, 64) + blockDist, err := repoBlock.GetBlockDistributionByIndex(data.Body.Object.ObjectID, blockIndex, blockStorageID) + if errors.Is(err, gorm.ErrRecordNotFound) { + err := repoBlock.CreateBlockDistribution(&BlockDistribution{ + BlockID: blockDist.BlockID, + ObjectID: blockDist.ObjectID, + Type: blockDistribution.Type, + Index: blockIndex, + StorageID: blockStorageID, + Status: StatusYesterdayAfter, + Timestamp: time.Now(), + }) + if err != nil { + log.Printf("Error create BlockDistribution: %v", err) + } + } else { + err := repoBlock.UpdateBlockDistribution(&BlockDistribution{ + BlockID: blockDist.BlockID, + ObjectID: blockDist.ObjectID, + Type: blockDistribution.Type, + Index: blockIndex, + StorageID: blockStorageID, + Status: StatusYesterdayAfter, + Timestamp: time.Now(), + }) + if err != nil { + log.Printf("Error update BlockDistribution: %v", err) + } + } + } + //在storageTransferCount表中添加记录 + for _, dataTransfer := range data.Body.Object.DataTransfers { + sourceStorageID, _ := strconv.ParseInt(dataTransfer.SourceStorageID, 10, 64) + targetStorageID, _ := strconv.ParseInt(dataTransfer.TargetStorageID, 10, 64) + dataTransferCount, _ := strconv.ParseInt(dataTransfer.DataTransferCount, 10, 64) + + err := repoStorage.CreateStorageTransferCount(&StorageTransferCount{ + ObjectID: data.Body.Object.ObjectID, + Status: StatusTodayBeforeYesterday, + SourceStorageID: sourceStorageID, + TargetStorageID: targetStorageID, + DataTransferCount: dataTransferCount, + Timestamp: time.Now(), + }) + if err != nil { + log.Printf("Error create StorageTransferCount : %v", err) + } + } +} diff --git a/datamap/internal/models/blocktransfer.go b/datamap/internal/models/blocktransfer.go new file mode 100644 index 0000000..0bcd351 --- /dev/null +++ b/datamap/internal/models/blocktransfer.go @@ -0,0 +1,227 @@ +package models + +import ( + "errors" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + stgmod "gitlink.org.cn/cloudream/storage/common/models" + "gorm.io/gorm" + "log" + "strconv" + "time" +) + +type StorageTransferCount struct { + RelationshipID int64 `gorm:"column:RelationshipID; primaryKey; type:bigint; autoIncrement" json:"relationshipID"` + ObjectID int64 `gorm:"column:ObjectID; type:bigint; not null" json:"objectID"` + Status int64 `gorm:"column:Status; type:bigint; not null" json:"status"` // 连线左侧的状态 + SourceStorageID int64 `gorm:"column:SourceStorageID; type:bigint; not null" json:"sourceStorageID"` // 源存储节点 ID + TargetStorageID int64 `gorm:"column:TargetStorageID; type:bigint; not null" json:"targetStorageID"` // 目标存储节点 ID + DataTransferCount int64 `gorm:"column:DataTransferCount; type:bigint; not null" json:"dataTransferCount"` // 数据传输量 + Timestamp time.Time `gorm:"column:Timestamp; type:datatime; not null" json:"timestamp"` // 变化结束时间戳 +} + +func (StorageTransferCount) TableName() string { + return "storagetransfercount" +} + +type StorageTransferCountRepository struct { + repo *GormRepository +} + +func NewStorageTransferCountRepository(db *gorm.DB) *StorageTransferCountRepository { + return &StorageTransferCountRepository{repo: NewGormRepository(db)} +} + +func (r *StorageTransferCountRepository) CreateStorageTransferCount(storageTransferCount *StorageTransferCount) error { + return r.repo.Create(storageTransferCount) +} + +func (r *StorageTransferCountRepository) UpdateStorageTransferCount(storageTransferCount *StorageTransferCount) error { + return r.repo.Update(storageTransferCount) +} + +func (r *StorageTransferCountRepository) GetStorageTransferCountByID(id int) (*StorageTransferCount, error) { + var storageTransferCount StorageTransferCount + err := r.repo.GetByID(uint(id), &storageTransferCount) + if err != nil { + return nil, err + } + return &storageTransferCount, nil +} + +func (r *StorageTransferCountRepository) GetStorageTransferCountByObjectID(objectID int64) ([]StorageTransferCount, error) { + var storageTransferCounts []StorageTransferCount + query := "SELECT * FROM storagetransfercount WHERE ObjectID = ?" + err := r.repo.db.Raw(query, objectID).Scan(&storageTransferCounts).Error + if err != nil { + return nil, err + } + return storageTransferCounts, nil +} + +func (r *StorageTransferCountRepository) GetAllStorageTransferCounts() ([]StorageTransferCount, error) { + var storageTransferCounts []StorageTransferCount + err := r.repo.GetAll(&storageTransferCounts) + if err != nil { + return nil, err + } + return storageTransferCounts, nil +} + +// ProcessBlockTransfer 处理块传输信息的实时日志,当对象的块信息发生变化时推送触发数据库刷新 + +func ProcessBlockTransfer(data stgmod.BlockTransfer) { + + repoDist := NewBlockDistributionRepository(DB) + repoStorage := NewStorageRepository(DB) + repoStorageTrans := NewStorageTransferCountRepository(DB) + repoObject := NewObjectRepository(DB) + + for _, change := range data.Body.BlockChanges { + + objectID, _ := strconv.ParseInt(data.Body.ObjectID, 10, 64) + object, _ := repoObject.GetObjectByID(objectID) + index, _ := strconv.ParseInt(change.Index, 10, 64) + sourceStorageID, _ := strconv.ParseInt(change.SourceStorageID, 10, 64) + targetStorageID, _ := strconv.ParseInt(change.TargetStorageID, 10, 64) + newDataCount, _ := strconv.ParseInt(change.DataTransferCount, 10, 64) + + switch change.Type { + case "0": //拷贝 + //查询出存储在数据库中的BlockDistribution信息 + blockSource, errSource := repoDist.GetBlockDistributionByIndex(objectID, index, sourceStorageID) + //没有记录就将source和target的信息都保存到库中 + if errors.Is(errSource, gorm.ErrRecordNotFound) { + err := repoDist.CreateBlockDistribution(&BlockDistribution{ + ObjectID: objectID, + Type: change.BlockType, + Index: index, + StorageID: sourceStorageID, + Status: StatusNow, + Timestamp: time.Now(), + }) + if err != nil { + log.Printf("Error create source blockdistribution: %v", err) + } + } else { + //有数据则新增一条storageID为targetStorageID的记录,同时更新状态 + err := repoDist.CreateBlockDistribution(&BlockDistribution{ + ObjectID: blockSource.ObjectID, + Type: change.BlockType, + Index: index, + StorageID: targetStorageID, + Status: StatusNow, + Timestamp: time.Now(), + }) + if err != nil { + log.Printf("Error update blockdistribution: %v", err) + } + //复制完成之后增加的dataCount要加到targetStorage的记录中 + storageOld, err := repoStorage.GetStorageByID(targetStorageID) + if errors.Is(err, gorm.ErrRecordNotFound) { + err = repoStorage.CreateStorage(&Storage{ + StorageID: cdssdk.StorageID(targetStorageID), + DataCount: newDataCount, + Timestamp: time.Now(), + }) + if err != nil { + log.Printf("Error increase datacount in targetstorage: %v", err) + } + } else { + err = repoStorage.UpdateStorage(&Storage{ + StorageID: cdssdk.StorageID(targetStorageID), + DataCount: storageOld.DataCount + newDataCount, + Timestamp: time.Now(), + }) + if err != nil { + log.Printf("Error increase datacount in targetstorage: %v", err) + } + } + + } + //新增记录到storageTransferCount表中 + err := repoStorageTrans.CreateStorageTransferCount(&StorageTransferCount{ + ObjectID: objectID, + Status: int64(blockSource.Status), + SourceStorageID: sourceStorageID, + TargetStorageID: targetStorageID, + DataTransferCount: newDataCount, + Timestamp: time.Now(), + }) + if err != nil { + log.Printf("Error create StorageTransferCount : %v", err) + } + case "1": //编解码 + //删除所有的sourceBlock + for _, sourceBlock := range change.SourceBlocks { + sourceBlockIndex, _ := strconv.ParseInt(sourceBlock.Index, 10, 64) + err := repoDist.DeleteBlockDistribution(objectID, sourceBlockIndex, sourceStorageID) + if err != nil { + log.Printf("Error delete blockdistribution: %v", err) + } + } + //插入所有的targetBlock + for _, targetBlock := range change.TargetBlocks { + storageID, _ := strconv.ParseInt(targetBlock.StorageID, 10, 64) + err := repoDist.CreateBlockDistribution(&BlockDistribution{ + ObjectID: objectID, + Type: targetBlock.BlockType, + Index: index, + //直接保存到目标中心 + StorageID: storageID, + Status: StatusNow, + Timestamp: time.Now(), + }) + if err != nil { + log.Printf("Error create blockdistribution: %v", err) + } + } + //新增记录到storageTransferCount表中 + err := repoStorageTrans.CreateStorageTransferCount(&StorageTransferCount{ + ObjectID: objectID, + Status: int64(object.Status), + SourceStorageID: sourceStorageID, + TargetStorageID: targetStorageID, + DataTransferCount: newDataCount, + Timestamp: time.Now(), + }) + if err != nil { + log.Printf("Error create StorageTransferCount : %v", err) + } + + case "2": //删除 + for _, block := range change.Blocks { + storageID, _ := strconv.ParseInt(block.StorageID, 10, 64) + changeIndex, _ := strconv.ParseInt(block.Index, 10, 64) + err := repoDist.DeleteBlockDistribution(objectID, changeIndex, storageID) + if err != nil { + log.Printf("Error delete blockdistribution: %v", err) + } + } + + case "3": //更新 + for _, blockUpdate := range change.Blocks { + //查询出存储在数据库中的BlockDistribution信息 + blockIndex, _ := strconv.ParseInt(blockUpdate.Index, 10, 64) + blockOld, err := repoDist.GetBlockDistributionByIndex(objectID, blockIndex, sourceStorageID) + newStorageID, _ := strconv.ParseInt(blockUpdate.StorageID, 10, 64) + err = repoDist.UpdateBlockDistribution(&BlockDistribution{ + BlockID: blockOld.BlockID, + ObjectID: blockOld.ObjectID, + Type: blockUpdate.BlockType, + Index: blockIndex, + StorageID: newStorageID, + Status: StatusNow, + Timestamp: time.Now(), + }) + if err != nil { + log.Printf("Error delete blockdistribution: %v", err) + } + } + + default: + break + } + } + +} diff --git a/datamap/internal/models/hub.go b/datamap/internal/models/hub.go new file mode 100644 index 0000000..fdb7795 --- /dev/null +++ b/datamap/internal/models/hub.go @@ -0,0 +1,53 @@ +package models + +import ( + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "gorm.io/datatypes" + "gorm.io/gorm" +) + +type Hub struct { + HubID cdssdk.HubID `gorm:"column:HubID; primaryKey; type:bigint; autoIncrement" json:"hubID"` + Name string `gorm:"column:Name; type:varchar(255); not null" json:"name"` + Address datatypes.JSON `gorm:"column:Address; type:json; " json:"address"` +} + +func (Hub) TableName() string { return "hub" } + +type HubRepository struct { + repo *GormRepository +} + +func NewHubRepository(db *gorm.DB) *HubRepository { + return &HubRepository{repo: NewGormRepository(db)} +} + +func (r *HubRepository) CreateHub(hub *Hub) error { + return r.repo.Create(hub) +} + +func (r *HubRepository) UpdateHub(hub *Hub) error { + return r.repo.Update(hub) +} + +func (r *HubRepository) DeleteHub(hub *Hub) error { + return r.repo.Delete(hub, uint(hub.HubID)) +} + +func (r *HubRepository) GetHubByID(id int) (*Hub, error) { + var hub Hub + err := r.repo.GetByID(uint(id), &hub) + if err != nil { + return nil, err + } + return &hub, nil +} + +func (r *HubRepository) GetAllHubs() ([]Hub, error) { + var hubs []Hub + err := r.repo.GetAll(&hubs) + if err != nil { + return nil, err + } + return hubs, nil +} diff --git a/datamap/internal/models/hubinfo.go b/datamap/internal/models/hubinfo.go new file mode 100644 index 0000000..81b1369 --- /dev/null +++ b/datamap/internal/models/hubinfo.go @@ -0,0 +1,75 @@ +package models + +import ( + "encoding/json" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + stgmod "gitlink.org.cn/cloudream/storage/common/models" + "gorm.io/datatypes" +) + +// LocalHub 本地结构体,嵌入cdssdk.Hub +type LocalHub struct { + cdssdk.Hub +} + +type ConcreteHubType struct { + Address string +} + +func (c ConcreteHubType) GetStorageType() string { + return c.Address +} + +func (c ConcreteHubType) String() string { + return c.Address +} + +func (s *LocalHub) UnmarshalJSON(data []byte) error { + // 定义一个临时结构体来解析 JSON + type Alias LocalHub + aux := &struct { + Address string `json:"address"` + *Alias + }{ + Alias: (*Alias)(s), + } + + if err := json.Unmarshal(data, &aux); err != nil { + return err + } + + s.Address = ConcreteHubType{Address: aux.Address} + return nil +} + +func ProcessHubInfo(data stgmod.HubInfo) { + repo := NewHubRepository(DB) + jsonData, _ := json.Marshal(data.Body.HubInfo.Address) + HubInfo := &Hub{ + HubID: cdssdk.HubID(data.Body.HubID), + Name: data.Body.HubInfo.Name, + Address: datatypes.JSON(jsonData), + } + + //先判断传输数据的类型 + switch data.Body.Type { + case "add": + err := repo.CreateHub(HubInfo) + if err != nil { + return + } + case "update": + err := repo.UpdateHub(HubInfo) + if err != nil { + return + } + case "delete": + err := repo.DeleteHub(HubInfo) + if err != nil { + return + } + default: + return + } + +} diff --git a/datamap/internal/models/hubrequest.go b/datamap/internal/models/hubrequest.go new file mode 100644 index 0000000..47a77c8 --- /dev/null +++ b/datamap/internal/models/hubrequest.go @@ -0,0 +1,84 @@ +package models + +import ( + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + stgmod "gitlink.org.cn/cloudream/storage/common/models" + "gorm.io/gorm" + "log" + "time" +) + +type HubRequest struct { + RequestID int64 `gorm:"column:RequestID; primaryKey; type:bigint; autoIncrement" json:"RequestID"` + SourceType string `gorm:"column:SourceType; type:varchar(255); not null" json:"sourceType"` + SourceID cdssdk.HubID `gorm:"column:SourceHubID; type:bigint; not null" json:"sourceID"` + TargetType string `gorm:"column:TargetType; type:varchar(255); not null" json:"targetType"` + TargetID cdssdk.HubID `gorm:"column:TargetHubID; type:bigint; not null" json:"targetID"` + DataTransferCount int64 `gorm:"column:DataTransferCount; type:bigint; not null" json:"dataTransferCount"` + RequestCount int64 `gorm:"column:RequestCount; type:bigint; not null" json:"requestCount"` + FailedRequestCount int64 `gorm:"column:FailedRequestCount; type:bigint; not null" json:"failedRequestCount"` + AvgTransferCount int64 `gorm:"column:AvgTransferCount; type:bigint; not null" json:"avgTransferCount"` + MaxTransferCount int64 `gorm:"column:MaxTransferCount; type:bigint; not null" json:"maxTransferCount"` + MinTransferCount int64 `gorm:"column:MinTransferCount; type:bigint; not null" json:"minTransferCount"` + StartTimestamp time.Time `gorm:"column:StartTimestamp; type:datatime; not null" json:"startTimestamp"` + EndTimestamp time.Time `gorm:"column:EndTimestamp; type:datatime; not null" json:"endTimestamp"` +} + +func (HubRequest) TableName() string { return "hubrequest" } + +type HubRequestRepository struct { + repo *GormRepository +} + +func NewHubRequestRepository(db *gorm.DB) *HubRequestRepository { + return &HubRequestRepository{repo: NewGormRepository(db)} +} + +func (r *HubRequestRepository) CreateHubRequest(request *HubRequest) error { + return r.repo.Create(request) +} + +func (r *HubRequestRepository) GetHubRequestByHubID(hubId int64) ([]HubRequest, error) { + var hubRequests []HubRequest + query := "SELECT * FROM hubrequest WHERE SourceHubID = ?" + err := r.repo.db.Raw(query, hubId).Scan(&hubRequests).Error + if err != nil { + return nil, err + } + return hubRequests, nil +} + +func (r *HubRequestRepository) GetAllHubRequests() ([]HubRequest, error) { + var hubRequests []HubRequest + err := r.repo.GetAll(&hubRequests) + if err != nil { + return nil, err + } + return hubRequests, nil +} + +// ProcessHubTransfer mq推送各节点统计自身当天向外部各个节点传输的总数据量时的处理逻辑 +func ProcessHubTransfer(data stgmod.HubTransferStats) { + repo := NewHubRequestRepository(DB) + + //todo 加字段 + hubRequest := &HubRequest{ + + SourceID: cdssdk.HubID(data.Body.SourceHubID), + TargetID: cdssdk.HubID(data.Body.TargetHubID), + DataTransferCount: data.Body.Send.TotalTransfer, + RequestCount: data.Body.Send.RequestCount, + FailedRequestCount: data.Body.Send.FailedRequestCount, + AvgTransferCount: data.Body.Send.AvgTransfer, + MaxTransferCount: data.Body.Send.MaxTransfer, + MinTransferCount: data.Body.Send.MinTransfer, + StartTimestamp: data.Body.StartTimestamp, + EndTimestamp: data.Body.EndTimestamp, + } + + err := repo.CreateHubRequest(hubRequest) + if err != nil { + log.Printf("Error update hubrequest: %v", err) + + } +} diff --git a/datamap/internal/models/hubtrans.go b/datamap/internal/models/hubtrans.go deleted file mode 100644 index d209fc8..0000000 --- a/datamap/internal/models/hubtrans.go +++ /dev/null @@ -1,53 +0,0 @@ -package models - -import ( - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - stgmod "gitlink.org.cn/cloudream/storage/common/models" - "gorm.io/gorm" - "time" -) - -type HubRequest struct { - RequestID int64 `gorm:"column:HubID; primaryKey; type:bigint; autoIncrement" json:"hubID"` - SourceHubID cdssdk.HubID `gorm:"column:SourceHubID; type:bigint; not null" json:"sourceHubID"` - TargetHubID cdssdk.HubID `gorm:"column:TargetHubID; type:bigint; not null" json:"targetHubID"` - DataTransfer int64 `gorm:"column:DataTransfer; type:bigint; not null" json:"dataTransfer"` - RequestCount int64 `gorm:"column:RequestCount; type:bigint; not null" json:"requestCount"` - FailedRequest int64 `gorm:"column:FailedRequest; type:bigint; not null" json:"failedRequest"` - AvgTransferCount int64 `gorm:"column:AvgTransferCount; type:bigint; not null" json:"avgTransferCount"` - MaxTransferCount int64 `gorm:"column:MaxTransferCount; type:bigint; not null" json:"maxTransferCount"` - MinTransferCount int64 `gorm:"column:MinTransferCount; type:bigint; not null" json:"minTransferCount"` - StartTimestamp time.Time `gorm:"column:StartTimestamp; type:datatime; not null" json:"startTimestamp"` - EndTimestamp time.Time `gorm:"column:EndTimestamp; type:datatime; not null" json:"endTimestamp"` -} - -type HubRequestRepository struct { - repo *GormRepository -} - -func NewHubRequestRepository(db *gorm.DB) *HubRequestRepository { - return &HubRequestRepository{repo: NewGormRepository(db)} -} - -func (r *HubRequestRepository) CreateHubRequest(request *HubRequest) error { - return r.repo.Create(request) -} - -func ProcessHubTrans(data stgmod.HubTrans) { - repo := NewHubRequestRepository(db) - - hubRequest := &HubRequest{ - SourceHubID: cdssdk.HubID(data.Body.SourceHubID), - TargetHubID: cdssdk.HubID(data.Body.TargetHubID), - DataTransfer: data.Body.DataTransferCount, - RequestCount: data.Body.RequestCount, - FailedRequest: data.Body.FailedRequestCount, - AvgTransferCount: data.Body.AvgTransferCount, - MaxTransferCount: data.Body.MaxTransferCount, - MinTransferCount: data.Body.MinTransferCount, - StartTimestamp: data.Body.StartTimestamp, - EndTimestamp: data.Body.EndTimestamp, - } - - repo.CreateHubRequest(hubRequest) -} diff --git a/datamap/internal/models/models.go b/datamap/internal/models/models.go index 2c239e9..b3b1161 100644 --- a/datamap/internal/models/models.go +++ b/datamap/internal/models/models.go @@ -2,116 +2,123 @@ package models import ( cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "gorm.io/gorm" "time" ) -type RequestID int64 +var DB *gorm.DB -type BlockID int64 +// InitDB 初始化数据库连接 +func InitDB(db *gorm.DB) { + DB = db +} +type RequestID int64 +type BlockID int64 type RelationshipID int64 - type Status int const ( - StatusNow Status = iota // =0 表示当前实时状态 - StatusYesterdayAfter // =1 表示前一天调整后的状态 - StatusYesterdayBefore // =2 表示前一天调整前的状态 - StatusTodayBeforeYesterday // =3 表示前两天调整后的状态 + StatusNow = 0 //表示当前实时状态 + StatusYesterdayAfter = 1 // 表示前一天调整后的状态 + StatusYesterdayBefore = 2 // 表示前一天调整前的状态 + StatusTodayBeforeYesterday = 3 // 表示前两天调整后的状态 ) // 节点间关系图 + type HubRelationship struct { - HubID cdssdk.HubID `json:"hubID"` // 节点ID - HubName string `json:"hubName"` // 名称 - HubAddress cdssdk.HubAddressInfo `json:"hubAddress"` // 地址 - Storages []StorageInfo `json:"storages"` // 对应的中心信息 - Requests []Request `json:"requests"` // 与各节点的连接信息(仅记录发出的请求) + Nodes []Node `json:"nodes"` // 节点列表 + Edges []Edge `json:"edges"` // 名称 } -type StorageInfo struct { - StorageID cdssdk.StorageID `json:"storageID"` // 中心ID - DataCount int64 `json:"dataCount"` // 总数据量(文件分块数) - NewDataCount int64 `json:"newdataCount"` // 新增数据量(文件分块数) - Timestamp time.Time `json:"timestamp"` // 时间戳 +type Node struct { + ID int64 `json:"id"` // 节点/中心ID + NodeType string `json:"nodeType"` //节点类型 storage/hub + Name string `json:"name"` // 节点/中心名称 + Address cdssdk.HubAddressInfo `json:"address"` // 地址 + DataCount int64 `json:"dataCount"` // 总数据量(文件分块数) + NewDataCount int64 `json:"newdataCount"` // 新增数据量(文件分块数) + Timestamp time.Time `json:"timestamp"` // 时间戳 } -type Request struct { - SourceHubID cdssdk.HubID `json:"sourceHubID"` // 源节点ID - TargetHubID cdssdk.HubID `json:"targetHubID"` // 目标节点ID - DataTransferCount int64 `json:"dataTransferCount"` // 数据传输量 - RequestCount int64 `json:"requestCount"` // 请求数 - FailedRequestCount int64 `json:"failedRequestCount"` // 失败请求数 - AvgTransferCount int64 `json:"avgTransferCount"` // 平均数据传输量 - MaxTransferCount int64 `json:"maxTransferCount"` // 最大数据传输量 - MinTransferCount int64 `json:"minTransferCount"` // 最小数据传输量 - StartTimestamp time.Time `json:"startTimestamp"` // 起始时间戳 - EndTimestamp time.Time `json:"endTimestamp"` // 结束时间戳 +type Edge struct { + SourceType string `json:"sourceType"` // 源节点类型 + SourceID int64 `json:"sourceID"` // 源节点ID + TargetType string `json:"targetType"` // 目标节点类型 + TargetID int64 `json:"targetID"` // 目标节点ID + DataTransferCount int64 `json:"dataTransferCount"` // 数据传输量 + RequestCount int64 `json:"requestCount"` // 请求数 + FailedRequestCount int64 `json:"failedRequestCount"` // 失败请求数 + AvgTransferCount int64 `json:"avgTransferCount"` // 平均数据传输量 + MaxTransferCount int64 `json:"maxTransferCount"` // 最大数据传输量 + MinTransferCount int64 `json:"minTransferCount"` // 最小数据传输量 + StartTimestamp time.Time `json:"startTimestamp"` // 起始时间戳 + EndTimestamp time.Time `json:"endTimestamp"` // 结束时间戳 } -// 对象块分布结构 +// 对象分布 type ObjectDistribution struct { - ObjectID cdssdk.ObjectID `json:"objectID"` // 对象 ID - PackageID cdssdk.PackageID `json:"packageID"` // 包 ID - Path string `json:"path"` // 路径 - Size int64 `json:"size"` // 大小 - FileHash string `json:"fileHash"` // 文件哈希 - States []State `json:"states"` // 各阶段状态信息(只需要传1、2、3阶段) - Relationships []Relationship `json:"relationships"` // 节点间传输量 - Timestamp time.Time `json:"timestamp"` // 请求中的时间戳 + Nodes []DistNode `json:"nodes"` + Combos []Combo `json:"combos"` + Edges []DistEdge `json:"edges"` } -type State struct { - Timestamp time.Time `json:"timestamp"` // 时间戳 - Status string `json:"status"` // 状态 - FaultTolerance string `json:"faultTolerance"` // 容灾度(仅布局调整后) - Redundancy string `json:"redundancy"` // 冗余度(仅布局调整后) - AvgAccessCost float64 `json:"avgAccessCost"` // 平均访问开销(仅布局调整前) - BlockDistributions []BlockDist `json:"blockDistributions"` // 块分布情况 +type DistNode struct { + ID string `json:"id"` + ComboID string `json:"comboID"` + Label string `json:"label"` + NodeType string `json:"nodeType"` } -type BlockDist struct { - StorageID cdssdk.StorageID `json:"storageID"` // 中心ID - Blocks []Block `json:"blocks"` // 该中心的所有块 +type Combo struct { + ID string `json:"id"` + Label string `json:"label"` + ParentId string `json:"parentId"` + ComboType string `json:"comboType"` } -type Block struct { - Type string `json:"type"` // 块类型 - Index string `json:"index"` // 块编号 - ID string `json:"id"` // 块ID +type DistEdge struct { + Source string `json:"source"` + Target string `json:"target"` } -type Relationship struct { - Status Status `json:"status"` // 连线左侧的状态 - SourceStorageID string `json:"sourceStorageID"` // 源存储节点 ID - TargetStorageID string `json:"targetStorageID"` // 目标存储节点 ID - DataTransferCount string `json:"dataTransferCount"` // 数据传输量 - Timestamp time.Time `json:"timestamp"` // 变化结束时间戳 -} - -//数据库结构定义 - -func (HubRequest) TableName() string { - return "HubRequest" -} - -func (Object) TableName() string { - return "Object" -} - -func (BlockDistribution) TableName() string { - return "BlockDistribution" -} - -type StorageTransferCount struct { - RelationshipID RelationshipID `gorm:"column:RelationshipID; primaryKey; type:bigint; autoIncrement" json:"relationshipID"` - Status Status `gorm:"column:Status; type:tinyint; not null" json:"status"` - SourceStorageID cdssdk.StorageID `gorm:"column:SourceStorageID; type:bigint; not null" json:"sourceStorageID"` - TargetStorageID cdssdk.StorageID `gorm:"column:TargetStorageID; type:bigint; not null" json:"targetStorageID"` - DataTransferCount int64 `gorm:"column:DataTransferCount; type:bigint; not null" json:"dataTransferCount"` - Timestamp time.Time `gorm:"column:Timestamp; type:datatime; not null" json:"timestamp"` -} - -func (StorageTransferCount) TableName() string { - return "StorageTransferCount" -} +// 对象块分布结构 +//type ObjectDistribution struct { +// ObjectID cdssdk.ObjectID `json:"objectID"` // 对象 ID +// PackageID cdssdk.PackageID `json:"packageID"` // 包 ID +// Path string `json:"path"` // 路径 +// Size int64 `json:"size"` // 大小 +// FileHash string `json:"fileHash"` // 文件哈希 +// States []State `json:"states"` // 各阶段状态信息(只需要传1、2、3阶段) +// Relationships []Relationship `json:"relationships"` // 节点间传输量 +// Timestamp time.Time `json:"timestamp"` // 请求中的时间戳 +//} +// +//type State struct { +// Timestamp time.Time `json:"timestamp"` // 时间戳 +// Status string `json:"status"` // 状态 +// FaultTolerance string `json:"faultTolerance"` // 容灾度(仅布局调整后) +// Redundancy string `json:"redundancy"` // 冗余度(仅布局调整后) +// AvgAccessCost float64 `json:"avgAccessCost"` // 平均访问开销(仅布局调整前) +// BlockDistributions []BlockDist `json:"blockDistributions"` // 块分布情况 +//} +// +//type BlockDist struct { +// StorageID cdssdk.StorageID `json:"storageID"` // 中心ID +// Blocks []Block `json:"blocks"` // 该中心的所有块 +//} +// +//type Block struct { +// Type string `json:"type"` // 块类型 +// Index string `json:"index"` // 块编号 +// ID string `json:"id"` // 块ID +//} +// +//type Relationship struct { +// Status Status `json:"status"` // 连线左侧的状态 +// SourceStorageID string `json:"sourceStorageID"` // 源存储节点 ID +// TargetStorageID string `json:"targetStorageID"` // 目标存储节点 ID +// DataTransferCount string `json:"dataTransferCount"` // 数据传输量 +// Timestamp time.Time `json:"timestamp"` // 变化结束时间戳 +//} diff --git a/datamap/internal/models/object.go b/datamap/internal/models/object.go index 6fa8747..b2c8855 100644 --- a/datamap/internal/models/object.go +++ b/datamap/internal/models/object.go @@ -4,7 +4,6 @@ import ( cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" stgmod "gitlink.org.cn/cloudream/storage/common/models" "gorm.io/gorm" - "log" "time" ) @@ -21,50 +20,52 @@ type Object struct { Timestamp time.Time `gorm:"column:Timestamp; type:datatime; not null" json:"timestamp"` } -// BlockTransferRepository 块传输记录的 Repository -type BlockTransferRepository struct { +func (Object) TableName() string { + return "object" +} + +// ObjectRepository 块传输记录的 Repository +type ObjectRepository struct { repo *GormRepository } -// NewBlockTransferRepository 创建 BlockTransferRepository 实例 -func NewBlockTransferRepository(db *gorm.DB) *BlockTransferRepository { - return &BlockTransferRepository{repo: NewGormRepository(db)} +// NewObjectRepository 创建 ObjectRepository 实例 +func NewObjectRepository(db *gorm.DB) *ObjectRepository { + return &ObjectRepository{repo: NewGormRepository(db)} } -// CreateBlockTransfer 创建块传输记录 -func (r *BlockTransferRepository) CreateBlockTransfer(transfer *Object) error { - return r.repo.Create(transfer) +// CreateObject 创建块传输记录 +func (r *ObjectRepository) CreateObject(object *Object) error { + return r.repo.Create(object) } -// GetAllBlockTransfers 获取所有块传输记录 -func (r *BlockTransferRepository) GetAllBlockTransfers() ([]Object, error) { - var transfers []Object - err := r.repo.GetAll(&transfers) +// GetObjectByID 查询单个Object +func (r *ObjectRepository) GetObjectByID(objectID int64) (*Object, error) { + var object Object + query := "SELECT * FROM object WHERE ObjectID = ?" + err := r.repo.db.Raw(query, objectID).First(&object).Error if err != nil { return nil, err } - return transfers, nil + return &object, nil } -// ProcessBlockTransInfo 处理 BlockTransInfo 数据 -func ProcessBlockTransInfo(data stgmod.Object) { - repo := NewBlockTransferRepository(db) - //TODO 数据待调整 - transfer := &Object{ - ObjectID: 0, - PackageID: 0, - Path: data.Path, - Size: 0, - FileHash: "", - Status: 0, - FaultTolerance: 0, - Redundancy: 0, - AvgAccessCost: 0, - Timestamp: time.Time{}, - } - err := repo.CreateBlockTransfer(transfer) +// UpdateObject 更新块传输记录 +func (r *ObjectRepository) UpdateObject(object *Object) error { + return r.repo.Update(object) +} + +// GetAllObjects 获取所有块传输记录 +func (r *ObjectRepository) GetAllObjects() ([]Object, error) { + var objects []Object + err := r.repo.GetAll(&objects) if err != nil { - log.Printf("Failed to create block transfer record: %v", err) + return nil, err } + return objects, nil +} + +// ProcessObject 处理 Object 数据 +func ProcessObject(data stgmod.ObjectChange) { } diff --git a/datamap/internal/models/hubs.go b/datamap/internal/models/storageinfo.go similarity index 66% rename from datamap/internal/models/hubs.go rename to datamap/internal/models/storageinfo.go index a28cb9b..9f9dbeb 100644 --- a/datamap/internal/models/hubs.go +++ b/datamap/internal/models/storageinfo.go @@ -9,30 +9,17 @@ import ( "time" ) -var db *gorm.DB - -func InitDB(gormDB *gorm.DB) { - db = gormDB -} - -type Hub struct { - HubID cdssdk.HubID `gorm:"column:HubID; primaryKey; type:bigint; autoIncrement" json:"hubID"` - Name string `gorm:"column:Name; type:varchar(255); not null" json:"name"` - Address cdssdk.HubAddressInfo `gorm:"column:Address; type:json; serializer:union" json:"address"` -} - -type HubStat struct { - StorageID int `json:"storageID"` - DataCount int `json:"dataCount"` -} - type Storage struct { StorageID cdssdk.StorageID `gorm:"column:StorageID; primaryKey; type:bigint; autoIncrement" json:"storageID"` + StorageName string `gorm:"column:StorageName; type:varchar(1024); not null" json:"storageName"` HubID cdssdk.HubID `gorm:"column:HubID; type:bigint; not null" json:"hubID"` DataCount int64 `gorm:"column:DataCount; type:bigint; not null" json:"dataCount"` NewDataCount int64 `gorm:"column:NewDataCount; type:bigint; not null" json:"newDataCount"` Timestamp time.Time `gorm:"column:Timestamp; type:datatime; not null" json:"timestamp"` } + +func (Storage) TableName() string { return "storage" } + type StorageRepository struct { repo *GormRepository } @@ -49,7 +36,7 @@ func (r *StorageRepository) UpdateStorage(storage *Storage) error { return r.repo.Update(storage) } -func (r *StorageRepository) GetStorageByID(id int) (*Storage, error) { +func (r *StorageRepository) GetStorageByID(id int64) (*Storage, error) { var storage Storage err := r.repo.GetByID(uint(id), &storage) if err != nil { @@ -58,6 +45,16 @@ func (r *StorageRepository) GetStorageByID(id int) (*Storage, error) { return &storage, nil } +func (r *StorageRepository) GetStorageByHubID(hubId int64) (*Storage, error) { + var storage Storage + query := "SELECT * FROM storage WHERE HubID = ?" + err := r.repo.db.Raw(query, hubId).Scan(&storage).Error + if err != nil { + return nil, err + } + return &storage, nil +} + func (r *StorageRepository) GetAllStorages() ([]Storage, error) { var storages []Storage err := r.repo.GetAll(&storages) @@ -67,8 +64,10 @@ func (r *StorageRepository) GetAllStorages() ([]Storage, error) { return storages, nil } -func ProcessHubStat(data stgmod.HubStat) { - repo := NewStorageRepository(db) +//ProcessHubStat mq推送各节点统计自身当前的总数据量时的处理逻辑 + +func ProcessStorageInfo(data stgmod.StorageStats) { + repo := NewStorageRepository(DB) storage, err := repo.GetStorageByID(data.Body.StorageID) if err != nil { @@ -76,7 +75,7 @@ func ProcessHubStat(data stgmod.HubStat) { // 插入新记录 newStorage := &Storage{ StorageID: cdssdk.StorageID(data.Body.StorageID), - DataCount: int64(data.Body.DataCount), + DataCount: data.Body.DataCount, NewDataCount: 0, } repo.CreateStorage(newStorage) @@ -85,9 +84,12 @@ func ProcessHubStat(data stgmod.HubStat) { } } else { // 更新记录 - newDataCount := int64(data.Body.DataCount) - storage.DataCount - storage.DataCount = int64(data.Body.DataCount) + newDataCount := data.Body.DataCount - storage.DataCount + storage.DataCount = data.Body.DataCount storage.NewDataCount = newDataCount - repo.UpdateStorage(storage) + err := repo.UpdateStorage(storage) + if err != nil { + log.Printf("Error update storage: %v", err) + } } } diff --git a/datamap/internal/mq/mq.go b/datamap/internal/mq/mq.go index ba32c6a..53240fd 100644 --- a/datamap/internal/mq/mq.go +++ b/datamap/internal/mq/mq.go @@ -29,9 +29,6 @@ func listenQueues(conn *amqp.Connection) { "datamap_hubtransfer", "datamap_blocktransfer", "datamap_blockdistribution", - "datamap_objectchange", - "datamap_packagechange", - "datamap_bucketchange", } for _, queue := range queues { @@ -58,22 +55,60 @@ func listenQueues(conn *amqp.Connection) { func processMessage(queue string, body []byte) { switch queue { + case "datamap_hubinfo": + var data stgmod.HubInfo + + if err := json.Unmarshal(body, &data); err != nil { + log.Printf("Failed to unmarshal HubInfo: %v, body: %s", err, body) + return + } + models.ProcessHubInfo(data) case "datamap_storageinfo": - var data stgmod.HubStat - json.Unmarshal(body, &data) - models.ProcessHubStat(data) - case "datamap_hubtransfer": - var data stgmod.HubTrans - json.Unmarshal(body, &data) - models.ProcessHubTrans(data) + var data stgmod.StorageInfo + if err := json.Unmarshal(body, &data); err != nil { + log.Printf("Failed to unmarshal StorageInfo: %v, body: %s", err, body) + return + } + //models.ProcessStorageInfo(data) + case "datamap_storagestats": + var data stgmod.StorageStats + if err := json.Unmarshal(body, &data); err != nil { + log.Printf("Failed to unmarshal StorageStats: %v, body: %s", err, body) + return + } + //models.ProcessStorageInfo(data) + case "datamap_hubtransferstats": + var data stgmod.HubTransferStats + err := json.Unmarshal(body, &data) + if err != nil { + log.Printf("Failed to unmarshal HubTransferStats: %v, body: %s", err, body) + return + } + models.ProcessHubTransfer(data) + case "datamap_hubstoragetransferstats": + var data stgmod.HubStorageTransferStats + err := json.Unmarshal(body, &data) + if err != nil { + log.Printf("Failed to unmarshal HubStorageTransferStats: %v, body: %s", err, body) + return + } + //models.ProcessHubTransfer(data) case "datamap_blocktransfer": - var data stgmod.Object - json.Unmarshal(body, &data) - models.ProcessBlockTransInfo(data) + var data stgmod.BlockTransfer + err := json.Unmarshal(body, &data) + if err != nil { + log.Printf("Failed to unmarshal BlockTransfer: %v, body: %s", err, body) + return + } + models.ProcessBlockTransfer(data) case "datamap_blockdistribution": - var data stgmod.BlockTransInfo - json.Unmarshal(body, &data) - models.ProcessBlockDistributionInfo(data) + var data stgmod.BlockDistribution + err := json.Unmarshal(body, &data) + if err != nil { + log.Printf("Failed to unmarshal BlockDistribution: %v, body: %s", err, body) + return + } + models.ProcessBlockDistribution(data) default: log.Printf("Unknown queue: %s", queue) } diff --git a/datamap/internal/server/server.go b/datamap/internal/server/server.go index d2812c7..f3233cb 100644 --- a/datamap/internal/server/server.go +++ b/datamap/internal/server/server.go @@ -11,9 +11,10 @@ import ( func StartServer(db *gorm.DB, mq *amqp.Connection) { r := gin.Default() + handlers.SetDB(db) // 注册HTTP接口 - r.GET("/storage", handlers.GetStorageData) - r.GET("/block-distribution", handlers.GetBlockDistributionData) + r.GET("/hubInfo", handlers.GetHubInfo) + r.GET("/dataTransfer/:objectID", handlers.GetDataTransfer) // 启动服务 if err := r.Run(":8080"); err != nil { diff --git a/datamap/main.go b/datamap/main.go index abe8755..da0ce32 100644 --- a/datamap/main.go +++ b/datamap/main.go @@ -3,6 +3,7 @@ package main import ( "gitlink.org.cn/cloudream/storage/datamap/internal/config" "gitlink.org.cn/cloudream/storage/datamap/internal/db" + "gitlink.org.cn/cloudream/storage/datamap/internal/models" "gitlink.org.cn/cloudream/storage/datamap/internal/mq" "gitlink.org.cn/cloudream/storage/datamap/internal/server" "log" @@ -18,6 +19,8 @@ func main() { log.Fatalf("Failed to initialize database: %v", err) } + models.InitDB(dbConn) + // 初始化RabbitMQ mqConn, err := mq.InitMQ(cfg.RabbitMQ) if err != nil { diff --git a/go.mod b/go.mod index 4c44652..4e4e0fd 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,7 @@ require ( github.com/aws/aws-sdk-go-v2/credentials v1.17.47 github.com/aws/aws-sdk-go-v2/service/s3 v1.71.0 github.com/gin-gonic/gin v1.7.7 - github.com/go-sql-driver/mysql v1.7.1 + github.com/go-sql-driver/mysql v1.8.1 github.com/hashicorp/golang-lru/v2 v2.0.5 github.com/huaweicloud/huaweicloud-sdk-go-obs v3.24.9+incompatible github.com/inhies/go-bytesize v0.0.0-20220417184213-4913239db9cf @@ -27,10 +27,12 @@ require ( gitlink.org.cn/cloudream/common v0.0.0 google.golang.org/grpc v1.62.1 google.golang.org/protobuf v1.33.0 - gorm.io/gorm v1.25.7 + gorm.io/datatypes v1.2.5 + gorm.io/gorm v1.25.11 ) require ( + filippo.io/edwards25519 v1.1.0 // indirect github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.7 // indirect github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.25 // indirect github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.25 // indirect @@ -99,11 +101,11 @@ require ( go.uber.org/atomic v1.10.0 // indirect go.uber.org/multierr v1.9.0 // indirect go.uber.org/zap v1.24.0 // indirect - golang.org/x/crypto v0.21.0 // indirect + golang.org/x/crypto v0.22.0 // indirect golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect golang.org/x/net v0.23.0 // indirect golang.org/x/sync v0.6.0 - golang.org/x/sys v0.18.0 // indirect + golang.org/x/sys v0.19.0 // indirect golang.org/x/text v0.14.0 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20240311132316-a219d84964c2 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240314234333-6e1732d8331c // indirect diff --git a/go.sum b/go.sum index 2cee4dd..ac97be9 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,5 @@ +filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA= +filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4= github.com/QcloudApi/qcloud_sign_golang v0.0.0-20141224014652-e4130a326409/go.mod h1:1pk82RBxDY/JZnPQrtqHlUFfCctgdorsd9M06fMynOM= github.com/aliyun/aliyun-oss-go-sdk v3.0.2+incompatible h1:8psS8a+wKfiLt1iVDX79F7Y6wUM49Lcha2FMXt4UM8g= github.com/aliyun/aliyun-oss-go-sdk v3.0.2+incompatible/go.mod h1:T/Aws4fEfogEE9v+HPhhw+CntffsBHJ8nXQCwKr0/g8= @@ -58,12 +60,16 @@ github.com/go-playground/validator/v10 v10.4.1/go.mod h1:nlOn6nFhuKACm19sB/8EGNn github.com/go-playground/validator/v10 v10.8.0 h1:1kAa0fCrnpv+QYdkdcRzrRM7AyYs5o8+jZdJCz9xj6k= github.com/go-playground/validator/v10 v10.8.0/go.mod h1:9JhgTzTaE31GZDpH/HSvHiRJrJ3iKAgqqH0Bl/Ocjdk= github.com/go-sql-driver/mysql v1.7.0/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI= -github.com/go-sql-driver/mysql v1.7.1 h1:lUIinVbN1DY0xBg0eMOzmmtGoHwWBbvnWubQUrtU8EI= -github.com/go-sql-driver/mysql v1.7.1/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI= +github.com/go-sql-driver/mysql v1.8.1 h1:LedoTUt/eveggdHS9qUFC1EFSa8bU2+1pZjSRpvNJ1Y= +github.com/go-sql-driver/mysql v1.8.1/go.mod h1:wEBSXgmK//2ZFJyE+qWnIsVGmvmEKlqwuVSjsCm7DZg= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/golang-jwt/jwt/v5 v5.2.1/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk= +github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9 h1:au07oEsX2xN0ktxqI+Sida1w446QrXBRJ0nee3SNZlA= +github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9/go.mod h1:8vg3r2VgvsThLBIFL93Qb5yWzgyZWhEmBwUJWevAkK0= +github.com/golang-sql/sqlexp v0.1.0 h1:ZCD6MBpcuOVfGVqsEmY5/4FtYiKz6tSyUv9LPEDei6A= +github.com/golang-sql/sqlexp v0.1.0/go.mod h1:J4ad9Vo8ZCWQ2GMrC4UCQy1JpCbwU9m3EOqtpKwwwHI= github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= @@ -96,6 +102,14 @@ github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2 github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= github.com/inhies/go-bytesize v0.0.0-20220417184213-4913239db9cf h1:FtEj8sfIcaaBfAKrE1Cwb61YDtYq9JxChK1c7AKce7s= github.com/inhies/go-bytesize v0.0.0-20220417184213-4913239db9cf/go.mod h1:yrqSXGoD/4EKfF26AOGzscPOgTTJcyAwM2rpixWT+t4= +github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= +github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= +github.com/jackc/pgservicefile v0.0.0-20231201235250-de7065d80cb9 h1:L0QtFUgDarD7Fpv9jeVMgy/+Ec0mtnmYuImjTz6dtDA= +github.com/jackc/pgservicefile v0.0.0-20231201235250-de7065d80cb9/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= +github.com/jackc/pgx/v5 v5.5.5 h1:amBjrZVmksIdNjxGW/IiIMzxMKZFelXbUoPNb+8sjQw= +github.com/jackc/pgx/v5 v5.5.5/go.mod h1:ez9gk+OAat140fv9ErkZDYFWmXLfV+++K0uAOiwgm1A= +github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk= +github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= github.com/jedib0t/go-pretty/v6 v6.4.7 h1:lwiTJr1DEkAgzljsUsORmWsVn5MQjt1BPJdPCtJ6KXE= github.com/jedib0t/go-pretty/v6 v6.4.7/go.mod h1:Ndk3ase2CkQbXLLNf5QDHoYb6J9WtVfmHZu9n8rk2xs= github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E= @@ -130,6 +144,10 @@ github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APP github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/mattn/go-runewidth v0.0.13 h1:lTGmDsbAYt5DmK6OnoV7EuIF1wEIFAcxld6ypU4OSgU= github.com/mattn/go-runewidth v0.0.13/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= +github.com/mattn/go-sqlite3 v1.14.15 h1:vfoHhTN1af61xCRSWzFIWzx2YskyMTwHLrExkBOjvxI= +github.com/mattn/go-sqlite3 v1.14.15/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg= +github.com/microsoft/go-mssqldb v1.7.2 h1:CHkFJiObW7ItKTJfHo1QX7QBBD1iV+mn1eOyRP3b/PA= +github.com/microsoft/go-mssqldb v1.7.2/go.mod h1:kOvZKUdrhhFQmxLZqbwUV0rHkNkZpthMITIb2Ko1IoA= github.com/mitchellh/mapstructure v1.4.3/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY= github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= @@ -227,8 +245,8 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= -golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA= -golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs= +golang.org/x/crypto v0.22.0 h1:g1v0xeRhjcugydODzvb3mEM9SQ0HGp9s/nh3COQ/C30= +golang.org/x/crypto v0.22.0/go.mod h1:vr6Su+7cTlO45qkww3VDJlzDn0ctJvRgYbC2NvXHt+M= golang.org/x/exp v0.0.0-20230905200255-921286631fa9 h1:GoHiUyI/Tp2nVkLI2mCxVkOjsbSXD66ic0XW0js0R9g= golang.org/x/exp v0.0.0-20230905200255-921286631fa9/go.mod h1:S2oDrQGGwySpoQPVqRShND87VCbxmc6bL1Yd2oYrm6k= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= @@ -256,8 +274,8 @@ golang.org/x/sys v0.0.0-20220704084225-05e143d24a9e/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4= -golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o= +golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= @@ -297,7 +315,16 @@ gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gorm.io/datatypes v1.2.5 h1:9UogU3jkydFVW1bIVVeoYsTpLRgwDVW3rHfJG6/Ek9I= +gorm.io/datatypes v1.2.5/go.mod h1:I5FUdlKpLb5PMqeMQhm30CQ6jXP8Rj89xkTeCSAaAD4= gorm.io/driver/mysql v1.5.7 h1:MndhOPYOfEp2rHKgkZIhJ16eVUIRf2HmzgoPmh7FCWo= gorm.io/driver/mysql v1.5.7/go.mod h1:sEtPWMiqiN1N1cMXoXmBbd8C6/l+TESwriotuRRpkDM= -gorm.io/gorm v1.25.7 h1:VsD6acwRjz2zFxGO50gPO6AkNs7KKnvfzUjHQhZDz/A= +gorm.io/driver/postgres v1.5.0 h1:u2FXTy14l45qc3UeCJ7QaAXZmZfDDv0YrthvmRq1l0U= +gorm.io/driver/postgres v1.5.0/go.mod h1:FUZXzO+5Uqg5zzwzv4KK49R8lvGIyscBOqYrtI1Ce9A= +gorm.io/driver/sqlite v1.4.3 h1:HBBcZSDnWi5BW3B3rwvVTc510KGkBkexlOg0QrmLUuU= +gorm.io/driver/sqlite v1.4.3/go.mod h1:0Aq3iPO+v9ZKbcdiz8gLWRw5VOPcBOPUQJFLq5e2ecI= +gorm.io/driver/sqlserver v1.5.4 h1:xA+Y1KDNspv79q43bPyjDMUgHoYHLhXYmdFcYPobg8g= +gorm.io/driver/sqlserver v1.5.4/go.mod h1:+frZ/qYmuna11zHPlh5oc2O6ZA/lS88Keb0XSH1Zh/g= gorm.io/gorm v1.25.7/go.mod h1:hbnx/Oo0ChWMn1BIhpy1oYozzpM15i4YPuHDmfYtwg8= +gorm.io/gorm v1.25.11 h1:/Wfyg1B/je1hnDx3sMkX+gAlxrlZpn6X0BXRlwXlvHg= +gorm.io/gorm v1.25.11/go.mod h1:xh7N7RHfYlNc5EmcI/El95gXusucDrQnHXe0+CgWcLQ=