Skip to content

Commit

Permalink
[yugabyte#15856] YSQL: Pick read time on tserver for new statements i…
Browse files Browse the repository at this point in the history
…n Read Committed isolation

Summary:
In Read Committed isolation, a new read time is picked for each statement
(i.e., a new logical snapshot of the database is used for each statement's
reads). This is done (in PgClientService) by setting the read time to the
current time at the start of each new statement before issuing requests to any
tserver. However, this might results in high latencies in the first read op that
is executed as part of that statement because the tablet serving the read
(likely on another node) might have to wait for the "safe" time to reach the
picked read time. A long wait for safe time is usually seen when there are
concurrent writes to the tablet and the read enters while the raft replication
that moves the safe time ahead is still in progress (see yugabyte#11805).

This issue is avoided in Repeatable Read isolation because there, the first
tablet serving the read in a transaction is allowed to pick the read time as the
latest available "safe" time without having to wait for any catchup. This read
time is sent back to PgClientService as used_read_time so that future reads can
use the same read time. Note that even in Repeatable Read isolation, in case,
there are multiple parallel RPCs to various tservers, the read time is still
picked on the PgClientService because otherwise, the rpcs would have to wait for
one of them to execute and came back with a used_read_time.

This diff extends the same logic to Read Committed isolation.

Test Plan:
./yb_build.sh --java-test org.yb.pgsql.TestPgTransactions#testReadPointInReadCommittedIsolation
./yb_build.sh --java-test org.yb.pgsql.TestPgIsolationRegress

Reviewers: dmitry

Subscribers: yql, bogdan

Differential Revision: https://phabricator.dev.yugabyte.com/D24075
  • Loading branch information
pkj415 committed Jun 30, 2023
1 parent dd00203 commit cb5c7be
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 39 deletions.
93 changes: 55 additions & 38 deletions src/yb/tserver/pg_client_session.cc
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,12 @@ DEFINE_RUNTIME_string(ysql_sequence_cache_method, "connection",
"Where sequence values are cached for both existing and new sequences. Valid values are "
"\"connection\" and \"server\"");

DEFINE_NON_RUNTIME_bool(ysql_rc_force_pick_read_time_on_pg_client, false,
"When resetting read time for a statement in Read Commited isolation level,"
" pick read time on the PgClientService instead of allowing the tserver to"
" pick one.");
TAG_FLAG(ysql_rc_force_pick_read_time_on_pg_client, advanced);

DECLARE_bool(ysql_serializable_isolation_for_ddl_txn);
DECLARE_bool(ysql_ddl_rollback_enabled);

Expand Down Expand Up @@ -983,25 +989,30 @@ Status PgClientSession::DoPerform(const DataPtr& data, CoarseTimePoint deadline,
return Status::OK();
}

void PgClientSession::ProcessReadTimeManipulation(ReadTimeManipulation manipulation) {
Result<PgClientSession::UsedReadTimePtr> PgClientSession::ProcessReadTimeManipulation(
ReadTimeManipulation manipulation) {
switch (manipulation) {
case ReadTimeManipulation::RESET: {
// If a txn_ has been created, session_->read_point() returns the read point stored in txn_.
ConsistentReadPoint* rp = Session(PgClientSessionKind::kPlain)->read_point();
rp->SetCurrentReadTime();

VLOG(1) << "Setting current ht as read point " << rp->GetReadTime();
auto* read_point = Session(PgClientSessionKind::kPlain)->read_point();
if (FLAGS_ysql_rc_force_pick_read_time_on_pg_client ||
Transaction(PgClientSessionKind::kPlain)) {
// If a txn_ has been created, session_->read_point() returns the read point stored in
// txn_.
read_point->SetCurrentReadTime();
VLOG(1) << "Setting current ht as read point " << read_point->GetReadTime();
return PgClientSession::UsedReadTimePtr();
}
return VERIFY_RESULT(ResetReadPoint(PgClientSessionKind::kPlain));
}
return;
case ReadTimeManipulation::RESTART: {
ConsistentReadPoint* rp = Session(PgClientSessionKind::kPlain)->read_point();
auto* rp = Session(PgClientSessionKind::kPlain)->read_point();
rp->Restart();

VLOG(1) << "Restarted read point " << rp->GetReadTime();
}
return;
return PgClientSession::UsedReadTimePtr();
case ReadTimeManipulation::NONE:
return;
return PgClientSession::UsedReadTimePtr();
case ReadTimeManipulation::ReadTimeManipulation_INT_MIN_SENTINEL_DO_NOT_USE_:
case ReadTimeManipulation::ReadTimeManipulation_INT_MAX_SENTINEL_DO_NOT_USE_:
break;
Expand Down Expand Up @@ -1085,39 +1096,26 @@ PgClientSession::SetupSession(

UsedReadTimePtr used_read_time;
if (options.restart_transaction()) {
if(options.ddl_mode()) {
return STATUS(NotSupported, "Not supported to restart DDL transaction");
}
RSTATUS_DCHECK(!options.ddl_mode(), NotSupported, "Restarting a DDL transaction not supported");
Transaction(kind) = VERIFY_RESULT(RestartTransaction(session, transaction));
transaction = Transaction(kind).get();
} else {
RSTATUS_DCHECK(
kind == PgClientSessionKind::kPlain ||
options.read_time_manipulation() == ReadTimeManipulation::NONE,
IllegalState,
"Read time manipulation can't be specified for kDdl/ kCatalog transactions");
ProcessReadTimeManipulation(options.read_time_manipulation());
if (options.has_read_time() || options.use_catalog_session()) {
const auto read_time = options.has_read_time() && options.read_time().has_read_ht()
? ReadHybridTime::FromPB(options.read_time()) : ReadHybridTime();
if (options.read_time_manipulation() != ReadTimeManipulation::NONE) {
RSTATUS_DCHECK(
kind == PgClientSessionKind::kPlain, IllegalState,
"Read time manipulation can't be specified for non kPlain sessions");
used_read_time = VERIFY_RESULT(ProcessReadTimeManipulation(options.read_time_manipulation()));
}
if (options.has_read_time() && options.read_time().has_read_ht()) {
const auto read_time = ReadHybridTime::FromPB(options.read_time());
session->SetReadPoint(read_time);
if (read_time) {
VLOG_WITH_PREFIX(3) << "Read time: " << read_time;
} else {
VLOG_WITH_PREFIX(3) << "Reset read time: " << session->read_point()->GetReadTime();
}
} else if (!transaction &&
(options.ddl_mode() || txn_serial_no_ != options.txn_serial_no())) {
session->SetReadPoint(ReadHybridTime());
if (kind == PgClientSessionKind::kPlain) {
used_read_time = std::weak_ptr<UsedReadTime>(
std::shared_ptr<UsedReadTime>(shared_from_this(), &plain_session_used_read_time_));
std::lock_guard guard(plain_session_used_read_time_.lock);
plain_session_used_read_time_.value = ReadHybridTime();
}
VLOG_WITH_PREFIX(3) << "Reset read time: " << session->read_point()->GetReadTime();
VLOG_WITH_PREFIX(3) << "Read time: " << read_time;
} else if (options.has_read_time() ||
options.use_catalog_session() ||
(!transaction && (txn_serial_no_ != options.txn_serial_no()))) {
used_read_time = VERIFY_RESULT(ResetReadPoint(kind));
} else {
if (!transaction && kind == PgClientSessionKind::kPlain) {
if (!transaction && kind == PgClientSessionKind::kPlain && !used_read_time.lock()) {
RETURN_NOT_OK(CheckPlainSessionReadTime());
}
VLOG_WITH_PREFIX(3) << "Keep read time: " << session->read_point()->GetReadTime();
Expand Down Expand Up @@ -1159,6 +1157,25 @@ PgClientSession::SetupSession(
return std::make_pair(sessions_[to_underlying(kind)], used_read_time);
}

Result<PgClientSession::UsedReadTimePtr> PgClientSession::ResetReadPoint(PgClientSessionKind kind) {
auto& data = sessions_[to_underlying(kind)];
RSTATUS_DCHECK(
!data.transaction, IllegalState,
"Can't reset read time in case distributed transaction has started");
auto& session = *data.session;
session.SetReadPoint(ReadHybridTime());
VLOG_WITH_PREFIX(3) << "Reset read time: " << session.read_point()->GetReadTime();

UsedReadTimePtr used_read_time;
if (kind == PgClientSessionKind::kPlain) {
used_read_time = std::weak_ptr(
std::shared_ptr<UsedReadTime>(shared_from_this(), &plain_session_used_read_time_));
std::lock_guard guard(plain_session_used_read_time_.lock);
plain_session_used_read_time_.value = ReadHybridTime();
}
return used_read_time;
}

std::string PgClientSession::LogPrefix() {
return SessionLogPrefix(id_);
}
Expand Down
10 changes: 9 additions & 1 deletion src/yb/tserver/pg_client_session.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,8 @@ class PgClientSession : public std::enable_shared_from_this<PgClientSession> {
Status ProcessResponse(
const PgClientSessionOperations& operations, const PgPerformRequestPB& req,
PgPerformResponsePB* resp, rpc::RpcContext* context);
void ProcessReadTimeManipulation(ReadTimeManipulation manipulation);
Result<PgClientSession::UsedReadTimePtr> ProcessReadTimeManipulation(
ReadTimeManipulation manipulation);

client::YBClient& client();
client::YBSessionPtr& EnsureSession(PgClientSessionKind kind);
Expand Down Expand Up @@ -172,6 +173,13 @@ class PgClientSession : public std::enable_shared_from_this<PgClientSession> {
template <class DataPtr>
Status DoPerform(const DataPtr& data, CoarseTimePoint deadline, rpc::RpcContext* context);

// Resets the session's current read point.
//
// For kPlain sessions, also reset the plain session used read time since the tserver will pick a
// read time and send back as "used read time" in the response for use by future rpcs of the
// session.
Result<PgClientSession::UsedReadTimePtr> ResetReadPoint(PgClientSessionKind kind);

const uint64_t id_;
client::YBClient& client_;
scoped_refptr<ClockBase> clock_;
Expand Down

0 comments on commit cb5c7be

Please sign in to comment.