
insert_executor.go 18 kB

Fix autoincrement insert panic (#452)
3 years ago
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package at

import (
	"context"
	"database/sql/driver"
	"fmt"
	"strings"

	"github.com/arana-db/parser/ast"

	"seata.apache.org/seata-go/pkg/datasource/sql/datasource"
	"seata.apache.org/seata-go/pkg/datasource/sql/exec"
	"seata.apache.org/seata-go/pkg/datasource/sql/types"
	"seata.apache.org/seata-go/pkg/datasource/sql/util"
	"seata.apache.org/seata-go/pkg/util/log"
)

const (
	sqlPlaceholder = "?"
)

// insertExecutor execute insert SQL
type insertExecutor struct {
	baseExecutor
	parserCtx     *types.ParseContext
	execContext   *types.ExecContext
	incrementStep int
	// businesSQLResult after insert sql
	businesSQLResult types.ExecResult
}

// NewInsertExecutor get insert executor
func NewInsertExecutor(parserCtx *types.ParseContext, execContent *types.ExecContext, hooks []exec.SQLHook) executor {
	return &insertExecutor{parserCtx: parserCtx, execContext: execContent, baseExecutor: baseExecutor{hooks: hooks}}
}

func (i *insertExecutor) ExecContext(ctx context.Context, f exec.CallbackWithNamedValue) (types.ExecResult, error) {
	i.beforeHooks(ctx, i.execContext)
	defer func() {
		i.afterHooks(ctx, i.execContext)
	}()

	beforeImage, err := i.beforeImage(ctx)
	if err != nil {
		return nil, err
	}

	res, err := f(ctx, i.execContext.Query, i.execContext.NamedValues)
	if err != nil {
		return nil, err
	}

	if i.businesSQLResult == nil {
		i.businesSQLResult = res
	}

	afterImage, err := i.afterImage(ctx)
	if err != nil {
		return nil, err
	}

	i.execContext.TxCtx.RoundImages.AppendBeofreImage(beforeImage)
	i.execContext.TxCtx.RoundImages.AppendAfterImage(afterImage)
	return res, nil
}

// beforeImage build before image
func (i *insertExecutor) beforeImage(ctx context.Context) (*types.RecordImage, error) {
	tableName, _ := i.parserCtx.GetTableName()
	metaData, err := datasource.GetTableCache(types.DBTypeMySQL).GetTableMeta(ctx, i.execContext.DBName, tableName)
	if err != nil {
		return nil, err
	}
	return types.NewEmptyRecordImage(metaData, types.SQLTypeInsert), nil
}

// afterImage build after image
func (i *insertExecutor) afterImage(ctx context.Context) (*types.RecordImage, error) {
	if !i.isAstStmtValid() {
		return nil, nil
	}

	tableName, _ := i.parserCtx.GetTableName()
	metaData, err := datasource.GetTableCache(types.DBTypeMySQL).GetTableMeta(ctx, i.execContext.DBName, tableName)
	if err != nil {
		return nil, err
	}
	selectSQL, selectArgs, err := i.buildAfterImageSQL(ctx)
	if err != nil {
		return nil, err
	}

	// prefer driver.QueryerContext and fall back to the legacy driver.Queryer
	var rowsi driver.Rows
	queryerCtx, ok := i.execContext.Conn.(driver.QueryerContext)
	var queryer driver.Queryer
	if !ok {
		queryer, ok = i.execContext.Conn.(driver.Queryer)
	}
	if ok {
		rowsi, err = util.CtxDriverQuery(ctx, queryerCtx, queryer, selectSQL, selectArgs)
		defer func() {
			if rowsi != nil {
				rowsi.Close()
			}
		}()
		if err != nil {
			log.Errorf("ctx driver query: %+v", err)
			return nil, err
		}
	} else {
		log.Errorf("target conn should be driver.QueryerContext or driver.Queryer")
		return nil, fmt.Errorf("invalid conn")
	}

	image, err := i.buildRecordImages(rowsi, metaData, types.SQLTypeInsert)
	if err != nil {
		return nil, err
	}

	lockKey := i.buildLockKey(image, *metaData)
	i.execContext.TxCtx.LockKeys[lockKey] = struct{}{}
	return image, nil
}

// buildAfterImageSQL build select sql from insert sql
func (i *insertExecutor) buildAfterImageSQL(ctx context.Context) (string, []driver.NamedValue, error) {
	// get all pk value
	tableName, _ := i.parserCtx.GetTableName()

	meta, err := datasource.GetTableCache(types.DBTypeMySQL).GetTableMeta(ctx, i.execContext.DBName, tableName)
	if err != nil {
		return "", nil, err
	}
	pkValuesMap, err := i.getPkValues(ctx, i.execContext, i.parserCtx, *meta)
	if err != nil {
		return "", nil, err
	}

	pkColumnNameList := meta.GetPrimaryKeyOnlyName()
	if len(pkColumnNameList) == 0 {
		return "", nil, fmt.Errorf("Pk columnName size is zero")
	}

	dataTypeMap, err := meta.GetPrimaryKeyTypeStrMap()
	if err != nil {
		return "", nil, err
	}
	if len(dataTypeMap) != len(pkColumnNameList) {
		return "", nil, fmt.Errorf("PK columnName size don't equal PK DataType size")
	}
	var pkRowImages []types.RowImage

	rowSize := len(pkValuesMap[pkColumnNameList[0]])
	// rowIdx avoids shadowing the receiver i inside the loop
	for rowIdx := 0; rowIdx < rowSize; rowIdx++ {
		for _, name := range pkColumnNameList {
			tmpKey := name
			tmpArray := pkValuesMap[tmpKey]
			pkRowImages = append(pkRowImages, types.RowImage{
				Columns: []types.ColumnImage{{
					KeyType:    types.IndexTypePrimaryKey,
					ColumnName: tmpKey,
					ColumnType: types.MySQLStrToJavaType(dataTypeMap[tmpKey]),
					Value:      tmpArray[rowIdx],
				}},
			})
		}
	}
	// build check sql
	sb := strings.Builder{}
	suffix := strings.Builder{}
	var insertColumns []string

	for _, column := range i.parserCtx.InsertStmt.Columns {
		insertColumns = append(insertColumns, column.Name.O)
	}
	sb.WriteString("SELECT " + strings.Join(i.getNeedColumns(meta, insertColumns, types.DBTypeMySQL), ", "))
	suffix.WriteString(" FROM " + tableName)
	whereSQL := i.buildWhereConditionByPKs(pkColumnNameList, rowSize, "mysql", maxInSize)
	suffix.WriteString(" WHERE " + whereSQL + " ")
	sb.WriteString(suffix.String())
	return sb.String(), i.buildPKParams(pkRowImages, pkColumnNameList), nil
}

func (i *insertExecutor) getPkValues(ctx context.Context, execCtx *types.ExecContext, parseCtx *types.ParseContext, meta types.TableMeta) (map[string][]interface{}, error) {
	pkColumnNameList := meta.GetPrimaryKeyOnlyName()
	pkValuesMap := make(map[string][]interface{})
	var err error
	// when there is only one pk in the table
	if len(pkColumnNameList) == 1 {
		if i.containsPK(meta, parseCtx) {
			// the insert sql contain pk value
			pkValuesMap, err = i.getPkValuesByColumn(ctx, execCtx)
			if err != nil {
				return nil, err
			}
		} else if containsColumns(parseCtx) {
			// the insert table pk auto generated
			pkValuesMap, err = i.getPkValuesByAuto(ctx, execCtx)
			if err != nil {
				return nil, err
			}
		} else {
			pkValuesMap, err = i.getPkValuesByColumn(ctx, execCtx)
			if err != nil {
				return nil, err
			}
		}
	} else {
		// when there is multiple pk in the table
		// 1,all pk columns are filled value.
		// 2,the auto increment pk column value is null, and other pk value are not null.
		pkValuesMap, err = i.getPkValuesByColumn(ctx, execCtx)
		if err != nil {
			return nil, err
		}
		for _, columnName := range pkColumnNameList {
			if _, ok := pkValuesMap[columnName]; !ok {
				curPkValuesMap, err := i.getPkValuesByAuto(ctx, execCtx)
				if err != nil {
					return nil, err
				}
				pkValuesMapMerge(&pkValuesMap, curPkValuesMap)
			}
		}
	}
	return pkValuesMap, nil
}

// containsPK the columns contains table meta pk
func (i *insertExecutor) containsPK(meta types.TableMeta, parseCtx *types.ParseContext) bool {
	pkColumnNameList := meta.GetPrimaryKeyOnlyName()
	if len(pkColumnNameList) == 0 {
		return false
	}
	if parseCtx == nil || parseCtx.InsertStmt == nil || parseCtx.InsertStmt.Columns == nil {
		return false
	}
	if len(parseCtx.InsertStmt.Columns) == 0 {
		return false
	}

	matchCounter := 0
	for _, column := range parseCtx.InsertStmt.Columns {
		for _, pkName := range pkColumnNameList {
			if strings.EqualFold(pkName, column.Name.O) ||
				strings.EqualFold(pkName, column.Name.L) {
				matchCounter++
			}
		}
	}

	return matchCounter == len(pkColumnNameList)
}

// containPK compare column name and primary key name
func (i *insertExecutor) containPK(columnName string, meta types.TableMeta) bool {
	newColumnName := DelEscape(columnName, types.DBTypeMySQL)
	pkColumnNameList := meta.GetPrimaryKeyOnlyName()
	if len(pkColumnNameList) == 0 {
		return false
	}
	for _, name := range pkColumnNameList {
		if strings.EqualFold(name, newColumnName) {
			return true
		}
	}
	return false
}

// getPkIndex get pk index
// return the key is pk column name and the value is index of the pk column
func (i *insertExecutor) getPkIndex(InsertStmt *ast.InsertStmt, meta types.TableMeta) map[string]int {
	pkIndexMap := make(map[string]int)
	if InsertStmt == nil {
		return pkIndexMap
	}
	insertColumnsSize := len(InsertStmt.Columns)
	if insertColumnsSize == 0 {
		return pkIndexMap
	}
	if meta.ColumnNames == nil {
		return pkIndexMap
	}
	if len(meta.Columns) > 0 {
		for paramIdx := 0; paramIdx < insertColumnsSize; paramIdx++ {
			sqlColumnName := InsertStmt.Columns[paramIdx].Name.O
			if i.containPK(sqlColumnName, meta) {
				pkIndexMap[sqlColumnName] = paramIdx
			}
		}
		return pkIndexMap
	}

	pkIndex := -1
	allColumns := meta.Columns
	for _, columnMeta := range allColumns {
		tmpColumnMeta := columnMeta
		pkIndex++
		if i.containPK(tmpColumnMeta.ColumnName, meta) {
			pkIndexMap[DelEscape(tmpColumnMeta.ColumnName, types.DBTypeMySQL)] = pkIndex
		}
	}

	return pkIndexMap
}

// parsePkValuesFromStatement parses the primary key values from an INSERT statement.
// It returns a map from primary key column name to that column's values, one per inserted row.
func (i *insertExecutor) parsePkValuesFromStatement(insertStmt *ast.InsertStmt, meta types.TableMeta, nameValues []driver.NamedValue) (map[string][]interface{}, error) {
	if insertStmt == nil {
		return nil, nil
	}
	pkIndexMap := i.getPkIndex(insertStmt, meta)
	if len(pkIndexMap) == 0 {
		return nil, fmt.Errorf("pkIndex is not found")
	}
	pkIndexArray := make([]int, 0, len(pkIndexMap))
	for _, pkIndex := range pkIndexMap {
		pkIndexArray = append(pkIndexArray, pkIndex)
	}
	if len(insertStmt.Lists) == 0 {
		return nil, fmt.Errorf("InsertStmt has no value lists")
	}
	pkValuesMap := make(map[string][]interface{})
	if len(nameValues) == 0 {
		// plain statement: read the primary key values straight from the AST
		for _, list := range insertStmt.Lists {
			for pkName, pkIndex := range pkIndexMap {
				if pkIndex >= len(list) {
					return nil, fmt.Errorf("pkIndex out of range")
				}
				if node, ok := list[pkIndex].(ast.ValueExpr); ok {
					pkValuesMap[pkName] = append(pkValuesMap[pkName], node.GetValue())
				}
			}
		}
		return pkValuesMap, nil
	}

	// prepared statement: placeholders are resolved against nameValues
	insertRows, err := getInsertRows(insertStmt, pkIndexArray)
	if err != nil {
		return nil, err
	}
	if len(insertRows) == 0 {
		return nil, nil
	}
	isPlaceholder := func(v interface{}) bool {
		s, ok := v.(string)
		return ok && strings.EqualFold(s, sqlPlaceholder)
	}
	// Both counters start at -1, so after counting they hold the index of the
	// last placeholder seen (overall and within the row), not a count.
	totalPlaceholderNum := -1
	for _, row := range insertRows {
		if len(row) == 0 {
			continue
		}
		currentRowPlaceholderNum := -1
		for _, r := range row {
			if isPlaceholder(r) {
				totalPlaceholderNum++
				currentRowPlaceholderNum++
			}
		}
		for pkKey, pkIndex := range pkIndexMap {
			if pkIndex > len(row)-1 {
				continue
			}
			pkValue := row[pkIndex]
			if isPlaceholder(pkValue) {
				// Values before the primary key that are not placeholders consume no
				// argument. Count them whatever their type, not only strings.
				notPlaceholderNumBeforePkIndex := 0
				for _, r := range row[:pkIndex] {
					if !isPlaceholder(r) {
						notPlaceholderNumBeforePkIndex++
					}
				}
				idx := totalPlaceholderNum - currentRowPlaceholderNum + pkIndex - notPlaceholderNumBeforePkIndex
				if idx < 0 || idx >= len(nameValues) {
					return nil, fmt.Errorf("placeholder index %d out of range of %d arguments", idx, len(nameValues))
				}
				pkValue = nameValues[idx].Value
			}
			// Always store the grown slice back: append may return a new slice
			// header, so values of later rows would otherwise be lost.
			pkValuesMap[pkKey] = append(pkValuesMap[pkKey], pkValue)
		}
	}
	return pkValuesMap, nil
}
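
// The placeholder arithmetic in parsePkValuesFromStatement can be hard to follow,
// so here is a minimal standalone sketch of the same idea. It is not part of the
// executor: argIndexes and the string-only rows are hypothetical simplifications
// (the real rows hold interface{} values). For each row it reports which position
// of the flat argument list feeds a given column, or -1 if that column is a literal.

```go
package main

import "fmt"

const placeholder = "?"

// argIndexes returns, for every row, the index into the flat argument list
// that feeds column col, or -1 when that column does not hold a placeholder.
func argIndexes(rows [][]string, col int) []int {
	out := make([]int, 0, len(rows))
	seen := 0 // placeholders consumed by earlier rows
	for _, row := range rows {
		idx := -1
		inRow := 0 // placeholders seen so far in this row
		for c, v := range row {
			if v != placeholder {
				continue // literals consume no argument
			}
			if c == col {
				idx = seen + inRow
			}
			inRow++
		}
		seen += inRow
		out = append(out, idx)
	}
	return out
}

func main() {
	// INSERT INTO t (name, id, age) VALUES ('a', ?, ?), (?, ?, 18)
	rows := [][]string{{"'a'", "?", "?"}, {"?", "?", "18"}}
	fmt.Println(argIndexes(rows, 1)) // prints "[0 3]"
}
```

// In the first row the literal 'a' consumes no argument, so id is argument 0.
// The first row used two arguments and name in the second row uses one more,
// so the second id is argument 3.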

// getPkValuesByColumn gets the primary key values from the columns of the INSERT statement,
// falling back to auto-generated values where the statement does not supply them.
func (i *insertExecutor) getPkValuesByColumn(ctx context.Context, execCtx *types.ExecContext) (map[string][]interface{}, error) {
	if !i.isAstStmtValid() {
		return nil, nil
	}
	tableName, err := i.parserCtx.GetTableName()
	if err != nil {
		return nil, err
	}
	meta, err := datasource.GetTableCache(types.DBTypeMySQL).GetTableMeta(ctx, i.execContext.DBName, tableName)
	if err != nil {
		return nil, err
	}
	pkValuesMap, err := i.parsePkValuesFromStatement(i.parserCtx.InsertStmt, *meta, execCtx.NamedValues)
	if err != nil {
		return nil, err
	}
	// generate pkValue by auto increment
	for _, v := range pkValuesMap {
		if len(v) == 0 {
			continue
		}
		// pk auto generated while column exists and value is null
		needAuto := v[0] == nil
		if len(v) == 1 {
			// pk auto generated while single insert primary key is expression
			switch v[0].(type) {
			case *ast.FuncCallExpr, ast.FuncCallExpr:
				needAuto = true
			}
		}
		if !needAuto {
			continue
		}
		curPkValueMap, err := i.getPkValuesByAuto(ctx, execCtx)
		if err != nil {
			return nil, err
		}
		pkValuesMapMerge(&pkValuesMap, curPkValueMap)
	}
	return pkValuesMap, nil
}

// getPkValuesByAuto gets the primary key values generated by the auto-increment column.
func (i *insertExecutor) getPkValuesByAuto(ctx context.Context, execCtx *types.ExecContext) (map[string][]interface{}, error) {
	if !i.isAstStmtValid() {
		return nil, nil
	}
	tableName, err := i.parserCtx.GetTableName()
	if err != nil {
		return nil, err
	}
	metaData, err := datasource.GetTableCache(types.DBTypeMySQL).GetTableMeta(ctx, i.execContext.DBName, tableName)
	if err != nil {
		return nil, err
	}
	pkMetaMap := metaData.GetPrimaryKeyMap()
	if len(pkMetaMap) == 0 {
		return nil, fmt.Errorf("pk map is empty")
	}
	var autoColumnName string
	for _, columnMeta := range pkMetaMap {
		if columnMeta.Autoincrement {
			autoColumnName = columnMeta.ColumnName
			break
		}
	}
	if len(autoColumnName) == 0 {
		return nil, fmt.Errorf("auto increment column not exist")
	}
	updateCount, err := i.businesSQLResult.GetResult().RowsAffected()
	if err != nil {
		return nil, err
	}
	lastInsertID, err := i.businesSQLResult.GetResult().LastInsertId()
	if err != nil {
		return nil, err
	}
	// For a batch insert, derive every key from LAST_INSERT_ID and the
	// variable `auto_increment_increment`.
	if lastInsertID > 0 && updateCount > 1 && canAutoIncrement(pkMetaMap) {
		return i.autoGeneratePks(execCtx, autoColumnName, lastInsertID, updateCount)
	}
	if lastInsertID > 0 {
		return map[string][]interface{}{autoColumnName: {lastInsertID}}, nil
	}
	return nil, nil
}

// canAutoIncrement reports whether the table has exactly one primary key column
// and that column is auto-increment.
func canAutoIncrement(pkMetaMap map[string]types.ColumnMeta) bool {
	if len(pkMetaMap) != 1 {
		return false
	}
	for _, meta := range pkMetaMap {
		return meta.Autoincrement
	}
	return false
}

// isAstStmtValid reports whether the parser context holds an INSERT statement.
func (i *insertExecutor) isAstStmtValid() bool {
	return i.parserCtx != nil && i.parserCtx.InsertStmt != nil
}

// autoGeneratePks derives the primary keys of a batch insert from LAST_INSERT_ID
// and the auto-increment step.
func (i *insertExecutor) autoGeneratePks(execCtx *types.ExecContext, autoColumnName string, lastInsertID, updateCount int64) (map[string][]interface{}, error) {
	var step int64
	if i.incrementStep > 0 {
		step = int64(i.incrementStep)
	} else {
		// get step by query sql
		stmt, err := execCtx.Conn.Prepare("SHOW VARIABLES LIKE 'auto_increment_increment'")
		if err != nil {
			log.Errorf("build prepare stmt: %+v", err)
			return nil, err
		}
		defer stmt.Close()
		rows, err := stmt.Query(nil)
		if err != nil {
			log.Errorf("stmt query: %+v", err)
			return nil, err
		}
		defer rows.Close()
		columns := rows.Columns()
		if len(columns) == 0 {
			return nil, fmt.Errorf("query is empty")
		}
		// rows.Next fills dest in place, so it needs one slot per column
		curStep := make([]driver.Value, len(columns))
		if err := rows.Next(curStep); err != nil {
			return nil, err
		}
		// SHOW VARIABLES returns (Variable_name, Value): the step is the last
		// column, and the driver may hand it back as text rather than int64.
		switch v := curStep[len(curStep)-1].(type) {
		case int64:
			step = v
		case []byte:
			_, _ = fmt.Sscan(string(v), &step)
		case string:
			_, _ = fmt.Sscan(v, &step)
		}
	}
	if step == 0 {
		return nil, fmt.Errorf("get increment step error")
	}
	pkValues := make([]interface{}, 0, updateCount)
	for j := int64(0); j < updateCount; j++ {
		pkValues = append(pkValues, lastInsertID+j*step)
	}
	return map[string][]interface{}{autoColumnName: pkValues}, nil
}
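
// The key derivation at the end of autoGeneratePks can be tried in isolation.
// The sketch below is not part of the executor: batchPks is a hypothetical helper.
// It assumes, as the code above does, that the ids of one batch are spaced exactly
// `step` apart starting from LAST_INSERT_ID, which MySQL reports as the first id
// generated by a multi-row insert. Whether ids really are contiguous can depend on
// server settings, so treat this as an illustration only.

```go
package main

import "fmt"

// batchPks lists the ids assigned to a batch insert of `rows` rows, given the
// first generated id and the auto-increment step.
func batchPks(firstID, rows, step int64) ([]int64, error) {
	if step <= 0 || rows < 0 {
		return nil, fmt.Errorf("invalid step %d or row count %d", step, rows)
	}
	pks := make([]int64, 0, rows)
	for j := int64(0); j < rows; j++ {
		pks = append(pks, firstID+j*step)
	}
	return pks, nil
}

func main() {
	// three rows inserted, first id 10, auto_increment_increment = 2
	pks, err := batchPks(10, 3, 2)
	fmt.Println(pks, err) // prints "[10 12 14] <nil>"
}
```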

// pkValuesMapMerge appends the values of src to the matching keys of dest.
func pkValuesMapMerge(dest *map[string][]interface{}, src map[string][]interface{}) {
	for k, v := range src {
		// spread v so dest stays a flat list of values instead of nesting a slice
		(*dest)[k] = append((*dest)[k], v...)
	}
}

// containsColumns reports whether the INSERT statement specifies a column list.
func containsColumns(parseCtx *types.ParseContext) bool {
	if parseCtx == nil || parseCtx.InsertStmt == nil {
		return false
	}
	return len(parseCtx.InsertStmt.Columns) > 0
}

// getInsertRows flattens the value lists of an INSERT statement into rows.
// Placeholders become sqlPlaceholder, literals their value, variables their name,
// and function calls an empty *ast.FuncCallExpr marker. Any other expression is
// rejected in a primary key column and stored as ast.DefaultExpr{} elsewhere.
func getInsertRows(insertStmt *ast.InsertStmt, pkIndexArray []int) ([][]interface{}, error) {
	if insertStmt == nil {
		return nil, nil
	}
	if len(insertStmt.Lists) == 0 {
		return nil, nil
	}
	var rows [][]interface{}
	for _, nodes := range insertStmt.Lists {
		var row []interface{}
		for i, node := range nodes {
			if _, ok := node.(ast.ParamMarkerExpr); ok {
				row = append(row, sqlPlaceholder)
			} else if newNode, ok := node.(ast.ValueExpr); ok {
				row = append(row, newNode.GetValue())
			} else if newNode, ok := node.(*ast.VariableExpr); ok {
				row = append(row, newNode.Name)
			} else if _, ok := node.(*ast.FuncCallExpr); ok {
				// store a pointer so callers asserting *ast.FuncCallExpr match it
				row = append(row, &ast.FuncCallExpr{})
			} else {
				for _, index := range pkIndexArray {
					if index == i {
						return nil, fmt.Errorf("unknown SQLExpr: %v", node)
					}
				}
				row = append(row, ast.DefaultExpr{})
			}
		}
		rows = append(rows, row)
	}
	return rows, nil
}