diff --git a/center/center.go b/center/center.go index 06ea8eea..dbffbbf0 100644 --- a/center/center.go +++ b/center/center.go @@ -8,7 +8,7 @@ import ( "github.com/ccfos/nightingale/v6/alert/astats" "github.com/ccfos/nightingale/v6/alert/process" "github.com/ccfos/nightingale/v6/center/cconf" - "github.com/ccfos/nightingale/v6/center/idents" + "github.com/ccfos/nightingale/v6/center/metas" "github.com/ccfos/nightingale/v6/center/sso" "github.com/ccfos/nightingale/v6/conf" "github.com/ccfos/nightingale/v6/memsto" @@ -18,6 +18,7 @@ import ( "github.com/ccfos/nightingale/v6/pkg/i18nx" "github.com/ccfos/nightingale/v6/pkg/logx" "github.com/ccfos/nightingale/v6/prom" + "github.com/ccfos/nightingale/v6/pushgw/idents" "github.com/ccfos/nightingale/v6/pushgw/writer" "github.com/ccfos/nightingale/v6/storage" @@ -54,11 +55,12 @@ func Initialize(configDir string, cryptoKey string) (func(), error) { return nil, err } - idents := idents.New(db, redis) + metas := metas.New(redis) + idents := idents.New(db) syncStats := memsto.NewSyncStats() alertStats := astats.NewSyncStats() - // idents := idents.New(db, config.Pushgw.DatasourceId, config.Pushgw.MaxOffset) + sso := sso.Init(config.Center, ctx) busiGroupCache := memsto.NewBusiGroupCache(ctx, syncStats) @@ -76,8 +78,8 @@ func Initialize(configDir string, cryptoKey string) (func(), error) { writers := writer.NewWriters(config.Pushgw) alertrtRouter := alertrt.New(config.HTTP, config.Alert, alertMuteCache, targetCache, busiGroupCache, alertStats, ctx, externalProcessors) - centerRouter := centerrt.New(config.HTTP, config.Center, cconf.Operations, dsCache, notifyConfigCache, promClients, redis, sso, ctx, idents) - pushgwRouter := pushgwrt.New(config.HTTP, config.Pushgw, targetCache, busiGroupCache, writers, ctx) + centerRouter := centerrt.New(config.HTTP, config.Center, cconf.Operations, dsCache, notifyConfigCache, promClients, redis, sso, ctx, metas) + pushgwRouter := pushgwrt.New(config.HTTP, config.Pushgw, targetCache, busiGroupCache, idents, writers, ctx) r := httpx.GinEngine(config.Global.RunMode, config.HTTP) diff --git a/center/idents/idents.go b/center/metas/metas.go similarity index 59% rename from center/idents/idents.go rename to center/metas/metas.go index 639224e7..c2841222 100644 --- a/center/idents/idents.go +++ b/center/metas/metas.go @@ -1,4 +1,4 @@ -package idents +package metas import ( "context" @@ -9,21 +9,17 @@ import ( "github.com/ccfos/nightingale/v6/storage" "github.com/toolkits/pkg/logger" - "github.com/toolkits/pkg/slice" - "gorm.io/gorm" ) type Set struct { sync.RWMutex items map[string]models.HostMeta - db *gorm.DB redis storage.Redis } -func New(db *gorm.DB, redis storage.Redis) *Set { +func New(redis storage.Redis) *Set { set := &Set{ items: make(map[string]models.HostMeta), - db: db, redis: redis, } @@ -74,14 +70,13 @@ func (s *Set) persist() { func (s *Set) updateMeta(items map[string]models.HostMeta) { m := make(map[string]models.HostMeta, 100) - now := time.Now().Unix() num := 0 for _, meta := range items { m[meta.Hostname] = meta num++ if num == 100 { - if err := s.updateTargets(m, now); err != nil { + if err := s.updateTargets(m); err != nil { logger.Errorf("failed to update targets: %v", err) } m = make(map[string]models.HostMeta, 100) @@ -89,12 +84,12 @@ func (s *Set) updateMeta(items map[string]models.HostMeta) { } } - if err := s.updateTargets(m, now); err != nil { + if err := s.updateTargets(m); err != nil { logger.Errorf("failed to update targets: %v", err) } } -func (s *Set) updateTargets(m map[string]models.HostMeta, now int64) error { +func (s *Set) updateTargets(m map[string]models.HostMeta) error { count := int64(len(m)) if count == 0 { return nil @@ -106,33 +101,5 @@ func (s *Set) updateTargets(m map[string]models.HostMeta, now int64) error { values = append(values, meta) } err := s.redis.MSet(context.Background(), values...).Err() - if err != nil { - return err - } - var lst []string - for ident := range m { - lst = append(lst, ident) - } - - // there are some idents not found in db, so insert them - var exists []string - err = s.db.Table("target").Where("ident in ?", lst).Pluck("ident", &exists).Error - if err != nil { - return err - } - - err = s.db.Table("target").Where("ident in ?", exists).Update("update_at", now).Error - if err != nil { - logger.Error("failed to update target:", exists, "error:", err) - } - - news := slice.SubString(lst, exists) - for i := 0; i < len(news); i++ { - err = s.db.Exec("INSERT INTO target(ident, update_at) VALUES(?, ?)", news[i], now).Error - if err != nil { - logger.Error("failed to insert target:", news[i], "error:", err) - } - } - - return nil + return err } diff --git a/center/router/router.go b/center/router/router.go index fe48e816..f4d7c000 100644 --- a/center/router/router.go +++ b/center/router/router.go @@ -9,7 +9,7 @@ import ( "github.com/ccfos/nightingale/v6/center/cconf" "github.com/ccfos/nightingale/v6/center/cstats" - "github.com/ccfos/nightingale/v6/center/idents" + "github.com/ccfos/nightingale/v6/center/metas" "github.com/ccfos/nightingale/v6/center/sso" "github.com/ccfos/nightingale/v6/memsto" "github.com/ccfos/nightingale/v6/pkg/aop" @@ -30,13 +30,13 @@ type Router struct { NotifyConfigCache *memsto.NotifyConfigCacheType PromClients *prom.PromClientMap Redis storage.Redis - IdentSet *idents.Set + MetaSet *metas.Set Sso *sso.SsoClient Ctx *ctx.Context } func New(httpConfig httpx.Config, center cconf.Center, operations cconf.Operation, ds *memsto.DatasourceCacheType, ncc *memsto.NotifyConfigCacheType, - pc *prom.PromClientMap, redis storage.Redis, sso *sso.SsoClient, ctx *ctx.Context, identSet *idents.Set) *Router { + pc *prom.PromClientMap, redis storage.Redis, sso *sso.SsoClient, ctx *ctx.Context, metaSet *metas.Set) *Router { return &Router{ HTTP: httpConfig, Center: center, @@ -47,7 +47,7 @@ func New(httpConfig httpx.Config, center cconf.Center, operations cconf.Operatio Redis: redis, Sso: sso, Ctx: ctx, - IdentSet: identSet, + MetaSet: metaSet, } } diff --git a/center/router/router_heartbeat.go b/center/router/router_heartbeat.go index 81175315..efaa77f3 100644 --- a/center/router/router_heartbeat.go +++ b/center/router/router_heartbeat.go @@ -36,6 +36,6 @@ func (rt *Router) heartbeat(c *gin.Context) { ginx.Dangerous(err) req.Offset = (time.Now().UnixMilli() - req.UnixTime) - rt.IdentSet.Set(req.Hostname, req) + rt.MetaSet.Set(req.Hostname, req) ginx.NewRender(c).Message(nil) } diff --git a/pushgw/idents/idents.go b/pushgw/idents/idents.go new file mode 100644 index 00000000..992fb0ff --- /dev/null +++ b/pushgw/idents/idents.go @@ -0,0 +1,115 @@ +package idents + +import ( + "sync" + "time" + + "github.com/toolkits/pkg/logger" + "github.com/toolkits/pkg/slice" + "gorm.io/gorm" +) + +type Set struct { + sync.Mutex + items map[string]struct{} + db *gorm.DB +} + +func New(db *gorm.DB) *Set { + set := &Set{ + items: make(map[string]struct{}), + db: db, + } + + set.Init() + return set +} + +func (s *Set) Init() { + go s.LoopPersist() +} + +func (s *Set) MSet(items map[string]struct{}) { + s.Lock() + defer s.Unlock() + for ident := range items { + s.items[ident] = struct{}{} + } +} + +func (s *Set) LoopPersist() { + for { + time.Sleep(time.Second) + s.persist() + } +} + +func (s *Set) persist() { + var items map[string]struct{} + + s.Lock() + if len(s.items) == 0 { + s.Unlock() + return + } + + items = s.items + s.items = make(map[string]struct{}) + s.Unlock() + + s.updateTimestamp(items) +} + +func (s *Set) updateTimestamp(items map[string]struct{}) { + lst := make([]string, 0, 100) + now := time.Now().Unix() + num := 0 + for ident := range items { + lst = append(lst, ident) + num++ + if num == 100 { + if err := s.updateTargets(lst, now); err != nil { + logger.Errorf("failed to update targets: %v", err) + } + lst = lst[:0] + num = 0 + } + } + + if err := s.updateTargets(lst, now); err != nil { + logger.Errorf("failed to update targets: %v", err) + } +} + +func (s *Set) updateTargets(lst []string, now int64) error { + count := int64(len(lst)) + if count == 0 { + return nil + } + + ret := s.db.Table("target").Where("ident in ?", lst).Update("update_at", now) + if ret.Error != nil { + return ret.Error + } + + if ret.RowsAffected == count { + return nil + } + + // there are some idents not found in db, so insert them + var exists []string + err := s.db.Table("target").Where("ident in ?", lst).Pluck("ident", &exists).Error + if err != nil { + return err + } + + news := slice.SubString(lst, exists) + for i := 0; i < len(news); i++ { + err = s.db.Exec("INSERT INTO target(ident, update_at, cluster) VALUES(?, ?, ?)", news[i], now, "").Error + if err != nil { + logger.Error("failed to insert target:", news[i], "error:", err) + } + } + + return nil +} diff --git a/pushgw/pushgw.go b/pushgw/pushgw.go index 0cbf82ff..b7a7d471 100644 --- a/pushgw/pushgw.go +++ b/pushgw/pushgw.go @@ -9,12 +9,14 @@ import ( "github.com/ccfos/nightingale/v6/pkg/ctx" "github.com/ccfos/nightingale/v6/pkg/httpx" "github.com/ccfos/nightingale/v6/pkg/logx" + "github.com/ccfos/nightingale/v6/pushgw/idents" "github.com/ccfos/nightingale/v6/pushgw/router" "github.com/ccfos/nightingale/v6/pushgw/writer" "github.com/ccfos/nightingale/v6/storage" ) type PushgwProvider struct { + Ident *idents.Set Router *router.Router } @@ -35,6 +37,8 @@ func Initialize(configDir string, cryptoKey string) (func(), error) { } ctx := ctx.NewContext(context.Background(), db) + idents := idents.New(db) + stats := memsto.NewSyncStats() busiGroupCache := memsto.NewBusiGroupCache(ctx, stats) @@ -43,7 +47,7 @@ func Initialize(configDir string, cryptoKey string) (func(), error) { writers := writer.NewWriters(config.Pushgw) r := httpx.GinEngine(config.Global.RunMode, config.HTTP) - rt := router.New(config.HTTP, config.Pushgw, targetCache, busiGroupCache, writers, ctx) + rt := router.New(config.HTTP, config.Pushgw, targetCache, busiGroupCache, idents, writers, ctx) rt.Config(r) httpClean := httpx.Init(config.HTTP, r) diff --git a/pushgw/router/router.go b/pushgw/router/router.go index f099e204..1e05d283 100644 --- a/pushgw/router/router.go +++ b/pushgw/router/router.go @@ -6,6 +6,7 @@ import ( "github.com/ccfos/nightingale/v6/memsto" "github.com/ccfos/nightingale/v6/pkg/ctx" "github.com/ccfos/nightingale/v6/pkg/httpx" + "github.com/ccfos/nightingale/v6/pushgw/idents" "github.com/ccfos/nightingale/v6/pushgw/pconf" "github.com/ccfos/nightingale/v6/pushgw/writer" ) @@ -15,18 +16,19 @@ type Router struct { Pushgw pconf.Pushgw TargetCache *memsto.TargetCacheType BusiGroupCache *memsto.BusiGroupCacheType - // IdentSet *idents.Set - Writers *writer.WritersType - Ctx *ctx.Context + IdentSet *idents.Set + Writers *writer.WritersType + Ctx *ctx.Context } -func New(httpConfig httpx.Config, pushgw pconf.Pushgw, tc *memsto.TargetCacheType, bg *memsto.BusiGroupCacheType, writers *writer.WritersType, ctx *ctx.Context) *Router { +func New(httpConfig httpx.Config, pushgw pconf.Pushgw, tc *memsto.TargetCacheType, bg *memsto.BusiGroupCacheType, idents *idents.Set, writers *writer.WritersType, ctx *ctx.Context) *Router { return &Router{ HTTP: httpConfig, Writers: writers, Ctx: ctx, TargetCache: tc, BusiGroupCache: bg, + IdentSet: idents, } } diff --git a/pushgw/router/router_datadog.go b/pushgw/router/router_datadog.go index 06c0bee4..fe6cb060 100644 --- a/pushgw/router/router_datadog.go +++ b/pushgw/router/router_datadog.go @@ -224,6 +224,7 @@ func (r *Router) datadogSeries(c *gin.Context) { succ int fail int msg = "received" + ids = make(map[string]struct{}) ) for i := 0; i < cnt; i++ { @@ -245,6 +246,9 @@ func (r *Router) datadogSeries(c *gin.Context) { } if ident != "" { + // register host + ids[ident] = struct{}{} + // fill tags target, has := r.TargetCache.Get(ident) if has { @@ -269,6 +273,7 @@ func (r *Router) datadogSeries(c *gin.Context) { if succ > 0 { CounterSampleTotal.WithLabelValues("datadog").Add(float64(succ)) + r.IdentSet.MSet(ids) } c.JSON(200, gin.H{ diff --git a/pushgw/router/router_openfalcon.go b/pushgw/router/router_openfalcon.go index 1f9e61c7..99a54c3a 100644 --- a/pushgw/router/router_openfalcon.go +++ b/pushgw/router/router_openfalcon.go @@ -180,6 +180,7 @@ func (rt *Router) falconPush(c *gin.Context) { fail int msg = "received" ts = time.Now().Unix() + ids = make(map[string]struct{}) ) for i := 0; i < len(arr); i++ { @@ -195,6 +196,9 @@ func (rt *Router) falconPush(c *gin.Context) { } if ident != "" { + // register host + ids[ident] = struct{}{} + // fill tags target, has := rt.TargetCache.Get(ident) if has { @@ -219,6 +223,7 @@ func (rt *Router) falconPush(c *gin.Context) { if succ > 0 { CounterSampleTotal.WithLabelValues("openfalcon").Add(float64(succ)) + rt.IdentSet.MSet(ids) } c.JSON(200, gin.H{ diff --git a/pushgw/router/router_opentsdb.go b/pushgw/router/router_opentsdb.go index f665da5d..09eb43ea 100644 --- a/pushgw/router/router_opentsdb.go +++ b/pushgw/router/router_opentsdb.go @@ -166,6 +166,7 @@ func (rt *Router) openTSDBPut(c *gin.Context) { fail int msg = "received" ts = time.Now().Unix() + ids = make(map[string]struct{}) ) for i := 0; i < len(arr); i++ { @@ -190,6 +191,9 @@ func (rt *Router) openTSDBPut(c *gin.Context) { host, has := arr[i].Tags["ident"] if has { + // register host + ids[host] = struct{}{} + // fill tags target, has := rt.TargetCache.Get(host) if has { @@ -214,6 +218,7 @@ func (rt *Router) openTSDBPut(c *gin.Context) { if succ > 0 { CounterSampleTotal.WithLabelValues("opentsdb").Add(float64(succ)) + rt.IdentSet.MSet(ids) } c.JSON(200, gin.H{ diff --git a/pushgw/router/router_remotewrite.go b/pushgw/router/router_remotewrite.go index 3b98c24a..df46484e 100644 --- a/pushgw/router/router_remotewrite.go +++ b/pushgw/router/router_remotewrite.go @@ -80,6 +80,7 @@ func (rt *Router) remoteWrite(c *gin.Context) { var ( ident string metric string + ids = make(map[string]struct{}) ) for i := 0; i < count; i++ { @@ -98,6 +99,9 @@ func (rt *Router) remoteWrite(c *gin.Context) { } if len(ident) > 0 { + // register host + ids[ident] = struct{}{} + // fill tags target, has := rt.TargetCache.Get(ident) if has { @@ -119,6 +123,7 @@ func (rt *Router) remoteWrite(c *gin.Context) { } CounterSampleTotal.WithLabelValues("prometheus").Add(float64(count)) + rt.IdentSet.MSet(ids) } // DecodeWriteRequest from an io.Reader into a prompb.WriteRequest, handling