Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix bug with broadcasting records #6585

Merged
merged 1 commit into from
Jul 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ydb/core/change_exchange/change_sender_common_ops.h
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ class TBaseChangeSender {
Y_ABORT_UNLESS(it != Broadcasting.end());

auto& broadcast = it->second;
if (broadcast.Partitions.contains(partitionId)) {
if (broadcast.CompletedPartitions.contains(partitionId)) {
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -556,84 +556,87 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) {
});
}

bool CheckRegistrations(TTestActorRuntime& runtime, NKikimrPQ::TMessageGroupInfo::EState expectedState,
const google::protobuf::RepeatedPtrField<NKikimrSchemeOp::TTablePartition>& tablePartitions,
const google::protobuf::RepeatedPtrField<NKikimrSchemeOp::TPersQueueGroupDescription::TPartition>& topicPartitions)
{
for (const auto& topicPartition : topicPartitions) {
auto request = MakeHolder<TEvPersQueue::TEvRequest>();
{
auto& record = *request->Record.MutablePartitionRequest();
record.SetPartition(topicPartition.GetPartitionId());
auto& cmd = *record.MutableCmdGetMaxSeqNo();
for (const auto& tablePartition : tablePartitions) {
cmd.AddSourceId(NPQ::NSourceIdEncoding::EncodeSimple(ToString(tablePartition.GetDatashardId())));
}
}

const auto& sender = runtime.AllocateEdgeActor();
ForwardToTablet(runtime, topicPartition.GetTabletId(), sender, request.Release());

auto response = runtime.GrabEdgeEvent<TEvPersQueue::TEvResponse>(sender);
{
const auto& record = response->Get()->Record.GetPartitionResponse();
const auto& result = record.GetCmdGetMaxSeqNoResult().GetSourceIdInfo();

UNIT_ASSERT_VALUES_EQUAL(result.size(), tablePartitions.size());
for (const auto& item: result) {
if (item.GetState() != expectedState) {
return false;
}
}
}
}

return true;
}

struct TItem {
TString Path;
ui32 nPartitions;
ui32 ExpectedPartitionCount;
};

void CheckRegistrations(TTestActorRuntime& runtime, const TItem& table, const TItem& topic) {
void CheckRegistrations(TTestActorRuntime& runtime, const TItem& table, const TItem& topic,
const google::protobuf::RepeatedPtrField<NKikimrSchemeOp::TTablePartition>* initialTablePartitions = nullptr)
{
auto tableDesc = DescribePath(runtime, table.Path, true, true);
const auto& tablePartitions = tableDesc.GetPathDescription().GetTablePartitions();
UNIT_ASSERT_VALUES_EQUAL(tablePartitions.size(), table.nPartitions);
UNIT_ASSERT_VALUES_EQUAL(tablePartitions.size(), table.ExpectedPartitionCount);

auto topicDesc = DescribePrivatePath(runtime, topic.Path);
const auto& topicPartitions = topicDesc.GetPathDescription().GetPersQueueGroup().GetPartitions();
UNIT_ASSERT_VALUES_EQUAL(topicPartitions.size(), topic.nPartitions);
UNIT_ASSERT_VALUES_EQUAL(topicPartitions.size(), topic.ExpectedPartitionCount);

while (true) {
runtime.SimulateSleep(TDuration::Seconds(1));
bool done = true;

for (ui32 i = 0; i < topic.nPartitions; ++i) {
auto request = MakeHolder<TEvPersQueue::TEvRequest>();
{
auto& record = *request->Record.MutablePartitionRequest();
record.SetPartition(topicPartitions[i].GetPartitionId());
auto& cmd = *record.MutableCmdGetMaxSeqNo();
for (const auto& tablePartition : tablePartitions) {
cmd.AddSourceId(NPQ::NSourceIdEncoding::EncodeSimple(ToString(tablePartition.GetDatashardId())));
}
}

const auto& sender = runtime.AllocateEdgeActor();
ForwardToTablet(runtime, topicPartitions[i].GetTabletId(), sender, request.Release());

auto response = runtime.GrabEdgeEvent<TEvPersQueue::TEvResponse>(sender);
{
const auto& record = response->Get()->Record.GetPartitionResponse();
const auto& result = record.GetCmdGetMaxSeqNoResult().GetSourceIdInfo();

UNIT_ASSERT_VALUES_EQUAL(result.size(), table.nPartitions);
for (const auto& item: result) {
done &= item.GetState() == NKikimrPQ::TMessageGroupInfo::STATE_REGISTERED;
if (!done) {
break;
}
}
}

if (!done) {
break;
}
}

if (done) {
if (CheckRegistrations(runtime, NKikimrPQ::TMessageGroupInfo::STATE_REGISTERED, tablePartitions, topicPartitions)) {
break;
}
}

if (initialTablePartitions) {
UNIT_ASSERT(CheckRegistrations(runtime, NKikimrPQ::TMessageGroupInfo::STATE_UNKNOWN, *initialTablePartitions, topicPartitions));
}
}

Y_UNIT_TEST_WITH_REBOOTS(SplitTable) {
template <typename T>
void SplitTable(const TString& cdcStreamDesc) {
T t;
t.Run([&](TTestActorRuntime& runtime, bool& activeZone) {
NKikimrScheme::TEvDescribeSchemeResult initialTableDesc;
{
TInactiveZone inactive(activeZone);

TestCreateTable(runtime, ++t.TxId, "/MyRoot", R"(
Name: "Table"
Columns { Name: "key" Type: "Uint32" }
Columns { Name: "value" Type: "Uint32" }
KeyColumnNames: ["key"]
)");
t.TestEnv->TestWaitNotification(runtime, t.TxId);
initialTableDesc = DescribePath(runtime, "/MyRoot/Table", true, true);

TestCreateCdcStream(runtime, ++t.TxId, "/MyRoot", R"(
TableName: "Table"
StreamDescription {
Name: "Stream"
Mode: ECdcStreamModeKeysOnly
Format: ECdcStreamFormatProto
}
)");
TestCreateCdcStream(runtime, ++t.TxId, "/MyRoot", cdcStreamDesc);
t.TestEnv->TestWaitNotification(runtime, t.TxId);
}

Expand All @@ -651,16 +654,43 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) {
TInactiveZone inactive(activeZone);
UploadRow(runtime, "/MyRoot/Table", 0, {1}, {2}, {TCell::Make(1u)}, {TCell::Make(1u)});
UploadRow(runtime, "/MyRoot/Table", 1, {1}, {2}, {TCell::Make(Max<ui32>())}, {TCell::Make(Max<ui32>())});
CheckRegistrations(runtime, {"/MyRoot/Table", 2}, {"/MyRoot/Table/Stream/streamImpl", 1});
CheckRegistrations(runtime, {"/MyRoot/Table", 2}, {"/MyRoot/Table/Stream/streamImpl", 1},
&initialTableDesc.GetPathDescription().GetTablePartitions());
}
});
}

Y_UNIT_TEST_WITH_REBOOTS(MergeTable) {
Y_UNIT_TEST_WITH_REBOOTS(SplitTable) {
SplitTable<T>(R"(
TableName: "Table"
StreamDescription {
Name: "Stream"
Mode: ECdcStreamModeKeysOnly
Format: ECdcStreamFormatProto
}
)");
}

Y_UNIT_TEST_WITH_REBOOTS(SplitTableResolvedTimestamps) {
SplitTable<T>(R"(
TableName: "Table"
StreamDescription {
Name: "Stream"
Mode: ECdcStreamModeKeysOnly
Format: ECdcStreamFormatProto
ResolvedTimestampsIntervalMs: 1000
}
)");
}

template <typename T>
void MergeTable(const TString& cdcStreamDesc) {
T t;
t.Run([&](TTestActorRuntime& runtime, bool& activeZone) {
NKikimrScheme::TEvDescribeSchemeResult initialTableDesc;
{
TInactiveZone inactive(activeZone);

TestCreateTable(runtime, ++t.TxId, "/MyRoot", R"(
Name: "Table"
Columns { Name: "key" Type: "Uint32" }
Expand All @@ -674,15 +704,9 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) {
}
)");
t.TestEnv->TestWaitNotification(runtime, t.TxId);
initialTableDesc = DescribePath(runtime, "/MyRoot/Table", true, true);

TestCreateCdcStream(runtime, ++t.TxId, "/MyRoot", R"(
TableName: "Table"
StreamDescription {
Name: "Stream"
Mode: ECdcStreamModeKeysOnly
Format: ECdcStreamFormatProto
}
)");
TestCreateCdcStream(runtime, ++t.TxId, "/MyRoot", cdcStreamDesc);
t.TestEnv->TestWaitNotification(runtime, t.TxId);
}

Expand All @@ -696,11 +720,35 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) {
TInactiveZone inactive(activeZone);
UploadRow(runtime, "/MyRoot/Table", 0, {1}, {2}, {TCell::Make(1u)}, {TCell::Make(1u)});
UploadRow(runtime, "/MyRoot/Table", 0, {1}, {2}, {TCell::Make(Max<ui32>())}, {TCell::Make(Max<ui32>())});
CheckRegistrations(runtime, {"/MyRoot/Table", 1}, {"/MyRoot/Table/Stream/streamImpl", 2});
CheckRegistrations(runtime, {"/MyRoot/Table", 1}, {"/MyRoot/Table/Stream/streamImpl", 2},
&initialTableDesc.GetPathDescription().GetTablePartitions());
}
});
}

Y_UNIT_TEST_WITH_REBOOTS(MergeTable) {
MergeTable<T>(R"(
TableName: "Table"
StreamDescription {
Name: "Stream"
Mode: ECdcStreamModeKeysOnly
Format: ECdcStreamFormatProto
}
)");
}

Y_UNIT_TEST_WITH_REBOOTS(MergeTableResolvedTimestamps) {
MergeTable<T>(R"(
TableName: "Table"
StreamDescription {
Name: "Stream"
Mode: ECdcStreamModeKeysOnly
Format: ECdcStreamFormatProto
ResolvedTimestampsIntervalMs: 1000
}
)");
}

Y_UNIT_TEST_WITH_REBOOTS(RacySplitTableAndCreateStream) {
T t;
t.Run([&](TTestActorRuntime& runtime, bool& activeZone) {
Expand Down
Loading