diff --git a/dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.cpp b/dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.cpp index a52fde00a4a..7ffd2eeca03 100644 --- a/dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.cpp +++ b/dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.cpp @@ -67,6 +67,7 @@ void ColumnFilePersistedSet::updateColumnFileStats() } } persisted_files_count = new_persisted_files_count; + persisted_files_level_count = persisted_files_levels.size(); rows = new_rows; bytes = new_bytes; deletes = new_deletes; @@ -319,6 +320,7 @@ bool ColumnFilePersistedSet::appendPersistedColumnFilesToLevel0(const ColumnFile /// Commit updates in memory. persisted_files_levels.swap(new_persisted_files_levels); updateColumnFileStats(); + LOG_FMT_DEBUG(log, "{}, after append {} column files, level info: {}", info(), column_files.size(), levelsInfo()); return true; } @@ -392,7 +394,7 @@ bool ColumnFilePersistedSet::installCompactionResults(const MinorCompactionPtr & return false; } minor_compaction_version += 1; - LOG_FMT_DEBUG(log, "Before commit compaction, level summary: {}", info()); + LOG_FMT_DEBUG(log, "{}, before commit compaction, level info: {}", info(), levelsInfo()); ColumnFilePersistedLevels new_persisted_files_levels; auto compaction_src_level = compaction->getCompactionSourceLevel(); // Copy column files in level range [0, compaction_src_level) @@ -461,7 +463,7 @@ bool ColumnFilePersistedSet::installCompactionResults(const MinorCompactionPtr & /// Commit updates in memory. persisted_files_levels.swap(new_persisted_files_levels); updateColumnFileStats(); - LOG_FMT_DEBUG(log, "After commit compaction, level summary: {}", info()); + LOG_FMT_DEBUG(log, "{}, after commit compaction, level info: {}", info(), levelsInfo()); return true; } diff --git a/dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.h b/dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.h index ea6e6fbaddd..5fcd9b8c618 100644 --- a/dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.h +++ b/dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.h @@ -44,7 +44,8 @@ namespace DM class ColumnFilePersistedSet; using ColumnFilePersistedSetPtr = std::shared_ptr; -/// This class is not thread safe, manipulate on it requires acquire extra synchronization on the DeltaValueSpace +/// This class is mostly not thread safe, manipulate on it requires acquire extra synchronization on the DeltaValueSpace +/// Only the method that just access atomic variable can be called without extra synchronization class ColumnFilePersistedSet : public std::enable_shared_from_this , private boost::noncopyable { @@ -55,7 +56,9 @@ class ColumnFilePersistedSet : public std::enable_shared_from_this persisted_files_count; + std::atomic persisted_files_level_count; std::atomic rows = 0; std::atomic bytes = 0; @@ -80,25 +83,19 @@ class ColumnFilePersistedSet : public std::enable_shared_from_thisgetRows(); bytes += column_file->getBytes(); @@ -269,6 +270,7 @@ void MemTableSet::removeColumnFilesInFlushTask(const ColumnFileFlushTask & flush column_file_iter++; } column_files.swap(new_column_files); + column_files_count = column_files.size(); rows = new_rows; bytes = new_bytes; deletes = new_deletes; diff --git a/dbms/src/Storages/DeltaMerge/Delta/MemTableSet.h b/dbms/src/Storages/DeltaMerge/Delta/MemTableSet.h index e1144200249..295b358090e 100644 --- a/dbms/src/Storages/DeltaMerge/Delta/MemTableSet.h +++ b/dbms/src/Storages/DeltaMerge/Delta/MemTableSet.h @@ -28,7 +28,8 @@ using MemTableSetPtr = std::shared_ptr; /// MemTableSet contains column file which data just resides in memory and it cannot be restored after restart. /// And the column files will be flushed periodically to ColumnFilePersistedSet. /// -/// This class is not thread safe, manipulate on it requires acquire extra synchronization on the DeltaValueSpace +/// This class is mostly not thread safe, manipulate on it requires acquire extra synchronization on the DeltaValueSpace +/// Only the method that just access atomic variable can be called without extra synchronization class MemTableSet : public std::enable_shared_from_this , private boost::noncopyable { @@ -36,7 +37,10 @@ class MemTableSet : public std::enable_shared_from_this /// To avoid serialize the same schema between continuous ColumnFileInMemory and ColumnFileTiny instance. BlockPtr last_schema; + // Note that we must update `column_files_count` for outer thread-safe after `column_files` changed ColumnFiles column_files; + // TODO: check the proper memory_order when use this atomic variable + std::atomic column_files_count; std::atomic rows = 0; std::atomic bytes = 0; @@ -53,6 +57,7 @@ class MemTableSet : public std::enable_shared_from_this , column_files(in_memory_files) , log(&Poco::Logger::get("MemTableSet")) { + column_files_count = column_files.size(); for (const auto & file : column_files) { rows += file->getRows(); @@ -61,21 +66,24 @@ class MemTableSet : public std::enable_shared_from_this } } + /// Thread safe part start String info() const { return fmt::format("MemTableSet: {} column files, {} rows, {} bytes, {} deletes", - column_files.size(), + column_files_count.load(), rows.load(), bytes.load(), deletes.load()); } + size_t getColumnFileCount() const { return column_files_count.load(); } + size_t getRows() const { return rows.load(); } + size_t getBytes() const { return bytes.load(); } + size_t getDeletes() const { return deletes.load(); } + /// Thread safe part end + ColumnFiles cloneColumnFiles(DMContext & context, const RowKeyRange & target_range, WriteBatches & wbs); - size_t getColumnFileCount() const { return column_files.size(); } - size_t getRows() const { return rows; } - size_t getBytes() const { return bytes; } - size_t getDeletes() const { return deletes; } /// The following methods returning false means this operation failed, caused by other threads could have done /// some updates on this instance. E.g. this instance have been abandoned.