From 5a34b5322116634799b2c0a22e725fb1d05838a6 Mon Sep 17 00:00:00 2001 From: zouap Date: Sun, 9 Oct 2022 10:12:01 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E6=99=BA=E7=AE=97GPU?= =?UTF-8?q?=E7=9A=84=E6=94=AF=E6=8C=81=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: zouap --- routers/repo/aisafety.go | 190 +++++++++++++++++++++++++++++++++++++ routers/repo/cloudbrain.go | 3 + 2 files changed, 193 insertions(+) diff --git a/routers/repo/aisafety.go b/routers/repo/aisafety.go index 5211e082b..245894e45 100644 --- a/routers/repo/aisafety.go +++ b/routers/repo/aisafety.go @@ -197,6 +197,8 @@ func syncAiSafetyTaskStatus(job *models.Cloudbrain) { queryTaskStatusFromCloudbrainTwo(job) } else if job.Type == models.TypeCloudBrainOne { queryTaskStatusFromCloudbrain(job) + } else if job.Type == models.TypeC2Net { + queryTaskStatusFromGrampus(job) } } else { if job.Status == string(models.ModelSafetyTesting) { @@ -222,6 +224,53 @@ func TimerHandleModelSafetyTestTask() { } } +func queryTaskStatusFromGrampus(task *models.Cloudbrain) { + if task.DeletedAt.IsZero() { //normal record + result, err := grampus.GetJob(task.JobID) + if err != nil { + log.Error("GetJob failed:" + err.Error()) + return + } + + if result != nil { + if len(result.JobInfo.Tasks[0].CenterID) == 1 && len(result.JobInfo.Tasks[0].CenterName) == 1 { + task.AiCenter = result.JobInfo.Tasks[0].CenterID[0] + "+" + result.JobInfo.Tasks[0].CenterName[0] + } + task.Status = grampus.TransTrainJobStatus(result.JobInfo.Status) + if task.Status != models.GrampusStatusSucceeded { + if task.Status != result.JobInfo.Status || result.JobInfo.Status == models.GrampusStatusRunning { + task.Duration = result.JobInfo.RunSec + if task.Duration < 0 { + task.Duration = 0 + } + task.TrainJobDuration = models.ConvertDurationToStr(task.Duration) + + if task.StartTime == 0 && result.JobInfo.StartedAt > 0 { + task.StartTime = timeutil.TimeStamp(result.JobInfo.StartedAt) + } + if task.EndTime == 0 && models.IsTrainJobTerminal(task.Status) && task.StartTime > 0 { + task.EndTime = task.StartTime.Add(task.Duration) + } + task.CorrectCreateUnix() + err = models.UpdateJob(task) + if err != nil { + log.Error("UpdateJob failed:" + err.Error()) + } + } + } else { + task.Status = string(models.ModelSafetyTesting) + err = models.UpdateJob(task) + if err != nil { + log.Error("UpdateJob failed:", err) + } + //send msg to beihang + sendGPUInferenceResultToTest(task) + } + } + } + +} + func queryTaskStatusFromCloudbrainTwo(job *models.Cloudbrain) { log.Info("The task not finished,name=" + job.DisplayJobName) result, err := modelarts.GetTrainJob(job.JobID, strconv.FormatInt(job.VersionID, 10)) @@ -623,7 +672,148 @@ func AiSafetyCreateForPost(ctx *context.Context) { } func createForGrampusGPU(ctx *context.Context, jobName string) { + BootFile := ctx.Query("boot_file") + displayJobName := ctx.Query("display_job_name") + description := ctx.Query("description") + image := strings.TrimSpace(ctx.Query("image")) + srcDataset := ctx.Query("src_dataset") //uuid + combatDataset := ctx.Query("combat_dataset") //uuid + evaluationIndex := ctx.Query("evaluationIndex") + Params := ctx.Query("run_para_list") + specId := ctx.QueryInt64("spec_id") + TrainUrl := ctx.Query("train_url") + CkptName := ctx.Query("ckpt_name") + ModelName := ctx.Query("ModelName") + ModelVersion := ctx.Query("ModelVersion") + repo := ctx.Repo.Repository + codeLocalPath := setting.JobPath + jobName + cloudbrain.CodeMountPath + "/" + codeMinioPath := setting.CBCodePathPrefix + jobName + cloudbrain.CodeMountPath + "/" + //check specification + spec, err := resource.GetAndCheckSpec(ctx.User.ID, specId, models.FindSpecsOptions{ + JobType: models.JobTypeTrain, + ComputeResource: models.GPU, + Cluster: models.C2NetCluster, + }) + if err != nil || spec == nil { + GrampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeGPU) + ctx.RenderWithErr("Resource specification not available", tplCloudBrainModelSafetyNewGrampusGpu, nil) + return + } + + if !account.IsPointBalanceEnough(ctx.User.ID, spec.UnitPrice) { + log.Error("point balance is not enough,userId=%d specId=%d", ctx.User.ID, spec.ID) + GrampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeGPU) + ctx.RenderWithErr(ctx.Tr("points.insufficient_points_balance"), tplCloudBrainModelSafetyNewGrampusGpu, nil) + return + } + //check dataset + uuid := srcDataset + ";" + combatDataset + datasetInfos, datasetNames, err := models.GetDatasetInfo(uuid, models.GPU) + if err != nil { + log.Error("GetDatasetInfo failed: %v", err, ctx.Data["MsgID"]) + GrampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeGPU) + ctx.RenderWithErr(ctx.Tr("cloudbrain.error.dataset_select"), tplCloudBrainModelSafetyNewGrampusGpu, nil) + return + } + + //prepare code and out path + _, err = ioutil.ReadDir(codeLocalPath) + if err == nil { + os.RemoveAll(codeLocalPath) + } + + if err := downloadZipCode(ctx, codeLocalPath, cloudbrain.DefaultBranchName); err != nil { + log.Error("downloadZipCode failed, server timed out: %s (%v)", repo.FullName(), err, ctx.Data["MsgID"]) + GrampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeGPU) + ctx.RenderWithErr(ctx.Tr("cloudbrain.load_code_failed"), tplCloudBrainModelSafetyNewGrampusGpu, nil) + return + } + + //todo: upload code (send to file_server todo this work?) + //upload code + if err := uploadCodeToMinio(codeLocalPath+"/", jobName, cloudbrain.CodeMountPath+"/"); err != nil { + log.Error("Failed to uploadCodeToMinio: %s (%v)", repo.FullName(), err, ctx.Data["MsgID"]) + GrampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeGPU) + ctx.RenderWithErr(ctx.Tr("cloudbrain.load_code_failed"), tplCloudBrainModelSafetyNewGrampusGpu, nil) + return + } + + modelPath := setting.JobPath + jobName + cloudbrain.ModelMountPath + "/" + if err := mkModelPath(modelPath); err != nil { + log.Error("Failed to mkModelPath: %s (%v)", repo.FullName(), err, ctx.Data["MsgID"]) + GrampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeGPU) + ctx.RenderWithErr(ctx.Tr("cloudbrain.load_code_failed"), tplCloudBrainModelSafetyNewGrampusGpu, nil) + return + } + + //init model readme + if err := uploadCodeToMinio(modelPath, jobName, cloudbrain.ModelMountPath+"/"); err != nil { + log.Error("Failed to uploadCodeToMinio: %s (%v)", repo.FullName(), err, ctx.Data["MsgID"]) + GrampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeGPU) + ctx.RenderWithErr(ctx.Tr("cloudbrain.load_code_failed"), tplCloudBrainModelSafetyNewGrampusGpu, nil) + return + } + + var datasetRemotePath, allFileName string + for _, datasetInfo := range datasetInfos { + if datasetRemotePath == "" { + datasetRemotePath = datasetInfo.DataLocalPath + allFileName = datasetInfo.FullName + } else { + datasetRemotePath = datasetRemotePath + ";" + datasetInfo.DataLocalPath + allFileName = allFileName + ";" + datasetInfo.FullName + } + + } + + //prepare command + preTrainModelPath := getPreTrainModelPath(TrainUrl, CkptName) + + command, err := generateCommand(repo.Name, grampus.ProcessorTypeGPU, codeMinioPath+cloudbrain.DefaultBranchName+".zip", datasetRemotePath, BootFile, Params, setting.CBCodePathPrefix+jobName+cloudbrain.ModelMountPath+"/", allFileName, preTrainModelPath, CkptName) + if err != nil { + log.Error("Failed to generateCommand: %s (%v)", displayJobName, err, ctx.Data["MsgID"]) + GrampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeGPU) + ctx.RenderWithErr("Create task failed, internal error", tplCloudBrainModelSafetyNewGrampusGpu, nil) + return + } + + commitID, _ := ctx.Repo.GitRepo.GetBranchCommitID(cloudbrain.DefaultBranchName) + + req := &grampus.GenerateTrainJobReq{ + JobName: jobName, + DisplayJobName: displayJobName, + ComputeResource: models.GPUResource, + ProcessType: grampus.ProcessorTypeGPU, + Command: command, + ImageUrl: image, + Description: description, + BootFile: BootFile, + Uuid: uuid, + CommitID: commitID, + BranchName: cloudbrain.DefaultBranchName, + Params: Params, + EngineName: image, + DatasetNames: datasetNames, + DatasetInfos: datasetInfos, + + IsLatestVersion: modelarts.IsLatestVersion, + VersionCount: modelarts.VersionCountOne, + WorkServerNumber: 1, + Spec: spec, + ModelName: ModelName, + LabelName: evaluationIndex, + CkptName: CkptName, + ModelVersion: ModelVersion, + PreTrainModelUrl: TrainUrl, + } + err = grampus.GenerateTrainJob(ctx, req) + if err != nil { + log.Error("GenerateTrainJob failed:%v", err.Error(), ctx.Data["MsgID"]) + GrampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeGPU) + ctx.RenderWithErr(err.Error(), tplCloudBrainModelSafetyNewGrampusGpu, nil) + return + } } func createForGrampusNPU(ctx *context.Context, jobName string) { diff --git a/routers/repo/cloudbrain.go b/routers/repo/cloudbrain.go index 50379fe08..b3069d00d 100755 --- a/routers/repo/cloudbrain.go +++ b/routers/repo/cloudbrain.go @@ -54,6 +54,9 @@ const ( tplCloudBrainModelSafetyNewGpu base.TplName = "repo/modelsafety/newgpu" tplCloudBrainModelSafetyNewNpu base.TplName = "repo/modelsafety/newnpu" + tplCloudBrainModelSafetyNewGrampusGpu base.TplName = "repo/modelsafety/newgrampusgpu" + tplCloudBrainModelSafetyNewGrampusNpu base.TplName = "repo/modelsafety/newgrampusnpu" + tplCloudBrainImageSubmit base.TplName = "repo/cloudbrain/image/submit" tplCloudBrainImageEdit base.TplName = "repo/cloudbrain/image/edit"