You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

grampus.go 38 kB

3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago

  1. package repo
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "io/ioutil"
  7. "net/http"
  8. "os"
  9. "path"
  10. "strconv"
  11. "strings"
  12. "time"
  13. "code.gitea.io/gitea/services/cloudbrain/resource"
  14. "code.gitea.io/gitea/modules/auth"
  15. "code.gitea.io/gitea/modules/git"
  16. "code.gitea.io/gitea/modules/grampus"
  17. "code.gitea.io/gitea/modules/modelarts"
  18. "code.gitea.io/gitea/modules/notification"
  19. "code.gitea.io/gitea/modules/timeutil"
  20. "code.gitea.io/gitea/modules/util"
  21. "github.com/unknwon/com"
  22. "code.gitea.io/gitea/models"
  23. "code.gitea.io/gitea/modules/base"
  24. "code.gitea.io/gitea/modules/cloudbrain"
  25. "code.gitea.io/gitea/modules/context"
  26. "code.gitea.io/gitea/modules/log"
  27. "code.gitea.io/gitea/modules/setting"
  28. )
// Template names for the Grampus train-job pages rendered by this file.
const (
	// tplGrampusTrainJobShow is the template for a single train job.
	tplGrampusTrainJobShow base.TplName = "repo/grampus/trainjob/show"

	// GPU: templates for creating a GPU train job and a new version of one.
	tplGrampusTrainJobGPUNew        base.TplName = "repo/grampus/trainjob/gpu/new"
	tplGrampusTrainJobGPUVersionNew base.TplName = "repo/grampus/trainjob/gpu/versionnew"

	// NPU: templates for creating an NPU train job and a new version of one.
	tplGrampusTrainJobNPUNew        base.TplName = "repo/grampus/trainjob/npu/new"
	tplGrampusTrainJobNPUVersionNew base.TplName = "repo/grampus/trainjob/npu/versionnew"
)
  38. func GrampusTrainJobGPUNew(ctx *context.Context) {
  39. ctx.Data["datasetType"] = models.TypeCloudBrainOne
  40. err := grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeGPU)
  41. if err != nil {
  42. ctx.ServerError("get new train-job info failed", err)
  43. return
  44. }
  45. ctx.HTML(http.StatusOK, tplGrampusTrainJobGPUNew)
  46. }
  47. func GrampusTrainJobNPUNew(ctx *context.Context) {
  48. ctx.Data["datasetType"] = models.TypeCloudBrainTwo
  49. err := grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeNPU)
  50. if err != nil {
  51. ctx.ServerError("get new train-job info failed", err)
  52. return
  53. }
  54. ctx.HTML(200, tplGrampusTrainJobNPUNew)
  55. }
  56. func grampusTrainJobNewDataPrepare(ctx *context.Context, processType string) error {
  57. ctx.Data["PageIsCloudBrain"] = true
  58. t := time.Now()
  59. var displayJobName = jobNamePrefixValid(cutString(ctx.User.Name, 5)) + t.Format("2006010215") + strconv.Itoa(int(t.Unix()))[5:]
  60. ctx.Data["display_job_name"] = displayJobName
  61. //get valid images
  62. images, err := grampus.GetImages(processType)
  63. if err != nil {
  64. log.Error("GetImages failed:", err.Error())
  65. } else {
  66. ctx.Data["images"] = images.Infos
  67. }
  68. grampus.InitSpecialPool()
  69. ctx.Data["GPUEnabled"] = true
  70. ctx.Data["NPUEnabled"] = true
  71. includeCenters := make(map[string]struct{})
  72. excludeCenters := make(map[string]struct{})
  73. if grampus.SpecialPools != nil {
  74. for _, pool := range grampus.SpecialPools.Pools {
  75. if pool.IsExclusive {
  76. if !IsUserInOrgPool(ctx.User.ID, pool) {
  77. ctx.Data[pool.Type+"Enabled"] = false
  78. }
  79. } else {
  80. if strings.Contains(strings.ToLower(processType), strings.ToLower(pool.Type)) {
  81. if IsUserInOrgPool(ctx.User.ID, pool) {
  82. for _, center := range pool.Pool {
  83. includeCenters[center.Queue] = struct{}{}
  84. }
  85. } else {
  86. for _, center := range pool.Pool {
  87. excludeCenters[center.Queue] = struct{}{}
  88. }
  89. }
  90. }
  91. }
  92. }
  93. }
  94. //prepare available specs
  95. if processType == grampus.ProcessorTypeNPU {
  96. prepareGrampusTrainSpecs(ctx, models.NPU)
  97. } else if processType == grampus.ProcessorTypeGPU {
  98. prepareGrampusTrainSpecs(ctx, models.GPU)
  99. }
  100. //get branches
  101. branches, _, err := ctx.Repo.GitRepo.GetBranches(0, 0)
  102. if err != nil {
  103. log.Error("GetBranches error:", err.Error())
  104. } else {
  105. ctx.Data["branches"] = branches
  106. }
  107. ctx.Data["branchName"] = ctx.Repo.BranchName
  108. if processType == grampus.ProcessorTypeGPU {
  109. ctx.Data["datasetType"] = models.TypeCloudBrainOne
  110. waitCount := cloudbrain.GetWaitingCloudbrainCount(models.TypeC2Net, models.GPUResource, models.JobTypeTrain)
  111. ctx.Data["WaitCount"] = waitCount
  112. } else if processType == grampus.ProcessorTypeNPU {
  113. ctx.Data["datasetType"] = models.TypeCloudBrainTwo
  114. waitCount := cloudbrain.GetWaitingCloudbrainCount(models.TypeC2Net, models.NPUResource, models.JobTypeTrain)
  115. ctx.Data["WaitCount"] = waitCount
  116. }
  117. return nil
  118. }
  119. func GrampusTrainJobVersionNew(ctx *context.Context) {
  120. task := ctx.Cloudbrain
  121. if task.ComputeResource == models.GPUResource {
  122. err := grampusTrainJobVersionNewDataPrepare(ctx, grampus.ProcessorTypeGPU)
  123. if err != nil {
  124. ctx.ServerError("get new train-job version info failed", err)
  125. return
  126. }
  127. ctx.HTML(http.StatusOK, tplGrampusTrainJobGPUVersionNew)
  128. } else if task.ComputeResource == models.NPUResource {
  129. err := grampusTrainJobVersionNewDataPrepare(ctx, grampus.ProcessorTypeNPU)
  130. if err != nil {
  131. ctx.ServerError("get new train-job version info failed", err)
  132. return
  133. }
  134. ctx.HTML(200, tplGrampusTrainJobNPUVersionNew)
  135. }
  136. }
  137. func grampusTrainJobVersionNewDataPrepare(ctx *context.Context, processType string) error {
  138. ctx.Data["PageIsCloudBrain"] = true
  139. t := time.Now()
  140. var displayJobName = jobNamePrefixValid(cutString(ctx.User.Name, 5)) + t.Format("2006010215") + strconv.Itoa(int(t.Unix()))[5:]
  141. ctx.Data["display_job_name"] = displayJobName
  142. //get valid images
  143. images, err := grampus.GetImages(processType)
  144. if err != nil {
  145. log.Error("GetImages failed:", err.Error())
  146. } else {
  147. ctx.Data["images"] = images.Infos
  148. }
  149. grampus.InitSpecialPool()
  150. ctx.Data["GPUEnabled"] = true
  151. ctx.Data["NPUEnabled"] = true
  152. includeCenters := make(map[string]struct{})
  153. excludeCenters := make(map[string]struct{})
  154. if grampus.SpecialPools != nil {
  155. for _, pool := range grampus.SpecialPools.Pools {
  156. if pool.IsExclusive {
  157. if !IsUserInOrgPool(ctx.User.ID, pool) {
  158. ctx.Data[pool.Type+"Enabled"] = false
  159. }
  160. } else {
  161. if strings.Contains(strings.ToLower(processType), strings.ToLower(pool.Type)) {
  162. if IsUserInOrgPool(ctx.User.ID, pool) {
  163. for _, center := range pool.Pool {
  164. includeCenters[center.Queue] = struct{}{}
  165. }
  166. } else {
  167. for _, center := range pool.Pool {
  168. excludeCenters[center.Queue] = struct{}{}
  169. }
  170. }
  171. }
  172. }
  173. }
  174. }
  175. //prepare available specs
  176. if processType == grampus.ProcessorTypeNPU {
  177. prepareGrampusTrainSpecs(ctx, models.NPU)
  178. } else if processType == grampus.ProcessorTypeGPU {
  179. prepareGrampusTrainSpecs(ctx, models.GPU)
  180. }
  181. //get branches
  182. branches, _, err := ctx.Repo.GitRepo.GetBranches(0, 0)
  183. if err != nil {
  184. log.Error("GetBranches error:", err.Error())
  185. } else {
  186. ctx.Data["branches"] = branches
  187. }
  188. ctx.Data["branch_name"] = ctx.Cloudbrain.BranchName
  189. ctx.Data["image_name"] = ctx.Cloudbrain.Image
  190. ctx.Data["image_id"] = ctx.Cloudbrain.ImageID
  191. ctx.Data["boot_file"] = ctx.Cloudbrain.BootFile
  192. ctx.Data["description"] = ctx.Cloudbrain.Description
  193. spec, _ := resource.GetCloudbrainSpec(ctx.Cloudbrain.ID)
  194. if spec != nil {
  195. log.Info("spec_id = %d", spec.ID)
  196. ctx.Data["spec_id"] = spec.ID
  197. }
  198. var Parameters modelarts.Parameters
  199. if err = json.Unmarshal([]byte(ctx.Cloudbrain.Parameters), &Parameters); err != nil {
  200. ctx.ServerError("json.Unmarshal failed:", err)
  201. return err
  202. }
  203. ctx.Data["params"] = Parameters.Parameter
  204. _, _, datasetNames, _, err := getDatasUrlListByUUIDS(ctx.Cloudbrain.Uuid)
  205. if err != nil {
  206. log.Info("query dataset error," + err.Error())
  207. ctx.Data["dataset_name"] = ""
  208. } else {
  209. ctx.Data["dataset_name"] = datasetNames
  210. }
  211. ctx.Data["uuid"] = ctx.Cloudbrain.Uuid
  212. ctx.Data["cloudbrain_type"] = models.C2NetCluster
  213. ctx.Data["compute_resource"] = ctx.Cloudbrain.ComputeResource
  214. if processType == grampus.ProcessorTypeGPU {
  215. ctx.Data["dataset_type"] = models.TypeCloudBrainOne
  216. waitCount := cloudbrain.GetWaitingCloudbrainCount(models.TypeC2Net, models.GPUResource, models.JobTypeTrain)
  217. ctx.Data["wait_count"] = waitCount
  218. } else if processType == grampus.ProcessorTypeNPU {
  219. ctx.Data["dataset_type"] = models.TypeCloudBrainTwo
  220. waitCount := cloudbrain.GetWaitingCloudbrainCount(models.TypeC2Net, models.NPUResource, models.JobTypeTrain)
  221. ctx.Data["wait_count"] = waitCount
  222. ctx.Data["work_server_number"] = ctx.Cloudbrain.WorkServerNumber
  223. }
  224. ctx.Data["model_name"] = ctx.Cloudbrain.ModelName
  225. ctx.Data["label_name"] = ctx.Cloudbrain.LabelName
  226. ctx.Data["ckpt_name"] = ctx.Cloudbrain.CkptName
  227. ctx.Data["model_version"] = ctx.Cloudbrain.ModelVersion
  228. ctx.Data["pre_train_model_url"] = ctx.Cloudbrain.PreTrainModelUrl
  229. return nil
  230. }
  231. func prepareGrampusTrainSpecs(ctx *context.Context, computeResource string) {
  232. noteBookSpecs, _ := resource.FindAvailableSpecs(ctx.User.ID, models.FindSpecsOptions{
  233. JobType: models.JobTypeTrain,
  234. ComputeResource: computeResource,
  235. Cluster: models.C2NetCluster,
  236. })
  237. ctx.Data["Specs"] = noteBookSpecs
  238. }
  239. func getFilterSpecBySpecialPool(specs *models.GetGrampusResourceSpecsResult, includeCenters map[string]struct{}, excludeCenters map[string]struct{}) []models.GrampusSpec {
  240. if len(includeCenters) == 0 && len(excludeCenters) == 0 {
  241. return specs.Infos
  242. }
  243. var grampusSpecs []models.GrampusSpec
  244. for _, info := range specs.Infos {
  245. if isInIncludeCenters(info, includeCenters) || (len(excludeCenters) != 0 && isNotAllInExcludeCenters(info, excludeCenters)) {
  246. grampusSpecs = append(grampusSpecs, info)
  247. }
  248. }
  249. return grampusSpecs
  250. }
  251. func isInIncludeCenters(grampusSpec models.GrampusSpec, centers map[string]struct{}) bool {
  252. for _, center := range grampusSpec.Centers {
  253. if _, ok := centers[center.ID]; ok {
  254. return true
  255. }
  256. }
  257. return false
  258. }
  259. func isNotAllInExcludeCenters(grampusSpec models.GrampusSpec, centers map[string]struct{}) bool {
  260. for _, center := range grampusSpec.Centers {
  261. if _, ok := centers[center.ID]; !ok {
  262. return true
  263. }
  264. }
  265. return false
  266. }
  267. func IsUserInOrgPool(userId int64, pool *models.SpecialPool) bool {
  268. org, _ := models.GetOrgByName(pool.Org)
  269. if org != nil {
  270. isOrgMember, _ := models.IsOrganizationMember(org.ID, userId)
  271. return isOrgMember
  272. }
  273. return false
  274. }
  275. func grampusParamCheckCreateTrainJob(form auth.CreateGrampusTrainJobForm) error {
  276. if !strings.HasSuffix(strings.TrimSpace(form.BootFile), ".py") {
  277. log.Error("the boot file(%s) must be a python file", form.BootFile)
  278. return errors.New("启动文件必须是python文件")
  279. }
  280. if form.BranchName == "" {
  281. log.Error("the branch must not be null!", form.BranchName)
  282. return errors.New("代码分支不能为空!")
  283. }
  284. return nil
  285. }
  286. func GrampusTrainJobGpuCreate(ctx *context.Context, form auth.CreateGrampusTrainJobForm) {
  287. displayJobName := form.DisplayJobName
  288. jobName := util.ConvertDisplayJobNameToJobName(displayJobName)
  289. uuid := form.Attachment
  290. description := form.Description
  291. bootFile := strings.TrimSpace(form.BootFile)
  292. params := form.Params
  293. repo := ctx.Repo.Repository
  294. codeLocalPath := setting.JobPath + jobName + cloudbrain.CodeMountPath + "/"
  295. codeMinioPath := setting.CBCodePathPrefix + jobName + cloudbrain.CodeMountPath + "/"
  296. branchName := form.BranchName
  297. image := strings.TrimSpace(form.Image)
  298. tpl := tplGrampusTrainJobGPUNew
  299. var jobID = ctx.Params(":jobid")
  300. if jobID != "" {
  301. tpl = tplGrampusTrainJobGPUVersionNew
  302. }
  303. if !jobNamePattern.MatchString(displayJobName) {
  304. grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeGPU)
  305. ctx.RenderWithErr(ctx.Tr("repo.cloudbrain_jobname_err"), tpl, &form)
  306. return
  307. }
  308. bootFileExist, err := ctx.Repo.FileExists(bootFile, branchName)
  309. if err != nil || !bootFileExist {
  310. log.Error("Get bootfile error:", err, ctx.Data["MsgID"])
  311. grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeGPU)
  312. ctx.RenderWithErr(ctx.Tr("repo.cloudbrain_bootfile_err"), tpl, &form)
  313. return
  314. }
  315. errStr := checkSpecialPool(ctx, "GPU")
  316. if errStr != "" {
  317. grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeGPU)
  318. ctx.RenderWithErr(errStr, tpl, &form)
  319. return
  320. }
  321. //check count limit
  322. count, err := models.GetGrampusCountByUserID(ctx.User.ID, string(models.JobTypeTrain), models.GPUResource)
  323. if err != nil {
  324. log.Error("GetGrampusCountByUserID failed:%v", err, ctx.Data["MsgID"])
  325. grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeGPU)
  326. ctx.RenderWithErr("system error", tpl, &form)
  327. return
  328. } else {
  329. if count >= 1 {
  330. log.Error("the user already has running or waiting task", ctx.Data["MsgID"])
  331. grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeGPU)
  332. ctx.RenderWithErr("you have already a running or waiting task, can not create more", tpl, &form)
  333. return
  334. }
  335. }
  336. //check param
  337. if err := grampusParamCheckCreateTrainJob(form); err != nil {
  338. log.Error("paramCheckCreateTrainJob failed:(%v)", err, ctx.Data["MsgID"])
  339. grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeGPU)
  340. ctx.RenderWithErr(err.Error(), tpl, &form)
  341. return
  342. }
  343. //check whether the task name in the project is duplicated
  344. tasks, err := models.GetCloudbrainsByDisplayJobName(repo.ID, string(models.JobTypeTrain), displayJobName)
  345. if err == nil {
  346. if len(tasks) != 0 {
  347. log.Error("the job name did already exist", ctx.Data["MsgID"])
  348. grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeGPU)
  349. ctx.RenderWithErr("the job name did already exist", tpl, &form)
  350. return
  351. }
  352. } else {
  353. if !models.IsErrJobNotExist(err) {
  354. log.Error("system error, %v", err, ctx.Data["MsgID"])
  355. grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeGPU)
  356. ctx.RenderWithErr("system error", tpl, &form)
  357. return
  358. }
  359. }
  360. //check specification
  361. spec, err := resource.GetAndCheckSpec(ctx.User.ID, form.SpecId, models.FindSpecsOptions{
  362. JobType: models.JobTypeTrain,
  363. ComputeResource: models.GPU,
  364. Cluster: models.C2NetCluster,
  365. })
  366. if err != nil || spec == nil {
  367. grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeGPU)
  368. ctx.RenderWithErr("Resource specification not available", tpl, &form)
  369. return
  370. }
  371. //check dataset
  372. datasetInfos, datasetNames, err := models.GetDatasetInfo(uuid, models.GPU)
  373. if err != nil {
  374. log.Error("GetDatasetInfo failed: %v", err, ctx.Data["MsgID"])
  375. grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeGPU)
  376. ctx.RenderWithErr(ctx.Tr("cloudbrain.error.dataset_select"), tpl, &form)
  377. return
  378. }
  379. //prepare code and out path
  380. _, err = ioutil.ReadDir(codeLocalPath)
  381. if err == nil {
  382. os.RemoveAll(codeLocalPath)
  383. }
  384. if err := downloadZipCode(ctx, codeLocalPath, branchName); err != nil {
  385. log.Error("downloadZipCode failed, server timed out: %s (%v)", repo.FullName(), err, ctx.Data["MsgID"])
  386. grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeGPU)
  387. ctx.RenderWithErr(ctx.Tr("cloudbrain.load_code_failed"), tpl, &form)
  388. return
  389. }
  390. //todo: upload code (send to file_server todo this work?)
  391. //upload code
  392. if err := uploadCodeToMinio(codeLocalPath+"/", jobName, cloudbrain.CodeMountPath+"/"); err != nil {
  393. log.Error("Failed to uploadCodeToMinio: %s (%v)", repo.FullName(), err, ctx.Data["MsgID"])
  394. grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeGPU)
  395. ctx.RenderWithErr(ctx.Tr("cloudbrain.load_code_failed"), tpl, &form)
  396. return
  397. }
  398. modelPath := setting.JobPath + jobName + cloudbrain.ModelMountPath + "/"
  399. if err := mkModelPath(modelPath); err != nil {
  400. log.Error("Failed to mkModelPath: %s (%v)", repo.FullName(), err, ctx.Data["MsgID"])
  401. grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeGPU)
  402. ctx.RenderWithErr(ctx.Tr("cloudbrain.load_code_failed"), tpl, &form)
  403. return
  404. }
  405. //init model readme
  406. if err := uploadCodeToMinio(modelPath, jobName, cloudbrain.ModelMountPath+"/"); err != nil {
  407. log.Error("Failed to uploadCodeToMinio: %s (%v)", repo.FullName(), err, ctx.Data["MsgID"])
  408. grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeGPU)
  409. ctx.RenderWithErr(ctx.Tr("cloudbrain.load_code_failed"), tpl, &form)
  410. return
  411. }
  412. var datasetRemotePath, allFileName string
  413. for _, datasetInfo := range datasetInfos {
  414. if datasetRemotePath == "" {
  415. datasetRemotePath = datasetInfo.DataLocalPath
  416. allFileName = datasetInfo.FullName
  417. } else {
  418. datasetRemotePath = datasetRemotePath + ";" + datasetInfo.DataLocalPath
  419. allFileName = allFileName + ";" + datasetInfo.FullName
  420. }
  421. }
  422. //prepare command
  423. preTrainModelPath := getPreTrainModelPath(form.PreTrainModelUrl, form.CkptName)
  424. command, err := generateCommand(repo.Name, grampus.ProcessorTypeGPU, codeMinioPath+cloudbrain.DefaultBranchName+".zip", datasetRemotePath, bootFile, params, setting.CBCodePathPrefix+jobName+cloudbrain.ModelMountPath+"/", allFileName, preTrainModelPath, form.CkptName)
  425. if err != nil {
  426. log.Error("Failed to generateCommand: %s (%v)", displayJobName, err, ctx.Data["MsgID"])
  427. grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeGPU)
  428. ctx.RenderWithErr("Create task failed, internal error", tpl, &form)
  429. return
  430. }
  431. commitID, _ := ctx.Repo.GitRepo.GetBranchCommitID(branchName)
  432. req := &grampus.GenerateTrainJobReq{
  433. JobName: jobName,
  434. DisplayJobName: displayJobName,
  435. ComputeResource: models.GPUResource,
  436. ProcessType: grampus.ProcessorTypeGPU,
  437. Command: command,
  438. ImageUrl: image,
  439. Description: description,
  440. BootFile: bootFile,
  441. Uuid: uuid,
  442. CommitID: commitID,
  443. BranchName: branchName,
  444. Params: form.Params,
  445. EngineName: image,
  446. DatasetNames: datasetNames,
  447. DatasetInfos: datasetInfos,
  448. IsLatestVersion: modelarts.IsLatestVersion,
  449. VersionCount: modelarts.VersionCountOne,
  450. WorkServerNumber: 1,
  451. Spec: spec,
  452. }
  453. if form.ModelName != "" { //使用预训练模型训练
  454. req.ModelName = form.ModelName
  455. req.LabelName = form.LabelName
  456. req.CkptName = form.CkptName
  457. req.ModelVersion = form.ModelVersion
  458. req.PreTrainModelUrl = form.PreTrainModelUrl
  459. }
  460. err = grampus.GenerateTrainJob(ctx, req)
  461. if err != nil {
  462. log.Error("GenerateTrainJob failed:%v", err.Error(), ctx.Data["MsgID"])
  463. grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeGPU)
  464. ctx.RenderWithErr(err.Error(), tpl, &form)
  465. return
  466. }
  467. ctx.Redirect(setting.AppSubURL + ctx.Repo.RepoLink + "/modelarts/train-job")
  468. }
  469. func getPreTrainModelPath(pretrainModelDir string, fileName string) string {
  470. index := strings.Index(pretrainModelDir, "/")
  471. if index > 0 {
  472. filterBucket := pretrainModelDir[index+1:]
  473. return filterBucket + fileName
  474. } else {
  475. return ""
  476. }
  477. }
  478. func GrampusTrainJobVersionCreate(ctx *context.Context, form auth.CreateGrampusTrainJobForm) {
  479. computeResource := ctx.Query("compute_resource")
  480. if computeResource == models.GPUResource {
  481. GrampusTrainJobGpuCreate(ctx, form)
  482. } else if computeResource == models.NPUResource {
  483. GrampusTrainJobNpuCreate(ctx, form)
  484. } else {
  485. ctx.ServerError("resource error", errors.New("compute resource is not support"))
  486. return
  487. }
  488. }
  489. func checkSpecialPool(ctx *context.Context, resourceType string) string {
  490. grampus.InitSpecialPool()
  491. if grampus.SpecialPools != nil {
  492. for _, pool := range grampus.SpecialPools.Pools {
  493. if pool.IsExclusive && pool.Type == resourceType {
  494. org, _ := models.GetOrgByName(pool.Org)
  495. if org != nil {
  496. isOrgMember, _ := models.IsOrganizationMember(org.ID, ctx.User.ID)
  497. if !isOrgMember {
  498. return ctx.Tr("repo.grampus.no_operate_right")
  499. }
  500. }
  501. }
  502. }
  503. }
  504. return ""
  505. }
  506. func GrampusTrainJobNpuCreate(ctx *context.Context, form auth.CreateGrampusTrainJobForm) {
  507. displayJobName := form.DisplayJobName
  508. jobName := util.ConvertDisplayJobNameToJobName(displayJobName)
  509. uuid := form.Attachment
  510. description := form.Description
  511. bootFile := strings.TrimSpace(form.BootFile)
  512. params := form.Params
  513. repo := ctx.Repo.Repository
  514. codeLocalPath := setting.JobPath + jobName + modelarts.CodePath
  515. codeObsPath := grampus.JobPath + jobName + modelarts.CodePath
  516. //dataObsPath := setting.BasePath + path.Join(uuid[0:1], uuid[1:2]) + "/" + uuid + "/"
  517. branchName := form.BranchName
  518. isLatestVersion := modelarts.IsLatestVersion
  519. versionCount := modelarts.VersionCountOne
  520. engineName := form.EngineName
  521. tpl := tplGrampusTrainJobNPUNew
  522. //判断路由是否存在jobID,若存在,则说明是创建版本
  523. var jobID = ctx.Params(":jobid")
  524. if jobID != "" {
  525. tpl = tplGrampusTrainJobNPUVersionNew
  526. }
  527. if !jobNamePattern.MatchString(displayJobName) {
  528. grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeNPU)
  529. ctx.RenderWithErr(ctx.Tr("repo.cloudbrain_jobname_err"), tpl, &form)
  530. return
  531. }
  532. bootFileExist, err := ctx.Repo.FileExists(bootFile, branchName)
  533. if err != nil || !bootFileExist {
  534. log.Error("Get bootfile error:", err, ctx.Data["MsgID"])
  535. grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeNPU)
  536. ctx.RenderWithErr(ctx.Tr("repo.cloudbrain_bootfile_err"), tpl, &form)
  537. return
  538. }
  539. errStr := checkSpecialPool(ctx, "NPU")
  540. if errStr != "" {
  541. grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeNPU)
  542. ctx.RenderWithErr(errStr, tplGrampusTrainJobGPUNew, &form)
  543. return
  544. }
  545. //check count limit
  546. count, err := models.GetGrampusCountByUserID(ctx.User.ID, string(models.JobTypeTrain), models.NPUResource)
  547. if err != nil {
  548. log.Error("GetGrampusCountByUserID failed:%v", err, ctx.Data["MsgID"])
  549. grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeNPU)
  550. ctx.RenderWithErr("system error", tpl, &form)
  551. return
  552. } else {
  553. if count >= 1 {
  554. log.Error("the user already has running or waiting task", ctx.Data["MsgID"])
  555. grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeNPU)
  556. ctx.RenderWithErr("you have already a running or waiting task, can not create more", tpl, &form)
  557. return
  558. }
  559. }
  560. //check param
  561. if err := grampusParamCheckCreateTrainJob(form); err != nil {
  562. log.Error("paramCheckCreateTrainJob failed:(%v)", err)
  563. grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeNPU)
  564. ctx.RenderWithErr(err.Error(), tpl, &form)
  565. return
  566. }
  567. //check whether the task name in the project is duplicated
  568. tasks, err := models.GetCloudbrainsByDisplayJobName(repo.ID, string(models.JobTypeTrain), displayJobName)
  569. if err == nil {
  570. if len(tasks) != 0 {
  571. log.Error("the job name did already exist", ctx.Data["MsgID"])
  572. grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeNPU)
  573. ctx.RenderWithErr("the job name did already exist", tpl, &form)
  574. return
  575. }
  576. } else {
  577. if !models.IsErrJobNotExist(err) {
  578. log.Error("system error, %v", err, ctx.Data["MsgID"])
  579. grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeNPU)
  580. ctx.RenderWithErr("system error", tpl, &form)
  581. return
  582. }
  583. }
  584. //check specification
  585. spec, err := resource.GetAndCheckSpec(ctx.User.ID, form.SpecId, models.FindSpecsOptions{
  586. JobType: models.JobTypeTrain,
  587. ComputeResource: models.NPU,
  588. Cluster: models.C2NetCluster,
  589. })
  590. if err != nil || spec == nil {
  591. grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeNPU)
  592. ctx.RenderWithErr("Resource specification not available", tpl, &form)
  593. return
  594. }
  595. //check dataset
  596. datasetInfos, datasetNames, err := models.GetDatasetInfo(uuid, models.NPU)
  597. if err != nil {
  598. log.Error("GetDatasetInfo failed: %v", err, ctx.Data["MsgID"])
  599. grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeNPU)
  600. ctx.RenderWithErr(ctx.Tr("cloudbrain.error.dataset_select"), tpl, &form)
  601. return
  602. }
  603. //prepare code and out path
  604. _, err = ioutil.ReadDir(codeLocalPath)
  605. if err == nil {
  606. os.RemoveAll(codeLocalPath)
  607. }
  608. if err := downloadZipCode(ctx, codeLocalPath, branchName); err != nil {
  609. log.Error("downloadZipCode failed, server timed out: %s (%v)", repo.FullName(), err)
  610. grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeNPU)
  611. ctx.RenderWithErr(ctx.Tr("cloudbrain.load_code_failed"), tpl, &form)
  612. return
  613. }
  614. //todo: upload code (send to file_server todo this work?)
  615. if err := obsMkdir(setting.CodePathPrefix + jobName + modelarts.OutputPath); err != nil {
  616. log.Error("Failed to obsMkdir_output: %s (%v)", repo.FullName(), err)
  617. grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeNPU)
  618. ctx.RenderWithErr(ctx.Tr("cloudbrain.load_code_failed"), tpl, &form)
  619. return
  620. }
  621. if err := uploadCodeToObs(codeLocalPath, jobName, ""); err != nil {
  622. log.Error("Failed to uploadCodeToObs: %s (%v)", repo.FullName(), err)
  623. grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeNPU)
  624. ctx.RenderWithErr(ctx.Tr("cloudbrain.load_code_failed"), tpl, &form)
  625. return
  626. }
  627. var datasetRemotePath, allFileName string
  628. for _, datasetInfo := range datasetInfos {
  629. if datasetRemotePath == "" {
  630. datasetRemotePath = datasetInfo.DataLocalPath + "'" + datasetInfo.FullName + "'"
  631. allFileName = datasetInfo.FullName
  632. } else {
  633. datasetRemotePath = datasetRemotePath + ";" + datasetInfo.DataLocalPath + "'" + datasetInfo.FullName + "'"
  634. allFileName = allFileName + ";" + datasetInfo.FullName
  635. }
  636. }
  637. //prepare command
  638. preTrainModelPath := getPreTrainModelPath(form.PreTrainModelUrl, form.CkptName)
  639. command, err := generateCommand(repo.Name, grampus.ProcessorTypeNPU, codeObsPath+cloudbrain.DefaultBranchName+".zip", datasetRemotePath, bootFile, params, setting.CodePathPrefix+jobName+modelarts.OutputPath, allFileName, preTrainModelPath, form.CkptName)
  640. if err != nil {
  641. log.Error("Failed to generateCommand: %s (%v)", displayJobName, err, ctx.Data["MsgID"])
  642. grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeNPU)
  643. ctx.RenderWithErr("Create task failed, internal error", tpl, &form)
  644. return
  645. }
  646. commitID, _ := ctx.Repo.GitRepo.GetBranchCommitID(branchName)
  647. req := &grampus.GenerateTrainJobReq{
  648. JobName: jobName,
  649. DisplayJobName: displayJobName,
  650. ComputeResource: models.NPUResource,
  651. ProcessType: grampus.ProcessorTypeNPU,
  652. Command: command,
  653. ImageId: form.ImageID,
  654. Description: description,
  655. CodeObsPath: codeObsPath,
  656. BootFileUrl: codeObsPath + bootFile,
  657. BootFile: bootFile,
  658. WorkServerNumber: form.WorkServerNumber,
  659. Uuid: uuid,
  660. CommitID: commitID,
  661. IsLatestVersion: isLatestVersion,
  662. BranchName: branchName,
  663. Params: form.Params,
  664. EngineName: engineName,
  665. VersionCount: versionCount,
  666. TotalVersionCount: modelarts.TotalVersionCount,
  667. DatasetNames: datasetNames,
  668. DatasetInfos: datasetInfos,
  669. Spec: spec,
  670. }
  671. if form.ModelName != "" { //使用预训练模型训练
  672. req.ModelName = form.ModelName
  673. req.LabelName = form.LabelName
  674. req.CkptName = form.CkptName
  675. req.ModelVersion = form.ModelVersion
  676. req.PreTrainModelUrl = form.PreTrainModelUrl
  677. }
  678. err = grampus.GenerateTrainJob(ctx, req)
  679. if err != nil {
  680. log.Error("GenerateTrainJob failed:%v", err.Error())
  681. grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeNPU)
  682. ctx.RenderWithErr(err.Error(), tpl, &form)
  683. return
  684. }
  685. ctx.Redirect(setting.AppSubURL + ctx.Repo.RepoLink + "/modelarts/train-job")
  686. }
  687. func GrampusStopJob(ctx *context.Context) {
  688. var ID = ctx.Params(":jobid")
  689. var resultCode = "0"
  690. var errorMsg = ""
  691. var status = ""
  692. task := ctx.Cloudbrain
  693. for {
  694. if task.Status == string(models.GrampusStatusStopped) || task.Status == string(models.GrampusStatusFailed) || task.Status == string(models.GrampusStatusSucceeded) {
  695. log.Error("the job(%s) has been stopped", task.JobName, ctx.Data["msgID"])
  696. resultCode = "-1"
  697. errorMsg = "system error"
  698. break
  699. }
  700. res, err := grampus.StopJob(task.JobID)
  701. if err != nil {
  702. log.Error("StopJob(%s) failed:%v", task.JobName, err, ctx.Data["msgID"])
  703. resultCode = strconv.Itoa(res.ErrorCode)
  704. errorMsg = res.ErrorMsg
  705. break
  706. }
  707. oldStatus := task.Status
  708. task.Status = string(models.GrampusStatusStopped)
  709. if task.EndTime == 0 {
  710. task.EndTime = timeutil.TimeStampNow()
  711. }
  712. task.ComputeAndSetDuration()
  713. if oldStatus != task.Status {
  714. notification.NotifyChangeCloudbrainStatus(task, oldStatus)
  715. }
  716. err = models.UpdateJob(task)
  717. if err != nil {
  718. log.Error("UpdateJob(%s) failed:%v", task.JobName, err, ctx.Data["msgID"])
  719. resultCode = "-1"
  720. errorMsg = "system error"
  721. break
  722. }
  723. status = task.Status
  724. break
  725. }
  726. ctx.JSON(200, map[string]interface{}{
  727. "result_code": resultCode,
  728. "error_msg": errorMsg,
  729. "status": status,
  730. "id": ID,
  731. "StatusOK": 0,
  732. })
  733. }
  734. func GrampusTrainJobDel(ctx *context.Context) {
  735. var listType = ctx.Query("listType")
  736. if err := deleteGrampusJob(ctx); err != nil {
  737. log.Error("deleteGrampusJob failed: %v", err, ctx.Data["msgID"])
  738. ctx.ServerError(err.Error(), err)
  739. return
  740. }
  741. var isAdminPage = ctx.Query("isadminpage")
  742. var isHomePage = ctx.Query("ishomepage")
  743. if ctx.IsUserSiteAdmin() && isAdminPage == "true" {
  744. ctx.Redirect(setting.AppSubURL + "/admin" + "/cloudbrains")
  745. } else if isHomePage == "true" {
  746. ctx.Redirect(setting.AppSubURL + "/cloudbrains")
  747. } else {
  748. ctx.Redirect(setting.AppSubURL + ctx.Repo.RepoLink + "/modelarts/train-job?listType=" + listType)
  749. }
  750. }
  751. func deleteGrampusJob(ctx *context.Context) error {
  752. task := ctx.Cloudbrain
  753. if task.Status != string(models.GrampusStatusStopped) && task.Status != string(models.GrampusStatusSucceeded) && task.Status != string(models.GrampusStatusFailed) {
  754. log.Error("the job(%s) has not been stopped", task.JobName, ctx.Data["msgID"])
  755. return errors.New("the job has not been stopped")
  756. }
  757. err := models.DeleteJob(task)
  758. if err != nil {
  759. log.Error("DeleteJob failed: %v", err, ctx.Data["msgID"])
  760. return err
  761. }
  762. storageType := models.TypeCloudBrainOne
  763. if task.ComputeResource == models.NPUResource {
  764. storageType = models.TypeCloudBrainTwo
  765. }
  766. DeleteCloudbrainJobStorage(task.JobName, storageType)
  767. return nil
  768. }
  769. func GrampusTrainJobShow(ctx *context.Context) {
  770. ctx.Data["PageIsCloudBrain"] = true
  771. var task *models.Cloudbrain
  772. task, err := models.GetCloudbrainByJobIDWithDeleted(ctx.Params(":jobid"))
  773. if err != nil {
  774. log.Error("GetCloudbrainByJobID failed:" + err.Error())
  775. ctx.NotFound(ctx.Req.URL.RequestURI(), nil)
  776. return
  777. }
  778. if task.DeletedAt.IsZero() { //normal record
  779. result, err := grampus.GetJob(task.JobID)
  780. if err != nil {
  781. log.Error("GetJob failed:" + err.Error())
  782. ctx.NotFound(ctx.Req.URL.RequestURI(), nil)
  783. return
  784. }
  785. if result != nil {
  786. if len(result.JobInfo.Tasks[0].CenterID) == 1 && len(result.JobInfo.Tasks[0].CenterName) == 1 {
  787. task.AiCenter = result.JobInfo.Tasks[0].CenterID[0] + "+" + result.JobInfo.Tasks[0].CenterName[0]
  788. }
  789. oldStatus := task.Status
  790. task.Status = grampus.TransTrainJobStatus(result.JobInfo.Status)
  791. if task.Status != result.JobInfo.Status || result.JobInfo.Status == models.GrampusStatusRunning {
  792. task.Duration = result.JobInfo.RunSec
  793. task.TrainJobDuration = models.ConvertDurationToStr(task.Duration)
  794. if task.StartTime == 0 && result.JobInfo.StartedAt > 0 {
  795. task.StartTime = timeutil.TimeStamp(result.JobInfo.StartedAt)
  796. }
  797. if task.EndTime == 0 && models.IsTrainJobTerminal(task.Status) && task.StartTime > 0 {
  798. task.EndTime = task.StartTime.Add(task.Duration)
  799. }
  800. task.CorrectCreateUnix()
  801. if oldStatus != task.Status {
  802. notification.NotifyChangeCloudbrainStatus(task, oldStatus)
  803. }
  804. err = models.UpdateJob(task)
  805. if err != nil {
  806. log.Error("UpdateJob failed:" + err.Error())
  807. }
  808. }
  809. }
  810. }
  811. if len(task.Parameters) > 0 {
  812. var parameters models.Parameters
  813. err := json.Unmarshal([]byte(task.Parameters), &parameters)
  814. if err != nil {
  815. log.Error("Failed to Unmarshal Parameters: %s (%v)", task.Parameters, err)
  816. ctx.ServerError("system error", err)
  817. return
  818. }
  819. if len(parameters.Parameter) > 0 {
  820. paramTemp := ""
  821. for _, Parameter := range parameters.Parameter {
  822. param := Parameter.Label + " = " + Parameter.Value + "; "
  823. paramTemp = paramTemp + param
  824. }
  825. task.Parameters = paramTemp[:len(paramTemp)-2]
  826. } else {
  827. task.Parameters = ""
  828. }
  829. }
  830. taskList := make([]*models.Cloudbrain, 0)
  831. taskList = append(taskList, task)
  832. prepareSpec4Show(ctx, task)
  833. ctx.Data["version_list_task"] = taskList
  834. ctx.Data["datasetDownload"] = GetCloudBrainDataSetInfo(task.Uuid, task.DatasetName, false)
  835. ctx.Data["canDownload"] = cloudbrain.CanModifyJob(ctx, task)
  836. ctx.Data["displayJobName"] = task.DisplayJobName
  837. aiCenterInfo := strings.Split(task.AiCenter, "+")
  838. if len(aiCenterInfo) == 2 {
  839. ctx.Data["ai_center"] = aiCenterInfo[1]
  840. }
  841. ctx.HTML(http.StatusOK, tplGrampusTrainJobShow)
  842. }
  843. func GrampusGetLog(ctx *context.Context) {
  844. jobID := ctx.Params(":jobid")
  845. job, err := models.GetCloudbrainByJobID(jobID)
  846. if err != nil {
  847. log.Error("GetCloudbrainByJobID failed: %v", err, ctx.Data["MsgID"])
  848. ctx.ServerError(err.Error(), err)
  849. return
  850. }
  851. content, err := grampus.GetTrainJobLog(job.JobID)
  852. if err != nil {
  853. log.Error("GetTrainJobLog failed: %v", err, ctx.Data["MsgID"])
  854. ctx.ServerError(err.Error(), err)
  855. return
  856. }
  857. ctx.JSON(http.StatusOK, map[string]interface{}{
  858. "JobName": job.JobName,
  859. "Content": content,
  860. })
  861. return
  862. }
  863. func generateCommand(repoName, processorType, codeRemotePath, dataRemotePath, bootFile, paramSrc, outputRemotePath, datasetName, pretrainModelPath, pretrainModelFileName string) (string, error) {
  864. var command string
  865. workDir := grampus.NpuWorkDir
  866. if processorType == grampus.ProcessorTypeGPU {
  867. workDir = grampus.GpuWorkDir
  868. }
  869. command += "pwd;cd " + workDir + fmt.Sprintf(grampus.CommandPrepareScript, setting.Grampus.SyncScriptProject, setting.Grampus.SyncScriptProject)
  870. //download code & dataset
  871. if processorType == grampus.ProcessorTypeNPU {
  872. commandDownload := "./downloader_for_obs " + setting.Bucket + " " + codeRemotePath + " " + grampus.CodeArchiveName + " '" + dataRemotePath + "' '" + datasetName + "'"
  873. commandDownload = processPretrainModelParameter(pretrainModelPath, pretrainModelFileName, commandDownload)
  874. command += commandDownload
  875. } else if processorType == grampus.ProcessorTypeGPU {
  876. commandDownload := "./downloader_for_minio " + setting.Grampus.Env + " " + codeRemotePath + " " + grampus.CodeArchiveName + " '" + dataRemotePath + "' '" + datasetName + "'"
  877. commandDownload = processPretrainModelParameter(pretrainModelPath, pretrainModelFileName, commandDownload)
  878. command += commandDownload
  879. }
  880. //unzip code & dataset
  881. unZipDatasetCommand := generateDatasetUnzipCommand(datasetName)
  882. commandUnzip := "cd " + workDir + "code;unzip -q master.zip;echo \"start to unzip dataset\";cd " + workDir + "dataset;" + unZipDatasetCommand
  883. command += commandUnzip
  884. command += "echo \"unzip finished;start to exec code;\";"
  885. // set export
  886. var commandExport string
  887. if processorType == grampus.ProcessorTypeNPU {
  888. commandExport = "export bucket=" + setting.Bucket + " && export remote_path=" + outputRemotePath + ";"
  889. } else if processorType == grampus.ProcessorTypeGPU {
  890. commandExport = "export env=" + setting.Grampus.Env + " && export remote_path=" + outputRemotePath + ";"
  891. }
  892. command += commandExport
  893. //exec code
  894. var parameters models.Parameters
  895. var paramCode string
  896. if len(paramSrc) != 0 {
  897. err := json.Unmarshal([]byte(paramSrc), &parameters)
  898. if err != nil {
  899. log.Error("Failed to Unmarshal params: %s (%v)", paramSrc, err)
  900. return command, err
  901. }
  902. for _, parameter := range parameters.Parameter {
  903. paramCode += " --" + parameter.Label + "=" + parameter.Value
  904. }
  905. }
  906. if pretrainModelFileName != "" {
  907. paramCode += " --pretrainmodelname" + "=" + pretrainModelFileName
  908. }
  909. var commandCode string
  910. if processorType == grampus.ProcessorTypeNPU {
  911. commandCode = "/bin/bash /home/work/run_train_for_openi.sh " + workDir + "code/" + strings.ToLower(repoName) + "/" + bootFile + " /tmp/log/train.log" + paramCode + ";"
  912. } else if processorType == grampus.ProcessorTypeGPU {
  913. commandCode = "cd " + workDir + "code/" + strings.ToLower(repoName) + ";python " + bootFile + paramCode + ";"
  914. }
  915. command += commandCode
  916. //get exec result
  917. commandGetRes := "result=$?;"
  918. command += commandGetRes
  919. //upload models
  920. if processorType == grampus.ProcessorTypeNPU {
  921. commandUpload := "cd " + workDir + setting.Grampus.SyncScriptProject + "/;./uploader_for_npu " + setting.Bucket + " " + outputRemotePath + " " + workDir + "output/;"
  922. command += commandUpload
  923. } else if processorType == grampus.ProcessorTypeGPU {
  924. commandUpload := "cd " + workDir + setting.Grampus.SyncScriptProject + "/;./uploader_for_gpu " + setting.Grampus.Env + " " + outputRemotePath + " " + workDir + "output/;"
  925. command += commandUpload
  926. }
  927. //check exec result
  928. commandCheckRes := "bash -c \"[[ $result -eq 0 ]] && exit 0 || exit -1\""
  929. command += commandCheckRes
  930. return command, nil
  931. }
  932. func processPretrainModelParameter(pretrainModelPath string, pretrainModelFileName string, commandDownload string) string {
  933. commandDownloadTemp := commandDownload
  934. if pretrainModelPath != "" {
  935. commandDownloadTemp += " '" + pretrainModelPath + "' '" + pretrainModelFileName + "'"
  936. }
  937. commandDownloadTemp += ";"
  938. return commandDownloadTemp
  939. }
  940. func generateDatasetUnzipCommand(datasetName string) string {
  941. var unZipDatasetCommand string
  942. datasetNameArray := strings.Split(datasetName, ";")
  943. if len(datasetNameArray) == 1 { //单数据集
  944. unZipDatasetCommand = "unzip -q '" + datasetName + "';"
  945. if strings.HasSuffix(datasetName, ".tar.gz") {
  946. unZipDatasetCommand = "tar --strip-components=1 -zxvf '" + datasetName + "';"
  947. }
  948. } else { //多数据集
  949. for _, datasetNameTemp := range datasetNameArray {
  950. if strings.HasSuffix(datasetName, ".tar.gz") {
  951. unZipDatasetCommand = unZipDatasetCommand + "tar -zxvf '" + datasetName + "';"
  952. } else {
  953. unZipDatasetCommand = unZipDatasetCommand + "unzip -q '" + datasetNameTemp + "' -d './" + strings.TrimSuffix(datasetNameTemp, ".zip") + "';"
  954. }
  955. }
  956. }
  957. return unZipDatasetCommand
  958. }
  959. func downloadZipCode(ctx *context.Context, codePath, branchName string) error {
  960. archiveType := git.ZIP
  961. archivePath := codePath
  962. if !com.IsDir(archivePath) {
  963. if err := os.MkdirAll(archivePath, os.ModePerm); err != nil {
  964. log.Error("MkdirAll failed:" + err.Error())
  965. return err
  966. }
  967. }
  968. // Get corresponding commit.
  969. var (
  970. commit *git.Commit
  971. err error
  972. )
  973. gitRepo := ctx.Repo.GitRepo
  974. if err != nil {
  975. log.Error("OpenRepository failed:" + err.Error())
  976. return err
  977. }
  978. if gitRepo.IsBranchExist(branchName) {
  979. commit, err = gitRepo.GetBranchCommit(branchName)
  980. if err != nil {
  981. log.Error("GetBranchCommit failed:" + err.Error())
  982. return err
  983. }
  984. } else {
  985. log.Error("the branch is not exist: " + branchName)
  986. return fmt.Errorf("The branch does not exist.")
  987. }
  988. archivePath = path.Join(archivePath, grampus.CodeArchiveName)
  989. if !com.IsFile(archivePath) {
  990. if err := commit.CreateArchive(archivePath, git.CreateArchiveOpts{
  991. Format: archiveType,
  992. Prefix: setting.Repository.PrefixArchiveFiles,
  993. }); err != nil {
  994. log.Error("CreateArchive failed:" + err.Error())
  995. return err
  996. }
  997. }
  998. return nil
  999. }