Skip to content

Commit

Permalink
[#23493] xCluster: code for ensuring there's an update for every sequ…
Browse files Browse the repository at this point in the history
…ence in WAL

Summary:
This adds a function, EnsureSequenceUpdatesInWal, with the following contract:
```
// Ensure that there is a full update for every sequence in sequences in the WALs after the time the
// sequence info was read from the sequences_data table.
//
// That is, if a sequence S in sequences had value v at the time we scanned sequences_data to obtain
// sequences, then after this function successfully completes, there will either be an update S := v
// or some other update to S in the WAL with commit timestamp after the time the scan was done.
```

This is the second part of
#23493.
Jira: DB-12407

Test Plan: Added two new tests that verify this function works.

Reviewers: xCluster, hsunder

Reviewed By: hsunder

Subscribers: ybase

Differential Revision: https://phorge.dev.yugabyte.com/D37347
  • Loading branch information
mdbridge committed Aug 20, 2024
1 parent e5127f8 commit b3389ff
Show file tree
Hide file tree
Showing 3 changed files with 182 additions and 14 deletions.
67 changes: 59 additions & 8 deletions src/yb/integration-tests/sequence_utility-itest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
#include "yb/master/master_ddl.pb.h"
#include "yb/master/ysql_sequence_util.h"

#include "yb/tablet/tablet.h"

#include "yb/yql/pgwrapper/pg_mini_test_base.h"

namespace yb::master {
Expand Down Expand Up @@ -62,6 +64,8 @@ class SequencesUtilTest : public pgwrapper::PgMiniTestBase {
RETURN_NOT_OK(conn.Execute("CREATE SEQUENCE dropped_sequence START WITH 40"));
RETURN_NOT_OK(conn.Execute("DROP SEQUENCE dropped_sequence"));

RETURN_NOT_OK(conn.Execute("CREATE SEQUENCE extra_sequence START WITH 50"));

const NamespaceName unscanned_namespace_name = "unscanned_database";
RETURN_NOT_OK(CreateYsqlNamespace(unscanned_namespace_name));
auto conn_2 = VERIFY_RESULT(ConnectToDB(unscanned_namespace_name));
Expand Down Expand Up @@ -105,7 +109,7 @@ class SequencesUtilTest : public pgwrapper::PgMiniTestBase {

TEST_F(SequencesUtilTest, ScanWhenSequencesDataTableNonexistentGivesNotFound) {
// Expect failure because sequences_data table has not been created yet.
auto result = master::ScanSequencesDataTable(client_.get(), namespace_oid_);
auto result = master::ScanSequencesDataTable(*client_.get(), namespace_oid_);
ASSERT_NOK(result);
ASSERT_TRUE(result.status().IsNotFound());
ASSERT_NOK_STR_CONTAINS(
Expand All @@ -118,7 +122,7 @@ TEST_F(SequencesUtilTest, ScanReturnsNothingForNoSequences) {
ASSERT_OK(conn.Execute("CREATE SEQUENCE foo"));
ASSERT_OK(conn.Execute("DROP SEQUENCE foo"));
{
auto results = master::ScanSequencesDataTable(client_.get(), namespace_oid_);
auto results = master::ScanSequencesDataTable(*client_.get(), namespace_oid_);
ASSERT_OK(results);
ASSERT_EQ(0, results->size());
}
Expand All @@ -128,15 +132,15 @@ TEST_F(SequencesUtilTest, ScanReturnsNothingForNoSequences) {
auto empty_namespace_id = ASSERT_RESULT(CreateYsqlNamespace("empty_database"));
auto empty_namespace_oid = ASSERT_RESULT(GetPgsqlDatabaseOid(empty_namespace_id));
{
auto results = master::ScanSequencesDataTable(client_.get(), empty_namespace_oid);
auto results = master::ScanSequencesDataTable(*client_.get(), empty_namespace_oid);
ASSERT_OK(results);
ASSERT_EQ(0, results->size());
}

// Same but instead scan a nonexistent database OID.
{
auto results =
master::ScanSequencesDataTable(client_.get(), /*nonexistent database OID*/ 666666);
master::ScanSequencesDataTable(*client_.get(), /*nonexistent database OID*/ 666666);
ASSERT_OK(results);
ASSERT_EQ(0, results->size());
}
Expand All @@ -145,7 +149,7 @@ TEST_F(SequencesUtilTest, ScanReturnsNothingForNoSequences) {
TEST_F(SequencesUtilTest, ScanSampleSequences) {
ASSERT_OK(CreateSampleSequences());
auto expected = ASSERT_RESULT(ComputeExpectedScanResult());
auto actual = ASSERT_RESULT(master::ScanSequencesDataTable(client_.get(), namespace_oid_));
auto actual = ASSERT_RESULT(master::ScanSequencesDataTable(*client_.get(), namespace_oid_));
VerifyScan(expected, actual);
}

Expand All @@ -154,21 +158,68 @@ TEST_F(SequencesUtilTest, ScanWithPaging) {
auto expected = ASSERT_RESULT(ComputeExpectedScanResult());
{
auto actual = ASSERT_RESULT(
master::ScanSequencesDataTable(client_.get(), namespace_oid_, /*max_rows_per_read=*/1));
master::ScanSequencesDataTable(*client_.get(), namespace_oid_, /*max_rows_per_read=*/1));
VerifyScan(expected, actual);
}
{
auto actual = ASSERT_RESULT(
master::ScanSequencesDataTable(client_.get(), namespace_oid_, /*max_rows_per_read=*/2));
master::ScanSequencesDataTable(*client_.get(), namespace_oid_, /*max_rows_per_read=*/2));
VerifyScan(expected, actual);
}
}

TEST_F(SequencesUtilTest, ScanWithReadFailure) {
ASSERT_OK(CreateSampleSequences());
auto result = master::ScanSequencesDataTable(
client_.get(), namespace_oid_, /*max_rows_per_read=*/1000, /*TEST_fail_read=*/true);
*client_.get(), namespace_oid_, /*max_rows_per_read=*/1000, /*TEST_fail_read=*/true);
ASSERT_NOK(result);
LOG(INFO) << "return status is " << result.status();
}

// Checks EnsureSequenceUpdatesInWal when nothing modifies the sequences between the scan and the
// call: the number of updates must equal the number of expected sequences, and a rescan must
// still match the expected contents.
TEST_F(SequencesUtilTest, EnsureSequenceUpdatesInWalWhenNoChanges) {
  ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_docdb_log_write_batches) = true;
  ASSERT_OK(CreateSampleSequences());
  auto& yb_client = *client_.get();

  auto expected_scan = ASSERT_RESULT(ComputeExpectedScanResult());
  auto scanned = ASSERT_RESULT(master::ScanSequencesDataTable(yb_client, namespace_oid_));

  // No sequence is changed after the scan above, so EnsureSequenceUpdatesInWal is expected to
  // issue one update for each sequence.
  auto num_updates =
      ASSERT_RESULT(EnsureSequenceUpdatesInWal(yb_client, namespace_oid_, scanned));
  ASSERT_EQ(num_updates, expected_scan.size());

  // The updates it made must be no-ops: scanning again yields the same expected result.
  auto rescanned = ASSERT_RESULT(master::ScanSequencesDataTable(yb_client, namespace_oid_));
  VerifyScan(expected_scan, rescanned);
}

// Checks EnsureSequenceUpdatesInWal when some sequences are modified between the scan and the
// call: sequences changed in that window must not be updated, and the updates that are made must
// be no-ops.
TEST_F(SequencesUtilTest, EnsureSequenceUpdatesInWalWithConcurrentChanges) {
  ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_docdb_log_write_batches) = true;
  ASSERT_OK(CreateSampleSequences());
  auto& yb_client = *client_.get();
  auto scanned = ASSERT_RESULT(master::ScanSequencesDataTable(yb_client, namespace_oid_));

  // Modify several sequences after the scan but before calling EnsureSequenceUpdatesInWal.
  // The function should then perform fewer updates, because it should skip any sequence that
  // has changed since the scan was done.
  auto pg_conn = ASSERT_RESULT(ConnectToDB(kNamespaceName));
  ASSERT_OK(pg_conn.Execute("ALTER SEQUENCE altered_sequence RESTART WITH 22"));
  // Change just is_called.
  ASSERT_OK(pg_conn.Fetch("SELECT pg_catalog.setval('set_sequence', 31, false)"));
  ASSERT_OK(pg_conn.Execute("DROP SEQUENCE extra_sequence"));

  // altered_sequence, set_sequence, and extra_sequence were changed above.
  constexpr size_t kNumSequencesChanged = 3;
  auto num_updates =
      ASSERT_RESULT(EnsureSequenceUpdatesInWal(yb_client, namespace_oid_, scanned));
  ASSERT_EQ(num_updates, scanned.size() - kNumSequencesChanged);

  // The updates it did make must be no-ops: the table still matches the expected contents.
  auto expected_scan = ASSERT_RESULT(ComputeExpectedScanResult());
  auto rescanned = ASSERT_RESULT(master::ScanSequencesDataTable(yb_client, namespace_oid_));
  VerifyScan(expected_scan, rescanned);
}
} // namespace yb::master
116 changes: 111 additions & 5 deletions src/yb/master/ysql_sequence_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,15 @@

#include "yb/yql/pggate/util/pg_doc_data.h"

DECLARE_int32(master_yb_client_default_timeout_ms);

namespace yb::master {

namespace {

// Positions of the last_value and is_called columns in the sequences_data table schema; they are
// passed to schema().ColumnId() to obtain the corresponding column IDs.
constexpr size_t kPgSequenceLastValueColIdx = 2;
constexpr size_t kPgSequenceIsCalledColIdx = 3;

Result<client::YBTablePtr> OpenSequencesDataTable(client::YBClient& client) {
PgObjectId table_oid(kPgSequencesDataDatabaseOid, kPgSequencesDataTableOid);
return client.OpenTable(table_oid.GetYbTableId());
Expand All @@ -51,13 +56,14 @@ Result<T> ReadNonNullNumber(Slice* cursor_into_sidebar) {
} // anonymous namespace

Result<std::vector<YsqlSequenceInfo>> ScanSequencesDataTable(
client::YBClient* client, uint32_t db_oid, uint64_t max_rows_per_read, bool TEST_fail_read) {
auto table = VERIFY_RESULT(OpenSequencesDataTable(*client));
client::YBClient& client, uint32_t db_oid, uint64_t max_rows_per_read, bool TEST_fail_read) {
auto table = VERIFY_RESULT(OpenSequencesDataTable(client));
RSTATUS_DCHECK_EQ(
table->schema().num_columns(), 4, IllegalState,
"sequences_data DocDB table has wrong number of columns");

auto session = client->NewSession(/*timeout=*/MonoDelta::FromSeconds(60));
auto session =
client.NewSession(MonoDelta::FromMilliseconds(FLAGS_master_yb_client_default_timeout_ms));
session->set_allow_local_calls_in_curr_thread(false);

PgsqlPagingStatePB* paging_state = nullptr;
Expand Down Expand Up @@ -110,8 +116,9 @@ Result<std::vector<YsqlSequenceInfo>> ScanSequencesDataTable(
return StatusFromPB(*response.error_status().rbegin());
} else {
// This should not happen because other ways of returning errors are deprecated.
return STATUS(
InternalError, "Unknown error while trying to read from sequences_data DocDB table.");
return STATUS_FORMAT(
InternalError, "Unknown error while trying to read from sequences_data DocDB table: $0",
PgsqlResponsePB::RequestStatus_Name(response.status()));
}
}

Expand All @@ -138,4 +145,103 @@ Result<std::vector<YsqlSequenceInfo>> ScanSequencesDataTable(

return results;
}

// Issues, for every entry of `sequences`, a conditional update against the sequences_data table
// that rewrites the entry's last_value and is_called, but only if the row still holds exactly
// those values.
//
// Parameters:
//   client    - client used to open the sequences_data table and to create the write session.
//   db_oid    - first primary-key component of the rows to update.
//   sequences - sequence info to rewrite; each entry supplies sequence_oid, last_value, and
//               is_called.
//
// Returns the number of operations whose response was not marked as skipped.  Returns an error
// status if the table cannot be opened, the flush fails, or any operation reports a status other
// than PGSQL_STATUS_OK.
Result<int> EnsureSequenceUpdatesInWal(
    client::YBClient& client, uint32_t db_oid, const std::vector<YsqlSequenceInfo>& sequences) {
  auto table = VERIFY_RESULT(OpenSequencesDataTable(client));

  auto session =
      client.NewSession(MonoDelta::FromMilliseconds(FLAGS_master_yb_client_default_timeout_ms));
  session->set_allow_local_calls_in_curr_thread(false);

  // The column IDs are identical for every operation, so look them up once.
  const auto last_value_col_id = table->schema().ColumnId(kPgSequenceLastValueColIdx);
  const auto is_called_col_id = table->schema().ColumnId(kPgSequenceIsCalledColIdx);

  // One Sidecars object per operation.  Parentheses make it explicit that this is a sized
  // construction and not a one-element initializer list.
  std::vector<rpc::Sidecars> sidecars_storage(sequences.size());
  std::vector<client::YBPgsqlWriteOpPtr> operations;
  operations.reserve(sequences.size());
  for (size_t i = 0; i < sequences.size(); i++) {
    const auto& sequence = sequences[i];

    // The following code creates a new YBPgsqlWriteOp operation equivalent to:
    //
    // UPDATE sequences_data SET last_value={sequence.last_value}, is_called={sequence.is_called}
    // WHERE db_oid={db_oid} AND seq_oid={sequence.sequence_oid}
    // AND last_value={sequence.last_value} AND is_called={sequence.is_called}

    auto psql_write = client::YBPgsqlWriteOp::NewUpdate(table, &sidecars_storage[i]);
    auto write_request = psql_write->mutable_request();
    // We do not set the PG catalog version numbers in the request because we don't care what the
    // PG catalog state is.

    // Set primary key to db_oid, sequence.sequence_oid.
    write_request->add_partition_column_values()->mutable_value()->set_int64_value(db_oid);
    write_request->add_partition_column_values()->mutable_value()->set_int64_value(
        sequence.sequence_oid);

    // The SET part.
    PgsqlColumnValuePB* column_value = write_request->add_column_new_values();
    column_value->set_column_id(last_value_col_id);
    column_value->mutable_expr()->mutable_value()->set_int64_value(sequence.last_value);
    column_value = write_request->add_column_new_values();
    column_value->set_column_id(is_called_col_id);
    column_value->mutable_expr()->mutable_value()->set_bool_value(sequence.is_called);

    // The WHERE part without the primary key.
    auto where_pb = write_request->mutable_where_expr()->mutable_condition();
    where_pb->set_op(QL_OP_AND);
    auto cond = where_pb->add_operands()->mutable_condition();
    cond->set_op(QL_OP_EQUAL);
    cond->add_operands()->set_column_id(last_value_col_id);
    cond->add_operands()->mutable_value()->set_int64_value(sequence.last_value);
    cond = where_pb->add_operands()->mutable_condition();
    cond->set_op(QL_OP_EQUAL);
    cond->add_operands()->set_column_id(is_called_col_id);
    cond->add_operands()->mutable_value()->set_bool_value(sequence.is_called);

    // For compatibility set deprecated column_refs
    write_request->mutable_column_refs()->add_ids(last_value_col_id);
    write_request->mutable_column_refs()->add_ids(is_called_col_id);
    // Same values, to be consumed by current TServers
    write_request->add_col_refs()->set_column_id(last_value_col_id);
    write_request->add_col_refs()->set_column_id(is_called_col_id);

    // Add the operation to the operations the session should do once a flush is submitted.  A
    // copy of the pointer is kept so the response can be inspected after the flush.
    VLOG(3) << "write request: " << write_request->DebugString();
    operations.push_back(psql_write);
    session->Apply(std::move(psql_write));
  }

  // Synchronously execute the operations.
  // TODO(async_flush): https://github.com/yugabyte/yugabyte-db/issues/12173
  RETURN_NOT_OK(session->TEST_Flush());

  int updates = 0;
  for (const auto& operation : operations) {
    const auto& response = operation->response();
    VLOG(3) << "write response: " << response.DebugString();

    // Check for failure first so that a failed operation is never counted as an update.
    if (response.status() != PgsqlResponsePB::PGSQL_STATUS_OK) {
      if (response.error_status().size() > 0) {
        // TODO(14814, 18387): We do not currently expect more than one status, when we do, we need
        // to decide how to handle them. Possible options: aggregate multiple statuses into one,
        // discard all but one, etc. Historically, for the one set of status fields (like
        // error_message), new error message was overwriting the previous one, that's why let's
        // return the last entry from error_status to mimic that past behavior, refer
        // AsyncRpc::Finished for details.
        return StatusFromPB(*response.error_status().rbegin());
      }
      // This should not happen because other ways of returning errors are deprecated.
      return STATUS_FORMAT(
          InternalError, "Unknown error while trying to update sequences_data DocDB table: $0",
          PgsqlResponsePB::RequestStatus_Name(response.status()));
    }

    // An operation that was not skipped counts as an update.
    if (!response.skipped()) {
      updates++;
    }
  }
  return updates;
}
} // namespace yb::master
13 changes: 12 additions & 1 deletion src/yb/master/ysql_sequence_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,18 @@ struct YsqlSequenceInfo {
// to cause the reads done by ScanSequencesDataTable to fail in order to test the error handling
// pathways.
Result<std::vector<YsqlSequenceInfo>> ScanSequencesDataTable(
client::YBClient* client, uint32_t db_oid, uint64_t max_rows_per_read = 10000,
client::YBClient& client, uint32_t db_oid, uint64_t max_rows_per_read = 10000,
bool TEST_fail_read = false);

// Ensure that there is a full update for every sequence in sequences in the WALs after the time the
// sequence info was read from the sequences_data table.
//
// That is, if a sequence S in sequences had value v at the time we scanned sequences_data to obtain
// sequences, then after this function successfully completes, there will either be an update S := v
// or some other update to S in the WAL with commit timestamp after the time the scan was done.
//
// For each sequence this issues an update that sets last_value and is_called to the values given
// in sequences, conditional on the row for (db_oid, sequence_oid) still holding those same values.
//
// client is used to open the sequences_data table and to create the session that performs the
// writes; db_oid identifies the database whose sequences are in sequences.
//
// When successful returns the number of updates it makes; that is, the number of update
// operations whose response was not marked as skipped.  Returns a non-OK status if the table cannot
// be opened, the writes cannot be flushed, or any individual update reports an error.
Result<int> EnsureSequenceUpdatesInWal(
client::YBClient& client, uint32_t db_oid, const std::vector<YsqlSequenceInfo>& sequences);

} // namespace yb::master

0 comments on commit b3389ff

Please sign in to comment.