Skip to content

Commit

Permalink
[BACKPORT 2.8][#11805] YSQL: Avoid setting read time on client if pos…
Browse files Browse the repository at this point in the history
…sible

Summary:
To achieve consistency of reads from multiple tablets and/or across multiple operations in the context of a single transaction, YSQL selects read time on the internal client side (in the Postgres process). This approach has a drawback in case client's hybrid clock shows a time in the future compared to tserver's time, caused by clock skew. On receiving the read request with such read time, the tserver will wait until the tablet's safe time has reached this future time, resulting in increased read latency. To prevent the tserver from waiting while processing the read request, the read time in the request should be omitted. In this case tserver will use the current safe time of the tablet as read time, and will return that time to the client (YSQL). The same read time should then be used by all other operations initiated by YSQL as part of the same transaction.

**Note:**
1. When the first read operation perform reads from different tablets, we detect this case and pick the read time on the client side. (If we allowed each tablet server to pick its own read time, the reads from different tablet servers would be inconsistent with each other.)

2. Client should not initiate parallel read operations at the beginning of a transaction. Even if each operation reads just one tablet, all these operations should use the same read time for consistency. After the read time has been picked, e.g. for second operation of the transaction and beyond, parallel reads are fine. One case in which we send parallel operations in YSQL is foreign key checking.

3. In case of unexpected behavior of new functionality it could be disabled using the newly created gflag `force_preset_read_time_on_client`. Its default value is `false`, resulting in new behavior, and it should be set to `true` to revert to old behavior.

4. The fix in mainline is slightly different because of changes introduced by D13244 / c5f5125 and will be handled by this commit https://phabricator.dev.yugabyte.com/D16201.

Original commit: D16345 / ba1504e

Test Plan:
Jenkins: rebase: 2.8

New unit test is introduced

```
./yb_build.sh --gtest_filter PgLibPqTest.NoReadRestartOnSingleTablet
```

Reviewers: mbautin, sergei, amitanand

Reviewed By: sergei, amitanand

Subscribers: yql, mbautin

Differential Revision: https://phabricator.dev.yugabyte.com/D16548
  • Loading branch information
d-uspenskiy committed Apr 14, 2022
1 parent 63e91a3 commit 8948633
Show file tree
Hide file tree
Showing 11 changed files with 123 additions and 22 deletions.
4 changes: 3 additions & 1 deletion src/yb/client/async_rpc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -767,6 +767,7 @@ void ReadRpc::SwapRequestsAndResponses(bool skip_responses) {
redis_idx = 0;
ql_idx = 0;
pgsql_idx = 0;
bool used_read_time_set = false;
for (auto& op : ops_) {
YBOperation* yb_op = op->yb_op.get();
switch (yb_op->type()) {
Expand Down Expand Up @@ -805,8 +806,9 @@ void ReadRpc::SwapRequestsAndResponses(bool skip_responses) {
}
// Restore PGSQL read request PB and extract response.
auto* pgsql_op = down_cast<YBPgsqlReadOp*>(yb_op);
if (resp_.has_used_read_time()) {
if (!used_read_time_set && resp_.has_used_read_time()) {
pgsql_op->SetUsedReadTime(ReadHybridTime::FromPB(resp_.used_read_time()));
used_read_time_set = true;
}
pgsql_op->mutable_response()->Swap(resp_.mutable_pgsql_batch(pgsql_idx));
const auto& pgsql_response = pgsql_op->response();
Expand Down
10 changes: 10 additions & 0 deletions src/yb/client/batcher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -652,6 +652,16 @@ void Batcher::ExecuteOperations(Initial initial) {
std::bind(&Batcher::TransactionReady, this, _1, BatcherPtr(this)))) {
return;
}
} else if (force_consistent_read_ &&
ops_info_.groups.size() > 1 &&
read_point_ &&
!read_point_->GetReadTime()) {
// Read time is not set but consistent read from multiple tablets without
// transaction is required. Use current time as a read time.
// Note: read_point_ is null in case of initdb. Nothing to do in this case.
read_point_->SetCurrentReadTime();
VLOG_WITH_PREFIX_AND_FUNC(3) << "Set current read time as a read time: "
<< read_point_->GetReadTime();
}

{
Expand Down
4 changes: 4 additions & 0 deletions src/yb/client/session.cc
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ void YBSession::SetTransaction(YBTransactionPtr transaction) {
}
}

bool YBSession::HasTransaction() const {
return static_cast<bool>(batcher_config_.transaction);
}

void YBSession::SetRejectionScoreSource(RejectionScoreSourcePtr rejection_score_source) {
if (batcher_) {
batcher_->SetRejectionScoreSource(rejection_score_source);
Expand Down
1 change: 1 addition & 0 deletions src/yb/client/session.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ class YBSession : public std::enable_shared_from_this<YBSession> {

// Changes transaction used by this session.
void SetTransaction(YBTransactionPtr transaction);
bool HasTransaction() const;

// Set the timeout for writes made in this session.
void SetTimeout(MonoDelta delta);
Expand Down
9 changes: 5 additions & 4 deletions src/yb/yql/pggate/pg_doc_op.cc
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ Status PgDocOp::GetResult(list<PgDocResult> *rowsets) {
}

DCHECK(response_.InProgress());
auto rows = VERIFY_RESULT(ProcessResponse(response_.GetStatus(pg_session_.get())));
auto rows = VERIFY_RESULT(ProcessResponse());
// In case ProcessResponse doesn't fail with an error
// it should return non empty rows and/or set end_of_data_.
DCHECK(!rows.empty() || end_of_data_);
Expand Down Expand Up @@ -287,10 +287,10 @@ Status PgDocOp::SendRequestImpl(bool force_non_bufferable) {
return Status::OK();
}

Result<std::list<PgDocResult>> PgDocOp::ProcessResponse(const Status& status) {
Result<std::list<PgDocResult>> PgDocOp::ProcessResponse() {
// Check operation status.
DCHECK(exec_status_.ok());
exec_status_ = status;
exec_status_ = response_.GetStatus(pg_session_.get());
if (exec_status_.ok()) {
auto result = ProcessResponseImpl();
if (result.ok()) {
Expand All @@ -311,7 +311,8 @@ Result<std::list<PgDocResult>> PgDocOp::ProcessResponseResult() {
rows_affected_count_ = 0;
// Check for errors reported by tablet server.
for (int op_index = 0; op_index < active_op_count_; op_index++) {
RETURN_NOT_OK(pg_session_->HandleResponse(*pgsql_ops_[op_index], PgObjectId()));
RETURN_NOT_OK(response_.HandleResponse(
pg_session_.get(), pgsql_ops_[op_index].get(), PgObjectId()));

YBPgsqlOp *pgsql_op = pgsql_ops_[op_index].get();
// Get total number of rows that are operated on.
Expand Down
2 changes: 1 addition & 1 deletion src/yb/yql/pggate/pg_doc_op.h
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ class PgDocOp : public std::enable_shared_from_this<PgDocOp> {

virtual CHECKED_STATUS SendRequestImpl(bool force_non_bufferable);

Result<std::list<PgDocResult>> ProcessResponse(const Status& exec_status);
Result<std::list<PgDocResult>> ProcessResponse();

virtual Result<std::list<PgDocResult>> ProcessResponseImpl() = 0;

Expand Down
45 changes: 34 additions & 11 deletions src/yb/yql/pggate/pg_session.cc
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ TAG_FLAG(ysql_wait_until_index_permissions_timeout_ms, advanced);
DECLARE_int32(TEST_user_ddl_operation_timeout_sec);

DEFINE_bool(ysql_log_failed_docdb_requests, false, "Log failed docdb requests.");
DECLARE_bool(force_preset_read_time_on_client);

namespace yb {
namespace pggate {
Expand Down Expand Up @@ -319,7 +320,7 @@ Status PgSessionAsyncRunResult::GetStatus(PgSession* pg_session) {
future_status_ = std::future<client::FlushStatus>();
RETURN_NOT_OK(CombineErrorsToStatus(flush_status.errors, flush_status.status));
for (const auto& bop : buffered_operations_) {
RETURN_NOT_OK(pg_session->HandleResponse(*bop.operation, bop.relation_id));
RETURN_NOT_OK(HandleResponse(pg_session, bop.operation.get(), bop.relation_id));
}
return Status::OK();
}
Expand All @@ -328,6 +329,11 @@ bool PgSessionAsyncRunResult::InProgress() const {
return future_status_.valid();
}

Status PgSessionAsyncRunResult::HandleResponse(
PgSession* pg_session, client::YBPgsqlOp* op, const PgObjectId& relation_id) const {
return pg_session->HandleResponse(op, relation_id, session_.get());
}

//--------------------------------------------------------------------------------------------------
// Class PgSession::RunHelper
//--------------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -950,7 +956,7 @@ Status PgSession::FlushOperations(PgsqlOpBuffer ops, IsTransactionalSession tran
const auto flush_status = session->FlushFuture().get();
RETURN_NOT_OK(CombineErrorsToStatus(flush_status.errors, flush_status.status));
for (const auto& buffered_op : ops) {
RETURN_NOT_OK(HandleResponse(*buffered_op.operation, buffered_op.relation_id));
RETURN_NOT_OK(HandleResponse(buffered_op.operation.get(), buffered_op.relation_id, session));
}
return Status::OK();
}
Expand Down Expand Up @@ -1037,15 +1043,23 @@ void PgSession::DeleteForeignKeyReference(PgOid table_id, const Slice& ybctid) {
Erase(&fk_reference_cache_, table_id, ybctid);
}

Status PgSession::HandleResponse(const client::YBPgsqlOp& op, const PgObjectId& relation_id) {
if (op.succeeded()) {
if (op.type() == YBOperation::PGSQL_READ && op.IsYsqlCatalogOp()) {
const auto& pgsql_op = down_cast<const client::YBPgsqlReadOp&>(op);
if (pgsql_op.used_read_time()) {
Status PgSession::HandleResponse(
client::YBPgsqlOp* op, const PgObjectId& relation_id, YBSession* session) {
if (op->succeeded()) {
ReadHybridTime used_read_time;
if (op->type() == YBOperation::PGSQL_READ) {
auto& read_op = down_cast<client::YBPgsqlReadOp&>(*op);
used_read_time = read_op.used_read_time();
// It is necessary to reset used read time as op might be re-sent
// in future (for fetching next portion of data)
read_op.SetUsedReadTime(ReadHybridTime());
}
if (!session->HasTransaction() && session->read_point() && used_read_time) {
if (op->IsYsqlCatalogOp() && session == catalog_session_.get()) {
// Non empty used_read_time field in catalog read operation means this is the very first
// catalog read operation after catalog read time resetting. read_time for the operation
// has been chosen by master. All further reads from catalog must use same read point.
auto catalog_read_point = pgsql_op.used_read_time();
auto catalog_read_point = used_read_time;

// We set global limit to local limit to avoid read restart errors because they are
// disruptive to system catalog reads and it is not always possible to handle them there.
Expand All @@ -1055,11 +1069,20 @@ Status PgSession::HandleResponse(const client::YBPgsqlOp& op, const PgObjectId&
// TODO(dmitry) This situation will be handled in context of #7964.
catalog_read_point.global_limit = catalog_read_point.local_limit;
SetCatalogReadPoint(catalog_read_point);
} else if (PREDICT_TRUE(!FLAGS_force_preset_read_time_on_client)) {
const auto current_read_time = session->read_point()->GetReadTime();
SCHECK(!current_read_time,
IllegalState,
Format("Session already has a read time $0 used read time is $1",
current_read_time,
used_read_time));
session->SetReadPoint(used_read_time);
VLOG(3) << "Update read time from used read time: " << session->read_point()->GetReadTime();
}
}
return Status::OK();
}
const auto& response = op.response();
const auto& response = op->response();
YBPgErrorCode pg_error_code = YBPgErrorCode::YB_PG_INTERNAL_ERROR;
if (response.has_pg_error_code()) {
pg_error_code = static_cast<YBPgErrorCode>(response.pg_error_code());
Expand All @@ -1084,9 +1107,9 @@ Status PgSession::HandleResponse(const client::YBPgsqlOp& op, const PgObjectId&
PgsqlError(YBPgErrorCode::YB_PG_UNIQUE_VIOLATION));
} else {
if (PREDICT_FALSE(yb_debug_log_docdb_requests || FLAGS_ysql_log_failed_docdb_requests)) {
LOG(INFO) << "Operation failed: " << op.ToString();
LOG(INFO) << "Operation failed: " << op->ToString();
}
s = STATUS(QLError, op.response().error_message(), Slice(),
s = STATUS(QLError, op->response().error_message(), Slice(),
PgsqlError(pg_error_code));
}
return s.CloneAndAddErrorCode(TransactionError(txn_error_code));
Expand Down
8 changes: 6 additions & 2 deletions src/yb/yql/pggate/pg_session.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ class PgSessionAsyncRunResult {
client::YBSessionPtr session);
CHECKED_STATUS GetStatus(PgSession* session);
bool InProgress() const;
CHECKED_STATUS HandleResponse(
PgSession* session, client::YBPgsqlOp* op, const PgObjectId& relation_id) const;

private:
// buffered_operations_ holds buffered operations (if any) which were applied to
Expand Down Expand Up @@ -316,8 +318,6 @@ class PgSession : public RefCountedThreadSafe<PgSession> {
// Deletes the row referenced by ybctid from FK reference cache.
void DeleteForeignKeyReference(PgOid table_id, const Slice& ybctid);

CHECKED_STATUS HandleResponse(const client::YBPgsqlOp& op, const PgObjectId& relation_id);

Result<int> TabletServerCount(bool primary_only = false);

// Sets the specified timeout in the rpc service.
Expand All @@ -334,6 +334,10 @@ class PgSession : public RefCountedThreadSafe<PgSession> {
bool ShouldUseFollowerReads() const;

private:
friend PgSessionAsyncRunResult;
CHECKED_STATUS HandleResponse(
client::YBPgsqlOp* op, const PgObjectId& relation_id, client::YBSession* session);

using Flusher = std::function<Status(PgsqlOpBuffer, IsTransactionalSession)>;

CHECKED_STATUS FlushBufferedOperationsImpl(const Flusher& flusher);
Expand Down
13 changes: 12 additions & 1 deletion src/yb/yql/pggate/pg_txn_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,13 @@
DEFINE_bool(use_node_hostname_for_local_tserver, false,
"Connect to local t-server by using host name instead of local IP");

DEFINE_bool(force_preset_read_time_on_client,
false,
"Preset postgres's process current time as read time for each newly created "
"transaction. This flag should be enabled only in case inconsistency problem is "
"observed and this problem is caused by selecting of read time on tserver's side. "
"Enabling this flag will make latencies of single-shard reads higher.");

// A macro for logging the function name and the state of the current transaction.
// This macro is not enclosed in do { ... } while (true) because we want to be able to write
// additional information into the same log message.
Expand Down Expand Up @@ -232,7 +239,11 @@ Status PgTxnManager::SetDeferrable(bool deferrable) {
void PgTxnManager::StartNewSession() {
session_ = BuildSession(async_client_init_->client(), clock_);
session_->SetReadPoint(client::Restart::kFalse);
if (PREDICT_FALSE(FLAGS_force_preset_read_time_on_client)) {
session_->SetReadPoint(client::Restart::kFalse);
} else {
session_->SetReadPoint(ReadHybridTime());
}
enable_follower_reads_ = false;
read_only_ = false;
updated_read_time_for_follower_reads_ = false;
Expand Down
43 changes: 43 additions & 0 deletions src/yb/yql/pgwrapper/pg_libpq-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2390,5 +2390,48 @@ TEST_F(PgLibPqTest, YB_DISABLE_TEST_IN_TSAN(PagingReadRestart)) {
ASSERT_FALSE(runner.HasError());
}

// This test checks the absence of read restarts in the context of snapshot isolation operations
// that only read a single tablet. Read restarts are not expected because of 2 facts:
// - initial read uses read time from tserver
// - concurrent modifications are done by single row update (i.e. no status tablet is used)
// As a result all operations use time from the same tserver and tserver is able to determine
// operation order without ambiguity caused by clock's skew.
TEST_F(PgLibPqTest, YB_DISABLE_TEST_IN_TSAN(NoReadRestartOnSingleTablet)) {
auto conn = ASSERT_RESULT(Connect());
ASSERT_OK(conn.Execute("CREATE TABLE t (k INT PRIMARY KEY, v INT) SPLIT INTO 1 TABLETS"));
constexpr size_t kNumReads = 30;
constexpr size_t kUpdateThreads = 30;
ASSERT_OK(conn.ExecuteFormat(
"INSERT INTO t SELECT s, 0 FROM generate_series(0, $0) AS s", kUpdateThreads));
std::atomic<size_t> update_count{0};
TestThreadHolder thread_holder;
for (size_t i = 0; i < kUpdateThreads; ++i) {
thread_holder.AddThreadFunctor(
[this, &stop = thread_holder.stop_flag(), idx = i + 1, &update_count] {
auto update_conn = ASSERT_RESULT(Connect());
while (!stop.load(std::memory_order_acquire)) {
ASSERT_OK(update_conn.ExecuteFormat(
"UPDATE t SET v = $0 WHERE k = $1", RandomUniformInt(0, 10000), idx));
update_count.fetch_add(1, std::memory_order_acq_rel);
}
});
}
for (size_t i = 0; i < kNumReads; ++i) {
ASSERT_OK(conn.Execute("BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ"));
// Read item which is never modified to avoid possible transpared read restart
// and new read time selection.
ASSERT_OK(conn.Fetch("SELECT * FROM t WHERE k = 0"));
const auto current_update_count = update_count.load(std::memory_order_acquire);
// Wait for some updates before next read to increase probability of potential read restarts.
while (update_count.load(std::memory_order_acquire) < current_update_count + kUpdateThreads) {
std::this_thread::sleep_for(5ms);
}
auto res = ASSERT_RESULT(conn.Fetch("SELECT * FROM t"));
auto rows = PQntuples(res.get());
ASSERT_EQ(rows, kUpdateThreads + 1);
ASSERT_OK(conn.Execute("COMMIT"));
}
}

} // namespace pgwrapper
} // namespace yb
6 changes: 4 additions & 2 deletions src/yb/yql/pgwrapper/pg_mini-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -580,7 +580,8 @@ void PgMiniTest::TestInsertSelectRowLock(IsolationLevel isolation, RowMarkType r
}

ASSERT_OK(read_conn.ExecuteFormat("BEGIN TRANSACTION ISOLATION LEVEL $0", isolation_str));
ASSERT_OK(read_conn.Fetch("SELECT '(setting read point)'"));
// Send read request to a server for read time selection.
ASSERT_OK(read_conn.Fetch("SELECT * FROM t WHERE i = -1"));
ASSERT_OK(write_conn.ExecuteFormat("INSERT INTO t (i, j) VALUES ($0, $0)", kKeys));
auto result = read_conn.FetchFormat("SELECT * FROM t FOR $0", row_mark_str);
if (isolation == IsolationLevel::SNAPSHOT_ISOLATION) {
Expand Down Expand Up @@ -618,7 +619,8 @@ void PgMiniTest::TestDeleteSelectRowLock(IsolationLevel isolation, RowMarkType r
}

ASSERT_OK(read_conn.ExecuteFormat("BEGIN TRANSACTION ISOLATION LEVEL $0", isolation_str));
ASSERT_OK(read_conn.Fetch("SELECT '(setting read point)'"));
// Send read request to a server for read time selection.
ASSERT_OK(read_conn.Fetch("SELECT * FROM t WHERE i = -1"));
ASSERT_OK(write_conn.ExecuteFormat("DELETE FROM t WHERE i = $0", RandomUniformInt(0, kKeys - 1)));
auto result = read_conn.FetchFormat("SELECT * FROM t FOR $0", row_mark_str);
if (isolation == IsolationLevel::SNAPSHOT_ISOLATION) {
Expand Down

0 comments on commit 8948633

Please sign in to comment.