| @@ -53,14 +53,16 @@ func (svc *HubService) GetHubs(hubIDs []cdssdk.HubID) ([]cdssdk.Hub, error) { | |||
| func (svc *HubService) GetHubStat(hubID cdssdk.HubID) (stgmod.HubStat, error) { | |||
| datamapCli, err := stgglb.DatamapMQPool.Acquire() | |||
| if err != nil { | |||
| return stgmod.HubStat{}, fmt.Errorf("new coordinator client: %w", err) | |||
| return stgmod.HubStat{}, fmt.Errorf("new datamap client: %w", err) | |||
| } | |||
| defer stgglb.DatamapMQPool.Release(datamapCli) | |||
| getResp, err := datamapCli.GetHubStat(datamapmq.NewGetHubStat(hubID)) | |||
| if err != nil { | |||
| return stgmod.HubStat{}, fmt.Errorf("requesting to coordinator: %w", err) | |||
| return stgmod.HubStat{}, fmt.Errorf("requesting to datamap: %w", err) | |||
| } | |||
| return getResp.HubStat, nil | |||
| } | |||
| // | |||
| @@ -52,5 +52,6 @@ type GetHubTransResp struct { | |||
| } | |||
| func (client *Client) GetHubTrans(msg *GetHubTrans) (*GetHubTransResp, error) { | |||
| //获取到传输统计之后 | |||
| return mq.Request(Service.GetHubTrans, client.rabbitCli, msg) | |||
| } | |||
| @@ -1 +1,42 @@ | |||
| package db | |||
| import "gitlink.org.cn/cloudream/storage/datamap/internal/models" | |||
| type BlockDistributionDB struct { | |||
| *DB | |||
| } | |||
| func (db *DB) BlockDistribution() *BlockDistributionDB { | |||
| return &BlockDistributionDB{DB: db} | |||
| } | |||
| // GetAllBlockDistribution 查询所有BlockDistribution列表 | |||
| func (*HubDB) GetAllBlockDistribution(ctx SQLContext) ([]models.BlockDistribution, error) { | |||
| var ret []models.BlockDistribution | |||
| err := ctx.Table("blockdistribution").Find(&ret).Error | |||
| return ret, err | |||
| } | |||
| // GetBlockDistribution 根据输入的BlockID查询BlockDistribution | |||
| func (*HubDB) GetBlockDistribution(ctx SQLContext, BlockID int64) (models.BlockDistribution, error) { | |||
| var ret models.BlockDistribution | |||
| err := ctx.Table("blockdistribution").Where("BlockID = ?", BlockID).Find(&ret).Error | |||
| return ret, err | |||
| } | |||
| // CreateBlockDistribution 根据输入的BlockDistribution信息创建BlockDistribution记录 | |||
| func (*HubDB) CreateBlockDistribution(ctx SQLContext, blockDistribution models.BlockDistribution) (*models.BlockDistribution, error) { | |||
| err := ctx.Table("blockdistribution").Create(&blockDistribution).Error | |||
| return &blockDistribution, err | |||
| } | |||
| // DeleteBlockDistribution 根据输入的BlockID删除BlockDistribution记录 | |||
| func (*HubDB) DeleteBlockDistribution(ctx SQLContext, BlockID int64) error { | |||
| return ctx.Table("blockdistribution").Where("BlockID = ?", BlockID).Delete(&models.BlockDistribution{}).Error | |||
| } | |||
| // UpdateBlockDistribution 根据输入的BlockDistribution信息更新BlockDistribution记录 | |||
| func (*HubDB) UpdateBlockDistribution(ctx SQLContext, blockDistribution models.BlockDistribution) error { | |||
| return ctx.Table("blockdistribution").Where("BlockID = ?", blockDistribution.BlockID).Updates(&blockDistribution).Error | |||
| } | |||
| @@ -1,9 +1,6 @@ | |||
| package db | |||
| import ( | |||
| "gitlink.org.cn/cloudream/common/pkgs/mq" | |||
| stgmod "gitlink.org.cn/cloudream/storage/common/models" | |||
| datamapmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/datamap" | |||
| "gitlink.org.cn/cloudream/storage/datamap/internal/models" | |||
| ) | |||
| @@ -15,15 +12,41 @@ func (db *DB) Hub() *HubDB { | |||
| return &HubDB{DB: db} | |||
| } | |||
| func (*HubDB) GetAllHubs(ctx SQLContext) ([]models.Hub, error) { | |||
| // GetHubs 获取所有hub列表 | |||
| func (*HubDB) GetHubs(ctx SQLContext) ([]models.Hub, error) { | |||
| var ret []models.Hub | |||
| err := ctx.Table("Hub").Find(&ret).Error | |||
| err := ctx.Table("hub").Find(&ret).Error | |||
| return ret, err | |||
| } | |||
| func (*HubDB) GetHubStat(msg *datamapmq.GetHubStat) (*datamapmq.GetHubStatResp, *mq.CodeMessage) { | |||
| //todo 数据库操作 | |||
| // GetHub 根据hubId获取该Hub的信息 | |||
| func (*HubDB) GetHub(ctx SQLContext, hubID int64) (*models.Hub, error) { | |||
| var hub models.Hub | |||
| err := ctx.Table("hub").Where("HubID = ?", hubID).Find(&hub).Error | |||
| return &hub, err | |||
| } | |||
| // DeleteHub 根据hubId删除该Hub | |||
| func (*HubDB) DeleteHub(ctx SQLContext, hubID int64) error { | |||
| err := ctx.Table("hub").Where("HubID = ?", hubID).Delete(&models.Hub{}).Error | |||
| return err | |||
| } | |||
| // UpdateHub 根据输入hub信息更新Hub信息 | |||
| func (*HubDB) UpdateHub(ctx SQLContext, hub models.Hub) (*models.Hub, error) { | |||
| err := ctx.Table("hub").Where("HubID = ?", hub.HubID).Updates(&hub).Error | |||
| return &hub, err | |||
| } | |||
| // CreateHub 根据输入hub信息创建Hub信息 | |||
| func (*HubDB) CreateHub(ctx SQLContext, hub models.Hub) (*models.Hub, error) { | |||
| err := ctx.Table("hub").Create(&hub).Error | |||
| return &hub, err | |||
| } | |||
| return mq.ReplyOK(datamapmq.NewGetHubStatResp(stgmod.HubStat{})) | |||
| // IsHubExist 根据hubId查询hub是否存在 | |||
| func (*HubDB) IsHubExist(ctx SQLContext, hubID int64) (bool, error) { | |||
| var count int64 | |||
| err := ctx.Table("hub").Where("HubID = ?", hubID).Count(&count).Error | |||
| return count > 0, err | |||
| } | |||
| @@ -1 +1,24 @@ | |||
| package db | |||
| import "gitlink.org.cn/cloudream/storage/datamap/internal/models" | |||
| type HubReqDB struct { | |||
| *DB | |||
| } | |||
| func (db *DB) HubReq() *HubReqDB { | |||
| return &HubReqDB{DB: db} | |||
| } | |||
| // GetHubRequest 获取所有hubrequest列表 | |||
| func (*HubReqDB) GetHubRequest(ctx SQLContext) ([]models.HubRequest, error) { | |||
| var ret []models.HubRequest | |||
| err := ctx.Table("hubrequest").Find(&ret).Error | |||
| return ret, err | |||
| } | |||
| // CreateHubRequest 根据输入的HubRequest信息创建HubRequest信息 | |||
| func (*HubReqDB) CreateHubRequest(ctx SQLContext, hubRequest models.HubRequest) (*models.HubRequest, error) { | |||
| err := ctx.Table("hubrequest").Create(&hubRequest).Error | |||
| return &hubRequest, err | |||
| } | |||
| @@ -1 +1,36 @@ | |||
| package db | |||
| import "gitlink.org.cn/cloudream/storage/datamap/internal/models" | |||
| type ObjectDB struct { | |||
| *DB | |||
| } | |||
| func (db *DB) Object() *ObjectDB { | |||
| return &ObjectDB{DB: db} | |||
| } | |||
| // GetAllObject 查询所有Object列表 | |||
| func (*ObjectDB) GetAllObject(ctx SQLContext) ([]models.Object, error) { | |||
| var ret []models.Object | |||
| err := ctx.Table("object").Find(&ret).Error | |||
| return ret, err | |||
| } | |||
| // GetObject 根据输入的ObjectId查询Object | |||
| func (*ObjectDB) GetObject(ctx SQLContext, objectId int64) (models.Object, error) { | |||
| var ret models.Object | |||
| err := ctx.Table("object").Where("ObjectID = ?", objectId).Find(&ret).Error | |||
| return ret, err | |||
| } | |||
| // DeleteObject 根据输入的ObjectId删除Object | |||
| func (*ObjectDB) DeleteObject(ctx SQLContext, objectId int64) error { | |||
| return ctx.Table("object").Where("ObjectID = ?", objectId).Delete(&models.Object{}).Error | |||
| } | |||
| // UpdateObject 根据输入的Object信息更新Object | |||
| func (*ObjectDB) UpdateObject(ctx SQLContext, object models.Object) error { | |||
| return ctx.Table("object").Where("ObjectID = ?", object.ObjectID).Updates(&object).Error | |||
| } | |||
| @@ -1 +1,36 @@ | |||
| package db | |||
| import "gitlink.org.cn/cloudream/storage/datamap/internal/models" | |||
| type StorageDB struct { | |||
| *DB | |||
| } | |||
| func (db *DB) Storage() *StorageDB { | |||
| return &StorageDB{DB: db} | |||
| } | |||
| // GetAllStorage 查询所有Storage列表 | |||
| func (*HubDB) GetAllStorage(ctx SQLContext) ([]models.Storage, error) { | |||
| var ret []models.Storage | |||
| err := ctx.Table("storage").Find(&ret).Error | |||
| return ret, err | |||
| } | |||
| // GetStorage 根据输入的StorageId查询Storage | |||
| func (*HubDB) GetStorage(ctx SQLContext, storageId int64) (models.Storage, error) { | |||
| var ret models.Storage | |||
| err := ctx.Table("storage").Where("StorageID = ?", storageId).Find(&ret).Error | |||
| return ret, err | |||
| } | |||
| // DeleteStorage 根据输入的StorageId删除Storage | |||
| func (*HubDB) DeleteStorage(ctx SQLContext, storageId int64) error { | |||
| return ctx.Table("storage").Where("StorageID = ?", storageId).Delete(&models.Storage{}).Error | |||
| } | |||
| // UpdateStorage 根据输入的Storage信息更新Storage | |||
| func (*HubDB) UpdateStorage(ctx SQLContext, storage models.Storage) error { | |||
| return ctx.Table("storage").Where("StorageID = ?", storage.StorageID).Updates(&storage).Error | |||
| } | |||
| @@ -1 +1,36 @@ | |||
| package db | |||
| import "gitlink.org.cn/cloudream/storage/datamap/internal/models" | |||
| type StorageTransferCountDB struct { | |||
| *DB | |||
| } | |||
| func (db *DB) StorageTransferCount() *StorageTransferCountDB { | |||
| return &StorageTransferCountDB{DB: db} | |||
| } | |||
| // GetAllStorageTransferCount 查询所有Storage列表 | |||
| func (*HubDB) GetAllStorageTransferCount(ctx SQLContext) ([]models.StorageTransferCount, error) { | |||
| var ret []models.StorageTransferCount | |||
| err := ctx.Table("storagetransfercount").Find(&ret).Error | |||
| return ret, err | |||
| } | |||
| // GetStorageTransferCount 根据输入的RelationshipID查询StorageTransferCount | |||
| func (*HubDB) GetStorageTransferCount(ctx SQLContext, RelationshipID int64) (models.Storage, error) { | |||
| var ret models.Storage | |||
| err := ctx.Table("storagetransfercount").Where("RelationshipID = ?", RelationshipID).Find(&ret).Error | |||
| return ret, err | |||
| } | |||
| // DeleteStorageTransferCount 根据输入的RelationshipID删除StorageTransferCount | |||
| func (*HubDB) DeleteStorageTransferCount(ctx SQLContext, RelationshipID int64) error { | |||
| return ctx.Table("storagetransfercount").Where("RelationshipID = ?", RelationshipID).Delete(&models.Storage{}).Error | |||
| } | |||
| // UpdateStorageTransferCount 根据输入的StorageTransferCount信息更新StorageTransferCount | |||
| func (*HubDB) UpdateStorageTransferCount(ctx SQLContext, storageTransferCount models.StorageTransferCount) error { | |||
| return ctx.Table("storagetransfercount").Where("RelationshipID = ?", storageTransferCount.RelationshipID).Updates(&storageTransferCount).Error | |||
| } | |||
| @@ -0,0 +1,22 @@ | |||
| package mq | |||
| import ( | |||
| "gitlink.org.cn/cloudream/common/pkgs/logger" | |||
| "gitlink.org.cn/cloudream/common/pkgs/mq" | |||
| datamapmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/datamap" | |||
| ) | |||
| func (svc *Service) GetHubStat(msg *datamapmq.GetHubStat) (*datamapmq.GetHubStatResp, *mq.CodeMessage) { | |||
| logger.WithField("HubID", msg.HubID) | |||
| //从datamapmq队列接收数据到stgmod.HubStat | |||
| //从datamapmq队列接收数据到stgmod.HubStat,然后将该数据和mysql数据库中获取到的models.Storage做对比,用stgmode | |||
| return &datamapmq.GetHubStatResp{}, nil | |||
| } | |||
| func (svc *Service) GetHubTrans(msg *datamapmq.GetHubTrans) (*datamapmq.GetHubTransResp, *mq.CodeMessage) { | |||
| logger.WithField("HubID", msg.HubID) | |||
| return &datamapmq.GetHubTransResp{}, nil | |||
| } | |||
| @@ -0,0 +1,15 @@ | |||
| package mq | |||
| import ( | |||
| "gorm.io/gorm" | |||
| ) | |||
| type Service struct { | |||
| db *gorm.DB | |||
| } | |||
| func NewService(db *gorm.DB) *Service { | |||
| return &Service{ | |||
| db: db, | |||
| } | |||
| } | |||