From 0c98df870157720c0635d398d802af7fd848ba03 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Wed, 23 Mar 2022 15:20:24 +0800 Subject: [PATCH 1/3] make DeltaValueSpace::info thread safe Signed-off-by: lidezhu --- .../Delta/ColumnFilePersistedSet.cpp | 6 +++-- .../DeltaMerge/Delta/ColumnFilePersistedSet.h | 22 +++++++++---------- .../Storages/DeltaMerge/Delta/MemTableSet.cpp | 2 ++ .../Storages/DeltaMerge/Delta/MemTableSet.h | 18 ++++++++++----- 4 files changed, 28 insertions(+), 20 deletions(-) diff --git a/dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.cpp b/dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.cpp index a52fde00a4a..7ffd2eeca03 100644 --- a/dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.cpp +++ b/dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.cpp @@ -67,6 +67,7 @@ void ColumnFilePersistedSet::updateColumnFileStats() } } persisted_files_count = new_persisted_files_count; + persisted_files_level_count = persisted_files_levels.size(); rows = new_rows; bytes = new_bytes; deletes = new_deletes; @@ -319,6 +320,7 @@ bool ColumnFilePersistedSet::appendPersistedColumnFilesToLevel0(const ColumnFile /// Commit updates in memory. 
persisted_files_levels.swap(new_persisted_files_levels); updateColumnFileStats(); + LOG_FMT_DEBUG(log, "{}, after append {} column files, level info: {}", info(), column_files.size(), levelsInfo()); return true; } @@ -392,7 +394,7 @@ bool ColumnFilePersistedSet::installCompactionResults(const MinorCompactionPtr & return false; } minor_compaction_version += 1; - LOG_FMT_DEBUG(log, "Before commit compaction, level summary: {}", info()); + LOG_FMT_DEBUG(log, "{}, before commit compaction, level info: {}", info(), levelsInfo()); ColumnFilePersistedLevels new_persisted_files_levels; auto compaction_src_level = compaction->getCompactionSourceLevel(); // Copy column files in level range [0, compaction_src_level) @@ -461,7 +463,7 @@ bool ColumnFilePersistedSet::installCompactionResults(const MinorCompactionPtr & /// Commit updates in memory. persisted_files_levels.swap(new_persisted_files_levels); updateColumnFileStats(); - LOG_FMT_DEBUG(log, "After commit compaction, level summary: {}", info()); + LOG_FMT_DEBUG(log, "{}, after commit compaction, level info: {}", info(), levelsInfo()); return true; } diff --git a/dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.h b/dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.h index ea6e6fbaddd..cc699145caa 100644 --- a/dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.h +++ b/dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.h @@ -44,7 +44,8 @@ namespace DM class ColumnFilePersistedSet; using ColumnFilePersistedSetPtr = std::shared_ptr; -/// This class is not thread safe, manipulate on it requires acquire extra synchronization on the DeltaValueSpace +/// This class is mostly not thread safe, manipulate on it requires acquire extra synchronization on the DeltaValueSpace +/// Only the method that just access atomic variable can be called without extra synchronization class ColumnFilePersistedSet : public std::enable_shared_from_this , private boost::noncopyable { @@ -56,6 +57,7 @@ class 
ColumnFilePersistedSet : public std::enable_shared_from_this persisted_files_count; + std::atomic persisted_files_level_count; std::atomic rows = 0; std::atomic bytes = 0; @@ -80,25 +82,19 @@ class ColumnFilePersistedSet : public std::enable_shared_from_thisgetRows(); bytes += column_file->getBytes(); @@ -269,6 +270,7 @@ void MemTableSet::removeColumnFilesInFlushTask(const ColumnFileFlushTask & flush column_file_iter++; } column_files.swap(new_column_files); + column_files_count = column_files.size(); rows = new_rows; bytes = new_bytes; deletes = new_deletes; diff --git a/dbms/src/Storages/DeltaMerge/Delta/MemTableSet.h b/dbms/src/Storages/DeltaMerge/Delta/MemTableSet.h index e1144200249..316fabfe251 100644 --- a/dbms/src/Storages/DeltaMerge/Delta/MemTableSet.h +++ b/dbms/src/Storages/DeltaMerge/Delta/MemTableSet.h @@ -28,7 +28,8 @@ using MemTableSetPtr = std::shared_ptr; /// MemTableSet contains column file which data just resides in memory and it cannot be restored after restart. /// And the column files will be flushed periodically to ColumnFilePersistedSet. 
/// -/// This class is not thread safe, manipulate on it requires acquire extra synchronization on the DeltaValueSpace +/// This class is mostly not thread safe, manipulate on it requires acquire extra synchronization on the DeltaValueSpace +/// Only the method that just access atomic variable can be called without extra synchronization class MemTableSet : public std::enable_shared_from_this , private boost::noncopyable { @@ -37,6 +38,7 @@ class MemTableSet : public std::enable_shared_from_this BlockPtr last_schema; ColumnFiles column_files; + std::atomic column_files_count; std::atomic rows = 0; std::atomic bytes = 0; @@ -53,6 +55,7 @@ class MemTableSet : public std::enable_shared_from_this , column_files(in_memory_files) , log(&Poco::Logger::get("MemTableSet")) { + column_files_count = column_files.size(); for (const auto & file : column_files) { rows += file->getRows(); @@ -61,21 +64,24 @@ class MemTableSet : public std::enable_shared_from_this } } + /// Thread safe part start String info() const { return fmt::format("MemTableSet: {} column files, {} rows, {} bytes, {} deletes", - column_files.size(), + column_files_count.load(), rows.load(), bytes.load(), deletes.load()); } + size_t getColumnFileCount() const { return column_files_count.load(); } + size_t getRows() const { return rows.load(); } + size_t getBytes() const { return bytes.load(); } + size_t getDeletes() const { return deletes.load(); } + /// Thread safe part end + ColumnFiles cloneColumnFiles(DMContext & context, const RowKeyRange & target_range, WriteBatches & wbs); - size_t getColumnFileCount() const { return column_files.size(); } - size_t getRows() const { return rows; } - size_t getBytes() const { return bytes; } - size_t getDeletes() const { return deletes; } /// The following methods returning false means this operation failed, caused by other threads could have done /// some updates on this instance. E.g. this instance have been abandoned. 
From 8b19243316c9e1da4430e85fe9e856420bb6e70b Mon Sep 17 00:00:00 2001 From: lidezhu Date: Wed, 23 Mar 2022 15:25:30 +0800 Subject: [PATCH 2/3] add some todo Signed-off-by: lidezhu --- dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.h | 1 + dbms/src/Storages/DeltaMerge/Delta/MemTableSet.h | 1 + 2 files changed, 2 insertions(+) diff --git a/dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.h b/dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.h index cc699145caa..5fcd9b8c618 100644 --- a/dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.h +++ b/dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.h @@ -56,6 +56,7 @@ class ColumnFilePersistedSet : public std::enable_shared_from_this persisted_files_count; std::atomic persisted_files_level_count; diff --git a/dbms/src/Storages/DeltaMerge/Delta/MemTableSet.h b/dbms/src/Storages/DeltaMerge/Delta/MemTableSet.h index 316fabfe251..5de57a7c9a8 100644 --- a/dbms/src/Storages/DeltaMerge/Delta/MemTableSet.h +++ b/dbms/src/Storages/DeltaMerge/Delta/MemTableSet.h @@ -38,6 +38,7 @@ class MemTableSet : public std::enable_shared_from_this BlockPtr last_schema; ColumnFiles column_files; + // TODO: check the proper memory_order when use this atomic variable std::atomic column_files_count; std::atomic rows = 0; From c2034026ce862e2b071e1256818a4864b8a15786 Mon Sep 17 00:00:00 2001 From: lidezhu <47731263+lidezhu@users.noreply.github.com> Date: Wed, 23 Mar 2022 15:37:01 +0800 Subject: [PATCH 3/3] Update dbms/src/Storages/DeltaMerge/Delta/MemTableSet.h Co-authored-by: JaySon --- dbms/src/Storages/DeltaMerge/Delta/MemTableSet.h | 1 + 1 file changed, 1 insertion(+) diff --git a/dbms/src/Storages/DeltaMerge/Delta/MemTableSet.h b/dbms/src/Storages/DeltaMerge/Delta/MemTableSet.h index 5de57a7c9a8..295b358090e 100644 --- a/dbms/src/Storages/DeltaMerge/Delta/MemTableSet.h +++ b/dbms/src/Storages/DeltaMerge/Delta/MemTableSet.h @@ -37,6 +37,7 @@ class MemTableSet : public std::enable_shared_from_this /// To 
avoid serialize the same schema between continuous ColumnFileInMemory and ColumnFileTiny instance. BlockPtr last_schema; + // Note: `column_files_count` must be updated whenever `column_files` changes, so that the thread-safe accessors stay in sync with it ColumnFiles column_files; // TODO: check the proper memory_order when use this atomic variable std::atomic column_files_count;