Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

make DeltaValueSpace::info thread safe #4393

Merged
merged 4 commits into from
Mar 23, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ void ColumnFilePersistedSet::updateColumnFileStats()
}
}
persisted_files_count = new_persisted_files_count;
persisted_files_level_count = persisted_files_levels.size();
rows = new_rows;
bytes = new_bytes;
deletes = new_deletes;
Expand Down Expand Up @@ -319,6 +320,7 @@ bool ColumnFilePersistedSet::appendPersistedColumnFilesToLevel0(const ColumnFile
/// Commit updates in memory.
persisted_files_levels.swap(new_persisted_files_levels);
updateColumnFileStats();
LOG_FMT_DEBUG(log, "{}, after append {} column files, level info: {}", info(), column_files.size(), levelsInfo());

return true;
}
Expand Down Expand Up @@ -392,7 +394,7 @@ bool ColumnFilePersistedSet::installCompactionResults(const MinorCompactionPtr &
return false;
}
minor_compaction_version += 1;
LOG_FMT_DEBUG(log, "Before commit compaction, level summary: {}", info());
LOG_FMT_DEBUG(log, "{}, before commit compaction, level info: {}", info(), levelsInfo());
ColumnFilePersistedLevels new_persisted_files_levels;
auto compaction_src_level = compaction->getCompactionSourceLevel();
// Copy column files in level range [0, compaction_src_level)
Expand Down Expand Up @@ -461,7 +463,7 @@ bool ColumnFilePersistedSet::installCompactionResults(const MinorCompactionPtr &
/// Commit updates in memory.
persisted_files_levels.swap(new_persisted_files_levels);
updateColumnFileStats();
LOG_FMT_DEBUG(log, "After commit compaction, level summary: {}", info());
LOG_FMT_DEBUG(log, "{}, after commit compaction, level info: {}", info(), levelsInfo());

return true;
}
Expand Down
23 changes: 11 additions & 12 deletions dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ namespace DM
class ColumnFilePersistedSet;
using ColumnFilePersistedSetPtr = std::shared_ptr<ColumnFilePersistedSet>;

/// This class is not thread safe, manipulate on it requires acquire extra synchronization on the DeltaValueSpace
/// This class is mostly not thread safe, manipulate on it requires acquire extra synchronization on the DeltaValueSpace
/// Only the method that just access atomic variable can be called without extra synchronization
class ColumnFilePersistedSet : public std::enable_shared_from_this<ColumnFilePersistedSet>
, private boost::noncopyable
{
Expand All @@ -55,7 +56,9 @@ class ColumnFilePersistedSet : public std::enable_shared_from_this<ColumnFilePer
private:
PageId metadata_id;
ColumnFilePersistedLevels persisted_files_levels;
// TODO: check the proper memory_order when use this atomic variable
std::atomic<size_t> persisted_files_count;
std::atomic<size_t> persisted_files_level_count;

std::atomic<size_t> rows = 0;
std::atomic<size_t> bytes = 0;
Expand All @@ -80,25 +83,19 @@ class ColumnFilePersistedSet : public std::enable_shared_from_this<ColumnFilePer
/// Only called after reboot.
static ColumnFilePersistedSetPtr restore(DMContext & context, const RowKeyRange & segment_range, PageId id);

/// Thread safe part start
String simpleInfo() const { return "ColumnFilePersistedSet [" + DB::toString(metadata_id) + "]"; }
String info() const
{
String levels_summary;
for (size_t i = 0; i < persisted_files_levels.size(); i++)
{
levels_summary += fmt::format("[{}: {}]", i, persisted_files_levels[i].size());
if (i != persisted_files_levels.size() - 1)
levels_summary += ",";
}

return fmt::format("ColumnFilePersistedSet [{}][levels summary: {}]: {} column files, {} rows, {} bytes, {} deletes.",
return fmt::format("ColumnFilePersistedSet [{}]: {} levels, {} column files, {} rows, {} bytes, {} deletes.",
metadata_id,
levels_summary,
persisted_files_level_count.load(),
JinheLin marked this conversation as resolved.
Show resolved Hide resolved
persisted_files_count.load(),
rows.load(),
bytes.load(),
deletes.load());
}
/// Thread safe part end
String levelsInfo() const
{
String levels_info;
Expand All @@ -116,13 +113,15 @@ class ColumnFilePersistedSet : public std::enable_shared_from_this<ColumnFilePer
ColumnFilePersisteds
checkHeadAndCloneTail(DMContext & context, const RowKeyRange & target_range, const ColumnFiles & head_column_files, WriteBatches & wbs) const;

/// Thread safe part start
PageId getId() const { return metadata_id; }

size_t getColumnFileCount() const { return persisted_files_count.load(); }
size_t getColumnFileLevelCount() const { return persisted_files_levels.size(); }
size_t getColumnFileLevelCount() const { return persisted_files_level_count.load(); }
size_t getRows() const { return rows.load(); }
size_t getBytes() const { return bytes.load(); }
size_t getDeletes() const { return deletes.load(); }
/// Thread safe part end

size_t getTotalCacheRows() const;
size_t getTotalCacheBytes() const;
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Storages/DeltaMerge/Delta/MemTableSet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ void MemTableSet::appendColumnFileInner(const ColumnFilePtr & column_file)
}

column_files.push_back(column_file);
column_files_count = column_files.size();

rows += column_file->getRows();
bytes += column_file->getBytes();
Expand Down Expand Up @@ -269,6 +270,7 @@ void MemTableSet::removeColumnFilesInFlushTask(const ColumnFileFlushTask & flush
column_file_iter++;
}
column_files.swap(new_column_files);
column_files_count = column_files.size();
rows = new_rows;
bytes = new_bytes;
deletes = new_deletes;
Expand Down
19 changes: 13 additions & 6 deletions dbms/src/Storages/DeltaMerge/Delta/MemTableSet.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ using MemTableSetPtr = std::shared_ptr<MemTableSet>;
/// MemTableSet contains column file which data just resides in memory and it cannot be restored after restart.
/// And the column files will be flushed periodically to ColumnFilePersistedSet.
///
/// This class is not thread safe, manipulate on it requires acquire extra synchronization on the DeltaValueSpace
/// This class is mostly not thread safe, manipulate on it requires acquire extra synchronization on the DeltaValueSpace
/// Only the method that just access atomic variable can be called without extra synchronization
class MemTableSet : public std::enable_shared_from_this<MemTableSet>
, private boost::noncopyable
{
Expand All @@ -37,6 +38,8 @@ class MemTableSet : public std::enable_shared_from_this<MemTableSet>
BlockPtr last_schema;

ColumnFiles column_files;
// TODO: check the proper memory_order when use this atomic variable
std::atomic<size_t> column_files_count;
lidezhu marked this conversation as resolved.
Show resolved Hide resolved

std::atomic<size_t> rows = 0;
std::atomic<size_t> bytes = 0;
Expand All @@ -53,6 +56,7 @@ class MemTableSet : public std::enable_shared_from_this<MemTableSet>
, column_files(in_memory_files)
, log(&Poco::Logger::get("MemTableSet"))
{
column_files_count = column_files.size();
for (const auto & file : column_files)
{
rows += file->getRows();
Expand All @@ -61,21 +65,24 @@ class MemTableSet : public std::enable_shared_from_this<MemTableSet>
}
}

/// Thread safe part start
String info() const
{
return fmt::format("MemTableSet: {} column files, {} rows, {} bytes, {} deletes",
column_files.size(),
column_files_count.load(),
rows.load(),
bytes.load(),
deletes.load());
}

size_t getColumnFileCount() const { return column_files_count.load(); }
size_t getRows() const { return rows.load(); }
size_t getBytes() const { return bytes.load(); }
size_t getDeletes() const { return deletes.load(); }
/// Thread safe part end

ColumnFiles cloneColumnFiles(DMContext & context, const RowKeyRange & target_range, WriteBatches & wbs);

size_t getColumnFileCount() const { return column_files.size(); }
size_t getRows() const { return rows; }
size_t getBytes() const { return bytes; }
size_t getDeletes() const { return deletes; }

/// The following methods returning false means this operation failed, caused by other threads could have done
/// some updates on this instance. E.g. this instance have been abandoned.
Expand Down