|
- package prometheus
-
- import (
- "bufio"
- "bytes"
- "fmt"
- "io"
- "math"
- "mime"
- "net/http"
- "time"
-
- "github.com/influxdata/telegraf"
- "github.com/influxdata/telegraf/metric"
- . "github.com/influxdata/telegraf/plugins/parsers/prometheus/common"
-
- "github.com/matttproud/golang_protobuf_extensions/pbutil"
- dto "github.com/prometheus/client_model/go"
- "github.com/prometheus/common/expfmt"
- )
-
- func Parse(buf []byte, header http.Header) ([]telegraf.Metric, error) {
- var parser expfmt.TextParser
- var metrics []telegraf.Metric
- var err error
- // parse even if the buffer begins with a newline
- buf = bytes.TrimPrefix(buf, []byte("\n"))
- // Read raw data
- buffer := bytes.NewBuffer(buf)
- reader := bufio.NewReader(buffer)
-
- // Prepare output
- metricFamilies := make(map[string]*dto.MetricFamily)
-
- if isProtobuf(header) {
- for {
- mf := &dto.MetricFamily{}
- if _, ierr := pbutil.ReadDelimited(reader, mf); ierr != nil {
- if ierr == io.EOF {
- break
- }
- return nil, fmt.Errorf("reading metric family protocol buffer failed: %s", ierr)
- }
- metricFamilies[mf.GetName()] = mf
- }
- } else {
- metricFamilies, err = parser.TextToMetricFamilies(reader)
- if err != nil {
- return nil, fmt.Errorf("reading text format failed: %s", err)
- }
- }
-
- now := time.Now()
- // read metrics
- for metricName, mf := range metricFamilies {
- for _, m := range mf.Metric {
- // reading tags
- tags := MakeLabels(m, nil)
-
- // reading fields
- var fields map[string]interface{}
- if mf.GetType() == dto.MetricType_SUMMARY {
- // summary metric
- fields = makeQuantiles(m)
- fields["count"] = float64(m.GetSummary().GetSampleCount())
- fields["sum"] = float64(m.GetSummary().GetSampleSum())
- } else if mf.GetType() == dto.MetricType_HISTOGRAM {
- // histogram metric
- fields = makeBuckets(m)
- fields["count"] = float64(m.GetHistogram().GetSampleCount())
- fields["sum"] = float64(m.GetHistogram().GetSampleSum())
-
- } else {
- // standard metric
- fields = getNameAndValue(m)
- }
- // converting to telegraf metric
- if len(fields) > 0 {
- var t time.Time
- if m.TimestampMs != nil && *m.TimestampMs > 0 {
- t = time.Unix(0, *m.TimestampMs*1000000)
- } else {
- t = now
- }
- metric, err := metric.New(metricName, tags, fields, t, ValueType(mf.GetType()))
- if err == nil {
- metrics = append(metrics, metric)
- }
- }
- }
- }
-
- return metrics, err
- }
-
- func isProtobuf(header http.Header) bool {
- mediatype, params, error := mime.ParseMediaType(header.Get("Content-Type"))
-
- if error != nil {
- return false
- }
-
- return mediatype == "application/vnd.google.protobuf" &&
- params["encoding"] == "delimited" &&
- params["proto"] == "io.prometheus.client.MetricFamily"
- }
-
- // Get Quantiles from summary metric
- func makeQuantiles(m *dto.Metric) map[string]interface{} {
- fields := make(map[string]interface{})
- for _, q := range m.GetSummary().Quantile {
- if !math.IsNaN(q.GetValue()) {
- fields[fmt.Sprint(q.GetQuantile())] = float64(q.GetValue())
- }
- }
- return fields
- }
-
- // Get Buckets from histogram metric
- func makeBuckets(m *dto.Metric) map[string]interface{} {
- fields := make(map[string]interface{})
- for _, b := range m.GetHistogram().Bucket {
- fields[fmt.Sprint(b.GetUpperBound())] = float64(b.GetCumulativeCount())
- }
- return fields
- }
-
- // Get name and value from metric
- func getNameAndValue(m *dto.Metric) map[string]interface{} {
- fields := make(map[string]interface{})
- if m.Gauge != nil {
- if !math.IsNaN(m.GetGauge().GetValue()) {
- fields["gauge"] = float64(m.GetGauge().GetValue())
- }
- } else if m.Counter != nil {
- if !math.IsNaN(m.GetCounter().GetValue()) {
- fields["counter"] = float64(m.GetCounter().GetValue())
- }
- } else if m.Untyped != nil {
- if !math.IsNaN(m.GetUntyped().GetValue()) {
- fields["value"] = float64(m.GetUntyped().GetValue())
- }
- }
- return fields
- }
|