diff --git a/models/cloudbrain.go b/models/cloudbrain.go index bac044ce0..61b7abd47 100755 --- a/models/cloudbrain.go +++ b/models/cloudbrain.go @@ -1299,6 +1299,34 @@ type GrampusSpec struct { Name string `json:"name"` ProcessorType string `json:"processorType"` Centers []Center `json:"centers"` + SpecInfo SpecInfo `json:"specInfo"` +} + +type GrampusAiCenter struct { + AccDevices []GrampusAccDevice `json:"accDevices"` + Id string `json:"id"` + Name string `json:"name"` + Resource []GrampusCenterResource `json:"resource"` +} + +type GrampusAccDevice struct { + Kind string `json:"kind"` //加速卡类别, npu.huawei.com/NPU,nvidia.com/gpu,cambricon.com/mlu + Model string `json:"model"` //加速卡型号 +} + +type GrampusCenterResource struct { + Allocated string `json:"allocated"` + Capacity string `json:"capacity"` + Name string `json:"name"` +} + +type SpecInfo struct { + AccDeviceKind string `json:"accDeviceKind"` + AccDeviceMemory string `json:"accDeviceMemory"` + AccDeviceModel string `json:"accDeviceModel"` + AccDeviceNum int `json:"accDeviceNum"` + CpuCoreNum int `json:"cpuCoreNum"` + MemorySize string `json:"memorySize"` } type GetGrampusResourceSpecsResult struct { @@ -1306,6 +1334,12 @@ type GetGrampusResourceSpecsResult struct { Infos []GrampusSpec `json:"resourceSpecs"` } +type GetGrampusAiCentersResult struct { + GrampusResult + Infos []GrampusAiCenter `json:"aiCenterInfos"` + TotalSize int `json:"totalSize"` +} + type GrampusImage struct { CreatedAt int64 `json:"createdAt"` UpdatedAt int64 `json:"updatedAt"` diff --git a/models/resource_queue.go b/models/resource_queue.go index b6b29ee16..bf35c4b78 100644 --- a/models/resource_queue.go +++ b/models/resource_queue.go @@ -2,6 +2,7 @@ package models import ( "code.gitea.io/gitea/modules/timeutil" + "strings" "xorm.io/builder" ) @@ -10,11 +11,13 @@ type ResourceQueue struct { QueueCode string Cluster string `xorm:"notnull"` AiCenterCode string + AiCenterName string ComputeResource string AccCardType string CardsTotalNum int IsAutomaticSync bool Remark string + DeletedTime timeutil.TimeStamp `xorm:"deleted"` CreatedTime timeutil.TimeStamp `xorm:"created"` CreatedBy int64 UpdatedTime timeutil.TimeStamp `xorm:"updated"` @@ -27,6 +30,7 @@ func (r ResourceQueue) ConvertToRes() *ResourceQueueRes { QueueCode: r.QueueCode, Cluster: r.Cluster, AiCenterCode: r.AiCenterCode, + AiCenterName: r.AiCenterName, ComputeResource: r.ComputeResource, AccCardType: r.AccCardType, CardsTotalNum: r.CardsTotalNum, @@ -48,18 +52,26 @@ type ResourceQueueReq struct { } func (r ResourceQueueReq) ToDTO() ResourceQueue { - return ResourceQueue{ + q := ResourceQueue{ QueueCode: r.QueueCode, Cluster: r.Cluster, AiCenterCode: r.AiCenterCode, - ComputeResource: r.ComputeResource, - AccCardType: r.AccCardType, + ComputeResource: strings.ToUpper(r.ComputeResource), + AccCardType: strings.ToUpper(r.AccCardType), CardsTotalNum: r.CardsTotalNum, IsAutomaticSync: r.IsAutomaticSync, Remark: r.Remark, CreatedBy: r.CreatorId, UpdatedBy: r.CreatorId, } + if r.Cluster == OpenICluster { + if r.AiCenterCode == AICenterOfCloudBrainOne { + q.AiCenterName = "云脑一" + } else if r.AiCenterCode == AICenterOfCloudBrainTwo { + q.AiCenterName = "云脑二" + } + } + return q } type SearchResourceQueueOptions struct { @@ -82,6 +94,11 @@ type ResourceQueueCodesRes struct { AiCenterCode string } +type ResourceAiCenterRes struct { + AiCenterCode string + AiCenterName string +} + type GetQueueCodesOptions struct { Cluster string } @@ -102,6 +119,7 @@ type ResourceQueueRes struct { QueueCode string Cluster string AiCenterCode string + AiCenterName string ComputeResource string AccCardType string CardsTotalNum int @@ -134,13 +152,13 @@ func SearchResourceQueue(opts SearchResourceQueueOptions) (int64, []ResourceQueu if opts.AccCardType != "" { cond = cond.And(builder.Eq{"acc_card_type": opts.AccCardType}) } - n, err := x.Where(cond).Count(&ResourceQueue{}) + n, err := x.Where(cond).Unscoped().Count(&ResourceQueue{}) if err != nil { return 0, nil, err } r := make([]ResourceQueue, 0) - err = x.Where(cond).Desc("id").Limit(opts.PageSize, (opts.Page-1)*opts.PageSize).Find(&r) + err = x.Where(cond).Desc("id").Limit(opts.PageSize, (opts.Page-1)*opts.PageSize).Unscoped().Find(&r) if err != nil { return 0, nil, err } @@ -161,3 +179,98 @@ func GetResourceQueueCodes(opts GetQueueCodesOptions) ([]*ResourceQueueCodesRes, } return r, nil } + +func GetResourceQueue(r *ResourceQueue) (*ResourceQueue, error) { + has, err := x.Get(r) + if err != nil { + return nil, err + } else if !has { + return nil, nil + } + return r, nil +} + +func ParseComputeResourceFormGrampus(grampusDeviceKind string) string { + t := strings.Split(grampusDeviceKind, "/") + if len(t) < 2 { + return "" + } + return strings.ToUpper(t[1]) +} + +func ParseMemSizeFromGrampus(grampusMemSize string) float32 { + return 0 +} + +func SyncGrampusQueues(updateList []ResourceQueue, insertList []ResourceQueue, existIds []int64) error { + sess := x.NewSession() + var err error + defer func() { + if err != nil { + sess.Rollback() + } + sess.Close() + }() + + //delete queues that no longer exists + deleteQueueIds := make([]int64, 0) + queueCond := builder.NewCond() + queueCond = queueCond.And(builder.NotIn("resource_queue.id", existIds)).And(builder.Eq{"resource_queue.cluster": C2NetCluster}) + if err := sess.Cols("resource_queue.id").Table("resource_queue"). + Where(queueCond).Find(&deleteQueueIds); err != nil { + return err + } + + if len(deleteQueueIds) > 0 { + if _, err = sess.In("id", deleteQueueIds).Update(&ResourceQueue{Remark: "自动同步时被下架"}); err != nil { + return err + } + if _, err = sess.In("id", deleteQueueIds).Delete(&ResourceQueue{}); err != nil { + return err + } + + //delete specs and scene that no longer exists + deleteSpcIds := make([]int64, 0) + if err := sess.Cols("resource_specification.id").Table("resource_specification"). + In("queue_id", deleteQueueIds).Find(&deleteSpcIds); err != nil { + return err + } + if len(deleteSpcIds) > 0 { + if _, err = sess.In("id", deleteSpcIds).Update(&ResourceSpecification{Status: SpecOffShelf}); err != nil { + return err + } + if _, err = sess.In("spec_id", deleteSpcIds).Delete(&ResourceSceneSpec{}); err != nil { + return err + } + } + + } + + //update exists specs + if len(updateList) > 0 { + for _, v := range updateList { + if _, err = sess.ID(v.ID).Update(&v); err != nil { + return err + } + } + + } + + //insert new specs + if len(insertList) > 0 { + if _, err = sess.Insert(insertList); err != nil { + return err + } + } + + return sess.Commit() +} + +func GetResourceAiCenters() ([]ResourceAiCenterRes, error) { + r := make([]ResourceAiCenterRes, 0) + err := x.Table("resource_queue").Distinct("ai_center_code", "ai_center_name").Find(&r) + if err != nil { + return nil, err + } + return r, nil +} diff --git a/models/resource_specification.go b/models/resource_specification.go index 2a2093bf6..deeaf3f26 100644 --- a/models/resource_specification.go +++ b/models/resource_specification.go @@ -163,8 +163,9 @@ func SearchResourceSpecification(opts SearchResourceSpecificationOptions) (int64 if opts.Cluster != "" { cond = cond.And(builder.Eq{"resource_queue.cluster": opts.Cluster}) } + //cond = cond.And(builder.Or(builder.Eq{"resource_queue.deleted_time": 0}).Or(builder.IsNull{"resource_queue.deleted_time"})) n, err := x.Where(cond).Join("INNER", "resource_queue", "resource_queue.ID = resource_specification.queue_id"). - Count(&ResourceSpecAndQueue{}) + Unscoped().Count(&ResourceSpecAndQueue{}) if err != nil { return 0, nil, err } @@ -174,7 +175,7 @@ func SearchResourceSpecification(opts SearchResourceSpecificationOptions) (int64 Join("INNER", "resource_queue", "resource_queue.ID = resource_specification.queue_id"). Desc("resource_specification.id"). Limit(opts.PageSize, (opts.Page-1)*opts.PageSize). - Find(&r) + Unscoped().Find(&r) if err != nil { return 0, nil, err } @@ -210,3 +211,60 @@ func ResourceSpecOffShelf(id int64) (int64, error) { sess.Commit() return n, err } + +func GetResourceSpecification(r *ResourceSpecification) (*ResourceSpecification, error) { + has, err := x.Get(r) + if err != nil { + return nil, err + } else if !has { + return nil, nil + } + return r, nil +} + +func SyncGrampusSpecs(updateList []ResourceSpecification, insertList []ResourceSpecification, existIds []int64) error { + sess := x.NewSession() + var err error + defer func() { + if err != nil { + sess.Rollback() + } + sess.Close() + }() + //delete specs and scene that no longer exists + deleteIds := make([]int64, 0) + cond := builder.NewCond() + cond = cond.And(builder.NotIn("resource_specification.id", existIds)).And(builder.Eq{"resource_queue.cluster": C2NetCluster}) + if err := sess.Cols("resource_specification.id").Table("resource_specification"). + Where(cond).Join("INNER", "resource_queue", "resource_queue.id = resource_specification.queue_id"). + Find(&deleteIds); err != nil { + return err + } + if len(deleteIds) > 0 { + if _, err = sess.In("id", deleteIds).Update(&ResourceSpecification{Status: SpecOffShelf}); err != nil { + return err + } + if _, err = sess.In("spec_id", deleteIds).Delete(&ResourceSceneSpec{}); err != nil { + return err + } + } + + //update exists specs + if len(updateList) > 0 { + for _, v := range updateList { + if _, err = sess.ID(v.ID).Update(&v); err != nil { + return err + } + } + + } + + //insert new specs + if len(insertList) > 0 { + if _, err = sess.Insert(insertList); err != nil { + return err + } + } + + return sess.Commit() +} diff --git a/modules/grampus/resty.go b/modules/grampus/resty.go index 5e8722b4b..427b801cc 100755 --- a/modules/grampus/resty.go +++ b/modules/grampus/resty.go @@ -23,6 +23,7 @@ const ( urlGetToken = urlOpenApiV1 + "token" urlTrainJob = urlOpenApiV1 + "trainjob" urlGetResourceSpecs = urlOpenApiV1 + "resourcespec" + urlGetAiCenter = urlOpenApiV1 + "sharescreen/aicenter" urlGetImages = urlOpenApiV1 + "image" errorIllegalToken = 1005 @@ -275,3 +276,35 @@ sendjob: return &result, nil } + +func GetAiCenters(pageIndex, pageSize int) (*models.GetGrampusAiCentersResult, error) { + checkSetting() + client := getRestyClient() + var result models.GetGrampusAiCentersResult + + retry := 0 + +sendjob: + _, err := client.R(). + SetAuthToken(TOKEN). + SetResult(&result). + Get(HOST + urlGetAiCenter + "?pageIndex=" + fmt.Sprint(pageIndex) + "&pageSize=" + fmt.Sprint(pageSize)) + + if err != nil { + return nil, fmt.Errorf("resty GetAiCenters: %v", err) + } + + if result.ErrorCode == errorIllegalToken && retry < 1 { + retry++ + log.Info("retry get token") + _ = getToken() + goto sendjob + } + + if result.ErrorCode != 0 { + log.Error("GetAiCenters failed(%d): %s", result.ErrorCode, result.ErrorMsg) + return &result, fmt.Errorf("GetAiCenters failed(%d): %s", result.ErrorCode, result.ErrorMsg) + } + + return &result, nil +} diff --git a/routers/admin/resources.go b/routers/admin/resources.go index 9f45d4e3d..fda602426 100644 --- a/routers/admin/resources.go +++ b/routers/admin/resources.go @@ -44,7 +44,7 @@ func GetResourceQueueList(ctx *context.Context) { computeResource := ctx.Query("resource") accCardType := ctx.Query("card") list, err := resource.GetResourceQueueList(models.SearchResourceQueueOptions{ - ListOptions: models.ListOptions{Page: page, PageSize: 20}, + ListOptions: models.ListOptions{Page: page, PageSize: 10}, Cluster: cluster, AiCenterCode: aiCenterCode, ComputeResource: computeResource, @@ -69,6 +69,16 @@ func GetResourceQueueCodes(ctx *context.Context) { ctx.JSON(http.StatusOK, response.SuccessWithData(list)) } +func GetResourceAiCenters(ctx *context.Context) { + list, err := resource.GetResourceAiCenters() + if err != nil { + log.Error("GetResourceAiCenters error.%v", err) + ctx.JSON(http.StatusOK, response.ServerError(err.Error())) + return + } + ctx.JSON(http.StatusOK, response.SuccessWithData(list)) +} + func AddResourceQueue(ctx *context.Context, req models.ResourceQueueReq) { req.IsAutomaticSync = false req.CreatorId = ctx.User.ID @@ -93,13 +103,23 @@ func UpdateResourceQueue(ctx *context.Context, req models.ResourceQueueReq) { ctx.JSON(http.StatusOK, response.Success()) } +func SyncGrampusQueue(ctx *context.Context) { + err := resource.SyncGrampusQueue(ctx.User.ID) + if err != nil { + log.Error("AddResourceQueue error. %v", err) + ctx.JSON(http.StatusOK, response.ServerError(err.Error())) + return + } + ctx.JSON(http.StatusOK, response.Success()) +} + func GetResourceSpecificationList(ctx *context.Context) { page := ctx.QueryInt("page") queue := ctx.QueryInt64("queue") status := ctx.QueryInt("status") cluster := ctx.Query("cluster") list, err := resource.GetResourceSpecificationList(models.SearchResourceSpecificationOptions{ - ListOptions: models.ListOptions{Page: page, PageSize: 20}, + ListOptions: models.ListOptions{Page: page, PageSize: 10}, QueueId: queue, Status: status, Cluster: cluster, @@ -151,6 +171,16 @@ func UpdateResourceSpecification(ctx *context.Context, req models.ResourceSpecif ctx.JSON(http.StatusOK, response.Success()) } +func SyncGrampusSpecs(ctx *context.Context) { + err := resource.SyncGrampusSpecs(ctx.User.ID) + if err != nil { + log.Error("AddResourceQueue error. %v", err) + ctx.JSON(http.StatusOK, response.ServerError(err.Error())) + return + } + ctx.JSON(http.StatusOK, response.Success()) +} + func GetResourceSceneList(ctx *context.Context) { page := ctx.QueryInt("page") jobType := ctx.Query("jobType") @@ -158,7 +188,7 @@ func GetResourceSceneList(ctx *context.Context) { queueId := ctx.QueryInt64("queue") isExclusive := ctx.QueryInt("IsExclusive") list, err := resource.GetResourceSceneList(models.SearchResourceSceneOptions{ - ListOptions: models.ListOptions{Page: page, PageSize: 20}, + ListOptions: models.ListOptions{Page: page, PageSize: 10}, JobType: jobType, IsExclusive: isExclusive, AiCenterCode: aiCenterCode, diff --git a/routers/routes/routes.go b/routers/routes/routes.go index b52709099..5e59ccf25 100755 --- a/routers/routes/routes.go +++ b/routers/routes/routes.go @@ -610,13 +610,16 @@ func RegisterRoutes(m *macaron.Macaron) { m.Group("/queue", func() { m.Get("", admin.GetQueuePage) m.Get("/list", admin.GetResourceQueueList) + m.Post("/grampus/sync", admin.SyncGrampusQueue) m.Get("/codes", admin.GetResourceQueueCodes) + m.Get("/centers", admin.GetResourceAiCenters) m.Post("/add", binding.Bind(models.ResourceQueueReq{}), admin.AddResourceQueue) m.Post("/update/:id", binding.BindIgnErr(models.ResourceQueueReq{}), admin.UpdateResourceQueue) }) m.Group("/specification", func() { m.Get("", admin.GetSpecificationPage) m.Get("/list", admin.GetResourceSpecificationList) + m.Post("/grampus/sync", admin.SyncGrampusSpecs) m.Post("/add", binding.Bind(models.ResourceSpecificationReq{}), admin.AddResourceSpecification) m.Post("/update/:id", binding.BindIgnErr(models.ResourceSpecificationReq{}), admin.UpdateResourceSpecification) }) diff --git a/services/cloudbrain/resource/resource_queue.go b/services/cloudbrain/resource/resource_queue.go index d576307be..15122f944 100644 --- a/services/cloudbrain/resource/resource_queue.go +++ b/services/cloudbrain/resource/resource_queue.go @@ -2,6 +2,8 @@ package resource import ( "code.gitea.io/gitea/models" + "code.gitea.io/gitea/modules/grampus" + "strings" ) func AddResourceQueue(req models.ResourceQueueReq) error { @@ -38,3 +40,68 @@ func GetResourceQueueCodes(opts models.GetQueueCodesOptions) ([]*models.Resource return r, nil } + +func GetResourceAiCenters() ([]models.ResourceAiCenterRes, error) { + r, err := models.GetResourceAiCenters() + if err != nil { + return nil, err + } + + return r, nil +} + +func SyncGrampusQueue(doerId int64) error { + r, err := grampus.GetAiCenters(1, 100) + if err != nil { + return err + } + queueUpdateList := make([]models.ResourceQueue, 0) + queueInsertList := make([]models.ResourceQueue, 0) + existIds := make([]int64, 0) + + for _, center := range r.Infos { + for _, device := range center.AccDevices { + computeResource := models.ParseComputeResourceFormGrampus(device.Kind) + accCardType := strings.ToUpper(device.Model) + if computeResource == "" { + continue + } + //Determine if this quque already exists.if exist,update params + //if not exist,insert a new record + oldQueue, err := models.GetResourceQueue(&models.ResourceQueue{ + Cluster: models.C2NetCluster, + AiCenterCode: center.Id, + ComputeResource: computeResource, + AccCardType: accCardType, + }) + if err != nil { + return err + } + + if oldQueue == nil { + queueInsertList = append(queueInsertList, models.ResourceQueue{ + Cluster: models.C2NetCluster, + AiCenterCode: center.Id, + AiCenterName: center.Name, + ComputeResource: computeResource, + AccCardType: accCardType, + IsAutomaticSync: true, + CreatedBy: doerId, + UpdatedBy: doerId, + }) + } else { + existIds = append(existIds, oldQueue.ID) + queueUpdateList = append(queueUpdateList, models.ResourceQueue{ + ID: oldQueue.ID, + ComputeResource: computeResource, + AiCenterName: center.Name, + AccCardType: accCardType, + UpdatedBy: doerId, + }) + } + + } + } + //todo 调试两个虎鲸接口,注意批量update是否有问题。确定上架时有没有检查queue是否被删除 + return models.SyncGrampusQueues(queueUpdateList, queueInsertList, existIds) +} diff --git a/services/cloudbrain/resource/resource_specification.go b/services/cloudbrain/resource/resource_specification.go index f29d4874d..e7a6ae54d 100644 --- a/services/cloudbrain/resource/resource_specification.go +++ b/services/cloudbrain/resource/resource_specification.go @@ -2,6 +2,9 @@ package resource import ( "code.gitea.io/gitea/models" + "code.gitea.io/gitea/modules/grampus" + "errors" + "strings" ) func AddResourceSpecification(req models.ResourceSpecificationReq) error { @@ -23,6 +26,72 @@ func UpdateResourceSpecification(specId int64, req models.ResourceSpecificationR return nil } +func SyncGrampusSpecs(doerId int64) error { + r, err := grampus.GetResourceSpecs("") + if err != nil { + return err + } + specUpdateList := make([]models.ResourceSpecification, 0) + specInsertList := make([]models.ResourceSpecification, 0) + existIds := make([]int64, 0) + for _, spec := range r.Infos { + for _, c := range spec.Centers { + computeResource := models.ParseComputeResourceFormGrampus(spec.SpecInfo.AccDeviceKind) + if computeResource == "" { + continue + } + accCardType := strings.ToUpper(spec.SpecInfo.AccDeviceModel) + // get resource queue.if queue not exist,skip it + r, err := models.GetResourceQueue(&models.ResourceQueue{ + Cluster: models.C2NetCluster, + AiCenterCode: c.ID, + ComputeResource: computeResource, + AccCardType: accCardType, + }) + if err != nil || r == nil { + continue + } + + //Determine if this specification already exists.if exist,update params + //if not exist,insert a new record and status is SpecNotVerified + oldSpec, err := models.GetResourceSpecification(&models.ResourceSpecification{ + QueueId: r.ID, + SourceSpecId: spec.ID, + }) + if err != nil { + return err + } + + if oldSpec == nil { + specInsertList = append(specInsertList, models.ResourceSpecification{ + QueueId: r.ID, + SourceSpecId: spec.ID, + AccCardsNum: spec.SpecInfo.AccDeviceNum, + CpuCores: spec.SpecInfo.CpuCoreNum, + MemGiB: models.ParseMemSizeFromGrampus(spec.SpecInfo.MemorySize), + GPUMemGiB: models.ParseMemSizeFromGrampus(spec.SpecInfo.AccDeviceMemory), + Status: models.SpecNotVerified, + IsAutomaticSync: true, + CreatedBy: doerId, + UpdatedBy: doerId, + }) + } else { + existIds = append(existIds, oldSpec.ID) + specUpdateList = append(specUpdateList, models.ResourceSpecification{ + ID: oldSpec.ID, + AccCardsNum: spec.SpecInfo.AccDeviceNum, + CpuCores: spec.SpecInfo.CpuCoreNum, + MemGiB: models.ParseMemSizeFromGrampus(spec.SpecInfo.MemorySize), + GPUMemGiB: models.ParseMemSizeFromGrampus(spec.SpecInfo.AccDeviceMemory), + UpdatedBy: doerId, + }) + } + + } + } + return models.SyncGrampusSpecs(specUpdateList, specInsertList, existIds) +} + //GetResourceSpecificationList returns specification and queue func GetResourceSpecificationList(opts models.SearchResourceSpecificationOptions) (*models.ResourceSpecAndQueueListRes, error) { n, r, err := models.SearchResourceSpecification(opts) @@ -34,7 +103,18 @@ func GetResourceSpecificationList(opts models.SearchResourceSpecificationOptions } func ResourceSpecOnShelf(id int64, req models.ResourceSpecificationReq) error { - _, err := models.ResourceSpecOnShelf(id, models.ResourceSpecification{ + spec, err := models.GetResourceSpecification(&models.ResourceSpecification{ID: id}) + if err != nil { + return err + } + if spec == nil { + return errors.New("specification not exist") + } + if q, err := models.GetResourceQueue(&models.ResourceQueue{ID: spec.QueueId}); err != nil || q == nil { + return errors.New("resource queue not available") + } + + _, err = models.ResourceSpecOnShelf(id, models.ResourceSpecification{ UnitPrice: req.UnitPrice, }) if err != nil {