Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

schemeboard: pass describe-result as an opaque payload #2391

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion ydb/core/protos/flat_tx_scheme.proto
Original file line number Diff line number Diff line change
Expand Up @@ -106,12 +106,13 @@ message TEvDescribeSchemeResult {
optional string Reason = 2;
optional string Path = 3;
optional NKikimrSchemeOp.TPathDescription PathDescription = 4;
optional fixed64 PathOwner = 5;
optional fixed64 DEPRECATED_PathOwner = 5; // replaced by PathOwnerId
optional fixed64 PathId = 6;

optional string LastExistedPrefixPath = 7;
optional fixed64 LastExistedPrefixPathId = 8;
optional NKikimrSchemeOp.TPathDescription LastExistedPrefixDescription = 9;

optional fixed64 PathOwnerId = 10;
}

Expand Down
82 changes: 71 additions & 11 deletions ydb/core/protos/scheme_board.proto
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import "ydb/core/protos/flat_tx_scheme.proto";
import "ydb/core/scheme/protos/pathid.proto";

package NKikimrSchemeBoard;
option java_package = "ru.yandex.kikimr.proto";
Expand All @@ -13,22 +13,76 @@ message TEvHandshake {
optional uint64 Generation = 2;
}

// here and below
// Owner is the tablet id of schemeshard witch holds the records
// LocalPathId is a second part of TPathId
// PathOwnerId is a first part of TPathId
// Here and below.
// Owner is the tablet id of schemeshard which holds the records.
// (PathOwnerId, LocalPathId) constitute TPathId of the object.

// TEvUpdate.DescribeSchemeResultSerialized is a NKikimrScheme.TEvDescribeSchemeResult
// in the form of opaque payload.
// Originally, that field existed as a properly typed TEvDescribeSchemeResult message.
// However, that induces additional overhead to serialize and deserialize this message
// when transferring over wire.
// This performance cost is usually either negligible or imperceptible.
// But in specific situations, particularly when rapidly updating partitioning information
// for tables with a huge number of shards, this overhead could lead to significant issues.
// Schemeboard replicas could get overloaded and become unresponsive to further requests.
// This is problematic, especially considering the schemeboard subsystem's critical role
// in servicing all databases within a cluster, making it a Single Point of Failure (SPOF).
//
// The core realization is that the schemeboard components do not require the full content of
// a TEvDescribeSchemeResult message to operate efficiently. Instead, only a limited set of
// fields (path, path-id, version and info about subdomain/database) is required for processing.
// And a whole TEvDescribeSchemeResult could be passed through as an opaque payload.
//
// Type change from TEvDescribeSchemeResult to (repeated) bytes without changing field number
// is a safe move. Actual value of the field remains unchanged at the wire-format level.
// Thus, older implementations will interpret the payload as a TEvDescribeSchemeResult message
// and proceed with deserialization as usual. And newer implementations will recognize the data
// as a binary blob and will deserialize it explicitly only when necessary.
//
// Note that the `repeated` label for the `DescribeSchemeResultSerialized` field is essential
// to remain backward-compatible with the previous implementation. This is because, even though
// DescribeSchemeResult was previously labeled `optional`, the actual value used at
// the wire-format level was (and is) a pack of TEvDescribeSchemeResult messages.
// Automerge of consecutive messages for the same field is a prominent feature of protobuf.
// Schemeshard uses that feature to supply a full TEvDescribeSchemeResult as a sequence of
// partially filled TEvDescribeSchemeResult messages.
//
// - Path
// - PathOwnerId, LocalPathId
// - PathDirEntryPathVersion
// - PathSubdomainPathId
// - PathAbandonedTenantsSchemeShards
// are taken from the original TEvDescribeSchemeResult (one way or another).
//
message TEvUpdate {
optional uint64 Owner = 1;
optional uint64 Generation = 2;
optional TLocalPathIdRange DeletedLocalPathIds = 3;
optional string Path = 4;
optional uint64 LocalPathId = 5;

optional string Path = 4; // extracted from DescribeSchemeResult.Path
optional uint64 LocalPathId = 5; // extracted from DescribeSchemeResult.PathId

optional bool IsDeletion = 6 [default = false];
optional NKikimrScheme.TEvDescribeSchemeResult DescribeSchemeResult = 7;

repeated bytes DescribeSchemeResultSerialized = 7;

optional bool NeedAck = 8 [default = false];
optional uint64 PathOwnerId = 9;

optional uint64 PathOwnerId = 9; // extracted from DescribeSchemeResult.PathOwnerId, DescribeSchemeResult.PathDescription.Self.SchemeshardId in order of presence

optional TLocalPathIdRange MigratedLocalPathIds = 10;

// Explicit values extracted from DescribeSchemeResultSerialized

// DescribeSchemeResult.PathDescription.Self.PathVersion
optional uint64 PathDirEntryPathVersion = 11;

// DescribeSchemeResult.PathDescription.DomainDescription.DomainKey
optional NKikimrProto.TPathID PathSubdomainPathId = 13;

// DescribeSchemeResult.PathDescription.AbandonedTenantsSchemeShards
repeated uint64 PathAbandonedTenantsSchemeShards = 14;
}

message TEvUpdateAck {
Expand Down Expand Up @@ -65,16 +119,22 @@ message TEvUnsubscribe {
optional uint64 LocalPathId = 3;
}

// See comments for TEvUpdate.
message TEvNotify {
optional string Path = 1;
// and/or
optional uint64 PathOwnerId = 2;
optional uint64 LocalPathId = 3;
// common fields
optional bool IsDeletion = 4 [default = false];
optional NKikimrScheme.TEvDescribeSchemeResult DescribeSchemeResult = 5;
optional uint64 Version = 6;

optional bytes DescribeSchemeResultSerialized = 5;

optional uint64 Version = 6; // same as TEvUpdate.PathDirEntryPathVersion
optional bool Strong = 7 [default = false];

optional NKikimrProto.TPathID PathSubdomainPathId = 8;
repeated uint64 PathAbandonedTenantsSchemeShards = 9;
}

message TEvNotifyAck {
Expand Down
43 changes: 22 additions & 21 deletions ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -789,7 +789,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
static THolder<TDataStreamsClient> MakeClient(const NYdb::TDriver& driver, const TString& database) {
return MakeHolder<TDataStreamsClient>(driver, NYdb::TCommonClientSettings().Database(database));
}
};
};

class TTestTopicEnv: public TTestEnv<TTestTopicEnv, NYdb::NTopic::TTopicClient> {
public:
Expand All @@ -798,7 +798,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
static THolder<NYdb::NTopic::TTopicClient> MakeClient(const NYdb::TDriver& driver, const TString& database) {
return MakeHolder<NYdb::NTopic::TTopicClient>(driver, NYdb::NTopic::TTopicClientSettings().Database(database));
}
};
};

TShardedTableOptions SimpleTable() {
return TShardedTableOptions()
Expand Down Expand Up @@ -1344,7 +1344,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
(3, 30);
)", R"(
DELETE FROM `/Root/Table` WHERE key = 1;
)"}, {
)"}, {
R"({"update":{},"key":[1]})",
R"({"update":{},"key":[2]})",
R"({"update":{},"key":[3]})",
Expand All @@ -1360,7 +1360,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
(3, 30);
)", R"(
DELETE FROM `/Root/Table` WHERE key = 1;
)"}, {
)"}, {
{DebeziumBody("u", nullptr, nullptr), {{"__key", R"({"payload":{"key":1}})"}}},
{DebeziumBody("u", nullptr, nullptr), {{"__key", R"({"payload":{"key":2}})"}}},
{DebeziumBody("u", nullptr, nullptr), {{"__key", R"({"payload":{"key":3}})"}}},
Expand All @@ -1376,7 +1376,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
(3, 30);
)", R"(
DELETE FROM `/Root/Table` WHERE key = 1;
)"}, {
)"}, {
R"({"update":{"value":10},"key":[1]})",
R"({"update":{"value":20},"key":[2]})",
R"({"update":{"value":30},"key":[3]})",
Expand All @@ -1397,7 +1397,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
(3, 300);
)", R"(
DELETE FROM `/Root/Table` WHERE key = 1;
)"}, {
)"}, {
R"({"update":{},"newImage":{"value":10},"key":[1]})",
R"({"update":{},"newImage":{"value":20},"key":[2]})",
R"({"update":{},"newImage":{"value":30},"key":[3]})",
Expand All @@ -1421,7 +1421,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
(3, 300);
)", R"(
DELETE FROM `/Root/Table` WHERE key = 1;
)"}, {
)"}, {
{DebeziumBody("c", nullptr, R"({"key":1,"value":10})"), {{"__key", R"({"payload":{"key":1}})"}}},
{DebeziumBody("c", nullptr, R"({"key":2,"value":20})"), {{"__key", R"({"payload":{"key":2}})"}}},
{DebeziumBody("c", nullptr, R"({"key":3,"value":30})"), {{"__key", R"({"payload":{"key":3}})"}}},
Expand All @@ -1445,7 +1445,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
(3, 300);
)", R"(
DELETE FROM `/Root/Table` WHERE key = 1;
)"}, {
)"}, {
{DebeziumBody("u", nullptr, nullptr), {{"__key", R"({"payload":{"key":1}})"}}},
{DebeziumBody("u", nullptr, nullptr), {{"__key", R"({"payload":{"key":2}})"}}},
{DebeziumBody("u", nullptr, nullptr), {{"__key", R"({"payload":{"key":3}})"}}},
Expand All @@ -1456,7 +1456,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
});
}

Y_UNIT_TEST(NewImageLogDebezium) {
Y_UNIT_TEST(NewImageLogDebezium) {
TopicRunner::Read(SimpleTable(), NewImage(NKikimrSchemeOp::ECdcStreamFormatDebeziumJson), {R"(
UPSERT INTO `/Root/Table` (key, value) VALUES
(1, 10),
Expand All @@ -1469,7 +1469,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
(3, 300);
)", R"(
DELETE FROM `/Root/Table` WHERE key = 1;
)"}, {
)"}, {
{DebeziumBody("u", nullptr, R"({"key":1,"value":10})"), {{"__key", R"({"payload":{"key":1}})"}}},
{DebeziumBody("u", nullptr, R"({"key":2,"value":20})"), {{"__key", R"({"payload":{"key":2}})"}}},
{DebeziumBody("u", nullptr, R"({"key":3,"value":30})"), {{"__key", R"({"payload":{"key":3}})"}}},
Expand All @@ -1486,7 +1486,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
(1, 10),
(2, 20),
(3, 30);
)"}, {
)"}, {
R"({"update":{},"key":[1],"ts":"***"})",
R"({"update":{},"key":[2],"ts":"***"})",
R"({"update":{},"key":[3],"ts":"***"})",
Expand All @@ -1512,7 +1512,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
UPSERT INTO `/Root/Table` (__Hash, id_shard, id_sort, __RowData) VALUES (
1, "10", "100", JsonDocument('{"M":{"color":{"S":"pink"},"weight":{"N":"4.5"}}}')
);
)"}, {
)"}, {
WriteJson(NJson::TJsonMap({
{"awsRegion", ""},
{"dynamodb", NJson::TJsonMap({
Expand Down Expand Up @@ -1541,7 +1541,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
);
)", R"(
DELETE FROM `/Root/Table` WHERE __Hash = 1;
)"}, {
)"}, {
WriteJson(NJson::TJsonMap({
{"awsRegion", ""},
{"dynamodb", NJson::TJsonMap({
Expand Down Expand Up @@ -1639,7 +1639,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
(1, 0.0%s/0.0%s),
(2, 1.0%s/0.0%s),
(3, -1.0%s/0.0%s);
)", s, s, s, s, s, s)}, {
)", s, s, s, s, s, s)}, {
R"({"update":{"value":"nan"},"key":[1]})",
R"({"update":{"value":"inf"},"key":[2]})",
R"({"update":{"value":"-inf"},"key":[3]})",
Expand Down Expand Up @@ -1674,7 +1674,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
TopicRunner::Read(table, KeysOnly(NKikimrSchemeOp::ECdcStreamFormatDebeziumJson), {Sprintf(R"(
UPSERT INTO `/Root/Table` (key, value) VALUES
("%s", 1);
)", key.c_str())}, {
)", key.c_str())}, {
{DebeziumBody("u", nullptr, nullptr), {{"__key", Sprintf(R"({"payload":{"key":"%s"}})", key.c_str())}}},
});
}
Expand Down Expand Up @@ -2043,7 +2043,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
ExecSQL(env.GetServer(), env.GetEdgeActor(), R"(
UPSERT INTO `/Root/TableAux` (key, value)
VALUES (1, 10);
)");
)");

SetSplitMergePartCountLimit(&runtime, -1);
const auto tabletIds = GetTableShards(env.GetServer(), env.GetEdgeActor(), "/Root/Table");
Expand Down Expand Up @@ -2292,7 +2292,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
auto tabletIds = GetTableShards(env.GetServer(), env.GetEdgeActor(), "/Root/Table");
UNIT_ASSERT_VALUES_EQUAL(tabletIds.size(), 1);

WaitTxNotification(env.GetServer(), env.GetEdgeActor(),
WaitTxNotification(env.GetServer(), env.GetEdgeActor(),
AsyncSplitTable(env.GetServer(), env.GetEdgeActor(), "/Root/Table", tabletIds.at(0), 4));

// execute on old partitions
Expand Down Expand Up @@ -2376,7 +2376,8 @@ Y_UNIT_TEST_SUITE(Cdc) {

case TSchemeBoardEvents::EvUpdate:
if (auto* msg = ev->Get<TSchemeBoardEvents::TEvUpdate>()) {
const auto desc = msg->GetRecord().GetDescribeSchemeResult();
NKikimrScheme::TEvDescribeSchemeResult desc;
Y_ABORT_UNLESS(ParseFromStringNoSizeLimit(desc, *msg->GetRecord().GetDescribeSchemeResultSerialized().begin()));
if (desc.GetPath() == "/Root/Table/Stream" && desc.GetPathDescription().GetSelf().GetCreateFinished()) {
delayed.emplace_back(ev.Release());
return TTestActorRuntime::EEventAction::DROP;
Expand Down Expand Up @@ -2446,7 +2447,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
ExecSQL(env.GetServer(), env.GetEdgeActor(), R"(
UPSERT INTO `/Root/Table` (key, value)
VALUES (1, 10);
)");
)");

SetSplitMergePartCountLimit(&runtime, -1);
const auto tabletIds = GetTableShards(env.GetServer(), env.GetEdgeActor(), "/Root/Table");
Expand Down Expand Up @@ -3266,7 +3267,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
auto tabletIds = GetTableShards(env.GetServer(), env.GetEdgeActor(), "/Root/Table");
UNIT_ASSERT_VALUES_EQUAL(tabletIds.size(), 1);

WaitTxNotification(env.GetServer(), env.GetEdgeActor(),
WaitTxNotification(env.GetServer(), env.GetEdgeActor(),
AsyncSplitTable(env.GetServer(), env.GetEdgeActor(), "/Root/Table", tabletIds.at(0), 4));

// merge
Expand Down Expand Up @@ -3298,7 +3299,7 @@ template <>
void Out<std::pair<TString, TString>>(IOutputStream& output, const std::pair<TString, TString>& x) {
output << x.first << ":" << x.second;
}

void AppendToString(TString& dst, const std::pair<TString, TString>& x) {
TStringOutput output(dst);
output << x;
Expand Down
5 changes: 5 additions & 0 deletions ydb/core/tx/scheme_board/cache_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ class TCacheTest: public TTestWithSchemeshard {
" Kind: \"pool-kind-1\" "
"} "
" Name: \"Root\" ");

// Context->SetLogPriority(NKikimrServices::SCHEME_BOARD_REPLICA, NLog::PRI_DEBUG);
// Context->SetLogPriority(NKikimrServices::SCHEME_BOARD_SUBSCRIBER, NLog::PRI_DEBUG);
// Context->SetLogPriority(NKikimrServices::TX_PROXY_SCHEME_CACHE, NLog::PRI_DEBUG);
// Context->SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NLog::PRI_DEBUG);
}

UNIT_TEST_SUITE(TCacheTest);
Expand Down
Loading
Loading