Skip to content

Commit

Permalink
Merge branch 'master' into supress_grpc
Browse files Browse the repository at this point in the history
  • Loading branch information
JaySon-Huang authored Sep 6, 2022
2 parents 41d81be + f56b7b6 commit 3b4be84
Show file tree
Hide file tree
Showing 33 changed files with 759 additions and 352 deletions.
14 changes: 14 additions & 0 deletions dbms/src/Common/MemoryTracker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,20 @@ void MemoryTracker::alloc(Int64 size, bool check_memory_limit)
* we allow exception about memory limit exceeded to be thrown only on next allocation.
* So, we allow over-allocations.
*/
Int64 current_amount = amount.load(std::memory_order_relaxed);
if (unlikely(!next.load(std::memory_order_relaxed) && accuracy_diff_for_test && real_rss > accuracy_diff_for_test + current_amount))
{
DB::FmtBuffer fmt_buf;
fmt_buf.append("Memory tracker accuracy ");
if (description)
fmt_buf.fmtAppend(" {}", description);

fmt_buf.fmtAppend(": fault injected. real_rss ({}) is much larger than tracked amount ({})",
formatReadableSizeWithBinarySuffix(real_rss),
formatReadableSizeWithBinarySuffix(current_amount));
throw DB::TiFlashException(fmt_buf.toString(), DB::Errors::Coprocessor::MemoryLimitExceeded);
}

Int64 will_be = size + amount.fetch_add(size, std::memory_order_relaxed);

if (!next.load(std::memory_order_relaxed))
Expand Down
5 changes: 5 additions & 0 deletions dbms/src/Common/MemoryTracker.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ class MemoryTracker : public std::enable_shared_from_this<MemoryTracker>
/// To test exception safety of calling code, memory tracker throws an exception on each memory allocation with specified probability.
double fault_probability = 0;

/// To test the accuracy of memory track, it throws an exception when the part exceeding the tracked amount is greater than accuracy_diff_for_test.
Int64 accuracy_diff_for_test = 0;

/// Singly-linked list. All information will be passed to subsequent memory trackers also (it allows to implement trackers hierarchy).
/// In terms of tree nodes it is the list of parents. Lifetime of these trackers should "include" lifetime of current tracker.
std::atomic<MemoryTracker *> next{};
Expand Down Expand Up @@ -103,6 +106,8 @@ class MemoryTracker : public std::enable_shared_from_this<MemoryTracker>

void setFaultProbability(double value) { fault_probability = value; }

void setAccuracyDiffForTest(double value) { accuracy_diff_for_test = value; }

/// next should be changed only once: from nullptr to some value.
void setNext(MemoryTracker * elem) { next.store(elem, std::memory_order_relaxed); }

Expand Down
10 changes: 2 additions & 8 deletions dbms/src/Databases/test/gtest_database.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,16 +97,10 @@ class DatabaseTiFlashTest : public ::testing::Test
static void recreateMetadataPath()
{
String path = TiFlashTestEnv::getContext().getPath();

auto p = path + "/metadata/";
if (Poco::File file(p); file.exists())
file.remove(true);
Poco::File{p}.createDirectories();

TiFlashTestEnv::tryRemovePath(p, /*recreate=*/true);
p = path + "/data/";
if (Poco::File file(p); file.exists())
file.remove(true);
Poco::File{p}.createDirectories();
TiFlashTestEnv::tryRemovePath(p, /*recreate=*/true);
}

protected:
Expand Down
29 changes: 16 additions & 13 deletions dbms/src/Debug/MockSchemaGetter.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,30 +23,33 @@ namespace DB
{
struct MockSchemaGetter
{
TiDB::DBInfoPtr getDatabase(DatabaseID db_id) { return MockTiDB::instance().getDBInfoByID(db_id); }
static TiDB::DBInfoPtr getDatabase(DatabaseID db_id) { return MockTiDB::instance().getDBInfoByID(db_id); }

Int64 getVersion() { return MockTiDB::instance().getVersion(); }
static Int64 getVersion() { return MockTiDB::instance().getVersion(); }

std::optional<SchemaDiff> getSchemaDiff(Int64 version)
static std::optional<SchemaDiff> getSchemaDiff(Int64 version)
{
return MockTiDB::instance().getSchemaDiff(version);
}

bool checkSchemaDiffExists(Int64 version)
static bool checkSchemaDiffExists(Int64 version)
{
return MockTiDB::instance().checkSchemaDiffExists(version);
}

TiDB::TableInfoPtr getTableInfo(DatabaseID, TableID table_id) { return MockTiDB::instance().getTableInfoByID(table_id); }
static TiDB::TableInfoPtr getTableInfo(DatabaseID, TableID table_id)
{
return MockTiDB::instance().getTableInfoByID(table_id);
}

std::vector<TiDB::DBInfoPtr> listDBs()
static std::vector<TiDB::DBInfoPtr> listDBs()
{
std::vector<TiDB::DBInfoPtr> res;
const auto & databases = MockTiDB::instance().getDatabases();
for (auto it = databases.begin(); it != databases.end(); it++)
for (const auto & database : databases)
{
auto db_id = it->second;
auto db_name = it->first;
auto db_id = database.second;
auto db_name = database.first;
TiDB::DBInfoPtr db_ptr = std::make_shared<TiDB::DBInfo>(TiDB::DBInfo());
db_ptr->id = db_id;
db_ptr->name = db_name;
Expand All @@ -55,15 +58,15 @@ struct MockSchemaGetter
return res;
}

std::vector<TiDB::TableInfoPtr> listTables(Int64 db_id)
static std::vector<TiDB::TableInfoPtr> listTables(Int64 db_id)
{
auto tables_by_id = MockTiDB::instance().getTables();
std::vector<TiDB::TableInfoPtr> res;
for (auto it = tables_by_id.begin(); it != tables_by_id.end(); it++)
for (auto & it : tables_by_id)
{
if (it->second->dbID() == db_id)
if (it.second->dbID() == db_id)
{
res.push_back(std::make_shared<TiDB::TableInfo>(TiDB::TableInfo(it->second->table_info)));
res.push_back(std::make_shared<TiDB::TableInfo>(TiDB::TableInfo(it.second->table_info)));
}
}
return res;
Expand Down
50 changes: 37 additions & 13 deletions dbms/src/Debug/MockTiDB.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/Exception.h>
#include <DataTypes/DataTypeDecimal.h>
#include <DataTypes/DataTypeEnum.h>
#include <DataTypes/DataTypeMyDate.h>
Expand All @@ -31,6 +32,8 @@
#include <Storages/Transaction/TiDB.h>
#include <Storages/Transaction/TypeMapping.h>

#include <mutex>

namespace DB
{
namespace ErrorCodes
Expand Down Expand Up @@ -350,27 +353,40 @@ Field getDefaultValue(const ASTPtr & default_value_ast)
return Field();
}

void MockTiDB::newPartition(const String & database_name, const String & table_name, TableID partition_id, Timestamp tso, bool is_add_part)
TableID MockTiDB::newPartition(TableID belong_logical_table, const String & partition_name, Timestamp tso, bool is_add_part)
{
std::lock_guard lock(tables_mutex);

TablePtr table = getTableByNameInternal(database_name, table_name);
TableInfo & table_info = table->table_info;
TablePtr logical_table = getTableByID(belong_logical_table);
TableID partition_id = table_id_allocator++; // allocate automatically

const auto & part_def = find_if(
table_info.partition.definitions.begin(),
table_info.partition.definitions.end(),
[&partition_id](PartitionDefinition & part_def) { return part_def.id == partition_id; });
if (part_def != table_info.partition.definitions.end())
throw Exception("Mock TiDB table " + database_name + "." + table_name + " already has partition " + std::to_string(partition_id),
ErrorCodes::LOGICAL_ERROR);
return newPartitionImpl(logical_table, partition_id, partition_name, tso, is_add_part);
}

TableID MockTiDB::newPartition(const String & database_name, const String & table_name, TableID partition_id, Timestamp tso, bool is_add_part)
{
std::lock_guard lock(tables_mutex);

TablePtr logical_table = getTableByNameInternal(database_name, table_name);
return newPartitionImpl(logical_table, partition_id, toString(partition_id), tso, is_add_part);
}

TableID MockTiDB::newPartitionImpl(const TablePtr & logical_table, TableID partition_id, const String & partition_name, Timestamp tso, bool is_add_part)
{
TableInfo & table_info = logical_table->table_info;
RUNTIME_CHECK_MSG(!logical_table->existPartitionID(partition_id),
"Mock TiDB table {}.{} already has partition {}, table_info={}",
logical_table->database_name,
logical_table->table_name,
partition_id,
table_info.serialize());

table_info.is_partition_table = true;
table_info.partition.enable = true;
table_info.partition.num++;
PartitionDefinition partition_def;
partition_def.id = partition_id;
partition_def.name = std::to_string(partition_id);
partition_def.name = partition_name;
table_info.partition.definitions.emplace_back(partition_def);
table_info.update_timestamp = tso;

Expand All @@ -380,11 +396,12 @@ void MockTiDB::newPartition(const String & database_name, const String & table_n

SchemaDiff diff;
diff.type = SchemaActionType::AddTablePartition;
diff.schema_id = table->database_id;
diff.table_id = table->id();
diff.schema_id = logical_table->database_id;
diff.table_id = logical_table->id();
diff.version = version;
version_diff[version] = diff;
}
return partition_id;
}

void MockTiDB::dropPartition(const String & database_name, const String & table_name, TableID partition_id)
Expand Down Expand Up @@ -631,6 +648,13 @@ TablePtr MockTiDB::getTableByNameInternal(const String & database_name, const St
return it->second;
}

TablePtr MockTiDB::getTableByID(TableID table_id)
{
if (auto it = tables_by_id.find(table_id); it != tables_by_id.end())
return it->second;
throw Exception(fmt::format("Mock TiDB table does not exists, table_id={}", table_id), ErrorCodes::UNKNOWN_TABLE);
}

TiDB::TableInfoPtr MockTiDB::getTableInfoByID(TableID table_id)
{
auto it = tables_by_id.find(table_id);
Expand Down
26 changes: 19 additions & 7 deletions dbms/src/Debug/MockTiDB.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,23 +37,32 @@ class MockTiDB : public ext::Singleton<MockTiDB>
public:
Table(const String & database_name, DatabaseID database_id, const String & table_name, TiDB::TableInfo && table_info);

TableID id() { return table_info.id; }
DatabaseID dbID() { return database_id; }
TableID id() const { return table_info.id; }
DatabaseID dbID() const { return database_id; }

ColumnID allocColumnID() { return ++col_id; }

bool isPartitionTable() { return table_info.is_partition_table; }
bool isPartitionTable() const { return table_info.is_partition_table; }

std::vector<TableID> getPartitionIDs()
std::vector<TableID> getPartitionIDs() const
{
std::vector<TableID> partition_ids;
std::for_each(
table_info.partition.definitions.begin(),
table_info.partition.definitions.end(),
[&](const TiDB::PartitionDefinition & part_def) { partition_ids.emplace_back(part_def.id); });
[&](const auto & part_def) { partition_ids.emplace_back(part_def.id); });
return partition_ids;
}

bool existPartitionID(TableID part_id) const
{
const auto & part_def = find_if(
table_info.partition.definitions.begin(),
table_info.partition.definitions.end(),
[&part_id](const auto & part_def) { return part_def.id == part_id; });
return part_def != table_info.partition.definitions.end();
}

TiDB::TableInfo table_info;

private:
Expand Down Expand Up @@ -89,7 +98,8 @@ class MockTiDB : public ext::Singleton<MockTiDB>

DatabaseID newDataBase(const String & database_name);

void newPartition(const String & database_name, const String & table_name, TableID partition_id, Timestamp tso, bool);
TableID newPartition(const String & database_name, const String & table_name, TableID partition_id, Timestamp tso, bool);
TableID newPartition(TableID belong_logical_table, const String & partition_name, Timestamp tso, bool);

void dropPartition(const String & database_name, const String & table_name, TableID partition_id);

Expand Down Expand Up @@ -135,13 +145,15 @@ class MockTiDB : public ext::Singleton<MockTiDB>

std::unordered_map<TableID, TablePtr> getTables() { return tables_by_id; }

Int64 getVersion() { return version; }
Int64 getVersion() const { return version; }

TableID newTableID() { return table_id_allocator++; }

private:
TableID newPartitionImpl(const TablePtr & logical_table, TableID partition_id, const String & partition_name, Timestamp tso, bool is_add_part);
TablePtr dropTableInternal(Context & context, const String & database_name, const String & table_name, bool drop_regions);
TablePtr getTableByNameInternal(const String & database_name, const String & table_name);
TablePtr getTableByID(TableID table_id);

private:
std::mutex tables_mutex;
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Interpreters/ProcessList.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ ProcessList::EntryPtr ProcessList::insert(
total_memory_tracker->setOrRaiseLimit(settings.max_memory_usage_for_all_queries);
total_memory_tracker->setBytesThatRssLargerThanLimit(settings.bytes_that_rss_larger_than_limit);
total_memory_tracker->setDescription("(total)");
total_memory_tracker->setAccuracyDiffForTest(settings.memory_tracker_accuracy_diff_for_test);
user_process_list.user_memory_tracker->setNext(total_memory_tracker.get());
}

Expand Down
4 changes: 3 additions & 1 deletion dbms/src/Interpreters/Settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,8 @@ struct Settings
\
M(SettingFloat, memory_tracker_fault_probability, 0., "For testing of `exception safety` - throw an exception every time you allocate memory with the specified probability.") \
\
M(SettingInt64 , memory_tracker_accuracy_diff_for_test, 0, "For testing of the accuracy of the memory tracker - throw an exception when real_rss is much larger than tracked amount.") \
\
M(SettingBool, enable_http_compression, 0, "Compress the result if the client over HTTP said that it understands data compressed by gzip or deflate.") \
M(SettingInt64, http_zlib_compression_level, 3, "Compression level - used if the client on HTTP said that it understands data compressed by gzip or deflate.") \
\
Expand Down Expand Up @@ -316,7 +318,7 @@ struct Settings
M(SettingUInt64, dt_checksum_frame_size, DBMS_DEFAULT_BUFFER_SIZE, "Frame size for delta tree stable storage") \
\
M(SettingDouble, dt_page_gc_threshold, 0.5, "Max valid rate of deciding to do a GC in PageStorage") \
M(SettingBool, dt_enable_read_thread, false, "Enable storage read thread or not") \
M(SettingBool, dt_enable_read_thread, true, "Enable storage read thread or not") \
\
M(SettingChecksumAlgorithm, dt_checksum_algorithm, ChecksumAlgo::XXH3, "Checksum algorithm for delta tree stable storage") \
M(SettingCompressionMethod, dt_compression_method, CompressionMethod::LZ4, "The method of data compression when writing.") \
Expand Down
5 changes: 5 additions & 0 deletions dbms/src/Server/RaftConfigParser.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,13 @@ struct TiFlashRaftConfig
std::unordered_set<std::string> ignore_databases{"system"};
// Actually it is "flash.service_addr"
std::string flash_server_addr;

// Use PageStorage V1 for kvstore or not.
// TODO: remove this config
bool enable_compatible_mode = true;

bool for_unit_test = false;

static constexpr TiDB::StorageEngine DEFAULT_ENGINE = TiDB::StorageEngine::DT;
TiDB::StorageEngine engine = DEFAULT_ENGINE;
TiDB::SnapshotApplyMethod snapshot_apply_method = TiDB::SnapshotApplyMethod::DTFile_Directory;
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/DeltaMerge/DMSegmentThreadInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ class DMSegmentThreadInputStream : public IProfilingBlockInputStream
max_version,
std::max(expected_block_size, static_cast<size_t>(dm_context->db_context.getSettingsRef().dt_segment_stable_pack_rows)));
}
LOG_FMT_TRACE(log, "Start to read segment [{}]", cur_segment->segmentId());
LOG_FMT_TRACE(log, "Start to read segment, segment={}", cur_segment->simpleInfo());
}
FAIL_POINT_PAUSE(FailPoints::pause_when_reading_from_dt_stream);

Expand Down Expand Up @@ -151,7 +151,7 @@ class DMSegmentThreadInputStream : public IProfilingBlockInputStream
else
{
after_segment_read(dm_context, cur_segment);
LOG_FMT_TRACE(log, "Finish reading segment [{}]", cur_segment->segmentId());
LOG_FMT_TRACE(log, "Finish reading segment, segment={}", cur_segment->simpleInfo());
cur_segment = {};
cur_stream = {};
}
Expand Down
Loading

0 comments on commit 3b4be84

Please sign in to comment.