From e9d9d8a4cd87fa02c815bd7c7af81710fc7bb575 Mon Sep 17 00:00:00 2001 From: Kaijie Chen Date: Mon, 8 Jul 2024 17:58:28 +0800 Subject: [PATCH 1/2] [fix](move-memtable) check segment num when closing each tablet (#36753) Previously, there is chance that sender failed to send some data while the receiver being unaware of. This will cause lost data if some segments are skipped. This PR fixes the problem by including checks in both sender and receiver. When sender failed to send rpc, LoadStreamStub will mark the involved tablets failed. Each sender will send segment num for each tablet in CLOSE_LOAD, and receivers (LoadStream) will sum up and check total segment nums. --- be/src/io/fs/stream_sink_file_writer.cpp | 53 +++++++++-------- be/src/olap/delta_writer_v2.cpp | 3 +- be/src/olap/delta_writer_v2.h | 2 +- be/src/olap/rowset/beta_rowset_writer_v2.h | 2 + be/src/runtime/load_stream.cpp | 13 ++++- be/src/runtime/load_stream.h | 2 + be/src/vec/sink/delta_writer_v2_pool.cpp | 9 ++- be/src/vec/sink/delta_writer_v2_pool.h | 3 +- be/src/vec/sink/load_stream_map_pool.cpp | 18 +++++- be/src/vec/sink/load_stream_map_pool.h | 7 ++- be/src/vec/sink/load_stream_stub.cpp | 58 ++++++++++++++++++- be/src/vec/sink/load_stream_stub.h | 2 +- be/src/vec/sink/writer/vtablet_writer_v2.cpp | 10 +++- .../vec/exec/delta_writer_v2_pool_test.cpp | 10 ++-- gensrc/proto/internal_service.proto | 1 + .../test_multi_replica_fault_injection.groovy | 5 +- 16 files changed, 151 insertions(+), 47 deletions(-) diff --git a/be/src/io/fs/stream_sink_file_writer.cpp b/be/src/io/fs/stream_sink_file_writer.cpp index cfc924fad0aa1b..e6007550396912 100644 --- a/be/src/io/fs/stream_sink_file_writer.cpp +++ b/be/src/io/fs/stream_sink_file_writer.cpp @@ -51,42 +51,26 @@ Status StreamSinkFileWriter::appendv(const Slice* data, size_t data_cnt) { << ", data_length: " << bytes_req; std::span slices {data, data_cnt}; - size_t stream_index = 0; + size_t fault_injection_skipped_streams = 0; bool ok = false; - bool skip_stream = false; Status st; for (auto& stream : _streams) { DBUG_EXECUTE_IF("StreamSinkFileWriter.appendv.write_segment_failed_one_replica", { - if (stream_index >= 2) { - skip_stream = true; + if (fault_injection_skipped_streams < 1) { + fault_injection_skipped_streams++; + continue; } }); DBUG_EXECUTE_IF("StreamSinkFileWriter.appendv.write_segment_failed_two_replica", { - if (stream_index >= 1) { - skip_stream = true; + if (fault_injection_skipped_streams < 2) { + fault_injection_skipped_streams++; + continue; } }); - if (!skip_stream) { - st = stream->append_data(_partition_id, _index_id, _tablet_id, _segment_id, - _bytes_appended, slices); - } - DBUG_EXECUTE_IF("StreamSinkFileWriter.appendv.write_segment_failed_one_replica", { - if (stream_index >= 2) { - st = Status::InternalError("stream sink file writer append data failed"); - } - stream_index++; - skip_stream = false; - }); - DBUG_EXECUTE_IF("StreamSinkFileWriter.appendv.write_segment_failed_two_replica", { - if (stream_index >= 1) { - st = Status::InternalError("stream sink file writer append data failed"); - } - stream_index++; - skip_stream = false; - }); - DBUG_EXECUTE_IF("StreamSinkFileWriter.appendv.write_segment_failed_all_replica", { - st = Status::InternalError("stream sink file writer append data failed"); - }); + DBUG_EXECUTE_IF("StreamSinkFileWriter.appendv.write_segment_failed_all_replica", + { continue; }); + st = stream->append_data(_partition_id, _index_id, _tablet_id, _segment_id, _bytes_appended, + slices); ok = ok || st.ok(); if (!st.ok()) { LOG(WARNING) << "failed to send segment data to backend " << stream->dst_id() @@ -116,8 +100,23 @@ Status StreamSinkFileWriter::finalize() { VLOG_DEBUG << "writer finalize, load_id: " << print_id(_load_id) << ", index_id: " << _index_id << ", tablet_id: " << _tablet_id << ", segment_id: " << _segment_id; // TODO(zhengyu): update get_inverted_index_file_size into stat + size_t fault_injection_skipped_streams = 0; bool ok = false; for (auto& stream : _streams) { + DBUG_EXECUTE_IF("StreamSinkFileWriter.appendv.write_segment_failed_one_replica", { + if (fault_injection_skipped_streams < 1) { + fault_injection_skipped_streams++; + continue; + } + }); + DBUG_EXECUTE_IF("StreamSinkFileWriter.appendv.write_segment_failed_two_replica", { + if (fault_injection_skipped_streams < 2) { + fault_injection_skipped_streams++; + continue; + } + }); + DBUG_EXECUTE_IF("StreamSinkFileWriter.appendv.write_segment_failed_all_replica", + { continue; }); auto st = stream->append_data(_partition_id, _index_id, _tablet_id, _segment_id, _bytes_appended, {}, true); ok = ok || st.ok(); diff --git a/be/src/olap/delta_writer_v2.cpp b/be/src/olap/delta_writer_v2.cpp index 378728f025cdb4..e02c8eea70cae2 100644 --- a/be/src/olap/delta_writer_v2.cpp +++ b/be/src/olap/delta_writer_v2.cpp @@ -180,7 +180,7 @@ Status DeltaWriterV2::close() { return _memtable_writer->close(); } -Status DeltaWriterV2::close_wait(RuntimeProfile* profile) { +Status DeltaWriterV2::close_wait(int32_t& num_segments, RuntimeProfile* profile) { SCOPED_RAW_TIMER(&_close_wait_time); std::lock_guard l(_lock); DCHECK(_is_init) @@ -190,6 +190,7 @@ Status DeltaWriterV2::close_wait(RuntimeProfile* profile) { _update_profile(profile); } RETURN_IF_ERROR(_memtable_writer->close_wait(profile)); + num_segments = _rowset_writer->next_segment_id(); _delta_written_success = true; return Status::OK(); diff --git a/be/src/olap/delta_writer_v2.h b/be/src/olap/delta_writer_v2.h index 31b364e103880a..0ef564be393762 100644 --- a/be/src/olap/delta_writer_v2.h +++ b/be/src/olap/delta_writer_v2.h @@ -77,7 +77,7 @@ class DeltaWriterV2 { Status close(); // wait for all memtables to be flushed. // mem_consumption() should be 0 after this function returns. - Status close_wait(RuntimeProfile* profile = nullptr); + Status close_wait(int32_t& num_segments, RuntimeProfile* profile = nullptr); // abandon current memtable and wait for all pending-flushing memtables to be destructed. // mem_consumption() should be 0 after this function returns. diff --git a/be/src/olap/rowset/beta_rowset_writer_v2.h b/be/src/olap/rowset/beta_rowset_writer_v2.h index 4b0ab950de483f..6d1321bd144aca 100644 --- a/be/src/olap/rowset/beta_rowset_writer_v2.h +++ b/be/src/olap/rowset/beta_rowset_writer_v2.h @@ -131,6 +131,8 @@ class BetaRowsetWriterV2 : public RowsetWriter { int32_t allocate_segment_id() override { return _segment_creator.allocate_segment_id(); }; + int32_t next_segment_id() { return _segment_creator.next_segment_id(); }; + int64_t delete_bitmap_ns() override { return _delete_bitmap_ns; } int64_t segment_writer_ns() override { return _segment_writer_ns; } diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp index 8de15091ec5030..1f8c33995b3eaa 100644 --- a/be/src/runtime/load_stream.cpp +++ b/be/src/runtime/load_stream.cpp @@ -244,6 +244,11 @@ Status TabletStream::close() { if (!_failed_st->ok()) { return *_failed_st; } + if (_next_segid.load() != _num_segments) { + return Status::Corruption( + "segment num mismatch in tablet {}, expected: {}, actual: {}, load_id: {}", _id, + _num_segments, _next_segid.load(), print_id(_load_id)); + } Status st = Status::OK(); auto close_func = [this, &mu, &cv, &st]() { @@ -307,11 +312,17 @@ Status IndexStream::close(const std::vector& tablets_to_commit, SCOPED_TIMER(_close_wait_timer); // open all need commit tablets for (const auto& tablet : tablets_to_commit) { + if (_id != tablet.index_id()) { + continue; + } TabletStreamSharedPtr tablet_stream; auto it = _tablet_streams_map.find(tablet.tablet_id()); - if (it == _tablet_streams_map.end() && _id == tablet.index_id()) { + if (it == _tablet_streams_map.end()) { RETURN_IF_ERROR( _init_tablet_stream(tablet_stream, tablet.tablet_id(), tablet.partition_id())); + tablet_stream->add_num_segments(tablet.num_segments()); + } else { + it->second->add_num_segments(tablet.num_segments()); } } diff --git a/be/src/runtime/load_stream.h b/be/src/runtime/load_stream.h index b2635698379f6d..f690882a878285 100644 --- a/be/src/runtime/load_stream.h +++ b/be/src/runtime/load_stream.h @@ -52,6 +52,7 @@ class TabletStream { Status append_data(const PStreamHeader& header, butil::IOBuf* data); Status add_segment(const PStreamHeader& header, butil::IOBuf* data); + void add_num_segments(int64_t num_segments) { _num_segments += num_segments; } Status close(); int64_t id() const { return _id; } @@ -63,6 +64,7 @@ class TabletStream { std::vector> _flush_tokens; std::unordered_map> _segids_mapping; std::atomic _next_segid; + int64_t _num_segments = 0; bthread::Mutex _lock; std::shared_ptr _failed_st; PUniqueId _load_id; diff --git a/be/src/vec/sink/delta_writer_v2_pool.cpp b/be/src/vec/sink/delta_writer_v2_pool.cpp index cfb2b5294c7c55..b61d29819d4919 100644 --- a/be/src/vec/sink/delta_writer_v2_pool.cpp +++ b/be/src/vec/sink/delta_writer_v2_pool.cpp @@ -41,7 +41,8 @@ std::shared_ptr DeltaWriterV2Map::get_or_create( return writer; } -Status DeltaWriterV2Map::close(RuntimeProfile* profile) { +Status DeltaWriterV2Map::close(std::unordered_map& segments_for_tablet, + RuntimeProfile* profile) { int num_use = --_use_cnt; if (num_use > 0) { LOG(INFO) << "keeping DeltaWriterV2Map, load_id=" << _load_id << " , use_cnt=" << num_use; @@ -56,8 +57,10 @@ Status DeltaWriterV2Map::close(RuntimeProfile* profile) { RETURN_IF_ERROR(writer->close()); } LOG(INFO) << "close-waiting DeltaWriterV2Map, load_id=" << _load_id; - for (auto& [_, writer] : _map) { - RETURN_IF_ERROR(writer->close_wait(profile)); + for (auto& [tablet_id, writer] : _map) { + int32_t num_segments; + RETURN_IF_ERROR(writer->close_wait(num_segments, profile)); + segments_for_tablet[tablet_id] = num_segments; } return Status::OK(); } diff --git a/be/src/vec/sink/delta_writer_v2_pool.h b/be/src/vec/sink/delta_writer_v2_pool.h index 912b9216e9f58e..7e58eea31498f6 100644 --- a/be/src/vec/sink/delta_writer_v2_pool.h +++ b/be/src/vec/sink/delta_writer_v2_pool.h @@ -70,7 +70,8 @@ class DeltaWriterV2Map { int64_t tablet_id, std::function()> creator); // close all delta writers in this DeltaWriterV2Map if there is no other users - Status close(RuntimeProfile* profile = nullptr); + Status close(std::unordered_map& segments_for_tablet, + RuntimeProfile* profile = nullptr); // cancel all delta writers in this DeltaWriterV2Map void cancel(Status status); diff --git a/be/src/vec/sink/load_stream_map_pool.cpp b/be/src/vec/sink/load_stream_map_pool.cpp index 7a3072ade6e70b..2fcb8deaeb2c85 100644 --- a/be/src/vec/sink/load_stream_map_pool.cpp +++ b/be/src/vec/sink/load_stream_map_pool.cpp @@ -87,7 +87,9 @@ void LoadStreamMap::save_tablets_to_commit(int64_t dst_id, const std::vector& tablets_to_commit) { std::lock_guard lock(_tablets_to_commit_mutex); auto& tablets = _tablets_to_commit[dst_id]; - tablets.insert(tablets.end(), tablets_to_commit.begin(), tablets_to_commit.end()); + for (const auto& tablet : tablets_to_commit) { + tablets.emplace(tablet.tablet_id(), tablet); + } } bool LoadStreamMap::release() { @@ -103,12 +105,24 @@ bool LoadStreamMap::release() { Status LoadStreamMap::close_load(bool incremental) { return for_each_st([this, incremental](int64_t dst_id, const Streams& streams) -> Status { + std::vector tablets_to_commit; const auto& tablets = _tablets_to_commit[dst_id]; + tablets_to_commit.reserve(tablets.size()); + for (const auto& [tablet_id, tablet] : tablets) { + tablets_to_commit.push_back(tablet); + tablets_to_commit.back().set_num_segments(_segments_for_tablet[tablet_id]); + } + bool first = true; for (auto& stream : streams) { if (stream->is_incremental() != incremental) { continue; } - RETURN_IF_ERROR(stream->close_load(tablets)); + if (first) { + RETURN_IF_ERROR(stream->close_load(tablets_to_commit)); + first = false; + } else { + RETURN_IF_ERROR(stream->close_load({})); + } } return Status::OK(); }); diff --git a/be/src/vec/sink/load_stream_map_pool.h b/be/src/vec/sink/load_stream_map_pool.h index d0f72ab7e004e0..dcddcdaf8d8ac4 100644 --- a/be/src/vec/sink/load_stream_map_pool.h +++ b/be/src/vec/sink/load_stream_map_pool.h @@ -90,6 +90,10 @@ class LoadStreamMap { void save_tablets_to_commit(int64_t dst_id, const std::vector& tablets_to_commit); + void save_segments_for_tablet(const std::unordered_map& segments_for_tablet) { + _segments_for_tablet.insert(segments_for_tablet.cbegin(), segments_for_tablet.cend()); + } + // Return true if the last instance is just released. bool release(); @@ -109,7 +113,8 @@ class LoadStreamMap { std::shared_ptr _enable_unique_mow_for_index; std::mutex _tablets_to_commit_mutex; - std::unordered_map> _tablets_to_commit; + std::unordered_map> _tablets_to_commit; + std::unordered_map _segments_for_tablet; }; class LoadStreamMapPool { diff --git a/be/src/vec/sink/load_stream_stub.cpp b/be/src/vec/sink/load_stream_stub.cpp index caebb381db6048..93f3fd87a8571d 100644 --- a/be/src/vec/sink/load_stream_stub.cpp +++ b/be/src/vec/sink/load_stream_stub.cpp @@ -207,6 +207,11 @@ Status LoadStreamStub::open(BrpcClientCache* client_cache, Status LoadStreamStub::append_data(int64_t partition_id, int64_t index_id, int64_t tablet_id, int64_t segment_id, uint64_t offset, std::span data, bool segment_eos) { + DBUG_EXECUTE_IF("LoadStreamStub.only_send_segment_0", { + if (segment_id != 0) { + return Status::OK(); + } + }); PStreamHeader header; header.set_src_id(_src_id); *header.mutable_load_id() = _load_id; @@ -224,6 +229,11 @@ Status LoadStreamStub::append_data(int64_t partition_id, int64_t index_id, int64 Status LoadStreamStub::add_segment(int64_t partition_id, int64_t index_id, int64_t tablet_id, int64_t segment_id, const SegmentStatistics& segment_stat, TabletSchemaSPtr flush_schema) { + DBUG_EXECUTE_IF("LoadStreamStub.only_send_segment_0", { + if (segment_id != 0) { + return Status::OK(); + } + }); PStreamHeader header; header.set_src_id(_src_id); *header.mutable_load_id() = _load_id; @@ -368,7 +378,53 @@ Status LoadStreamStub::_send_with_buffer(butil::IOBuf& buf, bool sync) { std::lock_guard send_lock(_send_mutex); buffer_lock.unlock(); VLOG_DEBUG << "send buf size : " << output.size() << ", sync: " << sync; - return _send_with_retry(output); + auto st = _send_with_retry(output); + if (!st.ok()) { + _handle_failure(output, st); + } + return st; +} + +void LoadStreamStub::_handle_failure(butil::IOBuf& buf, Status st) { + while (buf.size() > 0) { + // step 1: parse header + size_t hdr_len = 0; + buf.cutn((void*)&hdr_len, sizeof(size_t)); + butil::IOBuf hdr_buf; + PStreamHeader hdr; + buf.cutn(&hdr_buf, hdr_len); + butil::IOBufAsZeroCopyInputStream wrapper(hdr_buf); + hdr.ParseFromZeroCopyStream(&wrapper); + + // step 2: cut data + size_t data_len = 0; + buf.cutn((void*)&data_len, sizeof(size_t)); + butil::IOBuf data_buf; + buf.cutn(&data_buf, data_len); + + // step 3: handle failure + switch (hdr.opcode()) { + case PStreamHeader::ADD_SEGMENT: + case PStreamHeader::APPEND_DATA: { + add_failed_tablet(hdr.tablet_id(), st); + } break; + case PStreamHeader::CLOSE_LOAD: { + brpc::StreamClose(_stream_id); + } break; + case PStreamHeader::GET_SCHEMA: { + // Just log and let wait_for_schema timeout + std::ostringstream oss; + for (const auto& tablet : hdr.tablets()) { + oss << " " << tablet.tablet_id(); + } + LOG(WARNING) << "failed to send GET_SCHEMA request, tablet_id:" << oss.str() << ", " + << *this; + } break; + default: + LOG(WARNING) << "unexpected stream message " << hdr.opcode() << ", " << *this; + DCHECK(false); + } + } } Status LoadStreamStub::_send_with_retry(butil::IOBuf& buf) { diff --git a/be/src/vec/sink/load_stream_stub.h b/be/src/vec/sink/load_stream_stub.h index 1bf0fac4e381b8..4e6aad8d1ae739 100644 --- a/be/src/vec/sink/load_stream_stub.h +++ b/be/src/vec/sink/load_stream_stub.h @@ -207,7 +207,6 @@ class LoadStreamStub : public std::enable_shared_from_this { _success_tablets.push_back(tablet_id); } - // for tests only void add_failed_tablet(int64_t tablet_id, Status reason) { std::lock_guard lock(_failed_tablets_mutex); _failed_tablets[tablet_id] = reason; @@ -217,6 +216,7 @@ class LoadStreamStub : public std::enable_shared_from_this { Status _encode_and_send(PStreamHeader& header, std::span data = {}); Status _send_with_buffer(butil::IOBuf& buf, bool sync = false); Status _send_with_retry(butil::IOBuf& buf); + void _handle_failure(butil::IOBuf& buf, Status st); Status _check_cancel() { if (!_is_cancelled.load()) { diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp index c1c6e1cfc86e2f..c7a9a5733aad89 100644 --- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp +++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp @@ -541,13 +541,18 @@ Status VTabletWriterV2::close(Status exec_status) { // close DeltaWriters { + std::unordered_map segments_for_tablet; SCOPED_TIMER(_close_writer_timer); // close all delta writers if this is the last user - auto st = _delta_writer_for_tablet->close(_profile); + auto st = _delta_writer_for_tablet->close(segments_for_tablet, _profile); _delta_writer_for_tablet.reset(); if (!st.ok()) { RETURN_IF_ERROR(_cancel(st)); } + // only the last sink closing delta writers will have segment num + if (!segments_for_tablet.empty()) { + _load_stream_map->save_segments_for_tablet(segments_for_tablet); + } } _calc_tablets_to_commit(); @@ -657,7 +662,8 @@ void VTabletWriterV2::_calc_tablets_to_commit() { if (VLOG_DEBUG_IS_ON) { partition_ids.push_back(tablet.partition_id()); } - tablets_to_commit.push_back(tablet); + PTabletID t(tablet); + tablets_to_commit.push_back(t); } } if (VLOG_DEBUG_IS_ON) { diff --git a/be/test/vec/exec/delta_writer_v2_pool_test.cpp b/be/test/vec/exec/delta_writer_v2_pool_test.cpp index a67a701c409e59..dc86ce8c3a28aa 100644 --- a/be/test/vec/exec/delta_writer_v2_pool_test.cpp +++ b/be/test/vec/exec/delta_writer_v2_pool_test.cpp @@ -42,9 +42,10 @@ TEST_F(DeltaWriterV2PoolTest, test_pool) { EXPECT_EQ(2, pool.size()); EXPECT_EQ(map, map3); EXPECT_NE(map, map2); - EXPECT_TRUE(map->close().ok()); - EXPECT_TRUE(map2->close().ok()); - EXPECT_TRUE(map3->close().ok()); + std::unordered_map sft; + EXPECT_TRUE(map->close(sft).ok()); + EXPECT_TRUE(map2->close(sft).ok()); + EXPECT_TRUE(map3->close(sft).ok()); EXPECT_EQ(0, pool.size()); } @@ -72,7 +73,8 @@ TEST_F(DeltaWriterV2PoolTest, test_map) { EXPECT_EQ(2, map->size()); EXPECT_EQ(writer, writer3); EXPECT_NE(writer, writer2); - static_cast(map->close()); + std::unordered_map sft; + static_cast(map->close(sft)); EXPECT_EQ(0, pool.size()); } diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index 0a975b81991224..14a165a3b9df56 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -66,6 +66,7 @@ message PTabletID { optional int64 partition_id = 1; optional int64 index_id = 2; optional int64 tablet_id = 3; + optional int64 num_segments = 4; } message PTabletInfo { diff --git a/regression-test/suites/fault_injection_p0/test_multi_replica_fault_injection.groovy b/regression-test/suites/fault_injection_p0/test_multi_replica_fault_injection.groovy index 37d8b4f26100c4..8080b52ff483a1 100644 --- a/regression-test/suites/fault_injection_p0/test_multi_replica_fault_injection.groovy +++ b/regression-test/suites/fault_injection_p0/test_multi_replica_fault_injection.groovy @@ -91,10 +91,11 @@ suite("test_multi_replica_fault_injection", "nonConcurrent") { // success load_with_injection("StreamSinkFileWriter.appendv.write_segment_failed_one_replica", "sucess") // StreamSinkFileWriter appendv write segment failed two replica - load_with_injection("StreamSinkFileWriter.appendv.write_segment_failed_two_replica", "replica num 1 < load required replica num 2") + load_with_injection("StreamSinkFileWriter.appendv.write_segment_failed_two_replica", "add segment failed") // StreamSinkFileWriter appendv write segment failed all replica load_with_injection("StreamSinkFileWriter.appendv.write_segment_failed_all_replica", "failed to send segment data to any replicas") - + // test segment num check when LoadStreamStub missed tail segments + load_with_injection("LoadStreamStub.only_send_segment_0", "segment num mismatch") sql """ set enable_memtable_on_sink_node=false """ } } From c748b2e3b1b6969c2b1874e32bf70ce542c9471d Mon Sep 17 00:00:00 2001 From: Kaijie Chen Date: Thu, 11 Jul 2024 15:50:31 +0800 Subject: [PATCH 2/2] [test](be-ut) fix load stream test after segment num check --- be/test/runtime/load_stream_test.cpp | 152 +++++++++++++++++++-------- 1 file changed, 111 insertions(+), 41 deletions(-) diff --git a/be/test/runtime/load_stream_test.cpp b/be/test/runtime/load_stream_test.cpp index 2ee3f86c1a4632..f0cc3354d8e112 100644 --- a/be/test/runtime/load_stream_test.cpp +++ b/be/test/runtime/load_stream_test.cpp @@ -494,19 +494,17 @@ class LoadStreamMgrTest : public testing::Test { : _heavy_work_pool(4, 32, "load_stream_test_heavy"), _light_work_pool(4, 32, "load_stream_test_light") {} - void close_load(MockSinkClient& client, uint32_t sender_id = NORMAL_SENDER_ID) { + void close_load(MockSinkClient& client, const std::vector& tablets_to_commit = {}, + uint32_t sender_id = NORMAL_SENDER_ID) { butil::IOBuf append_buf; PStreamHeader header; header.mutable_load_id()->set_hi(1); header.mutable_load_id()->set_lo(1); header.set_opcode(PStreamHeader::CLOSE_LOAD); header.set_src_id(sender_id); - /* TODO: fix test with tablets_to_commit - PTabletID* tablets_to_commit = header.add_tablets(); - tablets_to_commit->set_partition_id(NORMAL_PARTITION_ID); - tablets_to_commit->set_index_id(NORMAL_INDEX_ID); - tablets_to_commit->set_tablet_id(NORMAL_TABLET_ID); - */ + for (const auto& tablet : tablets_to_commit) { + *header.add_tablets() = tablet; + } size_t hdr_len = header.ByteSizeLong(); append_buf.append((char*)&hdr_len, sizeof(size_t)); append_buf.append(header.SerializeAsString()); @@ -677,14 +675,19 @@ TEST_F(LoadStreamMgrTest, one_client_normal) { write_normal(client); reset_response_stat(); - close_load(client, ABNORMAL_SENDER_ID); + PTabletID tablet; + tablet.set_partition_id(NORMAL_PARTITION_ID); + tablet.set_index_id(NORMAL_INDEX_ID); + tablet.set_tablet_id(NORMAL_TABLET_ID); + tablet.set_num_segments(1); + close_load(client, {tablet}, ABNORMAL_SENDER_ID); wait_for_ack(1); EXPECT_EQ(g_response_stat.num, 1); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0); EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 1); - close_load(client); + close_load(client, {tablet}); wait_for_ack(2); EXPECT_EQ(g_response_stat.num, 2); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 1); @@ -735,14 +738,19 @@ TEST_F(LoadStreamMgrTest, one_client_abnormal_index) { EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1); EXPECT_EQ(g_response_stat.failed_tablet_ids[0], NORMAL_TABLET_ID); - close_load(client, 1); + PTabletID tablet; + tablet.set_partition_id(NORMAL_PARTITION_ID); + tablet.set_index_id(ABNORMAL_INDEX_ID); + tablet.set_tablet_id(NORMAL_TABLET_ID); + tablet.set_num_segments(1); + close_load(client, {tablet}, 1); wait_for_ack(2); EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 1); EXPECT_EQ(g_response_stat.num, 2); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1); - close_load(client, 0); + close_load(client, {tablet}, 0); wait_for_ack(3); EXPECT_EQ(g_response_stat.num, 3); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); @@ -766,17 +774,23 @@ TEST_F(LoadStreamMgrTest, one_client_abnormal_sender) { EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1); EXPECT_EQ(g_response_stat.failed_tablet_ids[0], NORMAL_TABLET_ID); - close_load(client, 1); + PTabletID tablet; + tablet.set_partition_id(NORMAL_PARTITION_ID); + tablet.set_index_id(NORMAL_INDEX_ID); + tablet.set_tablet_id(NORMAL_TABLET_ID); + tablet.set_num_segments(1); + close_load(client, {tablet}, 1); wait_for_ack(2); EXPECT_EQ(g_response_stat.num, 2); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1); - close_load(client, 0); + // on the final close_load, segment num check will fail + close_load(client, {tablet}, 0); wait_for_ack(3); EXPECT_EQ(g_response_stat.num, 3); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); - EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1); + EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 2); // server will close stream on CLOSE_LOAD wait_for_close(); @@ -796,13 +810,18 @@ TEST_F(LoadStreamMgrTest, one_client_abnormal_tablet) { EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1); EXPECT_EQ(g_response_stat.failed_tablet_ids[0], ABNORMAL_TABLET_ID); - close_load(client, 1); + PTabletID tablet; + tablet.set_partition_id(NORMAL_PARTITION_ID); + tablet.set_index_id(NORMAL_INDEX_ID); + tablet.set_tablet_id(ABNORMAL_TABLET_ID); + tablet.set_num_segments(1); + close_load(client, {tablet}, 1); wait_for_ack(2); EXPECT_EQ(g_response_stat.num, 2); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1); - close_load(client, 0); + close_load(client, {tablet}, 0); wait_for_ack(3); EXPECT_EQ(g_response_stat.num, 3); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); @@ -829,21 +848,26 @@ TEST_F(LoadStreamMgrTest, one_client_one_index_one_tablet_single_segment0_zero_b 0, data, true); EXPECT_EQ(g_response_stat.num, 0); + PTabletID tablet; + tablet.set_partition_id(NORMAL_PARTITION_ID); + tablet.set_index_id(NORMAL_INDEX_ID); + tablet.set_tablet_id(NORMAL_TABLET_ID); + tablet.set_num_segments(1); // CLOSE_LOAD - close_load(client, 1); + close_load(client, {tablet}, 1); wait_for_ack(1); EXPECT_EQ(g_response_stat.num, 1); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0); // duplicated close - close_load(client, 1); + close_load(client, {tablet}, 1); wait_for_ack(2); EXPECT_EQ(g_response_stat.num, 2); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0); - close_load(client, 0); + close_load(client, {tablet}, 0); wait_for_ack(3); EXPECT_EQ(g_response_stat.num, 3); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); @@ -870,8 +894,13 @@ TEST_F(LoadStreamMgrTest, close_load_before_recv_eos) { data.length(), data, false); EXPECT_EQ(g_response_stat.num, 0); + PTabletID tablet; + tablet.set_partition_id(NORMAL_PARTITION_ID); + tablet.set_index_id(NORMAL_INDEX_ID); + tablet.set_tablet_id(NORMAL_TABLET_ID); + tablet.set_num_segments(1); // CLOSE_LOAD before EOS - close_load(client); + close_load(client, {tablet}); wait_for_ack(1); EXPECT_EQ(g_response_stat.num, 1); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); @@ -885,7 +914,7 @@ TEST_F(LoadStreamMgrTest, close_load_before_recv_eos) { data.length(), data, true); // duplicated close, will not be handled - close_load(client); + close_load(client, {tablet}); wait_for_ack(2); EXPECT_EQ(g_response_stat.num, 1); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); @@ -912,21 +941,26 @@ TEST_F(LoadStreamMgrTest, one_client_one_index_one_tablet_single_segment0) { data.length(), data, true); EXPECT_EQ(g_response_stat.num, 0); + PTabletID tablet; + tablet.set_partition_id(NORMAL_PARTITION_ID); + tablet.set_index_id(NORMAL_INDEX_ID); + tablet.set_tablet_id(NORMAL_TABLET_ID); + tablet.set_num_segments(1); // CLOSE_LOAD - close_load(client, 1); + close_load(client, {tablet}, 1); wait_for_ack(1); EXPECT_EQ(g_response_stat.num, 1); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0); // duplicated close - close_load(client, 1); + close_load(client, {tablet}, 1); wait_for_ack(2); EXPECT_EQ(g_response_stat.num, 2); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0); - close_load(client, 0); + close_load(client, {tablet}, 0); wait_for_ack(3); EXPECT_EQ(g_response_stat.num, 3); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 1); @@ -956,21 +990,26 @@ TEST_F(LoadStreamMgrTest, one_client_one_index_one_tablet_single_segment_without 0, data, false); EXPECT_EQ(g_response_stat.num, 0); + PTabletID tablet; + tablet.set_partition_id(NORMAL_PARTITION_ID); + tablet.set_index_id(NORMAL_INDEX_ID); + tablet.set_tablet_id(NORMAL_TABLET_ID); + tablet.set_num_segments(1); // CLOSE_LOAD - close_load(client, 1); + close_load(client, {tablet}, 1); wait_for_ack(1); EXPECT_EQ(g_response_stat.num, 1); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0); // duplicated close - close_load(client, 1); + close_load(client, {tablet}, 1); wait_for_ack(2); EXPECT_EQ(g_response_stat.num, 2); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0); - close_load(client, 0); + close_load(client, {tablet}, 0); wait_for_ack(3); EXPECT_EQ(g_response_stat.num, 3); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); @@ -999,21 +1038,26 @@ TEST_F(LoadStreamMgrTest, one_client_one_index_one_tablet_single_segment1) { data.length(), data, true); EXPECT_EQ(g_response_stat.num, 0); + PTabletID tablet; + tablet.set_partition_id(NORMAL_PARTITION_ID); + tablet.set_index_id(NORMAL_INDEX_ID); + tablet.set_tablet_id(NORMAL_TABLET_ID); + tablet.set_num_segments(1); // CLOSE_LOAD - close_load(client, 1); + close_load(client, {tablet}, 1); wait_for_ack(1); EXPECT_EQ(g_response_stat.num, 1); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0); // duplicated close - close_load(client, 1); + close_load(client, {tablet}, 1); wait_for_ack(2); EXPECT_EQ(g_response_stat.num, 2); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0); - close_load(client, 0); + close_load(client, {tablet}, 0); wait_for_ack(3); EXPECT_EQ(g_response_stat.num, 3); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); @@ -1046,21 +1090,26 @@ TEST_F(LoadStreamMgrTest, one_client_one_index_one_tablet_two_segment) { 0, data2, true); EXPECT_EQ(g_response_stat.num, 0); + PTabletID tablet; + tablet.set_partition_id(NORMAL_PARTITION_ID); + tablet.set_index_id(NORMAL_INDEX_ID); + tablet.set_tablet_id(NORMAL_TABLET_ID); + tablet.set_num_segments(2); // CLOSE_LOAD - close_load(client, 1); + close_load(client, {tablet}, 1); wait_for_ack(1); EXPECT_EQ(g_response_stat.num, 1); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0); // duplicated close - close_load(client, 1); + close_load(client, {tablet}, 1); wait_for_ack(2); EXPECT_EQ(g_response_stat.num, 2); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0); - close_load(client, 0); + close_load(client, {tablet}, 0); wait_for_ack(3); EXPECT_EQ(g_response_stat.num, 3); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 1); @@ -1098,21 +1147,32 @@ TEST_F(LoadStreamMgrTest, one_client_one_index_three_tablet) { NORMAL_TABLET_ID + 2, 0, 0, data2, true); EXPECT_EQ(g_response_stat.num, 0); + + PTabletID tablet1; + tablet1.set_partition_id(NORMAL_PARTITION_ID); + tablet1.set_index_id(NORMAL_INDEX_ID); + tablet1.set_tablet_id(NORMAL_TABLET_ID); + tablet1.set_num_segments(1); + PTabletID tablet2 {tablet1}; + tablet2.set_tablet_id(NORMAL_TABLET_ID + 1); + PTabletID tablet3 {tablet1}; + tablet3.set_tablet_id(NORMAL_TABLET_ID + 2); + // CLOSE_LOAD - close_load(client, 1); + close_load(client, {tablet1, tablet2, tablet3}, 1); wait_for_ack(1); EXPECT_EQ(g_response_stat.num, 1); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0); // duplicated close - close_load(client, 1); + close_load(client, {tablet1, tablet2, tablet3}, 1); wait_for_ack(2); EXPECT_EQ(g_response_stat.num, 2); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0); - close_load(client, 0); + close_load(client, {tablet1, tablet2, tablet3}, 0); wait_for_ack(3); EXPECT_EQ(g_response_stat.num, 3); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 3); @@ -1166,22 +1226,27 @@ TEST_F(LoadStreamMgrTest, two_client_one_index_one_tablet_three_segment) { } EXPECT_EQ(g_response_stat.num, 0); + PTabletID tablet; + tablet.set_partition_id(NORMAL_PARTITION_ID); + tablet.set_index_id(NORMAL_INDEX_ID); + tablet.set_tablet_id(NORMAL_TABLET_ID); + tablet.set_num_segments(3); // CLOSE_LOAD - close_load(clients[1], 1); + close_load(clients[1], {tablet}, 1); wait_for_ack(1); EXPECT_EQ(g_response_stat.num, 1); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0); // duplicated close - close_load(clients[1], 1); + close_load(clients[1], {tablet}, 1); wait_for_ack(2); // stream closed, no response will be sent EXPECT_EQ(g_response_stat.num, 1); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0); - close_load(clients[0], 0); + close_load(clients[0], {tablet}, 0); wait_for_ack(2); EXPECT_EQ(g_response_stat.num, 2); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 1); @@ -1236,8 +1301,13 @@ TEST_F(LoadStreamMgrTest, two_client_one_close_before_the_other_open) { } EXPECT_EQ(g_response_stat.num, 0); + PTabletID tablet; + tablet.set_partition_id(NORMAL_PARTITION_ID); + tablet.set_index_id(NORMAL_INDEX_ID); + tablet.set_tablet_id(NORMAL_TABLET_ID); + tablet.set_num_segments(3); // CLOSE_LOAD - close_load(clients[0], 0); + close_load(clients[0], {tablet}, 0); wait_for_ack(1); EXPECT_EQ(g_response_stat.num, 1); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); @@ -1254,7 +1324,7 @@ TEST_F(LoadStreamMgrTest, two_client_one_close_before_the_other_open) { NORMAL_TABLET_ID, segid, 0, segment_data[i * 3 + segid], true); } - close_load(clients[1], 1); + close_load(clients[1], {tablet}, 1); wait_for_ack(2); EXPECT_EQ(g_response_stat.num, 2); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 1);