Skip to content

Commit

Permalink
schemeboard: pass describe-result as an opaque payload (#2083)
Browse files Browse the repository at this point in the history
  • Loading branch information
ijon authored Feb 29, 2024
1 parent ecdcbb4 commit 3819aed
Show file tree
Hide file tree
Showing 25 changed files with 788 additions and 401 deletions.
3 changes: 2 additions & 1 deletion ydb/core/protos/flat_tx_scheme.proto
Original file line number Diff line number Diff line change
Expand Up @@ -106,12 +106,13 @@ message TEvDescribeSchemeResult {
optional string Reason = 2;
optional string Path = 3;
optional NKikimrSchemeOp.TPathDescription PathDescription = 4;
optional fixed64 PathOwner = 5;
optional fixed64 DEPRECATED_PathOwner = 5; // replaced by PathOwnerId
optional fixed64 PathId = 6;

optional string LastExistedPrefixPath = 7;
optional fixed64 LastExistedPrefixPathId = 8;
optional NKikimrSchemeOp.TPathDescription LastExistedPrefixDescription = 9;

optional fixed64 PathOwnerId = 10;
}

Expand Down
82 changes: 71 additions & 11 deletions ydb/core/protos/scheme_board.proto
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import "ydb/core/protos/flat_tx_scheme.proto";
import "ydb/core/scheme/protos/pathid.proto";

package NKikimrSchemeBoard;
option java_package = "ru.yandex.kikimr.proto";
Expand All @@ -13,22 +13,76 @@ message TEvHandshake {
optional uint64 Generation = 2;
}

// here and below
// Owner is the tablet id of schemeshard witch holds the records
// LocalPathId is a second part of TPathId
// PathOwnerId is a first part of TPathId
// Here and below.
// Owner is the tablet id of schemeshard which holds the records.
// (PathOwnerId, LocalPathId) constitute TPathId of the object.

// TEvUpdate.DescribeSchemeResultSerialized is a NKikimrScheme.TEvDescribeSchemeResult
// in the form of opaque payload.
// Originally, that field existed as a properly typed TEvDescribeSchemeResult message.
// However, that induce additional overhead to serialize and deserialize this message
// when transferring over wire.
// This performance cost is usually either negligible or imperceptible.
// But in specific situations, particularly when rapidly updating partitioning information
// for tables with huge number of shards, this overhead could lead to significant issues.
// Schemeboard replicas could get overloaded and become unresponsive to further requests.
// This is problematic, especially considering the schemeboard subsystem's critical role
// in servicing all databases within a cluster, making it a Single Point of Failure (SPOF).
//
// The core realization is that the schemeboard components do not require the full content of
// a TEvDescribeSchemeResult message to operate efficiently. Instead, only a limited set of
// fields (path, path-id, version and info about subdomain/database) is required for processing.
// And a whole TEvDescribeSchemeResult could be passed through as an opaque payload.
//
// Type change from TEvDescribeSchemeResult to (repeated) bytes without changing field number
// is a safe move. Actual value of the field remains unchanged at the wire-format level.
// Thus, older implementations will interpret the payload as a TEvDescribeSchemeResult message
// and proceed with deserialization as usual. And newer implementations will recognize the data
// as a binary blob and will deserialize it explicitly only when necessary.
//
// Note that the `repeated` label for the `DescribeSchemeResultSerialized` field is essential
// to remain backward-compatible with the previous implementation. This is because even if
// DescribeSchemeResult previously was labeled `optional` but actual value used at
// the wire-format level was (and is) a pack of TEvDescribeSchemeResult messages.
// Automerge of consecutive messages for the same field is a prominent feature of the protobuf.
// Schemeshard use that feature to supply full TEvDescribeSchemeResult as a sequence of
// partially filled TEvDescribeSchemeResult's.
//
// - Path
// - PathOwnerId, LocalPathId
// - PathDirEntryPathVersion
// - PathSubdomainPathId
// - PathAbandonedTenantsSchemeShards
// are taken from the original TEvDescribeSchemeResult (one way or another).
//
message TEvUpdate {
optional uint64 Owner = 1;
optional uint64 Generation = 2;
optional TLocalPathIdRange DeletedLocalPathIds = 3;
optional string Path = 4;
optional uint64 LocalPathId = 5;

optional string Path = 4; // extracted from DescribeSchemeResult.Path
optional uint64 LocalPathId = 5; // extracted from DescribeSchemeResult.PathId

optional bool IsDeletion = 6 [default = false];
optional NKikimrScheme.TEvDescribeSchemeResult DescribeSchemeResult = 7;

repeated bytes DescribeSchemeResultSerialized = 7;

optional bool NeedAck = 8 [default = false];
optional uint64 PathOwnerId = 9;

optional uint64 PathOwnerId = 9; // extracted from DescribeSchemeResult.PathOwnerId, DescribeSchemeResult.PathDescription.Self.SchemeshardId in order of presence

optional TLocalPathIdRange MigratedLocalPathIds = 10;

// Explicit values extracted from DescribeSchemeResultSerialized

// DescribeSchemeResult.PathDescription.Self.PathVersion
optional uint64 PathDirEntryPathVersion = 11;

// DescribeSchemeResult.PathDescription.DomainDescription.DomainKey
optional NKikimrProto.TPathID PathSubdomainPathId = 13;

// DescribeSchemeResult.PathDescription.AbandonedTenantsSchemeShards
repeated uint64 PathAbandonedTenantsSchemeShards = 14;
}

message TEvUpdateAck {
Expand Down Expand Up @@ -65,16 +119,22 @@ message TEvUnsubscribe {
optional uint64 LocalPathId = 3;
}

// See comments for TEvUpdate.
message TEvNotify {
optional string Path = 1;
// and/or
optional uint64 PathOwnerId = 2;
optional uint64 LocalPathId = 3;
// common fields
optional bool IsDeletion = 4 [default = false];
optional NKikimrScheme.TEvDescribeSchemeResult DescribeSchemeResult = 5;
optional uint64 Version = 6;

optional bytes DescribeSchemeResultSerialized = 5;

optional uint64 Version = 6; // same as TEvUpdate.PathDirEntryPathVersion
optional bool Strong = 7 [default = false];

optional NKikimrProto.TPathID PathSubdomainPathId = 8;
repeated uint64 PathAbandonedTenantsSchemeShards = 9;
}

message TEvNotifyAck {
Expand Down
43 changes: 22 additions & 21 deletions ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -789,7 +789,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
static THolder<TDataStreamsClient> MakeClient(const NYdb::TDriver& driver, const TString& database) {
return MakeHolder<TDataStreamsClient>(driver, NYdb::TCommonClientSettings().Database(database));
}
};
};

class TTestTopicEnv: public TTestEnv<TTestTopicEnv, NYdb::NTopic::TTopicClient> {
public:
Expand All @@ -798,7 +798,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
static THolder<NYdb::NTopic::TTopicClient> MakeClient(const NYdb::TDriver& driver, const TString& database) {
return MakeHolder<NYdb::NTopic::TTopicClient>(driver, NYdb::NTopic::TTopicClientSettings().Database(database));
}
};
};

TShardedTableOptions SimpleTable() {
return TShardedTableOptions()
Expand Down Expand Up @@ -1344,7 +1344,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
(3, 30);
)", R"(
DELETE FROM `/Root/Table` WHERE key = 1;
)"}, {
)"}, {
R"({"update":{},"key":[1]})",
R"({"update":{},"key":[2]})",
R"({"update":{},"key":[3]})",
Expand All @@ -1360,7 +1360,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
(3, 30);
)", R"(
DELETE FROM `/Root/Table` WHERE key = 1;
)"}, {
)"}, {
{DebeziumBody("u", nullptr, nullptr), {{"__key", R"({"payload":{"key":1}})"}}},
{DebeziumBody("u", nullptr, nullptr), {{"__key", R"({"payload":{"key":2}})"}}},
{DebeziumBody("u", nullptr, nullptr), {{"__key", R"({"payload":{"key":3}})"}}},
Expand All @@ -1376,7 +1376,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
(3, 30);
)", R"(
DELETE FROM `/Root/Table` WHERE key = 1;
)"}, {
)"}, {
R"({"update":{"value":10},"key":[1]})",
R"({"update":{"value":20},"key":[2]})",
R"({"update":{"value":30},"key":[3]})",
Expand All @@ -1397,7 +1397,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
(3, 300);
)", R"(
DELETE FROM `/Root/Table` WHERE key = 1;
)"}, {
)"}, {
R"({"update":{},"newImage":{"value":10},"key":[1]})",
R"({"update":{},"newImage":{"value":20},"key":[2]})",
R"({"update":{},"newImage":{"value":30},"key":[3]})",
Expand All @@ -1421,7 +1421,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
(3, 300);
)", R"(
DELETE FROM `/Root/Table` WHERE key = 1;
)"}, {
)"}, {
{DebeziumBody("c", nullptr, R"({"key":1,"value":10})"), {{"__key", R"({"payload":{"key":1}})"}}},
{DebeziumBody("c", nullptr, R"({"key":2,"value":20})"), {{"__key", R"({"payload":{"key":2}})"}}},
{DebeziumBody("c", nullptr, R"({"key":3,"value":30})"), {{"__key", R"({"payload":{"key":3}})"}}},
Expand All @@ -1445,7 +1445,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
(3, 300);
)", R"(
DELETE FROM `/Root/Table` WHERE key = 1;
)"}, {
)"}, {
{DebeziumBody("u", nullptr, nullptr), {{"__key", R"({"payload":{"key":1}})"}}},
{DebeziumBody("u", nullptr, nullptr), {{"__key", R"({"payload":{"key":2}})"}}},
{DebeziumBody("u", nullptr, nullptr), {{"__key", R"({"payload":{"key":3}})"}}},
Expand All @@ -1456,7 +1456,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
});
}

Y_UNIT_TEST(NewImageLogDebezium) {
Y_UNIT_TEST(NewImageLogDebezium) {
TopicRunner::Read(SimpleTable(), NewImage(NKikimrSchemeOp::ECdcStreamFormatDebeziumJson), {R"(
UPSERT INTO `/Root/Table` (key, value) VALUES
(1, 10),
Expand All @@ -1469,7 +1469,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
(3, 300);
)", R"(
DELETE FROM `/Root/Table` WHERE key = 1;
)"}, {
)"}, {
{DebeziumBody("u", nullptr, R"({"key":1,"value":10})"), {{"__key", R"({"payload":{"key":1}})"}}},
{DebeziumBody("u", nullptr, R"({"key":2,"value":20})"), {{"__key", R"({"payload":{"key":2}})"}}},
{DebeziumBody("u", nullptr, R"({"key":3,"value":30})"), {{"__key", R"({"payload":{"key":3}})"}}},
Expand All @@ -1486,7 +1486,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
(1, 10),
(2, 20),
(3, 30);
)"}, {
)"}, {
R"({"update":{},"key":[1],"ts":"***"})",
R"({"update":{},"key":[2],"ts":"***"})",
R"({"update":{},"key":[3],"ts":"***"})",
Expand All @@ -1512,7 +1512,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
UPSERT INTO `/Root/Table` (__Hash, id_shard, id_sort, __RowData) VALUES (
1, "10", "100", JsonDocument('{"M":{"color":{"S":"pink"},"weight":{"N":"4.5"}}}')
);
)"}, {
)"}, {
WriteJson(NJson::TJsonMap({
{"awsRegion", ""},
{"dynamodb", NJson::TJsonMap({
Expand Down Expand Up @@ -1541,7 +1541,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
);
)", R"(
DELETE FROM `/Root/Table` WHERE __Hash = 1;
)"}, {
)"}, {
WriteJson(NJson::TJsonMap({
{"awsRegion", ""},
{"dynamodb", NJson::TJsonMap({
Expand Down Expand Up @@ -1639,7 +1639,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
(1, 0.0%s/0.0%s),
(2, 1.0%s/0.0%s),
(3, -1.0%s/0.0%s);
)", s, s, s, s, s, s)}, {
)", s, s, s, s, s, s)}, {
R"({"update":{"value":"nan"},"key":[1]})",
R"({"update":{"value":"inf"},"key":[2]})",
R"({"update":{"value":"-inf"},"key":[3]})",
Expand Down Expand Up @@ -1674,7 +1674,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
TopicRunner::Read(table, KeysOnly(NKikimrSchemeOp::ECdcStreamFormatDebeziumJson), {Sprintf(R"(
UPSERT INTO `/Root/Table` (key, value) VALUES
("%s", 1);
)", key.c_str())}, {
)", key.c_str())}, {
{DebeziumBody("u", nullptr, nullptr), {{"__key", Sprintf(R"({"payload":{"key":"%s"}})", key.c_str())}}},
});
}
Expand Down Expand Up @@ -2043,7 +2043,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
ExecSQL(env.GetServer(), env.GetEdgeActor(), R"(
UPSERT INTO `/Root/TableAux` (key, value)
VALUES (1, 10);
)");
)");

SetSplitMergePartCountLimit(&runtime, -1);
const auto tabletIds = GetTableShards(env.GetServer(), env.GetEdgeActor(), "/Root/Table");
Expand Down Expand Up @@ -2292,7 +2292,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
auto tabletIds = GetTableShards(env.GetServer(), env.GetEdgeActor(), "/Root/Table");
UNIT_ASSERT_VALUES_EQUAL(tabletIds.size(), 1);

WaitTxNotification(env.GetServer(), env.GetEdgeActor(),
WaitTxNotification(env.GetServer(), env.GetEdgeActor(),
AsyncSplitTable(env.GetServer(), env.GetEdgeActor(), "/Root/Table", tabletIds.at(0), 4));

// execute on old partitions
Expand Down Expand Up @@ -2376,7 +2376,8 @@ Y_UNIT_TEST_SUITE(Cdc) {

case TSchemeBoardEvents::EvUpdate:
if (auto* msg = ev->Get<TSchemeBoardEvents::TEvUpdate>()) {
const auto desc = msg->GetRecord().GetDescribeSchemeResult();
NKikimrScheme::TEvDescribeSchemeResult desc;
Y_ABORT_UNLESS(ParseFromStringNoSizeLimit(desc, *msg->GetRecord().GetDescribeSchemeResultSerialized().begin()));
if (desc.GetPath() == "/Root/Table/Stream" && desc.GetPathDescription().GetSelf().GetCreateFinished()) {
delayed.emplace_back(ev.Release());
return TTestActorRuntime::EEventAction::DROP;
Expand Down Expand Up @@ -2446,7 +2447,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
ExecSQL(env.GetServer(), env.GetEdgeActor(), R"(
UPSERT INTO `/Root/Table` (key, value)
VALUES (1, 10);
)");
)");

SetSplitMergePartCountLimit(&runtime, -1);
const auto tabletIds = GetTableShards(env.GetServer(), env.GetEdgeActor(), "/Root/Table");
Expand Down Expand Up @@ -3266,7 +3267,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
auto tabletIds = GetTableShards(env.GetServer(), env.GetEdgeActor(), "/Root/Table");
UNIT_ASSERT_VALUES_EQUAL(tabletIds.size(), 1);

WaitTxNotification(env.GetServer(), env.GetEdgeActor(),
WaitTxNotification(env.GetServer(), env.GetEdgeActor(),
AsyncSplitTable(env.GetServer(), env.GetEdgeActor(), "/Root/Table", tabletIds.at(0), 4));

// merge
Expand Down Expand Up @@ -3298,7 +3299,7 @@ template <>
void Out<std::pair<TString, TString>>(IOutputStream& output, const std::pair<TString, TString>& x) {
output << x.first << ":" << x.second;
}

void AppendToString(TString& dst, const std::pair<TString, TString>& x) {
TStringOutput output(dst);
output << x;
Expand Down
5 changes: 5 additions & 0 deletions ydb/core/tx/scheme_board/cache_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ class TCacheTest: public TTestWithSchemeshard {
" Kind: \"pool-kind-1\" "
"} "
" Name: \"Root\" ");

// Context->SetLogPriority(NKikimrServices::SCHEME_BOARD_REPLICA, NLog::PRI_DEBUG);
// Context->SetLogPriority(NKikimrServices::SCHEME_BOARD_SUBSCRIBER, NLog::PRI_DEBUG);
// Context->SetLogPriority(NKikimrServices::TX_PROXY_SCHEME_CACHE, NLog::PRI_DEBUG);
// Context->SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NLog::PRI_DEBUG);
}

UNIT_TEST_SUITE(TCacheTest);
Expand Down
Loading

0 comments on commit 3819aed

Please sign in to comment.