Skip to content

Commit

Permalink
ENG-4855: #977: Handle restart read in YSQL
Browse files Browse the repository at this point in the history
Summary:
In PgReadOp we don't handle operation status, so read restart is ignored.
That causes it to return 0 rows in case of read restart.

Fixed by handling operation status and trying to handle read restart in the same fashion
as we do for writes.

Test Plan:
Launch local cluster:
bin/yb-ctl --rf 1 create

Launch workload:
java -jar ./target/yb-sample-apps.jar --workload SqlSnapshotTxns --nodes 127.0.0.1:5433

Reviewers: mihnea, robert, mikhail

Reviewed By: robert

Subscribers: ybase

Differential Revision: https://phabricator.dev.yugabyte.com/D6387
  • Loading branch information
spolitov committed Mar 25, 2019
1 parent 30725bf commit 046b4b6
Show file tree
Hide file tree
Showing 13 changed files with 159 additions and 31 deletions.
4 changes: 4 additions & 0 deletions src/yb/client/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1847,6 +1847,10 @@ void YBSession::SetReadPoint(const Restart restart) {
data_->SetReadPoint(restart);
}

bool YBSession::IsRestartRequired() const {
return data_->IsRestartRequired();
}

void YBSession::SetTransaction(YBTransactionPtr transaction) {
data_->SetTransaction(std::move(transaction));
}
Expand Down
3 changes: 3 additions & 0 deletions src/yb/client/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -1041,6 +1041,9 @@ class YBSession : public std::enable_shared_from_this<YBSession> {
// the current time.
void SetReadPoint(Restart restart);

// Returns true if our current read point requires restart.
bool IsRestartRequired() const;

// Changed transaction used by this session.
void SetTransaction(YBTransactionPtr transaction);

Expand Down
5 changes: 5 additions & 0 deletions src/yb/client/session-internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,11 @@ void YBSessionData::SetReadPoint(const Restart restart) {
}
}

bool YBSessionData::IsRestartRequired() {
auto rp = read_point();
return rp && rp->IsRestartRequired();
}

Status YBSessionData::Close(bool force) {
if (batcher_) {
if (batcher_->HasPendingOperations() && !force) {
Expand Down
2 changes: 2 additions & 0 deletions src/yb/client/session-internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ class YBSessionData : public std::enable_shared_from_this<YBSessionData> {

void SetReadPoint(Restart restart);

bool IsRestartRequired();

// Changed transaction used by this session.
void SetTransaction(YBTransactionPtr transaction);

Expand Down
8 changes: 8 additions & 0 deletions src/yb/rpc/rpc_context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,14 @@ Status RpcContext::AddRpcSidecar(RefCntBuffer car, int* idx) {
return call_->AddRpcSidecar(car, idx);
}

int RpcContext::RpcSidecarsSize() const {
return call_->RpcSidecarsSize();
}

const RefCntBuffer& RpcContext::RpcSidecar(int idx) const {
return call_->RpcSidecar(idx);
}

void RpcContext::ResetRpcSidecars() {
call_->ResetRpcSidecars();
}
Expand Down
4 changes: 4 additions & 0 deletions src/yb/rpc/rpc_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,10 @@ class RpcContext {
// by the RPC response.
CHECKED_STATUS AddRpcSidecar(RefCntBuffer car, int* idx);

int RpcSidecarsSize() const;

const RefCntBuffer& RpcSidecar(int idx) const;

// Removes all RpcSidecars.
void ResetRpcSidecars();

Expand Down
13 changes: 13 additions & 0 deletions src/yb/rpc/yb_rpc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,20 @@ Status YBInboundCall::AddRpcSidecar(RefCntBuffer car, int* idx) {
return Status::OK();
}

int YBInboundCall::RpcSidecarsSize() const {
return sidecars_.size();
}

const RefCntBuffer& YBInboundCall::RpcSidecar(int idx) {
return sidecars_[idx];
}

void YBInboundCall::ResetRpcSidecars() {
if (consumption_) {
for (const auto& sidecar : sidecars_) {
consumption_.Add(-sidecar.size());
}
}
sidecars_.clear();
}

Expand Down
4 changes: 4 additions & 0 deletions src/yb/rpc/yb_rpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,10 @@ class YBInboundCall : public InboundCall {
// See RpcContext::AddRpcSidecar()
CHECKED_STATUS AddRpcSidecar(RefCntBuffer car, int* idx);

int RpcSidecarsSize() const;

const RefCntBuffer& RpcSidecar(int idx);

// See RpcContext::ResetRpcSidecars()
void ResetRpcSidecars();

Expand Down
64 changes: 37 additions & 27 deletions src/yb/yql/pggate/pg_doc_op.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ namespace yb {
namespace pggate {

PgDocOp::PgDocOp(PgSession::ScopedRefPtr pg_session, uint64_t* read_time)
: pg_session_(std::move(pg_session)), read_time_(read_time) {
: pg_session_(std::move(pg_session)), read_time_(read_time),
can_restart_(!pg_session_->HasAppliedOperations()) {
}

PgDocOp::~PgDocOp() {
Expand Down Expand Up @@ -58,7 +59,7 @@ Result<RequestSent> PgDocOp::Execute() {
InitUnlocked(&lock);

RETURN_NOT_OK(SendRequestUnlocked());

return RequestSent(waiting_for_response_);
}

Expand Down Expand Up @@ -128,6 +129,27 @@ Status PgDocOp::SendRequestIfNeededUnlocked() {
return Status::OK();
}

bool PgDocOp::CheckRestartUnlocked(client::YBPgsqlOp* op) {
if (op->succeeded()) {
return false;
}

if (op->response().status() == PgsqlResponsePB::PGSQL_STATUS_RESTART_REQUIRED_ERROR &&
can_restart_) {
exec_status_ = pg_session_->RestartTransaction();
if (exec_status_.ok()) {
exec_status_ = SendRequestUnlocked();
if (exec_status_.ok()) {
return true;
}
}
} else {
exec_status_ = STATUS(QLError, op->response().error_message());
}

return false;
}

//--------------------------------------------------------------------------------------------------

PgDocReadOp::PgDocReadOp(
Expand Down Expand Up @@ -170,7 +192,12 @@ void PgDocReadOp::ReceiveResponse(Status exec_status) {
waiting_for_response_ = false;
exec_status_ = exec_status;

if (!exec_status.ok()) {
if (exec_status.ok() && CheckRestartUnlocked(read_op_.get())) {
return;
}

// exec_status_ could be changed by CheckRestartUnlocked
if (!exec_status_.ok()) {
end_of_data_ = true;
return;
}
Expand All @@ -182,9 +209,9 @@ void PgDocReadOp::ReceiveResponse(Status exec_status) {
// Setup request for the next batch of data.
const PgsqlResponsePB& res = read_op_->response();
if (res.has_paging_state()) {
PgsqlReadRequestPB *req = read_op_->mutable_request();
// Set up paging state for next request.
*req->mutable_paging_state() = res.paging_state();
PgsqlReadRequestPB *req = read_op_->mutable_request();
// Set up paging state for next request.
*req->mutable_paging_state() = res.paging_state();
// Parse/Analysis/Rewrite catalog version has already been checked on the first request.
// The docdb layer will check the target table's schema version is compatible.
// This allows long-running queries to continue in the presence of other DDL statements
Expand All @@ -201,8 +228,7 @@ void PgDocReadOp::ReceiveResponse(Status exec_status) {
//--------------------------------------------------------------------------------------------------

PgDocWriteOp::PgDocWriteOp(PgSession::ScopedRefPtr pg_session, client::YBPgsqlWriteOp *write_op)
: PgDocOp(pg_session, nullptr /* read_time */), write_op_(write_op),
can_restart_(!pg_session->HasAppliedOperations()) {
: PgDocOp(pg_session, nullptr /* read_time */), write_op_(write_op) {
}

PgDocWriteOp::~PgDocWriteOp() {
Expand Down Expand Up @@ -233,26 +259,10 @@ void PgDocWriteOp::ReceiveResponse(Status exec_status) {
CHECK(waiting_for_response_);
waiting_for_response_ = false;
cv_.notify_all();
exec_status_ = exec_status;

if (exec_status.ok() && !write_op_->succeeded()) {
if (write_op_->response().status() == PgsqlResponsePB::PGSQL_STATUS_RESTART_REQUIRED_ERROR &&
can_restart_) {
auto restart_status = pg_session_->RestartTransaction();
if (!restart_status.ok()) {
exec_status_ = restart_status;
} else {
auto status = SendRequestUnlocked();
if (!status.ok()) {
exec_status_ = status;
} else {
return;
}
}
} else {
exec_status_ = STATUS(QLError, write_op_->response().error_message());
}
} else {
exec_status_ = exec_status;
if (exec_status.ok() && CheckRestartUnlocked(write_op_.get())) {
return;
}
if (!is_canceled_ && exec_status_.ok()) {
// Save it to cache.
Expand Down
8 changes: 7 additions & 1 deletion src/yb/yql/pggate/pg_doc_op.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ class PgDocOp {
// all data in the cache.
CHECKED_STATUS SendRequestIfNeededUnlocked();

// Checks whether op causes restart. Could set exec_status_.
// Returns true is restart was initiated;
bool CheckRestartUnlocked(client::YBPgsqlOp* op);

// Session control.
PgSession::ScopedRefPtr pg_session_;

Expand Down Expand Up @@ -99,6 +103,9 @@ class PgDocOp {

// Caching state variables.
std::list<string> result_cache_;

// Whether we can restart this operation.
const bool can_restart_;
};

class PgDocReadOp : public PgDocOp {
Expand Down Expand Up @@ -159,7 +166,6 @@ class PgDocWriteOp : public PgDocOp {

// Operator.
std::shared_ptr<client::YBPgsqlWriteOp> write_op_;
bool can_restart_;
};

// TODO(neil)
Expand Down
8 changes: 6 additions & 2 deletions src/yb/yql/pggate/pg_txn_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,14 @@ Status PgTxnManager::BeginWriteTransactionIfNecessary(bool read_only_op) {

Status PgTxnManager::RestartTransaction() {
if (!txn_in_progress_ || !txn_) {
return STATUS(IllegalState, "Attempting to restart when transaction is not in progress");
if (!session_->IsRestartRequired()) {
return STATUS(IllegalState, "Attempted to restart when session does not require restart");
}
session_->SetReadPoint(client::Restart::kTrue);
return Status::OK();
}
if (!txn_->IsRestartRequired()) {
return STATUS(IllegalState, "Attempting to restart when transaction does not require restart");
return STATUS(IllegalState, "Attempted to restart when transaction does not require restart");
}
auto new_txn = VERIFY_RESULT(txn_->CreateRestartedTransaction());
ResetTxnAndSession();
Expand Down
2 changes: 1 addition & 1 deletion src/yb/yql/pgwrapper/libpq_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ Status Execute(PGconn* conn, const std::string& command) {
}

Result<PGResultPtr> Fetch(PGconn* conn, const std::string& command) {
PGResultPtr res(PQexecParams(conn, "SELECT * FROM t", 0, nullptr, nullptr, nullptr, nullptr, 1));
PGResultPtr res(PQexecParams(conn, command.c_str(), 0, nullptr, nullptr, nullptr, nullptr, 1));
auto status = PQresultStatus(res.get());
if (ExecStatusType::PGRES_TUPLES_OK != status) {
return STATUS_FORMAT(NetworkError, "Fetch '$0' failed: $1, message: $2",
Expand Down
65 changes: 65 additions & 0 deletions src/yb/yql/pgwrapper/pg_libpq-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,13 @@

#include "yb/util/random_util.h"

#include <boost/scope_exit.hpp>

#include "yb/yql/pgwrapper/libpq_utils.h"
#include "yb/yql/pgwrapper/pg_wrapper_test_base.h"

using namespace std::literals;

DECLARE_int64(retryable_rpc_single_call_timeout_ms);
DECLARE_int32(yb_client_admin_operation_timeout_sec);

Expand Down Expand Up @@ -273,5 +277,66 @@ TEST_F(PgLibPqTest, YB_DISABLE_TEST_IN_TSAN(SerializableReadWriteConflict)) {
}
}

TEST_F(PgLibPqTest, YB_DISABLE_TEST_IN_TSAN(ReadRestart)) {
auto conn = ASSERT_RESULT(Connect());
ASSERT_OK(Execute(conn.get(), "CREATE TABLE t (key INT PRIMARY KEY)"));

std::atomic<bool> stop(false);
std::atomic<int> last_written(0);

std::thread write_thread([this, &stop, &last_written] {
auto write_conn = ASSERT_RESULT(Connect());
int write_key = 1;
while (!stop.load(std::memory_order_acquire)) {
SCOPED_TRACE(Format("Writing: $0", write_key));

ASSERT_OK(Execute(write_conn.get(), "BEGIN"));
auto status = Execute(write_conn.get(), Format("INSERT INTO t (key) VALUES ($0)", write_key));
if (status.ok()) {
status = Execute(write_conn.get(), "COMMIT");
}
if (status.ok()) {
last_written.store(write_key, std::memory_order_release);
++write_key;
} else {
LOG(INFO) << "Write " << write_key << " failed: " << status;
}
}
});

BOOST_SCOPE_EXIT(&stop, &write_thread) {
stop.store(true, std::memory_order_release);
write_thread.join();
} BOOST_SCOPE_EXIT_END;

auto deadline = CoarseMonoClock::now() + 30s;

while (CoarseMonoClock::now() < deadline) {
int read_key = last_written.load(std::memory_order_acquire);
if (read_key == 0) {
std::this_thread::sleep_for(100ms);
continue;
}

SCOPED_TRACE(Format("Reading: $0", read_key));

ASSERT_OK(Execute(conn.get(), "BEGIN"));

auto res = ASSERT_RESULT(Fetch(conn.get(), Format("SELECT * FROM t WHERE key = $0", read_key)));
auto columns = PQnfields(res.get());
ASSERT_EQ(1, columns);

auto lines = PQntuples(res.get());
ASSERT_EQ(1, lines);

auto key = ASSERT_RESULT(GetInt32(res.get(), 0, 0));
ASSERT_EQ(key, read_key);

ASSERT_OK(Execute(conn.get(), "ROLLBACK"));
}

ASSERT_GE(last_written.load(std::memory_order_acquire), 100);
}

} // namespace pgwrapper
} // namespace yb

0 comments on commit 046b4b6

Please sign in to comment.