|
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358 |
- package postgresql
-
- import (
- "context"
- "fmt"
- "regexp"
- "strings"
- "time"
-
- "github.com/ccfos/nightingale/v6/datasource"
- "github.com/ccfos/nightingale/v6/pkg/macros"
-
- "github.com/ccfos/nightingale/v6/dskit/postgres"
- "github.com/ccfos/nightingale/v6/dskit/sqlbase"
- "github.com/ccfos/nightingale/v6/dskit/types"
- "github.com/ccfos/nightingale/v6/models"
- "github.com/mitchellh/mapstructure"
- "github.com/toolkits/pkg/logger"
- )
-
- const (
- PostgreSQLType = "pgsql"
- )
-
- var (
- regx = `(?i)from\s+((?:"[^"]+"|[a-zA-Z0-9_]+))\.((?:"[^"]+"|[a-zA-Z0-9_]+))\.((?:"[^"]+"|[a-zA-Z0-9_]+))`
- )
-
- func init() {
- datasource.RegisterDatasource(PostgreSQLType, new(PostgreSQL))
- }
-
- type PostgreSQL struct {
- Shards []*postgres.PostgreSQL `json:"pgsql.shards" mapstructure:"pgsql.shards"`
- }
-
- type QueryParam struct {
- Ref string `json:"ref" mapstructure:"ref"`
- Database string `json:"database" mapstructure:"database"`
- Table string `json:"table" mapstructure:"table"`
- SQL string `json:"sql" mapstructure:"sql"`
- Keys datasource.Keys `json:"keys" mapstructure:"keys"`
- From int64 `json:"from" mapstructure:"from"`
- To int64 `json:"to" mapstructure:"to"`
- }
-
- func (p *PostgreSQL) InitClient() error {
- if len(p.Shards) == 0 {
- return fmt.Errorf("not found postgresql addr, please check datasource config")
- }
- for _, shard := range p.Shards {
- if db, err := shard.NewConn(context.TODO(), "postgres"); err != nil {
- defer sqlbase.CloseDB(db)
- return err
- }
- }
- return nil
- }
-
- func (p *PostgreSQL) Init(settings map[string]interface{}) (datasource.Datasource, error) {
- newest := new(PostgreSQL)
- err := mapstructure.Decode(settings, newest)
- return newest, err
- }
-
- func (p *PostgreSQL) Validate(ctx context.Context) error {
- if len(p.Shards) == 0 || len(strings.TrimSpace(p.Shards[0].Addr)) == 0 {
- return fmt.Errorf("postgresql addr is invalid, please check datasource setting")
- }
-
- if len(strings.TrimSpace(p.Shards[0].User)) == 0 {
- return fmt.Errorf("postgresql user is invalid, please check datasource setting")
- }
-
- return nil
- }
-
- // Equal compares whether two objects are the same, used for caching
- func (p *PostgreSQL) Equal(d datasource.Datasource) bool {
- newest, ok := d.(*PostgreSQL)
- if !ok {
- logger.Errorf("unexpected plugin type, expected is postgresql")
- return false
- }
-
- if len(p.Shards) == 0 || len(newest.Shards) == 0 {
- return false
- }
-
- oldShard := p.Shards[0]
- newShard := newest.Shards[0]
-
- if oldShard.Addr != newShard.Addr {
- return false
- }
-
- if oldShard.User != newShard.User {
- return false
- }
-
- if oldShard.Password != newShard.Password {
- return false
- }
-
- if oldShard.MaxQueryRows != newShard.MaxQueryRows {
- return false
- }
-
- if oldShard.Timeout != newShard.Timeout {
- return false
- }
-
- if oldShard.MaxIdleConns != newShard.MaxIdleConns {
- return false
- }
-
- if oldShard.MaxOpenConns != newShard.MaxOpenConns {
- return false
- }
-
- if oldShard.ConnMaxLifetime != newShard.ConnMaxLifetime {
- return false
- }
-
- return true
- }
-
- func (p *PostgreSQL) ShowDatabases(ctx context.Context) ([]string, error) {
- return p.Shards[0].ShowDatabases(ctx, "")
- }
-
- func (p *PostgreSQL) ShowTables(ctx context.Context, database string) ([]string, error) {
- p.Shards[0].DB = database
- rets, err := p.Shards[0].ShowTables(ctx, "")
- if err != nil {
- return nil, err
- }
- tables := make([]string, 0, len(rets))
- for scheme, tabs := range rets {
- for _, tab := range tabs {
- tables = append(tables, scheme+"."+tab)
- }
- }
- return tables, nil
- }
-
- func (p *PostgreSQL) MakeLogQuery(ctx context.Context, query interface{}, eventTags []string, start, end int64) (interface{}, error) {
- return nil, nil
- }
-
- func (p *PostgreSQL) MakeTSQuery(ctx context.Context, query interface{}, eventTags []string, start, end int64) (interface{}, error) {
- return nil, nil
- }
-
- func (p *PostgreSQL) QueryMapData(ctx context.Context, query interface{}) ([]map[string]string, error) {
- return nil, nil
- }
-
- func (p *PostgreSQL) QueryData(ctx context.Context, query interface{}) ([]models.DataResp, error) {
- postgresqlQueryParam := new(QueryParam)
- if err := mapstructure.Decode(query, postgresqlQueryParam); err != nil {
- return nil, err
- }
-
- postgresqlQueryParam.SQL = formatSQLDatabaseNameWithRegex(postgresqlQueryParam.SQL)
- if strings.Contains(postgresqlQueryParam.SQL, "$__") {
- var err error
- postgresqlQueryParam.SQL, err = macros.Macro(postgresqlQueryParam.SQL, postgresqlQueryParam.From, postgresqlQueryParam.To)
- if err != nil {
- return nil, err
- }
- }
- if postgresqlQueryParam.Database != "" {
- p.Shards[0].DB = postgresqlQueryParam.Database
- } else {
- db, err := parseDBName(postgresqlQueryParam.SQL)
- if err != nil {
- return nil, err
- }
- p.Shards[0].DB = db
- }
-
- timeout := p.Shards[0].Timeout
- if timeout == 0 {
- timeout = 60
- }
- timeoutCtx, cancel := context.WithTimeout(ctx, time.Duration(timeout)*time.Second)
- defer cancel()
-
- items, err := p.Shards[0].QueryTimeseries(timeoutCtx, &sqlbase.QueryParam{
- Sql: postgresqlQueryParam.SQL,
- Keys: types.Keys{
- ValueKey: postgresqlQueryParam.Keys.ValueKey,
- LabelKey: postgresqlQueryParam.Keys.LabelKey,
- TimeKey: postgresqlQueryParam.Keys.TimeKey,
- },
- })
-
- if err != nil {
- logger.Warningf("query:%+v get data err:%v", postgresqlQueryParam, err)
- return []models.DataResp{}, err
- }
- data := make([]models.DataResp, 0)
- for i := range items {
- data = append(data, models.DataResp{
- Ref: postgresqlQueryParam.Ref,
- Metric: items[i].Metric,
- Values: items[i].Values,
- })
- }
-
- // parse resp to time series data
- logger.Infof("req:%+v keys:%+v \n data:%v", postgresqlQueryParam, postgresqlQueryParam.Keys, data)
-
- return data, nil
- }
-
- func (p *PostgreSQL) QueryLog(ctx context.Context, query interface{}) ([]interface{}, int64, error) {
- postgresqlQueryParam := new(QueryParam)
- if err := mapstructure.Decode(query, postgresqlQueryParam); err != nil {
- return nil, 0, err
- }
- if postgresqlQueryParam.Database != "" {
- p.Shards[0].DB = postgresqlQueryParam.Database
- } else {
- db, err := parseDBName(postgresqlQueryParam.SQL)
- if err != nil {
- return nil, 0, err
- }
- p.Shards[0].DB = db
- }
-
- postgresqlQueryParam.SQL = formatSQLDatabaseNameWithRegex(postgresqlQueryParam.SQL)
- if strings.Contains(postgresqlQueryParam.SQL, "$__") {
- var err error
- postgresqlQueryParam.SQL, err = macros.Macro(postgresqlQueryParam.SQL, postgresqlQueryParam.From, postgresqlQueryParam.To)
- if err != nil {
- return nil, 0, err
- }
- }
-
- timeout := p.Shards[0].Timeout
- if timeout == 0 {
- timeout = 60
- }
- timeoutCtx, cancel := context.WithTimeout(ctx, time.Duration(timeout)*time.Second)
- defer cancel()
- items, err := p.Shards[0].Query(timeoutCtx, &sqlbase.QueryParam{
- Sql: postgresqlQueryParam.SQL,
- })
- if err != nil {
- logger.Warningf("query:%+v get data err:%v", postgresqlQueryParam, err)
- return []interface{}{}, 0, err
- }
- logs := make([]interface{}, 0)
- for i := range items {
- logs = append(logs, items[i])
- }
-
- return logs, 0, nil
- }
-
- func (p *PostgreSQL) DescribeTable(ctx context.Context, query interface{}) ([]*types.ColumnProperty, error) {
- postgresqlQueryParam := new(QueryParam)
- if err := mapstructure.Decode(query, postgresqlQueryParam); err != nil {
- return nil, err
- }
- p.Shards[0].DB = postgresqlQueryParam.Database
- pairs := strings.Split(postgresqlQueryParam.Table, ".") // format: scheme.table_name
- scheme := ""
- table := postgresqlQueryParam.Table
- if len(pairs) == 2 {
- scheme = pairs[0]
- table = pairs[1]
- }
- return p.Shards[0].DescTable(ctx, scheme, table)
- }
-
- func parseDBName(sql string) (db string, err error) {
- re := regexp.MustCompile(regx)
- matches := re.FindStringSubmatch(sql)
- if len(matches) != 4 {
- return "", fmt.Errorf("no valid table name in format database.schema.table found")
- }
- return strings.Trim(matches[1], `"`), nil
- }
-
- // formatSQLDatabaseNameWithRegex 只对 dbname.scheme.tabname 格式进行数据库名称格式化,转为 "dbname".scheme.tabname
- // 在pgsql中,大小写是通过"" 双引号括起来区分的,默认pg都是转为小写的,所以这里转为 "dbname".scheme."tabname"
- func formatSQLDatabaseNameWithRegex(sql string) string {
- // 匹配 from dbname.scheme.table_name 的模式
- // 使用捕获组来精确匹配数据库名称,确保后面跟着scheme和table
- re := regexp.MustCompile(`(?i)\bfrom\s+([a-zA-Z_][a-zA-Z0-9_]*)\s*\.\s*([a-zA-Z_][a-zA-Z0-9_]*)\s*\.\s*([a-zA-Z_][a-zA-Z0-9_]*)`)
-
- return re.ReplaceAllString(sql, `from "$1"."$2"."$3"`)
- }
-
- func extractColumns(sql string) ([]string, error) {
- // 将 SQL 转换为小写以简化匹配
- sql = strings.ToLower(sql)
-
- // 匹配 SELECT 和 FROM 之间的内容
- re := regexp.MustCompile(`select\s+(.*?)\s+from`)
- matches := re.FindStringSubmatch(sql)
-
- if len(matches) < 2 {
- return nil, fmt.Errorf("no columns found or invalid SQL syntax")
- }
-
- // 提取列部分
- columnsString := matches[1]
-
- // 分割列
- columns := splitColumns(columnsString)
-
- // 清理每个列名
- for i, col := range columns {
- columns[i] = strings.TrimSpace(col)
- }
-
- return columns, nil
- }
-
- func splitColumns(columnsString string) []string {
- var columns []string
- var currentColumn strings.Builder
- parenthesesCount := 0
- inQuotes := false
-
- for _, char := range columnsString {
- switch char {
- case '(':
- parenthesesCount++
- currentColumn.WriteRune(char)
- case ')':
- parenthesesCount--
- currentColumn.WriteRune(char)
- case '\'', '"':
- inQuotes = !inQuotes
- currentColumn.WriteRune(char)
- case ',':
- if parenthesesCount == 0 && !inQuotes {
- columns = append(columns, currentColumn.String())
- currentColumn.Reset()
- } else {
- currentColumn.WriteRune(char)
- }
- default:
- currentColumn.WriteRune(char)
- }
- }
-
- if currentColumn.Len() > 0 {
- columns = append(columns, currentColumn.String())
- }
-
- return columns
- }
|