Skip to content

Commit

Permalink
add failpoint test
Browse files Browse the repository at this point in the history
  • Loading branch information
hehechen committed May 13, 2022
1 parent efc93ad commit f14952e
Show file tree
Hide file tree
Showing 6 changed files with 208 additions and 3 deletions.
3 changes: 2 additions & 1 deletion dbms/src/Common/FailPoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ std::unordered_map<String, std::shared_ptr<FailPointChannel>> FailPointHelper::f
M(exception_when_read_from_log) \
M(exception_mpp_hash_build) \
M(exception_before_drop_segment) \
M(exception_after_drop_segment)
M(exception_after_drop_segment) \
M(exception_after_write_blob)

#define APPLY_FOR_FAILPOINTS(M) \
M(force_set_page_file_write_errno) \
Expand Down
192 changes: 192 additions & 0 deletions dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ extern const char force_triggle_foreground_flush[];
extern const char force_set_segment_ingest_packs_fail[];
extern const char segment_merge_after_ingest_packs[];
extern const char force_set_segment_physical_split[];
extern const char exception_after_write_blob[];
} // namespace FailPoints

namespace DM
Expand Down Expand Up @@ -491,6 +492,197 @@ try
}
CATCH

TEST_P(DeltaMergeStoreRWTest, WriteCrashBeforeWalWithoutCache)
try
{

const ColumnDefine col_str_define(2, "col2", std::make_shared<DataTypeString>());
const ColumnDefine col_i8_define(3, "i8", std::make_shared<DataTypeInt8>());
{
auto table_column_defines = DMTestEnv::getDefaultColumns();
table_column_defines->emplace_back(col_str_define);
table_column_defines->emplace_back(col_i8_define);

store = reload(table_column_defines);
}

{
// check column structure
const auto & cols = store->getTableColumns();
ASSERT_EQ(cols.size(), 5UL);
const auto & str_col = cols[3];
ASSERT_EQ(str_col.name, col_str_define.name);
ASSERT_EQ(str_col.id, col_str_define.id);
ASSERT_TRUE(str_col.type->equals(*col_str_define.type));
const auto & i8_col = cols[4];
ASSERT_EQ(i8_col.name, col_i8_define.name);
ASSERT_EQ(i8_col.id, col_i8_define.id);
ASSERT_TRUE(i8_col.type->equals(*col_i8_define.type));
}

const size_t num_rows_write = 128;
{
// write to store
Block block;
{
block = DMTestEnv::prepareSimpleWriteBlock(0, num_rows_write, false);
// Add a column of col2:String for test
block.insert(DB::tests::createColumn<String>(
createNumberStrings(0, num_rows_write),
col_str_define.name,
col_str_define.id));
// Add a column of i8:Int8 for test
block.insert(DB::tests::createColumn<Int8>(
createSignedNumbers(0, num_rows_write),
col_i8_define.name,
col_i8_define.id));
}
db_context->getSettingsRef().dt_segment_delta_cache_limit_rows = 8;
FailPointHelper::enableFailPoint(FailPoints::exception_after_write_blob);
try
{
store->write(*db_context, db_context->getSettingsRef(), block);
}
catch (DB::Exception & e)
{
if (e.code() != ErrorCodes::FAIL_POINT_ERROR)
throw;
}
}
FailPointHelper::disableFailPoint(FailPoints::exception_after_write_blob);

{
// read all columns from store
const auto & columns = store->getTableColumns();
BlockInputStreamPtr in = store->read(*db_context,
db_context->getSettingsRef(),
columns,
{RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())},
/* num_streams= */ 1,
/* max_version= */ std::numeric_limits<UInt64>::max(),
EMPTY_FILTER,
TRACING_NAME,
/* expected_block_size= */ 1024)[0];

size_t num_rows_read = 0;
in->readPrefix();
while (Block block = in->read())
{
num_rows_read += block.rows();
}
in->readSuffix();
ASSERT_EQ(num_rows_read, 0);
}
}
CATCH

TEST_P(DeltaMergeStoreRWTest, WriteCrashBeforeWalWithCache)
try
{
const ColumnDefine col_str_define(2, "col2", std::make_shared<DataTypeString>());
const ColumnDefine col_i8_define(3, "i8", std::make_shared<DataTypeInt8>());
{
auto table_column_defines = DMTestEnv::getDefaultColumns();
table_column_defines->emplace_back(col_str_define);
table_column_defines->emplace_back(col_i8_define);

store = reload(table_column_defines);
}

{
// check column structure
const auto & cols = store->getTableColumns();
ASSERT_EQ(cols.size(), 5UL);
const auto & str_col = cols[3];
ASSERT_EQ(str_col.name, col_str_define.name);
ASSERT_EQ(str_col.id, col_str_define.id);
ASSERT_TRUE(str_col.type->equals(*col_str_define.type));
const auto & i8_col = cols[4];
ASSERT_EQ(i8_col.name, col_i8_define.name);
ASSERT_EQ(i8_col.id, col_i8_define.id);
ASSERT_TRUE(i8_col.type->equals(*col_i8_define.type));
}

const size_t num_rows_write = 128;
{
// write to store
Block block;
{
block = DMTestEnv::prepareSimpleWriteBlock(0, num_rows_write, false);
// Add a column of col2:String for test
block.insert(DB::tests::createColumn<String>(
createNumberStrings(0, num_rows_write),
col_str_define.name,
col_str_define.id));
// Add a column of i8:Int8 for test
block.insert(DB::tests::createColumn<Int8>(
createSignedNumbers(0, num_rows_write),
col_i8_define.name,
col_i8_define.id));
}

FailPointHelper::enableFailPoint(FailPoints::exception_after_write_blob);
try
{
store->write(*db_context, db_context->getSettingsRef(), block);
store->flushCache(*db_context, RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize()));
}
catch (DB::Exception & e)
{
if (e.code() != ErrorCodes::FAIL_POINT_ERROR)
throw;
}
}
FailPointHelper::disableFailPoint(FailPoints::exception_after_write_blob);

{
// read all columns from store
const auto & columns = store->getTableColumns();
BlockInputStreamPtr in = store->read(*db_context,
db_context->getSettingsRef(),
columns,
{RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())},
/* num_streams= */ 1,
/* max_version= */ std::numeric_limits<UInt64>::max(),
EMPTY_FILTER,
TRACING_NAME,
/* expected_block_size= */ 1024)[0];

size_t num_rows_read = 0;
in->readPrefix();
while (Block block = in->read())
{
num_rows_read += block.rows();
for (auto && iter : block)
{
auto c = iter.column;
for (Int64 i = 0; i < Int64(c->size()); ++i)
{
if (iter.name == DMTestEnv::pk_name)
{
//printf("pk:%lld\n", c->getInt(i));
EXPECT_EQ(c->getInt(i), i);
}
else if (iter.name == col_str_define.name)
{
//printf("%s:%s\n", col_str_define.name.c_str(), c->getDataAt(i).data);
EXPECT_EQ(c->getDataAt(i), DB::toString(i));
}
else if (iter.name == col_i8_define.name)
{
//printf("%s:%lld\n", col_i8_define.name.c_str(), c->getInt(i));
Int64 num = i * (i % 2 == 0 ? -1 : 1);
EXPECT_EQ(c->getInt(i), num);
}
}
}
}
in->readSuffix();
ASSERT_EQ(num_rows_read, num_rows_write);
}
}
CATCH

TEST_P(DeltaMergeStoreRWTest, DeleteRead)
try
{
Expand Down
12 changes: 11 additions & 1 deletion dbms/src/Storages/Page/V3/BlobStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include <Common/Checksum.h>
#include <Common/CurrentMetrics.h>
#include <Common/FailPoint.h>
#include <Common/Logger.h>
#include <Common/ProfileEvents.h>
#include <Common/StringUtils/StringUtils.h>
Expand Down Expand Up @@ -48,6 +49,11 @@ extern const int LOGICAL_ERROR;
extern const int CHECKSUM_DOESNT_MATCH;
} // namespace ErrorCodes

namespace FailPoints
{
extern const char exception_after_write_blob[];
}

namespace PS::V3
{
static constexpr bool BLOBSTORE_CHECKSUM_ON_READ = true;
Expand Down Expand Up @@ -251,14 +257,18 @@ PageEntriesEdit BlobStore::write(DB::WriteBatch & wb, const WriteLimiterPtr & wr
{
auto blob_file = getBlobFile(blob_id);
blob_file->write(buffer, offset_in_file, all_page_data_size, write_limiter);
fiu_do_on(FailPoints::exception_after_write_blob,
{
throw Exception(fmt::format("Fail point {} is triggered.", FailPoints::exception_after_write_blob),
ErrorCodes::FAIL_POINT_ERROR);
});
}
catch (DB::Exception & e)
{
removePosFromStats(blob_id, offset_in_file, actually_allocated_size);
LOG_FMT_ERROR(log, "[blob_id={}] [offset_in_file={}] [size={}] [actually_allocated_size={}] write failed.", blob_id, offset_in_file, all_page_data_size, actually_allocated_size);
throw e;
}

return edit;
}

Expand Down
1 change: 1 addition & 0 deletions dbms/src/Storages/Page/V3/PageEntriesEdit.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <Storages/Page/WriteBatch.h>
#include <common/types.h>
#include <fmt/format.h>
#include <common/logger_useful.h>

namespace DB::PS::V3
{
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ namespace FailPoints
{
extern const char exception_before_page_file_write_sync[];
extern const char force_set_page_file_write_errno[];
extern const char exception_after_write_blob[];
} // namespace FailPoints

namespace PS::V3::tests
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/TestUtils/TiFlashTestEnv.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ class TiFlashTestEnv

static Context getContext(const DB::Settings & settings = DB::Settings(), Strings testdata_path = {});

static void initializeGlobalContext(Strings testdata_path = {}, bool enable_ps_v3 = false);
static void initializeGlobalContext(Strings testdata_path = {}, bool enable_ps_v3 = true);
static Context & getGlobalContext() { return *global_context; }
static void shutdown();

Expand Down

0 comments on commit f14952e

Please sign in to comment.