You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number; they can include dashes ('-') and can be up to 35 characters long.

sync_status.go 4.4 kB

3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161
  1. package cloudbrainTask
  2. import (
  3. "net/http"
  4. "code.gitea.io/gitea/modules/modelarts"
  5. "code.gitea.io/gitea/modules/modelarts_cd"
  6. "code.gitea.io/gitea/modules/grampus"
  7. "code.gitea.io/gitea/modules/timeutil"
  8. "code.gitea.io/gitea/models"
  9. "code.gitea.io/gitea/modules/cloudbrain"
  10. "code.gitea.io/gitea/modules/httplib"
  11. "code.gitea.io/gitea/modules/log"
  12. "code.gitea.io/gitea/modules/notification"
  13. "code.gitea.io/gitea/modules/setting"
  14. )
  // successfulCount is the number of HTTP 200 responses a debug notebook URL
  // must produce before isNoteBookReady reports the notebook as browsable:
  // the first successfulCount-1 OK probes are only counted, and the next OK
  // probe returns true.
  const successfulCount = 3

  // noteBookOKMap records, per task ID, how many OK probes isNoteBookReady has
  // seen so far. An entry is removed once the task is reported ready.
  var noteBookOKMap = make(map[int64]int, 20)
  18. func SyncCloudBrainOneStatus(task *models.Cloudbrain) (*models.Cloudbrain, error) {
  19. jobResult, err := cloudbrain.GetJob(task.JobID)
  20. if err != nil {
  21. log.Error("GetJob failed:", err)
  22. return task, err
  23. }
  24. result, err := models.ConvertToJobResultPayload(jobResult.Payload)
  25. if err != nil {
  26. log.Error("ConvertToJobResultPayload failed:", err)
  27. return task, err
  28. }
  29. oldStatus := task.Status
  30. if result.JobStatus.State != string(models.JobWaiting) && result.JobStatus.State != string(models.JobFailed) {
  31. taskRoles := result.TaskRoles
  32. taskRes, _ := models.ConvertToTaskPod(taskRoles[cloudbrain.SubTaskName].(map[string]interface{}))
  33. task.ContainerIp = taskRes.TaskStatuses[0].ContainerIP
  34. task.ContainerID = taskRes.TaskStatuses[0].ContainerID
  35. }
  36. if (result.JobStatus.State != string(models.JobWaiting) && result.JobStatus.State != string(models.JobRunning)) ||
  37. task.Status == string(models.JobRunning) || (result.JobStatus.State == string(models.JobRunning) && isNoteBookReady(task)) {
  38. models.ParseAndSetDurationFromCloudBrainOne(result, task)
  39. task.Status = result.JobStatus.State
  40. if oldStatus != task.Status {
  41. notification.NotifyChangeCloudbrainStatus(task, oldStatus)
  42. }
  43. err = models.UpdateJob(task)
  44. if err != nil {
  45. log.Error("UpdateJob failed:", err)
  46. return task, err
  47. }
  48. }
  49. return task, nil
  50. }
  51. func SyncGrampusNotebookStatus(job *models.Cloudbrain) (*models.Cloudbrain, error) {
  52. result, err := grampus.GetNotebookJob(job.JobID)
  53. if err != nil {
  54. log.Error("GetJob(%s) failed:%v", job.JobName, err)
  55. return job, err
  56. }
  57. if job.StartTime == 0 && result.JobInfo.StartedAt > 0 {
  58. job.StartTime = timeutil.TimeStamp(result.JobInfo.StartedAt)
  59. }
  60. oldStatus := job.Status
  61. job.Status = grampus.TransTrainJobStatus(result.JobInfo.Status)
  62. job.Duration = result.JobInfo.RunSec
  63. job.TrainJobDuration = models.ConvertDurationToStr(job.Duration)
  64. if job.EndTime == 0 && models.IsTrainJobTerminal(job.Status) && job.StartTime > 0 {
  65. job.EndTime = job.StartTime.Add(job.Duration)
  66. }
  67. job.CorrectCreateUnix()
  68. if len(job.AiCenter) == 0 {
  69. if len(result.JobInfo.Tasks) > 0 {
  70. if len(result.JobInfo.Tasks[0].CenterID) > 0 && len(result.JobInfo.Tasks[0].CenterName) > 0 {
  71. job.AiCenter = result.JobInfo.Tasks[0].CenterID[0] + "+" + result.JobInfo.Tasks[0].CenterName[0]
  72. }
  73. }
  74. }
  75. if job.Status != models.GrampusStatusWaiting {
  76. if oldStatus != job.Status {
  77. notification.NotifyChangeCloudbrainStatus(job, oldStatus)
  78. }
  79. job.TrainUrl = result.JobInfo.Tasks[0].CodeUrl
  80. job.DataUrl = result.JobInfo.Tasks[0].DataUrl
  81. err = models.UpdateJob(job)
  82. if err != nil {
  83. log.Error("UpdateJob failed:", err)
  84. return nil, err
  85. }
  86. }
  87. return job, nil
  88. }
  89. func isNoteBookReady(task *models.Cloudbrain) bool {
  90. if task.JobType != string(models.JobTypeDebug) {
  91. return true
  92. }
  93. noteBookUrl := setting.DebugServerHost + "jpylab_" + task.JobID + "_" + task.SubTaskName
  94. r := httplib.Get(noteBookUrl)
  95. res, err := r.Response()
  96. if err != nil {
  97. return false
  98. }
  99. if res.StatusCode == http.StatusOK {
  100. count := noteBookOKMap[task.ID]
  101. if count < successfulCount-1 {
  102. noteBookOKMap[task.ID] = count + 1
  103. return false
  104. } else {
  105. delete(noteBookOKMap, task.ID)
  106. return true
  107. }
  108. }
  109. return false
  110. }
  111. func StopDebugJob(task *models.Cloudbrain) error {
  112. param := models.NotebookAction{
  113. Action: models.ActionStop,
  114. }
  115. var err error = nil
  116. if task.JobType == string(models.JobTypeDebug) {
  117. if task.Type == models.TypeCloudBrainOne {
  118. return cloudbrain.StopJob(task.JobID)
  119. } else if task.Type == models.TypeCloudBrainTwo {
  120. _, err = modelarts.ManageNotebook2(task.JobID, param)
  121. } else if task.Type == models.TypeCDCenter {
  122. _, err = modelarts_cd.ManageNotebook(task.JobID, param)
  123. } else if task.Type == models.TypeC2Net {
  124. _, err = grampus.StopJob(task.JobID, task.JobType)
  125. }
  126. }
  127. return err
  128. }