Skip to content

Commit

Permalink
[BACKPORT 2024.1][yugabyte#19342] docdb: Implement backward scan supp…
Browse files Browse the repository at this point in the history
…ort for GetTableKeyRanges API

Summary:
- Implemented backward scan support for GetTableKeyRanges API.
- Tablet::GetTabletKeyRanges - limited to colocated range-sharded tables for now
- Updated QLTabletRf1TestToggleEnablePackedRow.GetTabletKeyRanges, PggateTestSelect.GetColocatedTableKeyRanges tests
- Disabled PggateTestSelect.GetTableKeyRanges till we support non-colocated use case
- Added rocksdb::Iterator::KeyDebugHexString()
- Removed GetTabletKeyRangesEmbeddedRequestPB.is_forward

**Upgrade/Rollback safety:**
`GetTabletKeyRangesEmbeddedRequestPB` only affects RPCs within the same node between pggate and local tserver, so it is safe to change.
Jira: DB-8144

Original commit: cb302e0 / D32519

Test Plan: QLTabletRf1TestToggleEnablePackedRow.GetTabletKeyRanges, PggateTestSelect.GetColocatedTableKeyRanges

Reviewers: arybochkin, rthallam

Reviewed By: rthallam

Subscribers: ybase, yql

Tags: #jenkins-ready

Differential Revision: https://phorge.dev.yugabyte.com/D34696
  • Loading branch information
ttyusupov committed May 3, 2024
1 parent 264a63a commit 41f6c17
Show file tree
Hide file tree
Showing 11 changed files with 601 additions and 320 deletions.
214 changes: 137 additions & 77 deletions src/yb/client/ql-tablet-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
#include "yb/util/shared_lock.h"
#include "yb/util/status_format.h"
#include "yb/util/stopwatch.h"
#include "yb/util/test_thread_holder.h"
#include "yb/util/tsan_util.h"

#include "yb/yql/cql/ql/util/statement_result.h"
Expand Down Expand Up @@ -1940,33 +1941,44 @@ namespace {

Status CalcKeysDistributionAcrossWorkers(
tablet::Tablet* tablet, std::vector<std::string> range_end_keys, const size_t num_workers,
const double min_max_keys_ratio_limit) {
const double min_max_keys_ratio_limit, tablet::Direction direction) {
std::vector<size_t> keys_per_worker(num_workers);

auto iter = CreateRocksDBIterator(
tablet->doc_db().regular, tablet->doc_db().key_bounds,
docdb::BloomFilterMode::DONT_USE_BLOOM_FILTER, boost::none, rocksdb::kDefaultQueryId);
iter.SeekToFirst();
if (direction == tablet::Direction::kForward) {
iter.SeekToFirst();
} else {
iter.SeekToLast();
}
std::string current_range_start_key;

auto range_key_iter = range_end_keys.begin();
size_t current_range_idx = 0;
size_t entries_in_current_range = 0;
while (iter.Valid() || current_range_idx < range_end_keys.size()) {
while (current_range_idx < range_end_keys.size() &&
(!iter.Valid() || (!range_end_keys[current_range_idx].empty() &&
iter.key() >= range_end_keys[current_range_idx]))) {
while (iter.Valid() || range_key_iter != range_end_keys.end()) {
while (range_key_iter != range_end_keys.end() &&
(!iter.Valid() || (!range_key_iter->empty() && (direction == tablet::Direction::kForward
? iter.key() >= *range_key_iter
: iter.key() <= *range_key_iter)))) {
LOG(INFO) << "Range #" << current_range_idx << "["
<< Slice(current_range_start_key).ToDebugHexString() << ", "
<< Slice(range_end_keys[current_range_idx]).ToDebugHexString() << ") has "
<< entries_in_current_range << " rocksdb records";
<< Slice(*range_key_iter).ToDebugHexString() << ") has " << entries_in_current_range
<< " rocksdb records";
keys_per_worker[current_range_idx % num_workers] += entries_in_current_range;
current_range_start_key = range_end_keys[current_range_idx];
current_range_start_key = *range_key_iter;
++range_key_iter;
++current_range_idx;
entries_in_current_range = 0;
}
if (iter.Valid()) {
++entries_in_current_range;
iter.Next();
if (direction == tablet::Direction::kForward) {
iter.Next();
} else {
iter.Prev();
}
}
}
RETURN_NOT_OK(iter.status());
Expand All @@ -1989,27 +2001,62 @@ Status CalcKeysDistributionAcrossWorkers(
} // namespace

TEST_P(QLTabletRf1TestToggleEnablePackedRow, GetTabletKeyRanges) {
constexpr auto kValueSize = 16;
constexpr auto kWriteBatchSize = 128;
constexpr auto kNumWriteThreads = 2;

constexpr auto kNumSstFiles = 4;
constexpr auto kNumFlushes = 15;

constexpr auto kNumWorkers = 5;
constexpr auto kMinMaxKeysRatioLimit = 0.75;

FLAGS_db_block_size_bytes = 4_KB;
FLAGS_db_write_buffer_size = 200_KB;

TestWorkload workload(cluster_.get());
workload.set_table_name(kTable1Name);
workload.set_write_timeout_millis(30000);
workload.set_num_tablets(1);
workload.set_num_write_threads(2);
workload.set_write_batch_size(1);
workload.set_payload_bytes(16);
workload.Setup();
TableHandle table;

{
YBSchemaBuilder builder;
// TODO(get_table_key_ranges): also test for hash-based sharding as soon as it is properly
// supported by GetTabletKeyRanges.
builder.AddColumn(kKeyColumn)->Type(DataType::INT32)->PrimaryKey()->NotNull();
builder.AddColumn(kValueColumn)->Type(DataType::STRING);
ASSERT_OK(table.Create(kTable1Name, /* num_tablets = */ 1, client_.get(), &builder));
table.NewInsertOp();
}

std::atomic<size_t> num_rows_inserted{0};
TestThreadHolder write_threads;

LOG(INFO) << "Starting workload ...";
Stopwatch s(Stopwatch::ALL_THREADS);
s.start();
workload.Start();

for (auto t = 0; t < kNumWriteThreads; ++t) {
write_threads.AddThreadFunctor(
[this, &stop_requested = write_threads.stop_flag(), &table, &num_rows_inserted] {
auto session = CreateSession();
size_t num_ops_applied = 0;

while (!stop_requested) {
auto insert = table.NewInsertOp();
auto req = insert->mutable_request();
const auto key = RandomUniformInt<int32_t>();

QLAddInt32RangeValue(req, key);
table.AddStringColumnValue(req, kValueColumn, RandomString(kValueSize));
session->Apply(insert);
if (++num_ops_applied == kWriteBatchSize) {
const auto flush_status = session->TEST_FlushAndGetOpsErrors();
ASSERT_OK(flush_status.status);
ASSERT_EQ(flush_status.errors.size(), 0);
num_rows_inserted.fetch_add(num_ops_applied);
num_ops_applied = 0;
}
}
});
}

const auto peers = ListTabletPeers(cluster_.get(), ListPeersFilter::kLeaders);
ASSERT_EQ(peers.size(), 1);
Expand All @@ -2023,91 +2070,104 @@ TEST_P(QLTabletRf1TestToggleEnablePackedRow, GetTabletKeyRanges) {
std::this_thread::sleep_for(100ms);
}

workload.StopAndJoin();
write_threads.Stop();
s.stop();
LOG(INFO) << "Workload stopped, it took: " << AsString(s.elapsed());

LOG(INFO) << "Rows inserted: " << workload.rows_inserted();
LOG(INFO) << "Rows inserted: " << num_rows_inserted;
LOG(INFO) << "Number of SST files: " << db->GetCurrentVersionNumSSTFiles();
LOG(INFO) << "SST data size: " << db->GetCurrentVersionSstFilesUncompressedSize();

NO_PENDING_FATALS();

const auto range_size_bytes = db->GetCurrentVersionSstFilesUncompressedSize() / 50;

for (uint32_t max_key_length : {1024, 16, 8, 4, 2}) {
LOG(INFO) << "max_key_length: " << max_key_length;
for (auto direction : {tablet::Direction::kForward, tablet::Direction::kBackward}) {
for (uint32_t max_key_length : {1024, 16, 8, 4, 2}) {
LOG(INFO) << "max_key_length: " << max_key_length << " direction: " << AsString(direction);

std::vector<std::string> range_end_keys;
auto add_range_end_key = [&range_end_keys](Slice key) {
LOG(INFO) << "Got range end key: " << key.ToDebugHexString();
range_end_keys.push_back(key.ToBuffer());
};
// List of range end/start keys depending on whether is_forward is true/false.
std::vector<std::string> range_boundary_keys;
auto add_range_boundary_key = [&range_boundary_keys](Slice key) {
LOG(INFO) << "Got range boundary key: " << key.ToDebugHexString();
range_boundary_keys.push_back(key.ToBuffer());
};

ASSERT_OK(tablet->TEST_GetTabletKeyRanges(
Slice(), Slice(), std::numeric_limits<uint64_t>::max(), range_size_bytes,
tablet::IsForward::kTrue, max_key_length, add_range_end_key));
ASSERT_OK(tablet->TEST_GetTabletKeyRanges(
Slice(), Slice(), std::numeric_limits<uint64_t>::max(), range_size_bytes,
direction, max_key_length, add_range_boundary_key));

LOG(INFO) << "Ranges count: " << range_end_keys.size();
LOG(INFO) << "Ranges count: " << range_boundary_keys.size();

// We should have at least 1 range.
ASSERT_GE(range_end_keys.size(), 1);
// We should have at least 1 range.
ASSERT_GE(range_boundary_keys.size(), 1);

ASSERT_EQ(range_boundary_keys.back(), "");
for (size_t i = 0; i + 2 < range_boundary_keys.size(); ++i) {
switch (direction) {
case tablet::Direction::kForward:
ASSERT_LT(range_boundary_keys[i], range_boundary_keys[i + 1]);
continue;
case tablet::Direction::kBackward:
ASSERT_GT(range_boundary_keys[i], range_boundary_keys[i + 1]);
continue;
}
FATAL_INVALID_ENUM_VALUE(tablet::Direction, direction);
}

ASSERT_EQ(range_end_keys.back(), "");
for (size_t i = 0; i + 2 < range_end_keys.size(); ++i) {
ASSERT_LT(range_end_keys[i], range_end_keys[i + 1]);
}
// TODO(get_table_key_ranges): For now we are returning full DocKeys and skipping ones that
// are longer than max_key_length, because truncated DocKeys are not supported as lower/upper
// bounds for scans. So, if DocKeys are longer than max_key_length we can have very uneven
// distribution across ranges/workers.
const auto min_max_keys_ratio_limit = max_key_length > 8 ? kMinMaxKeysRatioLimit : 0;

// TODO(get_table_key_ranges): For now we are returning full DocKeys and skipping ones that
// are longer than max_key_length, because truncated DocKeys are not supported as lower/upper
// bounds for scans. So, if DocKeys are longer than max_key_length we can have very uneven
// distribution across ranges/workers.
const auto min_max_keys_ratio_limit = max_key_length > 8 ? kMinMaxKeysRatioLimit : 0;
ASSERT_OK(CalcKeysDistributionAcrossWorkers(
tablet.get(), range_boundary_keys, kNumWorkers, min_max_keys_ratio_limit, direction));

ASSERT_OK(CalcKeysDistributionAcrossWorkers(
tablet.get(), range_end_keys, kNumWorkers, min_max_keys_ratio_limit));
// Get ranges in multiple batches, verify it covers the whole space.
constexpr auto kNumRangesPerBatch = 5;
std::string start_key = "";

// Get ranges in multiple batches, verify it covers the whole space.
constexpr auto kNumRangesPerBatch = 5;
std::string lower_bound = "";
std::vector<string> range_boundary_keys_from_batches;
for (;;) {
std::vector<string> range_boundary_keys_batch;
LOG(INFO) << "Getting tablet key ranges starting from: "
<< Slice(start_key).ToDebugHexString() << " direction: " << AsString(direction);

std::vector<string> range_end_keys_from_batches;
for (;;) {
std::vector<string> range_end_keys_batch;
LOG(INFO) << "Getting tablet key ranges starting from: "
<< Slice(lower_bound).ToDebugHexString();
ASSERT_OK(tablet->TEST_GetTabletKeyRanges(
direction == tablet::Direction::kForward ? start_key : Slice(),
direction == tablet::Direction::kForward ? Slice() : start_key, kNumRangesPerBatch,
range_size_bytes, direction, max_key_length, [&range_boundary_keys_batch](Slice key) {
LOG(INFO) << "Got range boundary key: " << key.ToDebugHexString();
range_boundary_keys_batch.push_back(key.ToBuffer());
}));

ASSERT_OK(tablet->TEST_GetTabletKeyRanges(
lower_bound, Slice(), kNumRangesPerBatch, range_size_bytes, tablet::IsForward::kTrue,
max_key_length, [&range_end_keys_batch](Slice key) {
LOG(INFO) << "Got range end key: " << key.ToDebugHexString();
range_end_keys_batch.push_back(key.ToBuffer());
}));
ASSERT_LE(range_boundary_keys_batch.size(), kNumRangesPerBatch);

ASSERT_LE(range_end_keys_batch.size(), kNumRangesPerBatch);
// We should have at least 1 range.
ASSERT_GE(range_boundary_keys_batch.size(), 1);

// We should have at least 1 range.
ASSERT_GE(range_end_keys_batch.size(), 1);
for (const auto& key : range_boundary_keys_batch) {
range_boundary_keys_from_batches.push_back(key);
}

for (const auto& key : range_end_keys_batch) {
range_end_keys_from_batches.push_back(key);
}
if (range_boundary_keys_batch.back().empty()) {
// We've reached the end.
break;
}

if (range_end_keys_batch.back().empty()) {
// We've reached the end.
break;
}
ASSERT_EQ(range_boundary_keys_batch.size(), kNumRangesPerBatch);

ASSERT_EQ(range_end_keys_batch.size(), kNumRangesPerBatch);
// Use last returned key as start key for next batch of ranges.
start_key = range_boundary_keys_batch.back();
}

// Use last returned key as lower bound for next batch of ranges.
lower_bound = range_end_keys_batch.back();
ASSERT_OK(CalcKeysDistributionAcrossWorkers(
tablet.get(), range_boundary_keys_from_batches, kNumWorkers, min_max_keys_ratio_limit,
direction));
}

ASSERT_OK(CalcKeysDistributionAcrossWorkers(
tablet.get(), range_end_keys_from_batches, kNumWorkers, min_max_keys_ratio_limit));
}
// TODO(get_table_key_ranges): test getting ranges in reverse order.
}


} // namespace client
} // namespace yb
15 changes: 15 additions & 0 deletions src/yb/client/table_handle.cc
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,21 @@ Status TableHandle::Create(const YBTableName& table_name,
.schema(&schema)
.num_tablets(num_tablets);

if (schema.num_hash_key_columns() == 0) {
// Setup range key columns for range-sharded tables.
std::vector<std::string> range_column_names;
range_column_names.reserve(schema.num_range_key_columns());
auto& columns = schema.columns();
for (size_t i = 0; i < schema.num_key_columns(); ++i) {
auto& column_schema = columns[i];
CHECK(column_schema.is_key());
if (!column_schema.is_hash_key()) {
range_column_names.push_back(column_schema.name());
}
}
table_creator->set_range_partition_columns(range_column_names);
}

// Setup Index properties.
if (index_info) {
table_creator->indexed_table_id(index_info->indexed_table_id())
Expand Down
3 changes: 1 addition & 2 deletions src/yb/common/pgsql_protocol.proto
Original file line number Diff line number Diff line change
Expand Up @@ -372,8 +372,7 @@ message GetTabletKeyRangesEmbeddedRequestPB {
optional bytes lower_bound_key = 1;
optional bytes upper_bound_key = 2;
optional uint64 range_size_bytes = 3;
optional bool is_forward = 4 [ default = true ];
optional uint32 max_key_length = 5;
optional uint32 max_key_length = 4;
}

// TODO(neil) The protocol for select needs to be changed accordingly when we introduce and cache
Expand Down
1 change: 1 addition & 0 deletions src/yb/integration-tests/external_mini_cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,7 @@ class ExternalMiniCluster : public MiniClusterBase {
Result<tserver::GetSplitKeyResponsePB> GetSplitKey(const ExternalTabletServer& ts,
const yb::TabletId& tablet_id, bool fail_on_response_error = true);

// Flushes all tablets if tablets_ids is empty.
Status FlushTabletsOnSingleTServer(
ExternalTabletServer* ts, const std::vector<yb::TabletId> tablet_ids,
tserver::FlushTabletsRequestPB_Operation operation);
Expand Down
4 changes: 4 additions & 0 deletions src/yb/rocksdb/iterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,10 @@ class Iterator : public Cleanable {
return Entry().Valid();
}

std::string KeyDebugHexString() const {
return Valid() ? key().ToDebugHexString() : "<not valid>";
}

// Same as Valid(), but returns error if there was a read error.
// For hot paths consider using Valid() in a loop and checking status after the loop.
yb::Result<bool> CheckedValid() const {
Expand Down
Loading

0 comments on commit 41f6c17

Please sign in to comment.