Skip to content

Commit

Permalink
Remove unnecessary encoding and decoding from DocDB
Browse files Browse the repository at this point in the history
Summary:
Removed unnecessary doc key encoding and decoding from DocDB (github issue #70)

Added a special test to address the scenario with a long hash key and list columns.
Test runs for 15 seconds and measures the number of reads.
Avg. number of reads in master (per 10 launches): 12262.6, avg. number of reads with encoding/decoding optimisation: 19628.8.

Test Plan: ybd --cxx-test ql-list-test --gtest_filter *.Performance

Reviewers: robert, timur, amitanand, mikhail

Reviewed By: mikhail

Subscribers: bharat, rahuldesirazu, timur, kannan, ybase

Differential Revision: https://phabricator.dev.yugabyte.com/D4266
  • Loading branch information
spolitov committed Mar 6, 2018
1 parent a44baa1 commit 13724f5
Show file tree
Hide file tree
Showing 43 changed files with 682 additions and 282 deletions.
1 change: 1 addition & 0 deletions src/yb/client/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -108,5 +108,6 @@ ADD_YB_TEST(client-test)
ADD_YB_TEST(client-unittest)
ADD_YB_TEST(ql-dml-test)
ADD_YB_TEST(ql-dml-ttl-test)
ADD_YB_TEST(ql-list-test)
ADD_YB_TEST(ql-tablet-test)
ADD_YB_TEST(ql-transaction-test)
4 changes: 4 additions & 0 deletions src/yb/client/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1487,6 +1487,10 @@ Status YBSession::Apply(std::shared_ptr<YBOperation> yb_op) {
return data_->Apply(std::move(yb_op));
}

// Applies a batch of operations in one call. This overload does no work of its own: it forwards
// `ops` and `verify_response` unchanged to data_->Apply() and returns the status that call
// produces.
Status YBSession::Apply(const std::vector<YBOperationPtr>& ops, VerifyResponse verify_response) {
return data_->Apply(ops, verify_response);
}

// Returns the buffered-operation count reported by data_->CountBufferedOperations().
int YBSession::CountBufferedOperations() const {
return data_->CountBufferedOperations();
}
Expand Down
10 changes: 9 additions & 1 deletion src/yb/client/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
#include "yb/util/result.h"
#include "yb/util/status.h"
#include "yb/util/status_callback.h"
#include "yb/util/strongly_typed_bool.h"
#include "yb/util/net/net_fwd.h"

template<class T> class scoped_refptr;
Expand Down Expand Up @@ -818,6 +819,8 @@ typedef std::vector<std::unique_ptr<YBError>> CollectedErrors;

class YBSessionData;

YB_STRONGLY_TYPED_BOOL(VerifyResponse);

// A YBSession belongs to a specific YBClient, and represents a context in
// which all read/write data access should take place. Within a session,
// multiple operations may be accumulated and batched together for better
Expand Down Expand Up @@ -954,7 +957,12 @@ class YBSession : public std::enable_shared_from_this<YBSession> {
// may be retrieved at any time.
//
// This is thread safe.
CHECKED_STATUS Apply(std::shared_ptr<YBOperation> yb_op) WARN_UNUSED_RESULT;
CHECKED_STATUS Apply(YBOperationPtr yb_op);

// verify_response - supported only in auto flush (sync) mode. After the flush, checks that
// every operation succeeded (i.e. op->succeeded() returns true).
CHECKED_STATUS Apply(const std::vector<YBOperationPtr>& ops,
VerifyResponse verify_response = VerifyResponse::kFalse);

// Flush any pending writes.
//
Expand Down
4 changes: 3 additions & 1 deletion src/yb/client/client_fwd.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,12 @@ typedef std::shared_ptr<YBSession> YBSessionPtr;
class YBTable;
typedef std::shared_ptr<YBTable> YBTablePtr;

class YBOperation;
typedef std::shared_ptr<YBOperation> YBOperationPtr;

class TableHandle;
class TransactionManager;
class YBMetaDataCache;
class YBOperation;
class YBSchema;
class YBTableAlterer;
class YBTableCreator;
Expand Down
188 changes: 188 additions & 0 deletions src/yb/client/ql-list-test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
// Copyright (c) YugaByte, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations
// under the License.
//

#include <thread>

#include "yb/client/ql-dml-test-base.h"
#include "yb/client/table_handle.h"

#include "yb/yql/cql/ql/util/statement_result.h"

#include "yb/util/random_util.h"

namespace yb {
namespace client {

namespace {

// Number of elements written into each collection column (l1, s1, s2) of every inserted row.
constexpr int kCollectionSize = 10;

// Formats `value` as lowercase hexadecimal, zero-padded on the left to 8 characters.
std::string To8LengthHex(uint32_t value) {
  // 8 hex digits plus the terminating NUL always fit into this buffer.
  char hex[16];
  const int written = snprintf(hex, sizeof(hex), "%08x", value);
  return std::string(hex, static_cast<size_t>(written));
}

// Appends the three hashed key values derived from `hash_seed` to `req`:
// the seed itself, its bitwise complement, and a string built from their hex representations.
template <class RequestPB>
void AddHash(int32_t hash_seed, RequestPB* req) {
  const int32_t inverted_seed = ~hash_seed;
  QLAddInt32HashValue(req, hash_seed);
  QLAddInt32HashValue(req, inverted_seed);

  const std::string seed_hex = To8LengthHex(hash_seed);
  const std::string inverted_hex = To8LengthHex(inverted_seed);
  // Five 8-character pieces produce the required 40-character string.
  QLAddStringHashValue(req, seed_hex + inverted_hex + seed_hex + inverted_hex + seed_hex);
}

// Returns seed for appropriate row. Seed is used to generate all row values.
//
// The seed is (hash_seed << 16) + range. The arithmetic is carried out on uint32_t so that a
// large or negative hash_seed wraps modulo 2^32 instead of triggering undefined behavior
// (signed left-shift / signed overflow); for inputs that do not overflow the result is the same
// as plain signed arithmetic.
int32_t RowSeed(int32_t hash_seed, int32_t range) {
  const uint32_t shifted = static_cast<uint32_t>(hash_seed) << 16;
  return static_cast<int32_t>(shifted + static_cast<uint32_t>(range));
}

// Returns the value stored at 1-based position `i` of the list column for the row identified by
// (hash_seed, range): the row seed multiplied by `i`.
//
// The product is computed on uint32_t so that it wraps modulo 2^32 rather than overflowing a
// signed int (undefined behavior) once seeds become large; results that fit into int32_t are
// unchanged.
int32_t ListEntry(int32_t hash_seed, int32_t range, int32_t i) {
  const uint32_t seed = static_cast<uint32_t>(RowSeed(hash_seed, range));
  return static_cast<int32_t>(seed * static_cast<uint32_t>(i));
}

} // namespace

// Test fixture that creates a table with a three-column hash key (h1, h2, h3), one range column
// (r1), a list column (l1) and two set columns (s1, s2), and provides helpers to insert and read
// rows whose contents are derived deterministically from a hash seed.
class QLListTest : public QLDmlTestBase {
 public:
  QLListTest() = default;

  // Runs the base-class setup and then creates the test table.
  void SetUp() override {
    QLDmlTestBase::SetUp();

    YBSchemaBuilder b;
    b.AddColumn("h1")->Type(INT32)->HashPrimaryKey()->NotNull();
    b.AddColumn("h2")->Type(INT32)->HashPrimaryKey()->NotNull();
    b.AddColumn("h3")->Type(STRING)->HashPrimaryKey()->NotNull();
    b.AddColumn("r1")->Type(INT32)->PrimaryKey()->NotNull();
    b.AddColumn("l1")->Type(QLType::CreateTypeList(DataType::INT32));
    b.AddColumn("s1")->Type(QLType::CreateTypeSet(DataType::STRING));
    b.AddColumn("s2")->Type(QLType::CreateTypeSet(DataType::STRING));

    ASSERT_OK(table_.Create(kTableName, CalcNumTablets(3), client_.get(), &b));
  }

  // Builds one insert per range value in [0, ranges) under the hash key derived from `hash_seed`
  // and applies them all with a single session->Apply() call. Every row gets kCollectionSize
  // elements in each of l1, s1 and s2.
  void InsertRows(YBSession* session, int32_t hash_seed, int32_t ranges) {
    std::vector<YBOperationPtr> ops;
    for (int32_t range = 0; range != ranges; ++range) {
      auto op = table_.NewWriteOp(QLWriteRequestPB::QL_STMT_INSERT);
      auto* const req = op->mutable_request();
      AddHash(hash_seed, req);
      QLAddInt32RangeValue(req, range);
      auto l1 = table_.PrepareColumn(req, "l1")->mutable_list_value();
      auto s1 = table_.PrepareColumn(req, "s1")->mutable_set_value();
      auto s2 = table_.PrepareColumn(req, "s2")->mutable_set_value();
      // To8LengthHex() takes uint32_t, so do the seed arithmetic on uint32_t directly: the
      // products wrap modulo 2^32 instead of overflowing a signed int (undefined behavior),
      // and yield the same strings as before whenever no overflow occurs.
      const uint32_t seed = static_cast<uint32_t>(RowSeed(hash_seed, range));
      const uint32_t inverted_seed = ~seed;
      for (int i = 1; i <= kCollectionSize; ++i) {
        const uint32_t factor = static_cast<uint32_t>(i);
        l1->add_elems()->set_int32_value(ListEntry(hash_seed, range, i));
        s1->add_elems()->set_string_value(To8LengthHex(seed * factor));
        s2->add_elems()->set_string_value(To8LengthHex(inverted_seed * factor));
      }
      ops.push_back(std::move(op));
    }
    ASSERT_OK(session->Apply(ops));
  }

  // Reads all columns of the rows stored under the hash key derived from `hash_seed` and returns
  // them as a row block. Apply failures and non-OK response statuses are reported as non-fatal
  // test failures (EXPECT_*), so a row block is still built from the operation afterwards.
  std::unique_ptr<QLRowBlock> ReadRows(YBSession* session, int32_t hash_seed) {
    auto op = table_.NewReadOp();
    auto* const req = op->mutable_request();
    AddHash(hash_seed, req);
    table_.AddColumns(table_.AllColumnNames(), req);
    EXPECT_OK(session->Apply(op));
    EXPECT_EQ(op->response().status(), QLResponsePB::YQL_STATUS_OK);

    return ql::RowsResult(op.get()).GetRowBlock();
  }

  // Handle of the table created in SetUp().
  TableHandle table_;
};

// Writes kRanges rows under each of kKeys hash keys, reads every hash key back and checks the
// range column and every element of the list column against the generator functions.
TEST_F(QLListTest, Simple) {
  constexpr int kKeys = 2;
  constexpr int kRanges = 10;
  auto session = NewSession();

  // Populate the table.
  for (int key = 1; key <= kKeys; ++key) {
    InsertRows(session.get(), key, kRanges);
  }

  // Verify what was written, one hash key at a time.
  for (int k = 1; k <= kKeys; ++k) {
    const auto rows = ReadRows(session.get(), k);
    ASSERT_EQ(kRanges, rows->row_count());
    for (int r = 0; r < kRanges; ++r) {
      const auto& current_row = rows->row(r);
      // Column 3 is the range column r1, column 4 is the list column l1.
      ASSERT_EQ(r, current_row.column(3).int32_value());
      const auto& list = current_row.column(4).list_value();
      ASSERT_EQ(kCollectionSize, list.elems_size());
      for (int idx = 0; idx < kCollectionSize; ++idx) {
        // List entries are generated from 1-based positions.
        const int i = idx + 1;
        SCOPED_TRACE(Format("k: $0, r: $1, i: $2", k, r, i));
        ASSERT_EQ(ListEntry(k, r, i), list.elems(idx).int32_value());
      }
      // TODO add string set verification
    }
  }
}

// Read-throughput measurement: one writer thread keeps inserting row batches while kReaders
// reader threads repeatedly read a randomly chosen, already written hash key. After kRunTime the
// threads are told to stop and the insert/read counters are logged.
TEST_F(QLListTest, Performance) {
DontVerifyClusterBeforeNextTearDown(); // To remove checksum from perf report

// Shared state: stop flag for all worker threads, index of the latest insert batch that was
// started, and the number of completed reads.
std::atomic<bool> stop(false);
std::atomic<int> inserted(0);
std::atomic<int> total_reads(0);
constexpr int kRanges = 10;
constexpr int kReaders = 4;
constexpr int kWarmupInserts = 100;
const auto kRunTime = 15s;

// Writer: inserts kRanges rows per iteration using the incremented counter as hash seed. The
// first kWarmupInserts batches are written back to back; afterwards the writer pauses 10ms
// between batches.
std::thread writer([this, &stop, &inserted] {
auto session = NewSession();
while (!stop.load()) {
auto index = ++inserted;
InsertRows(session.get(), index, kRanges);
if (index >= kWarmupInserts) {
std::this_thread::sleep_for(10ms);
}
}
});

// Wait until the writer has reached the warmup threshold before starting readers.
while (inserted.load() < kWarmupInserts) {
std::this_thread::sleep_for(10ms);
}

std::vector<std::thread> readers;
for (int i = 0; i != kReaders; ++i) {
readers.emplace_back([this, &stop, &inserted, &total_reads, kRanges] {
auto session = NewSession();
while (!stop.load()) {
// `inserted` is incremented before the corresponding InsertRows() call, so the newest
// seed may still be in flight; pick only from seeds below it.
auto hash_seed = RandomUniformInt(1, inserted.load() - 1);
auto rowblock = ReadRows(session.get(), hash_seed);
ASSERT_EQ(kRanges, rowblock->row_count()) << "Seed: " << hash_seed;
++total_reads;
}
});
}

std::this_thread::sleep_for(kRunTime);

// Signal all threads to finish. The counters are logged right away, before the threads are
// joined.
stop = true;
LOG(INFO) << "Total writes: " << inserted.load() << ", total reads: " << total_reads.load();

writer.join();
for (auto& t : readers) {
t.join();
}
}

} // namespace client
} // namespace yb
40 changes: 36 additions & 4 deletions src/yb/client/session-internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -131,15 +131,19 @@ void YBSessionData::set_allow_local_calls_in_curr_thread(bool flag) {
allow_local_calls_in_curr_thread_ = flag;
}

Status YBSessionData::Apply(std::shared_ptr<YBOperation> yb_op) {
internal::Batcher& YBSessionData::Batcher() {
if (!batcher_) {
batcher_.reset(new Batcher(client_.get(), error_collector_.get(), shared_from_this(),
transaction_));
batcher_.reset(new internal::Batcher(
client_.get(), error_collector_.get(), shared_from_this(), transaction_));
if (timeout_.Initialized()) {
batcher_->SetTimeout(timeout_);
}
}
Status s = batcher_->Add(yb_op);
return *batcher_;
}

Status YBSessionData::Apply(YBOperationPtr yb_op) {
Status s = Batcher().Add(yb_op);
if (!PREDICT_FALSE(s.ok())) {
error_collector_->AddError(yb_op, s);
return s;
Expand All @@ -152,6 +156,34 @@ Status YBSessionData::Apply(std::shared_ptr<YBOperation> yb_op) {
return Status::OK();
}

// Adds every operation in `ops` to the session's batcher and, in AUTO_FLUSH_SYNC mode, flushes.
//
// ops             - operations to add; if adding one of them fails, the error is recorded in
//                   error_collector_ and returned immediately (remaining operations are not
//                   added).
// verify_response - when set, after a successful flush every operation is checked with
//                   op->succeeded() and a RuntimeError is returned for the first one that did
//                   not succeed. Only meaningful in AUTO_FLUSH_SYNC mode; requesting it in any
//                   other flush mode is reported via LOG(DFATAL).
//
// Returns the first add/flush/verification error, or OK.
Status YBSessionData::Apply(
    const std::vector<YBOperationPtr>& ops, VerifyResponse verify_response) {
  auto& batcher = Batcher();
  for (const auto& op : ops) {
    Status s = batcher.Add(op);
    // Failure is the unlikely case, so the hint wraps the negated condition.
    if (PREDICT_FALSE(!s.ok())) {
      error_collector_->AddError(op, s);
      return s;
    }
  }

  if (flush_mode_ == YBSession::AUTO_FLUSH_SYNC) {
    // Only bail out on a flush error. Returning the flush status unconditionally would skip the
    // verification below in the one mode where it is supported.
    RETURN_NOT_OK(Flush());
  } else if (verify_response) {
    LOG(DFATAL) << "Verify response could be used only with sync flush mode.";
  }

  if (verify_response) {
    for (const auto& op : ops) {
      if (!op->succeeded()) {
        // "$0" is required for the operation to actually appear in the message.
        return STATUS_FORMAT(RuntimeError, "Operation failed: $0", op);
      }
    }
  }

  return Status::OK();
}

Status YBSessionData::Flush() {
Synchronizer s;
FlushAsync(s.AsStatusFunctor());
Expand Down
5 changes: 4 additions & 1 deletion src/yb/client/session-internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ class YBSessionData : public std::enable_shared_from_this<YBSessionData> {
YBSessionData(const YBSessionData&) = delete;
void operator=(const YBSessionData&) = delete;

CHECKED_STATUS Apply(std::shared_ptr<YBOperation> yb_op);
CHECKED_STATUS Apply(YBOperationPtr yb_op);
CHECKED_STATUS Apply(const std::vector<YBOperationPtr>& ops, VerifyResponse verify_response);

void FlushAsync(boost::function<void(const Status&)> callback);

Expand Down Expand Up @@ -99,6 +100,8 @@ class YBSessionData : public std::enable_shared_from_this<YBSessionData> {
bool allow_local_calls_in_curr_thread() const;

private:
internal::Batcher& Batcher();

// The client that this session is associated with.
const std::shared_ptr<YBClient> client_;

Expand Down
2 changes: 1 addition & 1 deletion src/yb/client/table_handle.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,11 +131,11 @@ class TableHandle {

std::vector<std::string> AllColumnNames() const;

private:
QLValuePB* PrepareColumn(QLWriteRequestPB* req, const string& column_name) const;
QLValuePB* PrepareCondition(
QLConditionPB* const condition, const string& column_name, const QLOperator op) const;

private:
typedef std::unordered_map<std::string, yb::ColumnId> ColumnIdsMap;
typedef std::unordered_map<yb::ColumnId, const std::shared_ptr<QLType>> ColumnTypesMap;

Expand Down
17 changes: 12 additions & 5 deletions src/yb/common/doc_hybrid_time.cc
Original file line number Diff line number Diff line change
Expand Up @@ -145,12 +145,19 @@ Status DocHybridTime::FullyDecodeFrom(const Slice& encoded) {
return Status::OK();
}

Status DocHybridTime::DecodeFromEnd(const Slice& encoded_key_with_ht_at_end) {
Result<DocHybridTime> DocHybridTime::DecodeFromEnd(Slice* encoded_key_with_ht_at_end) {
int encoded_size = 0;
RETURN_NOT_OK(CheckAndGetEncodedSize(encoded_key_with_ht_at_end, &encoded_size));
Slice s(encoded_key_with_ht_at_end.data() + encoded_key_with_ht_at_end.size() - encoded_size,
encoded_size);
return FullyDecodeFrom(s);
RETURN_NOT_OK(CheckAndGetEncodedSize(*encoded_key_with_ht_at_end, &encoded_size));
Slice s(encoded_key_with_ht_at_end->end() - encoded_size, encoded_size);
DocHybridTime result;
RETURN_NOT_OK(result.FullyDecodeFrom(s));
encoded_key_with_ht_at_end->remove_suffix(encoded_size);
return result;
}

// Decodes the DocHybridTime located at the end of the given slice and stores it in *this, by
// delegating to the static DecodeFromEnd(Slice*) overload. The slice is taken by value, so the
// suffix removal done by that overload only affects the local copy and never the caller's slice.
// Returns OK on success; on decode failure VERIFY_RESULT presumably propagates the error status
// (macro defined outside this file -- confirm).
Status DocHybridTime::DecodeFromEnd(Slice encoded_key_with_ht_at_end) {
*this = VERIFY_RESULT(DecodeFromEnd(&encoded_key_with_ht_at_end));
return Status::OK();
}

string DocHybridTime::ToString() const {
Expand Down
6 changes: 5 additions & 1 deletion src/yb/common/doc_hybrid_time.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#include "yb/common/hybrid_time.h"
#include "yb/util/compare_util.h"
#include "yb/util/result.h"

namespace yb {

Expand Down Expand Up @@ -87,7 +88,10 @@ class DocHybridTime {
CHECKED_STATUS DecodeFrom(Slice *slice);

CHECKED_STATUS FullyDecodeFrom(const Slice& encoded);
CHECKED_STATUS DecodeFromEnd(const Slice& encoded_key_with_ht_at_end);
CHECKED_STATUS DecodeFromEnd(Slice encoded_key_with_ht);

// Decodes doc ht from end of slice, and removes corresponding bytes from provided slice.
static Result<DocHybridTime> DecodeFromEnd(Slice* encoded_key_with_ht);

bool operator==(const DocHybridTime& other) const {
return hybrid_time_ == other.hybrid_time_ && write_id_ == other.write_id_;
Expand Down
1 change: 0 additions & 1 deletion src/yb/consensus/consensus_queue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@ using log::AsyncLogReader;
using log::Log;
using std::unique_ptr;
using rpc::Messenger;
using util::to_underlying;
using strings::Substitute;

METRIC_DEFINE_gauge_int64(tablet, majority_done_ops, "Leader Operations Acked by Majority",
Expand Down
Loading

0 comments on commit 13724f5

Please sign in to comment.