Skip to content

Commit

Permalink
Fix bug with broadcasting records (ydb-platform#6585)
Browse files Browse the repository at this point in the history
  • Loading branch information
CyberROFL committed Jul 12, 2024
1 parent 8e800d6 commit 59f23b0
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 61 deletions.
2 changes: 1 addition & 1 deletion ydb/core/change_exchange/change_sender_common_ops.h
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ class TBaseChangeSender {
Y_ABORT_UNLESS(it != Broadcasting.end());

auto& broadcast = it->second;
if (broadcast.Partitions.contains(partitionId)) {
if (broadcast.CompletedPartitions.contains(partitionId)) {
return false;
}

Expand Down
168 changes: 108 additions & 60 deletions ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ut_cdc_stream_reboots.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -556,84 +556,87 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) {
});
}

bool CheckRegistrations(TTestActorRuntime& runtime, NKikimrPQ::TMessageGroupInfo::EState expectedState,
const google::protobuf::RepeatedPtrField<NKikimrSchemeOp::TTablePartition>& tablePartitions,
const google::protobuf::RepeatedPtrField<NKikimrSchemeOp::TPersQueueGroupDescription::TPartition>& topicPartitions)
{
for (const auto& topicPartition : topicPartitions) {
auto request = MakeHolder<TEvPersQueue::TEvRequest>();
{
auto& record = *request->Record.MutablePartitionRequest();
record.SetPartition(topicPartition.GetPartitionId());
auto& cmd = *record.MutableCmdGetMaxSeqNo();
for (const auto& tablePartition : tablePartitions) {
cmd.AddSourceId(NPQ::NSourceIdEncoding::EncodeSimple(ToString(tablePartition.GetDatashardId())));
}
}

const auto& sender = runtime.AllocateEdgeActor();
ForwardToTablet(runtime, topicPartition.GetTabletId(), sender, request.Release());

auto response = runtime.GrabEdgeEvent<TEvPersQueue::TEvResponse>(sender);
{
const auto& record = response->Get()->Record.GetPartitionResponse();
const auto& result = record.GetCmdGetMaxSeqNoResult().GetSourceIdInfo();

UNIT_ASSERT_VALUES_EQUAL(result.size(), tablePartitions.size());
for (const auto& item: result) {
if (item.GetState() != expectedState) {
return false;
}
}
}
}

return true;
}

struct TItem {
TString Path;
ui32 nPartitions;
ui32 ExpectedPartitionCount;
};

void CheckRegistrations(TTestActorRuntime& runtime, const TItem& table, const TItem& topic) {
void CheckRegistrations(TTestActorRuntime& runtime, const TItem& table, const TItem& topic,
const google::protobuf::RepeatedPtrField<NKikimrSchemeOp::TTablePartition>* initialTablePartitions = nullptr)
{
auto tableDesc = DescribePath(runtime, table.Path, true, true);
const auto& tablePartitions = tableDesc.GetPathDescription().GetTablePartitions();
UNIT_ASSERT_VALUES_EQUAL(tablePartitions.size(), table.nPartitions);
UNIT_ASSERT_VALUES_EQUAL(tablePartitions.size(), table.ExpectedPartitionCount);

auto topicDesc = DescribePrivatePath(runtime, topic.Path);
const auto& topicPartitions = topicDesc.GetPathDescription().GetPersQueueGroup().GetPartitions();
UNIT_ASSERT_VALUES_EQUAL(topicPartitions.size(), topic.nPartitions);
UNIT_ASSERT_VALUES_EQUAL(topicPartitions.size(), topic.ExpectedPartitionCount);

while (true) {
runtime.SimulateSleep(TDuration::Seconds(1));
bool done = true;

for (ui32 i = 0; i < topic.nPartitions; ++i) {
auto request = MakeHolder<TEvPersQueue::TEvRequest>();
{
auto& record = *request->Record.MutablePartitionRequest();
record.SetPartition(topicPartitions[i].GetPartitionId());
auto& cmd = *record.MutableCmdGetMaxSeqNo();
for (const auto& tablePartition : tablePartitions) {
cmd.AddSourceId(NPQ::NSourceIdEncoding::EncodeSimple(ToString(tablePartition.GetDatashardId())));
}
}

const auto& sender = runtime.AllocateEdgeActor();
ForwardToTablet(runtime, topicPartitions[i].GetTabletId(), sender, request.Release());

auto response = runtime.GrabEdgeEvent<TEvPersQueue::TEvResponse>(sender);
{
const auto& record = response->Get()->Record.GetPartitionResponse();
const auto& result = record.GetCmdGetMaxSeqNoResult().GetSourceIdInfo();

UNIT_ASSERT_VALUES_EQUAL(result.size(), table.nPartitions);
for (const auto& item: result) {
done &= item.GetState() == NKikimrPQ::TMessageGroupInfo::STATE_REGISTERED;
if (!done) {
break;
}
}
}

if (!done) {
break;
}
}

if (done) {
if (CheckRegistrations(runtime, NKikimrPQ::TMessageGroupInfo::STATE_REGISTERED, tablePartitions, topicPartitions)) {
break;
}
}

if (initialTablePartitions) {
UNIT_ASSERT(CheckRegistrations(runtime, NKikimrPQ::TMessageGroupInfo::STATE_UNKNOWN, *initialTablePartitions, topicPartitions));
}
}

Y_UNIT_TEST_WITH_REBOOTS(SplitTable) {
template <typename T>
void SplitTable(const TString& cdcStreamDesc) {
T t;
t.Run([&](TTestActorRuntime& runtime, bool& activeZone) {
NKikimrScheme::TEvDescribeSchemeResult initialTableDesc;
{
TInactiveZone inactive(activeZone);

TestCreateTable(runtime, ++t.TxId, "/MyRoot", R"(
Name: "Table"
Columns { Name: "key" Type: "Uint32" }
Columns { Name: "value" Type: "Uint32" }
KeyColumnNames: ["key"]
)");
t.TestEnv->TestWaitNotification(runtime, t.TxId);
initialTableDesc = DescribePath(runtime, "/MyRoot/Table", true, true);

TestCreateCdcStream(runtime, ++t.TxId, "/MyRoot", R"(
TableName: "Table"
StreamDescription {
Name: "Stream"
Mode: ECdcStreamModeKeysOnly
Format: ECdcStreamFormatProto
}
)");
TestCreateCdcStream(runtime, ++t.TxId, "/MyRoot", cdcStreamDesc);
t.TestEnv->TestWaitNotification(runtime, t.TxId);
}

Expand All @@ -651,16 +654,43 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) {
TInactiveZone inactive(activeZone);
UploadRow(runtime, "/MyRoot/Table", 0, {1}, {2}, {TCell::Make(1u)}, {TCell::Make(1u)});
UploadRow(runtime, "/MyRoot/Table", 1, {1}, {2}, {TCell::Make(Max<ui32>())}, {TCell::Make(Max<ui32>())});
CheckRegistrations(runtime, {"/MyRoot/Table", 2}, {"/MyRoot/Table/Stream/streamImpl", 1});
CheckRegistrations(runtime, {"/MyRoot/Table", 2}, {"/MyRoot/Table/Stream/streamImpl", 1},
&initialTableDesc.GetPathDescription().GetTablePartitions());
}
});
}

Y_UNIT_TEST_WITH_REBOOTS(MergeTable) {
Y_UNIT_TEST_WITH_REBOOTS(SplitTable) {
SplitTable<T>(R"(
TableName: "Table"
StreamDescription {
Name: "Stream"
Mode: ECdcStreamModeKeysOnly
Format: ECdcStreamFormatProto
}
)");
}

Y_UNIT_TEST_WITH_REBOOTS(SplitTableResolvedTimestamps) {
SplitTable<T>(R"(
TableName: "Table"
StreamDescription {
Name: "Stream"
Mode: ECdcStreamModeKeysOnly
Format: ECdcStreamFormatProto
ResolvedTimestampsIntervalMs: 1000
}
)");
}

template <typename T>
void MergeTable(const TString& cdcStreamDesc) {
T t;
t.Run([&](TTestActorRuntime& runtime, bool& activeZone) {
NKikimrScheme::TEvDescribeSchemeResult initialTableDesc;
{
TInactiveZone inactive(activeZone);

TestCreateTable(runtime, ++t.TxId, "/MyRoot", R"(
Name: "Table"
Columns { Name: "key" Type: "Uint32" }
Expand All @@ -674,15 +704,9 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) {
}
)");
t.TestEnv->TestWaitNotification(runtime, t.TxId);
initialTableDesc = DescribePath(runtime, "/MyRoot/Table", true, true);

TestCreateCdcStream(runtime, ++t.TxId, "/MyRoot", R"(
TableName: "Table"
StreamDescription {
Name: "Stream"
Mode: ECdcStreamModeKeysOnly
Format: ECdcStreamFormatProto
}
)");
TestCreateCdcStream(runtime, ++t.TxId, "/MyRoot", cdcStreamDesc);
t.TestEnv->TestWaitNotification(runtime, t.TxId);
}

Expand All @@ -696,11 +720,35 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) {
TInactiveZone inactive(activeZone);
UploadRow(runtime, "/MyRoot/Table", 0, {1}, {2}, {TCell::Make(1u)}, {TCell::Make(1u)});
UploadRow(runtime, "/MyRoot/Table", 0, {1}, {2}, {TCell::Make(Max<ui32>())}, {TCell::Make(Max<ui32>())});
CheckRegistrations(runtime, {"/MyRoot/Table", 1}, {"/MyRoot/Table/Stream/streamImpl", 2});
CheckRegistrations(runtime, {"/MyRoot/Table", 1}, {"/MyRoot/Table/Stream/streamImpl", 2},
&initialTableDesc.GetPathDescription().GetTablePartitions());
}
});
}

Y_UNIT_TEST_WITH_REBOOTS(MergeTable) {
MergeTable<T>(R"(
TableName: "Table"
StreamDescription {
Name: "Stream"
Mode: ECdcStreamModeKeysOnly
Format: ECdcStreamFormatProto
}
)");
}

Y_UNIT_TEST_WITH_REBOOTS(MergeTableResolvedTimestamps) {
MergeTable<T>(R"(
TableName: "Table"
StreamDescription {
Name: "Stream"
Mode: ECdcStreamModeKeysOnly
Format: ECdcStreamFormatProto
ResolvedTimestampsIntervalMs: 1000
}
)");
}

Y_UNIT_TEST_WITH_REBOOTS(RacySplitTableAndCreateStream) {
T t;
t.Run([&](TTestActorRuntime& runtime, bool& activeZone) {
Expand Down

0 comments on commit 59f23b0

Please sign in to comment.