Skip to content

Commit

Permalink
Improve performance of IntentAwareIterator
Browse files Browse the repository at this point in the history
Summary:
Improve IntentAwareIterator's performance in the CassandraPersonalization read workload (range scan) by about 14% through the following changes:
- Reuse seek key buffer in GetSubDocument().
- Avoid redundant IntentAwareIterator::SkipFutureRecords under IntentAwareIteratorPrefixScope from GetSubDocument().

Test Plan:
Before:

```
> java -jar ~/code/yugabyte/java/yb-loadtester/target/yb-sample-apps.jar -workload CassandraPersonalization -nodes 127.0.0.1:9042 -nouuid -max_written_key 10 -read_only -num_threads_write 0 -num_threads_read 1

2018-04-05 22:00:47,113 [INFO|com.yugabyte.sample.common.metrics.MetricsTracker|MetricsTracker] Read: 35.57 ops/sec (28.20 ms/op), 1070 total ops  |  Write: 0.00 ops/sec (0.00 ms/op), 0 total ops  |  Uptime: 30025 ms | maxWrittenKey: 10 | maxGeneratedKey: 10 |
2018-04-05 22:00:52,119 [INFO|com.yugabyte.sample.common.metrics.MetricsTracker|MetricsTracker] Read: 35.16 ops/sec (28.31 ms/op), 1246 total ops  |  Write: 0.00 ops/sec (0.00 ms/op), 0 total ops  |  Uptime: 35031 ms | maxWrittenKey: 10 | maxGeneratedKey: 10 |
2018-04-05 22:00:57,123 [INFO|com.yugabyte.sample.common.metrics.MetricsTracker|MetricsTracker] Read: 35.17 ops/sec (28.50 ms/op), 1422 total ops  |  Write: 0.00 ops/sec (0.00 ms/op), 0 total ops  |  Uptime: 40035 ms | maxWrittenKey: 10 | maxGeneratedKey: 10 |
```

After:

```
2018-04-05 22:08:28,063 [INFO|com.yugabyte.sample.common.metrics.MetricsTracker|MetricsTracker] Read: 40.77 ops/sec (24.62 ms/op), 1227 total ops  |  Write: 0.00 ops/sec (0.00 ms/op), 0 total ops  |  Uptime: 30014 ms | maxWrittenKey: 10 | maxGeneratedKey: 10 |
2018-04-05 22:08:33,067 [INFO|com.yugabyte.sample.common.metrics.MetricsTracker|MetricsTracker] Read: 40.56 ops/sec (24.63 ms/op), 1430 total ops  |  Write: 0.00 ops/sec (0.00 ms/op), 0 total ops  |  Uptime: 35018 ms | maxWrittenKey: 10 | maxGeneratedKey: 10 |
2018-04-05 22:08:38,072 [INFO|com.yugabyte.sample.common.metrics.MetricsTracker|MetricsTracker] Read: 40.36 ops/sec (24.73 ms/op), 1632 total ops  |  Write: 0.00 ops/sec (0.00 ms/op), 0 total ops  |  Uptime: 40023 ms | maxWrittenKey: 10 | maxGeneratedKey: 10 |

```

Reviewers: mikhail, sergei

Reviewed By: sergei

Subscribers: ybase

Differential Revision: https://phabricator.dev.yugabyte.com/D4588
  • Loading branch information
robertpang committed Apr 16, 2018
1 parent 7020d08 commit 158def7
Show file tree
Hide file tree
Showing 7 changed files with 87 additions and 40 deletions.
27 changes: 19 additions & 8 deletions src/yb/docdb/docdb.cc
Original file line number Diff line number Diff line change
Expand Up @@ -702,27 +702,38 @@ yb::Status GetSubDocument(
return Status::OK();
}

// For each subkey in the projection, build subdocument.
// Seed key_bytes with the subdocument key. For each subkey in the projection, build subdocument
// and reuse key_bytes while appending the subkey.
*data.result = SubDocument();
KeyBytes key_bytes(data.subdocument_key);
const size_t subdocument_key_size = key_bytes.size();
for (const PrimitiveValue& subkey : *projection) {
KeyBytes encoded_projection_subdockey(data.subdocument_key);
subkey.AppendToKey(&encoded_projection_subdockey);
// Append subkey to subdocument key. Reserve an extra kMaxBytesPerEncodedHybridTime + 1 bytes in
// key_bytes to prevent the internal buffer from being reallocated and moved when SeekForward()
// appends the hybrid time, which would invalidate the buffer pointer saved by prefix_scope.
subkey.AppendToKey(&key_bytes);
key_bytes.Reserve(key_bytes.size() + kMaxBytesPerEncodedHybridTime + 1);

// This seek is to initialize the iterator for BuildSubDocument call.
IntentAwareIteratorPrefixScope prefix_scope(encoded_projection_subdockey, db_iter);
db_iter->SeekForward(encoded_projection_subdockey);
IntentAwareIteratorPrefixScope prefix_scope(key_bytes, db_iter);
db_iter->SeekForward(&key_bytes);

SubDocument descendant(ValueType::kInvalidValueType);
int64 num_values_observed = 0;
RETURN_NOT_OK(BuildSubDocument(
db_iter, data.Adjusted(encoded_projection_subdockey, &descendant),
max_deleted_ts, &num_values_observed));
db_iter, data.Adjusted(key_bytes, &descendant), max_deleted_ts, &num_values_observed));
if (descendant.value_type() != ValueType::kInvalidValueType) {
*data.doc_found = true;
}
data.result->SetChild(subkey, std::move(descendant));

// Restore subdocument key by truncating the appended subkey.
key_bytes.Truncate(subdocument_key_size);
}
// Make sure the iterator is placed outside the whole document in the end.
db_iter->SeekForward(KeyBytes(key_slice, static_cast<char>(ValueType::kMaxByte)));
key_bytes.Truncate(dockey_size);
key_bytes.AppendValueType(ValueType::kMaxByte);
db_iter->SeekForward(&key_bytes);
return Status::OK();
}

Expand Down
9 changes: 0 additions & 9 deletions src/yb/docdb/docdb_rocksdb_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -198,15 +198,6 @@ KeyBytes AppendDocHt(const Slice& key, const DocHybridTime& doc_ht) {
return KeyBytes(key, Slice(buf, end));
}

// Returns a new key made of `key`, followed by a ValueType::kHybridTime value type, followed by
// the already-encoded doc hybrid time `encoded_doc_ht`. The full size is reserved before anything
// is appended (presumably so the appends below do not have to grow the buffer).
KeyBytes AppendEncodedDocHt(const Slice& key, const Slice& encoded_doc_ht) {
  // Key bytes, plus the value type (the "+ 1"), plus the encoded hybrid time.
  const auto total_size = key.size() + 1 + encoded_doc_ht.size();

  KeyBytes result;
  result.Reserve(total_size);
  result.AppendRawBytes(key);
  result.AppendValueType(ValueType::kHybridTime);
  result.AppendRawBytes(encoded_doc_ht);
  return result;
}

// Seeks `iter` forward to `key` with DocHybridTime::kMin appended (the seek key is built by
// AppendDocHt). `key` itself is not modified.
// NOTE(review): presumably kMin sorts after every other hybrid time stored under the same key,
// which is what would place the iterator "past" the subkey - confirm against the DocHybridTime
// encoding.
void SeekPastSubKey(const Slice& key, rocksdb::Iterator* iter) {
SeekForward(AppendDocHt(key, DocHybridTime::kMin), iter);
}
Expand Down
1 change: 0 additions & 1 deletion src/yb/docdb/docdb_rocksdb_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ void SeekPastSubKey(const Slice& key, rocksdb::Iterator* iter);
void SeekOutOfSubKey(const Slice& key, rocksdb::Iterator* iter);

KeyBytes AppendDocHt(const Slice& key, const DocHybridTime& doc_ht);
KeyBytes AppendEncodedDocHt(const Slice& key, const Slice& encoded_doc_ht);

// A wrapper around the RocksDB seek operation that uses Next() up to the configured number of
// times to avoid invalidating iterator state. In debug mode it also allows printing detailed
Expand Down
1 change: 1 addition & 0 deletions src/yb/docdb/docrowwiseiterator-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -940,6 +940,7 @@ SubDocKey(DocKey([], ["row2", 22222]), [ColumnId(40); HT{ physical: 1000 }]) ->
IntentAwareIterator iter(
rocksdb(), rocksdb::ReadOptions(), ReadHybridTime::FromMicros(1000), boost::none);
iter.Seek(DocKey());
ASSERT_TRUE(iter.valid());
Result<Slice> key = iter.FetchKey();
ASSERT_OK(key);
SubDocKey subdoc_key;
Expand Down
68 changes: 50 additions & 18 deletions src/yb/docdb/intent_aware_iterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include <future>
#include <thread>
#include <boost/optional/optional_io.hpp>
#include <boost/scope_exit.hpp>

#include "yb/common/doc_hybrid_time.h"
#include "yb/common/hybrid_time.h"
Expand All @@ -41,6 +42,7 @@ void GetIntentPrefixForKeyWithoutHt(const Slice& key, KeyBytes* out) {
out->Clear();
// Since caller guarantees that key_bytes doesn't have hybrid time, we can simply prepend
// kIntentPrefix in order to get prefix for all related intents.
out->Reserve(key.size() + 1);
out->AppendValueType(ValueType::kIntentPrefix);
out->AppendRawBytes(key);
}
Expand All @@ -51,6 +53,11 @@ KeyBytes GetIntentPrefixForKeyWithoutHt(const Slice& key) {
return result;
}

void AppendEncodedDocHt(const Slice& encoded_doc_ht, KeyBytes* key_bytes) {
key_bytes->AppendValueType(ValueType::kHybridTime);
key_bytes->AppendRawBytes(encoded_doc_ht);
}

} // namespace

// For locally committed transactions returns commit time if committed at specified time or
Expand Down Expand Up @@ -187,7 +194,7 @@ bool IsIntentForTheSameKey(const Slice& key, const Slice& intent_prefix) {

std::string DebugDumpKeyToStr(const Slice &key) {
SubDocKey key_decoded;
DCHECK(key_decoded.FullyDecodeFrom(key).ok());
CHECK(key_decoded.FullyDecodeFrom(key).ok());
return key.ToDebugHexString() + " (" + key_decoded.ToString() + ")";
}

Expand Down Expand Up @@ -243,7 +250,7 @@ void IntentAwareIterator::Seek(const Slice& key) {
}

ROCKSDB_SEEK(iter_.get(), key);
SkipFutureRecords();
skip_future_records_needed_ = true;
if (intent_iter_) {
status_ = SetIntentUpperbound();
if (!status_.ok()) {
Expand All @@ -255,6 +262,15 @@ void IntentAwareIterator::Seek(const Slice& key) {
}

void IntentAwareIterator::SeekForward(const Slice& key) {
KeyBytes key_bytes(key);
SeekForward(&key_bytes);
}

void IntentAwareIterator::SeekForward(KeyBytes* key_bytes) {
const auto key = key_bytes->AsSlice();
BOOST_SCOPE_EXIT(key_bytes, &key) {
key_bytes->Truncate(key.size());
} BOOST_SCOPE_EXIT_END;
VLOG(4) << "SeekForward(" << SubDocKey::DebugSliceToString(key) << ")";
DOCDB_DEBUG_SCOPE_LOG(
SubDocKey::DebugSliceToString(key),
Expand All @@ -263,15 +279,15 @@ void IntentAwareIterator::SeekForward(const Slice& key) {
return;
}

auto key_bytes = AppendEncodedDocHt(key, encoded_read_time_global_limit_);
SeekForwardRegular(key_bytes);
AppendEncodedDocHt(encoded_read_time_global_limit_, key_bytes);
SeekForwardRegular(*key_bytes);
if (intent_iter_ && status_.ok()) {
status_ = SetIntentUpperbound();
if (!status_.ok()) {
return;
}
GetIntentPrefixForKeyWithoutHt(key, &key_bytes);
SeekForwardToSuitableIntent(key_bytes);
GetIntentPrefixForKeyWithoutHt(key, &seek_key_buffer_);
SeekForwardToSuitableIntent(seek_key_buffer_);
}
}

Expand All @@ -282,7 +298,7 @@ void IntentAwareIterator::SeekPastSubKey(const Slice& key) {
}

docdb::SeekPastSubKey(key, iter_.get());
SkipFutureRecords();
skip_future_records_needed_ = true;
if (intent_iter_ && status_.ok()) {
status_ = SetIntentUpperbound();
if (!status_.ok()) {
Expand Down Expand Up @@ -359,6 +375,14 @@ void IntentAwareIterator::PrevDocKey(const DocKey& doc_key) {
}

// Reports whether the iterator currently has something to return. Not const: any skip work that
// was deferred via the skip_future_*_needed_ flags is performed here first, and each flag is
// cleared after its skip has run.
bool IntentAwareIterator::valid() {
  // Deferred skipping of regular records.
  if (skip_future_records_needed_) {
    SkipFutureRecords();
    skip_future_records_needed_ = false;
  }
  // Deferred skipping of intents.
  if (skip_future_intents_needed_) {
    SkipFutureIntents();
    skip_future_intents_needed_ = false;
  }

  // A non-OK status also yields true (presumably so that the caller goes on to fetch and observes
  // the error instead of treating it as end of data).
  if (!status_.ok()) {
    return true;
  }
  if (iter_valid_) {
    return true;
  }
  return resolved_intent_state_ == ResolvedIntentState::kValid;
}

Expand Down Expand Up @@ -402,7 +426,7 @@ Slice IntentAwareIterator::value() {
void IntentAwareIterator::SeekForwardRegular(const Slice& slice) {
VLOG(4) << "SeekForwardRegular(" << SubDocKey::DebugSliceToString(slice) << ")";
docdb::SeekForward(slice, iter_.get());
SkipFutureRecords();
skip_future_records_needed_ = true;
}

void IntentAwareIterator::ProcessIntent() {
Expand All @@ -427,6 +451,7 @@ void IntentAwareIterator::ProcessIntent() {
if (resolved_intent_state_ == ResolvedIntentState::kNoIntent) {
resolved_intent_key_prefix_.Reset(decode_result->intent_prefix);
auto prefix = prefix_stack_.empty() ? Slice() : prefix_stack_.back();
decode_result->intent_prefix.consume_byte(static_cast<char>(ValueType::kIntentPrefix));
resolved_intent_state_ =
decode_result->intent_prefix.starts_with(prefix) ? ResolvedIntentState::kValid
: ResolvedIntentState::kInvalidPrefix;
Expand Down Expand Up @@ -479,7 +504,7 @@ void IntentAwareIterator::SeekForwardToSuitableIntent() {
!IsIntentForTheSameKey(intent_key, resolved_intent_key_prefix_)) {
break;
}
intent_key.consume_byte();
intent_key.consume_byte(static_cast<char>(ValueType::kIntentPrefix));
if (!intent_key.starts_with(prefix)) {
break;
}
Expand Down Expand Up @@ -553,9 +578,16 @@ Status IntentAwareIterator::FindLastWriteTime(
}
}

{
SeekForwardRegular(AppendEncodedDocHt(key_without_ht, encoded_read_time_global_limit_));
RETURN_NOT_OK(status_);
seek_key_buffer_.Reserve(key_without_ht.size() + encoded_read_time_global_limit_.size() + 1);
seek_key_buffer_.Reset(key_without_ht);
AppendEncodedDocHt(encoded_read_time_global_limit_, &seek_key_buffer_);
SeekForwardRegular(seek_key_buffer_);
RETURN_NOT_OK(status_);

// After SeekForwardRegular(), we need to call valid() to skip future records and see if the
// current key still matches the pushed prefix if any. If it does not, we are done.
if (!valid()) {
return Status::OK();
}

DocHybridTime doc_ht;
Expand Down Expand Up @@ -590,14 +622,14 @@ Status IntentAwareIterator::FindLastWriteTime(
void IntentAwareIterator::PushPrefix(const Slice& prefix) {
VLOG(4) << "PushPrefix: " << SubDocKey::DebugSliceToString(prefix);
prefix_stack_.push_back(prefix);
SkipFutureRecords();
SkipFutureIntents();
skip_future_records_needed_ = true;
skip_future_intents_needed_ = true;
}

void IntentAwareIterator::PopPrefix() {
prefix_stack_.pop_back();
SkipFutureRecords();
SkipFutureIntents();
skip_future_records_needed_ = true;
skip_future_intents_needed_ = true;
VLOG(4) << "PopPrefix: "
<< (prefix_stack_.empty() ? std::string()
: SubDocKey::DebugSliceToString(prefix_stack_.back()));
Expand Down Expand Up @@ -626,7 +658,7 @@ void IntentAwareIterator::SkipFutureRecords() {
}
encoded_doc_ht.remove_prefix(encoded_doc_ht.size() - doc_ht_size);
auto value = iter_->value();
auto value_type = static_cast<ValueType>(value[0]);
auto value_type = DecodeValueType(value);
if (value_type == ValueType::kHybridTime) {
// Value came from a transaction, we could try to filter it by original intent time.
Slice encoded_intent_doc_ht = value;
Expand Down Expand Up @@ -655,7 +687,7 @@ void IntentAwareIterator::SkipFutureIntents() {
VLOG(4) << "Checking resolved intent: " << resolved_intent_key_prefix_.ToString()
<< ", against new prefix: " << prefix.ToDebugHexString();
auto resolved_intent_key_prefix = resolved_intent_key_prefix_.AsSlice();
resolved_intent_key_prefix.consume_byte();
resolved_intent_key_prefix.consume_byte(static_cast<char>(ValueType::kIntentPrefix));
auto compare_result = resolved_intent_key_prefix.compare_prefix(prefix);
if (compare_result == 0) {
resolved_intent_state_ = ResolvedIntentState::kValid;
Expand Down
16 changes: 12 additions & 4 deletions src/yb/docdb/intent_aware_iterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,12 @@ class IntentAwareIterator {
void Seek(const Slice& key);

// Seek forward to specified encoded key (it is responsibility of caller to make sure it
// doesn't have hybrid time).
// doesn't have hybrid time). For efficiency, the method that takes a non-const KeyBytes pointer
// avoids memory allocation by using the KeyBytes buffer to prepare the key to seek to, and may
// append up to kMaxBytesPerEncodedHybridTime + 1 bytes of data to the buffer. The appended data
// is removed when the method returns.
void SeekForward(const Slice& key);
void SeekForward(KeyBytes* key);

// Seek past specified subdoc key (it is responsibility of caller to make sure it
// doesn't have hybrid time).
Expand All @@ -109,9 +113,7 @@ class IntentAwareIterator {
// provided
void PrevDocKey(const DocKey& doc_key);

// Adds new value to prefix stack. The top value of this stack is used to filter
// returned entries. After seek we check whether currently pointed value has active prefix.
// If not, than it means that we are out of range of interest and iterator becomes invalid.
// Adds new value to prefix stack. The top value of this stack is used to filter returned entries.
void PushPrefix(const Slice& prefix);

// Removes top value from prefix stack. This iteration could become valid after popping prefix,
Expand Down Expand Up @@ -217,6 +219,12 @@ class IntentAwareIterator {
KeyBytes resolved_intent_value_;
std::vector<Slice> prefix_stack_;
TransactionStatusCache transaction_status_cache_;

bool skip_future_records_needed_ = false;
bool skip_future_intents_needed_ = false;

// Reusable buffer to prepare seek key to avoid reallocating temporary buffers in critical paths.
KeyBytes seek_key_buffer_;
};

// Utility class that controls stack of prefixes in IntentAwareIterator.
Expand Down
5 changes: 5 additions & 0 deletions src/yb/util/slice.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,11 @@ class Slice {
return *begin_++;
}

// Consumes the first byte of the slice (via the no-argument consume_byte()) and DCHECKs that it
// equals `c`. The byte is consumed whether or not it matches.
void consume_byte(char c) {
// Consume outside of the DCHECK so that the byte is still consumed if the check is compiled out.
char consumed = consume_byte();
DCHECK_EQ(consumed, c);
}

// Checks that this slice has size() = 'expected_size' and returns
// STATUS(Corruption, ) otherwise.
Status check_size(size_t expected_size) const;
Expand Down

0 comments on commit 158def7

Please sign in to comment.