You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

resource_specification.go 17 kB

3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago

  1. package resource
  2. import (
  3. "code.gitea.io/gitea/models"
  4. "code.gitea.io/gitea/modules/cloudbrain"
  5. "code.gitea.io/gitea/modules/grampus"
  6. "code.gitea.io/gitea/modules/log"
  7. "code.gitea.io/gitea/modules/modelarts"
  8. "code.gitea.io/gitea/modules/setting"
  9. "code.gitea.io/gitea/routers/response"
  10. "code.gitea.io/gitea/services/admin/operate_log"
  11. "encoding/json"
  12. "errors"
  13. "fmt"
  14. "strconv"
  15. "strings"
  16. "time"
  17. )
  18. func AddResourceSpecification(doerId int64, req models.ResourceSpecificationReq) error {
  19. if req.Status == 0 {
  20. req.Status = models.SpecNotVerified
  21. }
  22. spec := req.ToDTO()
  23. if _, err := models.InsertResourceSpecification(spec); err != nil {
  24. return err
  25. }
  26. return nil
  27. }
  28. func UpdateSpecUnitPrice(doerId int64, specId int64, unitPrice int) *response.BizError {
  29. oldSpec, err := models.GetResourceSpecification(&models.ResourceSpecification{ID: specId})
  30. if err != nil {
  31. return response.NewBizError(err)
  32. }
  33. if oldSpec == nil {
  34. return response.SPECIFICATION_NOT_EXIST
  35. }
  36. err = models.UpdateSpecUnitPriceById(specId, unitPrice)
  37. if err != nil {
  38. return response.NewBizError(err)
  39. }
  40. if oldSpec.UnitPrice != unitPrice {
  41. AddSpecOperateLog(doerId, "edit", operate_log.NewLogValues().Add("unitPrice", unitPrice), operate_log.NewLogValues().Add("unitPrice", oldSpec.UnitPrice), specId, fmt.Sprintf("修改资源规格单价从%d积分到%d积分", oldSpec.UnitPrice, unitPrice))
  42. }
  43. return nil
  44. }
  45. func SyncGrampusSpecs(doerId int64) error {
  46. r, err := grampus.GetResourceSpecs("")
  47. if err != nil {
  48. return err
  49. }
  50. log.Info("SyncGrampusSpecs result = %+v", r)
  51. specUpdateList := make([]models.ResourceSpecification, 0)
  52. specInsertList := make([]models.ResourceSpecification, 0)
  53. existIds := make([]int64, 0)
  54. for _, spec := range r.Infos {
  55. for _, c := range spec.Centers {
  56. computeResource := models.ParseComputeResourceFormGrampus(spec.SpecInfo.AccDeviceKind)
  57. if computeResource == "" {
  58. continue
  59. }
  60. accCardType := strings.ToUpper(spec.SpecInfo.AccDeviceModel)
  61. memGiB, err := models.ParseMemSizeFromGrampus(spec.SpecInfo.MemorySize)
  62. gpuMemGiB, err := models.ParseMemSizeFromGrampus(spec.SpecInfo.AccDeviceMemory)
  63. if err != nil {
  64. log.Error("ParseMemSizeFromGrampus error. MemorySize=%s AccDeviceMemory=%s", spec.SpecInfo.MemorySize, spec.SpecInfo.AccDeviceMemory)
  65. }
  66. // get resource queue.if queue not exist,skip it
  67. r, err := models.GetResourceQueue(&models.ResourceQueue{
  68. Cluster: models.C2NetCluster,
  69. AiCenterCode: c.ID,
  70. ComputeResource: computeResource,
  71. AccCardType: accCardType,
  72. })
  73. if err != nil || r == nil {
  74. continue
  75. }
  76. //Determine if this specification already exists.if exist,update params
  77. //if not exist,insert a new record and status is SpecNotVerified
  78. oldSpec, err := models.GetResourceSpecification(&models.ResourceSpecification{
  79. QueueId: r.ID,
  80. SourceSpecId: spec.ID,
  81. })
  82. if err != nil {
  83. return err
  84. }
  85. if oldSpec == nil {
  86. specInsertList = append(specInsertList, models.ResourceSpecification{
  87. QueueId: r.ID,
  88. SourceSpecId: spec.ID,
  89. AccCardsNum: spec.SpecInfo.AccDeviceNum,
  90. CpuCores: spec.SpecInfo.CpuCoreNum,
  91. MemGiB: memGiB,
  92. GPUMemGiB: gpuMemGiB,
  93. Status: models.SpecNotVerified,
  94. IsAutomaticSync: true,
  95. CreatedBy: doerId,
  96. UpdatedBy: doerId,
  97. })
  98. } else {
  99. existIds = append(existIds, oldSpec.ID)
  100. specUpdateList = append(specUpdateList, models.ResourceSpecification{
  101. ID: oldSpec.ID,
  102. AccCardsNum: spec.SpecInfo.AccDeviceNum,
  103. CpuCores: spec.SpecInfo.CpuCoreNum,
  104. MemGiB: memGiB,
  105. GPUMemGiB: gpuMemGiB,
  106. UpdatedBy: doerId,
  107. })
  108. }
  109. }
  110. }
  111. return models.SyncGrampusSpecs(specUpdateList, specInsertList, existIds)
  112. }
  113. //GetResourceSpecificationList returns specification and queue
  114. func GetResourceSpecificationList(opts models.SearchResourceSpecificationOptions) (*models.ResourceSpecAndQueueListRes, error) {
  115. n, r, err := models.SearchResourceSpecification(opts)
  116. if err != nil {
  117. return nil, err
  118. }
  119. return models.NewResourceSpecAndQueueListRes(n, r), nil
  120. }
  121. func GetResourceSpecificationScenes(specId int64) ([]models.ResourceSceneBriefRes, error) {
  122. r, err := models.GetSpecScenes(specId)
  123. if err != nil {
  124. return nil, err
  125. }
  126. return r, nil
  127. }
  128. func ResourceSpecOnShelf(doerId int64, id int64, unitPrice int) *response.BizError {
  129. spec, err := models.GetResourceSpecification(&models.ResourceSpecification{ID: id})
  130. if err != nil {
  131. return response.NewBizError(err)
  132. }
  133. if spec == nil {
  134. return response.SPECIFICATION_NOT_EXIST
  135. }
  136. if q, err := models.GetResourceQueue(&models.ResourceQueue{ID: spec.QueueId}); err != nil || q == nil {
  137. return response.RESOURCE_QUEUE_NOT_AVAILABLE
  138. }
  139. err = models.ResourceSpecOnShelf(id, unitPrice)
  140. if err != nil {
  141. return response.NewBizError(err)
  142. }
  143. if spec.UnitPrice != unitPrice {
  144. AddSpecOperateLog(doerId, "on-shelf", operate_log.NewLogValues().Add("UnitPrice", unitPrice), operate_log.NewLogValues().Add("UnitPrice", spec.UnitPrice), id, fmt.Sprintf("定价上架资源规格,单价为%d", unitPrice))
  145. } else {
  146. AddSpecOperateLog(doerId, "on-shelf", nil, nil, id, "上架资源规格")
  147. }
  148. return nil
  149. }
  150. func ResourceSpecOffShelf(doerId int64, id int64) *response.BizError {
  151. _, err := models.ResourceSpecOffShelf(id)
  152. if err != nil {
  153. return response.NewBizError(err)
  154. }
  155. AddSpecOperateLog(doerId, "off-shelf", nil, nil, id, "下架资源规格")
  156. return nil
  157. }
  158. func AddSpecOperateLog(doerId int64, operateType string, newValue, oldValue *models.LogValues, specId int64, comment string) {
  159. var newString = ""
  160. var oldString = ""
  161. if newValue != nil {
  162. newString = newValue.JsonString()
  163. }
  164. if oldValue != nil {
  165. oldString = oldValue.JsonString()
  166. }
  167. operate_log.Log(models.AdminOperateLog{
  168. BizType: "SpecOperate",
  169. OperateType: operateType,
  170. OldValue: oldString,
  171. NewValue: newString,
  172. RelatedId: fmt.Sprint(specId),
  173. CreatedBy: doerId,
  174. Comment: comment,
  175. })
  176. }
  177. func FindAvailableSpecs(userId int64, opts models.FindSpecsOptions) ([]*models.Specification, error) {
  178. r, err := models.FindSpecs(opts)
  179. if err != nil {
  180. log.Error("FindAvailableSpecs error.%v", err)
  181. return nil, err
  182. }
  183. //filter exclusive specs
  184. specs := filterExclusiveSpecs(r, userId)
  185. //distinct by sourceSpecId
  186. specs = distinctSpecs(specs)
  187. return specs, nil
  188. }
  189. func filterExclusiveSpecs(r []*models.Specification, userId int64) []*models.Specification {
  190. specs := make([]*models.Specification, 0, len(r))
  191. for i := 0; i < len(r); i++ {
  192. spec := r[i]
  193. if !spec.IsExclusive {
  194. specs = append(specs, spec)
  195. continue
  196. }
  197. orgs := strings.Split(spec.ExclusiveOrg, ";")
  198. for _, org := range orgs {
  199. isMember, _ := models.IsOrganizationMemberByOrgName(org, userId)
  200. if isMember {
  201. specs = append(specs, spec)
  202. }
  203. }
  204. }
  205. return specs
  206. }
  207. func distinctSpecs(r []*models.Specification) []*models.Specification {
  208. specs := make([]*models.Specification, 0, len(r))
  209. sourceSpecIdMap := make(map[string]string, 0)
  210. for i := 0; i < len(r); i++ {
  211. spec := r[i]
  212. if spec.SourceSpecId == "" {
  213. specs = append(specs, spec)
  214. continue
  215. }
  216. if _, has := sourceSpecIdMap[spec.SourceSpecId]; has {
  217. continue
  218. }
  219. specs = append(specs, spec)
  220. sourceSpecIdMap[spec.SourceSpecId] = ""
  221. }
  222. return specs
  223. }
  224. func GetAndCheckSpec(userId int64, specId int64, opts models.FindSpecsOptions) (*models.Specification, error) {
  225. if specId == 0 {
  226. return nil, nil
  227. }
  228. opts.SpecId = specId
  229. r, err := FindAvailableSpecs(userId, opts)
  230. if err != nil {
  231. return nil, err
  232. }
  233. if r == nil || len(r) == 0 {
  234. return nil, nil
  235. }
  236. return r[0], nil
  237. }
  238. func InsertCloudbrainSpec(cloudbrainId int64, s *models.Specification) error {
  239. c := models.CloudbrainSpec{
  240. CloudbrainID: cloudbrainId,
  241. SpecId: s.ID,
  242. SourceSpecId: s.SourceSpecId,
  243. AccCardsNum: s.AccCardsNum,
  244. AccCardType: s.AccCardType,
  245. CpuCores: s.CpuCores,
  246. MemGiB: s.MemGiB,
  247. GPUMemGiB: s.GPUMemGiB,
  248. ShareMemGiB: s.ShareMemGiB,
  249. ComputeResource: s.ComputeResource,
  250. UnitPrice: s.UnitPrice,
  251. QueueId: s.QueueId,
  252. QueueCode: s.QueueCode,
  253. Cluster: s.Cluster,
  254. AiCenterCode: s.AiCenterCode,
  255. AiCenterName: s.AiCenterName,
  256. IsExclusive: s.IsExclusive,
  257. ExclusiveOrg: s.ExclusiveOrg,
  258. }
  259. _, err := models.InsertCloudbrainSpec(c)
  260. if err != nil {
  261. log.Error("InsertCloudbrainSpec error.CloudbrainSpec=%v. err=%v", c, err)
  262. return err
  263. }
  264. return nil
  265. }
  266. func GetCloudbrainSpec(cloudbrainId int64) (*models.Specification, error) {
  267. c, err := models.GetCloudbrainSpecByID(cloudbrainId)
  268. if err != nil {
  269. return nil, err
  270. }
  271. if c == nil {
  272. return nil, nil
  273. }
  274. return c.ConvertToSpecification(), nil
  275. }
  276. func RefreshHistorySpec(scopeAll bool, ids []int64) (int64, int64, error) {
  277. var success int64
  278. var total int64
  279. if !scopeAll {
  280. if ids == nil || len(ids) == 0 {
  281. return 0, 0, nil
  282. }
  283. total = int64(len(ids))
  284. tasks, err := models.GetCloudbrainWithDeletedByIDs(ids)
  285. if err != nil {
  286. return total, 0, err
  287. }
  288. for _, task := range tasks {
  289. err = RefreshOneHistorySpec(task)
  290. if err != nil {
  291. log.Error("RefreshOneHistorySpec error.%v", err)
  292. continue
  293. }
  294. success++
  295. time.Sleep(500 * time.Millisecond)
  296. }
  297. } else {
  298. page := 1
  299. pageSize := 100
  300. n, err := models.CountNoSpecHistoricTask()
  301. if err != nil {
  302. log.Error("FindNoSpecHistoricTask CountNoSpecHistoricTask error. e=%v", err)
  303. return 0, 0, err
  304. }
  305. total = n
  306. for i := 0; i < 1000; i++ {
  307. list, err := models.FindNoSpecHistoricTask(page, pageSize)
  308. if err != nil {
  309. log.Error("FindNoSpecHistoricTask error.page=%d pageSize=%d e=%v", page, pageSize, err)
  310. return total, success, err
  311. }
  312. if len(list) == 0 {
  313. log.Info("RefreshHistorySpec. list is empty")
  314. break
  315. }
  316. for _, task := range list {
  317. err = RefreshOneHistorySpec(task)
  318. if err != nil {
  319. log.Error("RefreshOneHistorySpec error.%v", err)
  320. continue
  321. }
  322. success++
  323. time.Sleep(500 * time.Millisecond)
  324. }
  325. if len(list) < pageSize {
  326. log.Info("RefreshHistorySpec. list < pageSize")
  327. break
  328. }
  329. }
  330. }
  331. return total, success, nil
  332. }
  333. func RefreshOneHistorySpec(task *models.Cloudbrain) error {
  334. var spec *models.Specification
  335. var err error
  336. switch task.Type {
  337. case models.TypeCloudBrainOne:
  338. spec, err = getCloudbrainOneSpec(task)
  339. case models.TypeCloudBrainTwo:
  340. spec, err = getCloudbrainTwoSpec(task)
  341. }
  342. if err != nil {
  343. log.Error("find spec error,task.ID=%d err=%v", task.ID, err)
  344. return err
  345. }
  346. if spec == nil {
  347. log.Error("find spec failed,task.ID=%d", task.ID)
  348. return errors.New("find spec failed")
  349. }
  350. return InsertCloudbrainSpec(task.ID, spec)
  351. }
  352. func getCloudbrainOneSpec(task *models.Cloudbrain) (*models.Specification, error) {
  353. //find from remote
  354. result, err := cloudbrain.GetJob(task.JobID)
  355. if err != nil {
  356. log.Error("getCloudbrainOneSpec error. %v", err)
  357. return nil, err
  358. }
  359. if result != nil {
  360. jobRes, _ := models.ConvertToJobResultPayload(result.Payload)
  361. memSize, _ := models.ParseMemSizeFromGrampus(jobRes.Resource.Memory)
  362. if task.ComputeResource == "CPU/GPU" {
  363. task.ComputeResource = models.GPU
  364. }
  365. var shmMB float32
  366. if jobRes.Config.TaskRoles != nil && len(jobRes.Config.TaskRoles) > 0 {
  367. shmMB = float32(jobRes.Config.TaskRoles[0].ShmMB) / 1024
  368. }
  369. opt := models.FindSpecsOptions{
  370. ComputeResource: task.ComputeResource,
  371. Cluster: models.OpenICluster,
  372. AiCenterCode: models.AICenterOfCloudBrainOne,
  373. QueueCode: task.GpuQueue,
  374. AccCardsNum: jobRes.Resource.NvidiaComGpu,
  375. UseAccCardsNum: true,
  376. CpuCores: jobRes.Resource.CPU,
  377. UseCpuCores: true,
  378. MemGiB: memSize,
  379. UseMemGiB: memSize > 0,
  380. ShareMemGiB: shmMB,
  381. UseShareMemGiB: shmMB > 0,
  382. RequestAll: true,
  383. }
  384. specs, err := models.FindSpecs(opt)
  385. if err != nil {
  386. log.Error("getCloudbrainOneSpec from remote error,%v", err)
  387. return nil, err
  388. }
  389. if len(specs) == 1 {
  390. return specs[0], nil
  391. }
  392. if len(specs) == 0 {
  393. s, err := InitQueueAndSpec(opt, "云脑一", "处理历史云脑任务时自动添加")
  394. if err != nil {
  395. log.Error("getCloudbrainOneSpec InitQueueAndSpec error.err=%v", err)
  396. return nil, nil
  397. }
  398. return s, nil
  399. }
  400. if len(specs) > 1 {
  401. log.Error("Too many results matched.size=%d opt=%+v", len(specs), opt)
  402. return nil, nil
  403. }
  404. } else {
  405. //find from config
  406. var specConfig *models.ResourceSpec
  407. hasSpec := false
  408. if task.JobType == string(models.JobTypeTrain) {
  409. if cloudbrain.TrainResourceSpecs == nil {
  410. json.Unmarshal([]byte(setting.TrainResourceSpecs), &cloudbrain.TrainResourceSpecs)
  411. }
  412. for _, tmp := range cloudbrain.TrainResourceSpecs.ResourceSpec {
  413. if tmp.Id == task.ResourceSpecId {
  414. hasSpec = true
  415. specConfig = tmp
  416. break
  417. }
  418. }
  419. } else if task.JobType == string(models.JobTypeInference) {
  420. if cloudbrain.InferenceResourceSpecs == nil {
  421. json.Unmarshal([]byte(setting.InferenceResourceSpecs), &cloudbrain.InferenceResourceSpecs)
  422. }
  423. for _, tmp := range cloudbrain.InferenceResourceSpecs.ResourceSpec {
  424. if tmp.Id == task.ResourceSpecId {
  425. hasSpec = true
  426. specConfig = tmp
  427. break
  428. }
  429. }
  430. } else {
  431. if cloudbrain.ResourceSpecs == nil {
  432. json.Unmarshal([]byte(setting.ResourceSpecs), &cloudbrain.ResourceSpecs)
  433. }
  434. for _, tmp := range cloudbrain.ResourceSpecs.ResourceSpec {
  435. if tmp.Id == task.ResourceSpecId {
  436. hasSpec = true
  437. specConfig = tmp
  438. break
  439. }
  440. }
  441. }
  442. if !hasSpec && cloudbrain.SpecialPools != nil {
  443. for _, specialPool := range cloudbrain.SpecialPools.Pools {
  444. if specialPool.ResourceSpec != nil {
  445. for _, spec := range specialPool.ResourceSpec {
  446. if task.ResourceSpecId == spec.Id {
  447. hasSpec = true
  448. specConfig = spec
  449. break
  450. }
  451. }
  452. }
  453. }
  454. }
  455. if specConfig == nil {
  456. log.Error("getCloudbrainOneSpec from config failed,task.ResourceSpecId=%d", task.ResourceSpecId)
  457. return nil, nil
  458. }
  459. opt := models.FindSpecsOptions{
  460. JobType: models.JobType(task.JobType),
  461. ComputeResource: task.ComputeResource,
  462. Cluster: models.OpenICluster,
  463. AiCenterCode: models.AICenterOfCloudBrainOne,
  464. QueueCode: task.GpuQueue,
  465. AccCardsNum: specConfig.GpuNum,
  466. UseAccCardsNum: true,
  467. CpuCores: specConfig.GpuNum,
  468. UseCpuCores: true,
  469. MemGiB: float32(specConfig.MemMiB) / 1024,
  470. UseMemGiB: true,
  471. ShareMemGiB: float32(specConfig.ShareMemMiB) / 1024,
  472. UseShareMemGiB: true,
  473. RequestAll: true,
  474. }
  475. specs, err := models.FindSpecs(opt)
  476. if err != nil {
  477. log.Error("getCloudbrainOneSpec from config error,%v", err)
  478. return nil, err
  479. }
  480. if len(specs) > 1 {
  481. log.Error("Too many results matched.size=%d opt=%+v", len(specs), opt)
  482. return nil, nil
  483. }
  484. if len(specs) == 0 {
  485. s, err := InitQueueAndSpec(opt, "云脑一", "处理历史云脑任务时自动添加")
  486. if err != nil {
  487. log.Error("getCloudbrainOneSpec InitQueueAndSpec error.err=%v", err)
  488. return nil, nil
  489. }
  490. return s, nil
  491. }
  492. return specs[0], nil
  493. }
  494. return nil, nil
  495. }
  496. func getCloudbrainTwoSpec(task *models.Cloudbrain) (*models.Specification, error) {
  497. specMap, err := models.GetCloudbrainTwoSpecs()
  498. if err != nil {
  499. log.Error("InitCloudbrainTwoSpecs err.%v", err)
  500. return nil, err
  501. }
  502. if task.FlavorCode != "" {
  503. return specMap[task.FlavorCode], nil
  504. }
  505. if task.JobType == string(models.JobTypeDebug) {
  506. result, err := modelarts.GetNotebook2(task.JobID)
  507. if err != nil {
  508. log.Error("getCloudbrainTwoSpec GetNotebook2 error.%v", err)
  509. return nil, err
  510. }
  511. if result != nil {
  512. return specMap[result.Flavor], nil
  513. }
  514. } else if task.JobType == string(models.JobTypeTrain) || task.JobType == string(models.JobTypeInference) {
  515. result, err := modelarts.GetTrainJob(task.JobID, strconv.FormatInt(task.VersionID, 10))
  516. if err != nil {
  517. log.Error("getCloudbrainTwoSpec GetTrainJob error:%v", task.JobName, err)
  518. return nil, err
  519. }
  520. if result != nil {
  521. return specMap[result.Flavor.Code], nil
  522. }
  523. }
  524. return nil, nil
  525. }
  526. func RefreshCloudbrainTwoSpec(task *models.Cloudbrain) error {
  527. return nil
  528. }
  529. func RefreshC2NetSpec(task *models.Cloudbrain) error {
  530. return nil
  531. }
  532. func InitQueueAndSpec(opt models.FindSpecsOptions, aiCenterName string, remark string) (*models.Specification, error) {
  533. return models.InitQueueAndSpec(models.ResourceQueue{
  534. QueueCode: opt.QueueCode,
  535. Cluster: opt.Cluster,
  536. AiCenterCode: opt.AiCenterCode,
  537. AiCenterName: aiCenterName,
  538. ComputeResource: opt.ComputeResource,
  539. AccCardType: models.GetCloudbrainOneAccCardType(opt.QueueCode),
  540. Remark: remark,
  541. }, models.ResourceSpecification{
  542. AccCardsNum: opt.AccCardsNum,
  543. CpuCores: opt.CpuCores,
  544. MemGiB: opt.MemGiB,
  545. GPUMemGiB: opt.GPUMemGiB,
  546. ShareMemGiB: opt.ShareMemGiB,
  547. Status: models.SpecOffShelf,
  548. })
  549. }