Skip to content

Commit

Permalink
[test](be-ut) fix load stream test after segment num check (#37660)
Browse files Browse the repository at this point in the history
## Proposed changes

fix load stream test after #36753
  • Loading branch information
kaijchen authored Jul 12, 2024
1 parent 37ed7ad commit d1bd06c
Showing 1 changed file with 111 additions and 41 deletions.
152 changes: 111 additions & 41 deletions be/test/runtime/load_stream_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -495,19 +495,17 @@ class LoadStreamMgrTest : public testing::Test {
: _heavy_work_pool(4, 32, "load_stream_test_heavy"),
_light_work_pool(4, 32, "load_stream_test_light") {}

void close_load(MockSinkClient& client, uint32_t sender_id = NORMAL_SENDER_ID) {
void close_load(MockSinkClient& client, const std::vector<PTabletID>& tablets_to_commit = {},
uint32_t sender_id = NORMAL_SENDER_ID) {
butil::IOBuf append_buf;
PStreamHeader header;
header.mutable_load_id()->set_hi(1);
header.mutable_load_id()->set_lo(1);
header.set_opcode(PStreamHeader::CLOSE_LOAD);
header.set_src_id(sender_id);
/* TODO: fix test with tablets_to_commit
PTabletID* tablets_to_commit = header.add_tablets();
tablets_to_commit->set_partition_id(NORMAL_PARTITION_ID);
tablets_to_commit->set_index_id(NORMAL_INDEX_ID);
tablets_to_commit->set_tablet_id(NORMAL_TABLET_ID);
*/
for (const auto& tablet : tablets_to_commit) {
*header.add_tablets() = tablet;
}
size_t hdr_len = header.ByteSizeLong();
append_buf.append((char*)&hdr_len, sizeof(size_t));
append_buf.append(header.SerializeAsString());
Expand Down Expand Up @@ -680,14 +678,19 @@ TEST_F(LoadStreamMgrTest, one_client_normal) {
write_normal(client);

reset_response_stat();
close_load(client, ABNORMAL_SENDER_ID);
PTabletID tablet;
tablet.set_partition_id(NORMAL_PARTITION_ID);
tablet.set_index_id(NORMAL_INDEX_ID);
tablet.set_tablet_id(NORMAL_TABLET_ID);
tablet.set_num_segments(1);
close_load(client, {tablet}, ABNORMAL_SENDER_ID);
wait_for_ack(1);
EXPECT_EQ(g_response_stat.num, 1);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0);
EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 1);

close_load(client);
close_load(client, {tablet});
wait_for_ack(2);
EXPECT_EQ(g_response_stat.num, 2);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 1);
Expand Down Expand Up @@ -738,14 +741,19 @@ TEST_F(LoadStreamMgrTest, one_client_abnormal_index) {
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1);
EXPECT_EQ(g_response_stat.failed_tablet_ids[0], NORMAL_TABLET_ID);

close_load(client, 1);
PTabletID tablet;
tablet.set_partition_id(NORMAL_PARTITION_ID);
tablet.set_index_id(ABNORMAL_INDEX_ID);
tablet.set_tablet_id(NORMAL_TABLET_ID);
tablet.set_num_segments(1);
close_load(client, {tablet}, 1);
wait_for_ack(2);
EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 1);
EXPECT_EQ(g_response_stat.num, 2);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1);

close_load(client, 0);
close_load(client, {tablet}, 0);
wait_for_ack(3);
EXPECT_EQ(g_response_stat.num, 3);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
Expand All @@ -769,17 +777,23 @@ TEST_F(LoadStreamMgrTest, one_client_abnormal_sender) {
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1);
EXPECT_EQ(g_response_stat.failed_tablet_ids[0], NORMAL_TABLET_ID);

close_load(client, 1);
PTabletID tablet;
tablet.set_partition_id(NORMAL_PARTITION_ID);
tablet.set_index_id(NORMAL_INDEX_ID);
tablet.set_tablet_id(NORMAL_TABLET_ID);
tablet.set_num_segments(1);
close_load(client, {tablet}, 1);
wait_for_ack(2);
EXPECT_EQ(g_response_stat.num, 2);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1);

close_load(client, 0);
// on the final close_load, segment num check will fail
close_load(client, {tablet}, 0);
wait_for_ack(3);
EXPECT_EQ(g_response_stat.num, 3);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 2);

// server will close stream on CLOSE_LOAD
wait_for_close();
Expand All @@ -799,13 +813,18 @@ TEST_F(LoadStreamMgrTest, one_client_abnormal_tablet) {
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1);
EXPECT_EQ(g_response_stat.failed_tablet_ids[0], ABNORMAL_TABLET_ID);

close_load(client, 1);
PTabletID tablet;
tablet.set_partition_id(NORMAL_PARTITION_ID);
tablet.set_index_id(NORMAL_INDEX_ID);
tablet.set_tablet_id(ABNORMAL_TABLET_ID);
tablet.set_num_segments(1);
close_load(client, {tablet}, 1);
wait_for_ack(2);
EXPECT_EQ(g_response_stat.num, 2);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1);

close_load(client, 0);
close_load(client, {tablet}, 0);
wait_for_ack(3);
EXPECT_EQ(g_response_stat.num, 3);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
Expand All @@ -832,21 +851,26 @@ TEST_F(LoadStreamMgrTest, one_client_one_index_one_tablet_single_segment0_zero_b
0, data, true);

EXPECT_EQ(g_response_stat.num, 0);
PTabletID tablet;
tablet.set_partition_id(NORMAL_PARTITION_ID);
tablet.set_index_id(NORMAL_INDEX_ID);
tablet.set_tablet_id(NORMAL_TABLET_ID);
tablet.set_num_segments(1);
// CLOSE_LOAD
close_load(client, 1);
close_load(client, {tablet}, 1);
wait_for_ack(1);
EXPECT_EQ(g_response_stat.num, 1);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0);

// duplicated close
close_load(client, 1);
close_load(client, {tablet}, 1);
wait_for_ack(2);
EXPECT_EQ(g_response_stat.num, 2);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0);

close_load(client, 0);
close_load(client, {tablet}, 0);
wait_for_ack(3);
EXPECT_EQ(g_response_stat.num, 3);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
Expand All @@ -873,8 +897,13 @@ TEST_F(LoadStreamMgrTest, close_load_before_recv_eos) {
data.length(), data, false);

EXPECT_EQ(g_response_stat.num, 0);
PTabletID tablet;
tablet.set_partition_id(NORMAL_PARTITION_ID);
tablet.set_index_id(NORMAL_INDEX_ID);
tablet.set_tablet_id(NORMAL_TABLET_ID);
tablet.set_num_segments(1);
// CLOSE_LOAD before EOS
close_load(client);
close_load(client, {tablet});
wait_for_ack(1);
EXPECT_EQ(g_response_stat.num, 1);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
Expand All @@ -888,7 +917,7 @@ TEST_F(LoadStreamMgrTest, close_load_before_recv_eos) {
data.length(), data, true);

// duplicated close, will not be handled
close_load(client);
close_load(client, {tablet});
wait_for_ack(2);
EXPECT_EQ(g_response_stat.num, 1);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
Expand All @@ -915,21 +944,26 @@ TEST_F(LoadStreamMgrTest, one_client_one_index_one_tablet_single_segment0) {
data.length(), data, true);

EXPECT_EQ(g_response_stat.num, 0);
PTabletID tablet;
tablet.set_partition_id(NORMAL_PARTITION_ID);
tablet.set_index_id(NORMAL_INDEX_ID);
tablet.set_tablet_id(NORMAL_TABLET_ID);
tablet.set_num_segments(1);
// CLOSE_LOAD
close_load(client, 1);
close_load(client, {tablet}, 1);
wait_for_ack(1);
EXPECT_EQ(g_response_stat.num, 1);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0);

// duplicated close
close_load(client, 1);
close_load(client, {tablet}, 1);
wait_for_ack(2);
EXPECT_EQ(g_response_stat.num, 2);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0);

close_load(client, 0);
close_load(client, {tablet}, 0);
wait_for_ack(3);
EXPECT_EQ(g_response_stat.num, 3);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 1);
Expand Down Expand Up @@ -959,21 +993,26 @@ TEST_F(LoadStreamMgrTest, one_client_one_index_one_tablet_single_segment_without
0, data, false);

EXPECT_EQ(g_response_stat.num, 0);
PTabletID tablet;
tablet.set_partition_id(NORMAL_PARTITION_ID);
tablet.set_index_id(NORMAL_INDEX_ID);
tablet.set_tablet_id(NORMAL_TABLET_ID);
tablet.set_num_segments(1);
// CLOSE_LOAD
close_load(client, 1);
close_load(client, {tablet}, 1);
wait_for_ack(1);
EXPECT_EQ(g_response_stat.num, 1);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0);

// duplicated close
close_load(client, 1);
close_load(client, {tablet}, 1);
wait_for_ack(2);
EXPECT_EQ(g_response_stat.num, 2);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0);

close_load(client, 0);
close_load(client, {tablet}, 0);
wait_for_ack(3);
EXPECT_EQ(g_response_stat.num, 3);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
Expand Down Expand Up @@ -1002,21 +1041,26 @@ TEST_F(LoadStreamMgrTest, one_client_one_index_one_tablet_single_segment1) {
data.length(), data, true);

EXPECT_EQ(g_response_stat.num, 0);
PTabletID tablet;
tablet.set_partition_id(NORMAL_PARTITION_ID);
tablet.set_index_id(NORMAL_INDEX_ID);
tablet.set_tablet_id(NORMAL_TABLET_ID);
tablet.set_num_segments(1);
// CLOSE_LOAD
close_load(client, 1);
close_load(client, {tablet}, 1);
wait_for_ack(1);
EXPECT_EQ(g_response_stat.num, 1);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0);

// duplicated close
close_load(client, 1);
close_load(client, {tablet}, 1);
wait_for_ack(2);
EXPECT_EQ(g_response_stat.num, 2);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0);

close_load(client, 0);
close_load(client, {tablet}, 0);
wait_for_ack(3);
EXPECT_EQ(g_response_stat.num, 3);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
Expand Down Expand Up @@ -1049,21 +1093,26 @@ TEST_F(LoadStreamMgrTest, one_client_one_index_one_tablet_two_segment) {
0, data2, true);

EXPECT_EQ(g_response_stat.num, 0);
PTabletID tablet;
tablet.set_partition_id(NORMAL_PARTITION_ID);
tablet.set_index_id(NORMAL_INDEX_ID);
tablet.set_tablet_id(NORMAL_TABLET_ID);
tablet.set_num_segments(2);
// CLOSE_LOAD
close_load(client, 1);
close_load(client, {tablet}, 1);
wait_for_ack(1);
EXPECT_EQ(g_response_stat.num, 1);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0);

// duplicated close
close_load(client, 1);
close_load(client, {tablet}, 1);
wait_for_ack(2);
EXPECT_EQ(g_response_stat.num, 2);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0);

close_load(client, 0);
close_load(client, {tablet}, 0);
wait_for_ack(3);
EXPECT_EQ(g_response_stat.num, 3);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 1);
Expand Down Expand Up @@ -1101,21 +1150,32 @@ TEST_F(LoadStreamMgrTest, one_client_one_index_three_tablet) {
NORMAL_TABLET_ID + 2, 0, 0, data2, true);

EXPECT_EQ(g_response_stat.num, 0);

PTabletID tablet1;
tablet1.set_partition_id(NORMAL_PARTITION_ID);
tablet1.set_index_id(NORMAL_INDEX_ID);
tablet1.set_tablet_id(NORMAL_TABLET_ID);
tablet1.set_num_segments(1);
PTabletID tablet2 {tablet1};
tablet2.set_tablet_id(NORMAL_TABLET_ID + 1);
PTabletID tablet3 {tablet1};
tablet3.set_tablet_id(NORMAL_TABLET_ID + 2);

// CLOSE_LOAD
close_load(client, 1);
close_load(client, {tablet1, tablet2, tablet3}, 1);
wait_for_ack(1);
EXPECT_EQ(g_response_stat.num, 1);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0);

// duplicated close
close_load(client, 1);
close_load(client, {tablet1, tablet2, tablet3}, 1);
wait_for_ack(2);
EXPECT_EQ(g_response_stat.num, 2);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0);

close_load(client, 0);
close_load(client, {tablet1, tablet2, tablet3}, 0);
wait_for_ack(3);
EXPECT_EQ(g_response_stat.num, 3);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 3);
Expand Down Expand Up @@ -1169,22 +1229,27 @@ TEST_F(LoadStreamMgrTest, two_client_one_index_one_tablet_three_segment) {
}

EXPECT_EQ(g_response_stat.num, 0);
PTabletID tablet;
tablet.set_partition_id(NORMAL_PARTITION_ID);
tablet.set_index_id(NORMAL_INDEX_ID);
tablet.set_tablet_id(NORMAL_TABLET_ID);
tablet.set_num_segments(3);
// CLOSE_LOAD
close_load(clients[1], 1);
close_load(clients[1], {tablet}, 1);
wait_for_ack(1);
EXPECT_EQ(g_response_stat.num, 1);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0);

// duplicated close
close_load(clients[1], 1);
close_load(clients[1], {tablet}, 1);
wait_for_ack(2);
// stream closed, no response will be sent
EXPECT_EQ(g_response_stat.num, 1);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0);

close_load(clients[0], 0);
close_load(clients[0], {tablet}, 0);
wait_for_ack(2);
EXPECT_EQ(g_response_stat.num, 2);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 1);
Expand Down Expand Up @@ -1239,8 +1304,13 @@ TEST_F(LoadStreamMgrTest, two_client_one_close_before_the_other_open) {
}

EXPECT_EQ(g_response_stat.num, 0);
PTabletID tablet;
tablet.set_partition_id(NORMAL_PARTITION_ID);
tablet.set_index_id(NORMAL_INDEX_ID);
tablet.set_tablet_id(NORMAL_TABLET_ID);
tablet.set_num_segments(3);
// CLOSE_LOAD
close_load(clients[0], 0);
close_load(clients[0], {tablet}, 0);
wait_for_ack(1);
EXPECT_EQ(g_response_stat.num, 1);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
Expand All @@ -1257,7 +1327,7 @@ TEST_F(LoadStreamMgrTest, two_client_one_close_before_the_other_open) {
NORMAL_TABLET_ID, segid, 0, segment_data[i * 3 + segid], true);
}

close_load(clients[1], 1);
close_load(clients[1], {tablet}, 1);
wait_for_ack(2);
EXPECT_EQ(g_response_stat.num, 2);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 1);
Expand Down

0 comments on commit d1bd06c

Please sign in to comment.