Browse Source

get all log

tags/v1.22.1.3
lewis 4 years ago
parent
commit
e116706f3e
3 changed files with 122 additions and 22 deletions
  1. +26
    -10
      models/cloudbrain.go
  2. +68
    -4
      modules/cloudbrain/resty.go
  3. +28
    -8
      routers/api/v1/repo/cloudbrain.go

+ 26
- 10
models/cloudbrain.go View File

@@ -476,7 +476,7 @@ type MatchInfo struct {

type GetJobLogResult struct {
ScrollID string `json:"_scroll_id"`
Took int `json:"took"`
Took int `json:"took"`
TimedOut bool `json:"timed_out"`
Shards struct {
Total int `json:"total"`
@@ -485,18 +485,34 @@ type GetJobLogResult struct {
Failed int `json:"failed"`
} `json:"_shards"`
Hits struct {
Hits []struct {
Index string `json:"_index"`
Type string `json:"_type"`
ID string `json:"_id"`
Source struct {
Message string `json:"message"`
} `json:"_source"`
Sort []int `json:"sort"`
} `json:"hits"`
Hits []Hits `json:"hits"`
} `json:"hits"`
}

type Hits struct {
Index string `json:"_index"`
Type string `json:"_type"`
ID string `json:"_id"`
Source struct {
Message string `json:"message"`
} `json:"_source"`
Sort []int `json:"sort"`
}

type GetAllJobLogParams struct {
Scroll string `json:"scroll"`
ScrollID string `json:"scroll_id"`
}

type DeleteJobLogTokenParams struct {
ScrollID string `json:"scroll_id"`
}

type DeleteJobLogTokenResult struct {
Succeeded bool `json:"succeeded"`
NumFreed int `json:"num_freed"`
}

type CloudBrainResult struct {
Code string `json:"code"`
Msg string `json:"msg"`


+ 68
- 4
modules/cloudbrain/resty.go View File

@@ -26,6 +26,8 @@ const (
JobHasBeenStopped = "S410"
Public = "public"
Custom = "custom"
LogPageSize = 500
LogPageTokenExpired = "5m"
)

func getRestyClient() *resty.Client {
@@ -279,7 +281,7 @@ func GetJobLog(jobID string) (*models.GetJobLogResult, error) {
client := getRestyClient()
var result models.GetJobLogResult
req := models.GetJobLogParams{
Size: "5000",
Size: strconv.Itoa(LogPageSize),
Sort: "log.offset",
QueryInfo: models.QueryInfo{
MatchInfo: models.MatchInfo{
@@ -293,17 +295,79 @@ func GetJobLog(jobID string) (*models.GetJobLogResult, error) {
SetAuthToken(TOKEN).
SetBody(req).
SetResult(&result).
Post(HOST + "es/_search?_source=message&scroll=5m")
Post(HOST + "es/_search?_source=message&scroll=" + LogPageTokenExpired)

if err != nil {
log.Info("GetJobLog failed: %v", err)
log.Error("GetJobLog failed: %v", err)
return &result, fmt.Errorf("resty GetJobLog: %v, %s", err, res.String())
}

if !strings.Contains(res.Status(), strconv.Itoa(http.StatusOK)) {
log.Info("res.Status(): %s, response: %s", res.Status(), res.String())
log.Error("res.Status(): %s, response: %s", res.Status(), res.String())
return &result, errors.New(res.String())
}

return &result, nil
}

func GetJobAllLog(scrollID string) (*models.GetJobLogResult, error) {
checkSetting()
client := getRestyClient()
var result models.GetJobLogResult
req := models.GetAllJobLogParams{
Scroll: LogPageTokenExpired,
ScrollID: scrollID,
}

res, err := client.R().
SetHeader("Content-Type", "application/json").
SetAuthToken(TOKEN).
SetBody(req).
SetResult(&result).
Post(HOST + "es/_search/scroll")

if err != nil {
log.Error("GetJobAllLog failed: %v", err)
return &result, fmt.Errorf("resty GetJobAllLog: %v, %s", err, res.String())
}

if !strings.Contains(res.Status(), strconv.Itoa(http.StatusOK)) {
log.Error("res.Status(): %s, response: %s", res.Status(), res.String())
return &result, errors.New(res.String())
}

return &result, nil
}

func DeleteJobLogToken(scrollID string) (error) {
checkSetting()
client := getRestyClient()
var result models.DeleteJobLogTokenResult
req := models.DeleteJobLogTokenParams{
ScrollID: scrollID,
}

res, err := client.R().
SetHeader("Content-Type", "application/json").
SetAuthToken(TOKEN).
SetBody(req).
SetResult(&result).
Delete(HOST + "es/_search/scroll")

if err != nil {
log.Error("DeleteJobLogToken failed: %v", err)
return fmt.Errorf("resty DeleteJobLogToken: %v, %s", err, res.String())
}

if !strings.Contains(res.Status(), strconv.Itoa(http.StatusOK)) {
log.Error("res.Status(): %s, response: %s", res.Status(), res.String())
return errors.New(res.String())
}

if !result.Succeeded {
log.Error("DeleteJobLogToken failed")
return errors.New("DeleteJobLogToken failed")
}

return nil
}

+ 28
- 8
routers/api/v1/repo/cloudbrain.go View File

@@ -102,25 +102,45 @@ func CloudbrainGetLog(ctx *context.Context) {
return
}

var hits []models.Hits
result, err := cloudbrain.GetJobLog(jobID)
if err != nil{
log.Error("GetJobLog failed: %v", err, ctx.Data["MsgID"])
ctx.ServerError(err.Error(), err)
return
}
hits = result.Hits.Hits

//if the size equal page_size, then take the scroll_id to get all log and delete the scroll_id(the num of scroll_id is limited)
if len(result.Hits.Hits) >= cloudbrain.LogPageSize {
for {
resultNext, err := cloudbrain.GetJobAllLog(result.ScrollID)
if err != nil{
log.Error("GetJobAllLog failed: %v", err, ctx.Data["MsgID"])
} else {
for _, hit := range resultNext.Hits.Hits {
hits = append(hits, hit)
}
}

if len(resultNext.Hits.Hits) < cloudbrain.LogPageSize {
log.Info("get all log already")
break
}
}
}

sort.Slice(result.Hits.Hits, func(i, j int) bool {
return result.Hits.Hits[i].Sort[0] < result.Hits.Hits[j].Sort[0]
cloudbrain.DeleteJobLogToken(result.ScrollID)

sort.Slice(hits, func(i, j int) bool {
return hits[i].Sort[0] < hits[j].Sort[0]
})

log.Info("%v", result.Hits.Hits)
var content []string
for _, log := range result.Hits.Hits {
content = append(content, log.Source.Message + "\n")
var content string
for _, log := range hits {
content += log.Source.Message + "\n"
}

log.Info("%v", content)

ctx.JSON(http.StatusOK, map[string]interface{}{
"JobID": jobID,
"Content": content,


Loading…
Cancel
Save