diff --git a/go.mod b/go.mod index 055c5cbb..3f46ccf2 100644 --- a/go.mod +++ b/go.mod @@ -33,6 +33,7 @@ require ( github.com/shirou/gopsutil v2.20.7+incompatible github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72 github.com/spf13/viper v1.7.1 + github.com/streadway/amqp v1.0.0 github.com/stretchr/testify v1.6.1 github.com/toolkits/pkg v1.1.2 github.com/ugorji/go/codec v1.1.7 diff --git a/go.sum b/go.sum index aba3521b..fe87eee5 100644 --- a/go.sum +++ b/go.sum @@ -363,6 +363,8 @@ github.com/spf13/pflag v1.0.3 h1:zPAT6CGy6wXeQ7NtTnaTerfKOsV6V6F8agHXFiazDkg= github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= github.com/spf13/viper v1.7.1 h1:pM5oEahlgWv/WnHXpgbKz7iLIxRf65tye2Ci+XFK5sk= github.com/spf13/viper v1.7.1/go.mod h1:8WkrPz2fc9jxqZNCJI/76HCieCp4Q8HaLFoCha5qpdg= +github.com/streadway/amqp v1.0.0 h1:kuuDrUJFZL1QYL9hUNuCxNObNzB0bV/ZG5jV3RWAQgo= +github.com/streadway/amqp v1.0.0/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.2.0/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= diff --git a/src/models/node.go b/src/models/node.go index cdab7911..120cedf8 100644 --- a/src/models/node.go +++ b/src/models/node.go @@ -86,6 +86,10 @@ func NodeGet(where string, args ...interface{}) (*Node, error) { return &obj, nil } +func NodeGetById(id int64) (*Node, error) { + return NodeGet("id=?", id) +} + // NodeGets 在所有节点范围查询,比如管理员看服务树,就需要load所有数据 func NodeGets(where string, args ...interface{}) (nodes []Node, err error) { if where != "" { diff --git a/src/models/resource_service.go b/src/models/resource_service.go new file mode 100644 index 00000000..ba177c15 --- /dev/null +++ b/src/models/resource_service.go @@ -0,0 +1,133 @@ +package models + +import ( + "fmt" +) + +type ResourceRegisterItem struct { + UUID string `json:"uuid"` + Ident string `json:"ident"` + Name string `json:"name"` + Labels string `json:"labels"` + Extend string `json:"extend"` + Cate string `json:"cate"` + NID int64 `json:"nid"` +} + +func (i ResourceRegisterItem) Validate() error { + if i.Cate == "" { + return fmt.Errorf("cate is blank") + } + + if i.UUID == "" { + return fmt.Errorf("uuid is blank") + } + + if i.Ident == "" { + return fmt.Errorf("ident is blank") + } + + return nil +} + +// ResourceRegisterFor3rd 用于第三方资源注册 errCode=400: 表示传入的参数有问题 errCode=500: 表示DB出了问题 +// 之所以要通过errCode对错误做区分,是因为这个方法同时被同步和异步两种方式调用,上层需要依托这个信息做判断 +func ResourceRegisterFor3rd(item ResourceRegisterItem) (errCode int, err error) { + err = item.Validate() + if err != nil { + return 400, err + } + + node, err := NodeGetById(item.NID) + if err != nil { + return 500, err + } + + if node == nil { + return 400, fmt.Errorf("node not found") + } + + if node.Cate != "project" { + return 400, fmt.Errorf("node not project") + } + + res, err := ResourceGet("uuid=?", item.UUID) + if err != nil { + return 500, err + } + + if res != nil { + // 这个资源之前就已经存在过了,这次可能是更新了部分字段 + res.Name = item.Name + res.Labels = item.Labels + res.Extend = item.Extend + err = res.Update("name", "labels", "extend") + if err != nil { + return 500, err + } + } else { + // 之前没有过这个资源,在RDB注册这个资源 + res = new(Resource) + res.UUID = item.UUID + res.Ident = item.Ident + res.Name = item.Name + res.Labels = item.Labels + res.Extend = item.Extend + res.Cate = item.Cate + res.Tenant = node.Tenant() + err = res.Save() + if err != nil { + return 500, err + } + } + + // 检查这个资源是否有挂载过,没有的话就补齐挂载关系,这个动作是幂等的 + leafPath := node.Path + "." + item.Cate + leafNode, err := NodeGet("path=?", leafPath) + if err != nil { + return 500, err + } + + // 第一个挂载位置:项目下面的${cate}节点 + if leafNode == nil { + leafNode, err = node.CreateChild(item.Cate, item.Cate, "", "resource", "system", 1, 1, []int64{}) + if err != nil { + return 500, err + } + } + + err = leafNode.Bind([]int64{res.Id}) + if err != nil { + return 500, err + } + + // 第二个挂载位置:inner.${cate} + innerCatePath := "inner." + item.Cate + innerCateNode, err := NodeGet("path=?", innerCatePath) + if err != nil { + return 500, err + } + + if innerCateNode == nil { + innerNode, err := NodeGet("path=?", "inner") + if err != nil { + return 500, err + } + + if innerNode == nil { + return 500, fmt.Errorf("inner node not exists, maybe forget init system") + } + + innerCateNode, err = innerNode.CreateChild(item.Cate, item.Cate, "", "resource", "system", 1, 1, []int64{}) + if err != nil { + return 500, err + } + } + + err = innerCateNode.Bind([]int64{res.Id}) + if err != nil { + return 500, err + } + + return 0, nil +} diff --git a/src/modules/rdb/config/yaml.go b/src/modules/rdb/config/yaml.go index 0d3282b0..72b206c3 100644 --- a/src/modules/rdb/config/yaml.go +++ b/src/modules/rdb/config/yaml.go @@ -9,13 +9,14 @@ import ( ) type ConfigT struct { - Logger loggeri.Config `yaml:"logger"` - HTTP httpSection `yaml:"http"` - LDAP ldapSection `yaml:"ldap"` - SSO ssoSection `yaml:"sso"` - Tokens []string `yaml:"tokens"` - Redis redisSection `yaml:"redis"` - Sender map[string]senderSection `yaml:"sender"` + Logger loggeri.Config `yaml:"logger"` + HTTP httpSection `yaml:"http"` + LDAP ldapSection `yaml:"ldap"` + SSO ssoSection `yaml:"sso"` + Tokens []string `yaml:"tokens"` + Redis redisSection `yaml:"redis"` + Sender map[string]senderSection `yaml:"sender"` + RabbitMQ rabbitmqSection `yaml:"rabbitmq"` } type ssoSection struct { @@ -74,6 +75,11 @@ type timeoutSection struct { Write int `yaml:"write"` } +type rabbitmqSection struct { + Addr string `yaml:"addr"` + Queue string `yaml:"queue"` +} + var Config *ConfigT // Parse configuration file diff --git a/src/modules/rdb/http/router_resource.go b/src/modules/rdb/http/router_resource.go index 1ba17e3e..83baedde 100644 --- a/src/modules/rdb/http/router_resource.go +++ b/src/modules/rdb/http/router_resource.go @@ -330,7 +330,7 @@ func (f v1ResourcesRegisterItem) Validate() { // 资源注册后面要用MQ的方式,不能用HTTP接口,RDB可能挂,数据库可能挂,如果RDB或数据库挂了,子系统就会注册资源失败 // MQ的方式就不怕RDB挂掉了,使用MQ的手工ack方式,只有确认资源正常入库了才发送ack给MQ func v1ResourcesRegisterPost(c *gin.Context) { - var items []v1ResourcesRegisterItem + var items []models.ResourceRegisterItem bind(c, &items) count := len(items) @@ -339,66 +339,10 @@ func v1ResourcesRegisterPost(c *gin.Context) { } for i := 0; i < count; i++ { - items[i].Validate() - - node := Node(items[i].NID) - if node.Cate != "project" { - bomb("node not project") - } - - res, err := models.ResourceGet("uuid=?", items[i].UUID) - dangerous(err) - - if res != nil { - // 这个资源之前就已经存在过了,这次可能是更新了部分字段 - res.Name = items[i].Name - res.Labels = items[i].Labels - res.Extend = items[i].Extend - dangerous(res.Update("name", "labels", "extend")) - } else { - // 之前没有过这个资源,在RDB注册这个资源 - res = new(models.Resource) - res.UUID = items[i].UUID - res.Ident = items[i].Ident - res.Name = items[i].Name - res.Labels = items[i].Labels - res.Extend = items[i].Extend - res.Cate = items[i].Cate - res.Tenant = node.Tenant() - dangerous(res.Save()) - } - - // 检查这个资源是否有挂载过,没有的话就补齐挂载关系,这个动作是幂等的 - leafPath := node.Path + "." + items[i].Cate - leafNode, err := models.NodeGet("path=?", leafPath) - dangerous(err) - - // 第一个挂载位置:项目下面的${cate}节点 - if leafNode == nil { - leafNode, err = node.CreateChild(items[i].Cate, items[i].Cate, "", "resource", "system", 1, 1, []int64{}) + errCode, err := models.ResourceRegisterFor3rd(items[i]) + if errCode != 0 { dangerous(err) } - - dangerous(leafNode.Bind([]int64{res.Id})) - - // 第二个挂载位置:inner.${cate} - innerCatePath := "inner." + items[i].Cate - innerCateNode, err := models.NodeGet("path=?", innerCatePath) - dangerous(err) - - if innerCateNode == nil { - innerNode, err := models.NodeGet("path=?", "inner") - dangerous(err) - - if innerNode == nil { - bomb("inner node not exists") - } - - innerCateNode, err = innerNode.CreateChild(items[i].Cate, items[i].Cate, "", "resource", "system", 1, 1, []int64{}) - dangerous(err) - } - - dangerous(innerCateNode.Bind([]int64{res.Id})) } renderMessage(c, nil) diff --git a/src/modules/rdb/rabbitmq/conn.go b/src/modules/rdb/rabbitmq/conn.go new file mode 100644 index 00000000..bee5089b --- /dev/null +++ b/src/modules/rdb/rabbitmq/conn.go @@ -0,0 +1,25 @@ +package rabbitmq + +import ( + "log" + + "github.com/streadway/amqp" +) + +var ( + conn *amqp.Connection + exit = make(chan bool) +) + +func Init(url string) { + var err error + conn, err = amqp.Dial(url) + if err != nil { + log.Fatal(err) + } +} + +func Shutdown() { + conn.Close() + exit <- true +} diff --git a/src/modules/rdb/rabbitmq/queue_consume.go b/src/modules/rdb/rabbitmq/queue_consume.go new file mode 100644 index 00000000..633f6ccb --- /dev/null +++ b/src/modules/rdb/rabbitmq/queue_consume.go @@ -0,0 +1,90 @@ +package rabbitmq + +import ( + "time" + + "github.com/toolkits/pkg/logger" +) + +func Consume(queueName string) { + go func(queueName string) { + for { + sleep := consume(queueName) + if sleep { + time.Sleep(300 * time.Millisecond) + } + + if _, ok := <-exit; ok { + return + } + } + }(queueName) +} + +// 如果操作MQ出现问题,或者没有load到数据,就sleep一下 +func consume(queueName string) bool { + ch, err := conn.Channel() + if err != nil { + logger.Error(err) + return true + } + + defer ch.Close() + + q, err := ch.QueueDeclare( + queueName, // name + true, // durable + false, // delete when unused + false, // exclusive + false, // no-wait + nil, // arguments + ) + if err != nil { + logger.Error(err) + return true + } + + err = ch.Qos( + 0, // prefetch count + 0, // prefetch size + false, // global + ) + if err != nil { + logger.Error(err) + return true + } + + msgs, err := ch.Consume( + q.Name, // queue + "", // consumer + false, // auto-ack + false, // exclusive + false, // no-local + false, // no-wait + nil, // args + ) + if err != nil { + logger.Error(err) + return true + } + + size := 0 + for d := range msgs { + size++ + logger.Infof("rabbitmq consume message: %s", d.Body) + + if handleMessage(d.Body) { + d.Ack(true) + } else { + // 底层代码认为不应该ack,说明处理的过程出现问题,可能是DB有问题之类的,sleep一下 + return true + } + } + + if size == 0 { + // MQ里没有消息,就sleep一下,否则上层代码一直在死循环空转,浪费算力 + return true + } + + return false +} diff --git a/src/modules/rdb/rabbitmq/request_handler.go b/src/modules/rdb/rabbitmq/request_handler.go new file mode 100644 index 00000000..a363dc3b --- /dev/null +++ b/src/modules/rdb/rabbitmq/request_handler.go @@ -0,0 +1,125 @@ +package rabbitmq + +import ( + "encoding/json" + + "github.com/toolkits/pkg/logger" + + "github.com/didi/nightingale/src/models" +) + +type MQRequest struct { + Method string `json:"method"` + Payload interface{} `json:"payload"` +} + +// 返回的bool值代表是否让上层给mq发送ack +func handleMessage(msgBody []byte) bool { + if len(msgBody) <= 0 { + logger.Warning("msg body is blank") + // 这是个异常消息,需要ack并丢弃 + return true + } + + var req MQRequest + err := json.Unmarshal(msgBody, &req) + if err != nil { + logger.Warning("unmarshal msg body fail") + return true + } + + if req.Method == "" { + logger.Warning("mq_request.method is blank") + return true + } + + logger.Infof("mq_request, method: %s, payload: %v", req.Method, req.Payload) + + jsonBytes, err := json.Marshal(req.Payload) + if err != nil { + logger.Warning("mq_request.payload marshal fail: ", err) + return true + } + + err = dispatchHandler(req.Method, jsonBytes) + if err != nil { + // 如果处理的有问题,可能是后端DB挂了,不能ack,等DB恢复了还可以继续处理 + return false + } + + return true +} + +func dispatchHandler(method string, jsonBytes []byte) error { + switch method { + case "oplog_add": + return oplogAdd(jsonBytes) + case "resource_register": + return resourceRegister(jsonBytes) + case "resource_unregister": + return resourceUnregister(jsonBytes) + default: + logger.Warning("mq_request.method not support") + return nil + } +} + +// 第三方系统通过MQ把操作日志推给RDB保存 +func oplogAdd(jsonBytes []byte) error { + var ol models.OperationLog + err := json.Unmarshal(jsonBytes, &ol) + if err != nil { + // 传入的数据不合理,无法decode,这种数据要被消费丢掉 + logger.Error("cannot unmarshal OperationLog: ", err) + return nil + } + + return ol.New() +} + +// 第三方系统,比如RDS、Redis等,资源创建了,要注册到RDB +func resourceRegister(jsonBytes []byte) error { + var item models.ResourceRegisterItem + err := json.Unmarshal(jsonBytes, &item) + if err != nil { + logger.Warning(err) + return nil + } + + errCode, err := models.ResourceRegisterFor3rd(item) + if errCode == 0 { + return nil + } + + if errCode == 400 { + logger.Warningf("item invalid: %v", err) + return nil + } + + // errCode == 500 + logger.Errorf("system internal error: %v", err) + return err +} + +// 第三方系统,比如RDS、Redis等,资源销毁了,要通知到RDB +func resourceUnregister(jsonBytes []byte) error { + var uuids []string + err := json.Unmarshal(jsonBytes, &uuids) + if err != nil { + logger.Error(err) + // 这种错误不需要重试,所以也就不需要return err了 + return nil + } + + if len(uuids) == 0 { + return nil + } + + err = models.ResourceUnregister(uuids) + if err != nil { + logger.Error(err) + return err + } + + return nil +} diff --git a/src/modules/rdb/rdb.go b/src/modules/rdb/rdb.go index 13afd261..4004da28 100644 --- a/src/modules/rdb/rdb.go +++ b/src/modules/rdb/rdb.go @@ -17,6 +17,7 @@ import ( "github.com/didi/nightingale/src/modules/rdb/config" "github.com/didi/nightingale/src/modules/rdb/cron" "github.com/didi/nightingale/src/modules/rdb/http" + "github.com/didi/nightingale/src/modules/rdb/rabbitmq" "github.com/didi/nightingale/src/modules/rdb/redisc" "github.com/didi/nightingale/src/modules/rdb/ssoc" ) @@ -62,10 +63,16 @@ func main() { ssoc.InitSSO() - // 初始化redis用来发送邮件短信等 + // 初始化 redis 用来发送邮件短信等 redisc.InitRedis() cron.InitWorker() + // 初始化 rabbitmq 处理部分异步逻辑 + if len(config.Config.RabbitMQ.Addr) > 0 { + rabbitmq.Init(config.Config.RabbitMQ.Addr) + rabbitmq.Consume(config.Config.RabbitMQ.Queue) + } + go cron.ConsumeMail() go cron.ConsumeSms() go cron.ConsumeVoice() @@ -94,5 +101,10 @@ func endingProc() { logger.Close() http.Shutdown() redisc.CloseRedis() - fmt.Println("portal stopped successfully") + + if len(config.Config.RabbitMQ.Addr) > 0 { + rabbitmq.Shutdown() + } + + fmt.Println("stopped successfully") }