You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

executor.go 5.6 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188
  1. /*
  2. * Licensed to the Apache Software Foundation (ASF) under one or more
  3. * contributor license agreements. See the NOTICE file distributed with
  4. * this work for additional information regarding copyright ownership.
  5. * The ASF licenses this file to You under the Apache License, Version 2.0
  6. * (the "License"); you may not use this file except in compliance with
  7. * the License. You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. */
  17. package executor
  18. import (
  19. "context"
  20. "database/sql"
  21. "database/sql/driver"
  22. "fmt"
  23. "strings"
  24. "github.com/goccy/go-json"
  25. "seata.apache.org/seata-go/pkg/datasource/sql/datasource"
  26. "seata.apache.org/seata-go/pkg/datasource/sql/types"
  27. "seata.apache.org/seata-go/pkg/datasource/sql/undo"
  28. serr "seata.apache.org/seata-go/pkg/util/errors"
  29. "seata.apache.org/seata-go/pkg/util/log"
  30. )
  31. var _ undo.UndoExecutor = (*BaseExecutor)(nil)
  32. const (
  33. checkSQLTemplate = "SELECT * FROM %s WHERE %s FOR UPDATE"
  34. maxInSize = 1000
  35. )
  36. type BaseExecutor struct {
  37. sqlUndoLog undo.SQLUndoLog
  38. undoImage *types.RecordImage
  39. }
  40. // ExecuteOn
  41. func (b *BaseExecutor) ExecuteOn(ctx context.Context, dbType types.DBType, conn *sql.Conn) error {
  42. // check data if valid
  43. return nil
  44. }
  45. // UndoPrepare
  46. func (b *BaseExecutor) UndoPrepare(undoPST *sql.Stmt, undoValues []types.ColumnImage, pkValueList []types.ColumnImage) {
  47. }
  48. func (b *BaseExecutor) dataValidationAndGoOn(ctx context.Context, conn *sql.Conn) (bool, error) {
  49. if !undo.UndoConfig.DataValidation {
  50. return true, nil
  51. }
  52. beforeImage := b.sqlUndoLog.BeforeImage
  53. afterImage := b.sqlUndoLog.AfterImage
  54. equals, err := IsRecordsEquals(beforeImage, afterImage)
  55. if err != nil {
  56. return false, err
  57. }
  58. if equals {
  59. log.Infof("Stop rollback because there is no data change between the before data snapshot and the after data snapshot.")
  60. return false, nil
  61. }
  62. // Validate if data is dirty.
  63. currentImage, err := b.queryCurrentRecords(ctx, conn)
  64. if err != nil {
  65. return false, err
  66. }
  67. // compare with current data and after image.
  68. equals, err = IsRecordsEquals(afterImage, currentImage)
  69. if err != nil {
  70. return false, err
  71. }
  72. if !equals {
  73. // If current data is not equivalent to the after data, then compare the current data with the before
  74. // data, too. No need continue to undo if current data is equivalent to the before data snapshot
  75. equals, err = IsRecordsEquals(beforeImage, currentImage)
  76. if err != nil {
  77. return false, err
  78. }
  79. if equals {
  80. log.Infof("Stop rollback because there is no data change between the before data snapshot and the current data snapshot.")
  81. // no need continue undo.
  82. return false, nil
  83. } else {
  84. oldRowJson, _ := json.Marshal(afterImage.Rows)
  85. newRowJson, _ := json.Marshal(currentImage.Rows)
  86. log.Infof("check dirty data failed, old and new data are not equal, "+
  87. "tableName:[%s], oldRows:[%s],newRows:[%s].", afterImage.TableName, oldRowJson, newRowJson)
  88. return false, serr.New(serr.SQLUndoDirtyError, "has dirty records when undo", nil)
  89. }
  90. }
  91. return true, nil
  92. }
  93. func (b *BaseExecutor) queryCurrentRecords(ctx context.Context, conn *sql.Conn) (*types.RecordImage, error) {
  94. if b.undoImage == nil {
  95. return nil, fmt.Errorf("undo image is nil")
  96. }
  97. tableMeta := b.undoImage.TableMeta
  98. pkNameList := tableMeta.GetPrimaryKeyOnlyName()
  99. pkValues := b.parsePkValues(b.undoImage.Rows, pkNameList)
  100. if len(pkValues) == 0 {
  101. return nil, nil
  102. }
  103. where := buildWhereConditionByPKs(pkNameList, len(b.undoImage.Rows), maxInSize)
  104. checkSQL := fmt.Sprintf(checkSQLTemplate, b.undoImage.TableName, where)
  105. params := buildPKParams(b.undoImage.Rows, pkNameList)
  106. rows, err := conn.QueryContext(ctx, checkSQL, params...)
  107. if err != nil {
  108. return nil, err
  109. }
  110. defer rows.Close()
  111. image := types.RecordImage{
  112. TableName: b.undoImage.TableName,
  113. TableMeta: tableMeta,
  114. SQLType: types.SQLTypeSelect,
  115. }
  116. rowImages := make([]types.RowImage, 0)
  117. for rows.Next() {
  118. columnTypes, err := rows.ColumnTypes()
  119. if err != nil {
  120. return nil, err
  121. }
  122. slice := datasource.GetScanSlice(columnTypes)
  123. if err = rows.Scan(slice...); err != nil {
  124. return nil, err
  125. }
  126. colNames, err := rows.Columns()
  127. if err != nil {
  128. return nil, err
  129. }
  130. columns := make([]types.ColumnImage, 0)
  131. for i, val := range slice {
  132. actualVal := val
  133. if v, ok := val.(driver.Valuer); ok {
  134. actualVal, _ = v.Value()
  135. }
  136. columns = append(columns, types.ColumnImage{
  137. ColumnName: colNames[i],
  138. Value: actualVal,
  139. })
  140. }
  141. rowImages = append(rowImages, types.RowImage{Columns: columns})
  142. }
  143. if err := rows.Err(); err != nil {
  144. return nil, err
  145. }
  146. image.Rows = rowImages
  147. return &image, nil
  148. }
  149. func (b *BaseExecutor) parsePkValues(rows []types.RowImage, pkNameList []string) map[string][]types.ColumnImage {
  150. pkValues := make(map[string][]types.ColumnImage)
  151. // todo optimize 3 fors
  152. for _, row := range rows {
  153. for _, column := range row.Columns {
  154. for _, pk := range pkNameList {
  155. if strings.EqualFold(pk, column.ColumnName) {
  156. values := pkValues[strings.ToUpper(pk)]
  157. if values == nil {
  158. values = make([]types.ColumnImage, 0)
  159. }
  160. values = append(values, column)
  161. pkValues[pk] = values
  162. }
  163. }
  164. }
  165. }
  166. return pkValues
  167. }