From 54081e35579f91f930622f090c69973fc5725fe8 Mon Sep 17 00:00:00 2001 From: liuzx Date: Wed, 24 Aug 2022 11:31:21 +0800 Subject: [PATCH] add rediskey stop same task --- routers/repo/cloudbrain.go | 54 ++++++++++++++++++++++---------------- routers/repo/grampus.go | 23 ++++++++++++++++ routers/repo/modelarts.go | 47 ++++++++++++++++++++++++++++----- 3 files changed, 95 insertions(+), 29 deletions(-) diff --git a/routers/repo/cloudbrain.go b/routers/repo/cloudbrain.go index 119e77348..0226c1414 100755 --- a/routers/repo/cloudbrain.go +++ b/routers/repo/cloudbrain.go @@ -30,8 +30,8 @@ import ( "code.gitea.io/gitea/modules/git" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/modelarts" + "code.gitea.io/gitea/modules/redis/redis_client" "code.gitea.io/gitea/modules/redis/redis_key" - "code.gitea.io/gitea/modules/redis/redis_lock" "code.gitea.io/gitea/modules/setting" "code.gitea.io/gitea/modules/storage" "code.gitea.io/gitea/modules/util" @@ -100,6 +100,9 @@ func jobNamePrefixValid(s string) string { } func cloudBrainNewDataPrepare(ctx *context.Context) error { + var taskJobNameKey = ctx.Query("taskJobNameKey") + redis_client.Del(taskJobNameKey) + ctx.Data["PageIsCloudBrain"] = true t := time.Now() var displayJobName = jobNamePrefixValid(cutString(ctx.User.Name, 5)) + t.Format("2006010215") + strconv.Itoa(int(t.Unix()))[5:] @@ -249,21 +252,20 @@ func CloudBrainCreate(ctx *context.Context, form auth.CreateCloudBrainForm) { tpl = tplCloudBrainTrainJobNew } - lock := redis_lock.NewDistributeLock(redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), string(models.JobTypeTrain), displayJobName)) - // isOk, err := lock.Lock(5 * time.Second) - // if !isOk { - // log.Error("The task have been processed:%v", err, ctx.Data["MsgID"]) - // // cloudBrainNewDataPrepare(ctx) - // // ctx.RenderWithErr("The task have been processed, please wait a minute", tpl, &form) - // // return - // } + taskJobNameKey := redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), string(models.JobTypeTrain), displayJobName) + isOk, err := redis_client.Setnx(taskJobNameKey, "", 5*time.Second) + ctx.Data["taskJobNameKey"] = taskJobNameKey + if !isOk { + log.Error("The task have been processed:%v", err, ctx.Data["MsgID"]) + ctx.RenderWithErr("The task have been processed, please wait a minute", tpl, &form) + return + } tasks, err := models.GetCloudbrainsByDisplayJobName(repo.ID, jobType, displayJobName) if err == nil { if len(tasks) != 0 { log.Error("the job name did already exist", ctx.Data["MsgID"]) cloudBrainNewDataPrepare(ctx) - defer lock.UnLock() ctx.RenderWithErr("the job name did already exist", tpl, &form) return } @@ -271,7 +273,6 @@ func CloudBrainCreate(ctx *context.Context, form auth.CreateCloudBrainForm) { if !models.IsErrJobNotExist(err) { log.Error("system error, %v", err, ctx.Data["MsgID"]) cloudBrainNewDataPrepare(ctx) - defer lock.UnLock() ctx.RenderWithErr("system error", tpl, &form) return } @@ -279,7 +280,6 @@ func CloudBrainCreate(ctx *context.Context, form auth.CreateCloudBrainForm) { if !jobNamePattern.MatchString(displayJobName) { cloudBrainNewDataPrepare(ctx) - defer lock.UnLock() ctx.RenderWithErr(ctx.Tr("repo.cloudbrain_jobname_err"), tpl, &form) return } @@ -287,7 +287,6 @@ func CloudBrainCreate(ctx *context.Context, form auth.CreateCloudBrainForm) { if jobType != string(models.JobTypeBenchmark) && jobType != string(models.JobTypeDebug) && jobType != string(models.JobTypeTrain) { log.Error("jobtype error:", jobType, ctx.Data["MsgID"]) cloudBrainNewDataPrepare(ctx) - defer lock.UnLock() ctx.RenderWithErr("jobtype error", tpl, &form) return } @@ -296,14 +295,12 @@ func CloudBrainCreate(ctx *context.Context, form auth.CreateCloudBrainForm) { if err != nil { log.Error("GetCloudbrainCountByUserID failed:%v", err, ctx.Data["MsgID"]) cloudBrainNewDataPrepare(ctx) - defer lock.UnLock() ctx.RenderWithErr("system error", tpl, &form) return } else { if count >= 1 { log.Error("the user already has running or waiting task", ctx.Data["MsgID"]) cloudBrainNewDataPrepare(ctx) - defer lock.UnLock() ctx.RenderWithErr(ctx.Tr("repo.cloudbrain.morethanonejob"), tpl, &form) return } @@ -316,7 +313,6 @@ func CloudBrainCreate(ctx *context.Context, form auth.CreateCloudBrainForm) { if err != nil { log.Error("GetDatasetInfo failed: %v", err, ctx.Data["MsgID"]) cloudBrainNewDataPrepare(ctx) - defer lock.UnLock() ctx.RenderWithErr(ctx.Tr("cloudbrain.error.dataset_select"), tpl, &form) return } @@ -328,7 +324,6 @@ func CloudBrainCreate(ctx *context.Context, form auth.CreateCloudBrainForm) { if err != nil || !bootFileExist { log.Error("Get bootfile error:", err, ctx.Data["MsgID"]) cloudBrainNewDataPrepare(ctx) - defer lock.UnLock() ctx.RenderWithErr(ctx.Tr("repo.cloudbrain_bootfile_err"), tpl, &form) return } @@ -337,7 +332,6 @@ func CloudBrainCreate(ctx *context.Context, form auth.CreateCloudBrainForm) { if err != nil { log.Error("getTrainJobCommand failed: %v", err) cloudBrainNewDataPrepare(ctx) - defer lock.UnLock() ctx.RenderWithErr(err.Error(), tpl, &form) return } @@ -349,7 +343,6 @@ func CloudBrainCreate(ctx *context.Context, form auth.CreateCloudBrainForm) { if errStr != "" { cloudBrainNewDataPrepare(ctx) - defer lock.UnLock() ctx.RenderWithErr(errStr, tpl, &form) return } @@ -360,7 +353,6 @@ func CloudBrainCreate(ctx *context.Context, form auth.CreateCloudBrainForm) { errStr = loadCodeAndMakeModelPath(repo, codePath, branchName, jobName, cloudbrain.ModelMountPath) if errStr != "" { cloudBrainNewDataPrepare(ctx) - defer lock.UnLock() ctx.RenderWithErr(ctx.Tr(errStr), tpl, &form) return } @@ -397,7 +389,6 @@ func CloudBrainCreate(ctx *context.Context, form auth.CreateCloudBrainForm) { err = cloudbrain.GenerateTask(req) if err != nil { cloudBrainNewDataPrepare(ctx) - defer lock.UnLock() ctx.RenderWithErr(err.Error(), tpl, &form) return } @@ -446,10 +437,19 @@ func CloudBrainInferenceJobCreate(ctx *context.Context, form auth.CreateCloudBra bootFile := strings.TrimSpace(form.BootFile) labelName := form.LabelName repo := ctx.Repo.Repository + tpl := tplCloudBrainInferenceJobNew + + taskJobNameKey := redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), string(models.JobTypeInference), displayJobName) + isOk, err := redis_client.Setnx(taskJobNameKey, "", 5*time.Second) + ctx.Data["taskJobNameKey"] = taskJobNameKey + if !isOk { + log.Error("The task have been processed:%v", err, ctx.Data["MsgID"]) + ctx.RenderWithErr("The task have been processed, please wait a minute", tpl, &form) + return + } ckptUrl := setting.Attachment.Minio.RealPath + form.TrainUrl + form.CkptName log.Info("ckpt url:" + ckptUrl) - tpl := tplCloudBrainInferenceJobNew command, err := getInferenceJobCommand(form) if err != nil { log.Error("getTrainJobCommand failed: %v", err) @@ -2274,12 +2274,20 @@ func BenchMarkAlgorithmCreate(ctx *context.Context, form auth.CreateCloudBrainFo resourceSpecId := cloudbrain.BenchMarkResourceID benchmarkTypeID := form.BenchmarkTypeID benchmarkChildTypeID := form.BenchmarkChildTypeID + repo := ctx.Repo.Repository ctx.Data["description"] = form.Description ctx.Data["benchmarkTypeID"] = benchmarkTypeID ctx.Data["benchmark_child_types_id_hidden"] = benchmarkChildTypeID - repo := ctx.Repo.Repository + taskJobNameKey := redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), string(models.JobTypeBenchmark), displayJobName) + isOk, err := redis_client.Setnx(taskJobNameKey, "", 5*time.Second) + ctx.Data["taskJobNameKey"] = taskJobNameKey + if !isOk { + log.Error("The task have been processed:%v", err, ctx.Data["MsgID"]) + ctx.RenderWithErr("The task have been processed, please wait a minute", tplCloudBrainBenchmarkNew, &form) + return + } tasks, err := models.GetCloudbrainsByDisplayJobName(repo.ID, string(models.JobTypeBenchmark), displayJobName) if err == nil { diff --git a/routers/repo/grampus.go b/routers/repo/grampus.go index cdde7596c..3342f6ce1 100755 --- a/routers/repo/grampus.go +++ b/routers/repo/grampus.go @@ -17,6 +17,8 @@ import ( "code.gitea.io/gitea/modules/grampus" "code.gitea.io/gitea/modules/modelarts" "code.gitea.io/gitea/modules/notification" + "code.gitea.io/gitea/modules/redis/redis_client" + "code.gitea.io/gitea/modules/redis/redis_key" "code.gitea.io/gitea/modules/timeutil" "code.gitea.io/gitea/modules/util" "github.com/unknwon/com" @@ -61,6 +63,9 @@ func GrampusTrainJobNPUNew(ctx *context.Context) { } func grampusTrainJobNewDataPrepare(ctx *context.Context, processType string) error { + var taskJobNameKey = ctx.Query("taskJobNameKey") + redis_client.Del(taskJobNameKey) + ctx.Data["PageIsCloudBrain"] = true t := time.Now() @@ -209,6 +214,15 @@ func GrampusTrainJobGpuCreate(ctx *context.Context, form auth.CreateGrampusTrain flavorName := form.FlavorName image := strings.TrimSpace(form.Image) + taskJobNameKey := redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), string(models.JobTypeTrain), displayJobName) + isOk, err := redis_client.Setnx(taskJobNameKey, "", 5*time.Second) + ctx.Data["taskJobNameKey"] = taskJobNameKey + if !isOk { + log.Error("The task have been processed:%v", err, ctx.Data["MsgID"]) + ctx.RenderWithErr("The task have been processed, please wait a minute", tplGrampusTrainJobGPUNew, &form) + return + } + if !jobNamePattern.MatchString(displayJobName) { grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeGPU) ctx.RenderWithErr(ctx.Tr("repo.cloudbrain_jobname_err"), tplGrampusTrainJobGPUNew, &form) @@ -401,6 +415,15 @@ func GrampusTrainJobNpuCreate(ctx *context.Context, form auth.CreateGrampusTrain versionCount := modelarts.VersionCountOne engineName := form.EngineName + taskJobNameKey := redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), string(models.JobTypeTrain), displayJobName) + isOk, err := redis_client.Setnx(taskJobNameKey, "", 5*time.Second) + ctx.Data["taskJobNameKey"] = taskJobNameKey + if !isOk { + log.Error("The task have been processed:%v", err, ctx.Data["MsgID"]) + ctx.RenderWithErr("The task have been processed, please wait a minute", tplGrampusTrainJobNPUNew, &form) + return + } + if !jobNamePattern.MatchString(displayJobName) { grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeNPU) ctx.RenderWithErr(ctx.Tr("repo.cloudbrain_jobname_err"), tplGrampusTrainJobNPUNew, &form) diff --git a/routers/repo/modelarts.go b/routers/repo/modelarts.go index b98772775..8c69851db 100755 --- a/routers/repo/modelarts.go +++ b/routers/repo/modelarts.go @@ -125,6 +125,9 @@ func NotebookNew(ctx *context.Context) { } func notebookNewDataPrepare(ctx *context.Context) error { + var taskJobNameKey = ctx.Query("taskJobNameKey") + redis_client.Del(taskJobNameKey) + ctx.Data["PageIsCloudBrain"] = true t := time.Now() var displayJobName = jobNamePrefixValid(cutString(ctx.User.Name, 5)) + t.Format("2006010215") + strconv.Itoa(int(t.Unix()))[5:] @@ -210,6 +213,15 @@ func Notebook2Create(ctx *context.Context, form auth.CreateModelArtsNotebookForm imageId := form.ImageId repo := ctx.Repo.Repository + taskJobNameKey := redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), string(models.JobTypeDebug), displayJobName) + isOk, err := redis_client.Setnx(taskJobNameKey, "", 5*time.Second) + ctx.Data["taskJobNameKey"] = taskJobNameKey + if !isOk { + log.Error("The task have been processed:%v", err, ctx.Data["MsgID"]) + ctx.RenderWithErr("The task have been processed, please wait a minute", tplModelArtsNotebookNew, &form) + return + } + count, err := models.GetCloudbrainNotebookCountByUserID(ctx.User.ID) if err != nil { log.Error("GetCloudbrainNotebookCountByUserID failed:%v", err, ctx.Data["MsgID"]) @@ -786,8 +798,8 @@ func setSpecBySpecialPoolConfig(ctx *context.Context, jobType string) { } func trainJobErrorNewDataPrepare(ctx *context.Context, form auth.CreateModelArtsTrainJobForm) error { - var taskJobnameKey = ctx.Query("taskJobnameKey") - redis_client.Del(taskJobnameKey) + var taskJobNameKey = ctx.Query("taskJobNameKey") + redis_client.Del(taskJobNameKey) ctx.Data["PageIsCloudBrain"] = true //can, err := canUserCreateTrainJob(ctx.User.ID) @@ -976,6 +988,9 @@ func trainJobNewVersionDataPrepare(ctx *context.Context) error { } func versionErrorDataPrepare(ctx *context.Context, form auth.CreateModelArtsTrainJobForm) error { + var taskJobNameKey = ctx.Query("taskJobNameKey") + redis_client.Del(taskJobNameKey) + ctx.Data["PageIsCloudBrain"] = true var jobID = ctx.Params(":jobid") // var versionName = ctx.Params(":version-name") @@ -1094,12 +1109,11 @@ func TrainJobCreate(ctx *context.Context, form auth.CreateModelArtsTrainJobForm) VersionCount := modelarts.VersionCountOne EngineName := form.EngineName - taskJobnameKey := redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), string(models.JobTypeTrain), displayJobName) - isOk, err := redis_client.Setnx(taskJobnameKey, "", 5*time.Second) - ctx.Data["taskJobnameKey"] = taskJobnameKey + taskJobNameKey := redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), string(models.JobTypeTrain), displayJobName) + isOk, err := redis_client.Setnx(taskJobNameKey, "", 5*time.Second) + ctx.Data["taskJobNameKey"] = taskJobNameKey if !isOk { log.Error("The task have been processed:%v", err, ctx.Data["MsgID"]) - // trainJobErrorNewDataPrepare(ctx, form) ctx.RenderWithErr("The task have been processed, please wait a minute", tplModelArtsTrainJobIndex, &form) return } @@ -1417,6 +1431,15 @@ func TrainJobCreateVersion(ctx *context.Context, form auth.CreateModelArtsTrainJ EngineName := form.EngineName isLatestVersion := modelarts.IsLatestVersion + taskJobNameKey := redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), string(models.JobTypeTrain), displayJobName) + isOk, err := redis_client.Setnx(taskJobNameKey, "", 5*time.Second) + ctx.Data["taskJobNameKey"] = taskJobNameKey + if !isOk { + log.Error("The task have been processed:%v", err, ctx.Data["MsgID"]) + ctx.RenderWithErr("The task have been processed, please wait a minute", tplModelArtsTrainJobVersionNew, &form) + return + } + canNewJob, _ := canUserCreateTrainJobVersion(ctx, latestTask.UserID) if !canNewJob { versionErrorDataPrepare(ctx, form) @@ -2000,6 +2023,15 @@ func InferenceJobCreate(ctx *context.Context, form auth.CreateModelArtsInference ckptUrl := "/" + form.TrainUrl + form.CkptName log.Info("ckpt url:" + ckptUrl) + taskJobNameKey := redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), string(models.JobTypeInference), displayJobName) + isOk, err := redis_client.Setnx(taskJobNameKey, "", 5*time.Second) + ctx.Data["taskJobNameKey"] = taskJobNameKey + if !isOk { + log.Error("The task have been processed:%v", err, ctx.Data["MsgID"]) + ctx.RenderWithErr("The task have been processed, please wait a minute", tplModelArtsInferenceJobNew, &form) + return + } + count, err := models.GetCloudbrainInferenceJobCountByUserID(ctx.User.ID) if err != nil { log.Error("GetCloudbrainInferenceJobCountByUserID failed:%v", err, ctx.Data["MsgID"]) @@ -2378,6 +2410,9 @@ func inferenceJobNewDataPrepare(ctx *context.Context) error { } func inferenceJobErrorNewDataPrepare(ctx *context.Context, form auth.CreateModelArtsInferenceJobForm) error { + var taskJobNameKey = ctx.Query("taskJobNameKey") + redis_client.Del(taskJobNameKey) + ctx.Data["PageIsCloudBrain"] = true t := time.Now()