Browse Source

get job

tags/v1.22.6.2
lewis 3 years ago
parent
commit
ad72d95103
6 changed files with 86 additions and 28 deletions
  1. +12
    -7
      models/cloudbrain.go
  2. +14
    -5
      modules/grampus/grampus.go
  3. +9
    -15
      modules/grampus/resty.go
  4. +25
    -1
      routers/api/v1/repo/modelarts.go
  5. +25
    -0
      routers/repo/cloudbrain.go
  6. +1
    -0
      routers/repo/modelarts.go

+ 12
- 7
models/cloudbrain.go View File

@@ -102,11 +102,11 @@ const (


//grampus //grampus
GrampusStatusPending = "pending" GrampusStatusPending = "pending"
GrampusStatusRunning = "running"
GrampusStatusFailed = "failed"
GrampusStatusSucceeded = "succeeded"
GrampusStatusStopped = "stopped"
GrampusStatusUnknown = "unknown"
GrampusStatusRunning = "RUNNING"
GrampusStatusFailed = "FAILED"
GrampusStatusSucceeded = "SUCCEEDED"
GrampusStatusStopped = "STOPPED"
GrampusStatusUnknown = "UNKNOWN"
) )


type Cloudbrain struct { type Cloudbrain struct {
@@ -214,7 +214,7 @@ func ConvertDurationToStr(duration int64) string {
} }


func IsTrainJobTerminal(status string) bool { func IsTrainJobTerminal(status string) bool {
return status == string(ModelArtsTrainJobCompleted) || status == string(ModelArtsTrainJobFailed) || status == string(ModelArtsTrainJobKilled)
return status == string(ModelArtsTrainJobCompleted) || status == string(ModelArtsTrainJobFailed) || status == string(ModelArtsTrainJobKilled) || status == GrampusStatusFailed || status == GrampusStatusStopped || status == GrampusStatusSucceeded
} }


func IsModelArtsDebugJobTerminal(status string) bool { func IsModelArtsDebugJobTerminal(status string) bool {
@@ -1185,6 +1185,11 @@ type CreateGrampusJobResponse struct {
JobInfo GrampusJobInfo `json:"otJob"` JobInfo GrampusJobInfo `json:"otJob"`
} }


// GetGrampusJobResponse is the structure a Grampus "get job" reply is decoded
// into.
type GetGrampusJobResponse struct {
// Embedded common result fields. NOTE(review): presumably this is where the
// ErrorCode / ErrorMsg values inspected by callers come from — confirm
// against the GrampusResult definition.
GrampusResult
// JobInfo carries the job details; it is read from the "otJob" key of the
// JSON payload.
JobInfo GrampusJobInfo `json:"otJob"`
}

type GrampusTasks struct { type GrampusTasks struct {
Command string `json:"command"` Command string `json:"command"`
Name string `json:"name"` Name string `json:"name"`
@@ -1227,7 +1232,7 @@ func Cloudbrains(opts *CloudbrainsOptions) ([]*CloudbrainInfo, int64, error) {
) )
} }


if len(opts.ComputeResource) >= 0 {
if len(opts.ComputeResource) > 0 {
cond = cond.And( cond = cond.And(
builder.Eq{"cloudbrain.compute_resource": opts.ComputeResource}, builder.Eq{"cloudbrain.compute_resource": opts.ComputeResource},
) )


+ 14
- 5
modules/grampus/grampus.go View File

@@ -6,6 +6,7 @@ import (
"code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/notification" "code.gitea.io/gitea/modules/notification"
"code.gitea.io/gitea/modules/timeutil" "code.gitea.io/gitea/modules/timeutil"
"strings"
) )


const ( const (
@@ -100,7 +101,7 @@ func GenerateTrainJob(ctx *context.Context, req *GenerateTrainJobReq) (err error


jobID := jobResult.JobInfo.JobID jobID := jobResult.JobInfo.JobID
err = models.CreateCloudbrain(&models.Cloudbrain{ err = models.CreateCloudbrain(&models.Cloudbrain{
Status: string(models.GrampusStatusPending),
Status: TransTrainJobStatus(jobResult.JobInfo.Status),
UserID: ctx.User.ID, UserID: ctx.User.ID,
RepoID: ctx.Repo.Repository.ID, RepoID: ctx.Repo.Repository.ID,
JobID: jobID, JobID: jobID,
@@ -110,10 +111,10 @@ func GenerateTrainJob(ctx *context.Context, req *GenerateTrainJobReq) (err error
Type: models.TypeCloudBrainGrampus, Type: models.TypeCloudBrainGrampus,
//VersionID: jobResult.VersionID, //VersionID: jobResult.VersionID,
//VersionName: jobResult.VersionName, //VersionName: jobResult.VersionName,
Uuid: req.Uuid,
DatasetName: req.DatasetName,
CommitID: req.CommitID,
//IsLatestVersion: req.IsLatestVersion,
Uuid: req.Uuid,
DatasetName: req.DatasetName,
CommitID: req.CommitID,
IsLatestVersion: req.IsLatestVersion,
ComputeResource: req.ComputeResource, ComputeResource: req.ComputeResource,
//EngineID: req.EngineID, //EngineID: req.EngineID,
TrainUrl: req.TrainUrl, TrainUrl: req.TrainUrl,
@@ -148,3 +149,11 @@ func GenerateTrainJob(ctx *context.Context, req *GenerateTrainJobReq) (err error


return nil return nil
} }

// TransTrainJobStatus converts a job status string reported by Grampus into
// the upper-case form stored in a job's Status field.
//
// The "pending" state is renamed to "WAITING"; every other status is returned
// upper-cased and otherwise unchanged. The match on "pending" ignores case, so
// "pending", "Pending" and "PENDING" all yield "WAITING", and passing an
// already translated value back in returns it unchanged.
func TransTrainJobStatus(status string) string {
	// Fold case for the comparison: the result is case-normalised anyway, so a
	// mixed-case "Pending" must not slip through as "PENDING".
	if strings.EqualFold(status, "pending") {
		return "WAITING"
	}

	return strings.ToUpper(status)
}

+ 9
- 15
modules/grampus/resty.go View File

@@ -124,40 +124,34 @@ sendjob:
return &result, nil return &result, nil
} }


func GetJob(jobID string) (*models.GetNotebookResult, error) {
func GetJob(jobID string) (*models.GetGrampusJobResponse, error) {
checkSetting() checkSetting()
client := getRestyClient() client := getRestyClient()
var result models.GetNotebookResult
var result models.GetGrampusJobResponse


retry := 0 retry := 0


sendjob: sendjob:
res, err := client.R().
_, err := client.R().
SetHeader("Content-Type", "application/json"). SetHeader("Content-Type", "application/json").
SetAuthToken(TOKEN). SetAuthToken(TOKEN).
SetResult(&result). SetResult(&result).
Get(HOST + "/v1/" + setting.ProjectID + urlTrainJob + "/" + jobID)
Get(HOST + urlTrainJob + "/" + jobID)


if err != nil { if err != nil {
return nil, fmt.Errorf("resty GetJob: %v", err) return nil, fmt.Errorf("resty GetJob: %v", err)
} }


if res.StatusCode() == http.StatusUnauthorized && retry < 1 {
if result.ErrorCode == errorIllegalToken && retry < 1 {
retry++ retry++
log.Info("retry get token")
_ = getToken() _ = getToken()
goto sendjob goto sendjob
} }


var response models.NotebookResult
err = json.Unmarshal(res.Body(), &response)
if err != nil {
log.Error("json.Unmarshal failed: %s", err.Error())
return &result, fmt.Errorf("son.Unmarshal failed: %s", err.Error())
}

if len(response.ErrorCode) != 0 {
log.Error("GetJob failed(%s): %s", response.ErrorCode, response.ErrorMsg)
return &result, fmt.Errorf("GetJob failed(%s): %s", response.ErrorCode, response.ErrorMsg)
if result.ErrorCode != 0 {
log.Error("GetJob failed(%d): %s", result.ErrorCode, result.ErrorMsg)
return &result, fmt.Errorf("GetJob failed(%d): %s", result.ErrorCode, result.ErrorMsg)
} }


return &result, nil return &result, nil


+ 25
- 1
routers/api/v1/repo/modelarts.go View File

@@ -6,6 +6,7 @@
package repo package repo


import ( import (
"code.gitea.io/gitea/modules/grampus"
"net/http" "net/http"
"strconv" "strconv"
"strings" "strings"
@@ -167,7 +168,7 @@ func GetModelArtsTrainJobVersion(ctx *context.APIContext) {
log.Error("UpdateJob failed:", err) log.Error("UpdateJob failed:", err)
} }
} }
} else {
} else if job.Type == models.TypeCloudBrainTwo {
result, err := modelarts.GetTrainJob(jobID, strconv.FormatInt(job.VersionID, 10)) result, err := modelarts.GetTrainJob(jobID, strconv.FormatInt(job.VersionID, 10))
if err != nil { if err != nil {
ctx.NotFound(err) ctx.NotFound(err)
@@ -181,6 +182,29 @@ func GetModelArtsTrainJobVersion(ctx *context.APIContext) {
job.Duration = result.Duration / 1000 job.Duration = result.Duration / 1000
job.TrainJobDuration = models.ConvertDurationToStr(job.Duration) job.TrainJobDuration = models.ConvertDurationToStr(job.Duration)


if job.EndTime == 0 && models.IsTrainJobTerminal(job.Status) && job.StartTime > 0 {
job.EndTime = job.StartTime.Add(job.Duration)
}
job.CorrectCreateUnix()
err = models.UpdateTrainJobVersion(job)
if err != nil {
log.Error("UpdateJob failed:", err)
}
} else if job.Type == models.TypeCloudBrainGrampus {
result, err := grampus.GetJob(jobID)
if err != nil {
log.Error("GetJob(%s) failed:%v", job.JobName, err)
ctx.NotFound(err)
return
}

if job.StartTime == 0 && result.JobInfo.StartedAt > 0 {
job.StartTime = timeutil.TimeStamp(result.JobInfo.StartedAt / 1000)
}
job.Status = grampus.TransTrainJobStatus(result.JobInfo.Status)
job.Duration = result.JobInfo.RunSec
job.TrainJobDuration = models.ConvertDurationToStr(job.Duration)

if job.EndTime == 0 && models.IsTrainJobTerminal(job.Status) && job.StartTime > 0 { if job.EndTime == 0 && models.IsTrainJobTerminal(job.Status) && job.StartTime > 0 {
job.EndTime = job.StartTime.Add(job.Duration) job.EndTime = job.StartTime.Add(job.Duration)
} }


+ 25
- 0
routers/repo/cloudbrain.go View File

@@ -2,6 +2,7 @@ package repo


import ( import (
"bufio" "bufio"
"code.gitea.io/gitea/modules/grampus"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
@@ -1492,7 +1493,31 @@ func SyncCloudbrainStatus() {
} else { } else {
log.Error("task.JobType(%s) is error:%s", task.JobName, task.JobType) log.Error("task.JobType(%s) is error:%s", task.JobName, task.JobType)
} }
} else if task.Type == models.TypeCloudBrainGrampus {
result, err := grampus.GetJob(task.JobID)
if err != nil {
log.Error("GetTrainJob(%s) failed:%v", task.JobName, err)
continue
}


if result != nil {
task.Status = grampus.TransTrainJobStatus(result.JobInfo.Status)
task.Duration = result.JobInfo.RunSec
task.TrainJobDuration = models.ConvertDurationToStr(task.Duration)

if task.StartTime == 0 && result.JobInfo.StartedAt > 0 {
task.StartTime = timeutil.TimeStamp(result.JobInfo.StartedAt / 1000)
}
if task.EndTime == 0 && models.IsTrainJobTerminal(task.Status) && task.StartTime > 0 {
task.EndTime = task.StartTime.Add(task.Duration)
}
task.CorrectCreateUnix()
err = models.UpdateJob(task)
if err != nil {
log.Error("UpdateJob(%s) failed:%v", task.JobName, err)
continue
}
}
} else { } else {
log.Error("task.Type(%s) is error:%d", task.JobName, task.Type) log.Error("task.Type(%s) is error:%d", task.JobName, task.Type)
} }


+ 1
- 0
routers/repo/modelarts.go View File

@@ -577,6 +577,7 @@ func TrainJobIndex(ctx *context.Context) {
JobTypes: jobTypes, JobTypes: jobTypes,
IsLatestVersion: modelarts.IsLatestVersion, IsLatestVersion: modelarts.IsLatestVersion,
ComputeResource: listType, ComputeResource: listType,
Type: models.TypeCloudBrainAll,
}) })
if err != nil { if err != nil {
ctx.ServerError("Cloudbrain", err) ctx.ServerError("Cloudbrain", err)


Loading…
Cancel
Save