You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

grampus.go 14 kB

3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471
  1. package grampus
  2. import (
  3. "fmt"
  4. "strings"
  5. "code.gitea.io/gitea/models"
  6. "code.gitea.io/gitea/modules/cloudbrain"
  7. "code.gitea.io/gitea/modules/context"
  8. "code.gitea.io/gitea/modules/log"
  9. "code.gitea.io/gitea/modules/notification"
  10. "code.gitea.io/gitea/modules/setting"
  11. "code.gitea.io/gitea/modules/timeutil"
  12. )
  13. const (
  14. JobPath = "job/"
  15. ProcessorTypeNPU = "npu.huawei.com/NPU"
  16. ProcessorTypeGPU = "nvidia.com/gpu"
  17. GpuWorkDir = "/tmp/"
  18. NpuWorkDir = "/cache/"
  19. NpuLocalLogUrl = "/tmp/train.log"
  20. CommandPrepareScriptNpu = ";mkdir -p output;mkdir -p code;mkdir -p dataset;mkdir -p pretrainmodel;"
  21. CodeArchiveName = "master.zip"
  22. BucketRemote = "grampus"
  23. RemoteModelPath = "/output/" + models.ModelSuffix
  24. autoStopDurationMs = 4 * 60 * 60 * 1000
  25. CommandGpuDebug = "mkdir -p /dataset;%s! [ -x \"$(command -v jupyter)\" ] && pip install jupyterlab==3 -i https://pypi.tuna.tsinghua.edu.cn/simple;jupyter lab --no-browser --ip=0.0.0.0 --allow-root --notebook-dir='/code' --port=$OCTOPUS_NOTEBOOK_PORT --LabApp.token='' --LabApp.allow_origin='*' --LabApp.base_url=$OCTOPUS_NOTEBOOK_BASE_URL;"
  26. )
  27. var (
  28. poolInfos *models.PoolInfos
  29. FlavorInfos *setting.StFlavorInfos
  30. ImageInfos *setting.StImageInfosModelArts
  31. SpecialPools *models.SpecialPools
  32. CommandPrepareScriptGpu = ";mkdir -p output;mkdir -p code;mkdir -p dataset;mkdir -p pretrainmodel;echo \"start loading script\";wget -q https://git.openi.org.cn/OpenIOSSG/%s/archive/master.zip;" +
  33. "echo \"finish loading script\";unzip -q master.zip;cd %s;chmod 777 downloader_for_obs uploader_for_npu downloader_for_minio uploader_for_gpu;"
  34. )
  35. type GenerateTrainJobReq struct {
  36. JobName string
  37. Command string
  38. ImageUrl string //与image_id二选一,都有的情况下优先image_url
  39. ImageId string
  40. DisplayJobName string
  41. Uuid string
  42. Description string
  43. CodeObsPath string
  44. BootFile string
  45. BootFileUrl string
  46. DataUrl string
  47. TrainUrl string
  48. WorkServerNumber int
  49. EngineID int64
  50. CommitID string
  51. IsLatestVersion string
  52. BranchName string
  53. PreVersionId int64
  54. PreVersionName string
  55. VersionCount int
  56. EngineName string
  57. TotalVersionCount int
  58. ComputeResource string
  59. ProcessType string
  60. DatasetNames string
  61. DatasetInfos map[string]models.DatasetInfo
  62. Params string
  63. ModelName string
  64. LabelName string
  65. CkptName string
  66. ModelVersion string
  67. PreTrainModelPath string
  68. PreTrainModelUrl string
  69. Spec *models.Specification
  70. CodeName string
  71. }
  72. type GenerateNotebookJobReq struct {
  73. JobName string
  74. Command string
  75. ImageUrl string
  76. ImageId string
  77. DisplayJobName string
  78. Uuid string
  79. Description string
  80. CodeStoragePath string
  81. CommitID string
  82. BranchName string
  83. ComputeResource string
  84. ProcessType string
  85. DatasetNames string
  86. DatasetInfos map[string]models.DatasetInfo
  87. ModelName string
  88. LabelName string
  89. CkptName string
  90. ModelVersion string
  91. PreTrainModelPath string
  92. PreTrainModelUrl string
  93. Spec *models.Specification
  94. CodeName string
  95. }
  96. func getEndPoint() string {
  97. index := strings.Index(setting.Endpoint, "//")
  98. endpoint := setting.Endpoint[index+2:]
  99. return endpoint
  100. }
  101. func getDatasetGrampus(datasetInfos map[string]models.DatasetInfo) []models.GrampusDataset {
  102. var datasetGrampus []models.GrampusDataset
  103. endPoint := getEndPoint()
  104. for _, datasetInfo := range datasetInfos {
  105. datasetGrampus = append(datasetGrampus, models.GrampusDataset{
  106. Name: datasetInfo.FullName,
  107. Bucket: setting.Bucket,
  108. EndPoint: endPoint,
  109. ObjectKey: datasetInfo.DataLocalPath + datasetInfo.FullName,
  110. })
  111. }
  112. return datasetGrampus
  113. }
  114. func getDatasetGPUGrampus(datasetInfos map[string]models.DatasetInfo) ([]models.GrampusDataset, string) {
  115. var datasetGrampus []models.GrampusDataset
  116. var command = ""
  117. for uuid, datasetInfo := range datasetInfos {
  118. datasetGrampus = append(datasetGrampus, models.GrampusDataset{
  119. Name: datasetInfo.FullName,
  120. Bucket: setting.Attachment.Minio.Bucket,
  121. EndPoint: setting.Attachment.Minio.Endpoint,
  122. ObjectKey: datasetInfo.DataLocalPath,
  123. ReadOnly: true,
  124. ContainerPath: "/dataset1/" + datasetInfo.Name,
  125. })
  126. command += "cp /dataset1/'" + datasetInfo.Name + "'/" + uuid + " /dataset/'" + datasetInfo.FullName + "';"
  127. }
  128. return datasetGrampus, command
  129. }
  130. func GenerateNotebookJob(ctx *context.Context, req *GenerateNotebookJobReq) (jobId string, err error) {
  131. createTime := timeutil.TimeStampNow()
  132. var datasetGrampus []models.GrampusDataset
  133. var codeGrampus models.GrampusDataset
  134. var cpCommand string
  135. imageUrl := req.ImageUrl
  136. if ProcessorTypeNPU == req.ProcessType {
  137. datasetGrampus = getDatasetGrampus(req.DatasetInfos)
  138. if len(req.ModelName) != 0 {
  139. datasetGrampus = append(datasetGrampus, models.GrampusDataset{
  140. Name: req.ModelName,
  141. Bucket: setting.Bucket,
  142. EndPoint: getEndPoint(),
  143. ReadOnly: true,
  144. ObjectKey: req.PreTrainModelPath,
  145. })
  146. }
  147. codeGrampus = models.GrampusDataset{
  148. Name: req.CodeName,
  149. Bucket: setting.Bucket,
  150. EndPoint: getEndPoint(),
  151. ObjectKey: req.CodeStoragePath + cloudbrain.DefaultBranchName + ".zip",
  152. ReadOnly: false,
  153. }
  154. imageUrl = ""
  155. req.Command = ""
  156. } else {
  157. datasetGrampus, cpCommand = getDatasetGPUGrampus(req.DatasetInfos)
  158. if len(req.ModelName) != 0 {
  159. datasetGrampus = append(datasetGrampus, models.GrampusDataset{
  160. Name: req.ModelName,
  161. Bucket: setting.Attachment.Minio.Bucket,
  162. EndPoint: setting.Attachment.Minio.Endpoint,
  163. ObjectKey: req.PreTrainModelPath,
  164. ReadOnly: true,
  165. ContainerPath: "/model",
  166. })
  167. }
  168. codeGrampus = models.GrampusDataset{
  169. Name: req.CodeName,
  170. Bucket: setting.Attachment.Minio.Bucket,
  171. EndPoint: setting.Attachment.Minio.Endpoint,
  172. ObjectKey: req.CodeStoragePath + cloudbrain.DefaultBranchName + ".zip",
  173. ReadOnly: false,
  174. ContainerPath: "/code",
  175. }
  176. req.Command = fmt.Sprintf(CommandGpuDebug, cpCommand)
  177. log.Info("debug command:" + req.Command)
  178. }
  179. jobResult, err := createNotebookJob(models.CreateGrampusNotebookRequest{
  180. Name: req.JobName,
  181. Tasks: []models.GrampusNotebookTask{
  182. {
  183. Name: req.JobName,
  184. ResourceSpecId: req.Spec.SourceSpecId,
  185. ImageId: req.ImageId,
  186. ImageUrl: imageUrl,
  187. Datasets: datasetGrampus,
  188. Code: codeGrampus,
  189. AutoStopDuration: autoStopDurationMs,
  190. Capacity: setting.Capacity,
  191. Command: req.Command,
  192. },
  193. },
  194. })
  195. if err != nil {
  196. log.Error("createNotebookJob failed: %v", err.Error())
  197. return "", err
  198. }
  199. jobID := jobResult.JobInfo.JobID
  200. err = models.CreateCloudbrain(&models.Cloudbrain{
  201. Status: TransTrainJobStatus(jobResult.JobInfo.Status),
  202. UserID: ctx.User.ID,
  203. RepoID: ctx.Repo.Repository.ID,
  204. JobID: jobID,
  205. JobName: req.JobName,
  206. DisplayJobName: req.DisplayJobName,
  207. JobType: string(models.JobTypeDebug),
  208. Type: models.TypeC2Net,
  209. Uuid: req.Uuid,
  210. DatasetName: req.DatasetNames,
  211. CommitID: req.CommitID,
  212. IsLatestVersion: "1",
  213. ComputeResource: req.ComputeResource,
  214. ImageID: req.ImageId,
  215. BranchName: req.BranchName,
  216. Description: req.Description,
  217. WorkServerNumber: 1,
  218. EngineName: req.ImageUrl,
  219. CreatedUnix: createTime,
  220. UpdatedUnix: createTime,
  221. Spec: req.Spec,
  222. ModelName: req.ModelName,
  223. ModelVersion: req.ModelVersion,
  224. LabelName: req.LabelName,
  225. PreTrainModelUrl: req.PreTrainModelUrl,
  226. CkptName: req.CkptName,
  227. })
  228. if err != nil {
  229. log.Error("CreateCloudbrain(%s) failed:%v", req.DisplayJobName, err.Error())
  230. return "", err
  231. }
  232. var actionType models.ActionType
  233. if req.ComputeResource == models.NPUResource {
  234. actionType = models.ActionCreateGrampusNPUDebugTask
  235. } else if req.ComputeResource == models.GPUResource {
  236. actionType = models.ActionCreateGrampusGPUDebugTask
  237. }
  238. notification.NotifyOtherTask(ctx.User, ctx.Repo.Repository, jobID, req.DisplayJobName, actionType)
  239. return jobID, nil
  240. }
  241. func GenerateTrainJob(ctx *context.Context, req *GenerateTrainJobReq) (jobId string, err error) {
  242. createTime := timeutil.TimeStampNow()
  243. centerID, centerName := getCentersParamter(ctx, req)
  244. var datasetGrampus, modelGrampus []models.GrampusDataset
  245. var codeGrampus models.GrampusDataset
  246. if ProcessorTypeNPU == req.ProcessType {
  247. datasetGrampus = getDatasetGrampus(req.DatasetInfos)
  248. if len(req.ModelName) != 0 {
  249. modelGrampus = []models.GrampusDataset{
  250. {
  251. Name: req.ModelName,
  252. Bucket: setting.Bucket,
  253. EndPoint: getEndPoint(),
  254. ObjectKey: req.PreTrainModelPath,
  255. },
  256. }
  257. }
  258. codeGrampus = models.GrampusDataset{
  259. Name: req.CodeName,
  260. Bucket: setting.Bucket,
  261. EndPoint: getEndPoint(),
  262. ObjectKey: req.CodeObsPath + cloudbrain.DefaultBranchName + ".zip",
  263. }
  264. }
  265. jobResult, err := createJob(models.CreateGrampusJobRequest{
  266. Name: req.JobName,
  267. Tasks: []models.GrampusTasks{
  268. {
  269. Name: req.JobName,
  270. Command: req.Command,
  271. ResourceSpecId: req.Spec.SourceSpecId,
  272. ImageId: req.ImageId,
  273. ImageUrl: req.ImageUrl,
  274. CenterID: centerID,
  275. CenterName: centerName,
  276. ReplicaNum: 1,
  277. Datasets: datasetGrampus,
  278. Models: modelGrampus,
  279. Code: codeGrampus,
  280. BootFile: req.BootFile,
  281. },
  282. },
  283. })
  284. if err != nil {
  285. log.Error("createJob failed: %v", err.Error())
  286. return "", err
  287. }
  288. jobID := jobResult.JobInfo.JobID
  289. err = models.CreateCloudbrain(&models.Cloudbrain{
  290. Status: TransTrainJobStatus(jobResult.JobInfo.Status),
  291. UserID: ctx.User.ID,
  292. RepoID: ctx.Repo.Repository.ID,
  293. JobID: jobID,
  294. JobName: req.JobName,
  295. DisplayJobName: req.DisplayJobName,
  296. JobType: string(models.JobTypeTrain),
  297. Type: models.TypeC2Net,
  298. Uuid: req.Uuid,
  299. DatasetName: req.DatasetNames,
  300. CommitID: req.CommitID,
  301. IsLatestVersion: req.IsLatestVersion,
  302. ComputeResource: req.ComputeResource,
  303. ImageID: req.ImageId,
  304. TrainUrl: req.TrainUrl,
  305. BranchName: req.BranchName,
  306. Parameters: req.Params,
  307. BootFile: req.BootFile,
  308. DataUrl: req.DataUrl,
  309. Description: req.Description,
  310. WorkServerNumber: req.WorkServerNumber,
  311. EngineName: req.EngineName,
  312. VersionCount: req.VersionCount,
  313. TotalVersionCount: req.TotalVersionCount,
  314. CreatedUnix: createTime,
  315. UpdatedUnix: createTime,
  316. Spec: req.Spec,
  317. ModelName: req.ModelName,
  318. ModelVersion: req.ModelVersion,
  319. LabelName: req.LabelName,
  320. PreTrainModelUrl: req.PreTrainModelUrl,
  321. CkptName: req.CkptName,
  322. })
  323. if err != nil {
  324. log.Error("CreateCloudbrain(%s) failed:%v", req.DisplayJobName, err.Error())
  325. return "", err
  326. }
  327. var actionType models.ActionType
  328. if req.ComputeResource == models.NPUResource {
  329. actionType = models.ActionCreateGrampusNPUTrainTask
  330. } else if req.ComputeResource == models.GPUResource {
  331. actionType = models.ActionCreateGrampusGPUTrainTask
  332. }
  333. notification.NotifyOtherTask(ctx.User, ctx.Repo.Repository, jobID, req.DisplayJobName, actionType)
  334. return jobID, nil
  335. }
  336. func getCentersParamter(ctx *context.Context, req *GenerateTrainJobReq) ([]string, []string) {
  337. var centerID []string
  338. var centerName []string
  339. includeCenters := make(map[string]string)
  340. excludeCenters := make(map[string]string)
  341. if SpecialPools != nil {
  342. for _, pool := range SpecialPools.Pools {
  343. if !pool.IsExclusive && strings.Contains(req.ComputeResource, pool.Type) {
  344. org, _ := models.GetOrgByName(pool.Org)
  345. if org != nil {
  346. isOrgMember, _ := models.IsOrganizationMember(org.ID, ctx.User.ID)
  347. if isOrgMember {
  348. for _, info := range pool.Pool {
  349. includeCenters[info.Queue] = info.Value
  350. }
  351. } else {
  352. for _, info := range pool.Pool {
  353. excludeCenters[info.Queue] = info.Value
  354. }
  355. }
  356. }
  357. }
  358. }
  359. }
  360. if len(includeCenters) > 0 {
  361. //如果有专属资源池,根据专属资源池指定智算中心
  362. for k, v := range includeCenters {
  363. centerID = append(centerID, k)
  364. centerName = append(centerName, v)
  365. }
  366. } else if len(excludeCenters) > 0 {
  367. //否则,有要排除的中心,先获取所有中心,删除其中的排除中心,得到指定的智算中心
  368. allCenters := make(map[string]string)
  369. specs, err := GetResourceSpecs(req.ProcessType)
  370. if err == nil {
  371. for _, info := range specs.Infos {
  372. for _, center := range info.Centers {
  373. allCenters[center.ID] = center.Name
  374. }
  375. }
  376. }
  377. for k, _ := range excludeCenters {
  378. delete(allCenters, k)
  379. }
  380. for k, v := range allCenters {
  381. centerID = append(centerID, k)
  382. centerName = append(centerName, v)
  383. }
  384. }
  385. return centerID, centerName
  386. }
  387. func TransTrainJobStatus(status string) string {
  388. if status == models.GrampusStatusPending {
  389. status = models.GrampusStatusWaiting
  390. }
  391. return strings.ToUpper(status)
  392. }
  393. func GetNpuModelRemoteObsUrl(jobName string) string {
  394. return "s3:///" + BucketRemote + "/" + GetNpuModelObjectKey(jobName)
  395. }
  396. func GetNpuModelObjectKey(jobName string) string {
  397. return setting.CodePathPrefix + jobName + RemoteModelPath
  398. }
  399. func GetRemoteEndPoint(aiCenterID string) string {
  400. var endPoint string
  401. for _, info := range setting.CenterInfos.Info {
  402. if info.CenterID == aiCenterID {
  403. endPoint = info.Endpoint
  404. break
  405. }
  406. }
  407. return endPoint
  408. }
  409. func GetCenterProxy(aiCenterID string) string {
  410. var proxy string
  411. for _, info := range setting.CenterInfos.Info {
  412. if info.CenterID == aiCenterID {
  413. proxy = info.StorageProxyServer
  414. break
  415. }
  416. }
  417. return proxy
  418. }