Skip to content

Commit

Permalink
Merge #29487
Browse files Browse the repository at this point in the history
29487: sql: fix UPSERT RETURNING in presence of schema changes r=jordanlewis a=jordanlewis

Fixes #29436.

Release note (bug fix): fix crash caused by certain kinds of UPSERT
RETURNING statements on tables with active schema changes.

Co-authored-by: Jordan Lewis <jordanthelewis@gmail.com>
  • Loading branch information
craig[bot] and jordanlewis committed Sep 4, 2018
2 parents 6f64580 + e9f0f9b commit 1571818
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 9 deletions.
65 changes: 65 additions & 0 deletions pkg/sql/descriptor_mutation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,63 @@ func (mt mutationTest) writeMutation(m sqlbase.DescriptorMutation) {
}
}

// Test that UPSERT with a column mutation that has a default value with a
// NOT NULL constraint can handle the null input to its row fetcher, and
// produces output rows of the correct shape.
// Regression test for #29436.
func TestUpsertWithColumnMutationAndNotNullDefault(t *testing.T) {
defer leaktest.AfterTest(t)()
// The descriptor changes made must have an immediate effect
// so disable leases on tables.
defer sql.TestDisableTableLeases()()
// Disable external processing of mutations.
params, _ := tests.CreateTestServerParams()
params.Knobs.SQLSchemaChanger = &sql.SchemaChangerTestingKnobs{
AsyncExecNotification: asyncSchemaChangerDisabled,
}
server, sqlDB, kvDB := serverutils.StartServer(t, params)
defer server.Stopper().Stop(context.TODO())

if _, err := sqlDB.Exec(`
CREATE DATABASE t;
CREATE TABLE t.test (k VARCHAR PRIMARY KEY DEFAULT 'default', v VARCHAR);
INSERT INTO t.test VALUES('a', 'foo');
ALTER TABLE t.test ADD COLUMN i VARCHAR NOT NULL DEFAULT 'i';
`); err != nil {
t.Fatal(err)
}

// read table descriptor
tableDesc := sqlbase.GetTableDescriptor(kvDB, "t", "test")

mTest := makeMutationTest(t, kvDB, sqlDB, tableDesc)
// Add column "i" as a mutation in delete/write.
mTest.writeColumnMutation("i", sqlbase.DescriptorMutation{State: sqlbase.DescriptorMutation_DELETE_AND_WRITE_ONLY})

// This row will conflict with the original row, and should insert an `i`
// into the new column.
mTest.Exec(t, `UPSERT INTO t.test VALUES('a', 'bar') RETURNING k`)

// These rows will not conflict.
mTest.Exec(t, `UPSERT INTO t.test VALUES('b', 'bar') RETURNING k`)
mTest.Exec(t, `INSERT INTO t.test VALUES('c', 'bar') RETURNING k, v`)
mTest.Exec(t, `INSERT INTO t.test VALUES('c', 'bar') ON CONFLICT(k) DO UPDATE SET v='qux' RETURNING k`)

mTest.CheckQueryResults(t, `SELECT * FROM t.test`, [][]string{
{"a", "bar"},
{"b", "bar"},
{"c", "qux"},
})

mTest.makeMutationsActive()

mTest.CheckQueryResults(t, `SELECT * FROM t.test`, [][]string{
{"a", "bar", "i"},
{"b", "bar", "i"},
{"c", "qux", "i"},
})
}

// Test INSERT, UPDATE, UPSERT, and DELETE operations with a column schema
// change.
func TestOperationsWithColumnMutation(t *testing.T) {
Expand Down Expand Up @@ -232,6 +289,14 @@ CREATE INDEX allidx ON t.test (k, v);
if !testutils.IsError(err, `column "i" does not exist`) {
t.Fatal(err)
}
if useUpsert {
_, err = sqlDB.Exec(`UPSERT INTO t.test (k, v) VALUES ('b', 'y') RETURNING i`)
} else {
_, err = sqlDB.Exec(`INSERT INTO t.test (k, v) VALUES ('b', 'y') RETURNING i`)
}
if !testutils.IsError(err, `column "i" does not exist`) {
t.Fatal(err)
}

// Repeating the same without specifying the columns results in a different error.
if useUpsert {
Expand Down
10 changes: 10 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/upsert
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,16 @@ UPSERT INTO kv VALUES (11, 10), (11, 12) RETURNING k, v
11 10
11 12

query I
UPSERT INTO kv VALUES (11) RETURNING k
----
11

query I
UPSERT INTO kv VALUES (11, 12) RETURNING v
----
12

statement count 1
INSERT INTO kv VALUES (13, 13), (7, 8) ON CONFLICT (k) DO NOTHING RETURNING *

Expand Down
43 changes: 34 additions & 9 deletions pkg/sql/tablewriter_upsert.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ type tableUpserterBase struct {
// rowTemplate is used to prepare rows to add to rowsUpserted.
rowTemplate tree.Datums
// rowIdxToRetIdx maps the indices in the inserted rows
// back to indices in rowTemplate.
// back to indices in rowTemplate. Contains dontReturnCol (-1) for indices
// in the inserted rows that are mutation columns which shouldn't be returned
// to the user.
rowIdxToRetIdx []int
// resultCount is the number of upserts. Mirrors rowsUpserted.Len() if
// collectRows is set, counted separately otherwise.
Expand All @@ -52,6 +54,10 @@ type tableUpserterBase struct {
indexKeyPrefix []byte
}

// dontReturnCol is stored in rowIdxToRetIdx to indicate indices in the inserted
// rows that are mutation columns which shouldn't be returned to the user.
const dontReturnCol = -1

func (tu *tableUpserterBase) init(txn *client.Txn, evalCtx *tree.EvalContext) error {
tu.tableWriterBase.init(txn)
tableDesc := tu.tableDesc()
Expand All @@ -77,13 +83,22 @@ func (tu *tableUpserterBase) init(txn *client.Txn, evalCtx *tree.EvalContext) er
}

// Create the map from insert rows to returning rows.
// Note that this map will *not* contain any mutation columns - that's because
// even though we might insert values into mutation columns, we never return
// them back to the user.
colIDToRetIndex := map[sqlbase.ColumnID]int{}
for i, col := range tableDesc.Columns {
colIDToRetIndex[col.ID] = i
}
tu.rowIdxToRetIdx = make([]int, len(tu.ri.InsertCols))
for i, col := range tu.ri.InsertCols {
tu.rowIdxToRetIdx[i] = colIDToRetIndex[col.ID]
retID, ok := colIDToRetIndex[col.ID]
if !ok {
// If the column is missing, it means it's a mutation column. Put -1 in
// the map to signal that we don't want it in the returning row.
retID = dontReturnCol
}
tu.rowIdxToRetIdx[i] = retID
}

tu.insertRows.Init(
Expand Down Expand Up @@ -149,16 +164,25 @@ func (tu *tableUpserterBase) finalize(
func (tu *tableUpserterBase) makeResultFromInsertRow(
insertRow tree.Datums, cols []sqlbase.ColumnDescriptor,
) tree.Datums {
resultRow := insertRow
if len(resultRow) < len(cols) {
resultRow = make(tree.Datums, len(cols))
// Pre-fill with NULLs.
if len(insertRow) == len(cols) {
// The row we inserted was already the right shape.
return insertRow
}
resultRow := make(tree.Datums, len(cols))
if len(insertRow) < len(cols) {
// The row we inserted didn't have all columns filled out. Fill the columns
// that weren't included with NULLs.
for i := range resultRow {
resultRow[i] = tree.DNull
}
// Fill the other values from insertRow.
for i, val := range insertRow {
resultRow[tu.rowIdxToRetIdx[i]] = val
}
// Now, fill the other values from insertRow.
for i, val := range insertRow {
retIdx := tu.rowIdxToRetIdx[i]
if retIdx != dontReturnCol {
// We don't want to return values we wrote into non-public columns.
// These values have retIdx = dontReturnCol. Skip them.
resultRow[retIdx] = val
}
}
return resultRow
Expand Down Expand Up @@ -430,6 +454,7 @@ func (tu *tableUpserter) atBatchEnd(ctx context.Context, traceKV bool) error {
// Do we need to remember a result for RETURNING?
if tu.collectRows {
// Yes, collect it.
resultRow = tu.makeResultFromInsertRow(resultRow, tableDesc.Columns)
_, err = tu.rowsUpserted.AddRow(ctx, resultRow)
if err != nil {
return err
Expand Down

0 comments on commit 1571818

Please sign in to comment.