Browse Source

add rediskey stop same task

tags/v1.22.9.2^2
liuzx 3 years ago
parent
commit
54081e3557
3 changed files with 95 additions and 29 deletions
  1. +31
    -23
      routers/repo/cloudbrain.go
  2. +23
    -0
      routers/repo/grampus.go
  3. +41
    -6
      routers/repo/modelarts.go

+ 31
- 23
routers/repo/cloudbrain.go View File

@@ -30,8 +30,8 @@ import (
"code.gitea.io/gitea/modules/git"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/modelarts"
"code.gitea.io/gitea/modules/redis/redis_client"
"code.gitea.io/gitea/modules/redis/redis_key"
"code.gitea.io/gitea/modules/redis/redis_lock"
"code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/modules/storage"
"code.gitea.io/gitea/modules/util"
@@ -100,6 +100,9 @@ func jobNamePrefixValid(s string) string {
}

func cloudBrainNewDataPrepare(ctx *context.Context) error {
var taskJobNameKey = ctx.Query("taskJobNameKey")
redis_client.Del(taskJobNameKey)

ctx.Data["PageIsCloudBrain"] = true
t := time.Now()
var displayJobName = jobNamePrefixValid(cutString(ctx.User.Name, 5)) + t.Format("2006010215") + strconv.Itoa(int(t.Unix()))[5:]
@@ -249,21 +252,20 @@ func CloudBrainCreate(ctx *context.Context, form auth.CreateCloudBrainForm) {
tpl = tplCloudBrainTrainJobNew
}

lock := redis_lock.NewDistributeLock(redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), string(models.JobTypeTrain), displayJobName))
// isOk, err := lock.Lock(5 * time.Second)
// if !isOk {
// log.Error("The task have been processed:%v", err, ctx.Data["MsgID"])
// // cloudBrainNewDataPrepare(ctx)
// // ctx.RenderWithErr("The task have been processed, please wait a minute", tpl, &form)
// // return
// }
taskJobNameKey := redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), string(models.JobTypeTrain), displayJobName)
isOk, err := redis_client.Setnx(taskJobNameKey, "", 5*time.Second)
ctx.Data["taskJobNameKey"] = taskJobNameKey
if !isOk {
log.Error("The task have been processed:%v", err, ctx.Data["MsgID"])
ctx.RenderWithErr("The task have been processed, please wait a minute", tpl, &form)
return
}

tasks, err := models.GetCloudbrainsByDisplayJobName(repo.ID, jobType, displayJobName)
if err == nil {
if len(tasks) != 0 {
log.Error("the job name did already exist", ctx.Data["MsgID"])
cloudBrainNewDataPrepare(ctx)
defer lock.UnLock()
ctx.RenderWithErr("the job name did already exist", tpl, &form)
return
}
@@ -271,7 +273,6 @@ func CloudBrainCreate(ctx *context.Context, form auth.CreateCloudBrainForm) {
if !models.IsErrJobNotExist(err) {
log.Error("system error, %v", err, ctx.Data["MsgID"])
cloudBrainNewDataPrepare(ctx)
defer lock.UnLock()
ctx.RenderWithErr("system error", tpl, &form)
return
}
@@ -279,7 +280,6 @@ func CloudBrainCreate(ctx *context.Context, form auth.CreateCloudBrainForm) {

if !jobNamePattern.MatchString(displayJobName) {
cloudBrainNewDataPrepare(ctx)
defer lock.UnLock()
ctx.RenderWithErr(ctx.Tr("repo.cloudbrain_jobname_err"), tpl, &form)
return
}
@@ -287,7 +287,6 @@ func CloudBrainCreate(ctx *context.Context, form auth.CreateCloudBrainForm) {
if jobType != string(models.JobTypeBenchmark) && jobType != string(models.JobTypeDebug) && jobType != string(models.JobTypeTrain) {
log.Error("jobtype error:", jobType, ctx.Data["MsgID"])
cloudBrainNewDataPrepare(ctx)
defer lock.UnLock()
ctx.RenderWithErr("jobtype error", tpl, &form)
return
}
@@ -296,14 +295,12 @@ func CloudBrainCreate(ctx *context.Context, form auth.CreateCloudBrainForm) {
if err != nil {
log.Error("GetCloudbrainCountByUserID failed:%v", err, ctx.Data["MsgID"])
cloudBrainNewDataPrepare(ctx)
defer lock.UnLock()
ctx.RenderWithErr("system error", tpl, &form)
return
} else {
if count >= 1 {
log.Error("the user already has running or waiting task", ctx.Data["MsgID"])
cloudBrainNewDataPrepare(ctx)
defer lock.UnLock()
ctx.RenderWithErr(ctx.Tr("repo.cloudbrain.morethanonejob"), tpl, &form)
return
}
@@ -316,7 +313,6 @@ func CloudBrainCreate(ctx *context.Context, form auth.CreateCloudBrainForm) {
if err != nil {
log.Error("GetDatasetInfo failed: %v", err, ctx.Data["MsgID"])
cloudBrainNewDataPrepare(ctx)
defer lock.UnLock()
ctx.RenderWithErr(ctx.Tr("cloudbrain.error.dataset_select"), tpl, &form)
return
}
@@ -328,7 +324,6 @@ func CloudBrainCreate(ctx *context.Context, form auth.CreateCloudBrainForm) {
if err != nil || !bootFileExist {
log.Error("Get bootfile error:", err, ctx.Data["MsgID"])
cloudBrainNewDataPrepare(ctx)
defer lock.UnLock()
ctx.RenderWithErr(ctx.Tr("repo.cloudbrain_bootfile_err"), tpl, &form)
return
}
@@ -337,7 +332,6 @@ func CloudBrainCreate(ctx *context.Context, form auth.CreateCloudBrainForm) {
if err != nil {
log.Error("getTrainJobCommand failed: %v", err)
cloudBrainNewDataPrepare(ctx)
defer lock.UnLock()
ctx.RenderWithErr(err.Error(), tpl, &form)
return
}
@@ -349,7 +343,6 @@ func CloudBrainCreate(ctx *context.Context, form auth.CreateCloudBrainForm) {

if errStr != "" {
cloudBrainNewDataPrepare(ctx)
defer lock.UnLock()
ctx.RenderWithErr(errStr, tpl, &form)
return
}
@@ -360,7 +353,6 @@ func CloudBrainCreate(ctx *context.Context, form auth.CreateCloudBrainForm) {
errStr = loadCodeAndMakeModelPath(repo, codePath, branchName, jobName, cloudbrain.ModelMountPath)
if errStr != "" {
cloudBrainNewDataPrepare(ctx)
defer lock.UnLock()
ctx.RenderWithErr(ctx.Tr(errStr), tpl, &form)
return
}
@@ -397,7 +389,6 @@ func CloudBrainCreate(ctx *context.Context, form auth.CreateCloudBrainForm) {
err = cloudbrain.GenerateTask(req)
if err != nil {
cloudBrainNewDataPrepare(ctx)
defer lock.UnLock()
ctx.RenderWithErr(err.Error(), tpl, &form)
return
}
@@ -446,10 +437,19 @@ func CloudBrainInferenceJobCreate(ctx *context.Context, form auth.CreateCloudBra
bootFile := strings.TrimSpace(form.BootFile)
labelName := form.LabelName
repo := ctx.Repo.Repository
tpl := tplCloudBrainInferenceJobNew

taskJobNameKey := redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), string(models.JobTypeInference), displayJobName)
isOk, err := redis_client.Setnx(taskJobNameKey, "", 5*time.Second)
ctx.Data["taskJobNameKey"] = taskJobNameKey
if !isOk {
log.Error("The task have been processed:%v", err, ctx.Data["MsgID"])
ctx.RenderWithErr("The task have been processed, please wait a minute", tpl, &form)
return
}

ckptUrl := setting.Attachment.Minio.RealPath + form.TrainUrl + form.CkptName
log.Info("ckpt url:" + ckptUrl)
tpl := tplCloudBrainInferenceJobNew
command, err := getInferenceJobCommand(form)
if err != nil {
log.Error("getTrainJobCommand failed: %v", err)
@@ -2274,12 +2274,20 @@ func BenchMarkAlgorithmCreate(ctx *context.Context, form auth.CreateCloudBrainFo
resourceSpecId := cloudbrain.BenchMarkResourceID
benchmarkTypeID := form.BenchmarkTypeID
benchmarkChildTypeID := form.BenchmarkChildTypeID
repo := ctx.Repo.Repository

ctx.Data["description"] = form.Description
ctx.Data["benchmarkTypeID"] = benchmarkTypeID
ctx.Data["benchmark_child_types_id_hidden"] = benchmarkChildTypeID

repo := ctx.Repo.Repository
taskJobNameKey := redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), string(models.JobTypeBenchmark), displayJobName)
isOk, err := redis_client.Setnx(taskJobNameKey, "", 5*time.Second)
ctx.Data["taskJobNameKey"] = taskJobNameKey
if !isOk {
log.Error("The task have been processed:%v", err, ctx.Data["MsgID"])
ctx.RenderWithErr("The task have been processed, please wait a minute", tplCloudBrainBenchmarkNew, &form)
return
}

tasks, err := models.GetCloudbrainsByDisplayJobName(repo.ID, string(models.JobTypeBenchmark), displayJobName)
if err == nil {


+ 23
- 0
routers/repo/grampus.go View File

@@ -17,6 +17,8 @@ import (
"code.gitea.io/gitea/modules/grampus"
"code.gitea.io/gitea/modules/modelarts"
"code.gitea.io/gitea/modules/notification"
"code.gitea.io/gitea/modules/redis/redis_client"
"code.gitea.io/gitea/modules/redis/redis_key"
"code.gitea.io/gitea/modules/timeutil"
"code.gitea.io/gitea/modules/util"
"github.com/unknwon/com"
@@ -61,6 +63,9 @@ func GrampusTrainJobNPUNew(ctx *context.Context) {
}

func grampusTrainJobNewDataPrepare(ctx *context.Context, processType string) error {
var taskJobNameKey = ctx.Query("taskJobNameKey")
redis_client.Del(taskJobNameKey)

ctx.Data["PageIsCloudBrain"] = true

t := time.Now()
@@ -209,6 +214,15 @@ func GrampusTrainJobGpuCreate(ctx *context.Context, form auth.CreateGrampusTrain
flavorName := form.FlavorName
image := strings.TrimSpace(form.Image)

taskJobNameKey := redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), string(models.JobTypeTrain), displayJobName)
isOk, err := redis_client.Setnx(taskJobNameKey, "", 5*time.Second)
ctx.Data["taskJobNameKey"] = taskJobNameKey
if !isOk {
log.Error("The task have been processed:%v", err, ctx.Data["MsgID"])
ctx.RenderWithErr("The task have been processed, please wait a minute", tplGrampusTrainJobGPUNew, &form)
return
}

if !jobNamePattern.MatchString(displayJobName) {
grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeGPU)
ctx.RenderWithErr(ctx.Tr("repo.cloudbrain_jobname_err"), tplGrampusTrainJobGPUNew, &form)
@@ -401,6 +415,15 @@ func GrampusTrainJobNpuCreate(ctx *context.Context, form auth.CreateGrampusTrain
versionCount := modelarts.VersionCountOne
engineName := form.EngineName

taskJobNameKey := redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), string(models.JobTypeTrain), displayJobName)
isOk, err := redis_client.Setnx(taskJobNameKey, "", 5*time.Second)
ctx.Data["taskJobNameKey"] = taskJobNameKey
if !isOk {
log.Error("The task have been processed:%v", err, ctx.Data["MsgID"])
ctx.RenderWithErr("The task have been processed, please wait a minute", tplGrampusTrainJobNPUNew, &form)
return
}

if !jobNamePattern.MatchString(displayJobName) {
grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeNPU)
ctx.RenderWithErr(ctx.Tr("repo.cloudbrain_jobname_err"), tplGrampusTrainJobNPUNew, &form)


+ 41
- 6
routers/repo/modelarts.go View File

@@ -125,6 +125,9 @@ func NotebookNew(ctx *context.Context) {
}

func notebookNewDataPrepare(ctx *context.Context) error {
var taskJobNameKey = ctx.Query("taskJobNameKey")
redis_client.Del(taskJobNameKey)

ctx.Data["PageIsCloudBrain"] = true
t := time.Now()
var displayJobName = jobNamePrefixValid(cutString(ctx.User.Name, 5)) + t.Format("2006010215") + strconv.Itoa(int(t.Unix()))[5:]
@@ -210,6 +213,15 @@ func Notebook2Create(ctx *context.Context, form auth.CreateModelArtsNotebookForm
imageId := form.ImageId
repo := ctx.Repo.Repository

taskJobNameKey := redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), string(models.JobTypeDebug), displayJobName)
isOk, err := redis_client.Setnx(taskJobNameKey, "", 5*time.Second)
ctx.Data["taskJobNameKey"] = taskJobNameKey
if !isOk {
log.Error("The task have been processed:%v", err, ctx.Data["MsgID"])
ctx.RenderWithErr("The task have been processed, please wait a minute", tplModelArtsNotebookNew, &form)
return
}

count, err := models.GetCloudbrainNotebookCountByUserID(ctx.User.ID)
if err != nil {
log.Error("GetCloudbrainNotebookCountByUserID failed:%v", err, ctx.Data["MsgID"])
@@ -786,8 +798,8 @@ func setSpecBySpecialPoolConfig(ctx *context.Context, jobType string) {
}

func trainJobErrorNewDataPrepare(ctx *context.Context, form auth.CreateModelArtsTrainJobForm) error {
var taskJobnameKey = ctx.Query("taskJobnameKey")
redis_client.Del(taskJobnameKey)
var taskJobNameKey = ctx.Query("taskJobNameKey")
redis_client.Del(taskJobNameKey)
ctx.Data["PageIsCloudBrain"] = true

//can, err := canUserCreateTrainJob(ctx.User.ID)
@@ -976,6 +988,9 @@ func trainJobNewVersionDataPrepare(ctx *context.Context) error {
}

func versionErrorDataPrepare(ctx *context.Context, form auth.CreateModelArtsTrainJobForm) error {
var taskJobNameKey = ctx.Query("taskJobNameKey")
redis_client.Del(taskJobNameKey)

ctx.Data["PageIsCloudBrain"] = true
var jobID = ctx.Params(":jobid")
// var versionName = ctx.Params(":version-name")
@@ -1094,12 +1109,11 @@ func TrainJobCreate(ctx *context.Context, form auth.CreateModelArtsTrainJobForm)
VersionCount := modelarts.VersionCountOne
EngineName := form.EngineName

taskJobnameKey := redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), string(models.JobTypeTrain), displayJobName)
isOk, err := redis_client.Setnx(taskJobnameKey, "", 5*time.Second)
ctx.Data["taskJobnameKey"] = taskJobnameKey
taskJobNameKey := redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), string(models.JobTypeTrain), displayJobName)
isOk, err := redis_client.Setnx(taskJobNameKey, "", 5*time.Second)
ctx.Data["taskJobNameKey"] = taskJobNameKey
if !isOk {
log.Error("The task have been processed:%v", err, ctx.Data["MsgID"])
// trainJobErrorNewDataPrepare(ctx, form)
ctx.RenderWithErr("The task have been processed, please wait a minute", tplModelArtsTrainJobIndex, &form)
return
}
@@ -1417,6 +1431,15 @@ func TrainJobCreateVersion(ctx *context.Context, form auth.CreateModelArtsTrainJ
EngineName := form.EngineName
isLatestVersion := modelarts.IsLatestVersion

taskJobNameKey := redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), string(models.JobTypeTrain), displayJobName)
isOk, err := redis_client.Setnx(taskJobNameKey, "", 5*time.Second)
ctx.Data["taskJobNameKey"] = taskJobNameKey
if !isOk {
log.Error("The task have been processed:%v", err, ctx.Data["MsgID"])
ctx.RenderWithErr("The task have been processed, please wait a minute", tplModelArtsTrainJobVersionNew, &form)
return
}

canNewJob, _ := canUserCreateTrainJobVersion(ctx, latestTask.UserID)
if !canNewJob {
versionErrorDataPrepare(ctx, form)
@@ -2000,6 +2023,15 @@ func InferenceJobCreate(ctx *context.Context, form auth.CreateModelArtsInference
ckptUrl := "/" + form.TrainUrl + form.CkptName
log.Info("ckpt url:" + ckptUrl)

taskJobNameKey := redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), string(models.JobTypeInference), displayJobName)
isOk, err := redis_client.Setnx(taskJobNameKey, "", 5*time.Second)
ctx.Data["taskJobNameKey"] = taskJobNameKey
if !isOk {
log.Error("The task have been processed:%v", err, ctx.Data["MsgID"])
ctx.RenderWithErr("The task have been processed, please wait a minute", tplModelArtsInferenceJobNew, &form)
return
}

count, err := models.GetCloudbrainInferenceJobCountByUserID(ctx.User.ID)
if err != nil {
log.Error("GetCloudbrainInferenceJobCountByUserID failed:%v", err, ctx.Data["MsgID"])
@@ -2378,6 +2410,9 @@ func inferenceJobNewDataPrepare(ctx *context.Context) error {
}

func inferenceJobErrorNewDataPrepare(ctx *context.Context, form auth.CreateModelArtsInferenceJobForm) error {
var taskJobNameKey = ctx.Query("taskJobNameKey")
redis_client.Del(taskJobNameKey)

ctx.Data["PageIsCloudBrain"] = true

t := time.Now()


Loading…
Cancel
Save