Skip to content

Commit

Permalink
ENG-3649: Introducing StatusFunctor and changing the return type of Y…
Browse files Browse the repository at this point in the history
…BSession::SetFlushMode to void

Summary:
This is a preparation diff for D5211.
This diff introduces StatusFunctor and changes the return type of YBSession::SetFlushMode to void.

Note: D5211 will make secondary index update RPC calls asynchronous.

Test Plan: Jenkins

Reviewers: mikhail, robert, timur

Reviewed By: robert, timur

Subscribers: bogdan, ybase, bharat

Differential Revision: https://phabricator.dev.yugabyte.com/D5222
  • Loading branch information
spolitov committed Jul 27, 2018
1 parent 6a20c87 commit cec8f4e
Show file tree
Hide file tree
Showing 48 changed files with 136 additions and 236 deletions.
1 change: 1 addition & 0 deletions python/yb/build_postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,7 @@ def make_postgres(self):
make_parallelism = os.environ.get('YB_MAKE_PARALLELISM')
if make_parallelism:
make_cmd += ['-j', str(int(make_parallelism))]
os.environ['YB_COMPILER_TYPE'] = self.compiler_type

# Create a script allowing to easily run "make" from the build directory with the right
# environment.
Expand Down
2 changes: 1 addition & 1 deletion src/yb/client/batcher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ MonoTime Batcher::ComputeDeadlineUnlocked() const {
return ret;
}

void Batcher::FlushAsync(boost::function<void(const Status&)> callback) {
void Batcher::FlushAsync(StatusFunctor callback) {
{
std::lock_guard<simple_spinlock> l(lock_);
CHECK_EQ(state_, kGatheringOps);
Expand Down
4 changes: 2 additions & 2 deletions src/yb/client/batcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ class Batcher : public RefCountedThreadSafe<Batcher> {
// then the callback will receive Status::OK. Otherwise, it will receive IOError,
// and the caller must inspect the ErrorCollector to retrieve more detailed
// information on which operations failed.
void FlushAsync(boost::function<void(const Status&)> callback);
void FlushAsync(StatusFunctor callback);

MonoTime deadline() const {
return deadline_;
Expand Down Expand Up @@ -235,7 +235,7 @@ class Batcher : public RefCountedThreadSafe<Batcher> {
// If state is kFlushing, this member will be set to the user-provided
// callback. Once there are no more in-flight operations, the callback
// will be called exactly once (and the state changed to kFlushed).
boost::function<void(const Status&)> flush_callback_;
StatusFunctor flush_callback_;

// All buffered or in-flight ops.
// Added to this set during apply, removed during Finished of AsyncRpc.
Expand Down
15 changes: 5 additions & 10 deletions src/yb/client/client-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,6 @@ class ClientTest: public YBMiniClusterTestBase<MiniCluster> {
client = client_.get();
}
std::shared_ptr<YBSession> session = client->NewSession();
EXPECT_OK(session->SetFlushMode(YBSession::MANUAL_FLUSH));
session->SetTimeout(10s);
return session;
}
Expand All @@ -214,7 +213,7 @@ class ClientTest: public YBMiniClusterTestBase<MiniCluster> {
ASSERT_OK(session->Apply(BuildTestRow(table, i)));
}
FlushSessionOrDie(session);
ASSERT_NO_FATALS(CheckNoRpcOverflow());
ASSERT_NO_FATALS(CheckNoRpcOverflow());
}

// Inserts 'num_rows' using the default client.
Expand All @@ -228,7 +227,7 @@ class ClientTest: public YBMiniClusterTestBase<MiniCluster> {
ASSERT_OK(session->Apply(UpdateTestRow(table, i)));
}
FlushSessionOrDie(session);
ASSERT_NO_FATALS(CheckNoRpcOverflow());
ASSERT_NO_FATALS(CheckNoRpcOverflow());
}

void DeleteTestRows(const TableHandle& table, int lo, int hi) {
Expand All @@ -237,7 +236,7 @@ class ClientTest: public YBMiniClusterTestBase<MiniCluster> {
ASSERT_OK(session->Apply(DeleteTestRow(table, i)));
}
FlushSessionOrDie(session);
ASSERT_NO_FATALS(CheckNoRpcOverflow());
ASSERT_NO_FATALS(CheckNoRpcOverflow());
}

shared_ptr<YBqlWriteOp> BuildTestRow(const TableHandle& table, int index) {
Expand Down Expand Up @@ -313,8 +312,7 @@ class ClientTest: public YBMiniClusterTestBase<MiniCluster> {
client_table_.AddColumns({"key"}, req);
auto session = client_->NewSession();
session->SetTimeout(60s);
ASSERT_OK(session->Apply(op));
ASSERT_OK(session->Flush());
ASSERT_OK(session->ApplyAndFlush(op));
ASSERT_EQ(QLResponsePB::YQL_STATUS_OK, op->response().status());
auto rowblock = ql::RowsResult(op.get()).GetRowBlock();
for (const auto& row : rowblock->rows()) {
Expand Down Expand Up @@ -834,9 +832,7 @@ TEST_F(ClientTest, DISABLED_TestInsertSingleRowManualBatch) {
// Try inserting without specifying a key: should fail.
client_table_.AddInt32ColumnValue(insert->mutable_request(), "int_val", 54321);
client_table_.AddStringColumnValue(insert->mutable_request(), "string_val", "hello world");
Status s = session->Apply(insert);
ASSERT_OK(s);
ASSERT_OK(session->Flush());
ASSERT_OK(session->ApplyAndFlush(insert));
ASSERT_EQ(QLResponsePB::YQL_STATUS_RUNTIME_ERROR, insert->response().status());

// Retry
Expand Down Expand Up @@ -1711,7 +1707,6 @@ shared_ptr<YBSession> LoadedSession(const shared_ptr<YBClient>& client,
bool fwd, int max, MonoDelta timeout) {
shared_ptr<YBSession> session = client->NewSession();
session->SetTimeout(timeout);
CHECK_OK(session->SetFlushMode(YBSession::MANUAL_FLUSH));
for (int i = 0; i < max; ++i) {
int key = fwd ? i : max - i;
CHECK_OK(ApplyUpdateToSession(session.get(), tbl, key, fwd));
Expand Down
22 changes: 13 additions & 9 deletions src/yb/client/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1483,10 +1483,6 @@ Status YBSession::Close() {
return data_->Close(false);
}

Status YBSession::SetFlushMode(FlushMode m) {
return data_->SetFlushMode(m);
}

void YBSession::SetTimeout(MonoDelta timeout) {
data_->SetTimeout(timeout);
}
Expand All @@ -1495,7 +1491,7 @@ Status YBSession::Flush() {
return data_->Flush();
}

void YBSession::FlushAsync(boost::function<void(const Status&)> callback) {
void YBSession::FlushAsync(StatusFunctor callback) {
data_->FlushAsync(std::move(callback));
}

Expand All @@ -1513,8 +1509,7 @@ Status YBSession::ReadSync(std::shared_ptr<YBOperation> yb_op) {
return s.Wait();
}

void YBSession::ReadAsync(std::shared_ptr<YBOperation> yb_op,
boost::function<void(const Status&)> callback) {
void YBSession::ReadAsync(std::shared_ptr<YBOperation> yb_op, StatusFunctor callback) {
CHECK(yb_op->read_only());
CHECK_OK(Apply(std::move(yb_op)));
FlushAsync(std::move(callback));
Expand All @@ -1524,8 +1519,17 @@ Status YBSession::Apply(std::shared_ptr<YBOperation> yb_op) {
return data_->Apply(std::move(yb_op));
}

Status YBSession::Apply(const std::vector<YBOperationPtr>& ops, VerifyResponse verify_response) {
return data_->Apply(ops, verify_response);
Status YBSession::ApplyAndFlush(std::shared_ptr<YBOperation> yb_op) {
return data_->ApplyAndFlush(std::move(yb_op));
}

Status YBSession::Apply(const std::vector<YBOperationPtr>& ops) {
return data_->Apply(ops);
}

Status YBSession::ApplyAndFlush(
const std::vector<YBOperationPtr>& ops, VerifyResponse verify_response) {
return data_->ApplyAndFlush(ops, verify_response);
}

int YBSession::CountBufferedOperations() const {
Expand Down
51 changes: 7 additions & 44 deletions src/yb/client/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -944,37 +944,6 @@ class YBSession : public std::enable_shared_from_this<YBSession> {

~YBSession();

enum FlushMode {
// Every write will be sent to the server in-band with the Apply()
// call. No batching will occur. This is the default flush mode. In this
// mode, the Flush() call never has any effect, since each Apply() call
// has already flushed the buffer. This is the default flush mode.
AUTO_FLUSH_SYNC,

// Apply() calls will return immediately, but the writes will be sent in
// the background, potentially batched together with other writes from
// the same session. If there is not sufficient buffer space, then Apply()
// may block for buffer space to be available.
//
// Because writes are applied in the background, any errors will be stored
// in a session-local buffer. Call CountPendingErrors() or GetPendingErrors()
// to retrieve them.
// TODO: provide an API for the user to specify a callback to do their own
// error reporting.
// TODO: specify which threads the background activity runs on (probably the
// messenger IO threads?)
//
// NOTE: This is not implemented yet, see KUDU-456.
//
// The Flush() call can be used to block until the buffer is empty.
AUTO_FLUSH_BACKGROUND,

// Apply() calls will return immediately, and the writes will not be
// sent until the user calls Flush(). If the buffer runs past the
// configured space limit, then Apply() will return an error.
MANUAL_FLUSH
};

// Set the consistent read point used by the non-transactional operations in this session. If
// the operations are retries and last read point indicates the operations are to be restarted,
// the read point will be updated to restart read-time. Otherwise, the read point will be set to
Expand All @@ -984,19 +953,12 @@ class YBSession : public std::enable_shared_from_this<YBSession> {
// Changed transaction used by this session.
void SetTransaction(YBTransactionPtr transaction);

// Set the flush mode.
// REQUIRES: there should be no pending writes -- call Flush() first to ensure.
CHECKED_STATUS SetFlushMode(FlushMode m) WARN_UNUSED_RESULT;

// Set the amount of buffer space used by this session for outbound writes.
// The effect of the buffer size varies based on the flush mode of the
// session:
//
// AUTO_FLUSH_SYNC:
// since no buffering is done, this has no effect
// AUTO_FLUSH_BACKGROUND:
// if the buffer space is exhausted, then write calls will block until there
// is space available in the buffer.
// MANUAL_FLUSH:
// if the buffer space is exhausted, then write calls will return an error.
CHECKED_STATUS SetMutationBufferSpace(size_t size) WARN_UNUSED_RESULT;
Expand All @@ -1006,7 +968,7 @@ class YBSession : public std::enable_shared_from_this<YBSession> {

CHECKED_STATUS ReadSync(std::shared_ptr<YBOperation> yb_op) WARN_UNUSED_RESULT;

void ReadAsync(std::shared_ptr<YBOperation> yb_op, boost::function<void(const Status&)> callback);
void ReadAsync(std::shared_ptr<YBOperation> yb_op, StatusFunctor callback);

// TODO: add "doAs" ability here for proxy servers to be able to act on behalf of
// other users, assuming access rights.
Expand All @@ -1025,11 +987,13 @@ class YBSession : public std::enable_shared_from_this<YBSession> {
//
// This is thread safe.
CHECKED_STATUS Apply(YBOperationPtr yb_op);
CHECKED_STATUS ApplyAndFlush(YBOperationPtr yb_op);

// verify_response - supported only in auto flush mode. Checks that after flush operation
// is succeeded. (i.e. op->succeeded() returns true).
CHECKED_STATUS Apply(const std::vector<YBOperationPtr>& ops,
VerifyResponse verify_response = VerifyResponse::kFalse);
CHECKED_STATUS Apply(const std::vector<YBOperationPtr>& ops);
CHECKED_STATUS ApplyAndFlush(const std::vector<YBOperationPtr>& ops,
VerifyResponse verify_response = VerifyResponse::kFalse);

// Flush any pending writes.
//
Expand Down Expand Up @@ -1065,7 +1029,7 @@ class YBSession : public std::enable_shared_from_this<YBSession> {
//
// For FlushAsync, 'cb' must remain valid until it is invoked.
CHECKED_STATUS Flush() WARN_UNUSED_RESULT;
void FlushAsync(boost::function<void(const Status&)> callback);
void FlushAsync(StatusFunctor callback);
std::future<Status> FlushFuture();

// Abort the unflushed or in-flight operations in the session.
Expand Down Expand Up @@ -1094,8 +1058,7 @@ class YBSession : public std::enable_shared_from_this<YBSession> {
// so this will return 0.
int CountBufferedOperations() const;

// Return the number of errors which are pending. Errors may accumulate when
// using the AUTO_FLUSH_BACKGROUND mode.
// Return the number of errors which are pending.
int CountPendingErrors() const;

// Return any errors from previous calls. If there were more errors
Expand Down
11 changes: 7 additions & 4 deletions src/yb/client/ql-dml-test-base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ void KeyValueTableTest::CreateTable(Transactional transactional) {

Result<shared_ptr<YBqlWriteOp>> KeyValueTableTest::WriteRow(
const YBSessionPtr& session, int32_t key, int32_t value,
const WriteOpType op_type) {
const WriteOpType op_type, Flush flush) {
VLOG(4) << "Calling WriteRow key=" << key << " value=" << value << " op_type="
<< yb::ToString(op_type);
const QLWriteRequestPB::QLStmtType stmt_type = GetQlStatementType(op_type);
Expand All @@ -108,8 +108,11 @@ Result<shared_ptr<YBqlWriteOp>> KeyValueTableTest::WriteRow(
table_.AddInt32ColumnValue(req, kValueColumn, value);
}
RETURN_NOT_OK(session->Apply(op));
if (op->response().status() != QLResponsePB::YQL_STATUS_OK) {
return STATUS_FORMAT(QLError, "Error writing row: $0", op->response().error_message());
if (flush) {
RETURN_NOT_OK(session->Flush());
if (op->response().status() != QLResponsePB::YQL_STATUS_OK) {
return STATUS_FORMAT(QLError, "Error writing row: $0", op->response().error_message());
}
}
return op;
}
Expand All @@ -130,7 +133,7 @@ Result<int32_t> KeyValueTableTest::SelectRow(
auto* const req = op->mutable_request();
QLAddInt32HashValue(req, key);
table_.AddColumns({column}, req);
auto status = session->Apply(op);
auto status = session->ApplyAndFlush(op);
if (status.IsIOError()) {
for (const auto& error : session->GetPendingErrors()) {
LOG(WARNING) << "Error: " << error->status() << ", op: " << error->failed_op().ToString();
Expand Down
4 changes: 3 additions & 1 deletion src/yb/client/ql-dml-test-base.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ class QLDmlTestBase : public YBMiniClusterTestBase<MiniCluster> {

YB_STRONGLY_TYPED_BOOL(Transactional);
YB_DEFINE_ENUM(WriteOpType, (INSERT)(UPDATE)(DELETE));
YB_STRONGLY_TYPED_BOOL(Flush);

class KeyValueTableTest : public QLDmlTestBase {
protected:
Expand All @@ -75,7 +76,8 @@ class KeyValueTableTest : public QLDmlTestBase {
// op_type == WriteOpType::DELETE: delete from t where k=key; (parameter "value" is unused).
Result<shared_ptr<YBqlWriteOp>> WriteRow(
const YBSessionPtr& session, int32_t key, int32_t value,
const WriteOpType op_type = WriteOpType::INSERT);
const WriteOpType op_type = WriteOpType::INSERT,
Flush flush = Flush::kTrue);

Result<shared_ptr<YBqlWriteOp>> DeleteRow(
const YBSessionPtr& session, int32_t key);
Expand Down
Loading

0 comments on commit cec8f4e

Please sign in to comment.