You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

grampus.go 15 kB

3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago

  1. package grampus
  2. import (
  3. "fmt"
  4. "strings"
  5. "code.gitea.io/gitea/models"
  6. "code.gitea.io/gitea/modules/cloudbrain"
  7. "code.gitea.io/gitea/modules/context"
  8. "code.gitea.io/gitea/modules/log"
  9. "code.gitea.io/gitea/modules/notification"
  10. "code.gitea.io/gitea/modules/setting"
  11. "code.gitea.io/gitea/modules/timeutil"
  12. )
  13. const (
  14. JobPath = "job/"
  15. ProcessorTypeNPU = "npu.huawei.com/NPU"
  16. ProcessorTypeGPU = "nvidia.com/gpu"
  17. ProcessorTypeGCU = "enflame-tech.com/gcu"
  18. GpuWorkDir = "/tmp/"
  19. NpuWorkDir = "/cache/"
  20. NpuLocalLogUrl = "/tmp/train.log"
  21. CommandPrepareScriptNpu = ";mkdir -p output;mkdir -p code;mkdir -p dataset;mkdir -p pretrainmodel;"
  22. CodeArchiveName = "master.zip"
  23. BucketRemote = "grampus"
  24. RemoteModelPath = "/output/" + models.ModelSuffix
  25. autoStopDurationMs = 4 * 60 * 60 * 1000
  26. CommandGpuDebug = "mkdir -p /dataset;%s! [ -x \"$(command -v jupyter)\" ] && pip install jupyterlab==3 -i https://pypi.tuna.tsinghua.edu.cn/simple;jupyter lab --ServerApp.shutdown_no_activity_timeout=%s --TerminalManager.cull_inactive_timeout=%s --TerminalManager.cull_interval=%s --MappingKernelManager.cull_idle_timeout=%s --MappingKernelManager.cull_interval=%s --MappingKernelManager.cull_connected=True --MappingKernelManager.cull_busy=True --no-browser --ip=0.0.0.0 --allow-root --notebook-dir='/code' --port=$OCTOPUS_NOTEBOOK_PORT --LabApp.token='' --LabApp.allow_origin='*' --LabApp.base_url=$OCTOPUS_NOTEBOOK_BASE_URL;"
  27. CommandGrampusDebug = CommandGpuDebug + "unzip %s;rm %s;"
  28. )
  29. var (
  30. poolInfos *models.PoolInfos
  31. FlavorInfos *setting.StFlavorInfos
  32. ImageInfos *setting.StImageInfosModelArts
  33. SpecialPools *models.SpecialPools
  34. CommandPrepareScriptGpu = ";mkdir -p output;mkdir -p code;mkdir -p dataset;mkdir -p pretrainmodel;echo \"start loading script\";wget -q https://git.openi.org.cn/OpenIOSSG/%s/archive/master.zip;" +
  35. "echo \"finish loading script\";unzip -q master.zip;cd %s;chmod 777 downloader_for_obs uploader_for_npu downloader_for_minio uploader_for_gpu;"
  36. )
  37. type GenerateTrainJobReq struct {
  38. JobName string
  39. Command string
  40. ImageUrl string //与image_id二选一,都有的情况下优先image_url
  41. ImageId string
  42. DisplayJobName string
  43. Uuid string
  44. Description string
  45. CodeObsPath string
  46. BootFile string
  47. BootFileUrl string
  48. DataUrl string
  49. TrainUrl string
  50. WorkServerNumber int
  51. EngineID int64
  52. CommitID string
  53. IsLatestVersion string
  54. BranchName string
  55. PreVersionId int64
  56. PreVersionName string
  57. VersionCount int
  58. EngineName string
  59. TotalVersionCount int
  60. ComputeResource string
  61. ProcessType string
  62. DatasetNames string
  63. DatasetInfos map[string]models.DatasetInfo
  64. Params string
  65. ModelName string
  66. LabelName string
  67. CkptName string
  68. ModelVersion string
  69. PreTrainModelPath string
  70. PreTrainModelUrl string
  71. Spec *models.Specification
  72. CodeName string
  73. }
  74. type GenerateNotebookJobReq struct {
  75. JobName string
  76. Command string
  77. ImageUrl string
  78. ImageId string
  79. DisplayJobName string
  80. Uuid string
  81. Description string
  82. CodeStoragePath string
  83. CommitID string
  84. BranchName string
  85. ComputeResource string
  86. ProcessType string
  87. DatasetNames string
  88. DatasetInfos map[string]models.DatasetInfo
  89. ModelName string
  90. LabelName string
  91. CkptName string
  92. ModelVersion string
  93. PreTrainModelPath string
  94. PreTrainModelUrl string
  95. Spec *models.Specification
  96. CodeName string
  97. ModelPath string //参考启智GPU调试, 挂载/model目录用户的模型可以输出到这个目录
  98. }
  99. func getEndPoint() string {
  100. index := strings.Index(setting.Endpoint, "//")
  101. endpoint := setting.Endpoint[index+2:]
  102. return endpoint
  103. }
  104. func getDatasetGrampus(datasetInfos map[string]models.DatasetInfo) []models.GrampusDataset {
  105. var datasetGrampus []models.GrampusDataset
  106. endPoint := getEndPoint()
  107. for _, datasetInfo := range datasetInfos {
  108. datasetGrampus = append(datasetGrampus, models.GrampusDataset{
  109. Name: datasetInfo.FullName,
  110. Bucket: setting.Bucket,
  111. EndPoint: endPoint,
  112. ObjectKey: datasetInfo.DataLocalPath + datasetInfo.FullName,
  113. })
  114. }
  115. return datasetGrampus
  116. }
  117. func getDatasetGPUGrampus(datasetInfos map[string]models.DatasetInfo) ([]models.GrampusDataset, string) {
  118. var datasetGrampus []models.GrampusDataset
  119. var command = ""
  120. for uuid, datasetInfo := range datasetInfos {
  121. datasetGrampus = append(datasetGrampus, models.GrampusDataset{
  122. Name: datasetInfo.FullName,
  123. Bucket: setting.Attachment.Minio.Bucket,
  124. EndPoint: setting.Attachment.Minio.Endpoint,
  125. ObjectKey: datasetInfo.DataLocalPath,
  126. ReadOnly: true,
  127. ContainerPath: "/dataset1/" + datasetInfo.Name,
  128. })
  129. command += "cp /dataset1/'" + datasetInfo.Name + "'/" + uuid + " /dataset/'" + datasetInfo.FullName + "';"
  130. }
  131. return datasetGrampus, command
  132. }
  133. func GenerateNotebookJob(ctx *context.Context, req *GenerateNotebookJobReq) (jobId string, err error) {
  134. createTime := timeutil.TimeStampNow()
  135. var datasetGrampus []models.GrampusDataset
  136. var codeGrampus models.GrampusDataset
  137. var cpCommand string
  138. imageUrl := req.ImageUrl
  139. if ProcessorTypeNPU == req.ProcessType {
  140. datasetGrampus = getDatasetGrampus(req.DatasetInfos)
  141. if len(req.ModelName) != 0 {
  142. datasetGrampus = append(datasetGrampus, models.GrampusDataset{
  143. Name: req.ModelName,
  144. Bucket: setting.Bucket,
  145. EndPoint: getEndPoint(),
  146. ReadOnly: true,
  147. ObjectKey: req.PreTrainModelPath,
  148. })
  149. }
  150. codeGrampus = models.GrampusDataset{
  151. Name: req.CodeName,
  152. Bucket: setting.Bucket,
  153. EndPoint: getEndPoint(),
  154. ObjectKey: req.CodeStoragePath + cloudbrain.DefaultBranchName + ".zip",
  155. ReadOnly: false,
  156. }
  157. imageUrl = ""
  158. req.Command = ""
  159. } else {
  160. datasetGrampus, cpCommand = getDatasetGPUGrampus(req.DatasetInfos)
  161. if len(req.ModelName) != 0 {
  162. datasetGrampus = append(datasetGrampus, models.GrampusDataset{
  163. Name: req.ModelName,
  164. Bucket: setting.Attachment.Minio.Bucket,
  165. EndPoint: setting.Attachment.Minio.Endpoint,
  166. ObjectKey: req.PreTrainModelPath,
  167. ReadOnly: true,
  168. ContainerPath: cloudbrain.PretrainModelMountPath,
  169. })
  170. }
  171. codeArchiveName := cloudbrain.DefaultBranchName + ".zip"
  172. codeGrampus = models.GrampusDataset{
  173. Name: req.CodeName,
  174. Bucket: setting.Attachment.Minio.Bucket,
  175. EndPoint: setting.Attachment.Minio.Endpoint,
  176. ObjectKey: req.CodeStoragePath + codeArchiveName,
  177. ReadOnly: false,
  178. ContainerPath: cloudbrain.CodeMountPath,
  179. }
  180. if ProcessorTypeGCU == req.ProcessType {
  181. imageUrl = ""
  182. }
  183. req.Command = fmt.Sprintf(CommandGrampusDebug, cpCommand, setting.CullIdleTimeout, setting.CullIdleTimeout, setting.CullInterval, setting.CullIdleTimeout, setting.CullInterval, codeArchiveName, codeArchiveName)
  184. log.Info("debug command:" + req.Command)
  185. }
  186. jobResult, err := createNotebookJob(models.CreateGrampusNotebookRequest{
  187. Name: req.JobName,
  188. Tasks: []models.GrampusNotebookTask{
  189. {
  190. Name: req.JobName,
  191. ResourceSpecId: req.Spec.SourceSpecId,
  192. ImageId: req.ImageId,
  193. ImageUrl: imageUrl,
  194. Datasets: datasetGrampus,
  195. Code: codeGrampus,
  196. AutoStopDuration: autoStopDurationMs,
  197. Capacity: setting.Capacity,
  198. Command: req.Command,
  199. CenterID: req.Spec.GetAvailableCenterIds(ctx.User.ID),
  200. },
  201. },
  202. })
  203. if err != nil {
  204. log.Error("createNotebookJob failed: %v", err.Error())
  205. return "", err
  206. }
  207. jobID := jobResult.JobInfo.JobID
  208. err = models.CreateCloudbrain(&models.Cloudbrain{
  209. Status: TransTrainJobStatus(jobResult.JobInfo.Status),
  210. UserID: ctx.User.ID,
  211. RepoID: ctx.Repo.Repository.ID,
  212. JobID: jobID,
  213. JobName: req.JobName,
  214. DisplayJobName: req.DisplayJobName,
  215. JobType: string(models.JobTypeDebug),
  216. Type: models.TypeC2Net,
  217. Uuid: req.Uuid,
  218. DatasetName: req.DatasetNames,
  219. CommitID: req.CommitID,
  220. IsLatestVersion: "1",
  221. ComputeResource: req.ComputeResource,
  222. ImageID: req.ImageId,
  223. BranchName: req.BranchName,
  224. Description: req.Description,
  225. WorkServerNumber: 1,
  226. EngineName: req.ImageUrl,
  227. CreatedUnix: createTime,
  228. UpdatedUnix: createTime,
  229. Spec: req.Spec,
  230. ModelName: req.ModelName,
  231. ModelVersion: req.ModelVersion,
  232. LabelName: req.LabelName,
  233. PreTrainModelUrl: req.PreTrainModelUrl,
  234. CkptName: req.CkptName,
  235. })
  236. if err != nil {
  237. log.Error("CreateCloudbrain(%s) failed:%v", req.DisplayJobName, err.Error())
  238. return "", err
  239. }
  240. var actionType models.ActionType
  241. if req.ComputeResource == models.NPUResource {
  242. actionType = models.ActionCreateGrampusNPUDebugTask
  243. } else if req.ComputeResource == models.GPUResource {
  244. actionType = models.ActionCreateGrampusGPUDebugTask
  245. } else if req.ComputeResource == models.GCUResource {
  246. actionType = models.ActionCreateGrampusGCUDebugTask
  247. }
  248. notification.NotifyOtherTask(ctx.User, ctx.Repo.Repository, jobID, req.DisplayJobName, actionType)
  249. return jobID, nil
  250. }
  251. func GenerateTrainJob(ctx *context.Context, req *GenerateTrainJobReq) (jobId string, err error) {
  252. createTime := timeutil.TimeStampNow()
  253. var datasetGrampus, modelGrampus []models.GrampusDataset
  254. var codeGrampus models.GrampusDataset
  255. if ProcessorTypeNPU == req.ProcessType {
  256. datasetGrampus = getDatasetGrampus(req.DatasetInfos)
  257. if len(req.ModelName) != 0 {
  258. modelGrampus = []models.GrampusDataset{
  259. {
  260. Name: req.ModelName,
  261. Bucket: setting.Bucket,
  262. EndPoint: getEndPoint(),
  263. ObjectKey: req.PreTrainModelPath,
  264. },
  265. }
  266. }
  267. codeGrampus = models.GrampusDataset{
  268. Name: req.CodeName,
  269. Bucket: setting.Bucket,
  270. EndPoint: getEndPoint(),
  271. ObjectKey: req.CodeObsPath + cloudbrain.DefaultBranchName + ".zip",
  272. }
  273. }
  274. jobResult, err := createJob(models.CreateGrampusJobRequest{
  275. Name: req.JobName,
  276. Tasks: []models.GrampusTasks{
  277. {
  278. Name: req.JobName,
  279. Command: req.Command,
  280. ResourceSpecId: req.Spec.SourceSpecId,
  281. ImageId: req.ImageId,
  282. ImageUrl: req.ImageUrl,
  283. CenterID: req.Spec.GetAvailableCenterIds(ctx.User.ID),
  284. ReplicaNum: 1,
  285. Datasets: datasetGrampus,
  286. Models: modelGrampus,
  287. Code: codeGrampus,
  288. BootFile: req.BootFile,
  289. },
  290. },
  291. })
  292. if err != nil {
  293. log.Error("createJob failed: %v", err.Error())
  294. return "", err
  295. }
  296. jobID := jobResult.JobInfo.JobID
  297. err = models.CreateCloudbrain(&models.Cloudbrain{
  298. Status: TransTrainJobStatus(jobResult.JobInfo.Status),
  299. UserID: ctx.User.ID,
  300. RepoID: ctx.Repo.Repository.ID,
  301. JobID: jobID,
  302. JobName: req.JobName,
  303. DisplayJobName: req.DisplayJobName,
  304. JobType: string(models.JobTypeTrain),
  305. Type: models.TypeC2Net,
  306. Uuid: req.Uuid,
  307. DatasetName: req.DatasetNames,
  308. CommitID: req.CommitID,
  309. IsLatestVersion: req.IsLatestVersion,
  310. ComputeResource: req.ComputeResource,
  311. ImageID: req.ImageId,
  312. TrainUrl: req.TrainUrl,
  313. BranchName: req.BranchName,
  314. Parameters: req.Params,
  315. BootFile: req.BootFile,
  316. DataUrl: req.DataUrl,
  317. Description: req.Description,
  318. WorkServerNumber: req.WorkServerNumber,
  319. EngineName: req.EngineName,
  320. VersionCount: req.VersionCount,
  321. TotalVersionCount: req.TotalVersionCount,
  322. CreatedUnix: createTime,
  323. UpdatedUnix: createTime,
  324. Spec: req.Spec,
  325. ModelName: req.ModelName,
  326. ModelVersion: req.ModelVersion,
  327. LabelName: req.LabelName,
  328. PreTrainModelUrl: req.PreTrainModelUrl,
  329. CkptName: req.CkptName,
  330. })
  331. if err != nil {
  332. log.Error("CreateCloudbrain(%s) failed:%v", req.DisplayJobName, err.Error())
  333. return "", err
  334. }
  335. var actionType models.ActionType
  336. if req.ComputeResource == models.NPUResource {
  337. actionType = models.ActionCreateGrampusNPUTrainTask
  338. } else if req.ComputeResource == models.GPUResource {
  339. actionType = models.ActionCreateGrampusGPUTrainTask
  340. }
  341. notification.NotifyOtherTask(ctx.User, ctx.Repo.Repository, jobID, req.DisplayJobName, actionType)
  342. return jobID, nil
  343. }
  344. func getCentersParamter(ctx *context.Context, req *GenerateTrainJobReq) ([]string, []string) {
  345. var centerID []string
  346. var centerName []string
  347. includeCenters := make(map[string]string)
  348. excludeCenters := make(map[string]string)
  349. if SpecialPools != nil {
  350. for _, pool := range SpecialPools.Pools {
  351. if !pool.IsExclusive && strings.Contains(req.ComputeResource, pool.Type) {
  352. org, _ := models.GetOrgByName(pool.Org)
  353. if org != nil {
  354. isOrgMember, _ := models.IsOrganizationMember(org.ID, ctx.User.ID)
  355. if isOrgMember {
  356. for _, info := range pool.Pool {
  357. includeCenters[info.Queue] = info.Value
  358. }
  359. } else {
  360. for _, info := range pool.Pool {
  361. excludeCenters[info.Queue] = info.Value
  362. }
  363. }
  364. }
  365. }
  366. }
  367. }
  368. if len(includeCenters) > 0 {
  369. //如果有专属资源池,根据专属资源池指定智算中心
  370. for k, v := range includeCenters {
  371. centerID = append(centerID, k)
  372. centerName = append(centerName, v)
  373. }
  374. } else if len(excludeCenters) > 0 {
  375. //否则,有要排除的中心,先获取所有中心,删除其中的排除中心,得到指定的智算中心
  376. allCenters := make(map[string]string)
  377. specs, err := GetResourceSpecs(req.ProcessType)
  378. if err == nil {
  379. for _, info := range specs.Infos {
  380. for _, center := range info.Centers {
  381. allCenters[center.ID] = center.Name
  382. }
  383. }
  384. }
  385. for k, _ := range excludeCenters {
  386. delete(allCenters, k)
  387. }
  388. for k, v := range allCenters {
  389. centerID = append(centerID, k)
  390. centerName = append(centerName, v)
  391. }
  392. }
  393. return centerID, centerName
  394. }
  395. func TransTrainJobStatus(status string) string {
  396. if status == models.GrampusStatusPending {
  397. status = models.GrampusStatusWaiting
  398. }
  399. return strings.ToUpper(status)
  400. }
  401. func GetNpuModelRemoteObsUrl(jobName string) string {
  402. return "s3:///" + BucketRemote + "/" + GetNpuModelObjectKey(jobName)
  403. }
  404. func GetNpuModelObjectKey(jobName string) string {
  405. return setting.CodePathPrefix + jobName + RemoteModelPath
  406. }
  407. func GetRemoteEndPoint(aiCenterID string) string {
  408. var endPoint string
  409. for _, info := range setting.CenterInfos.Info {
  410. if info.CenterID == aiCenterID {
  411. endPoint = info.Endpoint
  412. break
  413. }
  414. }
  415. return endPoint
  416. }
  417. func GetCenterProxy(aiCenterID string) string {
  418. var proxy string
  419. for _, info := range setting.CenterInfos.Info {
  420. if info.CenterID == aiCenterID {
  421. proxy = info.StorageProxyServer
  422. break
  423. }
  424. }
  425. return proxy
  426. }