Skip to content

Commit

Permalink
planner: fix compatibility issue that update subquery table should be…
Browse files Browse the repository at this point in the history
… forbidden (#7783)
  • Loading branch information
CodeRushing authored and Lingyu Song committed Sep 26, 2018
1 parent cfd4544 commit af3b782
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 5 deletions.
7 changes: 7 additions & 0 deletions executor/write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1055,6 +1055,13 @@ func (s *testSuite) TestUpdate(c *C) {
c.Assert(err.Error(), Equals, "cannot convert datum from bigint to type year.")

tk.MustExec("update (select * from t) t set c1 = 1111111")

// issue 7237, update subquery table should be forbidden
tk.MustExec("drop table t")
tk.MustExec("create table t (k int, v int)")
_, err = tk.Exec("update t, (select * from t) as b set b.k = t.k")
c.Assert(err.Error(), Equals, "[planner:1288]The target table b of the UPDATE is not updatable")
tk.MustExec("update t, (select * from t) as b set t.k = b.k")
}

func (s *testSuite) TestPartitionedTableUpdate(c *C) {
Expand Down
25 changes: 20 additions & 5 deletions planner/core/logical_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2024,6 +2024,17 @@ func (b *planBuilder) buildUpdate(update *ast.UpdateStmt) (Plan, error) {
defer b.popTableHints()
}

// update subquery table should be forbidden
var asNameList []string
asNameList = extractTableSourceAsNames(update.TableRefs.TableRefs, asNameList, true)
for _, asName := range asNameList {
for _, assign := range update.List {
if assign.Column.Table.L == asName {
return nil, ErrNonUpdatableTable.GenWithStackByArgs(asName, "UPDATE")
}
}
}

b.inUpdateStmt = true
sel := &ast.SelectStmt{
Fields: &ast.FieldList{},
Expand Down Expand Up @@ -2268,7 +2279,7 @@ func (b *planBuilder) buildDelete(delete *ast.DeleteStmt) (Plan, error) {
}
if !foundMatch {
var asNameList []string
asNameList = extractTableSourceAsNames(delete.TableRefs.TableRefs, asNameList)
asNameList = extractTableSourceAsNames(delete.TableRefs.TableRefs, asNameList, false)
for _, asName := range asNameList {
tblName := tn.Name.L
if tn.Schema.L != "" {
Expand Down Expand Up @@ -2319,13 +2330,17 @@ func extractTableList(node ast.ResultSetNode, input []*ast.TableName) []*ast.Tab
return input
}

// extractTableSourceAsNames extracts all the TableSource.AsNames from node.
func extractTableSourceAsNames(node ast.ResultSetNode, input []string) []string {
// extractTableSourceAsNames extracts TableSource.AsNames from node.
// if onlySelectStmt is set to be true, only extracts AsNames when TableSource.Source.(type) == *ast.SelectStmt
func extractTableSourceAsNames(node ast.ResultSetNode, input []string, onlySelectStmt bool) []string {
switch x := node.(type) {
case *ast.Join:
input = extractTableSourceAsNames(x.Left, input)
input = extractTableSourceAsNames(x.Right, input)
input = extractTableSourceAsNames(x.Left, input, onlySelectStmt)
input = extractTableSourceAsNames(x.Right, input, onlySelectStmt)
case *ast.TableSource:
if _, ok := x.Source.(*ast.SelectStmt); !ok && onlySelectStmt {
break
}
if s, ok := x.Source.(*ast.TableName); ok {
if x.AsName.L == "" {
input = append(input, s.Name.L)
Expand Down
1 change: 1 addition & 0 deletions planner/core/logical_plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1985,6 +1985,7 @@ func (s *testPlanSuite) TestNameResolver(c *C) {
{"select a from t group by t11.c1", "[planner:1054]Unknown column 't11.c1' in 'group statement'"},
{"delete a from (select * from t ) as a, t", "[planner:1288]The target table a of the DELETE is not updatable"},
{"delete b from (select * from t ) as a, t", "[planner:1109]Unknown table 'b' in MULTI DELETE"},
{"update t, (select * from t) as b set b.a = t.a", "[planner:1288]The target table b of the UPDATE is not updatable"},
}

for _, t := range tests {
Expand Down

0 comments on commit af3b782

Please sign in to comment.