Skip to content

Commit

Permalink
yugabyte#1250 [YSQL] Order range columns with respect to primary key …
Browse files Browse the repository at this point in the history
…description [format change]

Summary:
Table columns are added in the following order:
- hash component of compound primary key (first column in compound primary key description)
- range components of compound primary key (same order as described in compound primary key)
- non key columns (same order as described in `CREATE TABLE` statement)

Test Plan:
New unit test was introduced

```
yb_build.sh --cxx-test pg_libpq-test --gtest_filter PgLibPqTest.CompoundKeyColumnOrder
```

Reviewers: neil, mihnea, neha

Reviewed By: neha

Subscribers: kannan, neha, yql

Differential Revision: https://phabricator.dev.yugabyte.com/D7134
  • Loading branch information
d-uspenskiy committed Sep 5, 2019
1 parent 48d9b91 commit b5115b0
Show file tree
Hide file tree
Showing 2 changed files with 121 additions and 81 deletions.
173 changes: 92 additions & 81 deletions src/postgres/src/backend/commands/ybccmds.c
Original file line number Diff line number Diff line change
Expand Up @@ -122,66 +122,101 @@ YBCReserveOids(Oid dboid, Oid next_oid, uint32 count, Oid *begin_oid, Oid *end_o
/* ---------------------------------------------------------------------------------------------- */
/* Table Functions. */

/* Utility function to add columns to the YB create statement */
static void CreateTableAddColumn(YBCPgStatement handle,
Form_pg_attribute att,
bool is_hash,
bool is_primary,
bool is_desc,
bool is_nulls_first)
{
const AttrNumber attnum = att->attnum;
const YBCPgTypeEntity *col_type = YBCDataTypeFromOidMod(attnum, att->atttypid);
HandleYBStmtStatus(YBCPgCreateTableAddColumn(handle,
NameStr(att->attname),
attnum,
col_type,
is_hash,
is_primary,
is_desc,
is_nulls_first), handle);
}

/* Utility function to add columns to the YB create statement
* Columns need to be sent in order first hash columns, then rest of primary key columns,
* then regular columns.
*/
static void CreateTableAddColumns(YBCPgStatement handle,
TupleDesc desc,
Constraint *primary_key,
bool include_hash,
bool include_primary)
Constraint *primary_key)
{
for (int i = 0; i < desc->natts; i++)
{
Form_pg_attribute att = TupleDescAttr(desc, i);
char *attname = NameStr(att->attname);
AttrNumber attnum = att->attnum;
bool is_hash = false;
bool is_primary = false;
bool is_desc = false;
bool is_nulls_first = false;


if (primary_key != NULL)
{
ListCell *cell;

int key_col_idx = 0;
foreach(cell, primary_key->yb_index_params)
{
IndexElem *index_elem = (IndexElem *)lfirst(cell);

if (strcmp(attname, index_elem->name) == 0)
{
SortByDir order = index_elem->ordering;
/* In YB mode first column defaults to HASH if not set */
is_hash = (order == SORTBY_HASH) ||
(key_col_idx == 0 && order == SORTBY_DEFAULT);
is_primary = true;
ColumnSortingOptions(order, index_elem->nulls_ordering, &is_desc, &is_nulls_first);
break;
}
key_col_idx++;
}
}
/* Add all key columns first with respect to compound key order */
ListCell *cell;
if (primary_key != NULL)
{
foreach(cell, primary_key->yb_index_params)
{
IndexElem *index_elem = (IndexElem *)lfirst(cell);
bool column_found = false;
for (int i = 0; i < desc->natts; ++i)
{
Form_pg_attribute att = TupleDescAttr(desc, i);
if (strcmp(NameStr(att->attname), index_elem->name) == 0)
{
if (!YBCDataTypeIsValidForKey(att->atttypid))
{
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("PRIMARY KEY containing column of type '%s' not yet supported",
YBPgTypeOidToStr(att->atttypid))));
}
SortByDir order = index_elem->ordering;
/* In YB mode first column defaults to HASH if not set */
const bool is_first_key = (cell == list_head(primary_key->yb_index_params));
bool is_hash = (order == SORTBY_HASH) || (is_first_key && order == SORTBY_DEFAULT);
bool is_desc = false;
bool is_nulls_first = false;
ColumnSortingOptions(order, index_elem->nulls_ordering, &is_desc, &is_nulls_first);
CreateTableAddColumn(handle, att, is_hash, true /* is_primary */, is_desc, is_nulls_first);
column_found = true;
break;
}
}
if (!column_found)
{
ereport(FATAL,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("Column '%s' not found in table", index_elem->name)));
}
}
}

if (include_hash == is_hash && include_primary == is_primary)
{
if (is_primary && !YBCDataTypeIsValidForKey(att->atttypid)) {
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("PRIMARY KEY containing column of type '%s' not yet supported",
YBPgTypeOidToStr(att->atttypid))));
}
const YBCPgTypeEntity *col_type = YBCDataTypeFromOidMod(attnum, att->atttypid);
HandleYBStmtStatus(YBCPgCreateTableAddColumn(handle,
attname,
attnum,
col_type,
is_hash,
is_primary,
is_desc,
is_nulls_first), handle);
}
}
/* Add all non-key columns */
for (int i = 0; i < desc->natts; ++i)
{
Form_pg_attribute att = TupleDescAttr(desc, i);
bool is_key = false;
if (primary_key)
{
foreach(cell, primary_key->yb_index_params)
{
IndexElem *index_elem = (IndexElem *) lfirst(cell);
if (strcmp(NameStr(att->attname), index_elem->name) == 0)
{
is_key = true;
break;
}
}
}
if (!is_key)
{
CreateTableAddColumn(handle,
att,
false /* is_hash */,
false /* is_primary */,
false /* is_desc */,
false /* is_nulls_first */);
}
}
}

/* Utility function to handle split points */
Expand Down Expand Up @@ -461,31 +496,7 @@ YBCCreateTable(CreateStmt *stmt, char relkind, TupleDesc desc, Oid relationId, O
primary_key == NULL /* add_primary_key */,
&handle));

/*
* Process the table columns. They need to be sent in order, first hash
* columns, then rest of primary key columns, then regular columns. If
* no primary key is specified, an internal primary key is added above.
*/
if (primary_key != NULL)
{
CreateTableAddColumns(handle,
desc,
primary_key,
true /* is_hash */,
true /* is_primary */);

CreateTableAddColumns(handle,
desc,
primary_key,
false /* is_hash */,
true /* is_primary */);
}

CreateTableAddColumns(handle,
desc,
primary_key,
false /* is_hash */,
false /* is_primary */);
CreateTableAddColumns(handle, desc, primary_key);

/* Handle SPLIT statement, if present */
OptSplit *split_options = stmt->split_options;
Expand Down
29 changes: 29 additions & 0 deletions src/yb/yql/pgwrapper/pg_libpq-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1085,5 +1085,34 @@ TEST_F(PgLibPqTest, YB_DISABLE_TEST_IN_TSAN(NoTxnOnConflict)) {
LogResult(ASSERT_RESULT(Fetch(conn.get(), "SELECT * FROM test ORDER BY k")).get());
}

TEST_F(PgLibPqTest, CompoundKeyColumnOrder) {
const string table_name = "test";
auto conn = ASSERT_RESULT(Connect());
ASSERT_OK(Execute(conn.get(), Format(
"CREATE TABLE $0 (r2 int, r1 int, h int, v2 int, v1 int, primary key (h, r1, r2))",
table_name)));
auto client = ASSERT_RESULT(cluster_->CreateClient());
yb::client::YBSchema schema;
PartitionSchema partition_schema;
bool table_found = false;
// TODO(dmitry): Find table by name instead of checking all the tables when catalog_mangager
// will be able to find YSQL tables
std::vector<yb::client::YBTableName> tables;
ASSERT_OK(client->ListTables(&tables));
for (const auto& t : tables) {
if (t.namespace_name() == "postgres" && t.table_name() == table_name) {
table_found = true;
ASSERT_OK(client->GetTableSchema(t, &schema, &partition_schema));
const auto& columns = schema.columns();
std::array<string, 5> expected_column_names{"h", "r1", "r2", "v2", "v1"};
ASSERT_EQ(expected_column_names.size(), columns.size());
for (size_t i = 0; i < expected_column_names.size(); ++i) {
ASSERT_EQ(columns[i].name(), expected_column_names[i]);
}
}
}
ASSERT_TRUE(table_found);
}

} // namespace pgwrapper
} // namespace yb

0 comments on commit b5115b0

Please sign in to comment.