From ad72d9510337e40c9f10fbe4887c234d998da2f8 Mon Sep 17 00:00:00 2001 From: lewis <747342561@qq.com> Date: Mon, 23 May 2022 20:21:44 +0800 Subject: [PATCH] get job --- models/cloudbrain.go | 19 ++++++++++++------- modules/grampus/grampus.go | 19 ++++++++++++++----- modules/grampus/resty.go | 24 +++++++++--------------- routers/api/v1/repo/modelarts.go | 26 +++++++++++++++++++++++++- routers/repo/cloudbrain.go | 25 +++++++++++++++++++++++++ routers/repo/modelarts.go | 1 + 6 files changed, 86 insertions(+), 28 deletions(-) diff --git a/models/cloudbrain.go b/models/cloudbrain.go index f775626ad..694e277d4 100755 --- a/models/cloudbrain.go +++ b/models/cloudbrain.go @@ -102,11 +102,11 @@ const ( //grampus GrampusStatusPending = "pending" - GrampusStatusRunning = "running" - GrampusStatusFailed = "failed" - GrampusStatusSucceeded = "succeeded" - GrampusStatusStopped = "stopped" - GrampusStatusUnknown = "unknown" + GrampusStatusRunning = "RUNNING" + GrampusStatusFailed = "FAILED" + GrampusStatusSucceeded = "SUCCEEDED" + GrampusStatusStopped = "STOPPED" + GrampusStatusUnknown = "UNKNOWN" ) type Cloudbrain struct { @@ -214,7 +214,7 @@ func ConvertDurationToStr(duration int64) string { } func IsTrainJobTerminal(status string) bool { - return status == string(ModelArtsTrainJobCompleted) || status == string(ModelArtsTrainJobFailed) || status == string(ModelArtsTrainJobKilled) + return status == string(ModelArtsTrainJobCompleted) || status == string(ModelArtsTrainJobFailed) || status == string(ModelArtsTrainJobKilled) || status == GrampusStatusFailed || status == GrampusStatusStopped || status == GrampusStatusSucceeded } func IsModelArtsDebugJobTerminal(status string) bool { @@ -1185,6 +1185,11 @@ type CreateGrampusJobResponse struct { JobInfo GrampusJobInfo `json:"otJob"` } +type GetGrampusJobResponse struct { + GrampusResult + JobInfo GrampusJobInfo `json:"otJob"` +} + type GrampusTasks struct { Command string `json:"command"` Name string `json:"name"` @@ -1227,7 +1232,7 @@ func Cloudbrains(opts *CloudbrainsOptions) ([]*CloudbrainInfo, int64, error) { ) } - if len(opts.ComputeResource) >= 0 { + if len(opts.ComputeResource) > 0 { cond = cond.And( builder.Eq{"cloudbrain.compute_resource": opts.ComputeResource}, ) diff --git a/modules/grampus/grampus.go b/modules/grampus/grampus.go index 26e143429..71e368fa6 100755 --- a/modules/grampus/grampus.go +++ b/modules/grampus/grampus.go @@ -6,6 +6,7 @@ import ( "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/notification" "code.gitea.io/gitea/modules/timeutil" + "strings" ) const ( @@ -100,7 +101,7 @@ func GenerateTrainJob(ctx *context.Context, req *GenerateTrainJobReq) (err error jobID := jobResult.JobInfo.JobID err = models.CreateCloudbrain(&models.Cloudbrain{ - Status: string(models.GrampusStatusPending), + Status: TransTrainJobStatus(jobResult.JobInfo.Status), UserID: ctx.User.ID, RepoID: ctx.Repo.Repository.ID, JobID: jobID, @@ -110,10 +111,10 @@ func GenerateTrainJob(ctx *context.Context, req *GenerateTrainJobReq) (err error Type: models.TypeCloudBrainGrampus, //VersionID: jobResult.VersionID, //VersionName: jobResult.VersionName, - Uuid: req.Uuid, - DatasetName: req.DatasetName, - CommitID: req.CommitID, - //IsLatestVersion: req.IsLatestVersion, + Uuid: req.Uuid, + DatasetName: req.DatasetName, + CommitID: req.CommitID, + IsLatestVersion: req.IsLatestVersion, ComputeResource: req.ComputeResource, //EngineID: req.EngineID, TrainUrl: req.TrainUrl, @@ -148,3 +149,11 @@ func GenerateTrainJob(ctx *context.Context, req *GenerateTrainJobReq) (err error return nil } + +func TransTrainJobStatus(status string) string { + if status == "pending" { + status = "waiting" + } + + return strings.ToUpper(status) +} diff --git a/modules/grampus/resty.go b/modules/grampus/resty.go index f64c98b55..6f1ee72d6 100755 --- a/modules/grampus/resty.go +++ b/modules/grampus/resty.go @@ -124,40 +124,34 @@ sendjob: return &result, nil } -func GetJob(jobID string) (*models.GetNotebookResult, error) { +func GetJob(jobID string) (*models.GetGrampusJobResponse, error) { checkSetting() client := getRestyClient() - var result models.GetNotebookResult + var result models.GetGrampusJobResponse retry := 0 sendjob: - res, err := client.R(). + _, err := client.R(). SetHeader("Content-Type", "application/json"). SetAuthToken(TOKEN). SetResult(&result). - Get(HOST + "/v1/" + setting.ProjectID + urlTrainJob + "/" + jobID) + Get(HOST + urlTrainJob + "/" + jobID) if err != nil { return nil, fmt.Errorf("resty GetJob: %v", err) } - if res.StatusCode() == http.StatusUnauthorized && retry < 1 { + if result.ErrorCode == errorIllegalToken && retry < 1 { retry++ + log.Info("retry get token") _ = getToken() goto sendjob } - var response models.NotebookResult - err = json.Unmarshal(res.Body(), &response) - if err != nil { - log.Error("json.Unmarshal failed: %s", err.Error()) - return &result, fmt.Errorf("son.Unmarshal failed: %s", err.Error()) - } - - if len(response.ErrorCode) != 0 { - log.Error("GetJob failed(%s): %s", response.ErrorCode, response.ErrorMsg) - return &result, fmt.Errorf("GetJob failed(%s): %s", response.ErrorCode, response.ErrorMsg) + if result.ErrorCode != 0 { + log.Error("GetJob failed(%d): %s", result.ErrorCode, result.ErrorMsg) + return &result, fmt.Errorf("GetJob failed(%d): %s", result.ErrorCode, result.ErrorMsg) } return &result, nil diff --git a/routers/api/v1/repo/modelarts.go b/routers/api/v1/repo/modelarts.go index 9e4edea03..06e4bea44 100755 --- a/routers/api/v1/repo/modelarts.go +++ b/routers/api/v1/repo/modelarts.go @@ -6,6 +6,7 @@ package repo import ( + "code.gitea.io/gitea/modules/grampus" "net/http" "strconv" "strings" @@ -167,7 +168,7 @@ func GetModelArtsTrainJobVersion(ctx *context.APIContext) { log.Error("UpdateJob failed:", err) } } - } else { + } else if job.Type == models.TypeCloudBrainTwo { result, err := modelarts.GetTrainJob(jobID, strconv.FormatInt(job.VersionID, 10)) if err != nil { ctx.NotFound(err) @@ -181,6 +182,29 @@ func GetModelArtsTrainJobVersion(ctx *context.APIContext) { job.Duration = result.Duration / 1000 job.TrainJobDuration = models.ConvertDurationToStr(job.Duration) + if job.EndTime == 0 && models.IsTrainJobTerminal(job.Status) && job.StartTime > 0 { + job.EndTime = job.StartTime.Add(job.Duration) + } + job.CorrectCreateUnix() + err = models.UpdateTrainJobVersion(job) + if err != nil { + log.Error("UpdateJob failed:", err) + } + } else if job.Type == models.TypeCloudBrainGrampus { + result, err := grampus.GetJob(jobID) + if err != nil { + log.Error("GetJob(%s) failed:%v", job.JobName, err) + ctx.NotFound(err) + return + } + + if job.StartTime == 0 && result.JobInfo.StartedAt > 0 { + job.StartTime = timeutil.TimeStamp(result.JobInfo.StartedAt / 1000) + } + job.Status = grampus.TransTrainJobStatus(result.JobInfo.Status) + job.Duration = result.JobInfo.RunSec + job.TrainJobDuration = models.ConvertDurationToStr(job.Duration) + if job.EndTime == 0 && models.IsTrainJobTerminal(job.Status) && job.StartTime > 0 { job.EndTime = job.StartTime.Add(job.Duration) } diff --git a/routers/repo/cloudbrain.go b/routers/repo/cloudbrain.go index df27a12c2..b95fe2fe5 100755 --- a/routers/repo/cloudbrain.go +++ b/routers/repo/cloudbrain.go @@ -2,6 +2,7 @@ package repo import ( "bufio" + "code.gitea.io/gitea/modules/grampus" "encoding/json" "errors" "fmt" @@ -1492,7 +1493,31 @@ func SyncCloudbrainStatus() { } else { log.Error("task.JobType(%s) is error:%s", task.JobName, task.JobType) } + } else if task.Type == models.TypeCloudBrainGrampus { + result, err := grampus.GetJob(task.JobID) + if err != nil { + log.Error("GetTrainJob(%s) failed:%v", task.JobName, err) + continue + } + if result != nil { + task.Status = grampus.TransTrainJobStatus(result.JobInfo.Status) + task.Duration = result.JobInfo.RunSec + task.TrainJobDuration = models.ConvertDurationToStr(task.Duration) + + if task.StartTime == 0 && result.JobInfo.StartedAt > 0 { + task.StartTime = timeutil.TimeStamp(result.JobInfo.StartedAt / 1000) + } + if task.EndTime == 0 && models.IsTrainJobTerminal(task.Status) && task.StartTime > 0 { + task.EndTime = task.StartTime.Add(task.Duration) + } + task.CorrectCreateUnix() + err = models.UpdateJob(task) + if err != nil { + log.Error("UpdateJob(%s) failed:%v", task.JobName, err) + continue + } + } } else { log.Error("task.Type(%s) is error:%d", task.JobName, task.Type) } diff --git a/routers/repo/modelarts.go b/routers/repo/modelarts.go index dfb2631a0..ea4ff3b1d 100755 --- a/routers/repo/modelarts.go +++ b/routers/repo/modelarts.go @@ -577,6 +577,7 @@ func TrainJobIndex(ctx *context.Context) { JobTypes: jobTypes, IsLatestVersion: modelarts.IsLatestVersion, ComputeResource: listType, + Type: models.TypeCloudBrainAll, }) if err != nil { ctx.ServerError("Cloudbrain", err)