Skip to content

Commit

Permalink
[YSQL] #1444 #3106: Update only the assigned columns in UPDATE statement
Browse files Browse the repository at this point in the history
Summary:
For statements like `UPDATE test set c1 = x, c3 = y ..` make sure we only
send to DocDB the modified columns (i.e. assigned or has default value)

Additionally fix the single-row detection for update&delete to handle
partial-row updates and as well as immutable functions/operators.

Concrete changes:
 1. For update, only send (values of) modified columns to DocDB
 2. Fix update/delete single-row pushdown not checking for (equality) operator correctly
 3. Improve single-row update detection by handling stable functions and partial-columns updates

Test Plan: jenkins, TestPgUpdate, TestPgRegressDml

Reviewers: neha, neil

Reviewed By: neil

Subscribers: yql

Differential Revision: https://phabricator.dev.yugabyte.com/D7653
  • Loading branch information
m-iancu committed Dec 19, 2019
1 parent cafbdc2 commit 7e222b4
Show file tree
Hide file tree
Showing 16 changed files with 651 additions and 90 deletions.
60 changes: 60 additions & 0 deletions java/yb-pgsql/src/test/java/org/yb/pgsql/TestPgUpdate.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.yb.util.YBTestRunnerNonTsanOnly;

import static java.lang.Math.toIntExact;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
Expand All @@ -28,6 +29,8 @@
import java.util.List;

import static org.yb.AssertionWrappers.assertEquals;
import static org.yb.AssertionWrappers.assertTrue;
import static org.yb.AssertionWrappers.fail;

@RunWith(value=YBTestRunnerNonTsanOnly.class)
public class TestPgUpdate extends BasePgSQLTest {
Expand Down Expand Up @@ -223,4 +226,61 @@ public void testUpdateEnforceConstraints() throws SQLException {
assertEquals(expectedRows, getSortedRowList(returning));
}
}

@Test
public void testConcurrentUpdate() throws Exception {

connection.createStatement().execute("create table test_concurrent_update (k int primary key," +
" v1 int, v2 int, v3 int, v4 int)");
connection.createStatement().execute("insert into test_concurrent_update values"+
" (0, 0, 0, 0, 0)");

final List<Throwable> errors = new ArrayList<Throwable>();

// Test concurrent update to individual columns from 1 to 100. They should not block one
// another.
List<Thread> threads = new ArrayList<Thread>();
for (int i = 1; i <= 4; i++) {
final int index = i;
Thread thread = new Thread(() -> {
try {
PreparedStatement updateStmt = connection.prepareStatement(
String.format("update test_concurrent_update set v%d = ? where k = 0", index));
PreparedStatement selectStmt = connection.prepareStatement(
String.format("select v%d from test_concurrent_update where k = 0", index));

for (int j = 1; j <= 100; j++) {
// Update column.
updateStmt.setInt(1, j);
updateStmt.execute();

// Verify update.
ResultSet rs = selectStmt.executeQuery();
assertNextRow(rs, j);
}

} catch (Throwable e) {
synchronized (errors) {
errors.add(e);
}
}
});
thread.start();
threads.add(thread);
}

for (Thread thread : threads) {
thread.join();
}

// Verify final result of all columns.
assertOneRow("select v1, v2, v3, v4 from test_concurrent_update where k = 0",
100, 100, 100, 100);

// Log the actual errors that occurred.
for (Throwable e : errors) {
LOG.error("Errors occurred", e);
}
assertTrue(errors.isEmpty());
}
}
9 changes: 7 additions & 2 deletions src/postgres/src/backend/executor/nodeModifyTable.c
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
#include "access/sysattr.h"
#include "catalog/pg_database.h"
#include "executor/ybcModifyTable.h"
#include "parser/parsetree.h"
#include "pg_yb_utils.h"
#include "optimizer/ybcplan.h"

Expand Down Expand Up @@ -1132,7 +1133,12 @@ ExecUpdate(ModifyTableState *mtstate,
if (resultRelationDesc->rd_att->constr)
ExecConstraints(resultRelInfo, slot, estate);

bool row_found = YBCExecuteUpdate(resultRelationDesc, planSlot, tuple, estate, mtstate);

RangeTblEntry *rte = rt_fetch(resultRelInfo->ri_RangeTableIndex,
estate->es_range_table);

bool row_found = YBCExecuteUpdate(resultRelationDesc, planSlot, tuple, estate, mtstate, rte->updatedCols);

if (!row_found)
{
/*
Expand Down Expand Up @@ -2463,7 +2469,6 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags)
mtstate->mt_plans = (PlanState **) palloc0(sizeof(PlanState *) * nplans);
mtstate->resultRelInfo = estate->es_result_relations + node->resultRelIndex;
mtstate->yb_mt_is_single_row_update_or_delete = YBCIsSingleRowUpdateOrDelete(node);
mtstate->yb_mt_update_attrs = node->ybUpdateAttrs;

/* If modifying a partitioned table, initialize the root table info */
if (node->rootResultRelIndex >= 0)
Expand Down
16 changes: 10 additions & 6 deletions src/postgres/src/backend/executor/ybcModifyTable.c
Original file line number Diff line number Diff line change
Expand Up @@ -682,14 +682,14 @@ bool YBCExecuteUpdate(Relation rel,
TupleTableSlot *slot,
HeapTuple tuple,
EState *estate,
ModifyTableState *mtstate)
ModifyTableState *mtstate,
Bitmapset *updatedCols)
{
TupleDesc tupleDesc = slot->tts_tupleDescriptor;
Oid dboid = YBCGetDatabaseOid(rel);
Oid relid = RelationGetRelid(rel);
YBCPgStatement update_stmt = NULL;
bool isSingleRow = mtstate->yb_mt_is_single_row_update_or_delete;
Bitmapset *update_attrs = mtstate->yb_mt_update_attrs;
Datum ybctid = 0;

/* Create update statement. */
Expand Down Expand Up @@ -731,23 +731,27 @@ bool YBCExecuteUpdate(Relation rel,
YBTupleIdAttributeNumber,
ybctid_expr), update_stmt);

/* Assign new values to columns for updating the current row. */
/* Assign new values to the updated columns for the current row. */
tupleDesc = RelationGetDescr(rel);
bool whole_row = bms_is_member(InvalidAttrNumber, updatedCols);
for (int idx = 0; idx < tupleDesc->natts; idx++)
{
AttrNumber attnum = TupleDescAttr(tupleDesc, idx)->attnum;

bool has_default = TupleDescAttr(tupleDesc, idx)->atthasdef;
/* Skip virtual (system) and dropped columns */
if (!IsRealYBColumn(rel, attnum))
continue;

if (update_attrs && !bms_is_member(attnum, update_attrs))
/* Skip unmodified columns */
int bms_idx = attnum - YBGetFirstLowInvalidAttributeNumber(rel);
if (!whole_row && !bms_is_member(bms_idx, updatedCols) && !has_default)
continue;

bool is_null = false;
Datum d = heap_getattr(tuple, attnum, tupleDesc, &is_null);
YBCPgExpr ybc_expr = YBCNewConstant(update_stmt, TupleDescAttr(tupleDesc, idx)->atttypid,
d, is_null);

HandleYBStmtStatus(YBCPgDmlAssignColumn(update_stmt, attnum, ybc_expr), update_stmt);
}

Expand Down Expand Up @@ -840,7 +844,7 @@ void YBCUpdateSysCatalogTuple(Relation rel, HeapTuple oldtuple, HeapTuple tuple)
/* Bind the ybctid to the statement. */
YBCBindTupleId(update_stmt, tuple->t_ybctid);

/* Assign new values to columns for updating the current row. */
/* Assign values to the non-primary-key columns to update the current row. */
for (int idx = 0; idx < natts; idx++)
{
AttrNumber attnum = TupleDescAttr(tupleDesc, idx)->attnum;
Expand Down
1 change: 0 additions & 1 deletion src/postgres/src/backend/nodes/copyfuncs.c
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,6 @@ _copyModifyTable(const ModifyTable *from)
COPY_NODE_FIELD(onConflictWhere);
COPY_SCALAR_FIELD(exclRelRTI);
COPY_NODE_FIELD(exclRelTlist);
COPY_BITMAPSET_FIELD(ybUpdateAttrs);

return newnode;
}
Expand Down
1 change: 0 additions & 1 deletion src/postgres/src/backend/nodes/outfuncs.c
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,6 @@ _outModifyTable(StringInfo str, const ModifyTable *node)
WRITE_NODE_FIELD(onConflictWhere);
WRITE_UINT_FIELD(exclRelRTI);
WRITE_NODE_FIELD(exclRelTlist);
WRITE_BITMAPSET_FIELD(ybUpdateAttrs);
}

static void
Expand Down
78 changes: 56 additions & 22 deletions src/postgres/src/backend/optimizer/plan/createplan.c
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@
#include "parser/parse_clause.h"
#include "parser/parsetree.h"
#include "partitioning/partprune.h"
#include "utils/selfuncs.h"
#include "utils/lsyscache.h"
#include "utils/rel.h"

#include "pg_yb_utils.h"
#include "optimizer/ybcplan.h"
Expand Down Expand Up @@ -112,7 +114,7 @@ static RecursiveUnion *create_recursiveunion_plan(PlannerInfo *root, RecursiveUn
static LockRows *create_lockrows_plan(PlannerInfo *root, LockRowsPath *best_path,
int flags);
static bool yb_single_row_update_or_delete_path(PlannerInfo *root, ModifyTablePath *path,
List **tlist, Bitmapset **update_attrs);
List **tlist);
static ModifyTable *create_modifytable_plan(PlannerInfo *root, ModifyTablePath *best_path);
static Limit *create_limit_plan(PlannerInfo *root, LimitPath *best_path,
int flags);
Expand Down Expand Up @@ -286,8 +288,7 @@ static ModifyTable *make_modifytable(PlannerInfo *root,
bool partColsUpdated,
List *resultRelations, List *subplans, List *subroots,
List *withCheckOptionLists, List *returningLists,
List *rowMarks, OnConflictExpr *onconflict, int epqParam,
Bitmapset *yb_update_attrs);
List *rowMarks, OnConflictExpr *onconflict, int epqParam);
static GatherMerge *create_gather_merge_plan(PlannerInfo *root,
GatherMergePath *best_path);

Expand Down Expand Up @@ -2382,8 +2383,7 @@ create_lockrows_plan(PlannerInfo *root, LockRowsPath *best_path,
static bool
yb_single_row_update_or_delete_path(PlannerInfo *root,
ModifyTablePath *path,
List **tlist,
Bitmapset **update_attrs)
List **tlist)
{
RelOptInfo *relInfo;
Oid relid;
Expand All @@ -2400,7 +2400,7 @@ yb_single_row_update_or_delete_path(PlannerInfo *root,
int attr_num;

*tlist = NIL;
*update_attrs = NULL;
Bitmapset *update_attrs = NULL;

/* Verify YB is enabled. */
if (!IsYugaByteEnabled())
Expand Down Expand Up @@ -2500,15 +2500,25 @@ yb_single_row_update_or_delete_path(PlannerInfo *root,
tle = lfirst_node(TargetEntry, values);

/* Ignore unspecified columns. */
if (IsA(tle->expr, Var) && castNode(Var, tle->expr)->vartypmod == -1)
continue;
if (IsA(tle->expr, Var)) {
Var *var = castNode(Var, tle->expr);
/*
* Column set to itself (unset) or ybctid pseudo-column
* (added for YB scan in rewrite handler).
*/
if (var->varattno == tle->resno ||
(var->varattno == YBTupleIdAttributeNumber &&
var->varcollid == InvalidOid)) {
continue;
}
}

/* Verify expression is supported. */
if (!YBCIsSupportedSingleRowModifyWriteExpr(tle->expr))
return false;

subpath_tlist = lappend(subpath_tlist, tle);
*update_attrs = bms_add_member(*update_attrs, tle->resno);
update_attrs = bms_add_member(update_attrs, tle->resno);
}
}

Expand All @@ -2525,7 +2535,39 @@ yb_single_row_update_or_delete_path(PlannerInfo *root,
return false;
}

/* Add WHERE clauses from index scan quals to list, verify they are supported write exprs. */
/* Check that all WHERE clause conditions use equality operator. */
List *qinfos;
ListCell *lc;
qinfos = deconstruct_indexquals(index_path);
foreach(lc, qinfos)
{
IndexQualInfo *qinfo = (IndexQualInfo *) lfirst(lc);
RestrictInfo *rinfo = qinfo->rinfo;
Expr *clause = rinfo->clause;
Oid clause_op;
int op_strategy;

if (!IsA(clause, OpExpr))
return false;

clause_op = qinfo->clause_op;
if (!OidIsValid(clause_op))
return false;

op_strategy = get_op_opfamily_strategy(clause_op, index_path->indexinfo->opfamily[qinfo->indexcol]);
Assert(op_strategy != 0); /* not a member of opfamily?? */
/* Only pushdown equal operators. */
if (op_strategy != BTEqualStrategyNumber) {
return false;
}
}

/*
* Add WHERE clauses from index scan quals to list, verify they are supported write exprs.
* i.e. after fix_indexqual_references:
* - LHS must be a (key) column
* - RHS must be a "stable" expression (evaluate to constant)
*/
foreach(values, fix_indexqual_references(subroot, index_path))
{
Expr *clause;
Expand Down Expand Up @@ -2580,7 +2622,7 @@ yb_single_row_update_or_delete_path(PlannerInfo *root,
TargetEntry *tle;

tle = lfirst_node(TargetEntry, values);
if (!bms_is_member(tle->resorigcol, *update_attrs) &&
if (!bms_is_member(tle->resorigcol, update_attrs) &&
!bms_is_member(tle->resorigcol, primary_key_attrs))
return false;
}
Expand Down Expand Up @@ -2659,22 +2701,19 @@ create_modifytable_plan(PlannerInfo *root, ModifyTablePath *best_path)
ListCell *subpaths,
*subroots;
List *tlist = NIL;
Bitmapset *yb_update_attrs_param = NULL;
Bitmapset *yb_update_attrs = NULL;

/*
* If we are a single row UPDATE/DELETE in a YB relation, add Result subplan
* instead of IndexScan. It is necessary to avoid the scan since we will be
* running outside of a transaction and thus cannot rely on the results from a
* seperately executed operation.
*/
if (yb_single_row_update_or_delete_path(root, best_path, &tlist, &yb_update_attrs_param))
if (yb_single_row_update_or_delete_path(root, best_path, &tlist))
{
Plan *subplan = (Plan *) make_result(tlist, NULL, NULL);
copy_generic_path_info(subplan, linitial(best_path->subpaths));

subplans = lappend(subplans, subplan);
yb_update_attrs = yb_update_attrs_param;
}
else
{
Expand Down Expand Up @@ -2719,8 +2758,7 @@ create_modifytable_plan(PlannerInfo *root, ModifyTablePath *best_path)
best_path->returningLists,
best_path->rowMarks,
best_path->onconflict,
best_path->epqParam,
yb_update_attrs);
best_path->epqParam);

copy_generic_path_info(&plan->plan, &best_path->path);

Expand Down Expand Up @@ -6654,8 +6692,7 @@ make_modifytable(PlannerInfo *root,
bool partColsUpdated,
List *resultRelations, List *subplans, List *subroots,
List *withCheckOptionLists, List *returningLists,
List *rowMarks, OnConflictExpr *onconflict, int epqParam,
Bitmapset *yb_update_attrs)
List *rowMarks, OnConflictExpr *onconflict, int epqParam)
{
ModifyTable *node = makeNode(ModifyTable);
List *fdw_private_list;
Expand Down Expand Up @@ -6786,9 +6823,6 @@ make_modifytable(PlannerInfo *root,
}
node->fdwPrivLists = fdw_private_list;
node->fdwDirectModifyPlans = direct_modify_plans;

node->ybUpdateAttrs = yb_update_attrs;

return node;
}

Expand Down
Loading

0 comments on commit 7e222b4

Please sign in to comment.