diff --git a/be/src/io/fs/stream_sink_file_writer.cpp b/be/src/io/fs/stream_sink_file_writer.cpp index cfc924fad0aa1b..e6007550396912 100644 --- a/be/src/io/fs/stream_sink_file_writer.cpp +++ b/be/src/io/fs/stream_sink_file_writer.cpp @@ -51,42 +51,26 @@ Status StreamSinkFileWriter::appendv(const Slice* data, size_t data_cnt) { << ", data_length: " << bytes_req; std::span slices {data, data_cnt}; - size_t stream_index = 0; + size_t fault_injection_skipped_streams = 0; bool ok = false; - bool skip_stream = false; Status st; for (auto& stream : _streams) { DBUG_EXECUTE_IF("StreamSinkFileWriter.appendv.write_segment_failed_one_replica", { - if (stream_index >= 2) { - skip_stream = true; + if (fault_injection_skipped_streams < 1) { + fault_injection_skipped_streams++; + continue; } }); DBUG_EXECUTE_IF("StreamSinkFileWriter.appendv.write_segment_failed_two_replica", { - if (stream_index >= 1) { - skip_stream = true; + if (fault_injection_skipped_streams < 2) { + fault_injection_skipped_streams++; + continue; } }); - if (!skip_stream) { - st = stream->append_data(_partition_id, _index_id, _tablet_id, _segment_id, - _bytes_appended, slices); - } - DBUG_EXECUTE_IF("StreamSinkFileWriter.appendv.write_segment_failed_one_replica", { - if (stream_index >= 2) { - st = Status::InternalError("stream sink file writer append data failed"); - } - stream_index++; - skip_stream = false; - }); - DBUG_EXECUTE_IF("StreamSinkFileWriter.appendv.write_segment_failed_two_replica", { - if (stream_index >= 1) { - st = Status::InternalError("stream sink file writer append data failed"); - } - stream_index++; - skip_stream = false; - }); - DBUG_EXECUTE_IF("StreamSinkFileWriter.appendv.write_segment_failed_all_replica", { - st = Status::InternalError("stream sink file writer append data failed"); - }); + DBUG_EXECUTE_IF("StreamSinkFileWriter.appendv.write_segment_failed_all_replica", + { continue; }); + st = stream->append_data(_partition_id, _index_id, _tablet_id, _segment_id, _bytes_appended, + slices); ok = ok || st.ok(); if (!st.ok()) { LOG(WARNING) << "failed to send segment data to backend " << stream->dst_id() @@ -116,8 +100,23 @@ Status StreamSinkFileWriter::finalize() { VLOG_DEBUG << "writer finalize, load_id: " << print_id(_load_id) << ", index_id: " << _index_id << ", tablet_id: " << _tablet_id << ", segment_id: " << _segment_id; // TODO(zhengyu): update get_inverted_index_file_size into stat + size_t fault_injection_skipped_streams = 0; bool ok = false; for (auto& stream : _streams) { + DBUG_EXECUTE_IF("StreamSinkFileWriter.appendv.write_segment_failed_one_replica", { + if (fault_injection_skipped_streams < 1) { + fault_injection_skipped_streams++; + continue; + } + }); + DBUG_EXECUTE_IF("StreamSinkFileWriter.appendv.write_segment_failed_two_replica", { + if (fault_injection_skipped_streams < 2) { + fault_injection_skipped_streams++; + continue; + } + }); + DBUG_EXECUTE_IF("StreamSinkFileWriter.appendv.write_segment_failed_all_replica", + { continue; }); auto st = stream->append_data(_partition_id, _index_id, _tablet_id, _segment_id, _bytes_appended, {}, true); ok = ok || st.ok(); diff --git a/be/src/olap/delta_writer_v2.cpp b/be/src/olap/delta_writer_v2.cpp index 378728f025cdb4..e02c8eea70cae2 100644 --- a/be/src/olap/delta_writer_v2.cpp +++ b/be/src/olap/delta_writer_v2.cpp @@ -180,7 +180,7 @@ Status DeltaWriterV2::close() { return _memtable_writer->close(); } -Status DeltaWriterV2::close_wait(RuntimeProfile* profile) { +Status DeltaWriterV2::close_wait(int32_t& num_segments, RuntimeProfile* profile) { SCOPED_RAW_TIMER(&_close_wait_time); std::lock_guard l(_lock); DCHECK(_is_init) @@ -190,6 +190,7 @@ Status DeltaWriterV2::close_wait(RuntimeProfile* profile) { _update_profile(profile); } RETURN_IF_ERROR(_memtable_writer->close_wait(profile)); + num_segments = _rowset_writer->next_segment_id(); _delta_written_success = true; return Status::OK(); diff --git a/be/src/olap/delta_writer_v2.h b/be/src/olap/delta_writer_v2.h index 31b364e103880a..0ef564be393762 100644 --- a/be/src/olap/delta_writer_v2.h +++ b/be/src/olap/delta_writer_v2.h @@ -77,7 +77,7 @@ class DeltaWriterV2 { Status close(); // wait for all memtables to be flushed. // mem_consumption() should be 0 after this function returns. - Status close_wait(RuntimeProfile* profile = nullptr); + Status close_wait(int32_t& num_segments, RuntimeProfile* profile = nullptr); // abandon current memtable and wait for all pending-flushing memtables to be destructed. // mem_consumption() should be 0 after this function returns. diff --git a/be/src/olap/rowset/beta_rowset_writer_v2.h b/be/src/olap/rowset/beta_rowset_writer_v2.h index 4b0ab950de483f..6d1321bd144aca 100644 --- a/be/src/olap/rowset/beta_rowset_writer_v2.h +++ b/be/src/olap/rowset/beta_rowset_writer_v2.h @@ -131,6 +131,8 @@ class BetaRowsetWriterV2 : public RowsetWriter { int32_t allocate_segment_id() override { return _segment_creator.allocate_segment_id(); }; + int32_t next_segment_id() { return _segment_creator.next_segment_id(); }; + int64_t delete_bitmap_ns() override { return _delete_bitmap_ns; } int64_t segment_writer_ns() override { return _segment_writer_ns; } diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp index 8de15091ec5030..1f8c33995b3eaa 100644 --- a/be/src/runtime/load_stream.cpp +++ b/be/src/runtime/load_stream.cpp @@ -244,6 +244,11 @@ Status TabletStream::close() { if (!_failed_st->ok()) { return *_failed_st; } + if (_next_segid.load() != _num_segments) { + return Status::Corruption( + "segment num mismatch in tablet {}, expected: {}, actual: {}, load_id: {}", _id, + _num_segments, _next_segid.load(), print_id(_load_id)); + } Status st = Status::OK(); auto close_func = [this, &mu, &cv, &st]() { @@ -307,11 +312,17 @@ Status IndexStream::close(const std::vector& tablets_to_commit, SCOPED_TIMER(_close_wait_timer); // open all need commit tablets for (const auto& tablet : tablets_to_commit) { + if (_id != tablet.index_id()) { + continue; + } TabletStreamSharedPtr tablet_stream; auto it = _tablet_streams_map.find(tablet.tablet_id()); - if (it == _tablet_streams_map.end() && _id == tablet.index_id()) { + if (it == _tablet_streams_map.end()) { RETURN_IF_ERROR( _init_tablet_stream(tablet_stream, tablet.tablet_id(), tablet.partition_id())); + tablet_stream->add_num_segments(tablet.num_segments()); + } else { + it->second->add_num_segments(tablet.num_segments()); } } diff --git a/be/src/runtime/load_stream.h b/be/src/runtime/load_stream.h index b2635698379f6d..f690882a878285 100644 --- a/be/src/runtime/load_stream.h +++ b/be/src/runtime/load_stream.h @@ -52,6 +52,7 @@ class TabletStream { Status append_data(const PStreamHeader& header, butil::IOBuf* data); Status add_segment(const PStreamHeader& header, butil::IOBuf* data); + void add_num_segments(int64_t num_segments) { _num_segments += num_segments; } Status close(); int64_t id() const { return _id; } @@ -63,6 +64,7 @@ class TabletStream { std::vector> _flush_tokens; std::unordered_map> _segids_mapping; std::atomic _next_segid; + int64_t _num_segments = 0; bthread::Mutex _lock; std::shared_ptr _failed_st; PUniqueId _load_id; diff --git a/be/src/vec/sink/delta_writer_v2_pool.cpp b/be/src/vec/sink/delta_writer_v2_pool.cpp index cfb2b5294c7c55..b61d29819d4919 100644 --- a/be/src/vec/sink/delta_writer_v2_pool.cpp +++ b/be/src/vec/sink/delta_writer_v2_pool.cpp @@ -41,7 +41,8 @@ std::shared_ptr DeltaWriterV2Map::get_or_create( return writer; } -Status DeltaWriterV2Map::close(RuntimeProfile* profile) { +Status DeltaWriterV2Map::close(std::unordered_map& segments_for_tablet, + RuntimeProfile* profile) { int num_use = --_use_cnt; if (num_use > 0) { LOG(INFO) << "keeping DeltaWriterV2Map, load_id=" << _load_id << " , use_cnt=" << num_use; @@ -56,8 +57,10 @@ Status DeltaWriterV2Map::close(RuntimeProfile* profile) { RETURN_IF_ERROR(writer->close()); } LOG(INFO) << "close-waiting DeltaWriterV2Map, load_id=" << _load_id; - for (auto& [_, writer] : _map) { - RETURN_IF_ERROR(writer->close_wait(profile)); + for (auto& [tablet_id, writer] : _map) { + int32_t num_segments; + RETURN_IF_ERROR(writer->close_wait(num_segments, profile)); + segments_for_tablet[tablet_id] = num_segments; } return Status::OK(); } diff --git a/be/src/vec/sink/delta_writer_v2_pool.h b/be/src/vec/sink/delta_writer_v2_pool.h index 912b9216e9f58e..7e58eea31498f6 100644 --- a/be/src/vec/sink/delta_writer_v2_pool.h +++ b/be/src/vec/sink/delta_writer_v2_pool.h @@ -70,7 +70,8 @@ class DeltaWriterV2Map { int64_t tablet_id, std::function()> creator); // close all delta writers in this DeltaWriterV2Map if there is no other users - Status close(RuntimeProfile* profile = nullptr); + Status close(std::unordered_map& segments_for_tablet, + RuntimeProfile* profile = nullptr); // cancel all delta writers in this DeltaWriterV2Map void cancel(Status status); diff --git a/be/src/vec/sink/load_stream_map_pool.cpp b/be/src/vec/sink/load_stream_map_pool.cpp index 7a3072ade6e70b..2fcb8deaeb2c85 100644 --- a/be/src/vec/sink/load_stream_map_pool.cpp +++ b/be/src/vec/sink/load_stream_map_pool.cpp @@ -87,7 +87,9 @@ void LoadStreamMap::save_tablets_to_commit(int64_t dst_id, const std::vector& tablets_to_commit) { std::lock_guard lock(_tablets_to_commit_mutex); auto& tablets = _tablets_to_commit[dst_id]; - tablets.insert(tablets.end(), tablets_to_commit.begin(), tablets_to_commit.end()); + for (const auto& tablet : tablets_to_commit) { + tablets.emplace(tablet.tablet_id(), tablet); + } } bool LoadStreamMap::release() { @@ -103,12 +105,24 @@ bool LoadStreamMap::release() { Status LoadStreamMap::close_load(bool incremental) { return for_each_st([this, incremental](int64_t dst_id, const Streams& streams) -> Status { + std::vector tablets_to_commit; const auto& tablets = _tablets_to_commit[dst_id]; + tablets_to_commit.reserve(tablets.size()); + for (const auto& [tablet_id, tablet] : tablets) { + tablets_to_commit.push_back(tablet); + tablets_to_commit.back().set_num_segments(_segments_for_tablet[tablet_id]); + } + bool first = true; for (auto& stream : streams) { if (stream->is_incremental() != incremental) { continue; } - RETURN_IF_ERROR(stream->close_load(tablets)); + if (first) { + RETURN_IF_ERROR(stream->close_load(tablets_to_commit)); + first = false; + } else { + RETURN_IF_ERROR(stream->close_load({})); + } } return Status::OK(); }); diff --git a/be/src/vec/sink/load_stream_map_pool.h b/be/src/vec/sink/load_stream_map_pool.h index d0f72ab7e004e0..dcddcdaf8d8ac4 100644 --- a/be/src/vec/sink/load_stream_map_pool.h +++ b/be/src/vec/sink/load_stream_map_pool.h @@ -90,6 +90,10 @@ class LoadStreamMap { void save_tablets_to_commit(int64_t dst_id, const std::vector& tablets_to_commit); + void save_segments_for_tablet(const std::unordered_map& segments_for_tablet) { + _segments_for_tablet.insert(segments_for_tablet.cbegin(), segments_for_tablet.cend()); + } + // Return true if the last instance is just released. bool release(); @@ -109,7 +113,8 @@ class LoadStreamMap { std::shared_ptr _enable_unique_mow_for_index; std::mutex _tablets_to_commit_mutex; - std::unordered_map> _tablets_to_commit; + std::unordered_map> _tablets_to_commit; + std::unordered_map _segments_for_tablet; }; class LoadStreamMapPool { diff --git a/be/src/vec/sink/load_stream_stub.cpp b/be/src/vec/sink/load_stream_stub.cpp index caebb381db6048..93f3fd87a8571d 100644 --- a/be/src/vec/sink/load_stream_stub.cpp +++ b/be/src/vec/sink/load_stream_stub.cpp @@ -207,6 +207,11 @@ Status LoadStreamStub::open(BrpcClientCache* client_cache, Status LoadStreamStub::append_data(int64_t partition_id, int64_t index_id, int64_t tablet_id, int64_t segment_id, uint64_t offset, std::span data, bool segment_eos) { + DBUG_EXECUTE_IF("LoadStreamStub.only_send_segment_0", { + if (segment_id != 0) { + return Status::OK(); + } + }); PStreamHeader header; header.set_src_id(_src_id); *header.mutable_load_id() = _load_id; @@ -224,6 +229,11 @@ Status LoadStreamStub::append_data(int64_t partition_id, int64_t index_id, int64 Status LoadStreamStub::add_segment(int64_t partition_id, int64_t index_id, int64_t tablet_id, int64_t segment_id, const SegmentStatistics& segment_stat, TabletSchemaSPtr flush_schema) { + DBUG_EXECUTE_IF("LoadStreamStub.only_send_segment_0", { + if (segment_id != 0) { + return Status::OK(); + } + }); PStreamHeader header; header.set_src_id(_src_id); *header.mutable_load_id() = _load_id; @@ -368,7 +378,53 @@ Status LoadStreamStub::_send_with_buffer(butil::IOBuf& buf, bool sync) { std::lock_guard send_lock(_send_mutex); buffer_lock.unlock(); VLOG_DEBUG << "send buf size : " << output.size() << ", sync: " << sync; - return _send_with_retry(output); + auto st = _send_with_retry(output); + if (!st.ok()) { + _handle_failure(output, st); + } + return st; +} + +void LoadStreamStub::_handle_failure(butil::IOBuf& buf, Status st) { + while (buf.size() > 0) { + // step 1: parse header + size_t hdr_len = 0; + buf.cutn((void*)&hdr_len, sizeof(size_t)); + butil::IOBuf hdr_buf; + PStreamHeader hdr; + buf.cutn(&hdr_buf, hdr_len); + butil::IOBufAsZeroCopyInputStream wrapper(hdr_buf); + hdr.ParseFromZeroCopyStream(&wrapper); + + // step 2: cut data + size_t data_len = 0; + buf.cutn((void*)&data_len, sizeof(size_t)); + butil::IOBuf data_buf; + buf.cutn(&data_buf, data_len); + + // step 3: handle failure + switch (hdr.opcode()) { + case PStreamHeader::ADD_SEGMENT: + case PStreamHeader::APPEND_DATA: { + add_failed_tablet(hdr.tablet_id(), st); + } break; + case PStreamHeader::CLOSE_LOAD: { + brpc::StreamClose(_stream_id); + } break; + case PStreamHeader::GET_SCHEMA: { + // Just log and let wait_for_schema timeout + std::ostringstream oss; + for (const auto& tablet : hdr.tablets()) { + oss << " " << tablet.tablet_id(); + } + LOG(WARNING) << "failed to send GET_SCHEMA request, tablet_id:" << oss.str() << ", " + << *this; + } break; + default: + LOG(WARNING) << "unexpected stream message " << hdr.opcode() << ", " << *this; + DCHECK(false); + } + } } Status LoadStreamStub::_send_with_retry(butil::IOBuf& buf) { diff --git a/be/src/vec/sink/load_stream_stub.h b/be/src/vec/sink/load_stream_stub.h index 1bf0fac4e381b8..4e6aad8d1ae739 100644 --- a/be/src/vec/sink/load_stream_stub.h +++ b/be/src/vec/sink/load_stream_stub.h @@ -207,7 +207,6 @@ class LoadStreamStub : public std::enable_shared_from_this { _success_tablets.push_back(tablet_id); } - // for tests only void add_failed_tablet(int64_t tablet_id, Status reason) { std::lock_guard lock(_failed_tablets_mutex); _failed_tablets[tablet_id] = reason; @@ -217,6 +216,7 @@ class LoadStreamStub : public std::enable_shared_from_this { Status _encode_and_send(PStreamHeader& header, std::span data = {}); Status _send_with_buffer(butil::IOBuf& buf, bool sync = false); Status _send_with_retry(butil::IOBuf& buf); + void _handle_failure(butil::IOBuf& buf, Status st); Status _check_cancel() { if (!_is_cancelled.load()) { diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp index c1c6e1cfc86e2f..c7a9a5733aad89 100644 --- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp +++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp @@ -541,13 +541,18 @@ Status VTabletWriterV2::close(Status exec_status) { // close DeltaWriters { + std::unordered_map segments_for_tablet; SCOPED_TIMER(_close_writer_timer); // close all delta writers if this is the last user - auto st = _delta_writer_for_tablet->close(_profile); + auto st = _delta_writer_for_tablet->close(segments_for_tablet, _profile); _delta_writer_for_tablet.reset(); if (!st.ok()) { RETURN_IF_ERROR(_cancel(st)); } + // only the last sink closing delta writers will have segment num + if (!segments_for_tablet.empty()) { + _load_stream_map->save_segments_for_tablet(segments_for_tablet); + } } _calc_tablets_to_commit(); @@ -657,7 +662,8 @@ void VTabletWriterV2::_calc_tablets_to_commit() { if (VLOG_DEBUG_IS_ON) { partition_ids.push_back(tablet.partition_id()); } - tablets_to_commit.push_back(tablet); + PTabletID t(tablet); + tablets_to_commit.push_back(t); } } if (VLOG_DEBUG_IS_ON) { diff --git a/be/test/vec/exec/delta_writer_v2_pool_test.cpp b/be/test/vec/exec/delta_writer_v2_pool_test.cpp index a67a701c409e59..dc86ce8c3a28aa 100644 --- a/be/test/vec/exec/delta_writer_v2_pool_test.cpp +++ b/be/test/vec/exec/delta_writer_v2_pool_test.cpp @@ -42,9 +42,10 @@ TEST_F(DeltaWriterV2PoolTest, test_pool) { EXPECT_EQ(2, pool.size()); EXPECT_EQ(map, map3); EXPECT_NE(map, map2); - EXPECT_TRUE(map->close().ok()); - EXPECT_TRUE(map2->close().ok()); - EXPECT_TRUE(map3->close().ok()); + std::unordered_map sft; + EXPECT_TRUE(map->close(sft).ok()); + EXPECT_TRUE(map2->close(sft).ok()); + EXPECT_TRUE(map3->close(sft).ok()); EXPECT_EQ(0, pool.size()); } @@ -72,7 +73,8 @@ TEST_F(DeltaWriterV2PoolTest, test_map) { EXPECT_EQ(2, map->size()); EXPECT_EQ(writer, writer3); EXPECT_NE(writer, writer2); - static_cast(map->close()); + std::unordered_map sft; + static_cast(map->close(sft)); EXPECT_EQ(0, pool.size()); } diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index 0a975b81991224..14a165a3b9df56 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -66,6 +66,7 @@ message PTabletID { optional int64 partition_id = 1; optional int64 index_id = 2; optional int64 tablet_id = 3; + optional int64 num_segments = 4; } message PTabletInfo { diff --git a/regression-test/suites/fault_injection_p0/test_multi_replica_fault_injection.groovy b/regression-test/suites/fault_injection_p0/test_multi_replica_fault_injection.groovy index 37d8b4f26100c4..8080b52ff483a1 100644 --- a/regression-test/suites/fault_injection_p0/test_multi_replica_fault_injection.groovy +++ b/regression-test/suites/fault_injection_p0/test_multi_replica_fault_injection.groovy @@ -91,10 +91,11 @@ suite("test_multi_replica_fault_injection", "nonConcurrent") { // success load_with_injection("StreamSinkFileWriter.appendv.write_segment_failed_one_replica", "sucess") // StreamSinkFileWriter appendv write segment failed two replica - load_with_injection("StreamSinkFileWriter.appendv.write_segment_failed_two_replica", "replica num 1 < load required replica num 2") + load_with_injection("StreamSinkFileWriter.appendv.write_segment_failed_two_replica", "add segment failed") // StreamSinkFileWriter appendv write segment failed all replica load_with_injection("StreamSinkFileWriter.appendv.write_segment_failed_all_replica", "failed to send segment data to any replicas") - + // test segment num check when LoadStreamStub missed tail segments + load_with_injection("LoadStreamStub.only_send_segment_0", "segment num mismatch") sql """ set enable_memtable_on_sink_node=false """ } }