You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number; they can include dashes ('-') and can be up to 35 characters long.

3 years ago
3 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562
  1. /*
  2. * Licensed to the Apache Software Foundation (ASF) under one or more
  3. * contributor license agreements. See the NOTICE file distributed with
  4. * this work for additional information regarding copyright ownership.
  5. * The ASF licenses this file to You under the Apache License, Version 2.0
  6. * (the "License"); you may not use this file except in compliance with
  7. * the License. You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. */
  17. package base
  18. import (
  19. "context"
  20. "database/sql"
  21. "database/sql/driver"
  22. "encoding/json"
  23. "fmt"
  24. "strconv"
  25. "strings"
  26. "github.com/arana-db/parser/mysql"
  27. "seata.apache.org/seata-go/pkg/compressor"
  28. "seata.apache.org/seata-go/pkg/datasource/sql/datasource"
  29. "seata.apache.org/seata-go/pkg/datasource/sql/types"
  30. "seata.apache.org/seata-go/pkg/datasource/sql/undo"
  31. "seata.apache.org/seata-go/pkg/datasource/sql/undo/factor"
  32. "seata.apache.org/seata-go/pkg/datasource/sql/undo/parser"
  33. "seata.apache.org/seata-go/pkg/util/collection"
  34. serr "seata.apache.org/seata-go/pkg/util/errors"
  35. "seata.apache.org/seata-go/pkg/util/log"
  36. )
  const (
  	// defaultUndoLogTableName is the table used when undo.UndoConfig.LogTable
  	// is empty. The surrounding spaces are part of the value because the name
  	// is concatenated directly into SQL text.
  	defaultUndoLogTableName = " undo_log "

  	// serializerKey is the undo log context key that holds the serializer name.
  	serializerKey = "serializerKey"
  	// compressorTypeKey is the undo log context key that holds the compressor type.
  	compressorTypeKey = "compressorTypeKey"
  )
  42. func getUndoLogTableName() string {
  43. if undo.UndoConfig.LogTable != "" {
  44. return undo.UndoConfig.LogTable
  45. }
  46. return defaultUndoLogTableName
  47. }
  48. func getCheckUndoLogTableExistSql() string {
  49. return "SELECT 1 FROM " + getUndoLogTableName() + " LIMIT 1"
  50. }
  51. func getInsertUndoLogSql() string {
  52. return "INSERT INTO " + getUndoLogTableName() + "(branch_id,xid,context,rollback_info,log_status,log_created,log_modified) VALUES (?, ?, ?, ?, ?, now(6), now(6))"
  53. }
  54. func getSelectUndoLogSql() string {
  55. return "SELECT `branch_id`,`xid`,`context`,`rollback_info`,`log_status` FROM " + getUndoLogTableName() + " WHERE branch_id = ? AND xid = ? FOR UPDATE"
  56. }
  57. func getDeleteUndoLogSql() string {
  58. return "DELETE FROM " + getUndoLogTableName() + " WHERE branch_id = ? AND xid = ?"
  59. }
  // Undo log status values stored in the log_status column.
  const (
  	// UndoLogStatusNormal marks an undo log that can be rolled back normally.
  	UndoLogStatusNormal = iota
  	// UndoLogStatusGlobalFinished marks a placeholder row that prevents the
  	// branch transaction from inserting its undo_log after the global
  	// transaction has been rolled back.
  	UndoLogStatusGlobalFinished
  )
  // BaseUndoLogManager holds the undo log operations defined in this file
  // (insert, delete, flush and undo). It carries no state of its own.
  type BaseUndoLogManager struct{}

  // NewBaseUndoLogManager returns a new, empty BaseUndoLogManager.
  func NewBaseUndoLogManager() *BaseUndoLogManager {
  	return new(BaseUndoLogManager)
  }
  72. // Init
  73. func (m *BaseUndoLogManager) Init() {
  74. }
  75. // InsertUndoLog
  76. func (m *BaseUndoLogManager) InsertUndoLog(record undo.UndologRecord, conn driver.Conn) error {
  77. log.Infof("begin to insert undo log, xid %v, branch id %v", record.XID, record.BranchID)
  78. stmt, err := conn.Prepare(getInsertUndoLogSql())
  79. if err != nil {
  80. return err
  81. }
  82. _, err = stmt.Exec([]driver.Value{record.BranchID, record.XID, record.Context, record.RollbackInfo, int64(record.LogStatus)})
  83. if err != nil {
  84. return err
  85. }
  86. return nil
  87. }
  88. func (m *BaseUndoLogManager) InsertUndoLogWithSqlConn(ctx context.Context, record undo.UndologRecord, conn *sql.Conn) error {
  89. stmt, err := conn.PrepareContext(ctx, getInsertUndoLogSql())
  90. if err != nil {
  91. return err
  92. }
  93. defer stmt.Close()
  94. _, err = stmt.Exec(record.BranchID, record.XID, record.Context, record.RollbackInfo, int64(record.LogStatus))
  95. if err != nil {
  96. return err
  97. }
  98. return nil
  99. }
  100. // DeleteUndoLog exec delete single undo log operate
  101. func (m *BaseUndoLogManager) DeleteUndoLog(ctx context.Context, xid string, branchID int64, conn *sql.Conn) error {
  102. stmt, err := conn.PrepareContext(ctx, getDeleteUndoLogSql())
  103. if err != nil {
  104. log.Errorf("[DeleteUndoLog] prepare sql fail, err: %v", err)
  105. return err
  106. }
  107. defer stmt.Close()
  108. if _, err = stmt.Exec(branchID, xid); err != nil {
  109. log.Errorf("[DeleteUndoLog] exec delete undo log fail, err: %v", err)
  110. return err
  111. }
  112. return nil
  113. }
  114. // BatchDeleteUndoLog exec delete undo log operate
  115. func (m *BaseUndoLogManager) BatchDeleteUndoLog(xid []string, branchID []int64, conn *sql.Conn) error {
  116. // build delete undo log sql
  117. batchDeleteSql, err := m.getBatchDeleteUndoLogSql(xid, branchID)
  118. if err != nil {
  119. log.Errorf("get undo sql log fail, err: %v", err)
  120. return err
  121. }
  122. ctx := context.Background()
  123. // prepare deal sql
  124. stmt, err := conn.PrepareContext(ctx, batchDeleteSql)
  125. if err != nil {
  126. log.Errorf("prepare sql fail, err: %v", err)
  127. return err
  128. }
  129. defer stmt.Close()
  130. branchIDStr, err := Int64Slice2Str(branchID, ",")
  131. if err != nil {
  132. log.Errorf("slice to string transfer fail, err: %v", err)
  133. return err
  134. }
  135. // exec sql stmt
  136. if _, err = stmt.ExecContext(ctx, branchIDStr, strings.Join(xid, ",")); err != nil {
  137. log.Errorf("exec delete undo log fail, err: %v", err)
  138. return err
  139. }
  140. return nil
  141. }
  142. // FlushUndoLog flush undo log
  143. func (m *BaseUndoLogManager) FlushUndoLog(tranCtx *types.TransactionContext, conn driver.Conn) error {
  144. if tranCtx.RoundImages.IsEmpty() {
  145. return nil
  146. }
  147. sqlUndoLogs := make([]undo.SQLUndoLog, 0)
  148. beforeImages := tranCtx.RoundImages.BeofreImages()
  149. afterImages := tranCtx.RoundImages.AfterImages()
  150. if beforeImages.IsEmptyImage() && afterImages.IsEmptyImage() {
  151. return nil
  152. }
  153. size := len(beforeImages)
  154. if size < len(afterImages) {
  155. size = len(afterImages)
  156. }
  157. for i := 0; i < size; i++ {
  158. var (
  159. tableName string
  160. sqlType types.SQLType
  161. beforeImage *types.RecordImage
  162. afterImage *types.RecordImage
  163. )
  164. if i < len(beforeImages) && beforeImages[i] != nil {
  165. tableName = beforeImages[i].TableName
  166. sqlType = beforeImages[i].SQLType
  167. } else if i < len(afterImages) && afterImages[i] != nil {
  168. tableName = afterImages[i].TableName
  169. sqlType = afterImages[i].SQLType
  170. } else {
  171. continue
  172. }
  173. if i < len(beforeImages) {
  174. beforeImage = beforeImages[i]
  175. }
  176. if i < len(afterImages) {
  177. afterImage = afterImages[i]
  178. }
  179. undoLog := undo.SQLUndoLog{
  180. SQLType: sqlType,
  181. TableName: tableName,
  182. BeforeImage: beforeImage,
  183. AfterImage: afterImage,
  184. }
  185. sqlUndoLogs = append(sqlUndoLogs, undoLog)
  186. }
  187. branchUndoLog := undo.BranchUndoLog{
  188. Xid: tranCtx.XID,
  189. BranchID: tranCtx.BranchID,
  190. Logs: sqlUndoLogs,
  191. }
  192. parseContext := make(map[string]string, 0)
  193. parseContext[serializerKey] = undo.UndoConfig.LogSerialization
  194. parseContext[compressorTypeKey] = undo.UndoConfig.CompressConfig.Type
  195. undoLogContent := m.encodeUndoLogCtx(parseContext)
  196. rollbackInfo, err := m.serializeBranchUndoLog(&branchUndoLog, parseContext[serializerKey])
  197. if err != nil {
  198. return err
  199. }
  200. return m.InsertUndoLog(undo.UndologRecord{
  201. BranchID: tranCtx.BranchID,
  202. XID: tranCtx.XID,
  203. Context: undoLogContent,
  204. RollbackInfo: rollbackInfo,
  205. LogStatus: undo.UndoLogStatueNormnal,
  206. }, conn)
  207. }
  208. // RunUndo undo sql
  209. func (m *BaseUndoLogManager) RunUndo(ctx context.Context, xid string, branchID int64, conn *sql.DB, dbName string) error {
  210. return nil
  211. }
  212. // Undo undo sql
  213. func (m *BaseUndoLogManager) Undo(ctx context.Context, dbType types.DBType, xid string, branchID int64, db *sql.DB, dbName string) (err error) {
  214. conn, err := db.Conn(ctx)
  215. if err != nil {
  216. return err
  217. }
  218. tx, err := conn.BeginTx(ctx, &sql.TxOptions{})
  219. if err != nil {
  220. return err
  221. }
  222. defer func() {
  223. if err != nil {
  224. if err = tx.Rollback(); err != nil {
  225. log.Errorf("rollback fail, xid: %s, branchID:%s err:%v", xid, branchID, err)
  226. return
  227. }
  228. }
  229. }()
  230. stmt, err := conn.PrepareContext(ctx, getSelectUndoLogSql())
  231. if err != nil {
  232. log.Errorf("prepare sql fail, err: %v", err)
  233. return err
  234. }
  235. defer func() {
  236. if err = stmt.Close(); err != nil {
  237. log.Errorf("stmt close fail, xid: %s, branchID:%s err:%v", xid, branchID, err)
  238. return
  239. }
  240. }()
  241. rows, err := stmt.Query(branchID, xid)
  242. if err != nil {
  243. log.Errorf("query sql fail, err: %v", err)
  244. return err
  245. }
  246. defer func() {
  247. if err = rows.Close(); err != nil {
  248. log.Errorf("rows close fail, xid: %s, branchID:%s err:%v", xid, branchID, err)
  249. return
  250. }
  251. }()
  252. var undoLogRecords []undo.UndologRecord
  253. for rows.Next() {
  254. var record undo.UndologRecord
  255. err = rows.Scan(&record.BranchID, &record.XID, &record.Context, &record.RollbackInfo, &record.LogStatus)
  256. if err != nil {
  257. return err
  258. }
  259. undoLogRecords = append(undoLogRecords, record)
  260. }
  261. if err := rows.Err(); err != nil {
  262. log.Errorf("read rows next fail, xid: %s, branchID:%s err:%v", xid, branchID, err)
  263. return err
  264. }
  265. var exists bool
  266. for _, record := range undoLogRecords {
  267. exists = true
  268. if !record.CanUndo() {
  269. log.Infof("xid %v branch %v, ignore %v undo_log", record.XID, record.BranchID, record.LogStatus)
  270. return nil
  271. }
  272. var logCtx map[string]string
  273. if record.Context != nil && string(record.Context) != "" {
  274. logCtx = m.decodeUndoLogCtx(record.Context)
  275. }
  276. if logCtx == nil {
  277. return fmt.Errorf("undo log context not exist in record %+v", record)
  278. }
  279. rollbackInfo, err := m.getRollbackInfo(record.RollbackInfo, logCtx)
  280. if err != nil {
  281. return err
  282. }
  283. var branchUndoLog *undo.BranchUndoLog
  284. if branchUndoLog, err = m.deserializeBranchUndoLog(rollbackInfo, logCtx); err != nil {
  285. return err
  286. }
  287. sqlUndoLogs := branchUndoLog.Logs
  288. if len(sqlUndoLogs) == 0 {
  289. return nil
  290. }
  291. branchUndoLog.Reverse()
  292. for _, undoLog := range sqlUndoLogs {
  293. tableMeta, err := datasource.GetTableCache(types.DBTypeMySQL).GetTableMeta(ctx, dbName, undoLog.TableName)
  294. if err != nil {
  295. log.Errorf("get table meta fail, err: %v", err)
  296. return err
  297. }
  298. undoLog.SetTableMeta(tableMeta)
  299. undoExecutor, err := factor.GetUndoExecutor(dbType, undoLog)
  300. if err != nil {
  301. log.Errorf("get undo executor, err: %v", err)
  302. return err
  303. }
  304. if err = undoExecutor.ExecuteOn(ctx, dbType, conn); err != nil {
  305. log.Errorf("execute on fail, err: %v", err)
  306. if undoErr, ok := err.(*serr.SeataError); ok && undoErr.Code == serr.SQLUndoDirtyError {
  307. log.Errorf("Branch session rollback failed because of dirty undo log, please delete the relevant undolog after manually calibrating the data. xid = %s branchId = %d: %v", xid, branchID, undoErr)
  308. return serr.New(serr.TransactionErrorCodeBranchRollbackFailedUnretriable, "dirty undo log, manual cleanup required", nil)
  309. }
  310. return err
  311. }
  312. }
  313. }
  314. if exists {
  315. if err = m.DeleteUndoLog(ctx, xid, branchID, conn); err != nil {
  316. log.Errorf("[Undo] delete undo fail, err: %v", err)
  317. return err
  318. }
  319. log.Infof("xid %v branch %v, undo_log deleted with %v", xid, branchID, undo.UndoLogStatueGlobalFinished)
  320. } else {
  321. if err = m.insertUndoLogWithGlobalFinished(ctx, xid, uint64(branchID), conn); err != nil {
  322. log.Errorf("[Undo] insert undo with global finished fail, err: %v", err)
  323. return err
  324. }
  325. log.Infof("xid %v branch %v, undo_log added with %v", xid, branchID, undo.UndoLogStatueGlobalFinished)
  326. }
  327. if err = tx.Commit(); err != nil {
  328. log.Errorf("[Undo] execute on fail, err: %v", err)
  329. return err
  330. }
  331. return nil
  332. }
  333. func (m *BaseUndoLogManager) insertUndoLogWithGlobalFinished(ctx context.Context, xid string, branchID uint64, conn *sql.Conn) error {
  334. // todo use config to replace
  335. parseContext := make(map[string]string, 0)
  336. parseContext[serializerKey] = undo.UndoConfig.LogSerialization
  337. parseContext[compressorTypeKey] = undo.UndoConfig.CompressConfig.Type
  338. undoLogContent := m.encodeUndoLogCtx(parseContext)
  339. logParse, err := parser.GetCache().Load(parseContext[serializerKey])
  340. if err != nil {
  341. return err
  342. }
  343. rbInfo := logParse.GetDefaultContent()
  344. record := undo.UndologRecord{
  345. BranchID: branchID,
  346. XID: xid,
  347. RollbackInfo: rbInfo,
  348. LogStatus: UndoLogStatusGlobalFinished,
  349. Context: undoLogContent,
  350. }
  351. err = m.InsertUndoLogWithSqlConn(ctx, record, conn)
  352. if err != nil {
  353. log.Errorf("insert undo log fail, err: %v", err)
  354. return err
  355. }
  356. return nil
  357. }
  358. // DBType
  359. func (m *BaseUndoLogManager) DBType() types.DBType {
  360. panic("implement me")
  361. }
  362. // HasUndoLogTable check undo log table if exist
  363. func (m *BaseUndoLogManager) HasUndoLogTable(ctx context.Context, conn *sql.Conn) (res bool, err error) {
  364. if _, err = conn.QueryContext(ctx, getCheckUndoLogTableExistSql()); err != nil { //nolint:rowserrcheck,sqlclosecheck
  365. // 1146 mysql table not exist fault code
  366. if e, ok := err.(*mysql.SQLError); ok && e.Code == mysql.ErrNoSuchTable {
  367. return false, nil
  368. }
  369. log.Errorf("[HasUndoLogTable] query sql fail, err: %v", err)
  370. return
  371. }
  372. return true, nil
  373. }
  374. // getBatchDeleteUndoLogSql build batch delete undo log
  375. func (m *BaseUndoLogManager) getBatchDeleteUndoLogSql(xid []string, branchID []int64) (string, error) {
  376. if len(xid) == 0 || len(branchID) == 0 {
  377. return "", fmt.Errorf("xid or branch_id can't nil")
  378. }
  379. var undoLogDeleteSql strings.Builder
  380. undoLogDeleteSql.WriteString(" DELETE FROM ")
  381. undoLogDeleteSql.WriteString(getUndoLogTableName())
  382. undoLogDeleteSql.WriteString(" WHERE branch_id IN ")
  383. m.appendInParam(len(branchID), &undoLogDeleteSql)
  384. undoLogDeleteSql.WriteString(" AND xid IN ")
  385. m.appendInParam(len(xid), &undoLogDeleteSql)
  386. return undoLogDeleteSql.String(), nil
  387. }
  388. // appendInParam build in param
  389. func (m *BaseUndoLogManager) appendInParam(size int, str *strings.Builder) {
  390. if size <= 0 {
  391. return
  392. }
  393. str.WriteString(" (")
  394. for i := 0; i < size; i++ {
  395. str.WriteString("?")
  396. if i < size-1 {
  397. str.WriteString(",")
  398. }
  399. }
  400. str.WriteString(") ")
  401. }
  // Int64Slice2Str formats a []int64 as base-10 numbers joined by sep.
  // It returns an error when values is not a []int64.
  func Int64Slice2Str(values interface{}, sep string) (string, error) {
  	ints, isInt64Slice := values.([]int64)
  	if !isInt64Slice {
  		return "", fmt.Errorf("param type is fault")
  	}

  	parts := make([]string, len(ints))
  	for idx, n := range ints {
  		parts[idx] = strconv.FormatInt(n, 10)
  	}
  	return strings.Join(parts, sep), nil
  }
  415. // canUndo check if it can undo
  416. func (m *BaseUndoLogManager) canUndo(state int32) bool {
  417. return state == UndoLogStatusNormal
  418. }
  419. func (m *BaseUndoLogManager) UnmarshalContext(undoContext []byte) (map[string]string, error) {
  420. res := make(map[string]string)
  421. if err := json.Unmarshal(undoContext, &res); err != nil {
  422. return nil, err
  423. }
  424. return res, nil
  425. }
  426. // getRollbackInfo parser rollback info
  427. func (m *BaseUndoLogManager) getRollbackInfo(rollbackInfo []byte, undoContext map[string]string) ([]byte, error) {
  428. var err error
  429. res := rollbackInfo
  430. // get compress type
  431. if v, ok := undoContext[compressorTypeKey]; ok {
  432. res, err = compressor.CompressorType(v).GetCompressor().Decompress(rollbackInfo)
  433. if err != nil {
  434. log.Errorf("[getRollbackInfo] decompress fail, err: %+v", err)
  435. return nil, err
  436. }
  437. }
  438. return res, nil
  439. }
  440. // getSerializer get serializer from undo context
  441. func (m *BaseUndoLogManager) getSerializer(undoLogContext map[string]string) (serializer string) {
  442. if undoLogContext == nil {
  443. return
  444. }
  445. serializer, _ = undoLogContext[serializerKey]
  446. return
  447. }
  448. func (m *BaseUndoLogManager) deserializeBranchUndoLog(rbInfo []byte, logCtx map[string]string) (*undo.BranchUndoLog, error) {
  449. var (
  450. err error
  451. logParser parser.UndoLogParser
  452. )
  453. if serialzerType := m.getSerializer(logCtx); serialzerType != "" {
  454. if logParser, err = parser.GetCache().Load(serialzerType); err != nil {
  455. return nil, err
  456. }
  457. }
  458. var branchUndoLog *undo.BranchUndoLog
  459. if branchUndoLog, err = logParser.Decode(rbInfo); err != nil {
  460. return nil, err
  461. }
  462. return branchUndoLog, nil
  463. }
  464. func (m *BaseUndoLogManager) serializeBranchUndoLog(log *undo.BranchUndoLog, serializerType string) ([]byte, error) {
  465. logParser, err := parser.GetCache().Load(serializerType)
  466. if err != nil {
  467. return nil, err
  468. }
  469. return logParser.Encode(log)
  470. }
  471. func (m *BaseUndoLogManager) encodeUndoLogCtx(undoLogCtx map[string]string) []byte {
  472. return collection.EncodeMap(undoLogCtx)
  473. }
  474. func (m *BaseUndoLogManager) decodeUndoLogCtx(undoLogCtx []byte) map[string]string {
  475. return collection.DecodeMap(undoLogCtx)
  476. }