Browse Source

#2624

update
tags/v1.22.8.2^2
chenyifan01 3 years ago
parent
commit
9c8e0dacdd
8 changed files with 429 additions and 11 deletions
  1. +34
    -0
      models/cloudbrain.go
  2. +118
    -5
      models/resource_queue.go
  3. +60
    -2
      models/resource_specification.go
  4. +33
    -0
      modules/grampus/resty.go
  5. +33
    -3
      routers/admin/resources.go
  6. +3
    -0
      routers/routes/routes.go
  7. +67
    -0
      services/cloudbrain/resource/resource_queue.go
  8. +81
    -1
      services/cloudbrain/resource/resource_specification.go

+ 34
- 0
models/cloudbrain.go View File

@@ -1299,6 +1299,34 @@ type GrampusSpec struct {
Name string `json:"name"`
ProcessorType string `json:"processorType"`
Centers []Center `json:"centers"`
SpecInfo SpecInfo `json:"specInfo"`
}

type GrampusAiCenter struct {
AccDevices []GrampusAccDevice `json:"accDevices"`
Id string `json:"id"`
Name string `json:"name"`
Resource []GrampusCenterResource `json:"resource"`
}

type GrampusAccDevice struct {
Kind string `json:"kind"` //加速卡类别, npu.huawei.com/NPU,nvidia.com/gpu,cambricon.com/mlu
Model string `json:"model"` //加速卡型号
}

type GrampusCenterResource struct {
Allocated string `json:"allocated"`
Capacity string `json:"capacity"`
Name string `json:"name"`
}

type SpecInfo struct {
AccDeviceKind string `json:"accDeviceKind"`
AccDeviceMemory string `json:"accDeviceMemory"`
AccDeviceModel string `json:"accDeviceModel"`
AccDeviceNum int `json:"accDeviceNum"`
CpuCoreNum int `json:"cpuCoreNum"`
MemorySize string `json:"memorySize"`
}

type GetGrampusResourceSpecsResult struct {
@@ -1306,6 +1334,12 @@ type GetGrampusResourceSpecsResult struct {
Infos []GrampusSpec `json:"resourceSpecs"`
}

type GetGrampusAiCentersResult struct {
GrampusResult
Infos []GrampusAiCenter `json:"aiCenterInfos"`
TotalSize int `json:"totalSize"`
}

type GrampusImage struct {
CreatedAt int64 `json:"createdAt"`
UpdatedAt int64 `json:"updatedAt"`


+ 118
- 5
models/resource_queue.go View File

@@ -2,6 +2,7 @@ package models

import (
"code.gitea.io/gitea/modules/timeutil"
"strings"
"xorm.io/builder"
)

@@ -10,11 +11,13 @@ type ResourceQueue struct {
QueueCode string
Cluster string `xorm:"notnull"`
AiCenterCode string
AiCenterName string
ComputeResource string
AccCardType string
CardsTotalNum int
IsAutomaticSync bool
Remark string
DeletedTime timeutil.TimeStamp `xorm:"deleted"`
CreatedTime timeutil.TimeStamp `xorm:"created"`
CreatedBy int64
UpdatedTime timeutil.TimeStamp `xorm:"updated"`
@@ -27,6 +30,7 @@ func (r ResourceQueue) ConvertToRes() *ResourceQueueRes {
QueueCode: r.QueueCode,
Cluster: r.Cluster,
AiCenterCode: r.AiCenterCode,
AiCenterName: r.AiCenterName,
ComputeResource: r.ComputeResource,
AccCardType: r.AccCardType,
CardsTotalNum: r.CardsTotalNum,
@@ -48,18 +52,26 @@ type ResourceQueueReq struct {
}

func (r ResourceQueueReq) ToDTO() ResourceQueue {
return ResourceQueue{
q := ResourceQueue{
QueueCode: r.QueueCode,
Cluster: r.Cluster,
AiCenterCode: r.AiCenterCode,
ComputeResource: r.ComputeResource,
AccCardType: r.AccCardType,
ComputeResource: strings.ToUpper(r.ComputeResource),
AccCardType: strings.ToUpper(r.AccCardType),
CardsTotalNum: r.CardsTotalNum,
IsAutomaticSync: r.IsAutomaticSync,
Remark: r.Remark,
CreatedBy: r.CreatorId,
UpdatedBy: r.CreatorId,
}
if r.Cluster == OpenICluster {
if r.AiCenterCode == AICenterOfCloudBrainOne {
q.AiCenterName = "云脑一"
} else if r.AiCenterCode == AICenterOfCloudBrainTwo {
q.AiCenterName = "云脑二"
}
}
return q
}

type SearchResourceQueueOptions struct {
@@ -82,6 +94,11 @@ type ResourceQueueCodesRes struct {
AiCenterCode string
}

type ResourceAiCenterRes struct {
AiCenterCode string
AiCenterName string
}

type GetQueueCodesOptions struct {
Cluster string
}
@@ -102,6 +119,7 @@ type ResourceQueueRes struct {
QueueCode string
Cluster string
AiCenterCode string
AiCenterName string
ComputeResource string
AccCardType string
CardsTotalNum int
@@ -134,13 +152,13 @@ func SearchResourceQueue(opts SearchResourceQueueOptions) (int64, []ResourceQueu
if opts.AccCardType != "" {
cond = cond.And(builder.Eq{"acc_card_type": opts.AccCardType})
}
n, err := x.Where(cond).Count(&ResourceQueue{})
n, err := x.Where(cond).Unscoped().Count(&ResourceQueue{})
if err != nil {
return 0, nil, err
}

r := make([]ResourceQueue, 0)
err = x.Where(cond).Desc("id").Limit(opts.PageSize, (opts.Page-1)*opts.PageSize).Find(&r)
err = x.Where(cond).Desc("id").Limit(opts.PageSize, (opts.Page-1)*opts.PageSize).Unscoped().Find(&r)
if err != nil {
return 0, nil, err
}
@@ -161,3 +179,98 @@ func GetResourceQueueCodes(opts GetQueueCodesOptions) ([]*ResourceQueueCodesRes,
}
return r, nil
}

func GetResourceQueue(r *ResourceQueue) (*ResourceQueue, error) {
has, err := x.Get(r)
if err != nil {
return nil, err
} else if !has {
return nil, nil
}
return r, nil
}

func ParseComputeResourceFormGrampus(grampusDeviceKind string) string {
t := strings.Split(grampusDeviceKind, "/")
if len(t) < 2 {
return ""
}
return strings.ToUpper(t[1])
}

func ParseMemSizeFromGrampus(grampusMemSize string) float32 {
return 0
}

func SyncGrampusQueues(updateList []ResourceQueue, insertList []ResourceQueue, existIds []int64) error {
sess := x.NewSession()
var err error
defer func() {
if err != nil {
sess.Rollback()
}
sess.Close()
}()

//delete queues that no longer exists
deleteQueueIds := make([]int64, 0)
queueCond := builder.NewCond()
queueCond = queueCond.And(builder.NotIn("resource_queue.id", existIds)).And(builder.Eq{"resource_queue.cluster": C2NetCluster})
if err := sess.Cols("resource_queue.id").Table("resource_queue").
Where(queueCond).Find(&deleteQueueIds); err != nil {
return err
}

if len(deleteQueueIds) > 0 {
if _, err = sess.In("id", deleteQueueIds).Update(&ResourceQueue{Remark: "自动同步时被下架"}); err != nil {
return err
}
if _, err = sess.In("id", deleteQueueIds).Delete(&ResourceQueue{}); err != nil {
return err
}

//delete specs and scene that no longer exists
deleteSpcIds := make([]int64, 0)
if err := sess.Cols("resource_specification.id").Table("resource_specification").
In("queue_id", deleteQueueIds).Find(&deleteSpcIds); err != nil {
return err
}
if len(deleteSpcIds) > 0 {
if _, err = sess.In("id", deleteSpcIds).Update(&ResourceSpecification{Status: SpecOffShelf}); err != nil {
return err
}
if _, err = sess.In("spec_id", deleteSpcIds).Delete(&ResourceSceneSpec{}); err != nil {
return err
}
}

}

//update exists specs
if len(updateList) > 0 {
for _, v := range updateList {
if _, err = sess.ID(v.ID).Update(&v); err != nil {
return err
}
}

}

//insert new specs
if len(insertList) > 0 {
if _, err = sess.Insert(insertList); err != nil {
return err
}
}

return sess.Commit()
}

func GetResourceAiCenters() ([]ResourceAiCenterRes, error) {
r := make([]ResourceAiCenterRes, 0)
err := x.Table("resource_queue").Distinct("ai_center_code", "ai_center_name").Find(&r)
if err != nil {
return nil, err
}
return r, nil
}

+ 60
- 2
models/resource_specification.go View File

@@ -163,8 +163,9 @@ func SearchResourceSpecification(opts SearchResourceSpecificationOptions) (int64
if opts.Cluster != "" {
cond = cond.And(builder.Eq{"resource_queue.cluster": opts.Cluster})
}
//cond = cond.And(builder.Or(builder.Eq{"resource_queue.deleted_time": 0}).Or(builder.IsNull{"resource_queue.deleted_time"}))
n, err := x.Where(cond).Join("INNER", "resource_queue", "resource_queue.ID = resource_specification.queue_id").
Count(&ResourceSpecAndQueue{})
Unscoped().Count(&ResourceSpecAndQueue{})
if err != nil {
return 0, nil, err
}
@@ -174,7 +175,7 @@ func SearchResourceSpecification(opts SearchResourceSpecificationOptions) (int64
Join("INNER", "resource_queue", "resource_queue.ID = resource_specification.queue_id").
Desc("resource_specification.id").
Limit(opts.PageSize, (opts.Page-1)*opts.PageSize).
Find(&r)
Unscoped().Find(&r)
if err != nil {
return 0, nil, err
}
@@ -210,3 +211,60 @@ func ResourceSpecOffShelf(id int64) (int64, error) {
sess.Commit()
return n, err
}

func GetResourceSpecification(r *ResourceSpecification) (*ResourceSpecification, error) {
has, err := x.Get(r)
if err != nil {
return nil, err
} else if !has {
return nil, nil
}
return r, nil
}

func SyncGrampusSpecs(updateList []ResourceSpecification, insertList []ResourceSpecification, existIds []int64) error {
sess := x.NewSession()
var err error
defer func() {
if err != nil {
sess.Rollback()
}
sess.Close()
}()
//delete specs and scene that no longer exists
deleteIds := make([]int64, 0)
cond := builder.NewCond()
cond = cond.And(builder.NotIn("resource_specification.id", existIds)).And(builder.Eq{"resource_queue.cluster": C2NetCluster})
if err := sess.Cols("resource_specification.id").Table("resource_specification").
Where(cond).Join("INNER", "resource_queue", "resource_queue.id = resource_specification.queue_id").
Find(&deleteIds); err != nil {
return err
}
if len(deleteIds) > 0 {
if _, err = sess.In("id", deleteIds).Update(&ResourceSpecification{Status: SpecOffShelf}); err != nil {
return err
}
if _, err = sess.In("spec_id", deleteIds).Delete(&ResourceSceneSpec{}); err != nil {
return err
}
}

//update exists specs
if len(updateList) > 0 {
for _, v := range updateList {
if _, err = sess.ID(v.ID).Update(&v); err != nil {
return err
}
}

}

//insert new specs
if len(insertList) > 0 {
if _, err = sess.Insert(insertList); err != nil {
return err
}
}

return sess.Commit()
}

+ 33
- 0
modules/grampus/resty.go View File

@@ -23,6 +23,7 @@ const (
urlGetToken = urlOpenApiV1 + "token"
urlTrainJob = urlOpenApiV1 + "trainjob"
urlGetResourceSpecs = urlOpenApiV1 + "resourcespec"
urlGetAiCenter = urlOpenApiV1 + "sharescreen/aicenter"
urlGetImages = urlOpenApiV1 + "image"

errorIllegalToken = 1005
@@ -275,3 +276,35 @@ sendjob:

return &result, nil
}

func GetAiCenters(pageIndex, pageSize int) (*models.GetGrampusAiCentersResult, error) {
checkSetting()
client := getRestyClient()
var result models.GetGrampusAiCentersResult

retry := 0

sendjob:
_, err := client.R().
SetAuthToken(TOKEN).
SetResult(&result).
Get(HOST + urlGetAiCenter + "?pageIndex=" + fmt.Sprint(pageIndex) + "&pageSize=" + fmt.Sprint(pageSize))

if err != nil {
return nil, fmt.Errorf("resty GetAiCenters: %v", err)
}

if result.ErrorCode == errorIllegalToken && retry < 1 {
retry++
log.Info("retry get token")
_ = getToken()
goto sendjob
}

if result.ErrorCode != 0 {
log.Error("GetAiCenters failed(%d): %s", result.ErrorCode, result.ErrorMsg)
return &result, fmt.Errorf("GetAiCenters failed(%d): %s", result.ErrorCode, result.ErrorMsg)
}

return &result, nil
}

+ 33
- 3
routers/admin/resources.go View File

@@ -44,7 +44,7 @@ func GetResourceQueueList(ctx *context.Context) {
computeResource := ctx.Query("resource")
accCardType := ctx.Query("card")
list, err := resource.GetResourceQueueList(models.SearchResourceQueueOptions{
ListOptions: models.ListOptions{Page: page, PageSize: 20},
ListOptions: models.ListOptions{Page: page, PageSize: 10},
Cluster: cluster,
AiCenterCode: aiCenterCode,
ComputeResource: computeResource,
@@ -69,6 +69,16 @@ func GetResourceQueueCodes(ctx *context.Context) {
ctx.JSON(http.StatusOK, response.SuccessWithData(list))
}

func GetResourceAiCenters(ctx *context.Context) {
list, err := resource.GetResourceAiCenters()
if err != nil {
log.Error("GetResourceAiCenters error.%v", err)
ctx.JSON(http.StatusOK, response.ServerError(err.Error()))
return
}
ctx.JSON(http.StatusOK, response.SuccessWithData(list))
}

func AddResourceQueue(ctx *context.Context, req models.ResourceQueueReq) {
req.IsAutomaticSync = false
req.CreatorId = ctx.User.ID
@@ -93,13 +103,23 @@ func UpdateResourceQueue(ctx *context.Context, req models.ResourceQueueReq) {
ctx.JSON(http.StatusOK, response.Success())
}

func SyncGrampusQueue(ctx *context.Context) {
err := resource.SyncGrampusQueue(ctx.User.ID)
if err != nil {
log.Error("AddResourceQueue error. %v", err)
ctx.JSON(http.StatusOK, response.ServerError(err.Error()))
return
}
ctx.JSON(http.StatusOK, response.Success())
}

func GetResourceSpecificationList(ctx *context.Context) {
page := ctx.QueryInt("page")
queue := ctx.QueryInt64("queue")
status := ctx.QueryInt("status")
cluster := ctx.Query("cluster")
list, err := resource.GetResourceSpecificationList(models.SearchResourceSpecificationOptions{
ListOptions: models.ListOptions{Page: page, PageSize: 20},
ListOptions: models.ListOptions{Page: page, PageSize: 10},
QueueId: queue,
Status: status,
Cluster: cluster,
@@ -151,6 +171,16 @@ func UpdateResourceSpecification(ctx *context.Context, req models.ResourceSpecif
ctx.JSON(http.StatusOK, response.Success())
}

func SyncGrampusSpecs(ctx *context.Context) {
err := resource.SyncGrampusSpecs(ctx.User.ID)
if err != nil {
log.Error("AddResourceQueue error. %v", err)
ctx.JSON(http.StatusOK, response.ServerError(err.Error()))
return
}
ctx.JSON(http.StatusOK, response.Success())
}

func GetResourceSceneList(ctx *context.Context) {
page := ctx.QueryInt("page")
jobType := ctx.Query("jobType")
@@ -158,7 +188,7 @@ func GetResourceSceneList(ctx *context.Context) {
queueId := ctx.QueryInt64("queue")
isExclusive := ctx.QueryInt("IsExclusive")
list, err := resource.GetResourceSceneList(models.SearchResourceSceneOptions{
ListOptions: models.ListOptions{Page: page, PageSize: 20},
ListOptions: models.ListOptions{Page: page, PageSize: 10},
JobType: jobType,
IsExclusive: isExclusive,
AiCenterCode: aiCenterCode,


+ 3
- 0
routers/routes/routes.go View File

@@ -610,13 +610,16 @@ func RegisterRoutes(m *macaron.Macaron) {
m.Group("/queue", func() {
m.Get("", admin.GetQueuePage)
m.Get("/list", admin.GetResourceQueueList)
m.Post("/grampus/sync", admin.SyncGrampusQueue)
m.Get("/codes", admin.GetResourceQueueCodes)
m.Get("/centers", admin.GetResourceAiCenters)
m.Post("/add", binding.Bind(models.ResourceQueueReq{}), admin.AddResourceQueue)
m.Post("/update/:id", binding.BindIgnErr(models.ResourceQueueReq{}), admin.UpdateResourceQueue)
})
m.Group("/specification", func() {
m.Get("", admin.GetSpecificationPage)
m.Get("/list", admin.GetResourceSpecificationList)
m.Post("/grampus/sync", admin.SyncGrampusSpecs)
m.Post("/add", binding.Bind(models.ResourceSpecificationReq{}), admin.AddResourceSpecification)
m.Post("/update/:id", binding.BindIgnErr(models.ResourceSpecificationReq{}), admin.UpdateResourceSpecification)
})


+ 67
- 0
services/cloudbrain/resource/resource_queue.go View File

@@ -2,6 +2,8 @@ package resource

import (
"code.gitea.io/gitea/models"
"code.gitea.io/gitea/modules/grampus"
"strings"
)

func AddResourceQueue(req models.ResourceQueueReq) error {
@@ -38,3 +40,68 @@ func GetResourceQueueCodes(opts models.GetQueueCodesOptions) ([]*models.Resource

return r, nil
}

func GetResourceAiCenters() ([]models.ResourceAiCenterRes, error) {
r, err := models.GetResourceAiCenters()
if err != nil {
return nil, err
}

return r, nil
}

func SyncGrampusQueue(doerId int64) error {
r, err := grampus.GetAiCenters(1, 100)
if err != nil {
return err
}
queueUpdateList := make([]models.ResourceQueue, 0)
queueInsertList := make([]models.ResourceQueue, 0)
existIds := make([]int64, 0)

for _, center := range r.Infos {
for _, device := range center.AccDevices {
computeResource := models.ParseComputeResourceFormGrampus(device.Kind)
accCardType := strings.ToUpper(device.Model)
if computeResource == "" {
continue
}
//Determine if this quque already exists.if exist,update params
//if not exist,insert a new record
oldQueue, err := models.GetResourceQueue(&models.ResourceQueue{
Cluster: models.C2NetCluster,
AiCenterCode: center.Id,
ComputeResource: computeResource,
AccCardType: accCardType,
})
if err != nil {
return err
}

if oldQueue == nil {
queueInsertList = append(queueInsertList, models.ResourceQueue{
Cluster: models.C2NetCluster,
AiCenterCode: center.Id,
AiCenterName: center.Name,
ComputeResource: computeResource,
AccCardType: accCardType,
IsAutomaticSync: true,
CreatedBy: doerId,
UpdatedBy: doerId,
})
} else {
existIds = append(existIds, oldQueue.ID)
queueUpdateList = append(queueUpdateList, models.ResourceQueue{
ID: oldQueue.ID,
ComputeResource: computeResource,
AiCenterName: center.Name,
AccCardType: accCardType,
UpdatedBy: doerId,
})
}

}
}
//todo 调试两个虎鲸接口,注意批量update是否有问题。确定上架时有没有检查queue是否被删除
return models.SyncGrampusQueues(queueUpdateList, queueInsertList, existIds)
}

+ 81
- 1
services/cloudbrain/resource/resource_specification.go View File

@@ -2,6 +2,9 @@ package resource

import (
"code.gitea.io/gitea/models"
"code.gitea.io/gitea/modules/grampus"
"errors"
"strings"
)

func AddResourceSpecification(req models.ResourceSpecificationReq) error {
@@ -23,6 +26,72 @@ func UpdateResourceSpecification(specId int64, req models.ResourceSpecificationR
return nil
}

func SyncGrampusSpecs(doerId int64) error {
r, err := grampus.GetResourceSpecs("")
if err != nil {
return err
}
specUpdateList := make([]models.ResourceSpecification, 0)
specInsertList := make([]models.ResourceSpecification, 0)
existIds := make([]int64, 0)
for _, spec := range r.Infos {
for _, c := range spec.Centers {
computeResource := models.ParseComputeResourceFormGrampus(spec.SpecInfo.AccDeviceKind)
if computeResource == "" {
continue
}
accCardType := strings.ToUpper(spec.SpecInfo.AccDeviceModel)
// get resource queue.if queue not exist,skip it
r, err := models.GetResourceQueue(&models.ResourceQueue{
Cluster: models.C2NetCluster,
AiCenterCode: c.ID,
ComputeResource: computeResource,
AccCardType: accCardType,
})
if err != nil || r == nil {
continue
}

//Determine if this specification already exists.if exist,update params
//if not exist,insert a new record and status is SpecNotVerified
oldSpec, err := models.GetResourceSpecification(&models.ResourceSpecification{
QueueId: r.ID,
SourceSpecId: spec.ID,
})
if err != nil {
return err
}

if oldSpec == nil {
specInsertList = append(specInsertList, models.ResourceSpecification{
QueueId: r.ID,
SourceSpecId: spec.ID,
AccCardsNum: spec.SpecInfo.AccDeviceNum,
CpuCores: spec.SpecInfo.CpuCoreNum,
MemGiB: models.ParseMemSizeFromGrampus(spec.SpecInfo.MemorySize),
GPUMemGiB: models.ParseMemSizeFromGrampus(spec.SpecInfo.AccDeviceMemory),
Status: models.SpecNotVerified,
IsAutomaticSync: true,
CreatedBy: doerId,
UpdatedBy: doerId,
})
} else {
existIds = append(existIds, oldSpec.ID)
specUpdateList = append(specUpdateList, models.ResourceSpecification{
ID: oldSpec.ID,
AccCardsNum: spec.SpecInfo.AccDeviceNum,
CpuCores: spec.SpecInfo.CpuCoreNum,
MemGiB: models.ParseMemSizeFromGrampus(spec.SpecInfo.MemorySize),
GPUMemGiB: models.ParseMemSizeFromGrampus(spec.SpecInfo.AccDeviceMemory),
UpdatedBy: doerId,
})
}

}
}
return models.SyncGrampusSpecs(specUpdateList, specInsertList, existIds)
}

//GetResourceSpecificationList returns specification and queue
func GetResourceSpecificationList(opts models.SearchResourceSpecificationOptions) (*models.ResourceSpecAndQueueListRes, error) {
n, r, err := models.SearchResourceSpecification(opts)
@@ -34,7 +103,18 @@ func GetResourceSpecificationList(opts models.SearchResourceSpecificationOptions
}

func ResourceSpecOnShelf(id int64, req models.ResourceSpecificationReq) error {
_, err := models.ResourceSpecOnShelf(id, models.ResourceSpecification{
spec, err := models.GetResourceSpecification(&models.ResourceSpecification{ID: id})
if err != nil {
return err
}
if spec == nil {
return errors.New("specification not exist")
}
if q, err := models.GetResourceQueue(&models.ResourceQueue{ID: spec.QueueId}); err != nil || q == nil {
return errors.New("resource queue not available")
}

_, err = models.ResourceSpecOnShelf(id, models.ResourceSpecification{
UnitPrice: req.UnitPrice,
})
if err != nil {


Loading…
Cancel
Save