Skip to content

Commit

Permalink
Enable PageStorage V3 in UT (#4884)
Browse files Browse the repository at this point in the history
close #4835
  • Loading branch information
hehechen authored May 25, 2022
1 parent 2d57895 commit 0f56b15
Show file tree
Hide file tree
Showing 8 changed files with 229 additions and 26 deletions.
27 changes: 13 additions & 14 deletions dbms/src/Interpreters/Context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ struct ContextShared
PathCapacityMetricsPtr path_capacity_ptr; /// Path capacity metrics
FileProviderPtr file_provider; /// File provider.
IORateLimiter io_rate_limiter;
PageStorageRunMode storage_run_mode;
PageStorageRunMode storage_run_mode = PageStorageRunMode::ONLY_V3;
DM::GlobalStoragePoolPtr global_storage_pool;
/// Named sessions. The user could specify session identifier to reuse settings and temporary tables in subsequent requests.

Expand Down Expand Up @@ -714,7 +714,7 @@ Dependencies Context::getDependencies(const String & database_name, const String
checkDatabaseAccessRightsImpl(db);
}

ViewDependencies::const_iterator iter = shared->view_dependencies.find(DatabaseAndTableName(db, table_name));
auto iter = shared->view_dependencies.find(DatabaseAndTableName(db, table_name));
if (iter == shared->view_dependencies.end())
return {};

Expand All @@ -728,7 +728,7 @@ bool Context::isTableExist(const String & database_name, const String & table_na
String db = resolveDatabase(database_name, current_database);
checkDatabaseAccessRightsImpl(db);

Databases::const_iterator it = shared->databases.find(db);
auto it = shared->databases.find(db);
return shared->databases.end() != it
&& it->second->isTableExist(*this, table_name);
}
Expand All @@ -754,7 +754,7 @@ void Context::assertTableExists(const String & database_name, const String & tab
String db = resolveDatabase(database_name, current_database);
checkDatabaseAccessRightsImpl(db);

Databases::const_iterator it = shared->databases.find(db);
auto it = shared->databases.find(db);
if (shared->databases.end() == it)
throw Exception(fmt::format("Database {} doesn't exist", backQuoteIfNeed(db)), ErrorCodes::UNKNOWN_DATABASE);

Expand All @@ -771,7 +771,7 @@ void Context::assertTableDoesntExist(const String & database_name, const String
if (check_database_access_rights)
checkDatabaseAccessRightsImpl(db);

Databases::const_iterator it = shared->databases.find(db);
auto it = shared->databases.find(db);
if (shared->databases.end() != it && it->second->isTableExist(*this, table_name))
throw Exception(fmt::format("Table {}.{} already exists.", backQuoteIfNeed(db), backQuoteIfNeed(table_name)), ErrorCodes::TABLE_ALREADY_EXISTS);
}
Expand Down Expand Up @@ -826,7 +826,7 @@ Tables Context::getExternalTables() const

StoragePtr Context::tryGetExternalTable(const String & table_name) const
{
TableAndCreateASTs::const_iterator jt = external_tables.find(table_name);
auto jt = external_tables.find(table_name);
if (external_tables.end() == jt)
return StoragePtr();

Expand Down Expand Up @@ -864,7 +864,7 @@ StoragePtr Context::getTableImpl(const String & database_name, const String & ta
String db = resolveDatabase(database_name, current_database);
checkDatabaseAccessRightsImpl(db);

Databases::const_iterator it = shared->databases.find(db);
auto it = shared->databases.find(db);
if (shared->databases.end() == it)
{
if (exception)
Expand Down Expand Up @@ -894,7 +894,7 @@ void Context::addExternalTable(const String & table_name, const StoragePtr & sto

StoragePtr Context::tryRemoveExternalTable(const String & table_name)
{
TableAndCreateASTs::const_iterator it = external_tables.find(table_name);
auto it = external_tables.find(table_name);

if (external_tables.end() == it)
return StoragePtr();
Expand Down Expand Up @@ -954,7 +954,7 @@ std::unique_ptr<DDLGuard> Context::getDDLGuardIfTableDoesntExist(const String &
{
auto lock = getLock();

Databases::const_iterator it = shared->databases.find(database);
auto it = shared->databases.find(database);
if (shared->databases.end() != it && it->second->isTableExist(*this, table))
return {};

Expand Down Expand Up @@ -993,7 +993,7 @@ ASTPtr Context::getCreateTableQuery(const String & database_name, const String &

ASTPtr Context::getCreateExternalTableQuery(const String & table_name) const
{
TableAndCreateASTs::const_iterator jt = external_tables.find(table_name);
auto jt = external_tables.find(table_name);
if (external_tables.end() == jt)
throw Exception(fmt::format("Temporary table {} doesn't exist", backQuoteIfNeed(table_name)), ErrorCodes::UNKNOWN_TABLE);

Expand Down Expand Up @@ -1088,7 +1088,7 @@ void Context::setCurrentQueryId(const String & query_id)
UInt64 a;
UInt64 b;
};
} random;
} random{};

{
auto lock = getLock();
Expand Down Expand Up @@ -1650,9 +1650,8 @@ bool Context::initializeGlobalStoragePoolIfNeed(const PathPool & path_pool)
auto lock = getLock();
if (shared->global_storage_pool)
{
// Can't init GlobalStoragePool twice.
// otherwise the pagestorage instances in `StoragePool` for each table won't be updated and cause unexpected problem.
throw Exception("GlobalStoragePool has already been initialized.", ErrorCodes::LOGICAL_ERROR);
// GlobalStoragePool may be initialized many times in some test cases for restore.
LOG_WARNING(shared->log, "GlobalStoragePool has already been initialized.");
}
CurrentMetrics::set(CurrentMetrics::GlobalStorageRunMode, static_cast<UInt8>(shared->storage_run_mode));
if (shared->storage_run_mode == PageStorageRunMode::MIX_MODE || shared->storage_run_mode == PageStorageRunMode::ONLY_V3)
Expand Down
8 changes: 4 additions & 4 deletions dbms/src/Server/tests/gtest_server_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -371,10 +371,10 @@ dt_page_gc_low_write_prob = 0.2
std::unique_ptr<StoragePathPool> path_pool = std::make_unique<StoragePathPool>(global_ctx.getPathPool().withTable("test", "t1", false));
std::unique_ptr<DM::StoragePool> storage_pool = std::make_unique<DM::StoragePool>(global_ctx, /*ns_id*/ 100, *path_pool, "test.t1");

auto verify_storage_pool_reload_config = [&global_ctx](std::unique_ptr<DM::StoragePool> & storage_pool) {
auto verify_storage_pool_reload_config = [&](std::unique_ptr<DM::StoragePool> & storage_pool) {
DB::Settings & settings = global_ctx.getSettingsRef();

auto cfg = storage_pool->data_storage_v2->getSettings();
auto cfg = storage_pool->dataWriter()->getSettings();
EXPECT_NE(cfg.gc_min_files, settings.dt_storage_pool_data_gc_min_file_num);
EXPECT_NE(cfg.gc_min_legacy_num, settings.dt_storage_pool_data_gc_min_legacy_num);
EXPECT_NE(cfg.gc_min_bytes, settings.dt_storage_pool_data_gc_min_bytes);
Expand All @@ -384,9 +384,9 @@ dt_page_gc_low_write_prob = 0.2
EXPECT_NE(cfg.open_file_max_idle_time, settings.dt_open_file_max_idle_seconds);
EXPECT_NE(cfg.prob_do_gc_when_write_is_low, settings.dt_page_gc_low_write_prob * 1000);

storage_pool->gc(settings, DM::StoragePool::Seconds(0));
global_ctx.getGlobalStoragePool()->gc();

cfg = storage_pool->data_storage_v2->getSettings();
cfg = storage_pool->dataWriter()->getSettings();
EXPECT_EQ(cfg.gc_min_files, settings.dt_storage_pool_data_gc_min_file_num);
EXPECT_EQ(cfg.gc_min_legacy_num, settings.dt_storage_pool_data_gc_min_legacy_num);
EXPECT_EQ(cfg.gc_min_bytes, settings.dt_storage_pool_data_gc_min_bytes);
Expand Down
193 changes: 193 additions & 0 deletions dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ extern const char force_triggle_foreground_flush[];
extern const char force_set_segment_ingest_packs_fail[];
extern const char segment_merge_after_ingest_packs[];
extern const char force_set_segment_physical_split[];
extern const char force_set_page_file_write_errno[];
} // namespace FailPoints

namespace DM
Expand Down Expand Up @@ -495,6 +496,198 @@ try
}
CATCH

TEST_P(DeltaMergeStoreRWTest, WriteCrashBeforeWalWithoutCache)
try
{
const ColumnDefine col_str_define(2, "col2", std::make_shared<DataTypeString>());
const ColumnDefine col_i8_define(3, "i8", std::make_shared<DataTypeInt8>());
{
auto table_column_defines = DMTestEnv::getDefaultColumns();
table_column_defines->emplace_back(col_str_define);
table_column_defines->emplace_back(col_i8_define);

store = reload(table_column_defines);
}

{
// check column structure
const auto & cols = store->getTableColumns();
ASSERT_EQ(cols.size(), 5UL);
const auto & str_col = cols[3];
ASSERT_EQ(str_col.name, col_str_define.name);
ASSERT_EQ(str_col.id, col_str_define.id);
ASSERT_TRUE(str_col.type->equals(*col_str_define.type));
const auto & i8_col = cols[4];
ASSERT_EQ(i8_col.name, col_i8_define.name);
ASSERT_EQ(i8_col.id, col_i8_define.id);
ASSERT_TRUE(i8_col.type->equals(*col_i8_define.type));
}

const size_t num_rows_write = 128;
{
// write to store
Block block;
{
block = DMTestEnv::prepareSimpleWriteBlock(0, num_rows_write, false);
// Add a column of col2:String for test
block.insert(DB::tests::createColumn<String>(
createNumberStrings(0, num_rows_write),
col_str_define.name,
col_str_define.id));
// Add a column of i8:Int8 for test
block.insert(DB::tests::createColumn<Int8>(
createSignedNumbers(0, num_rows_write),
col_i8_define.name,
col_i8_define.id));
}
db_context->getSettingsRef().dt_segment_delta_cache_limit_rows = 8;
FailPointHelper::enableFailPoint(FailPoints::force_set_page_file_write_errno);
ASSERT_THROW(store->write(*db_context, db_context->getSettingsRef(), block), DB::Exception);
try
{
store->write(*db_context, db_context->getSettingsRef(), block);
}
catch (DB::Exception & e)
{
if (e.code() != ErrorCodes::CANNOT_WRITE_TO_FILE_DESCRIPTOR)
throw;
}
}
FailPointHelper::disableFailPoint(FailPoints::force_set_page_file_write_errno);

{
// read all columns from store
const auto & columns = store->getTableColumns();
BlockInputStreamPtr in = store->read(*db_context,
db_context->getSettingsRef(),
columns,
{RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())},
/* num_streams= */ 1,
/* max_version= */ std::numeric_limits<UInt64>::max(),
EMPTY_FILTER,
TRACING_NAME,
/* expected_block_size= */ 1024)[0];

size_t num_rows_read = 0;
in->readPrefix();
while (Block block = in->read())
{
num_rows_read += block.rows();
}
in->readSuffix();
ASSERT_EQ(num_rows_read, 0);
}
}
CATCH

TEST_P(DeltaMergeStoreRWTest, WriteCrashBeforeWalWithCache)
try
{
const ColumnDefine col_str_define(2, "col2", std::make_shared<DataTypeString>());
const ColumnDefine col_i8_define(3, "i8", std::make_shared<DataTypeInt8>());
{
auto table_column_defines = DMTestEnv::getDefaultColumns();
table_column_defines->emplace_back(col_str_define);
table_column_defines->emplace_back(col_i8_define);

store = reload(table_column_defines);
}

{
// check column structure
const auto & cols = store->getTableColumns();
ASSERT_EQ(cols.size(), 5UL);
const auto & str_col = cols[3];
ASSERT_EQ(str_col.name, col_str_define.name);
ASSERT_EQ(str_col.id, col_str_define.id);
ASSERT_TRUE(str_col.type->equals(*col_str_define.type));
const auto & i8_col = cols[4];
ASSERT_EQ(i8_col.name, col_i8_define.name);
ASSERT_EQ(i8_col.id, col_i8_define.id);
ASSERT_TRUE(i8_col.type->equals(*col_i8_define.type));
}

const size_t num_rows_write = 128;
{
// write to store
Block block;
{
block = DMTestEnv::prepareSimpleWriteBlock(0, num_rows_write, false);
// Add a column of col2:String for test
block.insert(DB::tests::createColumn<String>(
createNumberStrings(0, num_rows_write),
col_str_define.name,
col_str_define.id));
// Add a column of i8:Int8 for test
block.insert(DB::tests::createColumn<Int8>(
createSignedNumbers(0, num_rows_write),
col_i8_define.name,
col_i8_define.id));
}

FailPointHelper::enableFailPoint(FailPoints::force_set_page_file_write_errno);
store->write(*db_context, db_context->getSettingsRef(), block);
ASSERT_THROW(store->flushCache(*db_context, RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())), DB::Exception);
try
{
store->flushCache(*db_context, RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize()));
}
catch (DB::Exception & e)
{
if (e.code() != ErrorCodes::CANNOT_WRITE_TO_FILE_DESCRIPTOR)
throw;
}
}
FailPointHelper::disableFailPoint(FailPoints::force_set_page_file_write_errno);

{
// read all columns from store
const auto & columns = store->getTableColumns();
BlockInputStreamPtr in = store->read(*db_context,
db_context->getSettingsRef(),
columns,
{RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())},
/* num_streams= */ 1,
/* max_version= */ std::numeric_limits<UInt64>::max(),
EMPTY_FILTER,
TRACING_NAME,
/* expected_block_size= */ 1024)[0];

size_t num_rows_read = 0;
in->readPrefix();
while (Block block = in->read())
{
num_rows_read += block.rows();
for (auto && iter : block)
{
auto c = iter.column;
for (Int64 i = 0; i < Int64(c->size()); ++i)
{
if (iter.name == DMTestEnv::pk_name)
{
//printf("pk:%lld\n", c->getInt(i));
EXPECT_EQ(c->getInt(i), i);
}
else if (iter.name == col_str_define.name)
{
//printf("%s:%s\n", col_str_define.name.c_str(), c->getDataAt(i).data);
EXPECT_EQ(c->getDataAt(i), DB::toString(i));
}
else if (iter.name == col_i8_define.name)
{
//printf("%s:%lld\n", col_i8_define.name.c_str(), c->getInt(i));
Int64 num = i * (i % 2 == 0 ? -1 : 1);
EXPECT_EQ(c->getInt(i), num);
}
}
}
}
in->readSuffix();
ASSERT_EQ(num_rows_read, num_rows_write);
}
}
CATCH

TEST_P(DeltaMergeStoreRWTest, DeleteRead)
try
{
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Storages/Page/PageStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,9 @@ class PageWriter : private boost::noncopyable
// Only used for DATA transform data
void writeIntoV3(WriteBatch && write_batch, WriteLimiterPtr write_limiter) const;

#ifndef DBMS_PUBLIC_GTEST
private:
#endif
void writeIntoMixMode(WriteBatch && write_batch, WriteLimiterPtr write_limiter) const;

// A wrap of getSettings only used for `RegionPersister::gc`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,17 +44,16 @@ class PageStorageMixedTest : public DB::base::TiFlashStorageTestBasic
storage_path_pool_v3 = std::make_unique<PathPool>(Strings{path}, Strings{path}, Strings{}, std::make_shared<PathCapacityMetrics>(0, paths, caps, Strings{}, caps), global_context.getFileProvider(), true);

global_context.setPageStorageRunMode(PageStorageRunMode::MIX_MODE);
if (!global_context.getGlobalStoragePool())
global_context.initializeGlobalStoragePoolIfNeed(*storage_path_pool_v3);
}

void SetUp() override
{
auto & global_context = DB::tests::TiFlashTestEnv::getGlobalContext();
global_context.setPageStorageRunMode(PageStorageRunMode::MIX_MODE);
TiFlashStorageTestBasic::SetUp();
const auto & path = getTemporaryPath();
createIfNotExist(path);

auto & global_context = DB::tests::TiFlashTestEnv::getGlobalContext();

std::vector<size_t> caps = {};
Strings paths = {path};
Expand All @@ -76,7 +75,7 @@ class PageStorageMixedTest : public DB::base::TiFlashStorageTestBasic

PageStorageRunMode reloadMixedStoragePool()
{
DB::tests::TiFlashTestEnv::getContext().setPageStorageRunMode(PageStorageRunMode::MIX_MODE);
db_context->setPageStorageRunMode(PageStorageRunMode::MIX_MODE);
PageStorageRunMode run_mode = storage_pool_mix->restore();
page_writer_mix = storage_pool_mix->logWriter();
page_reader_mix = storage_pool_mix->logReader();
Expand All @@ -85,7 +84,7 @@ class PageStorageMixedTest : public DB::base::TiFlashStorageTestBasic

void reloadV2StoragePool()
{
DB::tests::TiFlashTestEnv::getContext().setPageStorageRunMode(PageStorageRunMode::ONLY_V2);
db_context->setPageStorageRunMode(PageStorageRunMode::ONLY_V2);
storage_pool_v2->restore();
page_writer_v2 = storage_pool_v2->logWriter();
page_reader_v2 = storage_pool_v2->logReader();
Expand Down
Loading

0 comments on commit 0f56b15

Please sign in to comment.