// Package repo contains the repository-scoped web handlers. This file holds
// the handlers for Grampus train jobs (GPU and NPU).
package repo

import (
	"encoding/json"
	"errors"
	"io/ioutil"
	"net/http"
	"os"
	"path"
	"strconv"
	"strings"
	"time"

	"code.gitea.io/gitea/models"
	"code.gitea.io/gitea/modules/auth"
	"code.gitea.io/gitea/modules/base"
	"code.gitea.io/gitea/modules/cloudbrain"
	"code.gitea.io/gitea/modules/context"
	"code.gitea.io/gitea/modules/git"
	"code.gitea.io/gitea/modules/grampus"
	"code.gitea.io/gitea/modules/log"
	"code.gitea.io/gitea/modules/modelarts"
	"code.gitea.io/gitea/modules/setting"
	"code.gitea.io/gitea/modules/timeutil"
	"code.gitea.io/gitea/modules/util"
)

const (
	tplGrampusTrainJobShow base.TplName = "repo/grampus/trainjob/show"

	//GPU
	tplGrampusTrainJobGPUNew base.TplName = "repo/grampus/trainjob/gpu/new"
	//NPU
	tplGrampusTrainJobNPUNew base.TplName = "repo/grampus/trainjob/npu/new"
)

// GrampusTrainJobGPUNew renders the "new GPU train job" page.
func GrampusTrainJobGPUNew(ctx *context.Context) {
	err := grampusGpuNewDataPrepare(ctx)
	if err != nil {
		ctx.ServerError("get new train-job info failed", err)
		return
	}

	ctx.HTML(http.StatusOK, tplGrampusTrainJobGPUNew)
}

// grampusImagePlaceView returns the display form of an image location: for
// locations starting with "192.168" everything before the first "/" is
// dropped; any other location (or one without a "/") is returned unchanged.
func grampusImagePlaceView(place string) string {
	if !strings.HasPrefix(place, "192.168") {
		return place
	}
	// Guard against a missing "/": slicing with index -1 would panic.
	if idx := strings.Index(place, "/"); idx >= 0 {
		return place[idx:]
	}
	return place
}

// grampusLoadJSONSetting decodes the JSON setting raw into dst. A decoding
// failure is logged and otherwise ignored, so callers keep their previous
// control flow.
func grampusLoadJSONSetting(name, raw string, dst interface{}) {
	if err := json.Unmarshal([]byte(raw), dst); err != nil {
		log.Error("json.Unmarshal %s failed: %v", name, err)
	}
}

// grampusGpuNewDataPrepare fills ctx.Data with everything the GPU "new train
// job" template reads: a suggested job name, image lists, the user's
// attachments, mount paths and the resource specs decoded from settings.
// It returns an error only when the attachments cannot be loaded.
func grampusGpuNewDataPrepare(ctx *context.Context) error {
	ctx.Data["PageIsCloudBrain"] = true

	t := time.Now()
	var displayJobName = jobNamePrefixValid(cutString(ctx.User.Name, 5)) + t.Format("2006010215") + strconv.Itoa(int(t.Unix()))[5:]
	ctx.Data["display_job_name"] = displayJobName

	//get valid images
	result, err := cloudbrain.GetImages()
	if err != nil {
		ctx.Data["error"] = err.Error()
		log.Error("cloudbrain.GetImages failed:", err.Error(), ctx.Data["MsgID"])
	}
	for i := range result.Payload.ImageInfo {
		result.Payload.ImageInfo[i].PlaceView = grampusImagePlaceView(result.Payload.ImageInfo[i].Place)
	}
	ctx.Data["images"] = result.Payload.ImageInfo

	resultPublic, err := cloudbrain.GetPublicImages()
	if err != nil {
		ctx.Data["error"] = err.Error()
		log.Error("cloudbrain.GetPublicImages failed:", err.Error(), ctx.Data["MsgID"])
	}
	for i := range resultPublic.Payload.ImageInfo {
		resultPublic.Payload.ImageInfo[i].PlaceView = grampusImagePlaceView(resultPublic.Payload.ImageInfo[i].Place)
	}
	ctx.Data["public_images"] = resultPublic.Payload.ImageInfo

	//get valid dataset
	attachs, err := models.GetAllUserAttachments(ctx.User.ID)
	if err != nil {
		log.Error("GetAllUserAttachments failed: %v", err, ctx.Data["MsgID"])
		return err
	}
	ctx.Data["attachments"] = attachs

	ctx.Data["command"] = cloudbrain.Command
	ctx.Data["code_path"] = cloudbrain.CodeMountPath
	ctx.Data["dataset_path"] = cloudbrain.DataSetMountPath
	ctx.Data["model_path"] = cloudbrain.ModelMountPath
	ctx.Data["benchmark_path"] = cloudbrain.BenchMarkMountPath
	ctx.Data["is_benchmark_enabled"] = setting.IsBenchmarkEnabled

	//get valid resource specs
	// Each spec is decoded from settings on first use and then kept in a
	// package-level variable declared outside this file.
	if categories == nil {
		grampusLoadJSONSetting("BenchmarkCategory", setting.BenchmarkCategory, &categories)
	}
	ctx.Data["benchmark_categories"] = categories.Category

	ctx.Data["benchmark_types"] = GetBenchmarkTypes(ctx).BenchmarkType

	if gpuInfos == nil {
		grampusLoadJSONSetting("GpuTypes", setting.GpuTypes, &gpuInfos)
	}
	ctx.Data["gpu_types"] = gpuInfos.GpuInfo

	if trainGpuInfos == nil {
		grampusLoadJSONSetting("TrainGpuTypes", setting.TrainGpuTypes, &trainGpuInfos)
	}
	ctx.Data["train_gpu_types"] = trainGpuInfos.GpuInfo

	if benchmarkGpuInfos == nil {
		grampusLoadJSONSetting("BenchmarkGpuTypes", setting.BenchmarkGpuTypes, &benchmarkGpuInfos)
	}
	ctx.Data["benchmark_gpu_types"] = benchmarkGpuInfos.GpuInfo

	if benchmarkResourceSpecs == nil {
		grampusLoadJSONSetting("BenchmarkResourceSpecs", setting.BenchmarkResourceSpecs, &benchmarkResourceSpecs)
	}
	ctx.Data["benchmark_resource_specs"] = benchmarkResourceSpecs.ResourceSpec

	if cloudbrain.ResourceSpecs == nil {
		grampusLoadJSONSetting("ResourceSpecs", setting.ResourceSpecs, &cloudbrain.ResourceSpecs)
	}
	ctx.Data["resource_specs"] = cloudbrain.ResourceSpecs.ResourceSpec

	if cloudbrain.TrainResourceSpecs == nil {
		grampusLoadJSONSetting("TrainResourceSpecs", setting.TrainResourceSpecs, &cloudbrain.TrainResourceSpecs)
	}
	ctx.Data["train_resource_specs"] = cloudbrain.TrainResourceSpecs.ResourceSpec

	ctx.Data["params"] = ""
	ctx.Data["branchName"] = ctx.Repo.BranchName

	ctx.Data["snn4imagenet_path"] = cloudbrain.Snn4imagenetMountPath
	ctx.Data["is_snn4imagenet_enabled"] = setting.IsSnn4imagenetEnabled

	ctx.Data["brainscore_path"] = cloudbrain.BrainScoreMountPath
	ctx.Data["is_brainscore_enabled"] = setting.IsBrainScoreEnabled

	ctx.Data["cloudbraintype"] = models.TypeCloudBrainOne
	ctx.Data["benchmarkMode"] = ctx.Query("benchmarkMode")

	return nil
}

// GrampusTrainJobNPUNew renders the "new NPU train job" page.
func GrampusTrainJobNPUNew(ctx *context.Context) {
	err := grampusTrainJobNpuNewDataPrepare(ctx)
	if err != nil {
		ctx.ServerError("get new train-job info failed", err)
		return
	}

	ctx.HTML(http.StatusOK, tplGrampusTrainJobNPUNew)
}

// grampusTrainJobNpuNewDataPrepare fills ctx.Data with everything the NPU
// "new train job" template reads: a suggested job name, the user's
// attachments, resource pools, engines, engine versions, flavors and the
// custom config list.
//
// Failures are logged and returned; the function never writes a response
// itself, so the caller decides how to report the error exactly once.
func grampusTrainJobNpuNewDataPrepare(ctx *context.Context) error {
	ctx.Data["PageIsCloudBrain"] = true

	t := time.Now()
	var displayJobName = cutString(ctx.User.Name, 5) + t.Format("2006010215") + strconv.Itoa(int(t.Unix()))[5:]
	ctx.Data["display_job_name"] = displayJobName

	//get valid dataset
	attachs, err := models.GetModelArtsTrainAttachments(ctx.User.ID)
	if err != nil {
		log.Error("GetModelArtsTrainAttachments failed: %v", err, ctx.Data["MsgID"])
		return err
	}
	ctx.Data["attachments"] = attachs

	//get valid resource specs
	var resourcePools modelarts.ResourcePool
	if err = json.Unmarshal([]byte(setting.ResourcePools), &resourcePools); err != nil {
		log.Error("json.Unmarshal ResourcePools failed: %v", err, ctx.Data["MsgID"])
		return err
	}
	ctx.Data["resource_pools"] = resourcePools.Info

	var engines modelarts.Engine
	if err = json.Unmarshal([]byte(setting.Engines), &engines); err != nil {
		log.Error("json.Unmarshal Engines failed: %v", err, ctx.Data["MsgID"])
		return err
	}
	ctx.Data["engines"] = engines.Info

	var versionInfos modelarts.VersionInfo
	if err = json.Unmarshal([]byte(setting.EngineVersions), &versionInfos); err != nil {
		log.Error("json.Unmarshal EngineVersions failed: %v", err, ctx.Data["MsgID"])
		return err
	}
	ctx.Data["engine_versions"] = versionInfos.Version

	var flavorInfos modelarts.Flavor
	if err = json.Unmarshal([]byte(setting.TrainJobFLAVORINFOS), &flavorInfos); err != nil {
		log.Error("json.Unmarshal TrainJobFLAVORINFOS failed: %v", err, ctx.Data["MsgID"])
		return err
	}
	ctx.Data["flavor_infos"] = flavorInfos.Info

	ctx.Data["params"] = ""
	ctx.Data["branchName"] = ctx.Repo.BranchName

	configList, err := getConfigList(modelarts.PerPage, 1, modelarts.SortByCreateTime, "desc", "", modelarts.ConfigTypeCustom)
	if err != nil {
		log.Error("getConfigList failed: %v", err, ctx.Data["MsgID"])
		return err
	}
	ctx.Data["config_list"] = configList.ParaConfigs
	ctx.Data["cloudbraintype"] = models.TypeCloudBrainTwo

	return nil
}

// grampusParamCheckCreateTrainJob validates the submitted form: the boot file
// must end in ".py" and the branch name must not be empty.
func grampusParamCheckCreateTrainJob(form auth.CreateGrampusTrainJobForm) error {
	if !strings.HasSuffix(form.BootFile, ".py") {
		log.Error("the boot file(%s) must be a python file", form.BootFile)
		return errors.New("启动文件必须是python文件")
	}

	if form.BranchName == "" {
		log.Error("the branch must not be null!")
		return errors.New("代码分支不能为空!")
	}

	return nil
}

// grampusNpuCreateFail re-renders the NPU "new train job" page with msg as
// the error shown to the user. If the page data itself cannot be prepared, a
// server error is reported instead, so only one response is ever written.
func grampusNpuCreateFail(ctx *context.Context, form *auth.CreateGrampusTrainJobForm, msg string) {
	if err := grampusTrainJobNpuNewDataPrepare(ctx); err != nil {
		ctx.ServerError("get new train-job info failed", err)
		return
	}
	ctx.RenderWithErr(msg, tplGrampusTrainJobNPUNew, form)
}

// GrampusTrainJobNpuCreate handles submission of the NPU "new train job"
// form. It validates the request, downloads the repository code for the
// chosen branch, creates the output/log directories and uploads the code via
// the OBS helpers, then asks Grampus to generate the job. On success the
// user is redirected to the train-job list; on failure the form is
// re-rendered with an error message.
func GrampusTrainJobNpuCreate(ctx *context.Context, form auth.CreateGrampusTrainJobForm) {
	displayJobName := form.DisplayJobName
	jobName := util.ConvertDisplayJobNameToJobName(displayJobName)
	uuid := form.Attachment
	bootFile := form.BootFile
	params := form.Params
	branchName := form.BranchName
	repo := ctx.Repo.Repository

	versionOutputPath := modelarts.GetOutputPathByCount(modelarts.TotalVersionCount)
	codeLocalPath := setting.JobPath + jobName + modelarts.CodePath
	codeObsPath := "/" + setting.Bucket + modelarts.JobPath + jobName + modelarts.CodePath

	count, err := models.GetGrampusCountByUserID(ctx.User.ID, string(models.JobTypeTrain), models.NPUResource)
	if err != nil {
		log.Error("GetGrampusCountByUserID failed:%v", err, ctx.Data["MsgID"])
		grampusNpuCreateFail(ctx, &form, "system error")
		return
	}
	if count >= 1 {
		log.Error("the user already has running or waiting task", ctx.Data["MsgID"])
		grampusNpuCreateFail(ctx, &form, "you have already a running or waiting task, can not create more")
		return
	}

	if err := grampusParamCheckCreateTrainJob(form); err != nil {
		log.Error("paramCheckCreateTrainJob failed:(%v)", err)
		grampusNpuCreateFail(ctx, &form, err.Error())
		return
	}

	// The dataset path is built from the first two characters of the
	// attachment uuid; reject shorter values instead of panicking on the
	// slice expressions below.
	if len(uuid) < 2 {
		log.Error("the dataset uuid(%s) is invalid", uuid)
		grampusNpuCreateFail(ctx, &form, "the dataset is invalid")
		return
	}
	dataPath := "/" + setting.Bucket + "/" + setting.BasePath + path.Join(uuid[0:1], uuid[1:2]) + "/" + uuid + uuid + "/"

	//check whether the task name in the project is duplicated
	tasks, err := models.GetCloudbrainsByDisplayJobName(repo.ID, string(models.JobTypeTrain), displayJobName)
	if err != nil {
		if !models.IsErrJobNotExist(err) {
			log.Error("system error, %v", err, ctx.Data["MsgID"])
			grampusNpuCreateFail(ctx, &form, "system error")
			return
		}
	} else if len(tasks) != 0 {
		log.Error("the job name did already exist", ctx.Data["MsgID"])
		grampusNpuCreateFail(ctx, &form, "the job name did already exist")
		return
	}

	//prepare code and out path
	// Remove a stale local checkout if one exists; a failure here is logged
	// only, downloadCode below decides whether the job can proceed.
	if _, err = ioutil.ReadDir(codeLocalPath); err == nil {
		if err = os.RemoveAll(codeLocalPath); err != nil {
			log.Error("RemoveAll(%s) failed: %v", codeLocalPath, err)
		}
	}

	gitRepo, err := git.OpenRepository(repo.RepoPath())
	if err != nil {
		log.Error("OpenRepository(%s) failed: %v", repo.FullName(), err, ctx.Data["MsgID"])
		grampusNpuCreateFail(ctx, &form, "system error")
		return
	}
	commitID, err := gitRepo.GetBranchCommitID(branchName)
	if err != nil {
		// Best effort, as before: the job is still created, with an empty commit id.
		log.Error("GetBranchCommitID(%s) failed: %v", branchName, err, ctx.Data["MsgID"])
	}

	if err := downloadCode(repo, codeLocalPath, branchName); err != nil {
		log.Error("downloadCode failed, server timed out: %s (%v)", repo.FullName(), err)
		grampusNpuCreateFail(ctx, &form, "Create task failed, server timed out")
		return
	}

	//todo: upload code (send to file_server todo this work?)
	if err := obsMkdir(setting.CodePathPrefix + jobName + modelarts.OutputPath + versionOutputPath + "/"); err != nil {
		log.Error("Failed to obsMkdir_output: %s (%v)", repo.FullName(), err)
		grampusNpuCreateFail(ctx, &form, "Failed to obsMkdir_output")
		return
	}

	if err := obsMkdir(setting.CodePathPrefix + jobName + modelarts.LogPath + versionOutputPath + "/"); err != nil {
		log.Error("Failed to obsMkdir_log: %s (%v)", repo.FullName(), err)
		grampusNpuCreateFail(ctx, &form, "Failed to obsMkdir_log")
		return
	}

	if err := uploadCodeToObs(codeLocalPath, jobName, ""); err != nil {
		// if err := uploadCodeToObs(codeLocalPath, jobName, parentDir); err != nil {
		log.Error("Failed to uploadCodeToObs: %s (%v)", repo.FullName(), err)
		grampusNpuCreateFail(ctx, &form, "Failed to uploadCodeToObs")
		return
	}

	//prepare command
	//todo: download code, download dataset, unzip dataset, exec code, upload model
	// NOTE(review): param is assembled here but not sent; the Parameters
	// field of the request below is commented out.
	var parameters models.Parameters
	param := make([]models.Parameter, 0)
	existDeviceTarget := false
	if len(params) != 0 {
		if err := json.Unmarshal([]byte(params), &parameters); err != nil {
			log.Error("Failed to Unmarshal params: %s (%v)", params, err)
			grampusNpuCreateFail(ctx, &form, "运行参数错误")
			return
		}

		for _, parameter := range parameters.Parameter {
			if parameter.Label == modelarts.DeviceTarget {
				existDeviceTarget = true
			}
			if parameter.Label != modelarts.TrainUrl && parameter.Label != modelarts.DataUrl {
				param = append(param, models.Parameter{
					Label: parameter.Label,
					Value: parameter.Value,
				})
			}
		}

		// Convert params into Parameters.Parameter; report to the front end
		// on failure. Only done for non-empty params, which the parsing
		// above already treats as valid "no parameters".
		var maParameters modelarts.Parameters
		if err := json.Unmarshal([]byte(params), &maParameters); err != nil {
			ctx.ServerError("json.Unmarshal failed:", err)
			return
		}
	}
	if !existDeviceTarget {
		param = append(param, models.Parameter{
			Label: modelarts.DeviceTarget,
			Value: modelarts.Ascend,
		})
	}

	// NOTE(review): Command, ResourceSpecId and ImageId are hard-coded
	// values — presumably placeholders; confirm before relying on them.
	req := &grampus.GenerateTrainJobReq{
		JobName:         jobName,
		DisplayJobName:  displayJobName,
		ComputeResource: models.NPUResource,
		Command:         "echo test",
		ResourceSpecId:  "f2497d54732b45fb8d887e63be1db4a7",
		ImageUrl:        "",
		ImageId:         "e6e85cd78ca24e158f71b6fac9c2fb95",
		DataUrl:         dataPath,
		Description:     form.Description,
		CodeObsPath:     codeObsPath,
		BootFileUrl:     codeObsPath + bootFile,
		BootFile:        bootFile,
		//TrainUrl:         outputObsPath,
		//FlavorCode:       flavorCode,
		WorkServerNumber: 1,
		//EngineID:         int64(engineID),
		//LogUrl:           logObsPath,
		//PoolID:           poolID,
		Uuid: uuid,
		//Parameters:       param,
		CommitID:          commitID,
		IsLatestVersion:   modelarts.IsLatestVersion,
		BranchName:        branchName,
		Params:            form.Params,
		FlavorName:        form.FlavorName,
		EngineName:        form.EngineName,
		VersionCount:      modelarts.VersionCount,
		TotalVersionCount: modelarts.TotalVersionCount,
	}

	err = grampus.GenerateTrainJob(ctx, req)
	if err != nil {
		log.Error("GenerateTrainJob failed:%v", err.Error())
		grampusNpuCreateFail(ctx, &form, err.Error())
		return
	}
	ctx.Redirect(setting.AppSubURL + ctx.Repo.RepoLink + "/modelarts/train-job")
}

// GrampusStopJob stops the job attached to the request context and replies
// with a JSON object holding result_code, error_msg, status and id.
// A job that is already stopped, failed or succeeded is reported as an error.
func GrampusStopJob(ctx *context.Context) {
	var ID = ctx.Params(":jobid")
	task := ctx.Cloudbrain

	resultCode, errorMsg, status := func() (string, string, string) {
		if task.Status == string(models.GrampusStatusStopped) || task.Status == string(models.GrampusStatusFailed) || task.Status == string(models.GrampusStatusSucceeded) {
			log.Error("the job(%s) has been stopped", task.JobName, ctx.Data["msgID"])
			return "-1", "system error", ""
		}

		res, err := grampus.StopJob(task.JobID)
		if err != nil {
			log.Error("StopJob(%s) failed:%v", task.JobName, err, ctx.Data["msgID"])
			// NOTE(review): this reads res on the error path — confirm that
			// grampus.StopJob never returns a nil result together with an error.
			return strconv.Itoa(res.ErrorCode), res.ErrorMsg, ""
		}

		task.Status = string(models.GrampusStatusStopped)
		if task.EndTime == 0 {
			task.EndTime = timeutil.TimeStampNow()
		}
		task.ComputeAndSetDuration()
		if err = models.UpdateJob(task); err != nil {
			log.Error("UpdateJob(%s) failed:%v", task.JobName, err, ctx.Data["msgID"])
			return "-1", "system error", ""
		}

		return "0", "", task.Status
	}()

	ctx.JSON(http.StatusOK, map[string]interface{}{
		"result_code": resultCode,
		"error_msg":   errorMsg,
		"status":      status,
		"id":          ID,
		"StatusOK":    0,
	})
}

// GrampusTrainJobDel deletes the job attached to the request context and
// redirects to the admin list, the home list or the repository's train-job
// list, depending on the "isadminpage" / "ishomepage" query parameters.
func GrampusTrainJobDel(ctx *context.Context) {
	var listType = ctx.Query("listType")
	if err := deleteGrampusJob(ctx); err != nil {
		log.Error("deleteGrampusJob failed: %v", err, ctx.Data["msgID"])
		ctx.ServerError(err.Error(), err)
		return
	}

	var isAdminPage = ctx.Query("isadminpage")
	var isHomePage = ctx.Query("ishomepage")
	switch {
	case ctx.IsUserSiteAdmin() && isAdminPage == "true":
		ctx.Redirect(setting.AppSubURL + "/admin" + "/cloudbrains")
	case isHomePage == "true":
		ctx.Redirect(setting.AppSubURL + "/cloudbrains")
	default:
		ctx.Redirect(setting.AppSubURL + ctx.Repo.RepoLink + "/modelarts/train-job?listType=" + listType)
	}
}

// deleteGrampusJob deletes the job attached to the request context and then
// its storage. Only jobs whose status is stopped, succeeded or failed may be
// deleted; any other status yields an error.
func deleteGrampusJob(ctx *context.Context) error {
	task := ctx.Cloudbrain

	if task.Status != string(models.GrampusStatusStopped) && task.Status != string(models.GrampusStatusSucceeded) && task.Status != string(models.GrampusStatusFailed) {
		log.Error("the job(%s) has not been stopped", task.JobName, ctx.Data["msgID"])
		return errors.New("the job has not been stopped")
	}

	err := models.DeleteJob(task)
	if err != nil {
		log.Error("DeleteJob failed: %v", err, ctx.Data["msgID"])
		return err
	}

	// NPU jobs use the CloudBrainTwo storage type, everything else CloudBrainOne.
	storageType := models.TypeCloudBrainOne
	if task.ComputeResource == models.NPUResource {
		storageType = models.TypeCloudBrainTwo
	}
	deleteJobStorage(task.JobName, storageType)

	return nil
}

// GrampusTrainJobShow renders the detail page of a train job (soft-deleted
// records included). For a record that is not deleted, the status is
// refreshed from Grampus and, when the update condition holds, duration,
// start/end time and status are written back to the database.
func GrampusTrainJobShow(ctx *context.Context) {
	ctx.Data["PageIsCloudBrain"] = true

	task, err := models.GetCloudbrainByJobIDWithDeleted(ctx.Params(":jobid"))
	if err != nil {
		log.Error("GetCloudbrainByJobID failed:%v", err)
		ctx.ServerError("system error", err)
		return
	}

	// Best effort: the dataset name is simply left unset when the lookup fails.
	if attachment, err := models.GetAttachmentByUUID(task.Uuid); err == nil {
		task.DatasetName = attachment.Name
	}

	// Turn the stored JSON parameters into "label = value; label = value".
	if len(task.Parameters) > 0 {
		var parameters models.Parameters
		if err := json.Unmarshal([]byte(task.Parameters), &parameters); err != nil {
			log.Error("Failed to Unmarshal Parameters: %s (%v)", task.Parameters, err)
			ctx.ServerError("system error", err)
			return
		}

		if len(parameters.Parameter) > 0 {
			pairs := make([]string, 0, len(parameters.Parameter))
			for _, p := range parameters.Parameter {
				pairs = append(pairs, p.Label+" = "+p.Value)
			}
			task.Parameters = strings.Join(pairs, "; ")
		}
	}

	if task.DeletedAt.IsZero() { //normal record
		result, err := grampus.GetJob(task.JobID)
		if err != nil {
			log.Error("GetJob failed:%v", err)
			ctx.ServerError("GetJob failed", err)
			return
		}

		if result != nil {
			task.Status = grampus.TransTrainJobStatus(result.JobInfo.Status)
			// NOTE(review): task.Status has just been overwritten, so this
			// compares the translated status with the raw one, not the
			// previous status with the new one — confirm this is intended.
			if task.Status != result.JobInfo.Status || result.JobInfo.Status == models.GrampusStatusRunning {
				task.Duration = result.JobInfo.RunSec
				task.TrainJobDuration = models.ConvertDurationToStr(task.Duration)

				if task.StartTime == 0 && result.JobInfo.StartedAt > 0 {
					task.StartTime = timeutil.TimeStamp(result.JobInfo.StartedAt)
				}
				if task.EndTime == 0 && models.IsTrainJobTerminal(task.Status) && task.StartTime > 0 {
					task.EndTime = task.StartTime.Add(task.Duration)
				}
				task.CorrectCreateUnix()
				if err := models.UpdateJob(task); err != nil {
					log.Error("UpdateJob failed:%v", err)
				}
			}
		}
	}

	taskList := make([]*models.Cloudbrain, 0)
	taskList = append(taskList, task)
	ctx.Data["version_list_task"] = taskList

	ctx.HTML(http.StatusOK, tplGrampusTrainJobShow)
}

// GrampusGetLog replies with a JSON object holding the job name and the
// train-job log content fetched from Grampus for the ":jobid" route parameter.
func GrampusGetLog(ctx *context.Context) {
	jobID := ctx.Params(":jobid")
	job, err := models.GetCloudbrainByJobID(jobID)
	if err != nil {
		log.Error("GetCloudbrainByJobID failed: %v", err, ctx.Data["MsgID"])
		ctx.ServerError(err.Error(), err)
		return
	}

	content, err := grampus.GetTrainJobLog(job.JobID)
	if err != nil {
		log.Error("GetJobLog failed: %v", err, ctx.Data["MsgID"])
		ctx.ServerError(err.Error(), err)
		return
	}

	ctx.JSON(http.StatusOK, map[string]interface{}{
		"JobName": job.JobName,
		"Content": content,
	})
}