Skip to content

Commit

Permalink
make DeltaValueSpace::info thread safe (#4393) (#4396)
Browse files Browse the repository at this point in the history
close #4376
  • Loading branch information
ti-chi-bot authored Mar 23, 2022
1 parent 4c0d177 commit e9a1ded
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 20 deletions.
6 changes: 4 additions & 2 deletions dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ void ColumnFilePersistedSet::updateColumnFileStats()
}
}
persisted_files_count = new_persisted_files_count;
persisted_files_level_count = persisted_files_levels.size();
rows = new_rows;
bytes = new_bytes;
deletes = new_deletes;
Expand Down Expand Up @@ -319,6 +320,7 @@ bool ColumnFilePersistedSet::appendPersistedColumnFilesToLevel0(const ColumnFile
/// Commit updates in memory.
persisted_files_levels.swap(new_persisted_files_levels);
updateColumnFileStats();
LOG_FMT_DEBUG(log, "{}, after append {} column files, level info: {}", info(), column_files.size(), levelsInfo());

return true;
}
Expand Down Expand Up @@ -392,7 +394,7 @@ bool ColumnFilePersistedSet::installCompactionResults(const MinorCompactionPtr &
return false;
}
minor_compaction_version += 1;
LOG_FMT_DEBUG(log, "Before commit compaction, level summary: {}", info());
LOG_FMT_DEBUG(log, "{}, before commit compaction, level info: {}", info(), levelsInfo());
ColumnFilePersistedLevels new_persisted_files_levels;
auto compaction_src_level = compaction->getCompactionSourceLevel();
// Copy column files in level range [0, compaction_src_level)
Expand Down Expand Up @@ -461,7 +463,7 @@ bool ColumnFilePersistedSet::installCompactionResults(const MinorCompactionPtr &
/// Commit updates in memory.
persisted_files_levels.swap(new_persisted_files_levels);
updateColumnFileStats();
LOG_FMT_DEBUG(log, "After commit compaction, level summary: {}", info());
LOG_FMT_DEBUG(log, "{}, after commit compaction, level info: {}", info(), levelsInfo());

return true;
}
Expand Down
23 changes: 11 additions & 12 deletions dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ namespace DM
class ColumnFilePersistedSet;
using ColumnFilePersistedSetPtr = std::shared_ptr<ColumnFilePersistedSet>;

/// This class is not thread safe, manipulate on it requires acquire extra synchronization on the DeltaValueSpace
/// This class is mostly not thread safe, manipulate on it requires acquire extra synchronization on the DeltaValueSpace
/// Only the method that just access atomic variable can be called without extra synchronization
class ColumnFilePersistedSet : public std::enable_shared_from_this<ColumnFilePersistedSet>
, private boost::noncopyable
{
Expand All @@ -55,7 +56,9 @@ class ColumnFilePersistedSet : public std::enable_shared_from_this<ColumnFilePer
private:
PageId metadata_id;
ColumnFilePersistedLevels persisted_files_levels;
// TODO: check the proper memory_order when use this atomic variable
std::atomic<size_t> persisted_files_count;
std::atomic<size_t> persisted_files_level_count;

std::atomic<size_t> rows = 0;
std::atomic<size_t> bytes = 0;
Expand All @@ -80,25 +83,19 @@ class ColumnFilePersistedSet : public std::enable_shared_from_this<ColumnFilePer
/// Only called after reboot.
static ColumnFilePersistedSetPtr restore(DMContext & context, const RowKeyRange & segment_range, PageId id);

/// Thread safe part start
String simpleInfo() const { return "ColumnFilePersistedSet [" + DB::toString(metadata_id) + "]"; }
String info() const
{
String levels_summary;
for (size_t i = 0; i < persisted_files_levels.size(); i++)
{
levels_summary += fmt::format("[{}: {}]", i, persisted_files_levels[i].size());
if (i != persisted_files_levels.size() - 1)
levels_summary += ",";
}

return fmt::format("ColumnFilePersistedSet [{}][levels summary: {}]: {} column files, {} rows, {} bytes, {} deletes.",
return fmt::format("ColumnFilePersistedSet [{}]: {} levels, {} column files, {} rows, {} bytes, {} deletes.",
metadata_id,
levels_summary,
persisted_files_level_count.load(),
persisted_files_count.load(),
rows.load(),
bytes.load(),
deletes.load());
}
/// Thread safe part end
String levelsInfo() const
{
String levels_info;
Expand All @@ -116,13 +113,15 @@ class ColumnFilePersistedSet : public std::enable_shared_from_this<ColumnFilePer
ColumnFilePersisteds
checkHeadAndCloneTail(DMContext & context, const RowKeyRange & target_range, const ColumnFiles & head_column_files, WriteBatches & wbs) const;

/// Thread safe part start
PageId getId() const { return metadata_id; }

size_t getColumnFileCount() const { return persisted_files_count.load(); }
size_t getColumnFileLevelCount() const { return persisted_files_levels.size(); }
size_t getColumnFileLevelCount() const { return persisted_files_level_count.load(); }
size_t getRows() const { return rows.load(); }
size_t getBytes() const { return bytes.load(); }
size_t getDeletes() const { return deletes.load(); }
/// Thread safe part end

size_t getTotalCacheRows() const;
size_t getTotalCacheBytes() const;
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Storages/DeltaMerge/Delta/MemTableSet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ void MemTableSet::appendColumnFileInner(const ColumnFilePtr & column_file)
}

column_files.push_back(column_file);
column_files_count = column_files.size();

rows += column_file->getRows();
bytes += column_file->getBytes();
Expand Down Expand Up @@ -269,6 +270,7 @@ void MemTableSet::removeColumnFilesInFlushTask(const ColumnFileFlushTask & flush
column_file_iter++;
}
column_files.swap(new_column_files);
column_files_count = column_files.size();
rows = new_rows;
bytes = new_bytes;
deletes = new_deletes;
Expand Down
20 changes: 14 additions & 6 deletions dbms/src/Storages/DeltaMerge/Delta/MemTableSet.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,19 @@ using MemTableSetPtr = std::shared_ptr<MemTableSet>;
/// MemTableSet contains column file which data just resides in memory and it cannot be restored after restart.
/// And the column files will be flushed periodically to ColumnFilePersistedSet.
///
/// This class is not thread safe, manipulate on it requires acquire extra synchronization on the DeltaValueSpace
/// This class is mostly not thread safe, manipulate on it requires acquire extra synchronization on the DeltaValueSpace
/// Only the method that just access atomic variable can be called without extra synchronization
class MemTableSet : public std::enable_shared_from_this<MemTableSet>
, private boost::noncopyable
{
private:
/// To avoid serialize the same schema between continuous ColumnFileInMemory and ColumnFileTiny instance.
BlockPtr last_schema;

// Note that we must update `column_files_count` for outer thread-safe after `column_files` changed
ColumnFiles column_files;
// TODO: check the proper memory_order when use this atomic variable
std::atomic<size_t> column_files_count;

std::atomic<size_t> rows = 0;
std::atomic<size_t> bytes = 0;
Expand All @@ -53,6 +57,7 @@ class MemTableSet : public std::enable_shared_from_this<MemTableSet>
, column_files(in_memory_files)
, log(&Poco::Logger::get("MemTableSet"))
{
column_files_count = column_files.size();
for (const auto & file : column_files)
{
rows += file->getRows();
Expand All @@ -61,21 +66,24 @@ class MemTableSet : public std::enable_shared_from_this<MemTableSet>
}
}

/// Thread safe part start
String info() const
{
return fmt::format("MemTableSet: {} column files, {} rows, {} bytes, {} deletes",
column_files.size(),
column_files_count.load(),
rows.load(),
bytes.load(),
deletes.load());
}

size_t getColumnFileCount() const { return column_files_count.load(); }
size_t getRows() const { return rows.load(); }
size_t getBytes() const { return bytes.load(); }
size_t getDeletes() const { return deletes.load(); }
/// Thread safe part end

ColumnFiles cloneColumnFiles(DMContext & context, const RowKeyRange & target_range, WriteBatches & wbs);

size_t getColumnFileCount() const { return column_files.size(); }
size_t getRows() const { return rows; }
size_t getBytes() const { return bytes; }
size_t getDeletes() const { return deletes; }

/// The following methods returning false means this operation failed, caused by other threads could have done
/// some updates on this instance. E.g. this instance have been abandoned.
Expand Down

0 comments on commit e9a1ded

Please sign in to comment.