Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support ddl job CreateTables in TiFlash #4201

Merged
merged 15 commits into from
Mar 11, 2022
1 change: 1 addition & 0 deletions dbms/src/Debug/DBGInvoker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ DBGInvoker::DBGInvoker()
regSchemalessFunc("rename_column_in_tidb_table", MockTiDBTable::dbgFuncRenameColumnInTiDBTable);
regSchemalessFunc("rename_tidb_table", MockTiDBTable::dbgFuncRenameTiDBTable);
regSchemalessFunc("truncate_tidb_table", MockTiDBTable::dbgFuncTruncateTiDBTable);
regSchemalessFunc("create_tidb_tables", MockTiDBTable::dbgFuncCreateTiDBTables);

regSchemalessFunc("set_flush_threshold", dbgFuncSetFlushThreshold);

Expand Down
29 changes: 29 additions & 0 deletions dbms/src/Debug/MockTiDB.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,35 @@ TableID MockTiDB::newTable(
return addTable(database_name, std::move(*table_info));
}

int MockTiDB::newTables(
const String & database_name,
const std::vector<std::tuple<String, ColumnsDescription, String>> & tables,
Timestamp tso,
const String & engine_type)
{
std::lock_guard lock(tables_mutex);

for (const auto & [table_name, columns, handle_pk_name] : tables)
{
String qualified_name = database_name + "." + table_name;
CalvinNeo marked this conversation as resolved.
Show resolved Hide resolved
if (tables_by_name.find(qualified_name) != tables_by_name.end())
{
throw Exception("Mock TiDB table " + qualified_name + " already exists", ErrorCodes::TABLE_ALREADY_EXISTS);
}

if (databases.find(database_name) == databases.end())
{
throw Exception("MockTiDB not found db: " + database_name, ErrorCodes::LOGICAL_ERROR);
}

auto table_info = parseColumns(table_name, columns, handle_pk_name, engine_type);
table_info->id = table_id_allocator++;
table_info->update_timestamp = tso;
addTable(database_name, std::move(*table_info));
}
return 0;
}

TableID MockTiDB::addTable(const String & database_name, TiDB::TableInfo && table_info)
{
auto table = std::make_shared<Table>(database_name, databases[database_name], table_info.name, std::move(table_info));
Expand Down
6 changes: 6 additions & 0 deletions dbms/src/Debug/MockTiDB.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,12 @@ class MockTiDB : public ext::Singleton<MockTiDB>
const String & handle_pk_name,
const String & engine_type);

int newTables(
const String & database_name,
const std::vector<std::tuple<String, ColumnsDescription, String>> & tables,
Timestamp tso,
const String & engine_type);

TableID addTable(const String & database_name, TiDB::TableInfo && table_info);

static TiDB::TableInfoPtr parseColumns(
Expand Down
32 changes: 32 additions & 0 deletions dbms/src/Debug/dbgFuncMockTiDBTable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -285,4 +285,36 @@ void MockTiDBTable::dbgFuncCleanUpRegions(DB::Context & context, const DB::ASTs
output("all regions have been cleaned");
}

void MockTiDBTable::dbgFuncCreateTiDBTables(Context & context, const ASTs & args, DBGInvoker::Printer output)
{
if (args.size() < 2)
throw Exception("Args not matched, should be: db_name, table_name, [table_name], ..., [table_name]", ErrorCodes::BAD_ARGUMENTS);
const String & database_name = typeid_cast<const ASTIdentifier &>(*args[0]).name;
auto db = context.getDatabase(database_name);

std::vector<std::tuple<String, ColumnsDescription, String>> tables;

for (ASTs::size_type i = 1; i < args.size(); i++)
{
String schema_str = "i Int64";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A hard-coded schema for all the tables?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, since newTables is just a delegate of addTable. This ut is to ensure this behavior.
If it can not support other schemas, test for mock_tidb_table shall fail.

String table_name = fmt::format("t{}", i);
ASTPtr columns_ast;
ParserColumnDeclarationList schema_parser;
Tokens tokens(schema_str.data(), schema_str.data() + schema_str.length());
TokenIterator pos(tokens);
Expected expected;
if (!schema_parser.parse(pos, columns_ast, expected))
throw Exception("Invalid TiDB table schema", ErrorCodes::LOGICAL_ERROR);
ColumnsDescription columns
= InterpreterCreateQuery::getColumnsDescription(typeid_cast<const ASTExpressionList &>(*columns_ast), context);
tables.emplace_back(table_name, columns, "");
}
auto tso = context.getTMTContext().getPDClient()->getTS();
String engine_type("dt");
if (context.getTMTContext().getEngineType() == ::TiDB::StorageEngine::TMT)
engine_type = "tmt";
MockTiDB::instance().newTables(database_name, tables, tso, engine_type);
output("");
}

} // namespace DB
14 changes: 11 additions & 3 deletions dbms/src/Debug/dbgFuncMockTiDBTable.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,11 @@

namespace DB
{

class Context;

// TiDB table test tool
struct MockTiDBTable
{

// Inject mocked TiDB table.
// Usage:

Expand Down Expand Up @@ -84,9 +82,19 @@ struct MockTiDBTable
// ./storages-client.sh "DBGInvoke clean_up_region()"
static void dbgFuncCleanUpRegions(Context & context, const ASTs & args, DBGInvoker::Printer output);

// Trigger a create tables ddl job.
// Usage:
// ./storage-client.sh "DBGInvoke create_tidb_tables(db_name, table_name, [table_name], ..., [table_name])"
static void dbgFuncCreateTiDBTables(Context & context, const ASTs & args, DBGInvoker::Printer output);

private:
static void dbgFuncDropTiDBTableImpl(
Context & context, String database_name, String table_name, bool drop_regions, bool is_drop_db, DBGInvoker::Printer output);
Context & context,
String database_name,
String table_name,
bool drop_regions,
bool is_drop_db,
DBGInvoker::Printer output);
};

} // namespace DB
10 changes: 7 additions & 3 deletions dbms/src/Debug/dbgFuncSchema.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,19 @@
#include <Databases/DatabaseTiFlash.h>
#include <Debug/dbgFuncSchema.h>
#include <Interpreters/Context.h>
#include <Interpreters/InterpreterCreateQuery.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ParserCreateQuery.h>
#include <Storages/IManageableStorage.h>
#include <Storages/Transaction/SchemaSyncService.h>
#include <Storages/Transaction/SchemaSyncer.h>
#include <Storages/Transaction/TMTContext.h>
#include <Storages/Transaction/TiDB.h>
#include <fmt/core.h>

#include <ext/singleton.h>

namespace DB
{
namespace ErrorCodes
Expand Down Expand Up @@ -56,7 +60,7 @@ void dbgFuncGcSchemas(Context & context, const ASTs & args, DBGInvoker::Printer
{
auto & service = context.getSchemaSyncService();
Timestamp gc_safe_point = 0;
if (args.size() == 0)
if (args.empty())
gc_safe_point = PDClientHelper::getGCSafePointWithRetry(context.getTMTContext().getPDClient());
else
gc_safe_point = safeGet<Timestamp>(typeid_cast<const ASTLiteral &>(*args[0]).value);
Expand All @@ -76,7 +80,7 @@ void dbgFuncResetSchemas(Context & context, const ASTs &, DBGInvoker::Printer ou

void dbgFuncIsTombstone(Context & context, const ASTs & args, DBGInvoker::Printer output)
{
if (args.size() < 1 || args.size() > 2)
if (args.empty() || args.size() > 2)
throw Exception("Args not matched, should be: database-name[, table-name]", ErrorCodes::BAD_ARGUMENTS);

const String & database_name = typeid_cast<const ASTIdentifier &>(*args[0]).name;
Expand All @@ -103,4 +107,4 @@ void dbgFuncIsTombstone(Context & context, const ASTs & args, DBGInvoker::Printe
output(fmt_buf.toString());
}

} // namespace DB
} // namespace DB
25 changes: 21 additions & 4 deletions dbms/src/Storages/Transaction/SchemaBuilder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,23 @@ void SchemaBuilder<Getter, NameMapper>::applyDiff(const SchemaDiff & diff)
return;
}

if (diff.type == SchemaActionType::CreateTables)
{
for (auto && opt : diff.affected_opts)
{
SchemaDiff new_diff;
new_diff.type = SchemaActionType::CreateTable;
new_diff.version = diff.version;
new_diff.schema_id = opt.schema_id;
new_diff.table_id = opt.table_id;
new_diff.old_schema_id = opt.old_schema_id;
new_diff.old_table_id = opt.old_table_id;
LOG_FMT_WARNING(log, "find CreateTables of version {} schema {} table {}", new_diff.version, opt.schema_id, opt.table_id);
applyDiff(new_diff);
}
return;
}

auto db_info = getter.getDatabase(diff.schema_id);
if (db_info == nullptr)
throw TiFlashException("miss database: " + std::to_string(diff.schema_id), Errors::DDL::StaleSchema);
Expand Down Expand Up @@ -564,14 +581,14 @@ void SchemaBuilder<Getter, NameMapper>::applyPartitionDiff(TiDB::DBInfoPtr db_in
updated_table_info.partition = table_info->partition;

/// Apply changes to physical tables.
for (auto orig_def : orig_defs)
for (const auto & orig_def : orig_defs)
{
if (new_part_id_set.count(orig_def.id) == 0)
{
applyDropPhysicalTable(name_mapper.mapDatabaseName(*db_info), orig_def.id);
}
}
for (auto new_def : new_defs)
for (const auto & new_def : new_defs)
{
if (orig_part_id_set.count(new_def.id) == 0)
{
Expand Down Expand Up @@ -979,7 +996,7 @@ void SchemaBuilder<Getter, NameMapper>::applyCreatePhysicalTable(DBInfoPtr db_in
/// Check if this is a RECOVER table.
{
auto & tmt_context = context.getTMTContext();
if (auto storage = tmt_context.getStorages().get(table_info->id).get(); storage)
if (auto * storage = tmt_context.getStorages().get(table_info->id).get(); storage)
{
if (!storage->isTombstone())
{
Expand Down Expand Up @@ -1093,7 +1110,7 @@ template <typename Getter, typename NameMapper>
void SchemaBuilder<Getter, NameMapper>::applyDropTable(DBInfoPtr db_info, TableID table_id)
{
auto & tmt_context = context.getTMTContext();
auto storage = tmt_context.getStorages().get(table_id).get();
auto * storage = tmt_context.getStorages().get(table_id).get();
if (storage == nullptr)
{
LOG_DEBUG(log, "table " << table_id << " does not exist.");
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/Transaction/SchemaBuilder.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ struct SchemaBuilder
private:
void applyDropSchema(DatabaseID schema_id);

/// Parameter schema_name should be mapped.
void applyDropSchema(const String & schema_name);
/// Parameter db_name should be mapped.
void applyDropSchema(const String & db_name);

bool applyCreateSchema(DatabaseID schema_id);

Expand Down
1 change: 1 addition & 0 deletions dbms/src/Storages/Transaction/SchemaGetter.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ enum class SchemaActionType : Int8
RebaseAutoRandomBase = 40,
AlterIndexVisibility = 41,
ExchangeTablePartition = 42,
CreateTables = 60,
CalvinNeo marked this conversation as resolved.
Show resolved Hide resolved
};

struct AffectedOption
Expand Down
29 changes: 29 additions & 0 deletions tests/delta-merge-test/raft/schema/create_tidb_tables.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Preparation.
=> DBGInvoke __enable_schema_sync_service('false')

=> DBGInvoke __drop_tidb_table(default, test)
=> drop table if exists default.t1
=> drop table if exists default.t2
=> drop table if exists default.t3
=> DBGInvoke __refresh_schemas()

=> DBGInvoke __set_flush_threshold(1000000, 1000000)

# create table and insert some rows
=> DBGInvoke __create_tidb_tables(default, t1, t2, t3)
=> DBGInvoke __refresh_schemas()
=> select database,name,engine from system.tables where database='default' and name='t1'
┌─database─┬─name─┬─engine─────┐
│ default │ t1 │ DeltaMerge │
└──────────┴──────┴────────────┘
=> select database,name,engine from system.tables where database='default' and name='t2'
┌─database─┬─name─┬─engine─────┐
│ default │ t2 │ DeltaMerge │
└──────────┴──────┴────────────┘
=> select database,name,engine from system.tables where database='default' and name='t3'
┌─database─┬─name─┬─engine─────┐
│ default │ t3 │ DeltaMerge │
└──────────┴──────┴────────────┘
=> drop table if exists default.t1
=> drop table if exists default.t2
=> drop table if exists default.t3