You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

grampus.go 6.3 kB

3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215
  1. package grampus
  2. import (
  3. "encoding/json"
  4. "strings"
  5. "code.gitea.io/gitea/modules/setting"
  6. "code.gitea.io/gitea/models"
  7. "code.gitea.io/gitea/modules/context"
  8. "code.gitea.io/gitea/modules/log"
  9. "code.gitea.io/gitea/modules/notification"
  10. "code.gitea.io/gitea/modules/timeutil"
  11. )
  // Path segment and processor type identifiers.
  const (
  	// JobPath is the "job/" path segment.
  	JobPath = "job/"

  	// ProcessorTypeNPU is the processor type identifier for NPU resources.
  	ProcessorTypeNPU = "npu.huawei.com/NPU"
  	// ProcessorTypeGPU is the processor type identifier for GPU resources.
  	ProcessorTypeGPU = "nvidia.com/gpu"
  )

  // Working directories and archive name.
  const (
  	// GpuWorkDir is the working directory used for GPU jobs.
  	GpuWorkDir = "/tmp/"
  	// NpuWorkDir is the working directory used for NPU jobs.
  	NpuWorkDir = "/cache/"

  	// CodeArchiveName is the file name of the code archive.
  	// NOTE(review): it equals the archive name fetched by CommandPrepareScript;
  	// whether the two are meant to be the same file is not visible here.
  	CodeArchiveName = "master.zip"
  )

  // CommandPrepareScript is a shell fragment that creates the output, code and
  // dataset directories, downloads and unpacks the script_for_grampus archive,
  // changes into it and makes the OBS/MinIO transfer tools executable.
  //
  // The fragment starts with ';', so it is presumably appended to a preceding
  // shell command (e.g. a `cd` into the work directory) by the caller.
  const CommandPrepareScript = ";mkdir -p output;" +
  	"mkdir -p code;" +
  	"mkdir -p dataset;" +
  	"echo \"start loading script\";" +
  	"wget -q https://git.openi.org.cn/OpenIOSSG/script_for_grampus/archive/master.zip;" +
  	"echo \"finish loading script\";" +
  	"unzip -q master.zip;" +
  	"cd script_for_grampus;" +
  	"chmod 777 downloader_for_obs uploader_for_obs downloader_for_minio uploader_for_minio;"
  24. var (
  25. poolInfos *models.PoolInfos
  26. FlavorInfos *models.FlavorInfos
  27. ImageInfos *models.ImageInfosModelArts
  28. SpecialPools *models.SpecialPools
  29. )
  30. type GenerateTrainJobReq struct {
  31. JobName string
  32. Command string
  33. ImageUrl string //与image_id二选一,都有的情况下优先image_url
  34. ImageId string
  35. DisplayJobName string
  36. Uuid string
  37. Description string
  38. CodeObsPath string
  39. BootFile string
  40. BootFileUrl string
  41. DataUrl string
  42. TrainUrl string
  43. WorkServerNumber int
  44. EngineID int64
  45. CommitID string
  46. IsLatestVersion string
  47. BranchName string
  48. PreVersionId int64
  49. PreVersionName string
  50. VersionCount int
  51. EngineName string
  52. TotalVersionCount int
  53. ComputeResource string
  54. ProcessType string
  55. DatasetName string
  56. Params string
  57. Spec *models.Specification
  58. }
  59. func GenerateTrainJob(ctx *context.Context, req *GenerateTrainJobReq) (err error) {
  60. createTime := timeutil.TimeStampNow()
  61. centerID, centerName := getCentersParamter(ctx, req)
  62. jobResult, err := createJob(models.CreateGrampusJobRequest{
  63. Name: req.JobName,
  64. Tasks: []models.GrampusTasks{
  65. {
  66. Name: req.JobName,
  67. Command: req.Command,
  68. ResourceSpecId: req.Spec.SourceSpecId,
  69. ImageId: req.ImageId,
  70. ImageUrl: req.ImageUrl,
  71. CenterID: centerID,
  72. CenterName: centerName,
  73. ReplicaNum: 1,
  74. },
  75. },
  76. })
  77. if err != nil {
  78. log.Error("createJob failed: %v", err.Error())
  79. return err
  80. }
  81. jobID := jobResult.JobInfo.JobID
  82. err = models.CreateCloudbrain(&models.Cloudbrain{
  83. Status: TransTrainJobStatus(jobResult.JobInfo.Status),
  84. UserID: ctx.User.ID,
  85. RepoID: ctx.Repo.Repository.ID,
  86. JobID: jobID,
  87. JobName: req.JobName,
  88. DisplayJobName: req.DisplayJobName,
  89. JobType: string(models.JobTypeTrain),
  90. Type: models.TypeC2Net,
  91. Uuid: req.Uuid,
  92. DatasetName: req.DatasetName,
  93. CommitID: req.CommitID,
  94. IsLatestVersion: req.IsLatestVersion,
  95. ComputeResource: req.ComputeResource,
  96. ImageID: req.ImageId,
  97. TrainUrl: req.TrainUrl,
  98. BranchName: req.BranchName,
  99. Parameters: req.Params,
  100. BootFile: req.BootFile,
  101. DataUrl: req.DataUrl,
  102. Description: req.Description,
  103. WorkServerNumber: req.WorkServerNumber,
  104. EngineName: req.EngineName,
  105. VersionCount: req.VersionCount,
  106. TotalVersionCount: req.TotalVersionCount,
  107. CreatedUnix: createTime,
  108. UpdatedUnix: createTime,
  109. Spec: req.Spec,
  110. })
  111. if err != nil {
  112. log.Error("CreateCloudbrain(%s) failed:%v", req.DisplayJobName, err.Error())
  113. return err
  114. }
  115. var actionType models.ActionType
  116. if req.ComputeResource == models.NPUResource {
  117. actionType = models.ActionCreateGrampusNPUTrainTask
  118. } else if req.ComputeResource == models.GPUResource {
  119. actionType = models.ActionCreateGrampusGPUTrainTask
  120. }
  121. notification.NotifyOtherTask(ctx.User, ctx.Repo.Repository, jobID, req.DisplayJobName, actionType)
  122. return nil
  123. }
  124. func getCentersParamter(ctx *context.Context, req *GenerateTrainJobReq) ([]string, []string) {
  125. var centerID []string
  126. var centerName []string
  127. includeCenters := make(map[string]string)
  128. excludeCenters := make(map[string]string)
  129. if SpecialPools != nil {
  130. for _, pool := range SpecialPools.Pools {
  131. if !pool.IsExclusive && strings.Contains(req.ComputeResource, pool.Type) {
  132. org, _ := models.GetOrgByName(pool.Org)
  133. if org != nil {
  134. isOrgMember, _ := models.IsOrganizationMember(org.ID, ctx.User.ID)
  135. if isOrgMember {
  136. for _, info := range pool.Pool {
  137. includeCenters[info.Queue] = info.Value
  138. }
  139. } else {
  140. for _, info := range pool.Pool {
  141. excludeCenters[info.Queue] = info.Value
  142. }
  143. }
  144. }
  145. }
  146. }
  147. }
  148. if len(includeCenters) > 0 {
  149. //如果有专属资源池,根据专属资源池指定智算中心
  150. for k, v := range includeCenters {
  151. centerID = append(centerID, k)
  152. centerName = append(centerName, v)
  153. }
  154. } else if len(excludeCenters) > 0 {
  155. //否则,有要排除的中心,先获取所有中心,删除其中的排除中心,得到指定的智算中心
  156. allCenters := make(map[string]string)
  157. specs, err := GetResourceSpecs(req.ProcessType)
  158. if err == nil {
  159. for _, info := range specs.Infos {
  160. for _, center := range info.Centers {
  161. allCenters[center.ID] = center.Name
  162. }
  163. }
  164. }
  165. for k, _ := range excludeCenters {
  166. delete(allCenters, k)
  167. }
  168. for k, v := range allCenters {
  169. centerID = append(centerID, k)
  170. centerName = append(centerName, v)
  171. }
  172. }
  173. return centerID, centerName
  174. }
  175. func TransTrainJobStatus(status string) string {
  176. if status == models.GrampusStatusPending {
  177. status = models.GrampusStatusWaiting
  178. }
  179. return strings.ToUpper(status)
  180. }
  181. func InitSpecialPool() {
  182. if SpecialPools == nil && setting.Grampus.SpecialPools != "" {
  183. json.Unmarshal([]byte(setting.Grampus.SpecialPools), &SpecialPools)
  184. }
  185. }