Browse Source

n9e server support multi cluster alert (#1318)

* support multi

* refactor

* code refactor

* refactor

* code refactor

* fix running multi-cluster rules

* code refactor

* add alerting_engine api

* add alerting_engine api

* update sql

* refactor recording push

* refactor

* refactor

* delete useless cluster

* split to fields

* change stats

* change stats
tags/v5.14.3
Yening Qin GitHub 3 years ago
parent
commit
b0c05368f7
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
35 changed files with 570 additions and 271 deletions
  1. +2
    -3
      docker/initsql/a-n9e.sql
  2. +0
    -1
      docker/initsql_for_postgres/a-n9e-for-Postgres.sql
  3. +13
    -0
      etc/server.conf
  4. +2
    -2
      src/models/alert_cur_event.go
  5. +57
    -9
      src/models/alerting_engine.go
  6. +20
    -7
      src/server/config/config.go
  7. +29
    -21
      src/server/config/prom_client.go
  8. +7
    -0
      src/server/config/prom_option.go
  9. +62
    -36
      src/server/config/reader.go
  10. +2
    -5
      src/server/engine/engine.go
  11. +17
    -0
      src/server/engine/mute.go
  12. +134
    -61
      src/server/engine/worker.go
  13. +4
    -9
      src/server/idents/idents.go
  14. +12
    -6
      src/server/memsto/alert_mute_cache.go
  15. +11
    -5
      src/server/memsto/alert_rule_cache.go
  16. +11
    -5
      src/server/memsto/alert_subsribe_cache.go
  17. +1
    -1
      src/server/memsto/busi_group_cache.go
  18. +9
    -3
      src/server/memsto/recording_rule_cache.go
  19. +1
    -1
      src/server/memsto/target_cache.go
  20. +1
    -1
      src/server/memsto/user_cache.go
  21. +1
    -2
      src/server/memsto/user_group_cache.go
  22. +26
    -30
      src/server/naming/hashring.go
  23. +52
    -20
      src/server/naming/heartbeat.go
  24. +2
    -2
      src/server/naming/leader.go
  25. +1
    -1
      src/server/router/router.go
  26. +1
    -1
      src/server/router/router_datadog.go
  27. +4
    -8
      src/server/router/router_event.go
  28. +25
    -5
      src/server/router/router_memsto.go
  29. +1
    -1
      src/server/router/router_openfalcon.go
  30. +1
    -1
      src/server/router/router_opentsdb.go
  31. +3
    -3
      src/server/router/router_prom.go
  32. +2
    -2
      src/server/stat/stat.go
  33. +37
    -18
      src/server/writer/writer.go
  34. +2
    -0
      src/webapi/router/router.go
  35. +17
    -1
      src/webapi/router/router_server.go

+ 2
- 3
docker/initsql/a-n9e.sql View File

@@ -525,6 +525,5 @@ CREATE TABLE `alerting_engines`
`instance` varchar(128) not null default '' comment 'instance identification, e.g. 10.9.0.9:9090',
`cluster` varchar(128) not null default '' comment 'target reader cluster',
`clock` bigint not null,
PRIMARY KEY (`id`),
UNIQUE KEY (`instance`)
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4;
PRIMARY KEY (`id`)
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4;

+ 0
- 1
docker/initsql_for_postgres/a-n9e-for-Postgres.sql View File

@@ -610,6 +610,5 @@ CREATE TABLE alerting_engines
clock bigint not null
) ;
ALTER TABLE alerting_engines ADD CONSTRAINT alerting_engines_pk PRIMARY KEY (id);
ALTER TABLE alerting_engines ADD CONSTRAINT alerting_engines_un UNIQUE (instance);
COMMENT ON COLUMN alerting_engines.instance IS 'instance identification, e.g. 10.9.0.9:9090';
COMMENT ON COLUMN alerting_engines.cluster IS 'target reader cluster';

+ 13
- 0
etc/server.conf View File

@@ -165,6 +165,19 @@ Timeout = 30000
DialTimeout = 3000
MaxIdleConnsPerHost = 100

# [[Readers]]
# ClusterName = "Default"
# prometheus base url
# Url = "http://127.0.0.1:9090"
# Basic auth username
# BasicAuthUser = ""
# Basic auth password
# BasicAuthPass = ""
# timeout settings, unit: ms
# Timeout = 30000
# DialTimeout = 3000
# MaxIdleConnsPerHost = 100

[WriterOpt]
# queue channel count
QueueCount = 1000


+ 2
- 2
src/models/alert_cur_event.go View File

@@ -420,9 +420,9 @@ func AlertCurEventGetByIds(ids []int64) ([]*AlertCurEvent, error) {
return lst, err
}

func AlertCurEventGetByRule(ruleId int64) ([]*AlertCurEvent, error) {
func AlertCurEventGetByRuleIdAndCluster(ruleId int64, cluster string) ([]*AlertCurEvent, error) {
var lst []*AlertCurEvent
err := DB().Where("rule_id=?", ruleId).Find(&lst).Error
err := DB().Where("rule_id=? and cluster=?", ruleId, cluster).Find(&lst).Error
return lst, err
}



+ 57
- 9
src/models/alerting_engine.go View File

@@ -1,6 +1,9 @@
package models

import "time"
import (
"fmt"
"time"
)

type AlertingEngines struct {
Id int64 `json:"id" gorm:"primaryKey"`
@@ -15,23 +18,62 @@ func (e *AlertingEngines) TableName() string {

// UpdateCluster binds this engine record to cluster c. On the web page, users
// assign to each n9e-server instance the target cluster it is associated with.
//
// It returns an error when a different record (id other than e.Id) already
// holds the same instance/cluster pair, or when the count query or the update
// itself fails. On success e.Cluster is set to c and only the "cluster" column
// is written.
func (e *AlertingEngines) UpdateCluster(c string) error {
	query := DB().Model(&AlertingEngines{}).Where("id<>? and instance=? and cluster=?", e.Id, e.Instance, c)

	dup, err := Count(query)
	if err != nil {
		return err
	}
	if dup > 0 {
		return fmt.Errorf("instance %s and cluster %s already exists", e.Instance, c)
	}

	e.Cluster = c
	tx := DB().Model(e).Select("cluster").Updates(e)
	return tx.Error
}

func AlertingEngineAdd(instance, cluster string) error {
count, err := Count(DB().Model(&AlertingEngines{}).Where("instance=? and cluster=?", instance, cluster))
if err != nil {
return err
}

if count > 0 {
return fmt.Errorf("instance %s and cluster %s already exists", instance, cluster)
}

err = DB().Create(&AlertingEngines{
Instance: instance,
Cluster: cluster,
Clock: time.Now().Unix(),
}).Error

return err
}

// AlertingEngineDel deletes the alerting-engine records whose id is in ids.
// An empty ids slice is a no-op and returns nil.
func AlertingEngineDel(ids []int64) error {
	if len(ids) == 0 {
		return nil
	}

	tx := DB().Where("id in ?", ids)
	return tx.Delete(&AlertingEngines{}).Error
}

// AlertingEngineGetCluster 根据实例名获取对应的集群名字
func AlertingEngineGetCluster(instance string) (string, error) {
func AlertingEngineGetClusters(instance string) ([]string, error) {
var objs []AlertingEngines
err := DB().Where("instance=?", instance).Find(&objs).Error
if err != nil {
return "", err
return []string{}, err
}

if len(objs) == 0 {
return "", nil
return []string{}, nil
}
var clusters []string
for i := 0; i < len(objs); i++ {
clusters = append(clusters, objs[i].Cluster)
}

return objs[0].Cluster, nil
return clusters, nil
}

// AlertingEngineGets 拉取列表数据,用户要在页面上看到所有 n9e-server 实例列表,然后为其分配 cluster
@@ -72,9 +114,9 @@ func AlertingEngineGetsInstances(where string, args ...interface{}) ([]string, e
return arr, err
}

func AlertingEngineHeartbeat(instance, cluster string) error {
func AlertingEngineHeartbeatWithCluster(instance, cluster string) error {
var total int64
err := DB().Model(new(AlertingEngines)).Where("instance=?", instance).Count(&total).Error
err := DB().Model(new(AlertingEngines)).Where("instance=? and cluster=?", instance, cluster).Count(&total).Error
if err != nil {
return err
}
@@ -88,9 +130,15 @@ func AlertingEngineHeartbeat(instance, cluster string) error {
}).Error
} else {
// updates
fields := map[string]interface{}{"clock": time.Now().Unix(), "cluster": cluster}
err = DB().Model(new(AlertingEngines)).Where("instance=?", instance).Updates(fields).Error
fields := map[string]interface{}{"clock": time.Now().Unix()}
err = DB().Model(new(AlertingEngines)).Where("instance=? and cluster=?", instance, cluster).Updates(fields).Error
}

return err
}

// AlertingEngineHeartbeat sets the clock column to the current Unix time on
// every alerting-engine row whose instance matches, regardless of cluster.
func AlertingEngineHeartbeat(instance string) error {
	now := time.Now().Unix()
	return DB().Model(new(AlertingEngines)).
		Where("instance=?", instance).
		Updates(map[string]interface{}{"clock": now}).
		Error
}

+ 20
- 7
src/server/config/config.go View File

@@ -64,12 +64,19 @@ func DealConfigCrypto(key string) {
}
C.Ibex.BasicAuthPass = decryptIbexPwd

decryptReaderPwd, err := secu.DealWithDecrypt(C.Reader.BasicAuthPass, key)
if err != nil {
fmt.Println("failed to decrypt the reader password", err)
os.Exit(1)
if len(C.Readers) == 0 {
C.Reader.ClusterName = C.ClusterName
C.Readers = append(C.Readers, C.Reader)
}

for index, v := range C.Readers {
decryptReaderPwd, err := secu.DealWithDecrypt(v.BasicAuthPass, key)
if err != nil {
fmt.Printf("failed to decrypt the reader password: %s , error: %s", v.BasicAuthPass, err.Error())
os.Exit(1)
}
C.Readers[index].BasicAuthPass = decryptReaderPwd
}
C.Reader.BasicAuthPass = decryptReaderPwd

for index, v := range C.Writers {
decryptWriterPwd, err := secu.DealWithDecrypt(v.BasicAuthPass, key)
@@ -217,7 +224,11 @@ func MustLoad(key string, fpaths ...string) {
C.WriterOpt.ShardingKey = "ident"
}

for _, write := range C.Writers {
for i, write := range C.Writers {
if C.Writers[i].Name == "" {
C.Writers[i].Name = C.ClusterName
}

for _, relabel := range write.WriteRelabels {
regex, ok := relabel.Regex.(string)
if !ok {
@@ -251,7 +262,7 @@ func MustLoad(key string, fpaths ...string) {

type Config struct {
RunMode string
ClusterName string
ClusterName string // 监控对象上报时,指定的集群名称
BusiGroupLabelKey string
EngineDelay int64
DisableUsageReport bool
@@ -269,10 +280,12 @@ type Config struct {
WriterOpt WriterGlobalOpt
Writers []WriterOptions
Reader PromOption
Readers []PromOption
Ibex Ibex
}

type WriterOptions struct {
Name string
Url string
BasicAuthUser string
BasicAuthPass string


+ 29
- 21
src/server/config/prom_client.go View File

@@ -6,40 +6,38 @@ import (
"github.com/didi/nightingale/v5/src/pkg/prom"
)

type PromClient struct {
prom.API
ClusterName string
type PromClientMap struct {
sync.RWMutex
Clients map[string]prom.API
}

var ReaderClient *PromClient = &PromClient{}
var ReaderClients *PromClientMap = &PromClientMap{Clients: make(map[string]prom.API)}

func (pc *PromClient) Set(clusterName string, c prom.API) {
func (pc *PromClientMap) Set(clusterName string, c prom.API) {
pc.Lock()
defer pc.Unlock()
pc.ClusterName = clusterName
pc.API = c
pc.Clients[clusterName] = c
}

func (pc *PromClient) Get() (string, prom.API) {
func (pc *PromClientMap) GetClusterNames() []string {
pc.RLock()
defer pc.RUnlock()
return pc.ClusterName, pc.API
}
var clusterNames []string
for k := range pc.Clients {
clusterNames = append(clusterNames, k)
}

func (pc *PromClient) GetClusterName() string {
pc.RLock()
defer pc.RUnlock()
return pc.ClusterName
return clusterNames
}

func (pc *PromClient) GetCli() prom.API {
func (pc *PromClientMap) GetCli(cluster string) prom.API {
pc.RLock()
defer pc.RUnlock()
return pc.API
c := pc.Clients[cluster]
return c
}

func (pc *PromClient) IsNil() bool {
func (pc *PromClientMap) IsNil(cluster string) bool {
if pc == nil {
return true
}
@@ -47,13 +45,23 @@ func (pc *PromClient) IsNil() bool {
pc.RLock()
defer pc.RUnlock()

return pc.API == nil
c, exists := pc.Clients[cluster]
if !exists {
return true
}

return c == nil
}

func (pc *PromClient) Reset() {
func (pc *PromClientMap) Reset() {
pc.Lock()
defer pc.Unlock()

pc.ClusterName = ""
pc.API = nil
pc.Clients = make(map[string]prom.API)
}

// Del removes the client registered for cluster, if any, while holding the
// write lock.
func (pc *PromClientMap) Del(cluster string) {
	pc.Lock()
	delete(pc.Clients, cluster)
	pc.Unlock()
}

+ 7
- 0
src/server/config/prom_option.go View File

@@ -3,6 +3,7 @@ package config
import "sync"

type PromOption struct {
ClusterName string
Url string
BasicAuthUser string
BasicAuthPass string
@@ -70,6 +71,12 @@ func (pos *PromOptionsStruct) Sets(clusterName string, po PromOption) {
pos.Unlock()
}

// Del removes the stored option for clusterName, if any, while holding the
// write lock.
func (pos *PromOptionsStruct) Del(clusterName string) {
	pos.Lock()
	defer pos.Unlock()

	delete(pos.Data, clusterName)
}

func (pos *PromOptionsStruct) Get(clusterName string) (PromOption, bool) {
pos.RLock()
defer pos.RUnlock()


+ 62
- 36
src/server/config/reader.go View File

@@ -17,7 +17,19 @@ import (
func InitReader() error {
rf := strings.ToLower(strings.TrimSpace(C.ReaderFrom))
if rf == "" || rf == "config" {
return setClientFromPromOption(C.ClusterName, C.Reader)
if len(C.Readers) == 0 {
C.Reader.ClusterName = C.ClusterName
C.Readers = append(C.Readers, C.Reader)
}

for _, reader := range C.Readers {
err := setClientFromPromOption(reader.ClusterName, reader)
if err != nil {
logger.Errorf("failed to setClientFromPromOption: %v", err)
continue
}
}
return nil
}

if rf == "database" {
@@ -38,57 +50,71 @@ func initFromDatabase() error {
}

func loadFromDatabase() {
cluster, err := models.AlertingEngineGetCluster(C.Heartbeat.Endpoint)
clusters, err := models.AlertingEngineGetClusters(C.Heartbeat.Endpoint)
if err != nil {
logger.Errorf("failed to get current cluster, error: %v", err)
return
}

if cluster == "" {
ReaderClient.Reset()
if len(clusters) == 0 {
ReaderClients.Reset()
logger.Warning("no datasource binded to me")
return
}

ckey := "prom." + cluster + ".option"
cval, err := models.ConfigsGet(ckey)
if err != nil {
logger.Errorf("failed to get ckey: %s, error: %v", ckey, err)
return
}
newCluster := make(map[string]struct{})
for _, cluster := range clusters {
newCluster[cluster] = struct{}{}
ckey := "prom." + cluster + ".option"
cval, err := models.ConfigsGet(ckey)
if err != nil {
logger.Errorf("failed to get ckey: %s, error: %v", ckey, err)
continue
}

if cval == "" {
ReaderClient.Reset()
return
}
if cval == "" {
logger.Warningf("ckey: %s is empty", ckey)
continue
}

var po PromOption
err = json.Unmarshal([]byte(cval), &po)
if err != nil {
logger.Errorf("failed to unmarshal PromOption: %s", err)
return
}
var po PromOption
err = json.Unmarshal([]byte(cval), &po)
if err != nil {
logger.Errorf("failed to unmarshal PromOption: %s", err)
continue
}

if ReaderClients.IsNil(cluster) {
// first time
if err = setClientFromPromOption(cluster, po); err != nil {
logger.Errorf("failed to setClientFromPromOption: %v", err)
continue
}

if ReaderClient.IsNil() {
// first time
if err = setClientFromPromOption(cluster, po); err != nil {
logger.Errorf("failed to setClientFromPromOption: %v", err)
return
logger.Info("setClientFromPromOption success: ", cluster)
PromOptions.Sets(cluster, po)
continue
}

PromOptions.Sets(cluster, po)
return
}
localPo, has := PromOptions.Get(cluster)
if !has || !localPo.Equal(po) {
if err = setClientFromPromOption(cluster, po); err != nil {
logger.Errorf("failed to setClientFromPromOption: %v", err)
continue
}

localPo, has := PromOptions.Get(cluster)
if !has || !localPo.Equal(po) {
if err = setClientFromPromOption(cluster, po); err != nil {
logger.Errorf("failed to setClientFromPromOption: %v", err)
return
PromOptions.Sets(cluster, po)
}
}

PromOptions.Sets(cluster, po)
return
// delete useless cluster
oldClusters := ReaderClients.GetClusterNames()
for _, oldCluster := range oldClusters {
if _, has := newCluster[oldCluster]; !has {
ReaderClients.Del(oldCluster)
PromOptions.Del(oldCluster)
logger.Info("delete cluster: ", oldCluster)
}
}
}

@@ -121,7 +147,7 @@ func setClientFromPromOption(clusterName string, po PromOption) error {
return fmt.Errorf("failed to newClientFromPromOption: %v", err)
}

ReaderClient.Set(clusterName, prom.NewAPI(cli, prom.ClientOptions{
ReaderClients.Set(clusterName, prom.NewAPI(cli, prom.ClientOptions{
BasicAuthUser: po.BasicAuthUser,
BasicAuthPass: po.BasicAuthPass,
Headers: po.Headers,


+ 2
- 5
src/server/engine/engine.go View File

@@ -53,10 +53,7 @@ func Reload() {
func reportQueueSize() {
for {
time.Sleep(time.Second)
clusterName := config.ReaderClient.GetClusterName()
if clusterName == "" {
continue
}
promstat.GaugeAlertQueueSize.WithLabelValues(clusterName).Set(float64(EventQueue.Len()))

promstat.GaugeAlertQueueSize.Set(float64(EventQueue.Len()))
}
}

+ 17
- 0
src/server/engine/mute.go View File

@@ -1,6 +1,8 @@
package engine

import (
"strings"

"github.com/didi/nightingale/v5/src/models"
"github.com/didi/nightingale/v5/src/server/memsto"
)
@@ -31,6 +33,21 @@ func matchMute(event *models.AlertCurEvent, mute *models.AlertMute, clock ...int
ts = clock[0]
}

// 如果不是全局的,判断 cluster
if mute.Cluster != models.ClusterAll {
// mute.Cluster is a string that may list several clusters separated by whitespace, e.g. "cluster1 cluster2"
clusters := strings.Fields(mute.Cluster)
cm := make(map[string]struct{}, len(clusters))
for i := 0; i < len(clusters); i++ {
cm[clusters[i]] = struct{}{}
}

// 判断event.Cluster是否包含在cm中
if _, has := cm[event.Cluster]; !has {
return false
}
}

if ts < mute.Btime || ts > mute.Etime {
return false
}


+ 134
- 61
src/server/engine/worker.go View File

@@ -39,22 +39,48 @@ func loopFilterRules(ctx context.Context) {
}
}

// RuleSimpleInfo pairs a rule id with one cluster. A rule may take effect in
// several clusters, so it is split into one entry per cluster; this struct
// records the id-to-cluster association.
type RuleSimpleInfo struct {
Id int64
Cluster string
}

func filterRules() {
ids := memsto.AlertRuleCache.GetRuleIds()
logger.Debugf("AlertRuleCache.GetRuleIds success,ids.len: %d", len(ids))
logger.Debugf("AlertRuleCache.GetRuleIds success, ids.len: %d", len(ids))

count := len(ids)
mines := make([]int64, 0, count)
mines := make([]*RuleSimpleInfo, 0, count)

for i := 0; i < count; i++ {
node, err := naming.HashRing.GetNode(fmt.Sprint(ids[i]))
if err != nil {
logger.Warning("failed to get node from hashring:", err)
rule := memsto.AlertRuleCache.Get(ids[i])
if rule == nil {
logger.Debugf("AlertRuleCache.Get(%d) failed", ids[i])
continue
}

if node == config.C.Heartbeat.Endpoint {
mines = append(mines, ids[i])
var clusters []string
if rule.Cluster == models.ClusterAll {
clusters = config.ReaderClients.GetClusterNames()
} else {
clusters = strings.Fields(rule.Cluster)
}

for _, cluster := range clusters {
if config.ReaderClients.IsNil(cluster) {
// 没有这个集群的配置,跳过
continue
}

node, err := naming.ClusterHashRing.GetNode(cluster, fmt.Sprint(ids[i]))
if err != nil {
logger.Warningf("rid:%d cluster:%s failed to get node from hashring:%v", ids[i], cluster, err)
continue
}

if node == config.C.Heartbeat.Endpoint {
mines = append(mines, &RuleSimpleInfo{Id: ids[i], Cluster: cluster})
}
}
}

@@ -63,6 +89,7 @@ func filterRules() {
}

type RuleEval struct {
cluster string
rule *models.AlertRule
fires *AlertCurEventMap
pendings *AlertCurEventMap
@@ -166,12 +193,12 @@ func (r *RuleEval) Work() {
return
}

if config.ReaderClient.IsNil() {
if config.ReaderClients.IsNil(r.cluster) {
logger.Error("reader client is nil")
return
}

clusterName, readerClient := config.ReaderClient.Get()
readerClient := config.ReaderClients.GetCli(r.cluster)

var value model.Value
var err error
@@ -192,7 +219,7 @@ func (r *RuleEval) Work() {
logger.Debugf("rule_eval:%d promql:%s, value:%v", r.RuleID(), promql, value)
}

r.Judge(clusterName, conv.ConvertVectors(value))
r.Judge(r.cluster, conv.ConvertVectors(value))
}

type WorkersType struct {
@@ -202,22 +229,23 @@ type WorkersType struct {

var Workers = &WorkersType{rules: make(map[string]*RuleEval), recordRules: make(map[string]RecordingRuleEval)}

func (ws *WorkersType) Build(rids []int64) {
rules := make(map[string]*models.AlertRule)
func (ws *WorkersType) Build(ris []*RuleSimpleInfo) {
rules := make(map[string]*RuleSimpleInfo)

for i := 0; i < len(rids); i++ {
rule := memsto.AlertRuleCache.Get(rids[i])
for i := 0; i < len(ris); i++ {
rule := memsto.AlertRuleCache.Get(ris[i].Id)
if rule == nil {
continue
}

hash := str.MD5(fmt.Sprintf("%d_%d_%s",
hash := str.MD5(fmt.Sprintf("%d_%d_%s_%s",
rule.Id,
rule.PromEvalInterval,
rule.PromQl,
ris[i].Cluster,
))

rules[hash] = rule
rules[hash] = ris[i]
}

// stop old
@@ -235,7 +263,7 @@ func (ws *WorkersType) Build(rids []int64) {
continue
}

elst, err := models.AlertCurEventGetByRule(rules[hash].Id)
elst, err := models.AlertCurEventGetByRuleIdAndCluster(rules[hash].Id, rules[hash].Cluster)
if err != nil {
logger.Errorf("worker_build: AlertCurEventGetByRule failed: %v", err)
continue
@@ -249,10 +277,11 @@ func (ws *WorkersType) Build(rids []int64) {
fires := NewAlertCurEventMap()
fires.SetAll(firemap)
re := &RuleEval{
rule: rules[hash],
rule: memsto.AlertRuleCache.Get(rules[hash].Id),
quit: make(chan struct{}),
fires: fires,
pendings: NewAlertCurEventMap(),
cluster: rules[hash].Cluster,
}

go re.Start()
@@ -260,27 +289,23 @@ func (ws *WorkersType) Build(rids []int64) {
}
}

func (ws *WorkersType) BuildRe(rids []int64) {
rules := make(map[string]*models.RecordingRule)
func (ws *WorkersType) BuildRe(ris []*RuleSimpleInfo) {
rules := make(map[string]*RuleSimpleInfo)

for i := 0; i < len(rids); i++ {
rule := memsto.RecordingRuleCache.Get(rids[i])
for i := 0; i < len(ris); i++ {
rule := memsto.RecordingRuleCache.Get(ris[i].Id)
if rule == nil {
continue
}

if rule.Disabled == 1 {
continue
}

hash := str.MD5(fmt.Sprintf("%d_%d_%s_%s",
rule.Id,
rule.PromEvalInterval,
rule.PromQl,
rule.AppendTags,
ris[i].Cluster,
))

rules[hash] = rule
rules[hash] = ris[i]
}

// stop old
@@ -298,8 +323,9 @@ func (ws *WorkersType) BuildRe(rids []int64) {
continue
}
re := RecordingRuleEval{
rule: rules[hash],
quit: make(chan struct{}),
rule: memsto.RecordingRuleCache.Get(rules[hash].Id),
quit: make(chan struct{}),
cluster: rules[hash].Cluster,
}

go re.Start()
@@ -334,7 +360,7 @@ func (r *RuleEval) MakeNewEvent(from string, now int64, clusterName string, vect
alertingKeys := make(map[string]struct{})
for i := 0; i < count; i++ {
// compute hash
hash := str.MD5(fmt.Sprintf("%d_%s", r.rule.Id, vectors[i].Key))
hash := str.MD5(fmt.Sprintf("%d_%s_%s", r.rule.Id, vectors[i].Key, r.cluster))
alertingKeys[hash] = struct{}{}

// rule disabled in this time span?
@@ -598,17 +624,37 @@ func filterRecordingRules() {
ids := memsto.RecordingRuleCache.GetRuleIds()

count := len(ids)
mines := make([]int64, 0, count)
mines := make([]*RuleSimpleInfo, 0, count)

for i := 0; i < count; i++ {
node, err := naming.HashRing.GetNode(fmt.Sprint(ids[i]))
if err != nil {
logger.Warning("failed to get node from hashring:", err)
rule := memsto.RecordingRuleCache.Get(ids[i])
if rule == nil {
logger.Debugf("rule %d not found", ids[i])
continue
}

if node == config.C.Heartbeat.Endpoint {
mines = append(mines, ids[i])
var clusters []string
if rule.Cluster == models.ClusterAll {
clusters = config.ReaderClients.GetClusterNames()
} else {
clusters = strings.Fields(rule.Cluster)
}

for _, cluster := range clusters {
if config.ReaderClients.IsNil(cluster) {
// 没有这个集群的配置,跳过
continue
}

node, err := naming.ClusterHashRing.GetNode(cluster, fmt.Sprint(ids[i]))
if err != nil {
logger.Warning("failed to get node from hashring:", err)
continue
}

if node == config.C.Heartbeat.Endpoint {
mines = append(mines, &RuleSimpleInfo{Id: ids[i], Cluster: cluster})
}
}
}

@@ -616,8 +662,9 @@ func filterRecordingRules() {
}

type RecordingRuleEval struct {
rule *models.RecordingRule
quit chan struct{}
cluster string
rule *models.RecordingRule
quit chan struct{}
}

func (r RecordingRuleEval) Stop() {
@@ -654,12 +701,12 @@ func (r RecordingRuleEval) Work() {
return
}

if config.ReaderClient.IsNil() {
if config.ReaderClients.IsNil(r.cluster) {
log.Println("reader client is nil")
return
}

value, warnings, err := config.ReaderClient.GetCli().Query(context.Background(), promql, time.Now())
value, warnings, err := config.ReaderClients.GetCli(r.cluster).Query(context.Background(), promql, time.Now())
if err != nil {
logger.Errorf("recording_rule_eval:%d promql:%s, error:%v", r.RuleID(), promql, err)
return
@@ -672,21 +719,21 @@ func (r RecordingRuleEval) Work() {
ts := conv.ConvertToTimeSeries(value, r.rule)
if len(ts) != 0 {
for _, v := range ts {
writer.Writers.PushSample(r.rule.Name, v)
writer.Writers.PushSample(r.rule.Name, v, r.cluster)
}
}
}

type RuleEvalForExternalType struct {
sync.RWMutex
rules map[int64]RuleEval
rules map[string]RuleEval // key: hash of ruleid_promevalinterval_promql_cluster
}

var RuleEvalForExternal = RuleEvalForExternalType{rules: make(map[int64]RuleEval)}
var RuleEvalForExternal = RuleEvalForExternalType{rules: make(map[string]RuleEval)}

func (re *RuleEvalForExternalType) Build() {
rids := memsto.AlertRuleCache.GetRuleIds()
rules := make(map[int64]*models.AlertRule)
rules := make(map[string]*RuleSimpleInfo)

for i := 0; i < len(rids); i++ {
rule := memsto.AlertRuleCache.Get(rids[i])
@@ -694,16 +741,34 @@ func (re *RuleEvalForExternalType) Build() {
continue
}

re.Lock()
rules[rule.Id] = rule
re.Unlock()
var clusters []string
if rule.Cluster == models.ClusterAll {
clusters = config.ReaderClients.GetClusterNames()
} else {
clusters = strings.Fields(rule.Cluster)
}

for _, cluster := range clusters {
hash := str.MD5(fmt.Sprintf("%d_%d_%s_%s",
rule.Id,
rule.PromEvalInterval,
rule.PromQl,
cluster,
))
re.Lock()
rules[hash] = &RuleSimpleInfo{
Id: rule.Id,
Cluster: cluster,
}
re.Unlock()
}
}

// stop old
for rid := range re.rules {
if _, has := rules[rid]; !has {
for oldHash := range re.rules {
if _, has := rules[oldHash]; !has {
re.Lock()
delete(re.rules, rid)
delete(re.rules, oldHash)
re.Unlock()
}
}
@@ -711,13 +776,13 @@ func (re *RuleEvalForExternalType) Build() {
// start new
re.Lock()
defer re.Unlock()
for rid := range rules {
if _, has := re.rules[rid]; has {
for hash, ruleSimple := range rules {
if _, has := re.rules[hash]; has {
// already exists
continue
}

elst, err := models.AlertCurEventGetByRule(rules[rid].Id)
elst, err := models.AlertCurEventGetByRuleIdAndCluster(ruleSimple.Id, ruleSimple.Cluster)
if err != nil {
logger.Errorf("worker_build: AlertCurEventGetByRule failed: %v", err)
continue
@@ -731,27 +796,35 @@ func (re *RuleEvalForExternalType) Build() {
fires := NewAlertCurEventMap()
fires.SetAll(firemap)
newRe := RuleEval{
rule: rules[rid],
rule: memsto.AlertRuleCache.Get(ruleSimple.Id),
quit: make(chan struct{}),
fires: fires,
pendings: NewAlertCurEventMap(),
cluster: ruleSimple.Cluster,
}

re.rules[rid] = newRe
re.rules[hash] = newRe
}
}

func (re *RuleEvalForExternalType) Get(rid int64) (RuleEval, bool) {
func (re *RuleEvalForExternalType) Get(rid int64, cluster string) (RuleEval, bool) {
re.RLock()
defer re.RUnlock()
rule := memsto.AlertRuleCache.Get(rid)
if rule == nil {
return RuleEval{}, false
}

re.RLock()
defer re.RUnlock()
if ret, has := re.rules[rid]; has {
// already exists
hash := str.MD5(fmt.Sprintf("%d_%d_%s_%s",
rule.Id,
rule.PromEvalInterval,
rule.PromQl,
cluster,
))

if ret, has := re.rules[hash]; has {
return ret, has
}

return RuleEval{}, false
}

+ 4
- 9
src/server/idents/idents.go View File

@@ -41,7 +41,7 @@ func toRedis() {
return
}

if config.ReaderClient.IsNil() {
if config.ReaderClients.IsNil(config.C.ClusterName) {
return
}

@@ -53,7 +53,7 @@ func toRedis() {
Idents.Remove(key)
} else {
// use now as timestamp to redis
err := storage.Redis.HSet(context.Background(), redisKey(config.ReaderClient.GetClusterName()), key, now).Err()
err := storage.Redis.HSet(context.Background(), redisKey(config.C.ClusterName), key, now).Err()
if err != nil {
logger.Errorf("redis hset idents failed: %v", err)
}
@@ -96,7 +96,8 @@ func loopPushMetrics(ctx context.Context) {
}

func pushMetrics() {
isLeader, err := naming.IamLeader()
clusterName := config.C.ClusterName
isLeader, err := naming.IamLeader(clusterName)
if err != nil {
logger.Errorf("handle_idents: %v", err)
return
@@ -107,12 +108,6 @@ func pushMetrics() {
return
}

clusterName := config.ReaderClient.GetClusterName()
if clusterName == "" {
logger.Warning("cluster name is blank")
return
}

// get all the target heartbeat timestamp
ret, err := storage.Redis.HGetAll(context.Background(), redisKey(clusterName)).Result()
if err != nil {


+ 12
- 6
src/server/memsto/alert_mute_cache.go View File

@@ -99,14 +99,20 @@ func loopSyncAlertMutes() {
func syncAlertMutes() error {
start := time.Now()

clusterName := config.ReaderClient.GetClusterName()
if clusterName == "" {
AlertMuteCache.Reset()
logger.Warning("cluster name is blank")
clusterNames := config.ReaderClients.GetClusterNames()
if len(clusterNames) == 0 {
AlertRuleCache.Reset()
logger.Warning("cluster is blank")
return nil
}

stat, err := models.AlertMuteStatistics(clusterName)
var clusterName string
if len(clusterNames) == 1 {
// 兼容老版本监控数据上报
clusterName = clusterNames[0]
}

stat, err := models.AlertMuteStatistics("")
if err != nil {
return errors.WithMessage(err, "failed to exec AlertMuteStatistics")
}
@@ -118,7 +124,7 @@ func syncAlertMutes() error {
return nil
}

lst, err := models.AlertMuteGetsByCluster(clusterName)
lst, err := models.AlertMuteGetsByCluster("")
if err != nil {
return errors.WithMessage(err, "failed to exec AlertMuteGetsByCluster")
}


+ 11
- 5
src/server/memsto/alert_rule_cache.go View File

@@ -96,14 +96,20 @@ func loopSyncAlertRules() {
func syncAlertRules() error {
start := time.Now()

clusterName := config.ReaderClient.GetClusterName()
if clusterName == "" {
clusterNames := config.ReaderClients.GetClusterNames()
if len(clusterNames) == 0 {
AlertRuleCache.Reset()
logger.Warning("cluster name is blank")
logger.Warning("cluster is blank")
return nil
}

stat, err := models.AlertRuleStatistics(clusterName)
var clusterName string
if len(clusterNames) == 1 {
// 兼容老版本监控数据上报
clusterName = clusterNames[0]
}

stat, err := models.AlertRuleStatistics("")
if err != nil {
return errors.WithMessage(err, "failed to exec AlertRuleStatistics")
}
@@ -115,7 +121,7 @@ func syncAlertRules() error {
return nil
}

lst, err := models.AlertRuleGetsByCluster(clusterName)
lst, err := models.AlertRuleGetsByCluster("")
if err != nil {
return errors.WithMessage(err, "failed to exec AlertRuleGetsByCluster")
}


+ 11
- 5
src/server/memsto/alert_subsribe_cache.go View File

@@ -102,14 +102,20 @@ func loopSyncAlertSubscribes() {
func syncAlertSubscribes() error {
start := time.Now()

clusterName := config.ReaderClient.GetClusterName()
if clusterName == "" {
clusterNames := config.ReaderClients.GetClusterNames()
if len(clusterNames) == 0 {
AlertSubscribeCache.Reset()
logger.Warning("cluster name is blank")
logger.Warning("cluster is blank")
return nil
}

stat, err := models.AlertSubscribeStatistics(clusterName)
var clusterName string
if len(clusterNames) == 1 {
// 兼容老版本监控数据上报
clusterName = clusterNames[0]
}

stat, err := models.AlertSubscribeStatistics("")
if err != nil {
return errors.WithMessage(err, "failed to exec AlertSubscribeStatistics")
}
@@ -121,7 +127,7 @@ func syncAlertSubscribes() error {
return nil
}

lst, err := models.AlertSubscribeGetsByCluster(clusterName)
lst, err := models.AlertSubscribeGetsByCluster("")
if err != nil {
return errors.WithMessage(err, "failed to exec AlertSubscribeGetsByCluster")
}


+ 1
- 1
src/server/memsto/busi_group_cache.go View File

@@ -79,7 +79,7 @@ func syncBusiGroups() error {
return errors.WithMessage(err, "failed to exec BusiGroupStatistics")
}

clusterName := config.ReaderClient.GetClusterName()
clusterName := config.C.ClusterName

if !BusiGroupCache.StatChanged(stat.Total, stat.LastUpdated) {
if clusterName != "" {


+ 9
- 3
src/server/memsto/recording_rule_cache.go View File

@@ -95,13 +95,19 @@ func loopSyncRecordingRules() {
func syncRecordingRules() error {
start := time.Now()

clusterName := config.ReaderClient.GetClusterName()
if clusterName == "" {
clusterNames := config.ReaderClients.GetClusterNames()
if len(clusterNames) == 0 {
RecordingRuleCache.Reset()
logger.Warning("cluster name is blank")
logger.Warning("cluster is blank")
return nil
}

var clusterName string
// 只有一个集群,使用单集群模式,如果大于1个集群,则获取全部的规则
if len(clusterNames) == 1 {
clusterName = clusterNames[0]
}

stat, err := models.RecordingRuleStatistics(clusterName)
if err != nil {
return errors.WithMessage(err, "failed to exec RecordingRuleStatistics")


+ 1
- 1
src/server/memsto/target_cache.go View File

@@ -103,7 +103,7 @@ func loopSyncTargets() {
func syncTargets() error {
start := time.Now()

clusterName := config.ReaderClient.GetClusterName()
clusterName := config.C.ClusterName
if clusterName == "" {
TargetCache.Reset()
logger.Warning("cluster name is blank")


+ 1
- 1
src/server/memsto/user_cache.go View File

@@ -124,7 +124,7 @@ func syncUsers() error {
return errors.WithMessage(err, "failed to exec UserStatistics")
}

clusterName := config.ReaderClient.GetClusterName()
clusterName := config.C.ClusterName

if !UserCache.StatChanged(stat.Total, stat.LastUpdated) {
if clusterName != "" {


+ 1
- 2
src/server/memsto/user_group_cache.go View File

@@ -106,8 +106,7 @@ func syncUserGroups() error {
return errors.WithMessage(err, "failed to exec UserGroupStatistics")
}

clusterName := config.ReaderClient.GetClusterName()

clusterName := config.C.ClusterName
if !UserGroupCache.StatChanged(stat.Total, stat.LastUpdated) {
if clusterName != "" {
promstat.GaugeCronDuration.WithLabelValues(clusterName, "sync_user_groups").Set(0)


+ 26
- 30
src/server/naming/hashring.go View File

@@ -9,51 +9,47 @@ import (

const NodeReplicas = 500

type ConsistentHashRing struct {
type ClusterHashRingType struct {
sync.RWMutex
ring *consistent.Consistent
Rings map[string]*consistent.Consistent
}

// for alert_rule sharding
var HashRing = NewConsistentHashRing(int32(NodeReplicas), []string{})
var ClusterHashRing = ClusterHashRingType{Rings: make(map[string]*consistent.Consistent)}

func (chr *ConsistentHashRing) GetNode(pk string) (string, error) {
chr.RLock()
defer chr.RUnlock()

return chr.ring.Get(pk)
}

func (chr *ConsistentHashRing) Set(r *consistent.Consistent) {
chr.Lock()
defer chr.Unlock()
chr.ring = r
}

func (chr *ConsistentHashRing) GetRing() *consistent.Consistent {
chr.RLock()
defer chr.RUnlock()

return chr.ring
}

func NewConsistentHashRing(replicas int32, nodes []string) *ConsistentHashRing {
ret := &ConsistentHashRing{ring: consistent.New()}
ret.ring.NumberOfReplicas = int(replicas)
func NewConsistentHashRing(replicas int32, nodes []string) *consistent.Consistent {
ret := consistent.New()
ret.NumberOfReplicas = int(replicas)
for i := 0; i < len(nodes); i++ {
ret.ring.Add(nodes[i])
ret.Add(nodes[i])
}
return ret
}

func RebuildConsistentHashRing(nodes []string) {
func RebuildConsistentHashRing(cluster string, nodes []string) {
r := consistent.New()
r.NumberOfReplicas = NodeReplicas
for i := 0; i < len(nodes); i++ {
r.Add(nodes[i])
}

HashRing.Set(r)
ClusterHashRing.Set(cluster, r)
logger.Infof("hash ring %s rebuild %+v", cluster, r.Members())
}

// GetNode returns the node that owns key pk on the hash ring of the given
// cluster. When the cluster has no ring yet, an empty ring is created, stored
// under that cluster name, and then used to answer the lookup.
func (chr *ClusterHashRingType) GetNode(cluster, pk string) (string, error) {
	// Fast path: the ring already exists, so a shared read lock is enough.
	chr.RLock()
	if ring, ok := chr.Rings[cluster]; ok {
		defer chr.RUnlock()
		return ring.Get(pk)
	}
	chr.RUnlock()

	// Slow path: inserting into the map is a write and must hold the
	// exclusive lock. Doing it under RLock would let several goroutines
	// mutate the map at the same time.
	chr.Lock()
	defer chr.Unlock()

	if chr.Rings == nil {
		chr.Rings = make(map[string]*consistent.Consistent)
	}

	// Re-check: another goroutine may have created the ring between the
	// RUnlock above and acquiring the write lock.
	ring, ok := chr.Rings[cluster]
	if !ok {
		ring = NewConsistentHashRing(int32(NodeReplicas), []string{})
		chr.Rings[cluster] = ring
	}

	return ring.Get(pk)
}

logger.Infof("hash ring rebuild %+v", r.Members())
// Set stores r as the hash ring of the given cluster, replacing any ring
// previously registered under that name.
func (chr *ClusterHashRingType) Set(cluster string, r *consistent.Consistent) {
	// Assigning into the map is a write, so take the exclusive lock; a read
	// lock would not exclude concurrent readers or other writers.
	chr.Lock()
	defer chr.Unlock()

	// Guard against a zero-value ClusterHashRingType whose map was never made.
	if chr.Rings == nil {
		chr.Rings = make(map[string]*consistent.Consistent)
	}
	chr.Rings[cluster] = r
}

+ 52
- 20
src/server/naming/heartbeat.go View File

@@ -14,9 +14,10 @@ import (
)

// local servers
var localss string
var localss map[string]string

func Heartbeat(ctx context.Context) error {
localss = make(map[string]string)
if err := heartbeat(); err != nil {
fmt.Println("failed to heartbeat:", err)
return err
@@ -37,35 +38,66 @@ func loopHeartbeat() {
}

func heartbeat() error {
cluster := ""
var clusters []string
var err error
if config.C.ReaderFrom == "config" {
cluster = config.C.ClusterName
}
// The instance-to-cluster mapping is maintained in the config file.
for i := 0; i < len(config.C.Readers); i++ {
clusters = append(clusters, config.C.Readers[i].ClusterName)
err := models.AlertingEngineHeartbeatWithCluster(config.C.Heartbeat.Endpoint, config.C.Readers[i].ClusterName)
if err != nil {
logger.Warningf("heartbeat with cluster %s err:%v", config.C.Readers[i].ClusterName, err)
continue
}
}
} else {
// The instance-to-cluster mapping is maintained on the web page.
clusters, err = models.AlertingEngineGetClusters(config.C.Heartbeat.Endpoint)
if err != nil {
return err
}
if len(clusters) == 0 {
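// The instance was just deployed and no cluster has been configured on the page yet, so report heartbeats using the clusters from the config file for now.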
for i := 0; i < len(config.C.Readers); i++ {
err := models.AlertingEngineHeartbeatWithCluster(config.C.Heartbeat.Endpoint, config.C.Readers[i].ClusterName)
if err != nil {
logger.Warningf("heartbeat with cluster %s err:%v", config.C.Readers[i].ClusterName, err)
continue
}
}
}

err := models.AlertingEngineHeartbeat(config.C.Heartbeat.Endpoint, cluster)
if err != nil {
return err
err := models.AlertingEngineHeartbeat(config.C.Heartbeat.Endpoint)
if err != nil {
return err
}
}

servers, err := ActiveServers()
if err != nil {
return err
}
for i := 0; i < len(clusters); i++ {
servers, err := ActiveServers(clusters[i])
if err != nil {
logger.Warningf("hearbeat %s get active server err:", clusters[i], err)
continue
}

sort.Strings(servers)
newss := strings.Join(servers, " ")

oldss, exists := localss[clusters[i]]
if exists && oldss == newss {
continue
}

sort.Strings(servers)
newss := strings.Join(servers, " ")
if newss != localss {
RebuildConsistentHashRing(servers)
localss = newss
RebuildConsistentHashRing(clusters[i], servers)
localss[clusters[i]] = newss
}

return nil
}

func ActiveServers() ([]string, error) {
cluster, err := models.AlertingEngineGetCluster(config.C.Heartbeat.Endpoint)
if err != nil {
return nil, err
func ActiveServers(cluster string) ([]string, error) {
if cluster == "" {
return nil, fmt.Errorf("cluster is empty")
}

// A server that has sent a heartbeat within the last 30 seconds is considered alive.


+ 2
- 2
src/server/naming/leader.go View File

@@ -7,8 +7,8 @@ import (
"github.com/toolkits/pkg/logger"
)

func IamLeader() (bool, error) {
servers, err := ActiveServers()
func IamLeader(cluster string) (bool, error) {
servers, err := ActiveServers(cluster)
if err != nil {
logger.Errorf("failed to get active servers: %v", err)
return false, err


+ 1
- 1
src/server/router/router.go View File

@@ -69,7 +69,7 @@ func configRoute(r *gin.Engine, version string, reloadFunc func()) {
})

r.GET("/servers/active", func(c *gin.Context) {
lst, err := naming.ActiveServers()
lst, err := naming.ActiveServers(ginx.QueryStr(c, "cluster"))
ginx.NewRender(c).Data(lst, err)
})



+ 1
- 1
src/server/router/router_datadog.go View File

@@ -281,7 +281,7 @@ func datadogSeries(c *gin.Context) {
}

if succ > 0 {
cn := config.ReaderClient.GetClusterName()
cn := config.C.ClusterName
if cn != "" {
promstat.CounterSampleTotal.WithLabelValues(cn, "datadog").Add(float64(succ))
}


+ 4
- 8
src/server/router/router_event.go View File

@@ -7,7 +7,6 @@ import (

"github.com/didi/nightingale/v5/src/models"
"github.com/didi/nightingale/v5/src/server/common/conv"
"github.com/didi/nightingale/v5/src/server/config"
"github.com/didi/nightingale/v5/src/server/engine"
promstat "github.com/didi/nightingale/v5/src/server/stat"

@@ -67,10 +66,7 @@ func pushEventToQueue(c *gin.Context) {
event.NotifyChannels = strings.Join(event.NotifyChannelsJSON, " ")
event.NotifyGroups = strings.Join(event.NotifyGroupsJSON, " ")

cn := config.ReaderClient.GetClusterName()
if cn != "" {
promstat.CounterAlertsTotal.WithLabelValues(cn).Inc()
}
promstat.CounterAlertsTotal.WithLabelValues(event.Cluster).Inc()

engine.LogEvent(event, "http_push_queue")
if !engine.EventQueue.PushFront(event) {
@@ -91,7 +87,7 @@ type eventForm struct {
func judgeEvent(c *gin.Context) {
var form eventForm
ginx.BindJSON(c, &form)
re, exists := engine.RuleEvalForExternal.Get(form.RuleId)
re, exists := engine.RuleEvalForExternal.Get(form.RuleId, form.Cluster)
if !exists {
ginx.Bomb(200, "rule not exists")
}
@@ -104,7 +100,7 @@ func makeEvent(c *gin.Context) {
ginx.BindJSON(c, &events)
now := time.Now().Unix()
for i := 0; i < len(events); i++ {
re, exists := engine.RuleEvalForExternal.Get(events[i].RuleId)
re, exists := engine.RuleEvalForExternal.Get(events[i].RuleId, events[i].Cluster)
logger.Debugf("handle event:%+v exists:%v", events[i], exists)
if !exists {
ginx.Bomb(200, "rule not exists")
@@ -114,7 +110,7 @@ func makeEvent(c *gin.Context) {
go re.MakeNewEvent("http", now, events[i].Cluster, events[i].Vectors)
} else {
for _, vector := range events[i].Vectors {
hash := str.MD5(fmt.Sprintf("%d_%s", events[i].RuleId, vector.Key))
hash := str.MD5(fmt.Sprintf("%d_%s_%s", events[i].RuleId, vector.Key, events[i].Cluster))
now := vector.Timestamp
go re.RecoverEvent(hash, now, vector.Value)
}


+ 25
- 5
src/server/router/router_memsto.go View File

@@ -2,10 +2,14 @@ package router

import (
"net/http"
"strconv"
"strings"

"github.com/gin-gonic/gin"
"github.com/toolkits/pkg/ginx"

"github.com/didi/nightingale/v5/src/models"
"github.com/didi/nightingale/v5/src/server/config"
"github.com/didi/nightingale/v5/src/server/idents"
"github.com/didi/nightingale/v5/src/server/memsto"
"github.com/didi/nightingale/v5/src/server/naming"
@@ -48,12 +52,28 @@ func userGroupGet(c *gin.Context) {
}

func alertRuleLocationGet(c *gin.Context) {
id := ginx.QueryStr(c, "id")
node, err := naming.HashRing.GetNode(id)
if err != nil {
http.Error(c.Writer, err.Error(), http.StatusInternalServerError)
id := ginx.QueryInt64(c, "id")
rule := memsto.AlertRuleCache.Get(id)
if rule == nil {
http.Error(c.Writer, "rule not found", http.StatusNotFound)
return
}
var clusters []string
if rule.Cluster == models.ClusterAll {
clusters = config.ReaderClients.GetClusterNames()
} else {
clusters = strings.Fields(rule.Cluster)
}

var arr []gin.H
for _, cluster := range clusters {
node, err := naming.ClusterHashRing.GetNode(cluster, strconv.FormatInt(id, 10))
if err != nil {
http.Error(c.Writer, err.Error(), http.StatusInternalServerError)
return
}
arr = append(arr, gin.H{"id": id, "cluster": cluster, "node": node})
}

c.JSON(200, gin.H{"id": id, "node": node})
c.JSON(200, gin.H{"list": arr})
}

+ 1
- 1
src/server/router/router_openfalcon.go View File

@@ -228,7 +228,7 @@ func falconPush(c *gin.Context) {
}

if succ > 0 {
cn := config.ReaderClient.GetClusterName()
cn := config.C.ClusterName
if cn != "" {
promstat.CounterSampleTotal.WithLabelValues(cn, "openfalcon").Add(float64(succ))
}


+ 1
- 1
src/server/router/router_opentsdb.go View File

@@ -222,7 +222,7 @@ func handleOpenTSDB(c *gin.Context) {
}

if succ > 0 {
cn := config.ReaderClient.GetClusterName()
cn := config.C.ClusterName
if cn != "" {
promstat.CounterSampleTotal.WithLabelValues(cn, "opentsdb").Add(float64(succ))
}


+ 3
- 3
src/server/router/router_prom.go View File

@@ -37,12 +37,12 @@ func queryPromql(c *gin.Context) {
var f promqlForm
ginx.BindJSON(c, &f)

if config.ReaderClient.IsNil() {
if config.ReaderClients.IsNil(config.C.ClusterName) {
c.String(500, "reader client is nil")
return
}

value, warnings, err := config.ReaderClient.GetCli().Query(c.Request.Context(), f.PromQL, time.Now())
value, warnings, err := config.ReaderClients.GetCli(config.C.ClusterName).Query(c.Request.Context(), f.PromQL, time.Now())
if err != nil {
c.String(500, "promql:%s error:%v", f.PromQL, err)
return
@@ -160,7 +160,7 @@ func remoteWrite(c *gin.Context) {
}
}

cn := config.ReaderClient.GetClusterName()
cn := config.C.ClusterName
if cn != "" {
promstat.CounterSampleTotal.WithLabelValues(cn, "prometheus").Add(float64(count))
}


+ 2
- 2
src/server/stat/stat.go View File

@@ -43,12 +43,12 @@ var (
}, []string{"cluster"})

// Length of the in-memory alert event queue.
GaugeAlertQueueSize = prometheus.NewGaugeVec(prometheus.GaugeOpts{
GaugeAlertQueueSize = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "alert_queue_size",
Help: "The size of alert queue.",
}, []string{"cluster"})
})

// Data forwarding queues: the length of each queue.
GaugeSampleQueueSize = prometheus.NewGaugeVec(prometheus.GaugeOpts{


+ 37
- 18
src/server/writer/writer.go View File

@@ -37,7 +37,7 @@ func (w WriterType) writeRelabel(items []*prompb.TimeSeries) []*prompb.TimeSerie
return ritems
}

func (w WriterType) Write(index int, items []*prompb.TimeSeries, headers ...map[string]string) {
func (w WriterType) Write(cluster string, index int, items []*prompb.TimeSeries, headers ...map[string]string) {
if len(items) == 0 {
return
}
@@ -49,9 +49,8 @@ func (w WriterType) Write(index int, items []*prompb.TimeSeries, headers ...map[

start := time.Now()
defer func() {
cn := config.ReaderClient.GetClusterName()
if cn != "" {
promstat.ForwardDuration.WithLabelValues(cn, fmt.Sprint(index)).Observe(time.Since(start).Seconds())
if cluster != "" {
promstat.ForwardDuration.WithLabelValues(cluster, fmt.Sprint(index)).Observe(time.Since(start).Seconds())
}
}()

@@ -130,17 +129,28 @@ func (w WriterType) Post(req []byte, headers ...map[string]string) error {
type WritersType struct {
globalOpt config.WriterGlobalOpt
backends map[string]WriterType
queues map[int]*SafeListLimited
queues map[string]map[int]*SafeListLimited
}

func (ws *WritersType) Put(name string, writer WriterType) {
ws.backends[name] = writer
}

func (ws *WritersType) PushSample(ident string, v interface{}) {
func (ws *WritersType) PushSample(ident string, v interface{}, clusters ...string) {
hashkey := crc32.ChecksumIEEE([]byte(ident)) % uint32(ws.globalOpt.QueueCount)

c, ok := ws.queues[int(hashkey)]
cluster := config.C.ClusterName
if len(clusters) > 0 {
cluster = clusters[0]
}

if _, ok := ws.queues[cluster]; !ok {
// The cluster to write to does not exist.
logger.Warningf("Write cluster:%s not found, v:%+v", cluster, v)
return
}

c, ok := ws.queues[cluster][int(hashkey)]
if ok {
succ := c.PushFront(v)
if !succ {
@@ -149,7 +159,7 @@ func (ws *WritersType) PushSample(ident string, v interface{}) {
}
}

func (ws *WritersType) StartConsumer(index int, ch *SafeListLimited) {
func (ws *WritersType) StartConsumer(index int, ch *SafeListLimited, clusterName string) {
for {
series := ch.PopBack(ws.globalOpt.QueuePopSize)
if len(series) == 0 {
@@ -158,7 +168,10 @@ func (ws *WritersType) StartConsumer(index int, ch *SafeListLimited) {
}

for key := range ws.backends {
go ws.backends[key].Write(index, series)
if ws.backends[key].Opts.Name != clusterName {
continue
}
go ws.backends[key].Write(clusterName, index, series)
}
}
}
@@ -173,11 +186,15 @@ var Writers = NewWriters()

func Init(opts []config.WriterOptions, globalOpt config.WriterGlobalOpt) error {
Writers.globalOpt = globalOpt
Writers.queues = make(map[int]*SafeListLimited)

for i := 0; i < globalOpt.QueueCount; i++ {
Writers.queues[i] = NewSafeListLimited(Writers.globalOpt.QueueMaxSize)
go Writers.StartConsumer(i, Writers.queues[i])
Writers.queues = make(map[string]map[int]*SafeListLimited)
for _, opt := range opts {
if _, ok := Writers.queues[opt.Name]; !ok {
Writers.queues[opt.Name] = make(map[int]*SafeListLimited)
for i := 0; i < globalOpt.QueueCount; i++ {
Writers.queues[opt.Name][i] = NewSafeListLimited(Writers.globalOpt.QueueMaxSize)
go Writers.StartConsumer(i, Writers.queues[opt.Name][i], opt.Name)
}
}
}

go reportChanSize()
@@ -218,16 +235,18 @@ func Init(opts []config.WriterOptions, globalOpt config.WriterGlobalOpt) error {
}

func reportChanSize() {
clusterName := config.ReaderClient.GetClusterName()
clusterName := config.C.ClusterName
if clusterName == "" {
return
}

for {
time.Sleep(time.Second * 3)
for i, c := range Writers.queues {
size := c.Len()
promstat.GaugeSampleQueueSize.WithLabelValues(clusterName, fmt.Sprint(i)).Set(float64(size))
for cluster, m := range Writers.queues {
for i, c := range m {
size := c.Len()
promstat.GaugeSampleQueueSize.WithLabelValues(cluster, fmt.Sprint(i)).Set(float64(size))
}
}
}
}

+ 2
- 0
src/webapi/router/router.go View File

@@ -301,6 +301,8 @@ func configRoute(r *gin.Engine, version string) {

pages.GET("/servers", auth(), admin(), serversGet)
pages.PUT("/server/:id", auth(), admin(), serverBindCluster)
pages.POST("/servers", auth(), admin(), serverAddCluster)
pages.DELETE("/servers", auth(), admin(), serverDelCluster)
}

service := r.Group("/v1/n9e")


+ 17
- 1
src/webapi/router/router_server.go View File

@@ -13,7 +13,8 @@ func serversGet(c *gin.Context) {
}

type serverBindClusterForm struct {
Cluster string `json:"cluster"`
Cluster string `json:"cluster"`
Instance string `json:"instance"`
}

// The user assigns a cluster to an n9e-server; the assignment can also be cleared by setting cluster to an empty string.
@@ -33,3 +34,18 @@ func serverBindCluster(c *gin.Context) {

ginx.NewRender(c).Message(ae.UpdateCluster(f.Cluster))
}

// serverAddCluster handles POST /servers: it binds the JSON request body into
// a serverBindClusterForm and passes its Instance and Cluster fields to
// models.AlertingEngineAdd, rendering the outcome as the response message.
func serverAddCluster(c *gin.Context) {
	form := serverBindClusterForm{}
	ginx.BindJSON(c, &form)

	ginx.NewRender(c).Message(
		models.AlertingEngineAdd(form.Instance, form.Cluster),
	)
}

// serverDelCluster handles DELETE /servers: it binds the JSON request body
// into an idsForm, runs the form's Verify check, and passes the ids to
// models.AlertingEngineDel, rendering the outcome as the response message.
func serverDelCluster(c *gin.Context) {
	form := idsForm{}
	ginx.BindJSON(c, &form)
	form.Verify()

	ginx.NewRender(c).Message(
		models.AlertingEngineDel(form.Ids),
	)
}

Loading…
Cancel
Save