You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

postgresql.go 9.8 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358
  1. package postgresql
  2. import (
  3. "context"
  4. "fmt"
  5. "regexp"
  6. "strings"
  7. "time"
  8. "github.com/ccfos/nightingale/v6/datasource"
  9. "github.com/ccfos/nightingale/v6/pkg/macros"
  10. "github.com/ccfos/nightingale/v6/dskit/postgres"
  11. "github.com/ccfos/nightingale/v6/dskit/sqlbase"
  12. "github.com/ccfos/nightingale/v6/dskit/types"
  13. "github.com/ccfos/nightingale/v6/models"
  14. "github.com/mitchellh/mapstructure"
  15. "github.com/toolkits/pkg/logger"
  16. )
  17. const (
  18. PostgreSQLType = "pgsql"
  19. )
  20. var (
  21. regx = `(?i)from\s+((?:"[^"]+"|[a-zA-Z0-9_]+))\.((?:"[^"]+"|[a-zA-Z0-9_]+))\.((?:"[^"]+"|[a-zA-Z0-9_]+))`
  22. )
  23. func init() {
  24. datasource.RegisterDatasource(PostgreSQLType, new(PostgreSQL))
  25. }
  26. type PostgreSQL struct {
  27. Shards []*postgres.PostgreSQL `json:"pgsql.shards" mapstructure:"pgsql.shards"`
  28. }
  29. type QueryParam struct {
  30. Ref string `json:"ref" mapstructure:"ref"`
  31. Database string `json:"database" mapstructure:"database"`
  32. Table string `json:"table" mapstructure:"table"`
  33. SQL string `json:"sql" mapstructure:"sql"`
  34. Keys datasource.Keys `json:"keys" mapstructure:"keys"`
  35. From int64 `json:"from" mapstructure:"from"`
  36. To int64 `json:"to" mapstructure:"to"`
  37. }
  38. func (p *PostgreSQL) InitClient() error {
  39. if len(p.Shards) == 0 {
  40. return fmt.Errorf("not found postgresql addr, please check datasource config")
  41. }
  42. for _, shard := range p.Shards {
  43. if db, err := shard.NewConn(context.TODO(), "postgres"); err != nil {
  44. defer sqlbase.CloseDB(db)
  45. return err
  46. }
  47. }
  48. return nil
  49. }
  50. func (p *PostgreSQL) Init(settings map[string]interface{}) (datasource.Datasource, error) {
  51. newest := new(PostgreSQL)
  52. err := mapstructure.Decode(settings, newest)
  53. return newest, err
  54. }
  55. func (p *PostgreSQL) Validate(ctx context.Context) error {
  56. if len(p.Shards) == 0 || len(strings.TrimSpace(p.Shards[0].Addr)) == 0 {
  57. return fmt.Errorf("postgresql addr is invalid, please check datasource setting")
  58. }
  59. if len(strings.TrimSpace(p.Shards[0].User)) == 0 {
  60. return fmt.Errorf("postgresql user is invalid, please check datasource setting")
  61. }
  62. return nil
  63. }
  64. // Equal compares whether two objects are the same, used for caching
  65. func (p *PostgreSQL) Equal(d datasource.Datasource) bool {
  66. newest, ok := d.(*PostgreSQL)
  67. if !ok {
  68. logger.Errorf("unexpected plugin type, expected is postgresql")
  69. return false
  70. }
  71. if len(p.Shards) == 0 || len(newest.Shards) == 0 {
  72. return false
  73. }
  74. oldShard := p.Shards[0]
  75. newShard := newest.Shards[0]
  76. if oldShard.Addr != newShard.Addr {
  77. return false
  78. }
  79. if oldShard.User != newShard.User {
  80. return false
  81. }
  82. if oldShard.Password != newShard.Password {
  83. return false
  84. }
  85. if oldShard.MaxQueryRows != newShard.MaxQueryRows {
  86. return false
  87. }
  88. if oldShard.Timeout != newShard.Timeout {
  89. return false
  90. }
  91. if oldShard.MaxIdleConns != newShard.MaxIdleConns {
  92. return false
  93. }
  94. if oldShard.MaxOpenConns != newShard.MaxOpenConns {
  95. return false
  96. }
  97. if oldShard.ConnMaxLifetime != newShard.ConnMaxLifetime {
  98. return false
  99. }
  100. return true
  101. }
  102. func (p *PostgreSQL) ShowDatabases(ctx context.Context) ([]string, error) {
  103. return p.Shards[0].ShowDatabases(ctx, "")
  104. }
  105. func (p *PostgreSQL) ShowTables(ctx context.Context, database string) ([]string, error) {
  106. p.Shards[0].DB = database
  107. rets, err := p.Shards[0].ShowTables(ctx, "")
  108. if err != nil {
  109. return nil, err
  110. }
  111. tables := make([]string, 0, len(rets))
  112. for scheme, tabs := range rets {
  113. for _, tab := range tabs {
  114. tables = append(tables, scheme+"."+tab)
  115. }
  116. }
  117. return tables, nil
  118. }
  119. func (p *PostgreSQL) MakeLogQuery(ctx context.Context, query interface{}, eventTags []string, start, end int64) (interface{}, error) {
  120. return nil, nil
  121. }
  122. func (p *PostgreSQL) MakeTSQuery(ctx context.Context, query interface{}, eventTags []string, start, end int64) (interface{}, error) {
  123. return nil, nil
  124. }
  125. func (p *PostgreSQL) QueryMapData(ctx context.Context, query interface{}) ([]map[string]string, error) {
  126. return nil, nil
  127. }
  128. func (p *PostgreSQL) QueryData(ctx context.Context, query interface{}) ([]models.DataResp, error) {
  129. postgresqlQueryParam := new(QueryParam)
  130. if err := mapstructure.Decode(query, postgresqlQueryParam); err != nil {
  131. return nil, err
  132. }
  133. postgresqlQueryParam.SQL = formatSQLDatabaseNameWithRegex(postgresqlQueryParam.SQL)
  134. if strings.Contains(postgresqlQueryParam.SQL, "$__") {
  135. var err error
  136. postgresqlQueryParam.SQL, err = macros.Macro(postgresqlQueryParam.SQL, postgresqlQueryParam.From, postgresqlQueryParam.To)
  137. if err != nil {
  138. return nil, err
  139. }
  140. }
  141. if postgresqlQueryParam.Database != "" {
  142. p.Shards[0].DB = postgresqlQueryParam.Database
  143. } else {
  144. db, err := parseDBName(postgresqlQueryParam.SQL)
  145. if err != nil {
  146. return nil, err
  147. }
  148. p.Shards[0].DB = db
  149. }
  150. timeout := p.Shards[0].Timeout
  151. if timeout == 0 {
  152. timeout = 60
  153. }
  154. timeoutCtx, cancel := context.WithTimeout(ctx, time.Duration(timeout)*time.Second)
  155. defer cancel()
  156. items, err := p.Shards[0].QueryTimeseries(timeoutCtx, &sqlbase.QueryParam{
  157. Sql: postgresqlQueryParam.SQL,
  158. Keys: types.Keys{
  159. ValueKey: postgresqlQueryParam.Keys.ValueKey,
  160. LabelKey: postgresqlQueryParam.Keys.LabelKey,
  161. TimeKey: postgresqlQueryParam.Keys.TimeKey,
  162. },
  163. })
  164. if err != nil {
  165. logger.Warningf("query:%+v get data err:%v", postgresqlQueryParam, err)
  166. return []models.DataResp{}, err
  167. }
  168. data := make([]models.DataResp, 0)
  169. for i := range items {
  170. data = append(data, models.DataResp{
  171. Ref: postgresqlQueryParam.Ref,
  172. Metric: items[i].Metric,
  173. Values: items[i].Values,
  174. })
  175. }
  176. // parse resp to time series data
  177. logger.Infof("req:%+v keys:%+v \n data:%v", postgresqlQueryParam, postgresqlQueryParam.Keys, data)
  178. return data, nil
  179. }
  180. func (p *PostgreSQL) QueryLog(ctx context.Context, query interface{}) ([]interface{}, int64, error) {
  181. postgresqlQueryParam := new(QueryParam)
  182. if err := mapstructure.Decode(query, postgresqlQueryParam); err != nil {
  183. return nil, 0, err
  184. }
  185. if postgresqlQueryParam.Database != "" {
  186. p.Shards[0].DB = postgresqlQueryParam.Database
  187. } else {
  188. db, err := parseDBName(postgresqlQueryParam.SQL)
  189. if err != nil {
  190. return nil, 0, err
  191. }
  192. p.Shards[0].DB = db
  193. }
  194. postgresqlQueryParam.SQL = formatSQLDatabaseNameWithRegex(postgresqlQueryParam.SQL)
  195. if strings.Contains(postgresqlQueryParam.SQL, "$__") {
  196. var err error
  197. postgresqlQueryParam.SQL, err = macros.Macro(postgresqlQueryParam.SQL, postgresqlQueryParam.From, postgresqlQueryParam.To)
  198. if err != nil {
  199. return nil, 0, err
  200. }
  201. }
  202. timeout := p.Shards[0].Timeout
  203. if timeout == 0 {
  204. timeout = 60
  205. }
  206. timeoutCtx, cancel := context.WithTimeout(ctx, time.Duration(timeout)*time.Second)
  207. defer cancel()
  208. items, err := p.Shards[0].Query(timeoutCtx, &sqlbase.QueryParam{
  209. Sql: postgresqlQueryParam.SQL,
  210. })
  211. if err != nil {
  212. logger.Warningf("query:%+v get data err:%v", postgresqlQueryParam, err)
  213. return []interface{}{}, 0, err
  214. }
  215. logs := make([]interface{}, 0)
  216. for i := range items {
  217. logs = append(logs, items[i])
  218. }
  219. return logs, 0, nil
  220. }
  221. func (p *PostgreSQL) DescribeTable(ctx context.Context, query interface{}) ([]*types.ColumnProperty, error) {
  222. postgresqlQueryParam := new(QueryParam)
  223. if err := mapstructure.Decode(query, postgresqlQueryParam); err != nil {
  224. return nil, err
  225. }
  226. p.Shards[0].DB = postgresqlQueryParam.Database
  227. pairs := strings.Split(postgresqlQueryParam.Table, ".") // format: scheme.table_name
  228. scheme := ""
  229. table := postgresqlQueryParam.Table
  230. if len(pairs) == 2 {
  231. scheme = pairs[0]
  232. table = pairs[1]
  233. }
  234. return p.Shards[0].DescTable(ctx, scheme, table)
  235. }
  236. func parseDBName(sql string) (db string, err error) {
  237. re := regexp.MustCompile(regx)
  238. matches := re.FindStringSubmatch(sql)
  239. if len(matches) != 4 {
  240. return "", fmt.Errorf("no valid table name in format database.schema.table found")
  241. }
  242. return strings.Trim(matches[1], `"`), nil
  243. }
  244. // formatSQLDatabaseNameWithRegex 只对 dbname.scheme.tabname 格式进行数据库名称格式化,转为 "dbname".scheme.tabname
  245. // 在pgsql中,大小写是通过"" 双引号括起来区分的,默认pg都是转为小写的,所以这里转为 "dbname".scheme."tabname"
  246. func formatSQLDatabaseNameWithRegex(sql string) string {
  247. // 匹配 from dbname.scheme.table_name 的模式
  248. // 使用捕获组来精确匹配数据库名称,确保后面跟着scheme和table
  249. re := regexp.MustCompile(`(?i)\bfrom\s+([a-zA-Z_][a-zA-Z0-9_]*)\s*\.\s*([a-zA-Z_][a-zA-Z0-9_]*)\s*\.\s*([a-zA-Z_][a-zA-Z0-9_]*)`)
  250. return re.ReplaceAllString(sql, `from "$1"."$2"."$3"`)
  251. }
  252. func extractColumns(sql string) ([]string, error) {
  253. // 将 SQL 转换为小写以简化匹配
  254. sql = strings.ToLower(sql)
  255. // 匹配 SELECT 和 FROM 之间的内容
  256. re := regexp.MustCompile(`select\s+(.*?)\s+from`)
  257. matches := re.FindStringSubmatch(sql)
  258. if len(matches) < 2 {
  259. return nil, fmt.Errorf("no columns found or invalid SQL syntax")
  260. }
  261. // 提取列部分
  262. columnsString := matches[1]
  263. // 分割列
  264. columns := splitColumns(columnsString)
  265. // 清理每个列名
  266. for i, col := range columns {
  267. columns[i] = strings.TrimSpace(col)
  268. }
  269. return columns, nil
  270. }
  271. func splitColumns(columnsString string) []string {
  272. var columns []string
  273. var currentColumn strings.Builder
  274. parenthesesCount := 0
  275. inQuotes := false
  276. for _, char := range columnsString {
  277. switch char {
  278. case '(':
  279. parenthesesCount++
  280. currentColumn.WriteRune(char)
  281. case ')':
  282. parenthesesCount--
  283. currentColumn.WriteRune(char)
  284. case '\'', '"':
  285. inQuotes = !inQuotes
  286. currentColumn.WriteRune(char)
  287. case ',':
  288. if parenthesesCount == 0 && !inQuotes {
  289. columns = append(columns, currentColumn.String())
  290. currentColumn.Reset()
  291. } else {
  292. currentColumn.WriteRune(char)
  293. }
  294. default:
  295. currentColumn.WriteRune(char)
  296. }
  297. }
  298. if currentColumn.Len() > 0 {
  299. columns = append(columns, currentColumn.String())
  300. }
  301. return columns
  302. }