Browse Source

sync

tags/v1.22.8.1^2
lewis 3 years ago
parent
commit
4895dc4973
4 changed files with 108 additions and 213 deletions
  1. +1
    -5
      models/cloudbrain.go
  2. +10
    -9
      models/cloudbrain_temp.go
  3. +96
    -198
      modules/modelarts/modelarts.go
  4. +1
    -1
      routers/repo/modelarts.go

+ 1
- 5
models/cloudbrain.go View File

@@ -8,14 +8,13 @@ import (
"strings"
"time"

"code.gitea.io/gitea/modules/util"

"xorm.io/builder"
"xorm.io/xorm"

"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/modules/timeutil"
"code.gitea.io/gitea/modules/util"
)

type CloudbrainStatus string
@@ -31,9 +30,6 @@ const (
)

const (
TempJobId = "TEMP"
TempVersionId = TempJobId
TempJobStatus = TempJobId
NPUResource = "NPU"
GPUResource = "CPU/GPU"
AllResource = "all"


+ 10
- 9
models/cloudbrain_temp.go View File

@@ -1,23 +1,24 @@
package models

import (
"code.gitea.io/gitea/modules/setting"
"time"

"code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/modules/timeutil"
)

// Placeholder values written into a CloudbrainTemp record when a job
// creation request ends with an unknown error and the real identifiers are
// not (yet) known. All three currently share the same literal "TEMP".
const (
//TempJobIdPrefix = "TEMP"

// TempJobId is the placeholder stored in CloudbrainTemp.JobID.
TempJobId = "TEMP"
// TempVersionId is the placeholder stored in CloudbrainTemp.VersionID;
// it aliases TempJobId.
TempVersionId = TempJobId
// TempJobStatus is the placeholder stored in CloudbrainTemp.Status and is
// one of the status values GetCloudBrainTempJobs selects on; it aliases
// TempJobId.
TempJobStatus = TempJobId
)

type CloudbrainTemp struct {
ID int64 `xorm:"pk autoincr"`
JobID string
VersionID string
JobName string
Type int
ID int64 `xorm:"pk autoincr"`
JobID string `xorm:"NOT NULL DEFAULT 'TEMP'"`
VersionID string `xorm:"NOT NULL DEFAULT 'TEMP'"`
JobName string `xorm:"NOT NULL "`
Type int `xorm:"NOT NULL "`
JobType string `xorm:"INDEX NOT NULL DEFAULT 'DEBUG'"`
Status string `xorm:"INDEX NOT NULL DEFAULT 'TEMP'"`
QueryTimes int `xorm:"INDEX NOT NULL DEFAULT 0"`
@@ -46,7 +47,7 @@ func getCloudBrainTemp(temp *CloudbrainTemp) (*CloudbrainTemp, error) {

func GetCloudBrainTempJobs() ([]*CloudbrainTemp, error) {
jobs := make([]*CloudbrainTemp, 0, 10)
return jobs, x.In("status", JobStatusTemp, string(ModelArtsStopped), string(ModelArtsStopping)).
return jobs, x.In("status", TempJobStatus, string(ModelArtsStopped)).
And("query_times < ?", setting.MaxTempQueryTimes).
Limit(100).
Find(&jobs)


+ 96
- 198
modules/modelarts/modelarts.go View File

@@ -288,7 +288,7 @@ func GenerateNotebook2(ctx *context.Context, displayJobName, jobName, uuid, desc
log.Info("(%s)unknown error, set temp status", displayJobName)
errTemp := models.InsertCloudbrainTemp(&models.CloudbrainTemp{
JobID: models.TempJobId,
VersionID: models.TempJobVersionId,
VersionID: models.TempVersionId,
Status: models.TempJobStatus,
Type: models.TypeCloudBrainTwo,
JobName: jobName,
@@ -381,7 +381,7 @@ func GenerateTrainJob(ctx *context.Context, req *GenerateTrainJobReq) (err error
log.Info("(%s)unknown error, set temp status", req.DisplayJobName)
errTemp := models.InsertCloudbrainTemp(&models.CloudbrainTemp{
JobID: models.TempJobId,
VersionID: models.TempJobVersionId,
VersionID: models.TempVersionId,
Status: models.TempJobStatus,
Type: models.TypeCloudBrainTwo,
JobName: req.JobName,
@@ -512,7 +512,7 @@ func GenerateTrainJobVersion(ctx *context.Context, req *GenerateTrainJobReq, job
log.Info("(%s)unknown error, set temp status", req.DisplayJobName)
errTemp := models.InsertCloudbrainTemp(&models.CloudbrainTemp{
JobID: jobId,
VersionID: models.TempJobVersionId,
VersionID: models.TempVersionId,
Status: models.TempJobStatus,
Type: models.TypeCloudBrainTwo,
JobName: req.JobName,
@@ -591,99 +591,6 @@ func GenerateTrainJobVersion(ctx *context.Context, req *GenerateTrainJobReq, job
return createErr
}

// GenerateTrainJobVersionByUserImage requests a new version of a train job
// that runs on a user-supplied image and records it as a Cloudbrain row.
//
// Steps, in order:
//  1. call createTrainJobUserImage with CreateVersion set to true and the
//     paths, flavor, parameters, image URL and command taken from req;
//  2. list the existing versions of the returned job ID for the current
//     repository via models.CloudbrainsVersionList;
//  3. insert a Cloudbrain record for the new version with both version
//     counters incremented by one;
//  4. call models.SetVersionCountAndLatestVersion for the first entry of the
//     version list, passing NotLatestVersion.
//
// The first error encountered is returned. For failures of step 2 and step 4
// ctx.ServerError is also invoked before returning.
//
// NOTE(review): the jobId parameter is not read anywhere in the body; the job
// ID used throughout is jobResult.JobID from the create call.
// NOTE(review): VersionTaskList[0] is indexed without a length check, which
// would panic on an empty list — confirm CloudbrainsVersionList can never
// return an empty result here.
func GenerateTrainJobVersionByUserImage(ctx *context.Context, req *GenerateTrainJobReq, jobId string) (err error) {
// Captured once so CreatedUnix and UpdatedUnix of the new record are equal.
createTime := timeutil.TimeStampNow()
jobResult, err := createTrainJobUserImage(models.CreateUserImageTrainJobParams{
JobName: req.JobName,
Description: req.Description,
Config: models.UserImageConfig{
WorkServerNum: req.WorkServerNumber,
AppUrl: req.CodeObsPath,
BootFileUrl: req.BootFileUrl,
DataUrl: req.DataUrl,
TrainUrl: req.TrainUrl,
LogUrl: req.LogUrl,
PoolID: req.PoolID,
CreateVersion: true,
Flavor: models.Flavor{
Code: req.FlavorCode,
},
Parameter: req.Parameters,
UserImageUrl: req.UserImageUrl,
UserCommand: req.UserCommand,
},
})
if err != nil {
log.Error("CreateJob failed: %v", err.Error())
return err
}

// Look up the versions already recorded for this job (train jobs of type
// TypeCloudBrainTwo in the current repository).
var jobTypes []string
jobTypes = append(jobTypes, string(models.JobTypeTrain))
repo := ctx.Repo.Repository
VersionTaskList, VersionListCount, err := models.CloudbrainsVersionList(&models.CloudbrainsOptions{
RepoID: repo.ID,
Type: models.TypeCloudBrainTwo,
JobTypes: jobTypes,
JobID: strconv.FormatInt(jobResult.JobID, 10),
})
if err != nil {
ctx.ServerError("Cloudbrain", err)
return err
}
// Set isLatestVersion of the current version to "1" and update the task counts; the counts cover the current number of versions (VersionCount) and the total number of versions ever created (TotalVersionCount).
// NOTE(review): the value actually stored is req.IsLatestVersion — presumably the caller sets it to "1"; verify.

err = models.CreateCloudbrain(&models.Cloudbrain{
Status: TransTrainJobStatus(jobResult.Status),
UserID: ctx.User.ID,
RepoID: ctx.Repo.Repository.ID,
JobID: strconv.FormatInt(jobResult.JobID, 10),
JobName: req.JobName,
DisplayJobName: req.DisplayJobName,
JobType: string(models.JobTypeTrain),
Type: models.TypeCloudBrainTwo,
VersionID: jobResult.VersionID,
VersionName: jobResult.VersionName,
Uuid: req.Uuid,
DatasetName: req.DatasetName,
CommitID: req.CommitID,
IsLatestVersion: req.IsLatestVersion,
PreVersionName: req.PreVersionName,
ComputeResource: models.NPUResource,
EngineID: MORDELART_USER_IMAGE_ENGINE_ID,
Image: req.UserImageUrl,
TrainUrl: req.TrainUrl,
BranchName: req.BranchName,
Parameters: req.Params,
BootFile: req.BootFile,
DataUrl: req.DataUrl,
LogUrl: req.LogUrl,
PreVersionId: req.PreVersionId,
FlavorCode: req.FlavorCode,
Description: req.Description,
WorkServerNumber: req.WorkServerNumber,
FlavorName: req.FlavorName,
EngineName: req.EngineName,
TotalVersionCount: VersionTaskList[0].TotalVersionCount + 1,
VersionCount: VersionListCount + 1,
CreatedUnix: createTime,
UpdatedUnix: createTime,
})
if err != nil {
log.Error("CreateCloudbrain(%s) failed:%v", req.JobName, err.Error())
return err
}

// Set isLatestVersion of the train job's previous version to "0".
err = models.SetVersionCountAndLatestVersion(strconv.FormatInt(jobResult.JobID, 10), VersionTaskList[0].VersionName, VersionCountOne, NotLatestVersion, TotalVersionCount)
if err != nil {
ctx.ServerError("Update IsLatestVersion failed", err)
return err
}

// err is nil on this path; every failure above has already returned.
return err
}

func TransTrainJobStatus(status int) string {
switch status {
case 0:
@@ -769,7 +676,7 @@ func GenerateInferenceJob(ctx *context.Context, req *GenerateInferenceJobReq) (e
log.Info("(%s)unknown error, set temp status", req.DisplayJobName)
err = models.InsertCloudbrainTemp(&models.CloudbrainTemp{
JobID: models.TempJobId,
VersionID: models.TempJobVersionId,
VersionID: models.TempVersionId,
Status: models.TempJobStatus,
Type: models.TypeCloudBrainTwo,
JobName: req.JobName,
@@ -941,27 +848,27 @@ func SyncTempStatusJob() {

for _, temp := range jobs {
log.Info("start to handle record: %s", temp.JobName)

if temp.Type == models.TypeCloudBrainTwo {
if temp.JobType == string(models.JobTypeDebug) {
err = handleNotebook(temp)
if err != nil {
log.Error("handleTempNotebook falied:%v", err)
log.Error("handleNotebook falied:%v", err)
break
}
} else if temp.JobType == string(models.JobTypeTrain) || temp.JobType == string(models.JobTypeInference) {
if task.VersionCount > VersionCountOne {
//multi version
err = handleTrainJobMultiVersion(temp)
_, err = models.GetCloudbrainByJobID(temp.JobID)
if err != nil {
//one version
err = handleTrainJob(temp)
if err != nil {
log.Error("handleTrainJobMultiVersion falied:%v", err)
log.Error("handleTrainJob falied:%v", err)
break
}
} else {
//inference or one version
err = handleTrainJob(temp)
//multi version
err = handleTrainJobMultiVersion(temp)
if err != nil {
log.Error("handleTrainJob falied:%v", err)
log.Error("handleTrainJobMultiVersion falied:%v", err)
break
}
}
@@ -973,13 +880,7 @@ func SyncTempStatusJob() {
}

func handleNotebook(temp *models.CloudbrainTemp) error {
if temp.Status == string(models.ModelArtsStopped) {
_, err := DelNotebook2(temp.JobID)
if err != nil {
log.Error("DelNotebook2 failed:%v", err)
return err
}
} else if temp.Status == models.JobStatusTemp {
if temp.Status == models.TempJobStatus {
err := handleTempNotebook(temp)
if err != nil {
log.Error("handleTempNotebook failed:%v", err)
@@ -993,10 +894,18 @@ func handleNotebook(temp *models.CloudbrainTemp) error {
}

temp.Status = res.Status
err = models.UpdateCloudbrainTemp(temp)
if err != nil {
log.Error("UpdateCloudbrainTemp failed:%v", err)
return err
if temp.Status == string(models.ModelArtsStopped) {
err = models.UpdateCloudbrainTemp(temp)
if err != nil {
log.Error("UpdateCloudbrainTemp failed:%v", err)
return err
}

_, err := DelNotebook2(temp.JobID)
if err != nil {
log.Error("DelNotebook2 failed:%v", err)
return err
}
}
}

@@ -1017,7 +926,7 @@ func handleTempNotebook(temp *models.CloudbrainTemp) error {
temp.QueryTimes++
err = models.UpdateCloudbrainTemp(temp)
if err != nil {
log.Error("IncreaseCloudbrainTempQueryTimes failed:%v", err)
log.Error("UpdateCloudbrainTemp failed:%v", err)
}

if result != nil {
@@ -1045,7 +954,6 @@ func handleTempNotebook(temp *models.CloudbrainTemp) error {

if isExist {
log.Info("find the record(%s)", temp.JobName)
models.UpdateCloudbrainTemp(temp)
res, err := ManageNotebook2(temp.JobID, models.NotebookAction{Action: models.ActionStop})
if err != nil {
log.Error("ManageNotebook2(%s) failed:%v", temp.JobName, err)
@@ -1082,14 +990,7 @@ func handleTempNotebook(temp *models.CloudbrainTemp) error {
}

func handleTrainJob(temp *models.CloudbrainTemp) error {
if temp.Status == string(models.ModelArtsStopped) {
_, err := DelTrainJob(temp.JobID)
if err != nil {
log.Error("DelTrainJob failed:%v", err)
return err
}
} else if temp.Status == models.JobStatusTemp {
//todo
if temp.Status == models.TempJobStatus {
err := handleTempTrainJob(temp)
if err != nil {
log.Error("handleTempTrainJob failed:%v", err)
@@ -1103,10 +1004,18 @@ func handleTrainJob(temp *models.CloudbrainTemp) error {
}

temp.Status = TransTrainJobStatus(res.IntStatus)
err = models.UpdateCloudbrainTemp(temp)
if err != nil {
log.Error("UpdateCloudbrainTemp failed:%v", err)
return err
if temp.Status == string(models.ModelArtsStopped) {
err = models.UpdateCloudbrainTemp(temp)
if err != nil {
log.Error("UpdateCloudbrainTemp failed:%v", err)
return err
}

_, err := DelTrainJob(temp.JobID)
if err != nil {
log.Error("DelTrainJob failed:%v", err)
return err
}
}
}

@@ -1114,17 +1023,10 @@ func handleTrainJob(temp *models.CloudbrainTemp) error {
}

func handleTrainJobMultiVersion(temp *models.CloudbrainTemp) error {
if temp.Status == string(models.ModelArtsStopped) {
_, err := DelTrainJobVersion(temp.JobID, temp.VersionID)
if err != nil {
log.Error("DelTrainJob failed:%v", err)
return err
}
} else if temp.Status == models.JobStatusTemp {
//todo
if temp.Status == models.TempJobStatus {
err := handleTempTrainJobMultiVersion(temp)
if err != nil {
log.Error("handleTempTrainJob failed:%v", err)
log.Error("handleTempTrainJobMultiVersion failed:%v", err)
return err
}
} else if temp.Status == string(models.ModelArtsStopping) {
@@ -1135,20 +1037,31 @@ func handleTrainJobMultiVersion(temp *models.CloudbrainTemp) error {
}

temp.Status = TransTrainJobStatus(res.IntStatus)
err = models.UpdateCloudbrainTemp(temp)
if err != nil {
log.Error("UpdateCloudbrainTemp failed:%v", err)
return err
if temp.Status == string(models.ModelArtsStopped) {
err = models.UpdateCloudbrainTemp(temp)
if err != nil {
log.Error("UpdateCloudbrainTemp failed:%v", err)
return err
}

_, err := DelTrainJobVersion(temp.JobID, temp.VersionID)
if err != nil {
log.Error("DelTrainJob failed:%v", err)
return err
}
}

}

return nil
}

func handleMultiVersionJob(temp *models.CloudbrainTemp, task *models.Cloudbrain) error {
func handleTempTrainJobMultiVersion(temp *models.CloudbrainTemp) error {
var err error
var isExist bool

for {
result, err := GetTrainJobVersionList(1000, 1, task.JobID)
result, err := GetTrainJobVersionList(1000, 1, temp.JobID)
if err != nil {
log.Error("GetTrainJobVersionList failed:%v", err)
break
@@ -1157,35 +1070,29 @@ func handleMultiVersionJob(temp *models.CloudbrainTemp, task *models.Cloudbrain)
temp.QueryTimes++
err = models.UpdateCloudbrainTemp(temp)
if err != nil {
log.Error("IncreaseCloudbrainTempQueryTimes failed:%v", err)
log.Error("UpdateCloudbrainTemp failed:%v", err)
}

if result != nil {
if strconv.FormatInt(result.JobID, 10) == task.JobID && result.JobName == task.JobName {
if result.VersionCount == int64(task.VersionCount) {
log.Info("find the record(%s)", task.DisplayJobName)
task.Status = TransTrainJobStatus(result.JobVersionList[0].IntStatus)
task.VersionName = result.JobVersionList[0].VersionName
task.VersionID = result.JobVersionList[0].VersionID

err = models.UpdateJob(task)
if err != nil {
log.Error("UpdateJob(%s) failed:%v", task.JobName, err)
break
}
//todo: check find
count, _ := models.GetCloudbrainCountByJobName(temp.JobName, temp.JobType, temp.Type)
if result.VersionCount == int64(count+1) {
log.Info("find the record(%s)", temp.JobName)

temp.Status = task.Status
err = models.UpdateCloudbrainTemp(temp)
if err != nil {
log.Error("UpdateCloudbrainTemp(%s) failed:%v", task.JobName, err)
break
}
isExist = true
temp.Status = TransTrainJobStatus(result.JobVersionList[0].IntStatus)
temp.VersionID = strconv.FormatInt(result.JobVersionList[0].VersionID, 10)

err = models.DeleteCloudbrainTemp(temp)
if err != nil {
log.Error("DeleteCloudbrainTemp(%s) failed:%v", task.DisplayJobName, err)
break
}
_, err := StopTrainJob(temp.JobID, temp.VersionID)
if err != nil {
log.Error("StopTrainJob failed:%v", err)
break
}
temp.Status = string(models.ModelArtsStopping)
err = models.UpdateCloudbrainTemp(temp)
if err != nil {
log.Error("UpdateCloudbrainTemp(%s) failed:%v", temp.JobName, err)
break
}
}
}
@@ -1193,19 +1100,13 @@ func handleMultiVersionJob(temp *models.CloudbrainTemp, task *models.Cloudbrain)
break
}

if temp.QueryTimes >= setting.MaxTempQueryTimes && temp.Status != models.JobStatusTemp {
if temp.QueryTimes >= setting.MaxTempQueryTimes && !isExist {
log.Info("reach MaxTempQueryTimes, set the job failed")
task.Status = string(models.ModelArtsTrainJobFailed)
err = models.UpdateJob(task)
if err != nil {
log.Error("UpdateJob(%s) failed:%v", task.JobName, err)
return err
}

temp.Status = string(models.ModelArtsTrainJobFailed)
err = models.UpdateCloudbrainTemp(temp)
if err != nil {
log.Error("UpdateCloudbrainTemp(%s) failed:%v", task.JobName, err)
log.Error("UpdateCloudbrainTemp(%s) failed:%v", temp.JobName, err)
return err
}
}
@@ -1213,8 +1114,10 @@ func handleMultiVersionJob(temp *models.CloudbrainTemp, task *models.Cloudbrain)
return err
}

func handleTrainJob(temp *models.CloudbrainTemp, task *models.Cloudbrain) error {
func handleTempTrainJob(temp *models.CloudbrainTemp) error {
var err error
var isExist bool

for {
result, err := GetTrainJobList(1000, 1, "create_time", "desc", temp.JobName)
if err != nil {
@@ -1225,36 +1128,31 @@ func handleTrainJob(temp *models.CloudbrainTemp, task *models.Cloudbrain) error
temp.QueryTimes++
err = models.UpdateCloudbrainTemp(temp)
if err != nil {
log.Error("IncreaseCloudbrainTempQueryTimes failed:%v", err)
log.Error("UpdateCloudbrainTemp failed:%v", err)
}

if result != nil {
for _, job := range result.JobList {
if task.JobName == job.JobName {
log.Info("find the record(%s)", task.DisplayJobName)
task.Status = TransTrainJobStatus(job.IntStatus)
task.JobID = strconv.FormatInt(job.JobID, 10)
if temp.JobName == job.JobName && TransTrainJobStatus(job.IntStatus) != string(models.ModelArtsTrainJobFailed) {
log.Info("find the record(%s)", temp.JobName)

err = models.UpdateJob(task)
if err != nil {
log.Error("UpdateJob(%s) failed:%v", task.DisplayJobName, err)
break
}
isExist = true
temp.Status = TransTrainJobStatus(job.IntStatus)
temp.JobID = strconv.FormatInt(job.JobID, 10)
temp.VersionID = strconv.FormatInt(job.VersionID, 10)

temp.Status = task.Status
err = models.UpdateCloudbrainTemp(temp)
_, err = StopTrainJob(temp.JobID, temp.VersionID)
if err != nil {
log.Error("UpdateCloudbrainTemp(%s) failed:%v", task.JobName, err)
log.Error("StopTrainJob(%s) failed:%v", temp.JobName, err)
break
}

err = models.DeleteCloudbrainTemp(temp)
temp.Status = string(models.ModelArtsStopping)
err = models.UpdateCloudbrainTemp(temp)
if err != nil {
log.Error("DeleteCloudbrainTemp(%s) failed:%v", task.DisplayJobName, err)
log.Error("UpdateCloudbrainTemp(%s) failed:%v", temp.JobName, err)
break
}

break
}
}
}
@@ -1262,13 +1160,13 @@ func handleTrainJob(temp *models.CloudbrainTemp, task *models.Cloudbrain) error
break
}

if temp.QueryTimes >= setting.MaxTempQueryTimes && temp.Status != models.JobStatusTemp {
if temp.QueryTimes >= setting.MaxTempQueryTimes && !isExist {
log.Info("reach MaxTempQueryTimes, set the job failed")

temp.Status = string(models.ModelArtsTrainJobFailed)
err = models.UpdateCloudbrainTemp(temp)
if err != nil {
log.Error("UpdateCloudbrainTemp(%s) failed:%v", task.JobName, err)
log.Error("UpdateCloudbrainTemp(%s) failed:%v", temp.JobName, err)
return err
}
}


+ 1
- 1
routers/repo/modelarts.go View File

@@ -452,7 +452,7 @@ func NotebookRestart(ctx *context.Context) {
log.Info("(%s)unknown error, set temp status", task.DisplayJobName)
errTemp := models.InsertCloudbrainTemp(&models.CloudbrainTemp{
JobID: task.JobID,
VersionID: models.TempJobVersionId,
VersionID: models.TempVersionId,
Status: models.TempJobStatus,
Type: task.Type,
JobName: task.JobName,


Loading…
Cancel
Save