Browse Source

Merge pull request 'fix-2069' (#2882) from fix-2069 into V20220926

Reviewed-on: https://git.openi.org.cn/OpenI/aiforge/pulls/2882
Reviewed-by: zouap <zouap@pcl.ac.cn>
tags/v1.22.9.2^2
zouap 3 years ago
parent
commit
c629c5b5f3
10 changed files with 132 additions and 14 deletions
  1. +2
    -1
      models/cloudbrain.go
  2. +2
    -1
      modules/auth/wechat/access_token.go
  3. +0
    -1
      modules/cloudbrain/resty.go
  4. +7
    -0
      modules/redis/redis_key/cloudbrain_redis_key.go
  5. +2
    -1
      modules/redis/redis_lock/lock.go
  6. +1
    -0
      options/locale/locale_en-US.ini
  7. +1
    -0
      options/locale/locale_zh-CN.ini
  8. +46
    -5
      routers/repo/cloudbrain.go
  9. +25
    -2
      routers/repo/grampus.go
  10. +46
    -3
      routers/repo/modelarts.go

+ 2
- 1
models/cloudbrain.go View File

@@ -101,7 +101,8 @@ const (
ModelArtsTrainJobCheckRunningCompleted ModelArtsJobStatus = "CHECK_RUNNING_COMPLETED" //审核作业已经完成
ModelArtsTrainJobCheckFailed ModelArtsJobStatus = "CHECK_FAILED" //审核作业失败

DURATION_STR_ZERO = "00:00:00"
DURATION_STR_ZERO = "00:00:00"
CloudbrainKeyDuration = 24 * time.Hour

//grampus
GrampusStatusPending = "pending"


+ 2
- 1
modules/auth/wechat/access_token.go View File

@@ -1,10 +1,11 @@
package wechat

import (
"time"

"code.gitea.io/gitea/modules/redis/redis_client"
"code.gitea.io/gitea/modules/redis/redis_key"
"code.gitea.io/gitea/modules/redis/redis_lock"
"time"
)

var accessTokenLock = redis_lock.NewDistributeLock(redis_key.AccessTokenLockKey())


+ 0
- 1
modules/cloudbrain/resty.go View File

@@ -145,7 +145,6 @@ sendjob:
if jobResult.Code != Success {
return &jobResult, fmt.Errorf("jobResult err: %s", res.String())
}

return &jobResult, nil
}



+ 7
- 0
modules/redis/redis_key/cloudbrain_redis_key.go View File

@@ -0,0 +1,7 @@
package redis_key

const CLOUDBRAIN_PREFIX = "cloudbrain"

func CloudbrainBindingJobNameKey(repoId string, jobType string, jobName string) string {
return KeyJoin(CLOUDBRAIN_PREFIX, repoId, jobType, jobName, "redis_key")
}

+ 2
- 1
modules/redis/redis_lock/lock.go View File

@@ -1,8 +1,9 @@
package redis_lock

import (
"code.gitea.io/gitea/modules/redis/redis_client"
"time"

"code.gitea.io/gitea/modules/redis/redis_client"
)

type DistributeLock struct {


+ 1
- 0
options/locale/locale_en-US.ini View File

@@ -1093,6 +1093,7 @@ cloudbrain_operate = Operate
cloudbrain_status_createtime = Status/Createtime
cloudbrain_status_runtime = Running Time
cloudbrain_jobname_err=Name must start with a lowercase letter or number,can include lowercase letter,number,_ and -,can not end with _, and can be up to 36 characters long.
cloudbrain_samejob_err=A task with the same name has been created, the system is processing it, please wait a minute.
cloudbrain_bootfile_err=The bootfile does not exist in the repository
cloudbrain_query_fail=Failed to query cloudbrain information.
cloudbrain.mirror_tag = Mirror Tag


+ 1
- 0
options/locale/locale_zh-CN.ini View File

@@ -1097,6 +1097,7 @@ cloudbrain_operate=操作
cloudbrain_status_createtime=状态/创建时间
cloudbrain_status_runtime = 运行时长
cloudbrain_jobname_err=只能以小写字母或数字开头且只包含小写字母、数字、_和-,不能以_结尾,最长36个字符。
cloudbrain_samejob_err=同名任务已经被创建,系统处理中,请您稍候。
cloudbrain_bootfile_err=仓库中不存在启动文件
cloudbrain_query_fail=查询云脑任务失败。
cloudbrain.mirror_tag = 镜像标签


+ 46
- 5
routers/repo/cloudbrain.go View File

@@ -2,8 +2,6 @@ package repo

import (
"bufio"
"code.gitea.io/gitea/services/cloudbrain/resource"
"code.gitea.io/gitea/services/reward/point/account"
"encoding/json"
"errors"
"fmt"
@@ -17,6 +15,9 @@ import (
"time"
"unicode/utf8"

"code.gitea.io/gitea/services/cloudbrain/resource"
"code.gitea.io/gitea/services/reward/point/account"

"code.gitea.io/gitea/modules/notification"

"code.gitea.io/gitea/modules/grampus"
@@ -32,6 +33,8 @@ import (
"code.gitea.io/gitea/modules/git"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/modelarts"
"code.gitea.io/gitea/modules/redis/redis_key"
"code.gitea.io/gitea/modules/redis/redis_lock"
"code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/modules/storage"
"code.gitea.io/gitea/modules/util"
@@ -202,6 +205,16 @@ func CloudBrainCreate(ctx *context.Context, form auth.CreateCloudBrainForm) {
tpl = tplCloudBrainTrainJobNew
}

lock := redis_lock.NewDistributeLock(redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), jobType, displayJobName))
isOk, err := lock.Lock(models.CloudbrainKeyDuration)
if !isOk {
log.Error("lock processed failed:%v", err, ctx.Data["MsgID"])
cloudBrainNewDataPrepare(ctx)
ctx.RenderWithErr(ctx.Tr("repo.cloudbrain_samejob_err"), tpl, &form)
return
}
defer lock.UnLock()

tasks, err := models.GetCloudbrainsByDisplayJobName(repo.ID, jobType, displayJobName)
if err == nil {
if len(tasks) != 0 {
@@ -342,7 +355,6 @@ func CloudBrainCreate(ctx *context.Context, form auth.CreateCloudBrainForm) {
ctx.RenderWithErr(err.Error(), tpl, &form)
return
}

if jobType == string(models.JobTypeTrain) {
ctx.Redirect(setting.AppSubURL + ctx.Repo.RepoLink + "/modelarts/train-job?listType=all")
} else {
@@ -386,10 +398,20 @@ func CloudBrainInferenceJobCreate(ctx *context.Context, form auth.CreateCloudBra
bootFile := strings.TrimSpace(form.BootFile)
labelName := form.LabelName
repo := ctx.Repo.Repository
tpl := tplCloudBrainInferenceJobNew

lock := redis_lock.NewDistributeLock(redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), jobType, displayJobName))
isOk, err := lock.Lock(models.CloudbrainKeyDuration)
if !isOk {
log.Error("lock processed failed:%v", err, ctx.Data["MsgID"])
cloudBrainNewDataPrepare(ctx)
ctx.RenderWithErr(ctx.Tr("repo.cloudbrain_samejob_err"), tpl, &form)
return
}
defer lock.UnLock()

ckptUrl := setting.Attachment.Minio.RealPath + form.TrainUrl + form.CkptName
log.Info("ckpt url:" + ckptUrl)
tpl := tplCloudBrainInferenceJobNew
command, err := getInferenceJobCommand(form)
if err != nil {
log.Error("getTrainJobCommand failed: %v", err)
@@ -2238,12 +2260,21 @@ func BenchMarkAlgorithmCreate(ctx *context.Context, form auth.CreateCloudBrainFo
codePath := setting.JobPath + jobName + cloudbrain.CodeMountPath
benchmarkTypeID := form.BenchmarkTypeID
benchmarkChildTypeID := form.BenchmarkChildTypeID
repo := ctx.Repo.Repository

ctx.Data["description"] = form.Description
ctx.Data["benchmarkTypeID"] = benchmarkTypeID
ctx.Data["benchmark_child_types_id_hidden"] = benchmarkChildTypeID

repo := ctx.Repo.Repository
lock := redis_lock.NewDistributeLock(redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), form.JobType, displayJobName))
isOk, err := lock.Lock(models.CloudbrainKeyDuration)
if !isOk {
log.Error("lock processed failed:%v", err, ctx.Data["MsgID"])
cloudBrainNewDataPrepare(ctx)
ctx.RenderWithErr(ctx.Tr("repo.cloudbrain_samejob_err"), tplCloudBrainBenchmarkNew, &form)
return
}
defer lock.UnLock()

tasks, err := models.GetCloudbrainsByDisplayJobName(repo.ID, string(models.JobTypeBenchmark), displayJobName)
if err == nil {
@@ -2425,6 +2456,16 @@ func ModelBenchmarkCreate(ctx *context.Context, form auth.CreateCloudBrainForm)
tpl := tplCloudBrainBenchmarkNew
command := cloudbrain.GetCloudbrainDebugCommand()

lock := redis_lock.NewDistributeLock(redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), jobType, displayJobName))
isOk, err := lock.Lock(models.CloudbrainKeyDuration)
if !isOk {
log.Error("lock processed failed:%v", err, ctx.Data["MsgID"])
cloudBrainNewDataPrepare(ctx)
ctx.RenderWithErr(ctx.Tr("repo.cloudbrain_samejob_err"), tpl, &form)
return
}
defer lock.UnLock()

tasks, err := models.GetCloudbrainsByDisplayJobName(repo.ID, jobType, displayJobName)
if err == nil {
if len(tasks) != 0 {


+ 25
- 2
routers/repo/grampus.go View File

@@ -1,8 +1,6 @@
package repo

import (
"code.gitea.io/gitea/services/cloudbrain/resource"
"code.gitea.io/gitea/services/reward/point/account"
"encoding/json"
"errors"
"fmt"
@@ -14,11 +12,16 @@ import (
"strings"
"time"

"code.gitea.io/gitea/services/cloudbrain/resource"
"code.gitea.io/gitea/services/reward/point/account"

"code.gitea.io/gitea/modules/auth"
"code.gitea.io/gitea/modules/git"
"code.gitea.io/gitea/modules/grampus"
"code.gitea.io/gitea/modules/modelarts"
"code.gitea.io/gitea/modules/notification"
"code.gitea.io/gitea/modules/redis/redis_key"
"code.gitea.io/gitea/modules/redis/redis_lock"
"code.gitea.io/gitea/modules/timeutil"
"code.gitea.io/gitea/modules/util"
"github.com/unknwon/com"
@@ -215,6 +218,16 @@ func GrampusTrainJobGpuCreate(ctx *context.Context, form auth.CreateGrampusTrain
branchName := form.BranchName
image := strings.TrimSpace(form.Image)

lock := redis_lock.NewDistributeLock(redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), string(models.JobTypeTrain), displayJobName))
isOk, err := lock.Lock(models.CloudbrainKeyDuration)
if !isOk {
log.Error("lock processed failed:%v", err, ctx.Data["MsgID"])
grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeGPU)
ctx.RenderWithErr(ctx.Tr("repo.cloudbrain_samejob_err"), tplGrampusTrainJobGPUNew, &form)
return
}
defer lock.UnLock()

if !jobNamePattern.MatchString(displayJobName) {
grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeGPU)
ctx.RenderWithErr(ctx.Tr("repo.cloudbrain_jobname_err"), tplGrampusTrainJobGPUNew, &form)
@@ -417,6 +430,16 @@ func GrampusTrainJobNpuCreate(ctx *context.Context, form auth.CreateGrampusTrain
versionCount := modelarts.VersionCountOne
engineName := form.EngineName

lock := redis_lock.NewDistributeLock(redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), string(models.JobTypeTrain), displayJobName))
isOk, err := lock.Lock(models.CloudbrainKeyDuration)
if !isOk {
log.Error("lock processed failed:%v", err, ctx.Data["MsgID"])
grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeNPU)
ctx.RenderWithErr(ctx.Tr("repo.cloudbrain_samejob_err"), tplGrampusTrainJobNPUNew, &form)
return
}
defer lock.UnLock()

if !jobNamePattern.MatchString(displayJobName) {
grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeNPU)
ctx.RenderWithErr(ctx.Tr("repo.cloudbrain_jobname_err"), tplGrampusTrainJobNPUNew, &form)


+ 46
- 3
routers/repo/modelarts.go View File

@@ -2,9 +2,6 @@ package repo

import (
"archive/zip"
"code.gitea.io/gitea/modules/modelarts_cd"
"code.gitea.io/gitea/services/cloudbrain/resource"
"code.gitea.io/gitea/services/reward/point/account"
"encoding/json"
"errors"
"fmt"
@@ -18,6 +15,10 @@ import (
"time"
"unicode/utf8"

"code.gitea.io/gitea/modules/modelarts_cd"
"code.gitea.io/gitea/services/cloudbrain/resource"
"code.gitea.io/gitea/services/reward/point/account"

"code.gitea.io/gitea/models"
"code.gitea.io/gitea/modules/auth"
"code.gitea.io/gitea/modules/base"
@@ -28,6 +29,8 @@ import (
"code.gitea.io/gitea/modules/modelarts"
"code.gitea.io/gitea/modules/notification"
"code.gitea.io/gitea/modules/obs"
"code.gitea.io/gitea/modules/redis/redis_key"
"code.gitea.io/gitea/modules/redis/redis_lock"
"code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/modules/storage"
"code.gitea.io/gitea/modules/timeutil"
@@ -210,6 +213,16 @@ func Notebook2Create(ctx *context.Context, form auth.CreateModelArtsNotebookForm
imageId := form.ImageId
repo := ctx.Repo.Repository

lock := redis_lock.NewDistributeLock(redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), string(models.JobTypeDebug), displayJobName))
isOk, err := lock.Lock(models.CloudbrainKeyDuration)
if !isOk {
log.Error("lock processed failed:%v", err, ctx.Data["MsgID"])
notebookNewDataPrepare(ctx)
ctx.RenderWithErr(ctx.Tr("repo.cloudbrain_samejob_err"), tplModelArtsNotebookNew, &form)
return
}
defer lock.UnLock()

count, err := models.GetCloudbrainNotebookCountByUserID(ctx.User.ID)
if err != nil {
log.Error("GetCloudbrainNotebookCountByUserID failed:%v", err, ctx.Data["MsgID"])
@@ -1153,6 +1166,16 @@ func TrainJobCreate(ctx *context.Context, form auth.CreateModelArtsTrainJobForm)
return
}

lock := redis_lock.NewDistributeLock(redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), string(models.JobTypeTrain), displayJobName))
isOk, err := lock.Lock(models.CloudbrainKeyDuration)
if !isOk {
log.Error("lock processed failed:%v", err, ctx.Data["MsgID"])
trainJobErrorNewDataPrepare(ctx, form)
ctx.RenderWithErr(ctx.Tr("repo.cloudbrain_samejob_err"), tplModelArtsTrainJobNew, &form)
return
}
defer lock.UnLock()

count, err := models.GetCloudbrainTrainJobCountByUserID(ctx.User.ID)
if err != nil {
log.Error("GetCloudbrainTrainJobCountByUserID failed:%v", err, ctx.Data["MsgID"])
@@ -1525,6 +1548,16 @@ func TrainJobCreateVersion(ctx *context.Context, form auth.CreateModelArtsTrainJ
EngineName := form.EngineName
isLatestVersion := modelarts.IsLatestVersion

lock := redis_lock.NewDistributeLock(redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), string(models.JobTypeTrain), displayJobName))
isOk, err := lock.Lock(models.CloudbrainKeyDuration)
if !isOk {
log.Error("lock processed failed:%v", err, ctx.Data["MsgID"])
versionErrorDataPrepare(ctx, form)
ctx.RenderWithErr(ctx.Tr("repo.cloudbrain_samejob_err"), tplModelArtsTrainJobVersionNew, &form)
return
}
defer lock.UnLock()

canNewJob, _ := canUserCreateTrainJobVersion(ctx, latestTask.UserID)
if !canNewJob {
versionErrorDataPrepare(ctx, form)
@@ -2136,6 +2169,16 @@ func InferenceJobCreate(ctx *context.Context, form auth.CreateModelArtsInference
return
}

lock := redis_lock.NewDistributeLock(redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), string(models.JobTypeInference), displayJobName))
isOk, err := lock.Lock(models.CloudbrainKeyDuration)
if !isOk {
log.Error("lock processed failed:%v", err, ctx.Data["MsgID"])
inferenceJobErrorNewDataPrepare(ctx, form)
ctx.RenderWithErr(ctx.Tr("repo.cloudbrain_samejob_err"), tplModelArtsInferenceJobNew, &form)
return
}
defer lock.UnLock()

count, err := models.GetCloudbrainInferenceJobCountByUserID(ctx.User.ID)
if err != nil {
log.Error("GetCloudbrainInferenceJobCountByUserID failed:%v", err, ctx.Data["MsgID"])


Loading…
Cancel
Save