diff --git a/models/cloudbrain.go b/models/cloudbrain.go index f1a99577b..e43b86030 100755 --- a/models/cloudbrain.go +++ b/models/cloudbrain.go @@ -101,7 +101,8 @@ const ( ModelArtsTrainJobCheckRunningCompleted ModelArtsJobStatus = "CHECK_RUNNING_COMPLETED" //审核作业已经完成 ModelArtsTrainJobCheckFailed ModelArtsJobStatus = "CHECK_FAILED" //审核作业失败 - DURATION_STR_ZERO = "00:00:00" + DURATION_STR_ZERO = "00:00:00" + CloudbrainKeyDuration = 24 * time.Hour //grampus GrampusStatusPending = "pending" diff --git a/modules/auth/wechat/access_token.go b/modules/auth/wechat/access_token.go index e4e38ee30..aa2609b35 100644 --- a/modules/auth/wechat/access_token.go +++ b/modules/auth/wechat/access_token.go @@ -1,10 +1,11 @@ package wechat import ( + "time" + "code.gitea.io/gitea/modules/redis/redis_client" "code.gitea.io/gitea/modules/redis/redis_key" "code.gitea.io/gitea/modules/redis/redis_lock" - "time" ) var accessTokenLock = redis_lock.NewDistributeLock(redis_key.AccessTokenLockKey()) diff --git a/modules/cloudbrain/resty.go b/modules/cloudbrain/resty.go index 556ba2a97..03f368dda 100755 --- a/modules/cloudbrain/resty.go +++ b/modules/cloudbrain/resty.go @@ -145,7 +145,6 @@ sendjob: if jobResult.Code != Success { return &jobResult, fmt.Errorf("jobResult err: %s", res.String()) } - return &jobResult, nil } diff --git a/modules/redis/redis_key/cloudbrain_redis_key.go b/modules/redis/redis_key/cloudbrain_redis_key.go new file mode 100644 index 000000000..4c5d05dfc --- /dev/null +++ b/modules/redis/redis_key/cloudbrain_redis_key.go @@ -0,0 +1,7 @@ +package redis_key + +const CLOUDBRAIN_PREFIX = "cloudbrain" + +func CloudbrainBindingJobNameKey(repoId string, jobType string, jobName string) string { + return KeyJoin(CLOUDBRAIN_PREFIX, repoId, jobType, jobName, "redis_key") +} diff --git a/modules/redis/redis_lock/lock.go b/modules/redis/redis_lock/lock.go index 5723c379d..b7a9daa05 100644 --- a/modules/redis/redis_lock/lock.go +++ b/modules/redis/redis_lock/lock.go @@ -1,8 +1,9 @@ package redis_lock import ( - "code.gitea.io/gitea/modules/redis/redis_client" "time" + + "code.gitea.io/gitea/modules/redis/redis_client" ) type DistributeLock struct { diff --git a/options/locale/locale_en-US.ini b/options/locale/locale_en-US.ini index b295e4d49..8d3868b73 100755 --- a/options/locale/locale_en-US.ini +++ b/options/locale/locale_en-US.ini @@ -1093,6 +1093,7 @@ cloudbrain_operate = Operate cloudbrain_status_createtime = Status/Createtime cloudbrain_status_runtime = Running Time cloudbrain_jobname_err=Name must start with a lowercase letter or number,can include lowercase letter,number,_ and -,can not end with _, and can be up to 36 characters long. +cloudbrain_samejob_err=A task with the same name has been created, the system is processing it, please wait a minute. cloudbrain_bootfile_err=The bootfile does not exist in the repository cloudbrain_query_fail=Failed to query cloudbrain information. cloudbrain.mirror_tag = Mirror Tag diff --git a/options/locale/locale_zh-CN.ini b/options/locale/locale_zh-CN.ini index ae2d5c7f4..b02600803 100755 --- a/options/locale/locale_zh-CN.ini +++ b/options/locale/locale_zh-CN.ini @@ -1097,6 +1097,7 @@ cloudbrain_operate=操作 cloudbrain_status_createtime=状态/创建时间 cloudbrain_status_runtime = 运行时长 cloudbrain_jobname_err=只能以小写字母或数字开头且只包含小写字母、数字、_和-,不能以_结尾,最长36个字符。 +cloudbrain_samejob_err=同名任务已经被创建,系统处理中,请您稍候。 cloudbrain_bootfile_err=仓库中不存在启动文件 cloudbrain_query_fail=查询云脑任务失败。 cloudbrain.mirror_tag = 镜像标签 diff --git a/routers/repo/cloudbrain.go b/routers/repo/cloudbrain.go index 27670f23c..54e24fe97 100755 --- a/routers/repo/cloudbrain.go +++ b/routers/repo/cloudbrain.go @@ -2,8 +2,6 @@ package repo import ( "bufio" - "code.gitea.io/gitea/services/cloudbrain/resource" - "code.gitea.io/gitea/services/reward/point/account" "encoding/json" "errors" "fmt" @@ -17,6 +15,9 @@ import ( "time" "unicode/utf8" + "code.gitea.io/gitea/services/cloudbrain/resource" + "code.gitea.io/gitea/services/reward/point/account" + "code.gitea.io/gitea/modules/notification" "code.gitea.io/gitea/modules/grampus" @@ -32,6 +33,8 @@ import ( "code.gitea.io/gitea/modules/git" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/modelarts" + "code.gitea.io/gitea/modules/redis/redis_key" + "code.gitea.io/gitea/modules/redis/redis_lock" "code.gitea.io/gitea/modules/setting" "code.gitea.io/gitea/modules/storage" "code.gitea.io/gitea/modules/util" @@ -202,6 +205,16 @@ func CloudBrainCreate(ctx *context.Context, form auth.CreateCloudBrainForm) { tpl = tplCloudBrainTrainJobNew } + lock := redis_lock.NewDistributeLock(redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), jobType, displayJobName)) + isOk, err := lock.Lock(models.CloudbrainKeyDuration) + if !isOk { + log.Error("lock processed failed:%v", err, ctx.Data["MsgID"]) + cloudBrainNewDataPrepare(ctx) + ctx.RenderWithErr(ctx.Tr("repo.cloudbrain_samejob_err"), tpl, &form) + return + } + defer lock.UnLock() + tasks, err := models.GetCloudbrainsByDisplayJobName(repo.ID, jobType, displayJobName) if err == nil { if len(tasks) != 0 { @@ -342,7 +355,6 @@ func CloudBrainCreate(ctx *context.Context, form auth.CreateCloudBrainForm) { ctx.RenderWithErr(err.Error(), tpl, &form) return } - if jobType == string(models.JobTypeTrain) { ctx.Redirect(setting.AppSubURL + ctx.Repo.RepoLink + "/modelarts/train-job?listType=all") } else { @@ -386,10 +398,20 @@ func CloudBrainInferenceJobCreate(ctx *context.Context, form auth.CreateCloudBra bootFile := strings.TrimSpace(form.BootFile) labelName := form.LabelName repo := ctx.Repo.Repository + tpl := tplCloudBrainInferenceJobNew + + lock := redis_lock.NewDistributeLock(redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), jobType, displayJobName)) + isOk, err := lock.Lock(models.CloudbrainKeyDuration) + if !isOk { + log.Error("lock processed failed:%v", err, ctx.Data["MsgID"]) + cloudBrainNewDataPrepare(ctx) + ctx.RenderWithErr(ctx.Tr("repo.cloudbrain_samejob_err"), tpl, &form) + return + } + defer lock.UnLock() ckptUrl := setting.Attachment.Minio.RealPath + form.TrainUrl + form.CkptName log.Info("ckpt url:" + ckptUrl) - tpl := tplCloudBrainInferenceJobNew command, err := getInferenceJobCommand(form) if err != nil { log.Error("getTrainJobCommand failed: %v", err) @@ -2238,12 +2260,21 @@ func BenchMarkAlgorithmCreate(ctx *context.Context, form auth.CreateCloudBrainFo codePath := setting.JobPath + jobName + cloudbrain.CodeMountPath benchmarkTypeID := form.BenchmarkTypeID benchmarkChildTypeID := form.BenchmarkChildTypeID + repo := ctx.Repo.Repository ctx.Data["description"] = form.Description ctx.Data["benchmarkTypeID"] = benchmarkTypeID ctx.Data["benchmark_child_types_id_hidden"] = benchmarkChildTypeID - repo := ctx.Repo.Repository + lock := redis_lock.NewDistributeLock(redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), form.JobType, displayJobName)) + isOk, err := lock.Lock(models.CloudbrainKeyDuration) + if !isOk { + log.Error("lock processed failed:%v", err, ctx.Data["MsgID"]) + cloudBrainNewDataPrepare(ctx) + ctx.RenderWithErr(ctx.Tr("repo.cloudbrain_samejob_err"), tplCloudBrainBenchmarkNew, &form) + return + } + defer lock.UnLock() tasks, err := models.GetCloudbrainsByDisplayJobName(repo.ID, string(models.JobTypeBenchmark), displayJobName) if err == nil { @@ -2425,6 +2456,16 @@ func ModelBenchmarkCreate(ctx *context.Context, form auth.CreateCloudBrainForm) tpl := tplCloudBrainBenchmarkNew command := cloudbrain.GetCloudbrainDebugCommand() + lock := redis_lock.NewDistributeLock(redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), jobType, displayJobName)) + isOk, err := lock.Lock(models.CloudbrainKeyDuration) + if !isOk { + log.Error("lock processed failed:%v", err, ctx.Data["MsgID"]) + cloudBrainNewDataPrepare(ctx) + ctx.RenderWithErr(ctx.Tr("repo.cloudbrain_samejob_err"), tpl, &form) + return + } + defer lock.UnLock() + tasks, err := models.GetCloudbrainsByDisplayJobName(repo.ID, jobType, displayJobName) if err == nil { if len(tasks) != 0 { diff --git a/routers/repo/grampus.go b/routers/repo/grampus.go index 1e9af8139..0100e6eb2 100755 --- a/routers/repo/grampus.go +++ b/routers/repo/grampus.go @@ -1,8 +1,6 @@ package repo import ( - "code.gitea.io/gitea/services/cloudbrain/resource" - "code.gitea.io/gitea/services/reward/point/account" "encoding/json" "errors" "fmt" @@ -14,11 +12,16 @@ import ( "strings" "time" + "code.gitea.io/gitea/services/cloudbrain/resource" + "code.gitea.io/gitea/services/reward/point/account" + "code.gitea.io/gitea/modules/auth" "code.gitea.io/gitea/modules/git" "code.gitea.io/gitea/modules/grampus" "code.gitea.io/gitea/modules/modelarts" "code.gitea.io/gitea/modules/notification" + "code.gitea.io/gitea/modules/redis/redis_key" + "code.gitea.io/gitea/modules/redis/redis_lock" "code.gitea.io/gitea/modules/timeutil" "code.gitea.io/gitea/modules/util" "github.com/unknwon/com" @@ -215,6 +218,16 @@ func GrampusTrainJobGpuCreate(ctx *context.Context, form auth.CreateGrampusTrain branchName := form.BranchName image := strings.TrimSpace(form.Image) + lock := redis_lock.NewDistributeLock(redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), string(models.JobTypeTrain), displayJobName)) + isOk, err := lock.Lock(models.CloudbrainKeyDuration) + if !isOk { + log.Error("lock processed failed:%v", err, ctx.Data["MsgID"]) + grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeGPU) + ctx.RenderWithErr(ctx.Tr("repo.cloudbrain_samejob_err"), tplGrampusTrainJobGPUNew, &form) + return + } + defer lock.UnLock() + if !jobNamePattern.MatchString(displayJobName) { grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeGPU) ctx.RenderWithErr(ctx.Tr("repo.cloudbrain_jobname_err"), tplGrampusTrainJobGPUNew, &form) @@ -417,6 +430,16 @@ func GrampusTrainJobNpuCreate(ctx *context.Context, form auth.CreateGrampusTrain versionCount := modelarts.VersionCountOne engineName := form.EngineName + lock := redis_lock.NewDistributeLock(redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), string(models.JobTypeTrain), displayJobName)) + isOk, err := lock.Lock(models.CloudbrainKeyDuration) + if !isOk { + log.Error("lock processed failed:%v", err, ctx.Data["MsgID"]) + grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeNPU) + ctx.RenderWithErr(ctx.Tr("repo.cloudbrain_samejob_err"), tplGrampusTrainJobNPUNew, &form) + return + } + defer lock.UnLock() + if !jobNamePattern.MatchString(displayJobName) { grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeNPU) ctx.RenderWithErr(ctx.Tr("repo.cloudbrain_jobname_err"), tplGrampusTrainJobNPUNew, &form) diff --git a/routers/repo/modelarts.go b/routers/repo/modelarts.go index ad7dc9c3c..4f3341e6f 100755 --- a/routers/repo/modelarts.go +++ b/routers/repo/modelarts.go @@ -2,9 +2,6 @@ package repo import ( "archive/zip" - "code.gitea.io/gitea/modules/modelarts_cd" - "code.gitea.io/gitea/services/cloudbrain/resource" - "code.gitea.io/gitea/services/reward/point/account" "encoding/json" "errors" "fmt" @@ -18,6 +15,10 @@ import ( "time" "unicode/utf8" + "code.gitea.io/gitea/modules/modelarts_cd" + "code.gitea.io/gitea/services/cloudbrain/resource" + "code.gitea.io/gitea/services/reward/point/account" + "code.gitea.io/gitea/models" "code.gitea.io/gitea/modules/auth" "code.gitea.io/gitea/modules/base" @@ -28,6 +29,8 @@ import ( "code.gitea.io/gitea/modules/modelarts" "code.gitea.io/gitea/modules/notification" "code.gitea.io/gitea/modules/obs" + "code.gitea.io/gitea/modules/redis/redis_key" + "code.gitea.io/gitea/modules/redis/redis_lock" "code.gitea.io/gitea/modules/setting" "code.gitea.io/gitea/modules/storage" "code.gitea.io/gitea/modules/timeutil" @@ -210,6 +213,16 @@ func Notebook2Create(ctx *context.Context, form auth.CreateModelArtsNotebookForm imageId := form.ImageId repo := ctx.Repo.Repository + lock := redis_lock.NewDistributeLock(redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), string(models.JobTypeDebug), displayJobName)) + isOk, err := lock.Lock(models.CloudbrainKeyDuration) + if !isOk { + log.Error("lock processed failed:%v", err, ctx.Data["MsgID"]) + notebookNewDataPrepare(ctx) + ctx.RenderWithErr(ctx.Tr("repo.cloudbrain_samejob_err"), tplModelArtsNotebookNew, &form) + return + } + defer lock.UnLock() + count, err := models.GetCloudbrainNotebookCountByUserID(ctx.User.ID) if err != nil { log.Error("GetCloudbrainNotebookCountByUserID failed:%v", err, ctx.Data["MsgID"]) @@ -1153,6 +1166,16 @@ func TrainJobCreate(ctx *context.Context, form auth.CreateModelArtsTrainJobForm) return } + lock := redis_lock.NewDistributeLock(redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), string(models.JobTypeTrain), displayJobName)) + isOk, err := lock.Lock(models.CloudbrainKeyDuration) + if !isOk { + log.Error("lock processed failed:%v", err, ctx.Data["MsgID"]) + trainJobErrorNewDataPrepare(ctx, form) + ctx.RenderWithErr(ctx.Tr("repo.cloudbrain_samejob_err"), tplModelArtsTrainJobNew, &form) + return + } + defer lock.UnLock() + count, err := models.GetCloudbrainTrainJobCountByUserID(ctx.User.ID) if err != nil { log.Error("GetCloudbrainTrainJobCountByUserID failed:%v", err, ctx.Data["MsgID"]) @@ -1525,6 +1548,16 @@ func TrainJobCreateVersion(ctx *context.Context, form auth.CreateModelArtsTrainJ EngineName := form.EngineName isLatestVersion := modelarts.IsLatestVersion + lock := redis_lock.NewDistributeLock(redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), string(models.JobTypeTrain), displayJobName)) + isOk, err := lock.Lock(models.CloudbrainKeyDuration) + if !isOk { + log.Error("lock processed failed:%v", err, ctx.Data["MsgID"]) + versionErrorDataPrepare(ctx, form) + ctx.RenderWithErr(ctx.Tr("repo.cloudbrain_samejob_err"), tplModelArtsTrainJobVersionNew, &form) + return + } + defer lock.UnLock() + canNewJob, _ := canUserCreateTrainJobVersion(ctx, latestTask.UserID) if !canNewJob { versionErrorDataPrepare(ctx, form) @@ -2136,6 +2169,16 @@ func InferenceJobCreate(ctx *context.Context, form auth.CreateModelArtsInference return } + lock := redis_lock.NewDistributeLock(redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), string(models.JobTypeInference), displayJobName)) + isOk, err := lock.Lock(models.CloudbrainKeyDuration) + if !isOk { + log.Error("lock processed failed:%v", err, ctx.Data["MsgID"]) + inferenceJobErrorNewDataPrepare(ctx, form) + ctx.RenderWithErr(ctx.Tr("repo.cloudbrain_samejob_err"), tplModelArtsInferenceJobNew, &form) + return + } + defer lock.UnLock() + count, err := models.GetCloudbrainInferenceJobCountByUserID(ctx.User.ID) if err != nil { log.Error("GetCloudbrainInferenceJobCountByUserID failed:%v", err, ctx.Data["MsgID"])