package repo

import (
	"encoding/json"
	"errors"
	"fmt"
	"io/ioutil"
	"net/http"
	"os"
	"path"
	"strconv"
	"strings"
	"time"

	"code.gitea.io/gitea/models"
	"code.gitea.io/gitea/modules/auth"
	"code.gitea.io/gitea/modules/base"
	"code.gitea.io/gitea/modules/cloudbrain"
	"code.gitea.io/gitea/modules/context"
	"code.gitea.io/gitea/modules/git"
	"code.gitea.io/gitea/modules/grampus"
	"code.gitea.io/gitea/modules/log"
	"code.gitea.io/gitea/modules/modelarts"
	"code.gitea.io/gitea/modules/setting"
	"code.gitea.io/gitea/modules/timeutil"
	"code.gitea.io/gitea/modules/util"
)

// Templates rendered by the Grampus train-job handlers in this file.
const (
	tplGrampusTrainJobShow base.TplName = "repo/grampus/trainjob/show"

	//GPU
	tplGrampusTrainJobGPUNew base.TplName = "repo/grampus/trainjob/gpu/new"
	//NPU
	tplGrampusTrainJobNPUNew base.TplName = "repo/grampus/trainjob/npu/new"
)

// GrampusTrainJobGPUNew renders the "new GPU train job" page. It responds
// with a server error when the page data cannot be prepared.
func GrampusTrainJobGPUNew(ctx *context.Context) {
	if err := grampusGpuNewDataPrepare(ctx); err != nil {
		ctx.ServerError("get new train-job info failed", err)
		return
	}
	ctx.HTML(http.StatusOK, tplGrampusTrainJobGPUNew)
}

// imagePlaceView returns the value shown to the user for an image location.
// Locations starting with "192.168" are cut down to the part beginning at
// the first "/"; every other location, and a "192.168" location without any
// "/", is returned unchanged.
func imagePlaceView(place string) string {
	if !strings.HasPrefix(place, "192.168") {
		return place
	}
	// strings.Index returns -1 when there is no "/"; slicing with -1 would panic.
	if idx := strings.Index(place, "/"); idx >= 0 {
		return place[idx:]
	}
	return place
}

// grampusGpuNewDataPrepare fills ctx.Data with everything the GPU train-job
// form needs: a suggested job name, private and public images, the user's
// attachments, mount paths, benchmark/GPU/resource-spec settings and the
// repository branches.
//
// Only a failure to load the user's attachments is returned as an error;
// image and branch lookups are logged and the page is rendered without them.
func grampusGpuNewDataPrepare(ctx *context.Context) error {
	ctx.Data["PageIsCloudBrain"] = true

	now := time.Now()
	ctx.Data["display_job_name"] = jobNamePrefixValid(cutString(ctx.User.Name, 5)) +
		now.Format("2006010215") + strconv.Itoa(int(now.Unix()))[5:]

	//get valid images
	result, err := cloudbrain.GetImages()
	if err != nil {
		ctx.Data["error"] = err.Error()
		log.Error("cloudbrain.GetImages failed: %v", err, ctx.Data["MsgID"])
	} else {
		// Only touch the payload when the call succeeded.
		for i := range result.Payload.ImageInfo {
			result.Payload.ImageInfo[i].PlaceView = imagePlaceView(result.Payload.ImageInfo[i].Place)
		}
		ctx.Data["images"] = result.Payload.ImageInfo
	}

	resultPublic, err := cloudbrain.GetPublicImages()
	if err != nil {
		ctx.Data["error"] = err.Error()
		log.Error("cloudbrain.GetPublicImages failed: %v", err, ctx.Data["MsgID"])
	} else {
		for i := range resultPublic.Payload.ImageInfo {
			resultPublic.Payload.ImageInfo[i].PlaceView = imagePlaceView(resultPublic.Payload.ImageInfo[i].Place)
		}
		ctx.Data["public_images"] = resultPublic.Payload.ImageInfo
	}

	//get valid dataset
	attachs, err := models.GetAllUserAttachments(ctx.User.ID)
	if err != nil {
		log.Error("GetAllUserAttachments failed: %v", err, ctx.Data["MsgID"])
		return err
	}
	ctx.Data["attachments"] = attachs

	ctx.Data["command"] = cloudbrain.Command
	ctx.Data["code_path"] = cloudbrain.CodeMountPath
	ctx.Data["dataset_path"] = cloudbrain.DataSetMountPath
	ctx.Data["model_path"] = cloudbrain.ModelMountPath
	ctx.Data["benchmark_path"] = cloudbrain.BenchMarkMountPath
	ctx.Data["is_benchmark_enabled"] = setting.IsBenchmarkEnabled

	// decodeSetting unmarshals one JSON setting into dst and logs a decode
	// failure instead of dropping it silently.
	decodeSetting := func(name string, raw []byte, dst interface{}) {
		if err := json.Unmarshal(raw, dst); err != nil {
			log.Error("json.Unmarshal setting %s failed: %v", name, err)
		}
	}

	//get valid resource specs
	// Each package-level cache below is decoded from its setting on first use.
	if categories == nil {
		decodeSetting("BenchmarkCategory", []byte(setting.BenchmarkCategory), &categories)
	}
	ctx.Data["benchmark_categories"] = categories.Category

	ctx.Data["benchmark_types"] = GetBenchmarkTypes(ctx).BenchmarkType

	if gpuInfos == nil {
		decodeSetting("GpuTypes", []byte(setting.GpuTypes), &gpuInfos)
	}
	ctx.Data["gpu_types"] = gpuInfos.GpuInfo

	if trainGpuInfos == nil {
		decodeSetting("TrainGpuTypes", []byte(setting.TrainGpuTypes), &trainGpuInfos)
	}
	ctx.Data["train_gpu_types"] = trainGpuInfos.GpuInfo

	if benchmarkGpuInfos == nil {
		decodeSetting("BenchmarkGpuTypes", []byte(setting.BenchmarkGpuTypes), &benchmarkGpuInfos)
	}
	ctx.Data["benchmark_gpu_types"] = benchmarkGpuInfos.GpuInfo

	if benchmarkResourceSpecs == nil {
		decodeSetting("BenchmarkResourceSpecs", []byte(setting.BenchmarkResourceSpecs), &benchmarkResourceSpecs)
	}
	ctx.Data["benchmark_resource_specs"] = benchmarkResourceSpecs.ResourceSpec

	if cloudbrain.ResourceSpecs == nil {
		decodeSetting("ResourceSpecs", []byte(setting.ResourceSpecs), &cloudbrain.ResourceSpecs)
	}
	ctx.Data["resource_specs"] = cloudbrain.ResourceSpecs.ResourceSpec

	if cloudbrain.TrainResourceSpecs == nil {
		decodeSetting("TrainResourceSpecs", []byte(setting.TrainResourceSpecs), &cloudbrain.TrainResourceSpecs)
	}
	ctx.Data["train_resource_specs"] = cloudbrain.TrainResourceSpecs.ResourceSpec

	branches, _, err := ctx.Repo.GitRepo.GetBranches(0, 0)
	if err != nil {
		log.Error("GetBranches error: %v", err)
	}
	ctx.Data["branches"] = branches
	ctx.Data["branchName"] = ctx.Repo.BranchName

	return nil
}

// GrampusTrainJobNPUNew renders the "new NPU train job" page. It responds
// with a server error when the page data cannot be prepared.
func GrampusTrainJobNPUNew(ctx *context.Context) {
	if err := grampusTrainJobNpuNewDataPrepare(ctx); err != nil {
		ctx.ServerError("get new train-job info failed", err)
		return
	}
	ctx.HTML(http.StatusOK, tplGrampusTrainJobNPUNew)
}

// grampusTrainJobNpuNewDataPrepare fills ctx.Data with everything the NPU
// train-job form needs: a suggested job name, the user's attachments, the
// available NPU images and resource specs, and the repository branches.
//
// Every lookup is best effort: a failure is logged and the matching
// ctx.Data key is left unset. The function currently always returns nil.
func grampusTrainJobNpuNewDataPrepare(ctx *context.Context) error {
	ctx.Data["PageIsCloudBrain"] = true

	now := time.Now()
	ctx.Data["display_job_name"] = cutString(ctx.User.Name, 5) +
		now.Format("2006010215") + strconv.Itoa(int(now.Unix()))[5:]

	//get valid dataset
	if attachs, err := models.GetModelArtsTrainAttachments(ctx.User.ID); err != nil {
		log.Error("GetModelArtsTrainAttachments failed: %v", err)
	} else {
		ctx.Data["attachments"] = attachs
	}

	//get valid engines
	if images, err := grampus.GetImages(grampus.ProcessorTypeNPU); err != nil {
		log.Error("GetImages failed: %v", err)
	} else {
		ctx.Data["engine_versions"] = images.Infos
	}

	//get valid resource specs
	if specs, err := grampus.GetResourceSpecs(grampus.ProcessorTypeNPU); err != nil {
		log.Error("GetResourceSpecs failed: %v", err)
	} else {
		ctx.Data["flavor_infos"] = specs.Infos
	}

	//get branches
	if branches, _, err := ctx.Repo.GitRepo.GetBranches(0, 0); err != nil {
		log.Error("GetBranches error: %v", err)
	} else {
		ctx.Data["branches"] = branches
	}
	ctx.Data["branchName"] = ctx.Repo.BranchName

	return nil
}

// grampusParamCheckCreateTrainJob validates the submitted form: the boot
// file must end in ".py" and the branch name must not be empty. It returns
// a user-facing error describing the first violated rule, or nil.
func grampusParamCheckCreateTrainJob(form auth.CreateGrampusTrainJobForm) error {
	if !strings.HasSuffix(form.BootFile, ".py") {
		log.Error("the boot file(%s) must be a python file", form.BootFile)
		return errors.New("启动文件必须是python文件")
	}

	if form.BranchName == "" {
		log.Error("the branch must not be null!")
		return errors.New("代码分支不能为空!")
	}

	return nil
}

// GrampusTrainJobNpuCreate handles submission of the NPU train-job form.
//
// It checks the per-user job limit, validates the form, rejects duplicate
// display names, resolves the dataset attachment, downloads the repository
// code and uploads it to OBS, creates the output and log directories, builds
// the start command and finally asks Grampus to create the job. On any
// failure the form is re-rendered with an error message; on success the user
// is redirected to the train-job list.
func GrampusTrainJobNpuCreate(ctx *context.Context, form auth.CreateGrampusTrainJobForm) {
	// renderErr re-populates the page data and re-renders the form with msg.
	renderErr := func(msg string) {
		grampusTrainJobNpuNewDataPrepare(ctx)
		ctx.RenderWithErr(msg, tplGrampusTrainJobNPUNew, &form)
	}

	versionOutputPath := modelarts.GetOutputPathByCount(modelarts.TotalVersionCount)
	displayJobName := form.DisplayJobName
	jobName := util.ConvertDisplayJobNameToJobName(displayJobName)
	uuid := form.Attachment
	bootFile := form.BootFile
	params := form.Params
	branchName := form.BranchName
	repo := ctx.Repo.Repository

	// The dataset path below slices the first two characters of the uuid; a
	// shorter (user-supplied) value would panic, so reject it up front.
	if len(uuid) < 2 {
		log.Error("invalid dataset uuid(%s)", uuid, ctx.Data["MsgID"])
		renderErr("dataset is not exist")
		return
	}

	codeLocalPath := setting.JobPath + jobName + modelarts.CodePath
	codeObsPath := "/" + setting.Bucket + modelarts.JobPath + jobName + modelarts.CodePath
	dataPath := "/" + setting.Bucket + "/" + setting.BasePath + path.Join(uuid[0:1], uuid[1:2]) + "/" + uuid + uuid + "/"

	//check count limit
	count, err := models.GetGrampusCountByUserID(ctx.User.ID, string(models.JobTypeTrain), models.NPUResource)
	if err != nil {
		log.Error("GetGrampusCountByUserID failed:%v", err, ctx.Data["MsgID"])
		renderErr("system error")
		return
	}
	if count >= 1 {
		log.Error("the user already has running or waiting task", ctx.Data["MsgID"])
		renderErr("you have already a running or waiting task, can not create more")
		return
	}

	//check param
	if err := grampusParamCheckCreateTrainJob(form); err != nil {
		log.Error("paramCheckCreateTrainJob failed:(%v)", err)
		renderErr(err.Error())
		return
	}

	//check whether the task name in the project is duplicated
	tasks, err := models.GetCloudbrainsByDisplayJobName(repo.ID, string(models.JobTypeTrain), displayJobName)
	if err != nil {
		// A "job not exist" error is the expected outcome for a fresh name.
		if !models.IsErrJobNotExist(err) {
			log.Error("system error, %v", err, ctx.Data["MsgID"])
			renderErr("system error")
			return
		}
	} else if len(tasks) != 0 {
		log.Error("the job name did already exist", ctx.Data["MsgID"])
		renderErr("the job name did already exist")
		return
	}

	//check dataset
	attachment, err := models.GetAttachmentByUUID(uuid)
	if err != nil {
		log.Error("GetAttachmentByUUID failed: %v", err, ctx.Data["MsgID"])
		renderErr("dataset is not exist")
		return
	}

	//prepare code and out path
	// Remove a stale local checkout left over under the same job name.
	if _, err := ioutil.ReadDir(codeLocalPath); err == nil {
		if err := os.RemoveAll(codeLocalPath); err != nil {
			log.Error("RemoveAll(%s) failed: %v", codeLocalPath, err)
		}
	}

	if err := downloadCode(repo, codeLocalPath, branchName); err != nil {
		log.Error("downloadCode failed, server timed out: %s (%v)", repo.FullName(), err)
		renderErr("Create task failed, server timed out")
		return
	}

	//todo: upload code (send to file_server todo this work?)
	if err := obsMkdir(setting.CodePathPrefix + jobName + modelarts.OutputPath + versionOutputPath + "/"); err != nil {
		log.Error("Failed to obsMkdir_output: %s (%v)", repo.FullName(), err)
		renderErr("Failed to obsMkdir_output")
		return
	}

	if err := obsMkdir(setting.CodePathPrefix + jobName + modelarts.LogPath + versionOutputPath + "/"); err != nil {
		log.Error("Failed to obsMkdir_log: %s (%v)", repo.FullName(), err)
		renderErr("Failed to obsMkdir_log")
		return
	}

	if err := uploadCodeToObs(codeLocalPath, jobName, ""); err != nil {
		log.Error("Failed to uploadCodeToObs: %s (%v)", repo.FullName(), err)
		renderErr("Failed to uploadCodeToObs")
		return
	}

	//prepare command
	//todo: download code, download dataset, unzip dataset, exec code, upload model
	command, err := generateCommand(grampus.ProcessorTypeNPU, "obs:/"+codeObsPath, "obs:/"+dataPath, params, "", attachment.Name)
	if err != nil {
		log.Error("generateCommand failed: %v", err, ctx.Data["MsgID"])
		renderErr("Create task failed, internal error")
		return
	}
	// The command embeds the OBS access and secret keys, so it must not be logged.
	log.Info("generated grampus train command for job %s", jobName)

	// The run parameters are only validated here; they are forwarded to
	// Grampus in their raw JSON form through req.Params.
	if len(params) != 0 {
		var parameters models.Parameters
		if err := json.Unmarshal([]byte(params), &parameters); err != nil {
			log.Error("Failed to Unmarshal params: %s (%v)", params, err)
			renderErr("运行参数错误")
			return
		}
	}

	gitRepo, err := git.OpenRepository(repo.RepoPath())
	if err != nil {
		log.Error("OpenRepository(%s) failed: %v", repo.FullName(), err)
		renderErr("Create task failed, internal error")
		return
	}
	commitID, err := gitRepo.GetBranchCommitID(branchName)
	if err != nil {
		// Best effort, as before: the job is still created with an empty commit id.
		log.Error("GetBranchCommitID(%s) failed: %v", branchName, err)
	}

	req := &grampus.GenerateTrainJobReq{
		JobName:           jobName,
		DisplayJobName:    displayJobName,
		ComputeResource:   models.NPUResource,
		Command:           command,
		ResourceSpecId:    form.FlavorID,
		ImageUrl:          "",
		ImageId:           form.ImageID,
		DataUrl:           dataPath,
		Description:       form.Description,
		CodeObsPath:       codeObsPath,
		BootFileUrl:       codeObsPath + bootFile,
		BootFile:          bootFile,
		WorkServerNumber:  form.WorkServerNumber,
		Uuid:              uuid,
		CommitID:          commitID,
		IsLatestVersion:   modelarts.IsLatestVersion,
		BranchName:        branchName,
		Params:            form.Params,
		FlavorName:        form.FlavorName,
		EngineName:        form.EngineName,
		VersionCount:      modelarts.VersionCount,
		TotalVersionCount: modelarts.TotalVersionCount,
		DatasetName:       attachment.Name,
	}

	err = grampus.GenerateTrainJob(ctx, req)
	if err != nil {
		log.Error("GenerateTrainJob failed:%v", err.Error())
		renderErr(err.Error())
		return
	}

	ctx.Redirect(setting.AppSubURL + ctx.Repo.RepoLink + "/modelarts/train-job")
}

// GrampusStopJob stops the job attached to the request (ctx.Cloudbrain) and
// answers with a JSON object containing result_code, error_msg, status and
// id. A job that is already stopped, failed or succeeded is rejected with
// result_code "-1"; after a successful stop the new status, end time and
// duration are saved through models.UpdateJob.
func GrampusStopJob(ctx *context.Context) {
	id := ctx.Params(":jobid")
	resultCode, errorMsg, status := "0", "", ""
	task := ctx.Cloudbrain

	if task.Status == string(models.GrampusStatusStopped) ||
		task.Status == string(models.GrampusStatusFailed) ||
		task.Status == string(models.GrampusStatusSucceeded) {
		log.Error("the job(%s) has been stopped", task.JobName, ctx.Data["msgID"])
		resultCode = "-1"
		errorMsg = "system error"
	} else if res, err := grampus.StopJob(task.JobID); err != nil {
		log.Error("StopJob(%s) failed:%v", task.JobName, err, ctx.Data["msgID"])
		// NOTE(review): res is dereferenced on the error path; confirm that
		// grampus.StopJob never returns a nil response together with an error.
		resultCode = strconv.Itoa(res.ErrorCode)
		errorMsg = res.ErrorMsg
	} else {
		task.Status = string(models.GrampusStatusStopped)
		if task.EndTime == 0 {
			task.EndTime = timeutil.TimeStampNow()
		}
		task.ComputeAndSetDuration()

		if err := models.UpdateJob(task); err != nil {
			log.Error("UpdateJob(%s) failed:%v", task.JobName, err, ctx.Data["msgID"])
			resultCode = "-1"
			errorMsg = "system error"
		} else {
			status = task.Status
		}
	}

	ctx.JSON(http.StatusOK, map[string]interface{}{
		"result_code": resultCode,
		"error_msg":   errorMsg,
		"status":      status,
		"id":          id,
		"StatusOK":    0,
	})
}

// GrampusTrainJobDel deletes the job attached to the request and redirects
// to the admin list, the home list or the repository train-job list,
// depending on the "isadminpage" and "ishomepage" query parameters.
func GrampusTrainJobDel(ctx *context.Context) {
	listType := ctx.Query("listType")

	if err := deleteGrampusJob(ctx); err != nil {
		log.Error("deleteGrampusJob failed: %v", err, ctx.Data["msgID"])
		ctx.ServerError(err.Error(), err)
		return
	}

	isAdminPage := ctx.Query("isadminpage")
	isHomePage := ctx.Query("ishomepage")

	switch {
	case ctx.IsUserSiteAdmin() && isAdminPage == "true":
		ctx.Redirect(setting.AppSubURL + "/admin" + "/cloudbrains")
	case isHomePage == "true":
		ctx.Redirect(setting.AppSubURL + "/cloudbrains")
	default:
		ctx.Redirect(setting.AppSubURL + ctx.Repo.RepoLink + "/modelarts/train-job?listType=" + listType)
	}
}

// deleteGrampusJob deletes the job attached to the request and its storage.
// It returns an error when the job is not in the stopped, succeeded or
// failed state, or when models.DeleteJob fails.
func deleteGrampusJob(ctx *context.Context) error {
	task := ctx.Cloudbrain

	finished := task.Status == string(models.GrampusStatusStopped) ||
		task.Status == string(models.GrampusStatusSucceeded) ||
		task.Status == string(models.GrampusStatusFailed)
	if !finished {
		log.Error("the job(%s) has not been stopped", task.JobName, ctx.Data["msgID"])
		return errors.New("the job has not been stopped")
	}

	if err := models.DeleteJob(task); err != nil {
		log.Error("DeleteJob failed: %v", err, ctx.Data["msgID"])
		return err
	}

	// NPU jobs use the CloudBrainTwo storage type, everything else CloudBrainOne.
	storageType := models.TypeCloudBrainOne
	if task.ComputeResource == models.NPUResource {
		storageType = models.TypeCloudBrainTwo
	}
	deleteJobStorage(task.JobName, storageType)

	return nil
}

// GrampusTrainJobShow renders the detail page of a train job.
//
// For a record that has not been deleted, the job is queried from Grampus
// and its status, duration, start time and end time are refreshed and saved.
// The stored JSON run parameters are turned into a "label = value; ..."
// string for display.
func GrampusTrainJobShow(ctx *context.Context) {
	ctx.Data["PageIsCloudBrain"] = true

	task, err := models.GetCloudbrainByJobIDWithDeleted(ctx.Params(":jobid"))
	if err != nil {
		log.Error("GetCloudbrainByJobID failed: %v", err)
		ctx.ServerError("system error", err)
		return
	}

	if task.DeletedAt.IsZero() { //normal record
		result, err := grampus.GetJob(task.JobID)
		if err != nil {
			log.Error("GetJob failed: %v", err)
			ctx.ServerError("GetJob failed", err)
			return
		}

		if result != nil {
			task.Status = grampus.TransTrainJobStatus(result.JobInfo.Status)
			// NOTE(review): task.Status has just been overwritten, so this compares
			// the translated status with the raw one, not the old status with the
			// new one — confirm that this is intended.
			if task.Status != result.JobInfo.Status || result.JobInfo.Status == models.GrampusStatusRunning {
				task.Duration = result.JobInfo.RunSec
				task.TrainJobDuration = models.ConvertDurationToStr(task.Duration)

				if task.StartTime == 0 && result.JobInfo.StartedAt > 0 {
					task.StartTime = timeutil.TimeStamp(result.JobInfo.StartedAt)
				}
				if task.EndTime == 0 && models.IsTrainJobTerminal(task.Status) && task.StartTime > 0 {
					task.EndTime = task.StartTime.Add(task.Duration)
				}
				task.CorrectCreateUnix()

				// A failed save is only logged; the page is still rendered.
				if err := models.UpdateJob(task); err != nil {
					log.Error("UpdateJob failed: %v", err)
				}
			}
		}
	}

	if len(task.Parameters) > 0 {
		var parameters models.Parameters
		if err := json.Unmarshal([]byte(task.Parameters), &parameters); err != nil {
			log.Error("Failed to Unmarshal Parameters: %s (%v)", task.Parameters, err)
			ctx.ServerError("system error", err)
			return
		}

		// Joining with "; " yields "" for an empty parameter list.
		pairs := make([]string, 0, len(parameters.Parameter))
		for _, p := range parameters.Parameter {
			pairs = append(pairs, p.Label+" = "+p.Value)
		}
		task.Parameters = strings.Join(pairs, "; ")
	}

	ctx.Data["version_list_task"] = []*models.Cloudbrain{task}
	ctx.HTML(http.StatusOK, tplGrampusTrainJobShow)
}

// GrampusGetLog answers with a JSON object holding the job name and the
// train-job log content of the job identified by the ":jobid" parameter.
func GrampusGetLog(ctx *context.Context) {
	jobID := ctx.Params(":jobid")

	job, err := models.GetCloudbrainByJobID(jobID)
	if err != nil {
		log.Error("GetCloudbrainByJobID failed: %v", err, ctx.Data["MsgID"])
		ctx.ServerError(err.Error(), err)
		return
	}

	content, err := grampus.GetTrainJobLog(job.JobID)
	if err != nil {
		log.Error("GetJobLog failed: %v", err, ctx.Data["MsgID"])
		ctx.ServerError(err.Error(), err)
		return
	}

	ctx.JSON(http.StatusOK, map[string]interface{}{
		"JobName": job.JobName,
		"Content": content,
	})
}

// generateCommand builds the command string of a Grampus job. It always
// starts with grampus.CommandPrepareScript; for the NPU processor type it
// appends a call to grampus.ScriptSyncObsCodeAndDataset carrying the OBS
// credentials, the code and data locations and the dataset name. Nothing is
// appended for the GPU processor type yet. params and outputPath are
// currently unused, and the returned error is always nil.
func generateCommand(processorType, codeObsPath, dataObsPath, params, outputPath, datasetName string) (string, error) {
	var command string

	command += grampus.CommandPrepareScript

	//download code & dataset
	if processorType == grampus.ProcessorTypeNPU {
		// NOTE(review): the values are concatenated without quoting; if this string
		// is run by a shell, a dataset name with spaces or shell metacharacters
		// would change the command — confirm how Grampus executes it.
		commandDownload := "python " + grampus.ScriptSyncObsCodeAndDataset + " --access_key=" + setting.AccessKeyID + " --secret_key=" + setting.SecretAccessKey + " --project_id=" + setting.ProjectID + " --region_name=" + setting.Location + " --code_obs_dir=" + codeObsPath + " --data_obs_dir=" + dataObsPath + " --dataset_name=" + datasetName + ";"
		command += commandDownload
	} else if processorType == grampus.ProcessorTypeGPU {
		// not implemented yet
	}

	//unzip dataset

	//exec code

	//upload models

	return command, nil
}

// generateCommandObsDownloadFile builds a Python snippet that opens a
// modelarts Session with the configured OBS credentials and downloads
// srcObsFile into dstLocalDir. download_dir is used when util.IsDir reports
// srcObsFile as a directory, download_file otherwise. The returned error is
// always nil.
func generateCommandObsDownloadFile(srcObsFile, dstLocalDir string) (string, error) {
	var command string

	command = "python;"
	command += "from modelarts.session import Session \n"
	command += fmt.Sprintf("session = Session(access_key='%s',secret_key='%s', project_id='%s', region_name='%s') \n", setting.AccessKeyID, setting.SecretAccessKey, setting.ProjectID, setting.Location)

	if util.IsDir(srcObsFile) {
		command += fmt.Sprintf("session.obs.download_dir(src_obs_dir=\"%s\", dst_local_dir=\"%s\") \n", srcObsFile, dstLocalDir)
	} else {
		command += fmt.Sprintf("session.obs.download_file(src_obs_file=\"%s\", dst_local_dir=\"%s\") \n", srcObsFile, dstLocalDir)
	}

	return command, nil
}