From b0c05368f73634fb0ab0068876e5ae61bcc4dfeb Mon Sep 17 00:00:00 2001 From: Yening Qin <710leo@gmail.com> Date: Tue, 13 Dec 2022 16:24:23 +0800 Subject: [PATCH] n9e server support multi cluster alert (#1318) * support multi * refactor * code refactor * refactor * code refactor * fix run mult cluster rule * code refactor * add alerting_engine api * add alerting_engine api * update sql * refactor recording push * refactor * refactor * delete useless cluster * split to fields * change stats * change stats --- docker/initsql/a-n9e.sql | 5 +- .../a-n9e-for-Postgres.sql | 1 - etc/server.conf | 13 ++ src/models/alert_cur_event.go | 4 +- src/models/alerting_engine.go | 66 +++++- src/server/config/config.go | 27 ++- src/server/config/prom_client.go | 50 +++-- src/server/config/prom_option.go | 7 + src/server/config/reader.go | 98 +++++---- src/server/engine/engine.go | 7 +- src/server/engine/mute.go | 17 ++ src/server/engine/worker.go | 195 ++++++++++++------ src/server/idents/idents.go | 13 +- src/server/memsto/alert_mute_cache.go | 18 +- src/server/memsto/alert_rule_cache.go | 16 +- src/server/memsto/alert_subsribe_cache.go | 16 +- src/server/memsto/busi_group_cache.go | 2 +- src/server/memsto/recording_rule_cache.go | 12 +- src/server/memsto/target_cache.go | 2 +- src/server/memsto/user_cache.go | 2 +- src/server/memsto/user_group_cache.go | 3 +- src/server/naming/hashring.go | 56 +++-- src/server/naming/heartbeat.go | 72 +++++-- src/server/naming/leader.go | 4 +- src/server/router/router.go | 2 +- src/server/router/router_datadog.go | 2 +- src/server/router/router_event.go | 12 +- src/server/router/router_memsto.go | 30 ++- src/server/router/router_openfalcon.go | 2 +- src/server/router/router_opentsdb.go | 2 +- src/server/router/router_prom.go | 6 +- src/server/stat/stat.go | 4 +- src/server/writer/writer.go | 55 +++-- src/webapi/router/router.go | 2 + src/webapi/router/router_server.go | 18 +- 35 files changed, 570 insertions(+), 271 deletions(-) diff --git a/docker/initsql/a-n9e.sql b/docker/initsql/a-n9e.sql index 724df9d1..91c15f02 100644 --- a/docker/initsql/a-n9e.sql +++ b/docker/initsql/a-n9e.sql @@ -525,6 +525,5 @@ CREATE TABLE `alerting_engines` `instance` varchar(128) not null default '' comment 'instance identification, e.g. 10.9.0.9:9090', `cluster` varchar(128) not null default '' comment 'target reader cluster', `clock` bigint not null, - PRIMARY KEY (`id`), - UNIQUE KEY (`instance`) -) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4; + PRIMARY KEY (`id`) +) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4; \ No newline at end of file diff --git a/docker/initsql_for_postgres/a-n9e-for-Postgres.sql b/docker/initsql_for_postgres/a-n9e-for-Postgres.sql index 4d42db63..e6725127 100644 --- a/docker/initsql_for_postgres/a-n9e-for-Postgres.sql +++ b/docker/initsql_for_postgres/a-n9e-for-Postgres.sql @@ -610,6 +610,5 @@ CREATE TABLE alerting_engines clock bigint not null ) ; ALTER TABLE alerting_engines ADD CONSTRAINT alerting_engines_pk PRIMARY KEY (id); -ALTER TABLE alerting_engines ADD CONSTRAINT alerting_engines_un UNIQUE (instance); COMMENT ON COLUMN alerting_engines.instance IS 'instance identification, e.g. 10.9.0.9:9090'; COMMENT ON COLUMN alerting_engines.cluster IS 'target reader cluster'; diff --git a/etc/server.conf b/etc/server.conf index b80fc250..e63c474c 100644 --- a/etc/server.conf +++ b/etc/server.conf @@ -165,6 +165,19 @@ Timeout = 30000 DialTimeout = 3000 MaxIdleConnsPerHost = 100 +# [[Readers]] +# ClusterName = "Default" +# prometheus base url +# Url = "http://127.0.0.1:9090" +# Basic auth username +# BasicAuthUser = "" +# Basic auth password +# BasicAuthPass = "" +# timeout settings, unit: ms +# Timeout = 30000 +# DialTimeout = 3000 +# MaxIdleConnsPerHost = 100 + [WriterOpt] # queue channel count QueueCount = 1000 diff --git a/src/models/alert_cur_event.go b/src/models/alert_cur_event.go index 1e161ef3..0b6c4624 100644 --- a/src/models/alert_cur_event.go +++ b/src/models/alert_cur_event.go @@ -420,9 +420,9 @@ func AlertCurEventGetByIds(ids []int64) ([]*AlertCurEvent, error) { return lst, err } -func AlertCurEventGetByRule(ruleId int64) ([]*AlertCurEvent, error) { +func AlertCurEventGetByRuleIdAndCluster(ruleId int64, cluster string) ([]*AlertCurEvent, error) { var lst []*AlertCurEvent - err := DB().Where("rule_id=?", ruleId).Find(&lst).Error + err := DB().Where("rule_id=? and cluster=?", ruleId, cluster).Find(&lst).Error return lst, err } diff --git a/src/models/alerting_engine.go b/src/models/alerting_engine.go index 18f4add9..b14f2cdf 100644 --- a/src/models/alerting_engine.go +++ b/src/models/alerting_engine.go @@ -1,6 +1,9 @@ package models -import "time" +import ( + "fmt" + "time" +) type AlertingEngines struct { Id int64 `json:"id" gorm:"primaryKey"` @@ -15,23 +18,62 @@ func (e *AlertingEngines) TableName() string { // UpdateCluster 页面上用户会给各个n9e-server分配要关联的目标集群是什么 func (e *AlertingEngines) UpdateCluster(c string) error { + count, err := Count(DB().Model(&AlertingEngines{}).Where("id<>? and instance=? and cluster=?", e.Id, e.Instance, c)) + if err != nil { + return err + } + + if count > 0 { + return fmt.Errorf("instance %s and cluster %s already exists", e.Instance, c) + } + e.Cluster = c return DB().Model(e).Select("cluster").Updates(e).Error } +func AlertingEngineAdd(instance, cluster string) error { + count, err := Count(DB().Model(&AlertingEngines{}).Where("instance=? and cluster=?", instance, cluster)) + if err != nil { + return err + } + + if count > 0 { + return fmt.Errorf("instance %s and cluster %s already exists", instance, cluster) + } + + err = DB().Create(&AlertingEngines{ + Instance: instance, + Cluster: cluster, + Clock: time.Now().Unix(), + }).Error + + return err +} + +func AlertingEngineDel(ids []int64) error { + if len(ids) == 0 { + return nil + } + return DB().Where("id in ?", ids).Delete(new(AlertingEngines)).Error +} + // AlertingEngineGetCluster 根据实例名获取对应的集群名字 -func AlertingEngineGetCluster(instance string) (string, error) { +func AlertingEngineGetClusters(instance string) ([]string, error) { var objs []AlertingEngines err := DB().Where("instance=?", instance).Find(&objs).Error if err != nil { - return "", err + return []string{}, err } if len(objs) == 0 { - return "", nil + return []string{}, nil + } + var clusters []string + for i := 0; i < len(objs); i++ { + clusters = append(clusters, objs[i].Cluster) } - return objs[0].Cluster, nil + return clusters, nil } // AlertingEngineGets 拉取列表数据,用户要在页面上看到所有 n9e-server 实例列表,然后为其分配 cluster @@ -72,9 +114,9 @@ func AlertingEngineGetsInstances(where string, args ...interface{}) ([]string, e return arr, err } -func AlertingEngineHeartbeat(instance, cluster string) error { +func AlertingEngineHeartbeatWithCluster(instance, cluster string) error { var total int64 - err := DB().Model(new(AlertingEngines)).Where("instance=?", instance).Count(&total).Error + err := DB().Model(new(AlertingEngines)).Where("instance=? and cluster=?", instance, cluster).Count(&total).Error if err != nil { return err } @@ -88,9 +130,15 @@ func AlertingEngineHeartbeat(instance, cluster string) error { }).Error } else { // updates - fields := map[string]interface{}{"clock": time.Now().Unix(), "cluster": cluster} - err = DB().Model(new(AlertingEngines)).Where("instance=?", instance).Updates(fields).Error + fields := map[string]interface{}{"clock": time.Now().Unix()} + err = DB().Model(new(AlertingEngines)).Where("instance=? and cluster=?", instance, cluster).Updates(fields).Error } return err } + +func AlertingEngineHeartbeat(instance string) error { + fields := map[string]interface{}{"clock": time.Now().Unix()} + err := DB().Model(new(AlertingEngines)).Where("instance=?", instance).Updates(fields).Error + return err +} diff --git a/src/server/config/config.go b/src/server/config/config.go index c3487190..fae270dd 100644 --- a/src/server/config/config.go +++ b/src/server/config/config.go @@ -64,12 +64,19 @@ func DealConfigCrypto(key string) { } C.Ibex.BasicAuthPass = decryptIbexPwd - decryptReaderPwd, err := secu.DealWithDecrypt(C.Reader.BasicAuthPass, key) - if err != nil { - fmt.Println("failed to decrypt the reader password", err) - os.Exit(1) + if len(C.Readers) == 0 { + C.Reader.ClusterName = C.ClusterName + C.Readers = append(C.Readers, C.Reader) + } + + for index, v := range C.Readers { + decryptReaderPwd, err := secu.DealWithDecrypt(v.BasicAuthPass, key) + if err != nil { + fmt.Printf("failed to decrypt the reader password: %s , error: %s", v.BasicAuthPass, err.Error()) + os.Exit(1) + } + C.Readers[index].BasicAuthPass = decryptReaderPwd } - C.Reader.BasicAuthPass = decryptReaderPwd for index, v := range C.Writers { decryptWriterPwd, err := secu.DealWithDecrypt(v.BasicAuthPass, key) @@ -217,7 +224,11 @@ func MustLoad(key string, fpaths ...string) { C.WriterOpt.ShardingKey = "ident" } - for _, write := range C.Writers { + for i, write := range C.Writers { + if C.Writers[i].Name == "" { + C.Writers[i].Name = C.ClusterName + } + for _, relabel := range write.WriteRelabels { regex, ok := relabel.Regex.(string) if !ok { @@ -251,7 +262,7 @@ func MustLoad(key string, fpaths ...string) { type Config struct { RunMode string - ClusterName string + ClusterName string // 监控对象上报时,指定的集群名称 BusiGroupLabelKey string EngineDelay int64 DisableUsageReport bool @@ -269,10 +280,12 @@ type Config struct { WriterOpt WriterGlobalOpt Writers []WriterOptions Reader PromOption + Readers []PromOption Ibex Ibex } type WriterOptions struct { + Name string Url string BasicAuthUser string BasicAuthPass string diff --git a/src/server/config/prom_client.go b/src/server/config/prom_client.go index de550511..e60cca61 100644 --- a/src/server/config/prom_client.go +++ b/src/server/config/prom_client.go @@ -6,40 +6,38 @@ import ( "github.com/didi/nightingale/v5/src/pkg/prom" ) -type PromClient struct { - prom.API - ClusterName string +type PromClientMap struct { sync.RWMutex + Clients map[string]prom.API } -var ReaderClient *PromClient = &PromClient{} +var ReaderClients *PromClientMap = &PromClientMap{Clients: make(map[string]prom.API)} -func (pc *PromClient) Set(clusterName string, c prom.API) { +func (pc *PromClientMap) Set(clusterName string, c prom.API) { pc.Lock() defer pc.Unlock() - pc.ClusterName = clusterName - pc.API = c + pc.Clients[clusterName] = c } -func (pc *PromClient) Get() (string, prom.API) { +func (pc *PromClientMap) GetClusterNames() []string { pc.RLock() defer pc.RUnlock() - return pc.ClusterName, pc.API -} + var clusterNames []string + for k := range pc.Clients { + clusterNames = append(clusterNames, k) + } -func (pc *PromClient) GetClusterName() string { - pc.RLock() - defer pc.RUnlock() - return pc.ClusterName + return clusterNames } -func (pc *PromClient) GetCli() prom.API { +func (pc *PromClientMap) GetCli(cluster string) prom.API { pc.RLock() defer pc.RUnlock() - return pc.API + c := pc.Clients[cluster] + return c } -func (pc *PromClient) IsNil() bool { +func (pc *PromClientMap) IsNil(cluster string) bool { if pc == nil { return true } @@ -47,13 +45,23 @@ func (pc *PromClient) IsNil() bool { pc.RLock() defer pc.RUnlock() - return pc.API == nil + c, exists := pc.Clients[cluster] + if !exists { + return true + } + + return c == nil } -func (pc *PromClient) Reset() { +func (pc *PromClientMap) Reset() { pc.Lock() defer pc.Unlock() - pc.ClusterName = "" - pc.API = nil + pc.Clients = make(map[string]prom.API) +} + +func (pc *PromClientMap) Del(cluster string) { + pc.Lock() + defer pc.Unlock() + delete(pc.Clients, cluster) } diff --git a/src/server/config/prom_option.go b/src/server/config/prom_option.go index 3dad6dae..724ad9b5 100644 --- a/src/server/config/prom_option.go +++ b/src/server/config/prom_option.go @@ -3,6 +3,7 @@ package config import "sync" type PromOption struct { + ClusterName string Url string BasicAuthUser string BasicAuthPass string @@ -70,6 +71,12 @@ func (pos *PromOptionsStruct) Sets(clusterName string, po PromOption) { pos.Unlock() } +func (pos *PromOptionsStruct) Del(clusterName string) { + pos.Lock() + delete(pos.Data, clusterName) + pos.Unlock() +} + func (pos *PromOptionsStruct) Get(clusterName string) (PromOption, bool) { pos.RLock() defer pos.RUnlock() diff --git a/src/server/config/reader.go b/src/server/config/reader.go index 4098c8f2..e42f40db 100644 --- a/src/server/config/reader.go +++ b/src/server/config/reader.go @@ -17,7 +17,19 @@ import ( func InitReader() error { rf := strings.ToLower(strings.TrimSpace(C.ReaderFrom)) if rf == "" || rf == "config" { - return setClientFromPromOption(C.ClusterName, C.Reader) + if len(C.Readers) == 0 { + C.Reader.ClusterName = C.ClusterName + C.Readers = append(C.Readers, C.Reader) + } + + for _, reader := range C.Readers { + err := setClientFromPromOption(reader.ClusterName, reader) + if err != nil { + logger.Errorf("failed to setClientFromPromOption: %v", err) + continue + } + } + return nil } if rf == "database" { @@ -38,57 +50,71 @@ func initFromDatabase() error { } func loadFromDatabase() { - cluster, err := models.AlertingEngineGetCluster(C.Heartbeat.Endpoint) + clusters, err := models.AlertingEngineGetClusters(C.Heartbeat.Endpoint) if err != nil { logger.Errorf("failed to get current cluster, error: %v", err) return } - if cluster == "" { - ReaderClient.Reset() + if len(clusters) == 0 { + ReaderClients.Reset() logger.Warning("no datasource binded to me") return } - ckey := "prom." + cluster + ".option" - cval, err := models.ConfigsGet(ckey) - if err != nil { - logger.Errorf("failed to get ckey: %s, error: %v", ckey, err) - return - } + newCluster := make(map[string]struct{}) + for _, cluster := range clusters { + newCluster[cluster] = struct{}{} + ckey := "prom." + cluster + ".option" + cval, err := models.ConfigsGet(ckey) + if err != nil { + logger.Errorf("failed to get ckey: %s, error: %v", ckey, err) + continue + } - if cval == "" { - ReaderClient.Reset() - return - } + if cval == "" { + logger.Warningf("ckey: %s is empty", ckey) + continue + } - var po PromOption - err = json.Unmarshal([]byte(cval), &po) - if err != nil { - logger.Errorf("failed to unmarshal PromOption: %s", err) - return - } + var po PromOption + err = json.Unmarshal([]byte(cval), &po) + if err != nil { + logger.Errorf("failed to unmarshal PromOption: %s", err) + continue + } + + if ReaderClients.IsNil(cluster) { + // first time + if err = setClientFromPromOption(cluster, po); err != nil { + logger.Errorf("failed to setClientFromPromOption: %v", err) + continue + } - if ReaderClient.IsNil() { - // first time - if err = setClientFromPromOption(cluster, po); err != nil { - logger.Errorf("failed to setClientFromPromOption: %v", err) - return + logger.Info("setClientFromPromOption success: ", cluster) + PromOptions.Sets(cluster, po) + continue } - PromOptions.Sets(cluster, po) - return - } + localPo, has := PromOptions.Get(cluster) + if !has || !localPo.Equal(po) { + if err = setClientFromPromOption(cluster, po); err != nil { + logger.Errorf("failed to setClientFromPromOption: %v", err) + continue + } - localPo, has := PromOptions.Get(cluster) - if !has || !localPo.Equal(po) { - if err = setClientFromPromOption(cluster, po); err != nil { - logger.Errorf("failed to setClientFromPromOption: %v", err) - return + PromOptions.Sets(cluster, po) } + } - PromOptions.Sets(cluster, po) - return + // delete useless cluster + oldClusters := ReaderClients.GetClusterNames() + for _, oldCluster := range oldClusters { + if _, has := newCluster[oldCluster]; !has { + ReaderClients.Del(oldCluster) + PromOptions.Del(oldCluster) + logger.Info("delete cluster: ", oldCluster) + } } } @@ -121,7 +147,7 @@ func setClientFromPromOption(clusterName string, po PromOption) error { return fmt.Errorf("failed to newClientFromPromOption: %v", err) } - ReaderClient.Set(clusterName, prom.NewAPI(cli, prom.ClientOptions{ + ReaderClients.Set(clusterName, prom.NewAPI(cli, prom.ClientOptions{ BasicAuthUser: po.BasicAuthUser, BasicAuthPass: po.BasicAuthPass, Headers: po.Headers, diff --git a/src/server/engine/engine.go b/src/server/engine/engine.go index f15f5412..de36bafd 100644 --- a/src/server/engine/engine.go +++ b/src/server/engine/engine.go @@ -53,10 +53,7 @@ func Reload() { func reportQueueSize() { for { time.Sleep(time.Second) - clusterName := config.ReaderClient.GetClusterName() - if clusterName == "" { - continue - } - promstat.GaugeAlertQueueSize.WithLabelValues(clusterName).Set(float64(EventQueue.Len())) + + promstat.GaugeAlertQueueSize.Set(float64(EventQueue.Len())) } } diff --git a/src/server/engine/mute.go b/src/server/engine/mute.go index 510f46d4..074ff38c 100644 --- a/src/server/engine/mute.go +++ b/src/server/engine/mute.go @@ -1,6 +1,8 @@ package engine import ( + "strings" + "github.com/didi/nightingale/v5/src/models" "github.com/didi/nightingale/v5/src/server/memsto" ) @@ -31,6 +33,21 @@ func matchMute(event *models.AlertCurEvent, mute *models.AlertMute, clock ...int ts = clock[0] } + // 如果不是全局的,判断 cluster + if mute.Cluster != models.ClusterAll { + // event.Cluster 是一个字符串,可能是多个cluster的组合,比如"cluster1 cluster2" + clusters := strings.Fields(mute.Cluster) + cm := make(map[string]struct{}, len(clusters)) + for i := 0; i < len(clusters); i++ { + cm[clusters[i]] = struct{}{} + } + + // 判断event.Cluster是否包含在cm中 + if _, has := cm[event.Cluster]; !has { + return false + } + } + if ts < mute.Btime || ts > mute.Etime { return false } diff --git a/src/server/engine/worker.go b/src/server/engine/worker.go index 7c419010..c8c9e576 100644 --- a/src/server/engine/worker.go +++ b/src/server/engine/worker.go @@ -39,22 +39,48 @@ func loopFilterRules(ctx context.Context) { } } +// 一个规则可能会在多个集群中生效,所以这里要把规则拆分成多个,此结构记录 id 和 cluster 的对应关系 +type RuleSimpleInfo struct { + Id int64 + Cluster string +} + func filterRules() { ids := memsto.AlertRuleCache.GetRuleIds() - logger.Debugf("AlertRuleCache.GetRuleIds success,ids.len: %d", len(ids)) + logger.Debugf("AlertRuleCache.GetRuleIds success, ids.len: %d", len(ids)) count := len(ids) - mines := make([]int64, 0, count) + mines := make([]*RuleSimpleInfo, 0, count) for i := 0; i < count; i++ { - node, err := naming.HashRing.GetNode(fmt.Sprint(ids[i])) - if err != nil { - logger.Warning("failed to get node from hashring:", err) + rule := memsto.AlertRuleCache.Get(ids[i]) + if rule == nil { + logger.Debugf("AlertRuleCache.Get(%d) failed", ids[i]) continue } - if node == config.C.Heartbeat.Endpoint { - mines = append(mines, ids[i]) + var clusters []string + if rule.Cluster == models.ClusterAll { + clusters = config.ReaderClients.GetClusterNames() + } else { + clusters = strings.Fields(rule.Cluster) + } + + for _, cluster := range clusters { + if config.ReaderClients.IsNil(cluster) { + // 没有这个集群的配置,跳过 + continue + } + + node, err := naming.ClusterHashRing.GetNode(cluster, fmt.Sprint(ids[i])) + if err != nil { + logger.Warningf("rid:%d cluster:%s failed to get node from hashring:%v", ids[i], cluster, err) + continue + } + + if node == config.C.Heartbeat.Endpoint { + mines = append(mines, &RuleSimpleInfo{Id: ids[i], Cluster: cluster}) + } } } @@ -63,6 +89,7 @@ func filterRules() { } type RuleEval struct { + cluster string rule *models.AlertRule fires *AlertCurEventMap pendings *AlertCurEventMap @@ -166,12 +193,12 @@ func (r *RuleEval) Work() { return } - if config.ReaderClient.IsNil() { + if config.ReaderClients.IsNil(r.cluster) { logger.Error("reader client is nil") return } - clusterName, readerClient := config.ReaderClient.Get() + readerClient := config.ReaderClients.GetCli(r.cluster) var value model.Value var err error @@ -192,7 +219,7 @@ func (r *RuleEval) Work() { logger.Debugf("rule_eval:%d promql:%s, value:%v", r.RuleID(), promql, value) } - r.Judge(clusterName, conv.ConvertVectors(value)) + r.Judge(r.cluster, conv.ConvertVectors(value)) } type WorkersType struct { @@ -202,22 +229,23 @@ type WorkersType struct { var Workers = &WorkersType{rules: make(map[string]*RuleEval), recordRules: make(map[string]RecordingRuleEval)} -func (ws *WorkersType) Build(rids []int64) { - rules := make(map[string]*models.AlertRule) +func (ws *WorkersType) Build(ris []*RuleSimpleInfo) { + rules := make(map[string]*RuleSimpleInfo) - for i := 0; i < len(rids); i++ { - rule := memsto.AlertRuleCache.Get(rids[i]) + for i := 0; i < len(ris); i++ { + rule := memsto.AlertRuleCache.Get(ris[i].Id) if rule == nil { continue } - hash := str.MD5(fmt.Sprintf("%d_%d_%s", + hash := str.MD5(fmt.Sprintf("%d_%d_%s_%s", rule.Id, rule.PromEvalInterval, rule.PromQl, + ris[i].Cluster, )) - rules[hash] = rule + rules[hash] = ris[i] } // stop old @@ -235,7 +263,7 @@ func (ws *WorkersType) Build(rids []int64) { continue } - elst, err := models.AlertCurEventGetByRule(rules[hash].Id) + elst, err := models.AlertCurEventGetByRuleIdAndCluster(rules[hash].Id, rules[hash].Cluster) if err != nil { logger.Errorf("worker_build: AlertCurEventGetByRule failed: %v", err) continue @@ -249,10 +277,11 @@ func (ws *WorkersType) Build(rids []int64) { fires := NewAlertCurEventMap() fires.SetAll(firemap) re := &RuleEval{ - rule: rules[hash], + rule: memsto.AlertRuleCache.Get(rules[hash].Id), quit: make(chan struct{}), fires: fires, pendings: NewAlertCurEventMap(), + cluster: rules[hash].Cluster, } go re.Start() @@ -260,27 +289,23 @@ func (ws *WorkersType) Build(rids []int64) { } } -func (ws *WorkersType) BuildRe(rids []int64) { - rules := make(map[string]*models.RecordingRule) +func (ws *WorkersType) BuildRe(ris []*RuleSimpleInfo) { + rules := make(map[string]*RuleSimpleInfo) - for i := 0; i < len(rids); i++ { - rule := memsto.RecordingRuleCache.Get(rids[i]) + for i := 0; i < len(ris); i++ { + rule := memsto.RecordingRuleCache.Get(ris[i].Id) if rule == nil { continue } - if rule.Disabled == 1 { - continue - } - hash := str.MD5(fmt.Sprintf("%d_%d_%s_%s", rule.Id, rule.PromEvalInterval, rule.PromQl, - rule.AppendTags, + ris[i].Cluster, )) - rules[hash] = rule + rules[hash] = ris[i] } // stop old @@ -298,8 +323,9 @@ func (ws *WorkersType) BuildRe(rids []int64) { continue } re := RecordingRuleEval{ - rule: rules[hash], - quit: make(chan struct{}), + rule: memsto.RecordingRuleCache.Get(rules[hash].Id), + quit: make(chan struct{}), + cluster: rules[hash].Cluster, } go re.Start() @@ -334,7 +360,7 @@ func (r *RuleEval) MakeNewEvent(from string, now int64, clusterName string, vect alertingKeys := make(map[string]struct{}) for i := 0; i < count; i++ { // compute hash - hash := str.MD5(fmt.Sprintf("%d_%s", r.rule.Id, vectors[i].Key)) + hash := str.MD5(fmt.Sprintf("%d_%s_%s", r.rule.Id, vectors[i].Key, r.cluster)) alertingKeys[hash] = struct{}{} // rule disabled in this time span? @@ -598,17 +624,37 @@ func filterRecordingRules() { ids := memsto.RecordingRuleCache.GetRuleIds() count := len(ids) - mines := make([]int64, 0, count) + mines := make([]*RuleSimpleInfo, 0, count) for i := 0; i < count; i++ { - node, err := naming.HashRing.GetNode(fmt.Sprint(ids[i])) - if err != nil { - logger.Warning("failed to get node from hashring:", err) + rule := memsto.RecordingRuleCache.Get(ids[i]) + if rule == nil { + logger.Debugf("rule %d not found", ids[i]) continue } - if node == config.C.Heartbeat.Endpoint { - mines = append(mines, ids[i]) + var clusters []string + if rule.Cluster == models.ClusterAll { + clusters = config.ReaderClients.GetClusterNames() + } else { + clusters = strings.Fields(rule.Cluster) + } + + for _, cluster := range clusters { + if config.ReaderClients.IsNil(cluster) { + // 没有这个集群的配置,跳过 + continue + } + + node, err := naming.ClusterHashRing.GetNode(cluster, fmt.Sprint(ids[i])) + if err != nil { + logger.Warning("failed to get node from hashring:", err) + continue + } + + if node == config.C.Heartbeat.Endpoint { + mines = append(mines, &RuleSimpleInfo{Id: ids[i], Cluster: cluster}) + } } } @@ -616,8 +662,9 @@ func filterRecordingRules() { } type RecordingRuleEval struct { - rule *models.RecordingRule - quit chan struct{} + cluster string + rule *models.RecordingRule + quit chan struct{} } func (r RecordingRuleEval) Stop() { @@ -654,12 +701,12 @@ func (r RecordingRuleEval) Work() { return } - if config.ReaderClient.IsNil() { + if config.ReaderClients.IsNil(r.cluster) { log.Println("reader client is nil") return } - value, warnings, err := config.ReaderClient.GetCli().Query(context.Background(), promql, time.Now()) + value, warnings, err := config.ReaderClients.GetCli(r.cluster).Query(context.Background(), promql, time.Now()) if err != nil { logger.Errorf("recording_rule_eval:%d promql:%s, error:%v", r.RuleID(), promql, err) return @@ -672,21 +719,21 @@ func (r RecordingRuleEval) Work() { ts := conv.ConvertToTimeSeries(value, r.rule) if len(ts) != 0 { for _, v := range ts { - writer.Writers.PushSample(r.rule.Name, v) + writer.Writers.PushSample(r.rule.Name, v, r.cluster) } } } type RuleEvalForExternalType struct { sync.RWMutex - rules map[int64]RuleEval + rules map[string]RuleEval // key: hash of ruleid_promevalinterval_promql_cluster } -var RuleEvalForExternal = RuleEvalForExternalType{rules: make(map[int64]RuleEval)} +var RuleEvalForExternal = RuleEvalForExternalType{rules: make(map[string]RuleEval)} func (re *RuleEvalForExternalType) Build() { rids := memsto.AlertRuleCache.GetRuleIds() - rules := make(map[int64]*models.AlertRule) + rules := make(map[string]*RuleSimpleInfo) for i := 0; i < len(rids); i++ { rule := memsto.AlertRuleCache.Get(rids[i]) @@ -694,16 +741,34 @@ func (re *RuleEvalForExternalType) Build() { continue } - re.Lock() - rules[rule.Id] = rule - re.Unlock() + var clusters []string + if rule.Cluster == models.ClusterAll { + clusters = config.ReaderClients.GetClusterNames() + } else { + clusters = strings.Fields(rule.Cluster) + } + + for _, cluster := range clusters { + hash := str.MD5(fmt.Sprintf("%d_%d_%s_%s", + rule.Id, + rule.PromEvalInterval, + rule.PromQl, + cluster, + )) + re.Lock() + rules[hash] = &RuleSimpleInfo{ + Id: rule.Id, + Cluster: cluster, + } + re.Unlock() + } } // stop old - for rid := range re.rules { - if _, has := rules[rid]; !has { + for oldHash := range re.rules { + if _, has := rules[oldHash]; !has { re.Lock() - delete(re.rules, rid) + delete(re.rules, oldHash) re.Unlock() } } @@ -711,13 +776,13 @@ func (re *RuleEvalForExternalType) Build() { // start new re.Lock() defer re.Unlock() - for rid := range rules { - if _, has := re.rules[rid]; has { + for hash, ruleSimple := range rules { + if _, has := re.rules[hash]; has { // already exists continue } - elst, err := models.AlertCurEventGetByRule(rules[rid].Id) + elst, err := models.AlertCurEventGetByRuleIdAndCluster(ruleSimple.Id, ruleSimple.Cluster) if err != nil { logger.Errorf("worker_build: AlertCurEventGetByRule failed: %v", err) continue @@ -731,27 +796,35 @@ func (re *RuleEvalForExternalType) Build() { fires := NewAlertCurEventMap() fires.SetAll(firemap) newRe := RuleEval{ - rule: rules[rid], + rule: memsto.AlertRuleCache.Get(ruleSimple.Id), quit: make(chan struct{}), fires: fires, pendings: NewAlertCurEventMap(), + cluster: ruleSimple.Cluster, } - re.rules[rid] = newRe + re.rules[hash] = newRe } } -func (re *RuleEvalForExternalType) Get(rid int64) (RuleEval, bool) { +func (re *RuleEvalForExternalType) Get(rid int64, cluster string) (RuleEval, bool) { + re.RLock() + defer re.RUnlock() rule := memsto.AlertRuleCache.Get(rid) if rule == nil { return RuleEval{}, false } - re.RLock() - defer re.RUnlock() - if ret, has := re.rules[rid]; has { - // already exists + hash := str.MD5(fmt.Sprintf("%d_%d_%s_%s", + rule.Id, + rule.PromEvalInterval, + rule.PromQl, + cluster, + )) + + if ret, has := re.rules[hash]; has { return ret, has } + return RuleEval{}, false } diff --git a/src/server/idents/idents.go b/src/server/idents/idents.go index 7bddfe26..1df865b3 100644 --- a/src/server/idents/idents.go +++ b/src/server/idents/idents.go @@ -41,7 +41,7 @@ func toRedis() { return } - if config.ReaderClient.IsNil() { + if config.ReaderClients.IsNil(config.C.ClusterName) { return } @@ -53,7 +53,7 @@ func toRedis() { Idents.Remove(key) } else { // use now as timestamp to redis - err := storage.Redis.HSet(context.Background(), redisKey(config.ReaderClient.GetClusterName()), key, now).Err() + err := storage.Redis.HSet(context.Background(), redisKey(config.C.ClusterName), key, now).Err() if err != nil { logger.Errorf("redis hset idents failed: %v", err) } @@ -96,7 +96,8 @@ func loopPushMetrics(ctx context.Context) { } func pushMetrics() { - isLeader, err := naming.IamLeader() + clusterName := config.C.ClusterName + isLeader, err := naming.IamLeader(clusterName) if err != nil { logger.Errorf("handle_idents: %v", err) return @@ -107,12 +108,6 @@ func pushMetrics() { return } - clusterName := config.ReaderClient.GetClusterName() - if clusterName == "" { - logger.Warning("cluster name is blank") - return - } - // get all the target heartbeat timestamp ret, err := storage.Redis.HGetAll(context.Background(), redisKey(clusterName)).Result() if err != nil { diff --git a/src/server/memsto/alert_mute_cache.go b/src/server/memsto/alert_mute_cache.go index 723a20d7..a7c4b9bd 100644 --- a/src/server/memsto/alert_mute_cache.go +++ b/src/server/memsto/alert_mute_cache.go @@ -99,14 +99,20 @@ func loopSyncAlertMutes() { func syncAlertMutes() error { start := time.Now() - clusterName := config.ReaderClient.GetClusterName() - if clusterName == "" { - AlertMuteCache.Reset() - logger.Warning("cluster name is blank") + clusterNames := config.ReaderClients.GetClusterNames() + if len(clusterNames) == 0 { + AlertRuleCache.Reset() + logger.Warning("cluster is blank") return nil } - stat, err := models.AlertMuteStatistics(clusterName) + var clusterName string + if len(clusterNames) == 1 { + // 兼容老版本监控数据上报 + clusterName = clusterNames[0] + } + + stat, err := models.AlertMuteStatistics("") if err != nil { return errors.WithMessage(err, "failed to exec AlertMuteStatistics") } @@ -118,7 +124,7 @@ func syncAlertMutes() error { return nil } - lst, err := models.AlertMuteGetsByCluster(clusterName) + lst, err := models.AlertMuteGetsByCluster("") if err != nil { return errors.WithMessage(err, "failed to exec AlertMuteGetsByCluster") } diff --git a/src/server/memsto/alert_rule_cache.go b/src/server/memsto/alert_rule_cache.go index c4902a9d..56da23a3 100644 --- a/src/server/memsto/alert_rule_cache.go +++ b/src/server/memsto/alert_rule_cache.go @@ -96,14 +96,20 @@ func loopSyncAlertRules() { func syncAlertRules() error { start := time.Now() - clusterName := config.ReaderClient.GetClusterName() - if clusterName == "" { + clusterNames := config.ReaderClients.GetClusterNames() + if len(clusterNames) == 0 { AlertRuleCache.Reset() - logger.Warning("cluster name is blank") + logger.Warning("cluster is blank") return nil } - stat, err := models.AlertRuleStatistics(clusterName) + var clusterName string + if len(clusterNames) == 1 { + // 兼容老版本监控数据上报 + clusterName = clusterNames[0] + } + + stat, err := models.AlertRuleStatistics("") if err != nil { return errors.WithMessage(err, "failed to exec AlertRuleStatistics") } @@ -115,7 +121,7 @@ func syncAlertRules() error { return nil } - lst, err := models.AlertRuleGetsByCluster(clusterName) + lst, err := models.AlertRuleGetsByCluster("") if err != nil { return errors.WithMessage(err, "failed to exec AlertRuleGetsByCluster") } diff --git a/src/server/memsto/alert_subsribe_cache.go b/src/server/memsto/alert_subsribe_cache.go index 4b2a11e4..6904a197 100644 --- a/src/server/memsto/alert_subsribe_cache.go +++ b/src/server/memsto/alert_subsribe_cache.go @@ -102,14 +102,20 @@ func loopSyncAlertSubscribes() { func syncAlertSubscribes() error { start := time.Now() - clusterName := config.ReaderClient.GetClusterName() - if clusterName == "" { + clusterNames := config.ReaderClients.GetClusterNames() + if len(clusterNames) == 0 { AlertSubscribeCache.Reset() - logger.Warning("cluster name is blank") + logger.Warning("cluster is blank") return nil } - stat, err := models.AlertSubscribeStatistics(clusterName) + var clusterName string + if len(clusterNames) == 1 { + // 兼容老版本监控数据上报 + clusterName = clusterNames[0] + } + + stat, err := models.AlertSubscribeStatistics("") if err != nil { return errors.WithMessage(err, "failed to exec AlertSubscribeStatistics") } @@ -121,7 +127,7 @@ func syncAlertSubscribes() error { return nil } - lst, err := models.AlertSubscribeGetsByCluster(clusterName) + lst, err := models.AlertSubscribeGetsByCluster("") if err != nil { return errors.WithMessage(err, "failed to exec AlertSubscribeGetsByCluster") } diff --git a/src/server/memsto/busi_group_cache.go b/src/server/memsto/busi_group_cache.go index a32b641e..944bfa0a 100644 --- a/src/server/memsto/busi_group_cache.go +++ b/src/server/memsto/busi_group_cache.go @@ -79,7 +79,7 @@ func syncBusiGroups() error { return errors.WithMessage(err, "failed to exec BusiGroupStatistics") } - clusterName := config.ReaderClient.GetClusterName() + clusterName := config.C.ClusterName if !BusiGroupCache.StatChanged(stat.Total, stat.LastUpdated) { if clusterName != "" { diff --git a/src/server/memsto/recording_rule_cache.go b/src/server/memsto/recording_rule_cache.go index 7670bd28..da8568c4 100644 --- a/src/server/memsto/recording_rule_cache.go +++ b/src/server/memsto/recording_rule_cache.go @@ -95,13 +95,19 @@ func loopSyncRecordingRules() { func syncRecordingRules() error { start := time.Now() - clusterName := config.ReaderClient.GetClusterName() - if clusterName == "" { + clusterNames := config.ReaderClients.GetClusterNames() + if len(clusterNames) == 0 { RecordingRuleCache.Reset() - logger.Warning("cluster name is blank") + logger.Warning("cluster is blank") return nil } + var clusterName string + // 只有一个集群,使用单集群模式,如果大于1个集群,则获取全部的规则 + if len(clusterNames) == 1 { + clusterName = clusterNames[0] + } + stat, err := models.RecordingRuleStatistics(clusterName) if err != nil { return errors.WithMessage(err, "failed to exec RecordingRuleStatistics") diff --git a/src/server/memsto/target_cache.go b/src/server/memsto/target_cache.go index 9e5e04b4..c5a1a355 100644 --- a/src/server/memsto/target_cache.go +++ b/src/server/memsto/target_cache.go @@ -103,7 +103,7 @@ func loopSyncTargets() { func syncTargets() error { start := time.Now() - clusterName := config.ReaderClient.GetClusterName() + clusterName := config.C.ClusterName if clusterName == "" { TargetCache.Reset() logger.Warning("cluster name is blank") diff --git a/src/server/memsto/user_cache.go b/src/server/memsto/user_cache.go index 1d1b7fb7..66d32ca4 100644 --- a/src/server/memsto/user_cache.go +++ b/src/server/memsto/user_cache.go @@ -124,7 +124,7 @@ func syncUsers() error { return errors.WithMessage(err, "failed to exec UserStatistics") } - clusterName := config.ReaderClient.GetClusterName() + clusterName := config.C.ClusterName if !UserCache.StatChanged(stat.Total, stat.LastUpdated) { if clusterName != "" { diff --git a/src/server/memsto/user_group_cache.go b/src/server/memsto/user_group_cache.go index 4d9ec2b9..105a350a 100644 --- a/src/server/memsto/user_group_cache.go +++ b/src/server/memsto/user_group_cache.go @@ -106,8 +106,7 @@ func syncUserGroups() error { return errors.WithMessage(err, "failed to exec UserGroupStatistics") } - clusterName := config.ReaderClient.GetClusterName() - + clusterName := config.C.ClusterName if !UserGroupCache.StatChanged(stat.Total, stat.LastUpdated) { if clusterName != "" { promstat.GaugeCronDuration.WithLabelValues(clusterName, "sync_user_groups").Set(0) diff --git a/src/server/naming/hashring.go b/src/server/naming/hashring.go index 468b2ad4..8ccf6cd0 100644 --- a/src/server/naming/hashring.go +++ b/src/server/naming/hashring.go @@ -9,51 +9,47 @@ import ( const NodeReplicas = 500 -type ConsistentHashRing struct { +type ClusterHashRingType struct { sync.RWMutex - ring *consistent.Consistent + Rings map[string]*consistent.Consistent } // for alert_rule sharding -var HashRing = NewConsistentHashRing(int32(NodeReplicas), []string{}) +var ClusterHashRing = ClusterHashRingType{Rings: make(map[string]*consistent.Consistent)} -func (chr *ConsistentHashRing) GetNode(pk string) (string, error) { - chr.RLock() - defer chr.RUnlock() - - return chr.ring.Get(pk) -} - -func (chr *ConsistentHashRing) Set(r *consistent.Consistent) { - chr.Lock() - defer chr.Unlock() - chr.ring = r -} - -func (chr *ConsistentHashRing) GetRing() *consistent.Consistent { - chr.RLock() - defer chr.RUnlock() - - return chr.ring -} - -func NewConsistentHashRing(replicas int32, nodes []string) *ConsistentHashRing { - ret := &ConsistentHashRing{ring: consistent.New()} - ret.ring.NumberOfReplicas = int(replicas) +func NewConsistentHashRing(replicas int32, nodes []string) *consistent.Consistent { + ret := consistent.New() + ret.NumberOfReplicas = int(replicas) for i := 0; i < len(nodes); i++ { - ret.ring.Add(nodes[i]) + ret.Add(nodes[i]) } return ret } -func RebuildConsistentHashRing(nodes []string) { +func RebuildConsistentHashRing(cluster string, nodes []string) { r := consistent.New() r.NumberOfReplicas = NodeReplicas for i := 0; i < len(nodes); i++ { r.Add(nodes[i]) } - HashRing.Set(r) + ClusterHashRing.Set(cluster, r) + logger.Infof("hash ring %s rebuild %+v", cluster, r.Members()) +} + +func (chr *ClusterHashRingType) GetNode(cluster, pk string) (string, error) { + chr.RLock() + defer chr.RUnlock() + _, exists := chr.Rings[cluster] + if !exists { + chr.Rings[cluster] = NewConsistentHashRing(int32(NodeReplicas), []string{}) + } + + return chr.Rings[cluster].Get(pk) +} - logger.Infof("hash ring rebuild %+v", r.Members()) +func (chr *ClusterHashRingType) Set(cluster string, r *consistent.Consistent) { + chr.RLock() + defer chr.RUnlock() + chr.Rings[cluster] = r } diff --git a/src/server/naming/heartbeat.go b/src/server/naming/heartbeat.go index 5c33d2cb..09e612a5 100644 --- a/src/server/naming/heartbeat.go +++ b/src/server/naming/heartbeat.go @@ -14,9 +14,10 @@ import ( ) // local servers -var localss string +var localss map[string]string func Heartbeat(ctx context.Context) error { + localss = make(map[string]string) if err := heartbeat(); err != nil { fmt.Println("failed to heartbeat:", err) return err @@ -37,35 +38,66 @@ func loopHeartbeat() { } func heartbeat() error { - cluster := "" + var clusters []string + var err error if config.C.ReaderFrom == "config" { - cluster = config.C.ClusterName - } + // 在配置文件维护实例和集群的对应关系 + for i := 0; i < len(config.C.Readers); i++ { + clusters = append(clusters, config.C.Readers[i].ClusterName) + err := models.AlertingEngineHeartbeatWithCluster(config.C.Heartbeat.Endpoint, config.C.Readers[i].ClusterName) + if err != nil { + logger.Warningf("heartbeat with cluster %s err:%v", config.C.Readers[i].ClusterName, err) + continue + } + } + } else { + // 在页面上维护实例和集群的对应关系 + clusters, err = models.AlertingEngineGetClusters(config.C.Heartbeat.Endpoint) + if err != nil { + return err + } + if len(clusters) == 0 { + // 实例刚刚部署,还没有在页面配置 cluster 的情况,先使用配置文件中的 cluster 上报心跳 + for i := 0; i < len(config.C.Readers); i++ { + err := models.AlertingEngineHeartbeatWithCluster(config.C.Heartbeat.Endpoint, config.C.Readers[i].ClusterName) + if err != nil { + logger.Warningf("heartbeat with cluster %s err:%v", config.C.Readers[i].ClusterName, err) + continue + } + } + } - err := models.AlertingEngineHeartbeat(config.C.Heartbeat.Endpoint, cluster) - if err != nil { - return err + err := models.AlertingEngineHeartbeat(config.C.Heartbeat.Endpoint) + if err != nil { + return err + } } - servers, err := ActiveServers() - if err != nil { - return err - } + for i := 0; i < len(clusters); i++ { + servers, err := ActiveServers(clusters[i]) + if err != nil { + logger.Warningf("hearbeat %s get active server err:", clusters[i], err) + continue + } + + sort.Strings(servers) + newss := strings.Join(servers, " ") + + oldss, exists := localss[clusters[i]] + if exists && oldss == newss { + continue + } - sort.Strings(servers) - newss := strings.Join(servers, " ") - if newss != localss { - RebuildConsistentHashRing(servers) - localss = newss + RebuildConsistentHashRing(clusters[i], servers) + localss[clusters[i]] = newss } return nil } -func ActiveServers() ([]string, error) { - cluster, err := models.AlertingEngineGetCluster(config.C.Heartbeat.Endpoint) - if err != nil { - return nil, err +func ActiveServers(cluster string) ([]string, error) { + if cluster == "" { + return nil, fmt.Errorf("cluster is empty") } // 30秒内有心跳,就认为是活的 diff --git a/src/server/naming/leader.go b/src/server/naming/leader.go index b154e827..94b080ee 100644 --- a/src/server/naming/leader.go +++ b/src/server/naming/leader.go @@ -7,8 +7,8 @@ import ( "github.com/toolkits/pkg/logger" ) -func IamLeader() (bool, error) { - servers, err := ActiveServers() +func IamLeader(cluster string) (bool, error) { + servers, err := ActiveServers(cluster) if err != nil { logger.Errorf("failed to get active servers: %v", err) return false, err diff --git a/src/server/router/router.go b/src/server/router/router.go index ded11e3f..13891252 100644 --- a/src/server/router/router.go +++ b/src/server/router/router.go @@ -69,7 +69,7 @@ func configRoute(r *gin.Engine, version string, reloadFunc func()) { }) r.GET("/servers/active", func(c *gin.Context) { - lst, err := naming.ActiveServers() + lst, err := naming.ActiveServers(ginx.QueryStr(c, "cluster")) ginx.NewRender(c).Data(lst, err) }) diff --git a/src/server/router/router_datadog.go b/src/server/router/router_datadog.go index e8afce39..c040ba53 100644 --- a/src/server/router/router_datadog.go +++ b/src/server/router/router_datadog.go @@ -281,7 +281,7 @@ func datadogSeries(c *gin.Context) { } if succ > 0 { - cn := config.ReaderClient.GetClusterName() + cn := config.C.ClusterName if cn != "" { promstat.CounterSampleTotal.WithLabelValues(cn, "datadog").Add(float64(succ)) } diff --git a/src/server/router/router_event.go b/src/server/router/router_event.go index 03e60518..79f889ce 100644 --- a/src/server/router/router_event.go +++ b/src/server/router/router_event.go @@ -7,7 +7,6 @@ import ( "github.com/didi/nightingale/v5/src/models" "github.com/didi/nightingale/v5/src/server/common/conv" - "github.com/didi/nightingale/v5/src/server/config" "github.com/didi/nightingale/v5/src/server/engine" promstat "github.com/didi/nightingale/v5/src/server/stat" @@ -67,10 +66,7 @@ func pushEventToQueue(c *gin.Context) { event.NotifyChannels = strings.Join(event.NotifyChannelsJSON, " ") event.NotifyGroups = strings.Join(event.NotifyGroupsJSON, " ") - cn := config.ReaderClient.GetClusterName() - if cn != "" { - promstat.CounterAlertsTotal.WithLabelValues(cn).Inc() - } + promstat.CounterAlertsTotal.WithLabelValues(event.Cluster).Inc() engine.LogEvent(event, "http_push_queue") if !engine.EventQueue.PushFront(event) { @@ -91,7 +87,7 @@ type eventForm struct { func judgeEvent(c *gin.Context) { var form eventForm ginx.BindJSON(c, &form) - re, exists := engine.RuleEvalForExternal.Get(form.RuleId) + re, exists := engine.RuleEvalForExternal.Get(form.RuleId, form.Cluster) if !exists { ginx.Bomb(200, "rule not exists") } @@ -104,7 +100,7 @@ func makeEvent(c *gin.Context) { ginx.BindJSON(c, &events) now := time.Now().Unix() for i := 0; i < len(events); i++ { - re, exists := engine.RuleEvalForExternal.Get(events[i].RuleId) + re, exists := engine.RuleEvalForExternal.Get(events[i].RuleId, events[i].Cluster) logger.Debugf("handle event:%+v exists:%v", events[i], exists) if !exists { ginx.Bomb(200, "rule not exists") @@ -114,7 +110,7 @@ func makeEvent(c *gin.Context) { go re.MakeNewEvent("http", now, events[i].Cluster, events[i].Vectors) } else { for _, vector := range events[i].Vectors { - hash := str.MD5(fmt.Sprintf("%d_%s", events[i].RuleId, vector.Key)) + hash := str.MD5(fmt.Sprintf("%d_%s_%s", events[i].RuleId, vector.Key, events[i].Cluster)) now := vector.Timestamp go re.RecoverEvent(hash, now, vector.Value) } diff --git a/src/server/router/router_memsto.go b/src/server/router/router_memsto.go index da8a6346..65ab2904 100644 --- a/src/server/router/router_memsto.go +++ b/src/server/router/router_memsto.go @@ -2,10 +2,14 @@ package router import ( "net/http" + "strconv" + "strings" "github.com/gin-gonic/gin" "github.com/toolkits/pkg/ginx" + "github.com/didi/nightingale/v5/src/models" + "github.com/didi/nightingale/v5/src/server/config" "github.com/didi/nightingale/v5/src/server/idents" "github.com/didi/nightingale/v5/src/server/memsto" "github.com/didi/nightingale/v5/src/server/naming" @@ -48,12 +52,28 @@ func userGroupGet(c *gin.Context) { } func alertRuleLocationGet(c *gin.Context) { - id := ginx.QueryStr(c, "id") - node, err := naming.HashRing.GetNode(id) - if err != nil { - http.Error(c.Writer, err.Error(), http.StatusInternalServerError) + id := ginx.QueryInt64(c, "id") + rule := memsto.AlertRuleCache.Get(id) + if rule == nil { + http.Error(c.Writer, "rule not found", http.StatusNotFound) return } + var clusters []string + if rule.Cluster == models.ClusterAll { + clusters = config.ReaderClients.GetClusterNames() + } else { + clusters = strings.Fields(rule.Cluster) + } + + var arr []gin.H + for _, cluster := range clusters { + node, err := naming.ClusterHashRing.GetNode(cluster, strconv.FormatInt(id, 10)) + if err != nil { + http.Error(c.Writer, err.Error(), http.StatusInternalServerError) + return + } + arr = append(arr, gin.H{"id": id, "cluster": cluster, "node": node}) + } - c.JSON(200, gin.H{"id": id, "node": node}) + c.JSON(200, gin.H{"list": arr}) } diff --git a/src/server/router/router_openfalcon.go b/src/server/router/router_openfalcon.go index e69597e4..4a5d5726 100644 --- a/src/server/router/router_openfalcon.go +++ b/src/server/router/router_openfalcon.go @@ -228,7 +228,7 @@ func falconPush(c *gin.Context) { } if succ > 0 { - cn := config.ReaderClient.GetClusterName() + cn := config.C.ClusterName if cn != "" { promstat.CounterSampleTotal.WithLabelValues(cn, "openfalcon").Add(float64(succ)) } diff --git a/src/server/router/router_opentsdb.go b/src/server/router/router_opentsdb.go index 9d2f640d..ecae5286 100644 --- a/src/server/router/router_opentsdb.go +++ b/src/server/router/router_opentsdb.go @@ -222,7 +222,7 @@ func handleOpenTSDB(c *gin.Context) { } if succ > 0 { - cn := config.ReaderClient.GetClusterName() + cn := config.C.ClusterName if cn != "" { promstat.CounterSampleTotal.WithLabelValues(cn, "opentsdb").Add(float64(succ)) } diff --git a/src/server/router/router_prom.go b/src/server/router/router_prom.go index b6dab400..ee0b9164 100644 --- a/src/server/router/router_prom.go +++ b/src/server/router/router_prom.go @@ -37,12 +37,12 @@ func queryPromql(c *gin.Context) { var f promqlForm ginx.BindJSON(c, &f) - if config.ReaderClient.IsNil() { + if config.ReaderClients.IsNil(config.C.ClusterName) { c.String(500, "reader client is nil") return } - value, warnings, err := config.ReaderClient.GetCli().Query(c.Request.Context(), f.PromQL, time.Now()) + value, warnings, err := config.ReaderClients.GetCli(config.C.ClusterName).Query(c.Request.Context(), f.PromQL, time.Now()) if err != nil { c.String(500, "promql:%s error:%v", f.PromQL, err) return @@ -160,7 +160,7 @@ func remoteWrite(c *gin.Context) { } } - cn := config.ReaderClient.GetClusterName() + cn := config.C.ClusterName if cn != "" { promstat.CounterSampleTotal.WithLabelValues(cn, "prometheus").Add(float64(count)) } diff --git a/src/server/stat/stat.go b/src/server/stat/stat.go index 10824e45..0f75abb6 100644 --- a/src/server/stat/stat.go +++ b/src/server/stat/stat.go @@ -43,12 +43,12 @@ var ( }, []string{"cluster"}) // 内存中的告警事件队列的长度 - GaugeAlertQueueSize = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + GaugeAlertQueueSize = prometheus.NewGauge(prometheus.GaugeOpts{ Namespace: namespace, Subsystem: subsystem, Name: "alert_queue_size", Help: "The size of alert queue.", - }, []string{"cluster"}) + }) // 数据转发队列,各个队列的长度 GaugeSampleQueueSize = prometheus.NewGaugeVec(prometheus.GaugeOpts{ diff --git a/src/server/writer/writer.go b/src/server/writer/writer.go index 786626e6..f44f5087 100644 --- a/src/server/writer/writer.go +++ b/src/server/writer/writer.go @@ -37,7 +37,7 @@ func (w WriterType) writeRelabel(items []*prompb.TimeSeries) []*prompb.TimeSerie return ritems } -func (w WriterType) Write(index int, items []*prompb.TimeSeries, headers ...map[string]string) { +func (w WriterType) Write(cluster string, index int, items []*prompb.TimeSeries, headers ...map[string]string) { if len(items) == 0 { return } @@ -49,9 +49,8 @@ func (w WriterType) Write(index int, items []*prompb.TimeSeries, headers ...map[ start := time.Now() defer func() { - cn := config.ReaderClient.GetClusterName() - if cn != "" { - promstat.ForwardDuration.WithLabelValues(cn, fmt.Sprint(index)).Observe(time.Since(start).Seconds()) + if cluster != "" { + promstat.ForwardDuration.WithLabelValues(cluster, fmt.Sprint(index)).Observe(time.Since(start).Seconds()) } }() @@ -130,17 +129,28 @@ func (w WriterType) Post(req []byte, headers ...map[string]string) error { type WritersType struct { globalOpt config.WriterGlobalOpt backends map[string]WriterType - queues map[int]*SafeListLimited + queues map[string]map[int]*SafeListLimited } func (ws *WritersType) Put(name string, writer WriterType) { ws.backends[name] = writer } -func (ws *WritersType) PushSample(ident string, v interface{}) { +func (ws *WritersType) PushSample(ident string, v interface{}, clusters ...string) { hashkey := crc32.ChecksumIEEE([]byte(ident)) % uint32(ws.globalOpt.QueueCount) - c, ok := ws.queues[int(hashkey)] + cluster := config.C.ClusterName + if len(clusters) > 0 { + cluster = clusters[0] + } + + if _, ok := ws.queues[cluster]; !ok { + // 待写入的集群不存在 + logger.Warningf("Write cluster:%s not found, v:%+v", cluster, v) + return + } + + c, ok := ws.queues[cluster][int(hashkey)] if ok { succ := c.PushFront(v) if !succ { @@ -149,7 +159,7 @@ func (ws *WritersType) PushSample(ident string, v interface{}) { } } -func (ws *WritersType) StartConsumer(index int, ch *SafeListLimited) { +func (ws *WritersType) StartConsumer(index int, ch *SafeListLimited, clusterName string) { for { series := ch.PopBack(ws.globalOpt.QueuePopSize) if len(series) == 0 { @@ -158,7 +168,10 @@ func (ws *WritersType) StartConsumer(index int, ch *SafeListLimited) { } for key := range ws.backends { - go ws.backends[key].Write(index, series) + if ws.backends[key].Opts.Name != clusterName { + continue + } + go ws.backends[key].Write(clusterName, index, series) } } } @@ -173,11 +186,15 @@ var Writers = NewWriters() func Init(opts []config.WriterOptions, globalOpt config.WriterGlobalOpt) error { Writers.globalOpt = globalOpt - Writers.queues = make(map[int]*SafeListLimited) - - for i := 0; i < globalOpt.QueueCount; i++ { - Writers.queues[i] = NewSafeListLimited(Writers.globalOpt.QueueMaxSize) - go Writers.StartConsumer(i, Writers.queues[i]) + Writers.queues = make(map[string]map[int]*SafeListLimited) + for _, opt := range opts { + if _, ok := Writers.queues[opt.Name]; !ok { + Writers.queues[opt.Name] = make(map[int]*SafeListLimited) + for i := 0; i < globalOpt.QueueCount; i++ { + Writers.queues[opt.Name][i] = NewSafeListLimited(Writers.globalOpt.QueueMaxSize) + go Writers.StartConsumer(i, Writers.queues[opt.Name][i], opt.Name) + } + } } go reportChanSize() @@ -218,16 +235,18 @@ func Init(opts []config.WriterOptions, globalOpt config.WriterGlobalOpt) error { } func reportChanSize() { - clusterName := config.ReaderClient.GetClusterName() + clusterName := config.C.ClusterName if clusterName == "" { return } for { time.Sleep(time.Second * 3) - for i, c := range Writers.queues { - size := c.Len() - promstat.GaugeSampleQueueSize.WithLabelValues(clusterName, fmt.Sprint(i)).Set(float64(size)) + for cluster, m := range Writers.queues { + for i, c := range m { + size := c.Len() + promstat.GaugeSampleQueueSize.WithLabelValues(cluster, fmt.Sprint(i)).Set(float64(size)) + } } } } diff --git a/src/webapi/router/router.go b/src/webapi/router/router.go index 2b844c48..29b7913e 100644 --- a/src/webapi/router/router.go +++ b/src/webapi/router/router.go @@ -301,6 +301,8 @@ func configRoute(r *gin.Engine, version string) { pages.GET("/servers", auth(), admin(), serversGet) pages.PUT("/server/:id", auth(), admin(), serverBindCluster) + pages.POST("/servers", auth(), admin(), serverAddCluster) + pages.DELETE("/servers", auth(), admin(), serverDelCluster) } service := r.Group("/v1/n9e") diff --git a/src/webapi/router/router_server.go b/src/webapi/router/router_server.go index ff1b2898..c900c20b 100644 --- a/src/webapi/router/router_server.go +++ b/src/webapi/router/router_server.go @@ -13,7 +13,8 @@ func serversGet(c *gin.Context) { } type serverBindClusterForm struct { - Cluster string `json:"cluster"` + Cluster string `json:"cluster"` + Instance string `json:"instance"` } // 用户为某个 n9e-server 分配一个集群,也可以清空,设置cluster为空字符串即可 @@ -33,3 +34,18 @@ func serverBindCluster(c *gin.Context) { ginx.NewRender(c).Message(ae.UpdateCluster(f.Cluster)) } + +func serverAddCluster(c *gin.Context) { + var f serverBindClusterForm + ginx.BindJSON(c, &f) + + ginx.NewRender(c).Message(models.AlertingEngineAdd(f.Instance, f.Cluster)) +} + +func serverDelCluster(c *gin.Context) { + var f idsForm + ginx.BindJSON(c, &f) + f.Verify() + + ginx.NewRender(c).Message(models.AlertingEngineDel(f.Ids)) +}