package repo

import (
	"encoding/json"
	"errors"
	"io"
	"os"
	"path"
	"strconv"
	"strings"
	"time"

	"code.gitea.io/gitea/models"
	"code.gitea.io/gitea/modules/auth"
	"code.gitea.io/gitea/modules/base"
	"code.gitea.io/gitea/modules/context"
	"code.gitea.io/gitea/modules/git"
	"code.gitea.io/gitea/modules/log"
	"code.gitea.io/gitea/modules/modelarts"
	"code.gitea.io/gitea/modules/obs"
	"code.gitea.io/gitea/modules/setting"
	"code.gitea.io/gitea/modules/storage"

	"github.com/unknwon/com"
)

const (
	tplModelArtsNotebookIndex base.TplName = "repo/modelarts/notebook/index"
	tplModelArtsNotebookNew   base.TplName = "repo/modelarts/notebook/new"
	tplModelArtsNotebookShow  base.TplName = "repo/modelarts/notebook/show"

	tplModelArtsTrainJobIndex base.TplName = "repo/modelarts/trainjob/index"
	tplModelArtsTrainJobNew   base.TplName = "repo/modelarts/trainjob/new"
	tplModelArtsTrainJobShow  base.TplName = "repo/modelarts/trainjob/show"
)

// MustEnableModelArts writes a 404 response when the current user cannot read
// the repository's cloud-brain unit. It cannot stop the caller by itself, so
// callers must check ctx.Written() afterwards and return if it is true.
func MustEnableModelArts(ctx *context.Context) {
	if !ctx.Repo.CanRead(models.UnitTypeCloudBrain) {
		ctx.NotFound("MustEnableCloudbrain", nil)
	}
}

// defaultModelArtsJobName builds the job name suggested on the "new" pages:
// the result of cutString(userName, 5), followed by the current time formatted
// as "2006010215" and the Unix timestamp in seconds with its first five
// characters dropped.
func defaultModelArtsJobName(userName string) string {
	now := time.Now()
	return cutString(userName, 5) + now.Format("2006010215") + strconv.Itoa(int(now.Unix()))[5:]
}

// NotebookIndex renders the paginated list of notebook tasks of the current
// repository and marks running tasks as debuggable.
func NotebookIndex(ctx *context.Context) {
	MustEnableModelArts(ctx)
	if ctx.Written() {
		// The permission check already answered with 404.
		return
	}

	page := ctx.QueryInt("page")
	if page <= 0 {
		page = 1
	}

	ciTasks, count, err := models.Cloudbrains(&models.CloudbrainsOptions{
		ListOptions: models.ListOptions{
			Page:     page,
			PageSize: setting.UI.IssuePagingNum,
		},
		RepoID: ctx.Repo.Repository.ID,
		Type:   models.TypeCloudBrainNotebook,
	})
	if err != nil {
		ctx.ServerError("Cloudbrain", err)
		return
	}

	// Only running notebooks can be opened for debugging.
	for i := range ciTasks {
		ciTasks[i].CanDebug = ciTasks[i].Status == string(models.JobRunning)
	}

	pager := context.NewPagination(int(count), setting.UI.IssuePagingNum, page, 5)
	pager.SetDefaultParams(ctx)
	ctx.Data["Page"] = pager

	ctx.Data["PageIsCloudBrain"] = true
	ctx.Data["Tasks"] = ciTasks
	ctx.HTML(200, tplModelArtsNotebookIndex)
}

// NotebookNew renders the notebook creation form with a suggested job name,
// the user's attachments and the notebook settings exposed by the modelarts
// package.
func NotebookNew(ctx *context.Context) {
	ctx.Data["PageIsCloudBrain"] = true
	ctx.Data["job_name"] = defaultModelArtsJobName(ctx.User.Name)

	attachs, err := models.GetModelArtsUserAttachments(ctx.User.ID)
	if err != nil {
		ctx.ServerError("GetAllUserAttachments failed:", err)
		return
	}

	ctx.Data["attachments"] = attachs
	ctx.Data["dataset_path"] = modelarts.DataSetMountPath
	ctx.Data["env"] = modelarts.NotebookEnv
	ctx.Data["notebook_type"] = modelarts.NotebookType
	ctx.Data["flavor"] = modelarts.FlavorInfo
	ctx.HTML(200, tplModelArtsNotebookNew)
}

// NotebookCreate creates a notebook task from the submitted form and redirects
// to the notebook list. On failure the form is re-rendered with the error.
func NotebookCreate(ctx *context.Context, form auth.CreateModelArtsNotebookForm) {
	ctx.Data["PageIsCloudBrain"] = true

	if err := modelarts.GenerateTask(ctx, form.JobName, form.Attachment, form.Description); err != nil {
		ctx.RenderWithErr(err.Error(), tplModelArtsNotebookNew, &form)
		return
	}

	ctx.Redirect(setting.AppSubURL + ctx.Repo.RepoLink + "/modelarts/notebook")
}

// NotebookShow fetches the job from ModelArts, stores its latest status on the
// local task record and renders the detail page.
func NotebookShow(ctx *context.Context) {
	ctx.Data["PageIsCloudBrain"] = true

	// Every failure on this page is reported the same way.
	renderErr := func(err error) {
		ctx.Data["error"] = err.Error()
		ctx.RenderWithErr(err.Error(), tplModelArtsNotebookIndex, nil)
	}

	jobID := ctx.Params(":jobid")
	task, err := models.GetCloudbrainByJobID(jobID)
	if err != nil {
		renderErr(err)
		return
	}

	result, err := modelarts.GetJob(jobID)
	if err != nil {
		renderErr(err)
		return
	}

	if result != nil {
		task.Status = result.Status
		if err = models.UpdateJob(task); err != nil {
			renderErr(err)
			return
		}

		const layout = "2006-01-02 15:04:05"
		formatUnix := func(sec int64) string {
			return time.Unix(sec, 0).Format(layout)
		}

		// The timestamps are divided by 1000 before being used as Unix seconds.
		// Parse errors are deliberately ignored: an unparsable value falls back
		// to 0 and is displayed as the Unix epoch instead of failing the page.
		createTime, _ := com.StrTo(result.CreationTimestamp).Int64()
		result.CreateTime = formatUnix(createTime / 1000)

		endTime, _ := com.StrTo(result.LatestUpdateTimestamp).Int64()
		result.LatestUpdateTime = formatUnix(endTime / 1000)

		result.QueuingInfo.BeginTime = formatUnix(int64(result.QueuingInfo.BeginTimestamp / 1000))
		result.QueuingInfo.EndTime = formatUnix(int64(result.QueuingInfo.EndTimestamp / 1000))
	}

	ctx.Data["task"] = task
	ctx.Data["jobID"] = jobID
	ctx.Data["result"] = result
	ctx.HTML(200, tplModelArtsNotebookShow)
}

// NotebookDebug redirects the browser to the notebook's debug URL, which is
// built from the job's target domain, the path part of its annotated URL and
// a freshly requested access token.
func NotebookDebug(ctx *context.Context) {
	jobID := ctx.Params(":jobid")

	if _, err := models.GetCloudbrainByJobID(jobID); err != nil {
		ctx.ServerError("GetCloudbrainByJobID failed", err)
		return
	}

	result, err := modelarts.GetJob(jobID)
	if err != nil {
		ctx.RenderWithErr(err.Error(), tplModelArtsNotebookIndex, nil)
		return
	}
	if result == nil {
		// NotebookShow treats a nil result as possible; guard the dereference below.
		ctx.ServerError("GetJob failed", errors.New("empty job result"))
		return
	}

	res, err := modelarts.GetJobToken(jobID)
	if err != nil {
		ctx.RenderWithErr(err.Error(), tplModelArtsNotebookIndex, nil)
		return
	}

	// Drop the first three "/"-separated segments of the annotated URL (for
	// "scheme://host/path" these are "scheme:", "" and "host") and re-root the
	// remaining path on the target domain.
	segments := strings.Split(result.Spec.Annotations.Url, "/")
	debugURL := result.Spec.Annotations.TargetDomain
	if len(segments) > 3 {
		debugURL += "/" + strings.Join(segments[3:], "/")
	}
	debugURL += "?token=" + res.Token

	ctx.Redirect(debugURL)
}

// NotebookStop stops a running notebook job, stores the status reported by
// ModelArts and redirects to the notebook list.
func NotebookStop(ctx *context.Context) {
	jobID := ctx.Params(":jobid")
	// The job id comes from the request, so never use it as the format string.
	log.Info("NotebookStop: jobID=%s", jobID)

	task, err := models.GetCloudbrainByJobID(jobID)
	if err != nil {
		ctx.ServerError("GetCloudbrainByJobID failed", err)
		return
	}

	if task.Status != string(models.JobRunning) {
		log.Error("the job(%s) is not running", task.JobName)
		ctx.ServerError("the job is not running", errors.New("the job is not running"))
		return
	}

	res, err := modelarts.StopJob(jobID, models.NotebookAction{
		Action: models.ActionStop,
	})
	if err != nil {
		log.Error("StopJob(%s) failed:%v", task.JobName, err.Error())
		ctx.ServerError("StopJob failed", err)
		return
	}

	task.Status = res.CurrentStatus
	if err = models.UpdateJob(task); err != nil {
		ctx.ServerError("UpdateJob failed", err)
		return
	}

	ctx.Redirect(setting.AppSubURL + ctx.Repo.RepoLink + "/modelarts/notebook")
}

// NotebookDel deletes a stopped notebook job from ModelArts and from the local
// database, then redirects to the notebook list.
func NotebookDel(ctx *context.Context) {
	jobID := ctx.Params(":jobid")

	task, err := models.GetCloudbrainByJobID(jobID)
	if err != nil {
		ctx.ServerError("GetCloudbrainByJobID failed", err)
		return
	}

	if task.Status != string(models.JobStopped) {
		log.Error("the job(%s) has not been stopped", task.JobName)
		ctx.ServerError("the job has not been stopped", errors.New("the job has not been stopped"))
		return
	}

	if _, err = modelarts.DelJob(jobID); err != nil {
		log.Error("DelJob(%s) failed:%v", task.JobName, err.Error())
		ctx.ServerError("DelJob failed", err)
		return
	}

	if err = models.DeleteJob(task); err != nil {
		ctx.ServerError("DeleteJob failed", err)
		return
	}

	ctx.Redirect(setting.AppSubURL + ctx.Repo.RepoLink + "/modelarts/notebook")
}

// TrainJobIndex renders the paginated list of train jobs of the current
// repository.
func TrainJobIndex(ctx *context.Context) {
	MustEnableModelArts(ctx)
	if ctx.Written() {
		// The permission check already answered with 404.
		return
	}

	page := ctx.QueryInt("page")
	if page <= 0 {
		page = 1
	}

	tasks, count, err := models.Cloudbrains(&models.CloudbrainsOptions{
		ListOptions: models.ListOptions{
			Page:     page,
			PageSize: setting.UI.IssuePagingNum,
		},
		RepoID: ctx.Repo.Repository.ID,
		Type:   models.TypeCloudBrainTrainJob,
	})
	if err != nil {
		ctx.ServerError("Cloudbrain", err)
		return
	}

	pager := context.NewPagination(int(count), setting.UI.IssuePagingNum, page, 5)
	pager.SetDefaultParams(ctx)
	ctx.Data["Page"] = pager

	ctx.Data["PageIsCloudBrain"] = true
	ctx.Data["Tasks"] = tasks
	ctx.HTML(200, tplModelArtsTrainJobIndex)
}

// TrainJobNew renders the train-job creation form. The selectable resource
// pools, engines, engine versions and flavors are decoded from the JSON
// strings exposed by the modelarts package.
func TrainJobNew(ctx *context.Context) {
	ctx.Data["PageIsCloudBrain"] = true
	ctx.Data["job_name"] = defaultModelArtsJobName(ctx.User.Name)

	attachs, err := models.GetModelArtsUserAttachments(ctx.User.ID)
	if err != nil {
		ctx.ServerError("GetAllUserAttachments failed:", err)
		return
	}
	ctx.Data["attachments"] = attachs

	var resourcePools modelarts.ResourcePool
	if err = json.Unmarshal([]byte(modelarts.ResourcePools), &resourcePools); err != nil {
		ctx.ServerError("json.Unmarshal failed:", err)
		return
	}
	ctx.Data["resource_pools"] = resourcePools.Info

	var engines modelarts.Engine
	if err = json.Unmarshal([]byte(modelarts.Engines), &engines); err != nil {
		ctx.ServerError("json.Unmarshal failed:", err)
		return
	}
	ctx.Data["engines"] = engines.Info

	var versionInfos modelarts.VersionInfo
	if err = json.Unmarshal([]byte(modelarts.EngineVersions), &versionInfos); err != nil {
		ctx.ServerError("json.Unmarshal failed:", err)
		return
	}
	ctx.Data["engine_versions"] = versionInfos.Version

	var flavorInfos modelarts.Flavor
	if err = json.Unmarshal([]byte(modelarts.FlavorInfos), &flavorInfos); err != nil {
		ctx.ServerError("json.Unmarshal failed:", err)
		return
	}
	ctx.Data["flavor_infos"] = flavorInfos.Info

	ctx.HTML(200, tplModelArtsTrainJobNew)
}

// TrainJobCreate validates the form, clones the repository, uploads the code
// to OBS, optionally saves the job parameters as a reusable config, and finally
// creates the train job. Any failure re-renders the form with an error message.
func TrainJobCreate(ctx *context.Context, form auth.CreateModelArtsTrainJobForm) {
	ctx.Data["PageIsCloudBrain"] = true

	// Validate before anything else: the dataset path below slices the
	// attachment uuid and would panic on an empty or one-character value.
	if err := paramCheckCreateTrainJob(form); err != nil {
		log.Error("paramCheckCreateTrainJob failed:(%v)", err)
		ctx.RenderWithErr(err.Error(), tplModelArtsTrainJobNew, &form)
		return
	}

	jobName := form.JobName
	uuid := form.Attachment
	bootFile := form.BootFile
	flavorCode := form.Flavor
	poolID := form.PoolID
	workServerNumber := form.WorkServerNumber
	engineID := int64(form.EngineID)
	repo := ctx.Repo.Repository

	codeLocalPath := setting.JobPath + jobName + modelarts.CodePath
	obsJobRoot := "/" + setting.Bucket + modelarts.JobPath + jobName
	codeObsPath := obsJobRoot + modelarts.CodePath
	outputObsPath := obsJobRoot + modelarts.OutputPath
	logObsPath := obsJobRoot + modelarts.LogPath
	dataPath := "/" + setting.Bucket + "/" + setting.BasePath + path.Join(uuid[0:1], uuid[1:2]) + "/" + uuid + "/"

	// NOTE(review): the local clone is never removed in this function, and a
	// second job with the same name will presumably fail to clone into the
	// existing directory — confirm whether cleanup belongs here.
	if err := git.Clone(repo.RepoPath(), codeLocalPath, git.CloneRepoOptions{}); err != nil {
		log.Error("Failed to clone repository: %s (%v)", repo.FullName(), err)
		ctx.RenderWithErr("Failed to clone repository", tplModelArtsTrainJobNew, &form)
		return
	}

	// TODO: consider delegating the code upload to the file server.
	if err := obsMkdir(setting.CodePathPrefix + jobName + modelarts.OutputPath); err != nil {
		log.Error("Failed to obsMkdir_output: %s (%v)", repo.FullName(), err)
		ctx.RenderWithErr("Failed to obsMkdir_output", tplModelArtsTrainJobNew, &form)
		return
	}

	if err := obsMkdir(setting.CodePathPrefix + jobName + modelarts.LogPath); err != nil {
		log.Error("Failed to obsMkdir_log: %s (%v)", repo.FullName(), err)
		ctx.RenderWithErr("Failed to obsMkdir_log", tplModelArtsTrainJobNew, &form)
		return
	}

	if err := uploadCodeToObs(codeLocalPath, jobName, ""); err != nil {
		log.Error("Failed to uploadCodeToObs: %s (%v)", repo.FullName(), err)
		ctx.RenderWithErr("Failed to uploadCodeToObs", tplModelArtsTrainJobNew, &form)
		return
	}

	if form.IsSaveParam == "on" {
		if form.ParameterTemplateName == "" {
			log.Error("ParameterTemplateName is empty")
			ctx.RenderWithErr("保存作业参数时,作业参数名称不能为空", tplModelArtsTrainJobNew, &form)
			return
		}

		_, err := modelarts.CreateTrainJobConfig(models.CreateConfigParams{
			ConfigName:  form.ParameterTemplateName,
			Description: form.PrameterDescription,
			DataUrl:     dataPath,
			AppUrl:      codeObsPath,
			BootFileUrl: codeObsPath + bootFile,
			TrainUrl:    outputObsPath,
			Flavor: models.Flavor{
				Code: flavorCode,
			},
			WorkServerNum: workServerNumber,
			EngineID:      engineID,
			LogUrl:        logObsPath,
			PoolID:        poolID,
			// Kept as an explicit empty slice (not nil), as in the original request.
			Parameter: []models.Parameter{},
		})
		if err != nil {
			log.Error("Failed to CreateTrainJobConfig: %v", err)
			ctx.RenderWithErr("保存作业参数失败:"+err.Error(), tplModelArtsTrainJobNew, &form)
			return
		}
	}

	req := &modelarts.GenerateTrainJobReq{
		JobName:          jobName,
		DataUrl:          dataPath,
		Description:      form.Description,
		CodeObsPath:      codeObsPath,
		BootFile:         codeObsPath + bootFile,
		TrainUrl:         outputObsPath,
		FlavorCode:       flavorCode,
		WorkServerNumber: workServerNumber,
		EngineID:         engineID,
		LogUrl:           logObsPath,
		PoolID:           poolID,
	}

	if err := modelarts.GenerateTrainJob(ctx, req); err != nil {
		log.Error("GenerateTrainJob failed:%v", err.Error())
		ctx.RenderWithErr(err.Error(), tplModelArtsTrainJobNew, &form)
		return
	}

	ctx.Redirect(setting.AppSubURL + ctx.Repo.RepoLink + "/modelarts/train-job")
}

// readDir returns all entries of the directory named by dirname, in the order
// the operating system reports them (the result is not sorted). An empty
// directory yields a nil slice and a nil error.
func readDir(dirname string) ([]os.FileInfo, error) {
	f, err := os.Open(dirname)
	if err != nil {
		return nil, err
	}
	defer f.Close()

	// Read in batches until io.EOF so that directories with more entries than
	// one batch are not silently truncated.
	const batchSize = 100
	var list []os.FileInfo
	for {
		batch, err := f.Readdir(batchSize)
		list = append(list, batch...)
		if err == io.EOF {
			return list, nil
		}
		if err != nil {
			return nil, err
		}
	}
}

// uploadCodeToObs recursively uploads the content of codePath to OBS below
// setting.CodePathPrefix + jobName + "/code/". parentDir is the path of
// codePath relative to the code root ("" for the root itself); when non-empty
// it must end with "/", and so must codePath.
//
// NOTE(review): every entry is uploaded, which presumably includes the clone's
// .git directory — confirm whether that is intended.
func uploadCodeToObs(codePath, jobName, parentDir string) error {
	files, err := readDir(codePath)
	if err != nil {
		log.Error("readDir(%s) failed: %s", codePath, err.Error())
		return err
	}

	// Directories and files share the same key prefix, so directory placeholder
	// objects land next to the files they contain instead of at the bucket root.
	keyPrefix := setting.CodePathPrefix + jobName + "/code/" + parentDir

	for _, file := range files {
		name := file.Name()

		if !file.IsDir() {
			input := &obs.PutFileInput{}
			input.Bucket = setting.Bucket
			input.Key = keyPrefix + name
			input.SourceFile = codePath + name
			if _, err = storage.ObsCli.PutFile(input); err != nil {
				log.Error("PutFile(%s) failed: %s", input.SourceFile, err.Error())
				return err
			}
			continue
		}

		// A key with a trailing "/" is stored as the placeholder object for the directory.
		input := &obs.PutObjectInput{}
		input.Bucket = setting.Bucket
		input.Key = keyPrefix + name + "/"
		if _, err = storage.ObsCli.PutObject(input); err != nil {
			log.Error("PutObject(%s) failed: %s", input.Key, err.Error())
			return err
		}

		// The failing call has already logged the cause; just propagate it.
		if err = uploadCodeToObs(codePath+name+"/", jobName, parentDir+name+"/"); err != nil {
			return err
		}
	}

	return nil
}

// obsMkdir stores an object with key dir and no content in the configured
// bucket. Callers pass the OBS key of the job's output or log directory.
func obsMkdir(dir string) error {
	input := &obs.PutObjectInput{}
	input.Bucket = setting.Bucket
	input.Key = dir
	if _, err := storage.ObsCli.PutObject(input); err != nil {
		log.Error("PutObject(%s) failed: %s", input.Key, err.Error())
		return err
	}

	return nil
}

// paramCheckCreateTrainJob validates the train-job form: the attachment uuid
// must be at least two characters long, the boot file must end in ".py", and
// the number of work servers must be between 1 and 25 inclusive. The returned
// error message is shown to the user.
func paramCheckCreateTrainJob(form auth.CreateModelArtsTrainJobForm) error {
	// The dataset path is derived from the first two characters of the uuid.
	if len(form.Attachment) < 2 {
		log.Error("the attachment uuid(%s) is invalid", form.Attachment)
		return errors.New("请选择有效的数据集")
	}

	if !strings.HasSuffix(form.BootFile, ".py") {
		log.Error("the boot file(%s) must be a python file", form.BootFile)
		return errors.New("启动文件必须是python文件")
	}

	if form.WorkServerNumber < 1 || form.WorkServerNumber > 25 {
		log.Error("the WorkServerNumber(%d) must be in [1,25]", form.WorkServerNumber)
		return errors.New("计算节点数必须在1-25之间")
	}

	return nil
}