Skip to content

Commit

Permalink
[YSQL] #1090 #1091 Reduce per-connection overhead for cache preloading
Browse files Browse the repository at this point in the history
Summary:
- Use the relcache file to store relation cache data, add ysql catalog version to
  detect (and clean up) old (no longer valid) files.
- No longer preload catcache (only relcache) for backend connections except during
  initdb which makes heavy use of various relations and builtins.
- No longer increment catalog version for inserts, only for updates/deletes. Since we don't do
  negative caching, inserts (e.g. from CREATE TABLE) do not invalidate the cache, only
  updates/deletes (e.g. ALTER or DROP TABLE).
- Fix a cache refrence leak bug and clean up the logic for retry and cache-refresh due to
  catalog version mismatches.

Test Plan:
jenkins, TestPgCacheConsistency, sample apps with high number of connections:
java -jar ./target/yb-sample-apps.jar --workload SqlInserts --nodes 127.0.0.1:5433 --num_threads_write 99 --num_threads_read 0 --num_unique_keys 1000000000

Reviewers: mikhail, robert

Reviewed By: robert

Subscribers: yql, bogdan

Differential Revision: https://phabricator.dev.yugabyte.com/D6412
  • Loading branch information
m-iancu committed Apr 2, 2019
1 parent 382b89c commit 465b6d6
Show file tree
Hide file tree
Showing 20 changed files with 541 additions and 322 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ public void testBasicDDLOperations() throws Exception {
statement2.execute("DROP TABLE cache_test1");

// Check that insert now fails on both tables.
runInvalidQuery(statement1, "INSERT INTO cache_test1(a) VALUES (3)");
runInvalidQuery(statement2, "INSERT INTO cache_test1(a) VALUES (4)");
runInvalidQuery(statement1, "INSERT INTO cache_test1(a) VALUES (3)", "does not exist");
runInvalidQuery(statement2, "INSERT INTO cache_test1(a) VALUES (4)", "does not exist");

// Create and use a new table on connection 1.
statement1.execute("CREATE TABLE cache_test2(a int)");
Expand All @@ -86,7 +86,8 @@ public void testBasicDDLOperations() throws Exception {
statement1.execute("CREATE TABLE cache_test2(a bool)");

// Check that we cannot still insert a float (but that bool will work).
runInvalidQuery(statement2, "INSERT INTO cache_test2(a) VALUES (1.0)");
runInvalidQuery(statement2, "INSERT INTO cache_test2(a) VALUES (1.0)",
"type boolean but expression is of type numeric");
statement2.execute("INSERT INTO cache_test2(a) VALUES (true)");
expectedRows.add(new Row(true));
statement1.execute("INSERT INTO cache_test2(a) VALUES (false)");
Expand All @@ -98,6 +99,26 @@ public void testBasicDDLOperations() throws Exception {
}
expectedRows.clear();

// Alter the table to add a column.
statement1.execute("ALTER TABLE cache_test2 ADD COLUMN b int");
expectedRows.add(new Row(true, null));
expectedRows.add(new Row(false, null));

// First query should fail because we cannot yet retry parsing errors within
// parse-bind-execute blocks (JDBC default execution). But it should refresh cache so
// second attempt should succeed.
runInvalidQuery(statement2,"INSERT INTO cache_test2(a,b) VALUES (true, 11)",
"Catalog Version Mismatch");
statement2.execute("INSERT INTO cache_test2(a,b) VALUES (true, 11)");
expectedRows.add(new Row(true, 11));
statement1.execute("INSERT INTO cache_test2(a,b) VALUES (false, 12)");
expectedRows.add(new Row(false, 12));

// Check values.
try (ResultSet rs = statement2.executeQuery("SELECT * FROM cache_test2")) {
assertEquals(expectedRows, getRowSet(rs));
}
expectedRows.clear();
}
}
}
8 changes: 4 additions & 4 deletions src/postgres/src/backend/catalog/indexing.c
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ CatalogTupleInsert(Relation heapRel, HeapTuple tup)
{
oid = YBCExecuteInsert(heapRel, RelationGetDescr(heapRel), tup);
/* Update the local cache automatically */
SetSysCacheTuple(heapRel, tup);
YBSetSysCacheTuple(heapRel, tup);
}
else
{
Expand Down Expand Up @@ -286,7 +286,7 @@ CatalogTupleInsertWithInfo(Relation heapRel, HeapTuple tup,
{
oid = YBCExecuteInsert(heapRel, RelationGetDescr(heapRel), tup);
/* Update the local cache automatically */
SetSysCacheTuple(heapRel, tup);
YBSetSysCacheTuple(heapRel, tup);
}
else
{
Expand Down Expand Up @@ -334,7 +334,7 @@ CatalogTupleUpdate(Relation heapRel, ItemPointer otid, HeapTuple tup)

YBCUpdateSysCatalogTuple(heapRel, oldtup, tup);
/* Update the local cache automatically */
SetSysCacheTuple(heapRel, tup);
YBSetSysCacheTuple(heapRel, tup);

if (heapRel->rd_rel->relhasindex)
CatalogIndexInsert(indstate, tup);
Expand Down Expand Up @@ -379,7 +379,7 @@ CatalogTupleUpdateWithInfo(Relation heapRel, ItemPointer otid, HeapTuple tup,

YBCUpdateSysCatalogTuple(heapRel, oldtup, tup);
/* Update the local cache automatically */
SetSysCacheTuple(heapRel, tup);
YBSetSysCacheTuple(heapRel, tup);

if (heapRel->rd_rel->relhasindex)
CatalogIndexInsert(indstate, tup);
Expand Down
74 changes: 40 additions & 34 deletions src/postgres/src/backend/executor/ybcModifyTable.c
Original file line number Diff line number Diff line change
Expand Up @@ -154,13 +154,13 @@ Datum YBCGetYBTupleIdFromSlot(TupleTableSlot *slot)
}

/*
* Need to update the version whenever we write to a system catalogs table (or
* indexes) except in bootstrap mode (initdb) where there is no point
* to keep track of versions yet.
* Check if operation changes a system table, ignore changes during
* initialization (bootstrap mode).
*/
static bool IsSystemCatalogVersionChange(Relation rel)
static bool IsSystemCatalogChange(Relation rel)
{
return IsSystemRelation(rel) && !IsBootstrapProcessingMode();
return IsSystemRelation(rel) &&
!IsBootstrapProcessingMode();
}

/*
Expand All @@ -170,17 +170,19 @@ static bool IsSystemCatalogVersionChange(Relation rel)
*/
static void YBCExecWriteStmt(YBCPgStatement ybc_stmt, Relation rel)
{
bool is_syscatalog_change = IsSystemCatalogVersionChange(rel);
bool is_syscatalog_change = IsSystemCatalogChange(rel);
bool is_syscatalog_version_change = false;

/* Let the master know if this should increment the catalog version. */
if (is_syscatalog_change)
{
YBCPgSetIsSystemCatalogChange(ybc_stmt);
YBCPgSetIfIsSysCatalogVersionChange(ybc_stmt,
&is_syscatalog_version_change);
}

HandleYBStmtStatus(YBCPgSetCatalogCacheVersion(ybc_stmt,
ybc_catalog_cache_version),
ybc_stmt);
yb_catalog_cache_version),
ybc_stmt);

/* Execute the insert. */
HandleYBStmtStatus(YBCPgDmlExecWriteOp(ybc_stmt), ybc_stmt);
Expand All @@ -195,9 +197,9 @@ static void YBCExecWriteStmt(YBCPgStatement ybc_stmt, Relation rel)
* other backends).
* If changes occurred, then a cache refresh will be needed as usual.
*/
if (is_syscatalog_change)
if (is_syscatalog_version_change)
{
ybc_catalog_cache_version += 1;
yb_catalog_cache_version += 1;
}
}

Expand All @@ -214,7 +216,7 @@ static Oid YBCExecuteInsertInternal(Relation rel,
AttrNumber minattr = FirstLowInvalidHeapAttributeNumber + 1;
int natts = RelationGetNumberOfAttributes(rel);
Bitmapset *pkey = GetTablePrimaryKey(rel);
YBCPgStatement ybc_stmt = NULL;
YBCPgStatement insert_stmt = NULL;
bool is_null = false;

/* Generate a new oid for this row if needed */
Expand All @@ -229,7 +231,7 @@ static Oid YBCExecuteInsertInternal(Relation rel,
dboid,
relid,
is_single_row_txn,
&ybc_stmt));
&insert_stmt));

for (AttrNumber attnum = minattr; attnum <= natts; attnum++)
{
Expand All @@ -245,16 +247,16 @@ static Oid YBCExecuteInsertInternal(Relation rel,
/* Check not-null constraint on primary key early */
if (is_null && bms_is_member(attnum - minattr, pkey))
{
HandleYBStatus(YBCPgDeleteStatement(ybc_stmt));
HandleYBStatus(YBCPgDeleteStatement(insert_stmt));
ereport(ERROR,
(errcode(ERRCODE_NOT_NULL_VIOLATION), errmsg(
"Missing/null value for primary key column")));
}

/* Add the column value to the insert request */
YBCPgExpr ybc_expr = YBCNewConstant(ybc_stmt, type_id, datum, is_null);
HandleYBStmtStatus(YBCPgDmlBindColumn(ybc_stmt, attnum, ybc_expr),
ybc_stmt);
YBCPgExpr ybc_expr = YBCNewConstant(insert_stmt, type_id, datum, is_null);
HandleYBStmtStatus(YBCPgDmlBindColumn(insert_stmt, attnum, ybc_expr),
insert_stmt);
}

/*
Expand All @@ -264,15 +266,15 @@ static Oid YBCExecuteInsertInternal(Relation rel,
if (rel->rd_rel->relhasindex)
{
YBCPgTypeAttrs type_attrs = {0};
YBCPgExpr ybc_expr = YBCNewColumnRef(ybc_stmt,
YBCPgExpr ybc_expr = YBCNewColumnRef(insert_stmt,
YBTupleIdAttributeNumber,
InvalidOid,
&type_attrs);
HandleYBStmtStatus(YBCPgDmlAppendTarget(ybc_stmt, ybc_expr), ybc_stmt);
HandleYBStmtStatus(YBCPgDmlAppendTarget(insert_stmt, ybc_expr), insert_stmt);
}

/* Execute the insert */
YBCExecWriteStmt(ybc_stmt, rel);
YBCExecWriteStmt(insert_stmt, rel);

/*
* If the relation has indexes, save ybctid to insert the new row into the
Expand All @@ -283,22 +285,22 @@ static Oid YBCExecuteInsertInternal(Relation rel,
YBCPgSysColumns syscols;
bool has_data = false;

HandleYBStmtStatus(YBCPgDmlFetch(ybc_stmt,
HandleYBStmtStatus(YBCPgDmlFetch(insert_stmt,
0 /* natts */,
NULL /* values */,
NULL /* isnulls */,
&syscols,
&has_data),
ybc_stmt);
insert_stmt);
if (has_data && syscols.ybctid != NULL)
{
tuple->t_ybctid = PointerGetDatum(syscols.ybctid);
}
}

/* Clean up */
HandleYBStatus(YBCPgDeleteStatement(ybc_stmt));
ybc_stmt = NULL;
HandleYBStatus(YBCPgDeleteStatement(insert_stmt));
insert_stmt = NULL;

return HeapTupleGetOid(tuple);
}
Expand Down Expand Up @@ -329,7 +331,7 @@ void YBCExecuteInsertIndex(Relation index, Datum *values, bool *isnull, Datum yb
Oid relid = RelationGetRelid(index);
TupleDesc tupdesc = RelationGetDescr(index);
int natts = RelationGetNumberOfAttributes(index);
YBCPgStatement ybc_stmt = NULL;
YBCPgStatement insert_stmt = NULL;

Assert(index->rd_rel->relkind == RELKIND_INDEX);

Expand All @@ -338,7 +340,7 @@ void YBCExecuteInsertIndex(Relation index, Datum *values, bool *isnull, Datum yb
dboid,
relid,
false /* is_single_row_txn */,
&ybc_stmt));
&insert_stmt));

for (AttrNumber attnum = 1; attnum <= natts; attnum++)
{
Expand All @@ -347,22 +349,26 @@ void YBCExecuteInsertIndex(Relation index, Datum *values, bool *isnull, Datum yb
bool is_null = isnull[attnum - 1];

/* Add the column value to the insert request */
YBCPgExpr ybc_expr = YBCNewConstant(ybc_stmt, type_id, value, is_null);
HandleYBStmtStatus(YBCPgDmlBindColumn(ybc_stmt, attnum, ybc_expr), ybc_stmt);
YBCPgExpr ybc_expr = YBCNewConstant(insert_stmt,
type_id,
value,
is_null);
HandleYBStmtStatus(YBCPgDmlBindColumn(insert_stmt, attnum, ybc_expr),
insert_stmt);
}

/*
* Bind the ybctid from the base table to the ybbasectid column.
*/
Assert(ybctid != 0);
YBCPgExpr ybc_expr = YBCNewConstant(ybc_stmt, BYTEAOID, ybctid, false /* is_null */);
HandleYBStmtStatus(YBCPgDmlBindColumn(ybc_stmt, YBBaseTupleIdAttributeNumber, ybc_expr),
ybc_stmt);
YBCPgExpr ybc_expr = YBCNewConstant(insert_stmt, BYTEAOID, ybctid, false /* is_null */);
HandleYBStmtStatus(YBCPgDmlBindColumn(insert_stmt, YBBaseTupleIdAttributeNumber, ybc_expr),
insert_stmt);

/* Execute the insert and clean up. */
YBCExecWriteStmt(ybc_stmt, index);
HandleYBStatus(YBCPgDeleteStatement(ybc_stmt));
ybc_stmt = NULL;
YBCExecWriteStmt(insert_stmt, index);
HandleYBStatus(YBCPgDeleteStatement(insert_stmt));
insert_stmt = NULL;
}

void YBCExecuteDelete(Relation rel, TupleTableSlot *slot)
Expand Down
2 changes: 1 addition & 1 deletion src/postgres/src/backend/executor/ybcScan.c
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ YbScanState ybcBeginScan(Relation rel, Relation index, List *target_attrs, List
if (!IsSystemRelation(rel))
{
HandleYBStmtStatusWithOwner(YBCPgSetCatalogCacheVersion(ybc_state->handle,
ybc_catalog_cache_version),
yb_catalog_cache_version),
ybc_state->handle,
ybc_state->stmt_owner);
}
Expand Down
4 changes: 2 additions & 2 deletions src/postgres/src/backend/executor/ybc_fdw.c
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ ybcBeginForeignScan(ForeignScanState *node, int eflags)

/* Set the current syscatalog version (will check that we are up to date) */
HandleYBStmtStatusWithOwner(YBCPgSetCatalogCacheVersion(ybc_state->handle,
ybc_catalog_cache_version),
yb_catalog_cache_version),
ybc_state->handle,
ybc_state->stmt_owner);

Expand Down Expand Up @@ -465,7 +465,7 @@ ybc_fdw_handler()
fdwroutine->EndForeignScan = ybcEndForeignScan;

/* TODO: These are optional but we should support them eventually. */
/* fdwroutine->ExplainForeignScan = fileExplainForeignScan; */
/* fdwroutine->ExplainForeignScan = ybcExplainForeignScan; */
/* fdwroutine->AnalyzeForeignTable = ybcAnalyzeForeignTable; */
/* fdwroutine->IsForeignScanParallelSafe = ybcIsForeignScanParallelSafe; */

Expand Down
Loading

0 comments on commit 465b6d6

Please sign in to comment.