Skip to content

Commit

Permalink
Test: Bind delta merge storage with executor ut (pingcap#6561)
Browse files Browse the repository at this point in the history
  • Loading branch information
ywqzzy authored and JaySon-Huang committed Jan 4, 2023
1 parent 25eaa58 commit 3887b91
Show file tree
Hide file tree
Showing 30 changed files with 691 additions and 163 deletions.
3 changes: 2 additions & 1 deletion dbms/src/Debug/MockComputeServerManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.
#include <Common/FmtUtils.h>
#include <Debug/MockComputeServerManager.h>
#include <Debug/MockStorage.h>
#include <Flash/Mpp/MPPTaskManager.h>
#include <Storages/Transaction/TMTContext.h>
#include <TestUtils/TiFlashTestEnv.h>
Expand Down Expand Up @@ -71,7 +72,7 @@ void MockComputeServerManager::startServers(const LoggerPtr & log_ptr, int start
prepareMockMPPServerInfo();
}

void MockComputeServerManager::setMockStorage(MockStorage & mock_storage)
void MockComputeServerManager::setMockStorage(MockStorage * mock_storage)
{
for (const auto & server : server_map)
{
Expand Down
11 changes: 7 additions & 4 deletions dbms/src/Debug/MockComputeServerManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@

#pragma once

#include <Debug/MockStorage.h>
#include <Flash/Mpp/MPPTaskId.h>
#include <Server/FlashGrpcServerHolder.h>

namespace DB::tests
namespace DB
{
class MockStorage;
namespace tests
{
/** Hold Mock Compute Server to manage the lifetime of them.
* Maintains Mock Compute Server info.
Expand All @@ -35,7 +37,7 @@ class MockComputeServerManager : public ext::Singleton<MockComputeServerManager>
void startServers(const LoggerPtr & log_ptr, int start_idx);

/// set MockStorage for Compute Server in order to mock input columns.
void setMockStorage(MockStorage & mock_storage);
void setMockStorage(MockStorage * mock_storage);

/// stop all servers.
void reset();
Expand All @@ -58,4 +60,5 @@ class MockComputeServerManager : public ext::Singleton<MockComputeServerManager>
std::unordered_map<size_t, std::unique_ptr<FlashGrpcServerHolder>> server_map;
std::unordered_map<size_t, MockServerConfig> server_config_map;
};
} // namespace DB::tests
} // namespace tests
} // namespace DB
175 changes: 173 additions & 2 deletions dbms/src/Debug/MockStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,19 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <DataStreams/IBlockOutputStream.h>
#include <Debug/MockStorage.h>
#include <Flash/Coprocessor/TiDBTableScan.h>
#include <Interpreters/Context.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTInsertQuery.h>
#include <Parsers/ASTSelectQuery.h>
#include <Storages/RegionQueryInfo.h>
#include <Storages/StorageDeltaMerge.h>

namespace DB::tests
namespace DB
{
/// for table scan
void MockStorage::addTableSchema(const String & name, const MockColumnInfoVec & columnInfos)
{
name_to_id_map[name] = MockTableIdGenerator::instance().nextTableId();
Expand Down Expand Up @@ -63,6 +71,145 @@ MockColumnInfoVec MockStorage::getTableSchema(const String & name)
throw Exception(fmt::format("Failed to get table schema by table name '{}'", name));
}

/// for delta merge
void MockStorage::addTableSchemaForDeltaMerge(const String & name, const MockColumnInfoVec & columnInfos)
{
name_to_id_map_for_delta_merge[name] = MockTableIdGenerator::instance().nextTableId();
table_schema_for_delta_merge[getTableIdForDeltaMerge(name)] = columnInfos;
addTableInfoForDeltaMerge(name, columnInfos);
}

void MockStorage::addTableDataForDeltaMerge(Context & context, const String & name, ColumnsWithTypeAndName & columns)
{
auto table_id = getTableIdForDeltaMerge(name);
addNamesAndTypesForDeltaMerge(table_id, columns);
if (storage_delta_merge_map.find(table_id) == storage_delta_merge_map.end())
{
// init
ASTPtr astptr(new ASTIdentifier(name, ASTIdentifier::Kind::Table));
NamesAndTypesList names_and_types_list;
for (const auto & column : columns)
{
names_and_types_list.emplace_back(column.name, column.type);
}
astptr->children.emplace_back(new ASTIdentifier(columns[0].name));

storage_delta_merge_map[table_id] = StorageDeltaMerge::create("TiFlash",
/* db_name= */ "default",
name,
std::nullopt,
ColumnsDescription{names_and_types_list},
astptr,
0,
context);

auto storage = storage_delta_merge_map[table_id];
assert(storage);
storage->startup();

// write data to DeltaMergeStorage
ASTPtr insertptr(new ASTInsertQuery());
BlockOutputStreamPtr output = storage->write(insertptr, context.getSettingsRef());

Block insert_block{columns};

output->writePrefix();
output->write(insert_block);
output->writeSuffix();
}
}

BlockInputStreamPtr MockStorage::getStreamFromDeltaMerge(Context & context, Int64 id)
{
auto storage = storage_delta_merge_map[id];
auto column_infos = table_schema_for_delta_merge[id];
assert(storage);
assert(!column_infos.empty());
Names column_names;
for (const auto & column_info : column_infos)
column_names.push_back(column_info.first);

auto scan_context = std::make_shared<DM::ScanContext>();
QueryProcessingStage::Enum stage;
SelectQueryInfo query_info;
query_info.query = std::make_shared<ASTSelectQuery>();
query_info.mvcc_query_info = std::make_unique<MvccQueryInfo>(context.getSettingsRef().resolve_locks, std::numeric_limits<UInt64>::max(), scan_context);
BlockInputStreams ins = storage->read(column_names, query_info, context, stage, 8192, 1); // TODO: Support config max_block_size and num_streams

BlockInputStreamPtr in = ins[0];
return in;
}

void MockStorage::addTableInfoForDeltaMerge(const String & name, const MockColumnInfoVec & columns)
{
TableInfo table_info;
table_info.name = name;
table_info.id = getTableIdForDeltaMerge(name);
int i = 0;
for (const auto & column : columns)
{
TiDB::ColumnInfo ret;
std::tie(ret.name, ret.tp) = column;
// TODO: find a way to assign decimal field's flen.
if (ret.tp == TiDB::TP::TypeNewDecimal)
ret.flen = 65;
ret.id = i++;
table_info.columns.push_back(std::move(ret));
}
table_infos_for_delta_merge[name] = table_info;
}

void MockStorage::addNamesAndTypesForDeltaMerge(Int64 table_id, const ColumnsWithTypeAndName & columns)
{
NamesAndTypes names_and_types;
for (const auto & column : columns)
{
names_and_types.emplace_back(column.name, column.type);
}
names_and_types_map_for_delta_merge[table_id] = names_and_types;
}

Int64 MockStorage::getTableIdForDeltaMerge(const String & name)
{
if (name_to_id_map_for_delta_merge.find(name) != name_to_id_map_for_delta_merge.end())
{
return name_to_id_map_for_delta_merge[name];
}
throw Exception(fmt::format("Failed to get table id by table name '{}'", name));
}

bool MockStorage::tableExistsForDeltaMerge(Int64 table_id)
{
return table_schema_for_delta_merge.find(table_id) != table_schema_for_delta_merge.end();
}

MockColumnInfoVec MockStorage::getTableSchemaForDeltaMerge(const String & name)
{
if (tableExistsForDeltaMerge(getTableIdForDeltaMerge(name)))
{
return table_schema_for_delta_merge[getTableIdForDeltaMerge(name)];
}
throw Exception(fmt::format("Failed to get table schema by table name '{}'", name));
}

MockColumnInfoVec MockStorage::getTableSchemaForDeltaMerge(Int64 table_id)
{
if (tableExistsForDeltaMerge(table_id))
{
return table_schema_for_delta_merge[table_id];
}
throw Exception(fmt::format("Failed to get table schema by table id '{}'", table_id));
}

NamesAndTypes MockStorage::getNameAndTypesForDeltaMerge(Int64 table_id)
{
if (tableExistsForDeltaMerge(table_id))
{
return names_and_types_map_for_delta_merge[table_id];
}
throw Exception(fmt::format("Failed to get NamesAndTypes by table id '{}'", table_id));
}

/// for exchange receiver
void MockStorage::addExchangeSchema(const String & exchange_name, const MockColumnInfoVec & columnInfos)
{
Expand Down Expand Up @@ -107,6 +254,25 @@ MockColumnInfoVec MockStorage::getExchangeSchema(const String & exchange_name)
throw Exception(fmt::format("Failed to get exchange schema by exchange name '{}'", exchange_name));
}

void MockStorage::clear()
{
for (auto [_, storage] : storage_delta_merge_map)
{
storage->drop();
storage->removeFromTMTContext();
}
}

void MockStorage::setUseDeltaMerge(bool flag)
{
use_storage_delta_merge = flag;
}

bool MockStorage::useDeltaMerge() const
{
return use_storage_delta_merge;
}

// use this function to determine where to cut the columns,
// and how many rows are needed for each partition of MPP task.
CutColumnInfo getCutColumnInfo(size_t rows, Int64 partition_id, Int64 partition_num)
Expand Down Expand Up @@ -192,4 +358,9 @@ TableInfo MockStorage::getTableInfo(const String & name)
{
return table_infos[name];
}
} // namespace DB::tests

TableInfo MockStorage::getTableInfoForDeltaMerge(const String & name)
{
return table_infos_for_delta_merge[name];
}
} // namespace DB
71 changes: 60 additions & 11 deletions dbms/src/Debug/MockStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,20 @@
// limitations under the License.
#pragma once
#include <Core/ColumnsWithTypeAndName.h>
#include <DataStreams/IBlockInputStream.h>
#include <Flash/Coprocessor/TiDBTableScan.h>
#include <Storages/Transaction/TiDB.h>
#include <common/types.h>

#include <atomic>
#include <memory>
#include <unordered_map>
namespace DB::tests

namespace DB
{
class StorageDeltaMerge;
class Context;

using MockColumnInfo = std::pair<String, TiDB::TP>;
using MockColumnInfoVec = std::vector<MockColumnInfo>;
using TableInfo = TiDB::TableInfo;
Expand All @@ -41,6 +47,7 @@ class MockTableIdGenerator : public ext::Singleton<MockTableIdGenerator>
/** Responsible for mock data for executor tests and mpp tests.
* 1. Use this class to add mock table schema and table column data.
* 2. Use this class to add mock exchange schema and exchange column data.
* 3. Use this class to add table schema and table column data into StorageDeltaMerge.
*/
class MockStorage
{
Expand All @@ -50,46 +57,88 @@ class MockStorage

void addTableData(const String & name, ColumnsWithTypeAndName & columns);

Int64 getTableId(const String & name);
MockColumnInfoVec getTableSchema(const String & name);

ColumnsWithTypeAndName getColumns(Int64 table_id);

bool tableExists(Int64 table_id);

ColumnsWithTypeAndName getColumns(Int64 table_id);
/// for storage delta merge table scan
void addTableSchemaForDeltaMerge(const String & name, const MockColumnInfoVec & columnInfos);

MockColumnInfoVec getTableSchema(const String & name);
void addTableDataForDeltaMerge(Context & context, const String & name, ColumnsWithTypeAndName & columns);

MockColumnInfoVec getTableSchemaForDeltaMerge(const String & name);

MockColumnInfoVec getTableSchemaForDeltaMerge(Int64 table_id);

NamesAndTypes getNameAndTypesForDeltaMerge(Int64 table_id);

BlockInputStreamPtr getStreamFromDeltaMerge(Context & context, Int64 table_id);

bool tableExistsForDeltaMerge(Int64 table_id);

/// for exchange receiver
void addExchangeSchema(const String & exchange_name, const MockColumnInfoVec & columnInfos);

void addExchangeData(const String & exchange_name, const ColumnsWithTypeAndName & columns);

bool exchangeExists(const String & executor_id);
bool exchangeExistsWithName(const String & name);

ColumnsWithTypeAndName getExchangeColumns(const String & executor_id);
MockColumnInfoVec getExchangeSchema(const String & exchange_name);

void addExchangeRelation(const String & executor_id, const String & exchange_name);

MockColumnInfoVec getExchangeSchema(const String & exchange_name);
ColumnsWithTypeAndName getExchangeColumns(const String & executor_id);

bool exchangeExists(const String & executor_id);

/// for MPP Tasks, it will split data by partition num, then each MPP service will have a subset of mock data.
ColumnsWithTypeAndName getColumnsForMPPTableScan(const TiDBTableScan & table_scan, Int64 partition_id, Int64 partition_num);

TableInfo getTableInfo(const String & name);
TableInfo getTableInfoForDeltaMerge(const String & name);

/// clear for StorageDeltaMerge
void clear();

void setUseDeltaMerge(bool flag);

bool useDeltaMerge() const;

private:
/// for mock table scan
std::unordered_map<String, Int64> name_to_id_map; /// <table_name, table_id>
std::unordered_map<Int64, MockColumnInfoVec> table_schema; /// <table_id, columnInfo>
std::unordered_map<Int64, ColumnsWithTypeAndName> table_columns; /// <table_id, columns>
std::unordered_map<String, TableInfo> table_infos;
std::unordered_map<String, TableInfo> table_infos; /// <table_name, table_info>

/// for mock exchange receiver
std::unordered_map<String, String> executor_id_to_name_map; /// <executor_id, exchange name>
std::unordered_map<String, MockColumnInfoVec> exchange_schemas; /// <exchange_name, columnInfo>
std::unordered_map<String, ColumnsWithTypeAndName> exchange_columns; /// <exchange_name, columns>

/// for mock storage delta merge
std::unordered_map<String, Int64> name_to_id_map_for_delta_merge; /// <table_name, table_id>
std::unordered_map<Int64, MockColumnInfoVec> table_schema_for_delta_merge; /// <table_id, columnInfo>
std::unordered_map<Int64, std::shared_ptr<StorageDeltaMerge>> storage_delta_merge_map; // <table_id, StorageDeltaMerge>
std::unordered_map<String, TableInfo> table_infos_for_delta_merge; /// <table_name, table_info>
std::unordered_map<Int64, NamesAndTypes> names_and_types_map_for_delta_merge; /// <table_id, NamesAndTypes>

// storage delta merge can be used in executor ut test only.
bool use_storage_delta_merge = false;

private:
/// for table scan
Int64 getTableId(const String & name);

void addTableInfo(const String & name, const MockColumnInfoVec & columns);

// for storage delta merge table scan
Int64 getTableIdForDeltaMerge(const String & name);

void addTableInfoForDeltaMerge(const String & name, const MockColumnInfoVec & columns);

void addNamesAndTypesForDeltaMerge(Int64 table_id, const ColumnsWithTypeAndName & columns);
/// for exchange receiver
bool exchangeExistsWithName(const String & name);
};
} // namespace DB::tests
} // namespace DB
Loading

0 comments on commit 3887b91

Please sign in to comment.