Browse Source

Add a distributed lock to prevent duplicate creation of the same task

tags/v1.22.9.2^2
liuzx 3 years ago
parent
commit
7c6a21a6cf
3 changed files with 48 additions and 56 deletions
  1. +22
    -16
      routers/repo/cloudbrain.go
  2. +9
    -12
      routers/repo/grampus.go
  3. +17
    -28
      routers/repo/modelarts.go

+ 22
- 16
routers/repo/cloudbrain.go View File

@@ -30,8 +30,8 @@ import (
"code.gitea.io/gitea/modules/git"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/modelarts"
"code.gitea.io/gitea/modules/redis/redis_client"
"code.gitea.io/gitea/modules/redis/redis_key"
"code.gitea.io/gitea/modules/redis/redis_lock"
"code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/modules/storage"
"code.gitea.io/gitea/modules/util"
@@ -100,9 +100,6 @@ func jobNamePrefixValid(s string) string {
}

func cloudBrainNewDataPrepare(ctx *context.Context) error {
var taskJobNameKey = ctx.Query("taskJobNameKey")
redis_client.Del(taskJobNameKey)

ctx.Data["PageIsCloudBrain"] = true
t := time.Now()
var displayJobName = jobNamePrefixValid(cutString(ctx.User.Name, 5)) + t.Format("2006010215") + strconv.Itoa(int(t.Unix()))[5:]
@@ -252,14 +249,14 @@ func CloudBrainCreate(ctx *context.Context, form auth.CreateCloudBrainForm) {
tpl = tplCloudBrainTrainJobNew
}

taskJobNameKey := redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), string(models.JobTypeTrain), displayJobName)
isOk, err := redis_client.Setnx(taskJobNameKey, "", 5*time.Second)
ctx.Data["taskJobNameKey"] = taskJobNameKey
lock := redis_lock.NewDistributeLock(redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), jobType, displayJobName))
isOk := lock.Lock(60 * time.Second)
if !isOk {
log.Error("The task have been processed:%v", err, ctx.Data["MsgID"])
log.Error("The task have been processed", ctx.Data["MsgID"])
ctx.RenderWithErr("The task have been processed, please wait a minute", tpl, &form)
return
}
defer lock.UnLock()

tasks, err := models.GetCloudbrainsByDisplayJobName(repo.ID, jobType, displayJobName)
if err == nil {
@@ -439,14 +436,14 @@ func CloudBrainInferenceJobCreate(ctx *context.Context, form auth.CreateCloudBra
repo := ctx.Repo.Repository
tpl := tplCloudBrainInferenceJobNew

taskJobNameKey := redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), string(models.JobTypeInference), displayJobName)
isOk, err := redis_client.Setnx(taskJobNameKey, "", 5*time.Second)
ctx.Data["taskJobNameKey"] = taskJobNameKey
lock := redis_lock.NewDistributeLock(redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), jobType, displayJobName))
isOk := lock.Lock(60 * time.Second)
if !isOk {
log.Error("The task have been processed:%v", err, ctx.Data["MsgID"])
log.Error("The task have been processed", ctx.Data["MsgID"])
ctx.RenderWithErr("The task have been processed, please wait a minute", tpl, &form)
return
}
defer lock.UnLock()

ckptUrl := setting.Attachment.Minio.RealPath + form.TrainUrl + form.CkptName
log.Info("ckpt url:" + ckptUrl)
@@ -2319,14 +2316,14 @@ func BenchMarkAlgorithmCreate(ctx *context.Context, form auth.CreateCloudBrainFo
ctx.Data["benchmarkTypeID"] = benchmarkTypeID
ctx.Data["benchmark_child_types_id_hidden"] = benchmarkChildTypeID

taskJobNameKey := redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), string(models.JobTypeBenchmark), displayJobName)
isOk, err := redis_client.Setnx(taskJobNameKey, "", 5*time.Second)
ctx.Data["taskJobNameKey"] = taskJobNameKey
lock := redis_lock.NewDistributeLock(redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), form.JobType, displayJobName))
isOk := lock.Lock(60 * time.Second)
if !isOk {
log.Error("The task have been processed:%v", err, ctx.Data["MsgID"])
log.Error("The task have been processed", ctx.Data["MsgID"])
ctx.RenderWithErr("The task have been processed, please wait a minute", tplCloudBrainBenchmarkNew, &form)
return
}
defer lock.UnLock()

tasks, err := models.GetCloudbrainsByDisplayJobName(repo.ID, string(models.JobTypeBenchmark), displayJobName)
if err == nil {
@@ -2515,6 +2512,15 @@ func ModelBenchmarkCreate(ctx *context.Context, form auth.CreateCloudBrainForm)
tpl := tplCloudBrainBenchmarkNew
command := cloudbrain.GetCloudbrainDebugCommand()

lock := redis_lock.NewDistributeLock(redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), jobType, displayJobName))
isOk := lock.Lock(60 * time.Second)
if !isOk {
log.Error("The task have been processed", ctx.Data["MsgID"])
ctx.RenderWithErr("The task have been processed, please wait a minute", tpl, &form)
return
}
defer lock.UnLock()

tasks, err := models.GetCloudbrainsByDisplayJobName(repo.ID, jobType, displayJobName)
if err == nil {
if len(tasks) != 0 {


+ 9
- 12
routers/repo/grampus.go View File

@@ -17,8 +17,8 @@ import (
"code.gitea.io/gitea/modules/grampus"
"code.gitea.io/gitea/modules/modelarts"
"code.gitea.io/gitea/modules/notification"
"code.gitea.io/gitea/modules/redis/redis_client"
"code.gitea.io/gitea/modules/redis/redis_key"
"code.gitea.io/gitea/modules/redis/redis_lock"
"code.gitea.io/gitea/modules/timeutil"
"code.gitea.io/gitea/modules/util"
"github.com/unknwon/com"
@@ -63,9 +63,6 @@ func GrampusTrainJobNPUNew(ctx *context.Context) {
}

func grampusTrainJobNewDataPrepare(ctx *context.Context, processType string) error {
var taskJobNameKey = ctx.Query("taskJobNameKey")
redis_client.Del(taskJobNameKey)

ctx.Data["PageIsCloudBrain"] = true

t := time.Now()
@@ -214,14 +211,14 @@ func GrampusTrainJobGpuCreate(ctx *context.Context, form auth.CreateGrampusTrain
flavorName := form.FlavorName
image := strings.TrimSpace(form.Image)

taskJobNameKey := redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), string(models.JobTypeTrain), displayJobName)
isOk, err := redis_client.Setnx(taskJobNameKey, "", 5*time.Second)
ctx.Data["taskJobNameKey"] = taskJobNameKey
lock := redis_lock.NewDistributeLock(redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), string(models.JobTypeTrain), displayJobName))
isOk := lock.Lock(60 * time.Second)
if !isOk {
log.Error("The task have been processed:%v", err, ctx.Data["MsgID"])
log.Error("The task have been processed", ctx.Data["MsgID"])
ctx.RenderWithErr("The task have been processed, please wait a minute", tplGrampusTrainJobGPUNew, &form)
return
}
defer lock.UnLock()

if !jobNamePattern.MatchString(displayJobName) {
grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeGPU)
@@ -415,14 +412,14 @@ func GrampusTrainJobNpuCreate(ctx *context.Context, form auth.CreateGrampusTrain
versionCount := modelarts.VersionCountOne
engineName := form.EngineName

taskJobNameKey := redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), string(models.JobTypeTrain), displayJobName)
isOk, err := redis_client.Setnx(taskJobNameKey, "", 5*time.Second)
ctx.Data["taskJobNameKey"] = taskJobNameKey
lock := redis_lock.NewDistributeLock(redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), string(models.JobTypeTrain), displayJobName))
isOk := lock.Lock(60 * time.Second)
if !isOk {
log.Error("The task have been processed:%v", err, ctx.Data["MsgID"])
log.Error("The task have been processed", ctx.Data["MsgID"])
ctx.RenderWithErr("The task have been processed, please wait a minute", tplGrampusTrainJobNPUNew, &form)
return
}
defer lock.UnLock()

if !jobNamePattern.MatchString(displayJobName) {
grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeNPU)


+ 17
- 28
routers/repo/modelarts.go View File

@@ -25,8 +25,8 @@ import (
"code.gitea.io/gitea/modules/modelarts"
"code.gitea.io/gitea/modules/notification"
"code.gitea.io/gitea/modules/obs"
"code.gitea.io/gitea/modules/redis/redis_client"
"code.gitea.io/gitea/modules/redis/redis_key"
"code.gitea.io/gitea/modules/redis/redis_lock"
"code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/modules/storage"
"code.gitea.io/gitea/modules/timeutil"
@@ -125,9 +125,6 @@ func NotebookNew(ctx *context.Context) {
}

func notebookNewDataPrepare(ctx *context.Context) error {
var taskJobNameKey = ctx.Query("taskJobNameKey")
redis_client.Del(taskJobNameKey)

ctx.Data["PageIsCloudBrain"] = true
t := time.Now()
var displayJobName = jobNamePrefixValid(cutString(ctx.User.Name, 5)) + t.Format("2006010215") + strconv.Itoa(int(t.Unix()))[5:]
@@ -213,14 +210,14 @@ func Notebook2Create(ctx *context.Context, form auth.CreateModelArtsNotebookForm
imageId := form.ImageId
repo := ctx.Repo.Repository

taskJobNameKey := redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), string(models.JobTypeDebug), displayJobName)
isOk, err := redis_client.Setnx(taskJobNameKey, "", 5*time.Second)
ctx.Data["taskJobNameKey"] = taskJobNameKey
lock := redis_lock.NewDistributeLock(redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), string(models.JobTypeDebug), displayJobName))
isOk := lock.Lock(60 * time.Second)
if !isOk {
log.Error("The task have been processed:%v", err, ctx.Data["MsgID"])
log.Error("The task have been processed", ctx.Data["MsgID"])
ctx.RenderWithErr("The task have been processed, please wait a minute", tplModelArtsNotebookNew, &form)
return
}
defer lock.UnLock()

count, err := models.GetCloudbrainNotebookCountByUserID(ctx.User.ID)
if err != nil {
@@ -816,8 +813,6 @@ func setSpecBySpecialPoolConfig(ctx *context.Context, jobType string) {
}

func trainJobErrorNewDataPrepare(ctx *context.Context, form auth.CreateModelArtsTrainJobForm) error {
var taskJobNameKey = ctx.Query("taskJobNameKey")
redis_client.Del(taskJobNameKey)
ctx.Data["PageIsCloudBrain"] = true

//can, err := canUserCreateTrainJob(ctx.User.ID)
@@ -1013,9 +1008,6 @@ func trainJobNewVersionDataPrepare(ctx *context.Context) error {
}

func versionErrorDataPrepare(ctx *context.Context, form auth.CreateModelArtsTrainJobForm) error {
var taskJobNameKey = ctx.Query("taskJobNameKey")
redis_client.Del(taskJobNameKey)

ctx.Data["PageIsCloudBrain"] = true
var jobID = ctx.Params(":jobid")
// var versionName = ctx.Params(":version-name")
@@ -1134,14 +1126,14 @@ func TrainJobCreate(ctx *context.Context, form auth.CreateModelArtsTrainJobForm)
VersionCount := modelarts.VersionCountOne
EngineName := form.EngineName

taskJobNameKey := redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), string(models.JobTypeTrain), displayJobName)
isOk, err := redis_client.Setnx(taskJobNameKey, "", 5*time.Second)
ctx.Data["taskJobNameKey"] = taskJobNameKey
lock := redis_lock.NewDistributeLock(redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), string(models.JobTypeTrain), displayJobName))
isOk := lock.Lock(60 * time.Second)
if !isOk {
log.Error("The task have been processed:%v", err, ctx.Data["MsgID"])
log.Error("The task have been processed", ctx.Data["MsgID"])
ctx.RenderWithErr("The task have been processed, please wait a minute", tplModelArtsTrainJobIndex, &form)
return
}
defer lock.UnLock()

count, err := models.GetCloudbrainTrainJobCountByUserID(ctx.User.ID)
if err != nil {
@@ -1456,14 +1448,14 @@ func TrainJobCreateVersion(ctx *context.Context, form auth.CreateModelArtsTrainJ
EngineName := form.EngineName
isLatestVersion := modelarts.IsLatestVersion

taskJobNameKey := redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), string(models.JobTypeTrain), displayJobName)
isOk, err := redis_client.Setnx(taskJobNameKey, "", 5*time.Second)
ctx.Data["taskJobNameKey"] = taskJobNameKey
lock := redis_lock.NewDistributeLock(redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), string(models.JobTypeTrain), displayJobName))
isOk := lock.Lock(60 * time.Second)
if !isOk {
log.Error("The task have been processed:%v", err, ctx.Data["MsgID"])
log.Error("The task have been processed", ctx.Data["MsgID"])
ctx.RenderWithErr("The task have been processed, please wait a minute", tplModelArtsTrainJobVersionNew, &form)
return
}
defer lock.UnLock()

canNewJob, _ := canUserCreateTrainJobVersion(ctx, latestTask.UserID)
if !canNewJob {
@@ -2058,14 +2050,14 @@ func InferenceJobCreate(ctx *context.Context, form auth.CreateModelArtsInference
ckptUrl := "/" + form.TrainUrl + form.CkptName
log.Info("ckpt url:" + ckptUrl)

taskJobNameKey := redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), string(models.JobTypeInference), displayJobName)
isOk, err := redis_client.Setnx(taskJobNameKey, "", 5*time.Second)
ctx.Data["taskJobNameKey"] = taskJobNameKey
lock := redis_lock.NewDistributeLock(redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), string(models.JobTypeInference), displayJobName))
isOk := lock.Lock(60 * time.Second)
if !isOk {
log.Error("The task have been processed:%v", err, ctx.Data["MsgID"])
log.Error("The task have been processed", ctx.Data["MsgID"])
ctx.RenderWithErr("The task have been processed, please wait a minute", tplModelArtsInferenceJobNew, &form)
return
}
defer lock.UnLock()

count, err := models.GetCloudbrainInferenceJobCountByUserID(ctx.User.ID)
if err != nil {
@@ -2468,9 +2460,6 @@ func inferenceJobNewDataPrepare(ctx *context.Context) error {
}

func inferenceJobErrorNewDataPrepare(ctx *context.Context, form auth.CreateModelArtsInferenceJobForm) error {
var taskJobNameKey = ctx.Query("taskJobNameKey")
redis_client.Del(taskJobNameKey)

ctx.Data["PageIsCloudBrain"] = true

t := time.Now()


Loading…
Cancel
Save