You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

multi_delete_executor.go 5.9 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208
  1. /*
  2. * Licensed to the Apache Software Foundation (ASF) under one or more
  3. * contributor license agreements. See the NOTICE file distributed with
  4. * this work for additional information regarding copyright ownership.
  5. * The ASF licenses this file to You under the Apache License, Version 2.0
  6. * (the "License"); you may not use this file except in compliance with
  7. * the License. You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. */
  17. package at
  18. import (
  19. "bytes"
  20. "context"
  21. "database/sql/driver"
  22. "fmt"
  23. "github.com/arana-db/parser/ast"
  24. "github.com/arana-db/parser/format"
  25. "seata.apache.org/seata-go/pkg/datasource/sql/datasource"
  26. "seata.apache.org/seata-go/pkg/datasource/sql/exec"
  27. "seata.apache.org/seata-go/pkg/datasource/sql/types"
  28. "seata.apache.org/seata-go/pkg/datasource/sql/util"
  29. "seata.apache.org/seata-go/pkg/util/log"
  30. )
  31. type multiDeleteExecutor struct {
  32. baseExecutor
  33. parserCtx *types.ParseContext
  34. execContext *types.ExecContext
  35. }
  36. func (m *multiDeleteExecutor) ExecContext(ctx context.Context, f exec.CallbackWithNamedValue) (types.ExecResult, error) {
  37. m.beforeHooks(ctx, m.execContext)
  38. defer func() {
  39. m.afterHooks(ctx, m.execContext)
  40. }()
  41. beforeImage, err := m.beforeImage(ctx)
  42. if err != nil {
  43. return nil, err
  44. }
  45. res, err := f(ctx, m.execContext.Query, m.execContext.NamedValues)
  46. if err != nil {
  47. return nil, err
  48. }
  49. afterImage, err := m.afterImage(ctx)
  50. if err != nil {
  51. return nil, err
  52. }
  53. m.execContext.TxCtx.RoundImages.AppendBeofreImages(beforeImage)
  54. m.execContext.TxCtx.RoundImages.AppendAfterImages(afterImage)
  55. return res, nil
  56. }
  57. type multiDelete struct {
  58. sql string
  59. clear bool
  60. }
  61. // NewMultiDeleteExecutor get multiDelete executor
  62. func NewMultiDeleteExecutor(parserCtx *types.ParseContext, execContent *types.ExecContext, hooks []exec.SQLHook) *multiDeleteExecutor {
  63. return &multiDeleteExecutor{parserCtx: parserCtx, execContext: execContent, baseExecutor: baseExecutor{hooks: hooks}}
  64. }
  65. func (m *multiDeleteExecutor) beforeImage(ctx context.Context) ([]*types.RecordImage, error) {
  66. selectSQL, args, err := m.buildBeforeImageSQL()
  67. if err != nil {
  68. return nil, err
  69. }
  70. var (
  71. rowsi driver.Rows
  72. image *types.RecordImage
  73. records []*types.RecordImage
  74. )
  75. queryerCtx, ok := m.execContext.Conn.(driver.QueryerContext)
  76. var queryer driver.Queryer
  77. if !ok {
  78. queryer, ok = m.execContext.Conn.(driver.Queryer)
  79. }
  80. if !ok {
  81. log.Errorf("target conn should been driver.QueryerContext or driver.Queryer")
  82. return nil, fmt.Errorf("invalid conn")
  83. }
  84. rowsi, err = util.CtxDriverQuery(ctx, queryerCtx, queryer, selectSQL, args)
  85. defer func() {
  86. if rowsi != nil {
  87. rowsi.Close()
  88. }
  89. }()
  90. if err != nil {
  91. log.Errorf("ctx driver query: %+v", err)
  92. return nil, err
  93. }
  94. tableName, err := m.getFromTableInSQL()
  95. if err != nil {
  96. return nil, err
  97. }
  98. metaData, err := datasource.GetTableCache(types.DBTypeMySQL).GetTableMeta(ctx, m.execContext.DBName, tableName)
  99. if err != nil {
  100. return nil, err
  101. }
  102. image, err = m.buildRecordImages(rowsi, metaData, types.SQLTypeDelete)
  103. if err != nil {
  104. log.Errorf("record images : %+v", err)
  105. return nil, err
  106. }
  107. records = append(records, image)
  108. lockKey := m.buildLockKey(image, *metaData)
  109. m.execContext.TxCtx.LockKeys[lockKey] = struct{}{}
  110. return records, err
  111. }
  112. func (m *multiDeleteExecutor) afterImage(ctx context.Context) ([]*types.RecordImage, error) {
  113. tableName, _ := m.parserCtx.GetTableName()
  114. metaData, err := datasource.GetTableCache(types.DBTypeMySQL).GetTableMeta(ctx, m.execContext.DBName, tableName)
  115. if err != nil {
  116. return nil, err
  117. }
  118. image := types.NewEmptyRecordImage(metaData, types.SQLTypeDelete)
  119. return []*types.RecordImage{image}, nil
  120. }
  121. func (m *multiDeleteExecutor) buildBeforeImageSQL() (string, []driver.NamedValue, error) {
  122. tableName, err := m.getFromTableInSQL()
  123. if err != nil {
  124. return "", nil, err
  125. }
  126. var (
  127. // todo optimize replace * by use columns
  128. selectSQL = "SELECT SQL_NO_CACHE * FROM " + tableName
  129. params []driver.NamedValue
  130. whereCondition string
  131. hasWhereCondition = true
  132. )
  133. for _, parser := range m.parserCtx.MultiStmt {
  134. deleteParser := parser.DeleteStmt
  135. if deleteParser == nil {
  136. continue
  137. }
  138. if deleteParser.Limit != nil {
  139. return "", nil, fmt.Errorf("Multi delete SQL with limit condition is not support yet!")
  140. }
  141. if deleteParser.Order != nil {
  142. return "", nil, fmt.Errorf("Multi delete SQL with orderBy condition is not support yet!")
  143. }
  144. if deleteParser.Where == nil || !hasWhereCondition {
  145. hasWhereCondition = false
  146. continue
  147. }
  148. var whereBuffer bytes.Buffer
  149. if err = deleteParser.Where.Restore(format.NewRestoreCtx(format.RestoreKeyWordUppercase, &whereBuffer)); err != nil {
  150. return "", nil, err
  151. }
  152. if whereCondition != "" {
  153. whereCondition += " OR "
  154. }
  155. whereCondition += fmt.Sprintf("(%s)", string(whereBuffer.Bytes()))
  156. newParams := m.buildSelectArgs(&ast.SelectStmt{
  157. Where: deleteParser.Where,
  158. From: deleteParser.TableRefs,
  159. Limit: deleteParser.Limit,
  160. OrderBy: deleteParser.Order,
  161. TableHints: deleteParser.TableHints,
  162. }, m.execContext.NamedValues)
  163. params = append(params, newParams...)
  164. }
  165. if hasWhereCondition {
  166. selectSQL += " WHERE " + whereCondition
  167. } else {
  168. params = []driver.NamedValue{}
  169. }
  170. selectSQL += " FOR UPDATE"
  171. return selectSQL, params, nil
  172. }
  173. func (m *multiDeleteExecutor) getFromTableInSQL() (string, error) {
  174. for _, parser := range m.parserCtx.MultiStmt {
  175. if parser != nil {
  176. return parser.GetTableName()
  177. }
  178. }
  179. return "", fmt.Errorf("multi delete sql has no table name")
  180. }