Skip to content

Commit

Permalink
[test](move-memtable) add more injection test for load stream (apache…
Browse files Browse the repository at this point in the history
…#39383)

Add more injection test for load stream to improve coverage.
  • Loading branch information
kaijchen authored Aug 26, 2024
1 parent 2a5b30d commit 6b81529
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 6 deletions.
31 changes: 27 additions & 4 deletions be/src/runtime/load_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,11 +145,13 @@ Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data
uint32_t new_segid = mapping->at(segid);
DCHECK(new_segid != std::numeric_limits<uint32_t>::max());
butil::IOBuf buf = data->movable();
auto flush_func = [this, new_segid, eos, buf, header, file_type]() {
auto flush_func = [this, new_segid, eos, buf, header, file_type]() mutable {
signal::set_signal_task_id(_load_id);
g_load_stream_flush_running_threads << -1;
auto st = _load_stream_writer->append_data(new_segid, header.offset(), buf, file_type);
if (eos && st.ok()) {
DBUG_EXECUTE_IF("TabletStream.append_data.unknown_file_type",
{ file_type = static_cast<FileType>(-1); });
if (file_type == FileType::SEGMENT_FILE || file_type == FileType::INVERTED_INDEX_FILE) {
st = _load_stream_writer->close_writer(new_segid, file_type);
} else {
Expand All @@ -159,6 +161,8 @@ Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data
file_type, new_segid);
}
}
DBUG_EXECUTE_IF("TabletStream.append_data.append_failed",
{ st = Status::InternalError("fault injection"); });
if (!st.ok() && _status.ok()) {
_status = st;
LOG(WARNING) << "write data failed " << st << ", " << *this;
Expand Down Expand Up @@ -188,7 +192,12 @@ Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data
int64_t time_ms = timer.elapsed_time() / 1000 / 1000;
g_load_stream_flush_wait_ms << time_ms;
g_load_stream_flush_running_threads << 1;
auto st = flush_token->submit_func(flush_func);
Status st = Status::OK();
DBUG_EXECUTE_IF("TabletStream.append_data.submit_func_failed",
{ st = Status::InternalError("fault injection"); });
if (st.ok()) {
st = flush_token->submit_func(flush_func);
}
if (!st.ok()) {
_status = st;
}
Expand Down Expand Up @@ -222,6 +231,8 @@ Status TabletStream::add_segment(const PStreamHeader& header, butil::IOBuf* data
src_id, segid);
return _status;
}
DBUG_EXECUTE_IF("TabletStream.add_segment.segid_never_written",
{ segid = _segids_mapping[src_id]->size(); });
if (segid >= _segids_mapping[src_id]->size()) {
_status = Status::InternalError(
"add segment failed, segment is never written, src_id={}, segment_id={}",
Expand All @@ -235,13 +246,20 @@ Status TabletStream::add_segment(const PStreamHeader& header, butil::IOBuf* data
auto add_segment_func = [this, new_segid, stat, flush_schema]() {
signal::set_signal_task_id(_load_id);
auto st = _load_stream_writer->add_segment(new_segid, stat, flush_schema);
DBUG_EXECUTE_IF("TabletStream.add_segment.add_segment_failed",
{ st = Status::InternalError("fault injection"); });
if (!st.ok() && _status.ok()) {
_status = st;
LOG(INFO) << "add segment failed " << *this;
}
};
auto& flush_token = _flush_tokens[new_segid % _flush_tokens.size()];
auto st = flush_token->submit_func(add_segment_func);
Status st = Status::OK();
DBUG_EXECUTE_IF("TabletStream.add_segment.submit_func_failed",
{ st = Status::InternalError("fault injection"); });
if (st.ok()) {
st = flush_token->submit_func(add_segment_func);
}
if (!st.ok()) {
_status = st;
}
Expand Down Expand Up @@ -274,6 +292,7 @@ Status TabletStream::close() {
return _status;
}

DBUG_EXECUTE_IF("TabletStream.close.segment_num_mismatch", { _num_segments++; });
if (_next_segid.load() != _num_segments) {
_status = Status::Corruption(
"segment num mismatch in tablet {}, expected: {}, actual: {}, load_id: {}", _id,
Expand Down Expand Up @@ -519,7 +538,11 @@ void LoadStream::_report_schema(StreamId stream, const PStreamHeader& hdr) {

Status LoadStream::_write_stream(StreamId stream, butil::IOBuf& buf) {
for (;;) {
int ret = brpc::StreamWrite(stream, buf);
int ret = 0;
DBUG_EXECUTE_IF("LoadStream._write_stream.EAGAIN", { ret = EAGAIN; });
if (ret == 0) {
ret = brpc::StreamWrite(stream, buf);
}
switch (ret) {
case 0:
return Status::OK();
Expand Down
37 changes: 36 additions & 1 deletion be/src/runtime/load_stream_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ LoadStreamWriter::~LoadStreamWriter() {
}

Status LoadStreamWriter::init() {
DBUG_EXECUTE_IF("LoadStreamWriter.init.failure",
{ return Status::InternalError("fault injection"); });
RETURN_IF_ERROR(_rowset_builder->init());
_rowset_writer = _rowset_builder->rowset_writer();
_is_init = true;
Expand All @@ -110,6 +112,8 @@ Status LoadStreamWriter::append_data(uint32_t segid, uint64_t offset, butil::IOB
Status st;
io::FileWriterPtr file_writer;
st = _rowset_writer->create_file_writer(i, file_writer, file_type);
DBUG_EXECUTE_IF("LoadStreamWriter.append_data.create_file_writer_failed",
{ st = Status::InternalError("fault injection"); });
if (!st.ok()) {
_is_canceled = true;
return st;
Expand All @@ -127,6 +131,7 @@ Status LoadStreamWriter::append_data(uint32_t segid, uint64_t offset, butil::IOB
if (file_writer == nullptr) {
return Status::Corruption("append_data failed, file writer {} is destoryed", segid);
}
DBUG_EXECUTE_IF("LoadStreamWriter.append_data.wrong_offset", { offset++; });
if (file_writer->bytes_appended() != offset) {
return Status::Corruption(
"append_data out-of-order in segment={}, expected offset={}, actual={}",
Expand Down Expand Up @@ -195,6 +200,7 @@ Status LoadStreamWriter::add_segment(uint32_t segid, const SegmentStatistics& st
}
}

DBUG_EXECUTE_IF("LoadStreamWriter.add_segment.size_not_match", { segment_file_size++; });
if (segment_file_size + inverted_file_size != stat.data_size) {
return Status::Corruption(
"add_segment failed, segment stat {} does not match, file size={}, inverted file "
Expand All @@ -210,17 +216,27 @@ Status LoadStreamWriter::_calc_file_size(uint32_t segid, FileType file_type, siz
auto& file_writers =
(file_type == FileType::SEGMENT_FILE) ? _segment_file_writers : _inverted_file_writers;

DBUG_EXECUTE_IF("LoadStreamWriter._calc_file_size.unknown_segment",
{ segid = file_writers.size(); });
if (segid >= file_writers.size()) {
return Status::Corruption("calc file size failed, file {} is never opened, file type is {}",
segid, file_type);
}
file_writer = file_writers[segid].get();
DBUG_EXECUTE_IF("LoadStreamWriter.calc_file_size.null_file_writer", { file_writer = nullptr; });
DBUG_EXECUTE_IF("LoadStreamWriter._calc_file_size.null_file_writer",
{ file_writer = nullptr; });
if (file_writer == nullptr) {
return Status::Corruption(
"calc file size failed, file writer {} is destoryed, file type is {}", segid,
file_type);
}
DBUG_EXECUTE_IF("LoadStreamWriter._calc_file_size.file_not_closed", {
io::FileWriterPtr fwriter;
static_cast<void>(_rowset_writer->create_file_writer(file_writers.size(), fwriter,
FileType::SEGMENT_FILE));
file_writers.push_back(std::move(fwriter));
file_writer = file_writers.back().get();
});
if (file_writer->state() != io::FileWriter::State::CLOSED) {
return Status::Corruption("calc file size failed, file {} is not closed",
file_writer->path().native());
Expand All @@ -244,23 +260,42 @@ Status LoadStreamWriter::close() {
DCHECK(_is_init)
<< "rowset builder is supposed be to initialized before close_wait() being called";

DBUG_EXECUTE_IF("LoadStreamWriter.close.cancelled", { _is_canceled = true; });
if (_is_canceled) {
return Status::InternalError("flush segment failed");
}
DBUG_EXECUTE_IF("LoadStreamWriter.close.inverted_writers_size_not_match", {
io::FileWriterPtr file_writer;
static_cast<void>(_rowset_writer->create_file_writer(
_inverted_file_writers.size(), file_writer, FileType::INVERTED_INDEX_FILE));
_inverted_file_writers.push_back(std::move(file_writer));
});
if (_inverted_file_writers.size() > 0 &&
_inverted_file_writers.size() != _segment_file_writers.size()) {
return Status::Corruption(
"LoadStreamWriter close failed, inverted file writer size is {},"
"segment file writer size is {}",
_inverted_file_writers.size(), _segment_file_writers.size());
}
DBUG_EXECUTE_IF("LoadStreamWriter.close.file_not_closed", {
io::FileWriterPtr file_writer;
static_cast<void>(_rowset_writer->create_file_writer(_segment_file_writers.size(),
file_writer, FileType::SEGMENT_FILE));
_segment_file_writers.push_back(std::move(file_writer));
});
for (const auto& writer : _segment_file_writers) {
if (writer->state() != io::FileWriter::State::CLOSED) {
return Status::Corruption("LoadStreamWriter close failed, segment {} is not closed",
writer->path().native());
}
}

DBUG_EXECUTE_IF("LoadStreamWriter.close.inverted_file_not_closed", {
io::FileWriterPtr file_writer;
static_cast<void>(_rowset_writer->create_file_writer(
_inverted_file_writers.size(), file_writer, FileType::INVERTED_INDEX_FILE));
_inverted_file_writers.push_back(std::move(file_writer));
});
for (const auto& writer : _inverted_file_writers) {
if (writer->state() != io::FileWriter::State::CLOSED) {
return Status::Corruption(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,28 +143,53 @@ suite("load_stream_fault_injection", "nonConcurrent") {
load_with_injection("LocalFileSystem.create_file_impl.open_file_failed", "")
// LoadStreamWriter append_data meet null file writer error
load_with_injection("LoadStreamWriter.append_data.null_file_writer", "")
load_with_injection("LoadStreamWriter.append_data.create_file_writer_failed", "")
load_with_injection("LoadStreamWriter.append_data.wrong_offset", "")
// LoadStreamWriter close_writer meet not bad segid error
load_with_injection("LoadStreamWriter.close_writer.bad_segid", "")
// LoadStreamWriter close_writer meet null file writer error
load_with_injection("LoadStreamWriter.close_writer.null_file_writer", "")
// LoadStreamWriter close_writer meet file writer failed to close error
load_with_injection("LocalFileWriter.close.failed", "")
load_with_injection("LoadStreamWriter.close.cancelled", "")
load_with_injection("LoadStreamWriter.close.inverted_writers_size_not_match", "")
load_with_injection("LoadStreamWriter.close.file_not_closed", "")
load_with_injection("LoadStreamWriter.close.inverted_file_not_closed", "")
// LoadStreamWriter close_writer meet bytes_appended and real file size not match error
load_with_injection("FileWriter.close_writer.zero_bytes_appended", "")
// LoadStreamWriter close_writer/add_segment meet not inited error
load_with_injection("TabletStream.init.uninited_writer", "")
// LoadStreamWriter init failure
load_with_injection("LoadStreamWriter.init.failure", "")
// LoadStreamWriter add_segment meet not bad segid error
load_with_injection("LoadStreamWriter.add_segment.bad_segid", "")
load_with_injection("LoadStreamWriter.add_segment.size_not_match", "")
load_with_injection("LoadStreamWriter._calc_file_size.unknown_segment", "")
// LoadStreamWriter add_segment meet null file writer error
load_with_injection("LoadStreamWriter.calc_file_size.null_file_writer", "")
load_with_injection("LoadStreamWriter._calc_file_size.null_file_writer", "")
load_with_injection("LoadStreamWriter._calc_file_size.file_not_closed", "")
// LoadStreamWriter add_segment meet bytes_appended and real file size not match error
load_with_injection("FileWriter.add_segment.zero_bytes_appended", "")
// LoadStream init failed coz LoadStreamWriter init failed
load_with_injection("RowsetBuilder.check_tablet_version_count.too_many_version", "")
// LoadStream add_segment meet unknown segid in request header
load_with_injection("TabletStream.add_segment.unknown_segid", "")
// LoadStream add_segment meet unknown segid in request header
load_with_injection("TabletStream.add_segment.segid_never_written", "")
// LoadStream add_segment meet LoadStreamWriter add segment failure
load_with_injection("TabletStream.add_segment.add_segment_failed", "")
// LoadStream add_segment meet submit function failure
load_with_injection("TabletStream.add_segment.submit_func_failed", "")
// LoadStream append_data meet unknown file type
load_with_injection("TabletStream.append_data.unknown_file_type", "")
// LoadStream append_data meet LoadStremWriter append or close failed
load_with_injection("TabletStream.append_data.append_failed", "")
// LoadStream append_data meet submit function failure
load_with_injection("TabletStream.append_data.submit_func_failed", "")
// LoadStream append_data meet unknown index id in request header
load_with_injection("TabletStream._append_data.unknown_indexid", "")
// LoadStream close meet segment num mismatch
load_with_injection("TabletStream.close.segment_num_mismatch", "")
// LoadStream dispatch meet unknown load id
load_with_injection("LoadStream._dispatch.unknown_loadid", "")
// LoadStream dispatch meet unknown src id
Expand Down

0 comments on commit 6b81529

Please sign in to comment.