Skip to content

Commit

Permalink
fix MemTableSet clone column files
Browse files Browse the repository at this point in the history
  • Loading branch information
lidezhu committed Feb 7, 2022
1 parent dbe1fb5 commit 2d59d38
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 7 deletions.
4 changes: 2 additions & 2 deletions dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ void ColumnFilePersistedSet::checkColumnFiles(const ColumnFilePersistedLevels &

if (unlikely(new_rows != rows || new_deletes != deletes))
{
LOG_FMT_ERROR(log, "{}: Rows and deletes check failed. Actual: rows[{}], deletes[{}]. Expected: rows[{}], deletes[{}]. Current column files: {}, new column files: {}.", __PRETTY_FUNCTION__, new_rows, new_deletes, rows.load(), deletes.load(), columnFilesToString(flattenColumnFileLevels(persisted_files_levels)), columnFilesToString(flattenColumnFileLevels(new_column_file_levels)));
LOG_FMT_ERROR(log, "Rows and deletes check failed. Actual: rows[{}], deletes[{}]. Expected: rows[{}], deletes[{}]. Current column files: {}, new column files: {}.", new_rows, new_deletes, rows.load(), deletes.load(), columnFilesToString(flattenColumnFileLevels(persisted_files_levels)), columnFilesToString(flattenColumnFileLevels(new_column_file_levels)));
throw Exception("Rows and deletes check failed.", ErrorCodes::LOGICAL_ERROR);
}
}
Expand Down Expand Up @@ -460,7 +460,7 @@ ColumnFileSetSnapshotPtr ColumnFilePersistedSet::createSnapshot(const DMContext

if (unlikely(total_rows != rows || total_deletes != deletes))
{
LOG_FMT_ERROR(log, "{}: Rows and deletes check failed. Actual: rows[{}], deletes[{}]. Expected: rows[{}], deletes[{}].", __PRETTY_FUNCTION__, total_rows, total_deletes, rows.load(), deletes.load());
LOG_FMT_ERROR(log, "Rows and deletes check failed. Actual: rows[{}], deletes[{}]. Expected: rows[{}], deletes[{}].", total_rows, total_deletes, rows.load(), deletes.load());
throw Exception("Rows and deletes check failed.", ErrorCodes::LOGICAL_ERROR);
}

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ DeltaValueSpace::checkHeadAndCloneTail(DMContext & context,
WriteBatches & wbs) const
{
auto tail_persisted_files = persisted_file_set->checkHeadAndCloneTail(context, target_range, head_column_files, wbs);
auto memory_files = mem_table_set->cloneColumnFiles();
auto memory_files = mem_table_set->cloneColumnFiles(context, target_range, wbs);
return std::make_pair(std::move(tail_persisted_files), std::move(memory_files));
}

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ class DeltaValueSpace : public std::enable_shared_from_this<DeltaValueSpace>
String simpleInfo() const { return "Delta [" + DB::toString(persisted_file_set->getId()) + "]"; }
String info() const
{
return fmt::format("{}, {}", mem_table_set->info(), persisted_file_set->info());
return fmt::format("{}. {}", mem_table_set->info(), persisted_file_set->info());
}

bool getLock(Lock & lock) const
Expand Down
55 changes: 53 additions & 2 deletions dbms/src/Storages/DeltaMerge/Delta/MemTableSet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@
#include <Storages/DeltaMerge/ColumnFile/ColumnFileDeleteRange.h>
#include <Storages/DeltaMerge/ColumnFile/ColumnFileInMemory.h>
#include <Storages/DeltaMerge/ColumnFile/ColumnFileTiny.h>
#include <Storages/DeltaMerge/DMContext.h>
#include <Storages/DeltaMerge/Delta/MemTableSet.h>
#include <Storages/DeltaMerge/WriteBatches.h>
#include <Storages/PathPool.h>

namespace ProfileEvents
{
Expand Down Expand Up @@ -49,6 +52,54 @@ void MemTableSet::appendColumnFileInner(const ColumnFilePtr & column_file)
deletes += column_file->getDeletes();
}

ColumnFiles MemTableSet::cloneColumnFiles(DMContext & context, const RowKeyRange & target_range, WriteBatches & wbs)
{
ColumnFiles cloned_column_files;
for (const auto & column_file : column_files)
{
if (auto * dr = column_file->tryToDeleteRange(); dr)
{
auto new_dr = dr->getDeleteRange().shrink(target_range);
if (!new_dr.none())
{
// Only use the available delete_range pack.
cloned_column_files.push_back(dr->cloneWith(new_dr));
}
}
else if (auto * b = column_file->tryToInMemoryFile(); b)
{
auto new_column_file = b->clone();

// No matter or what, don't append to packs which cloned from old packs again.
// Because they could shared the same cache. And the cache can NOT be inserted from different packs in different delta.
new_column_file->disableAppend();
cloned_column_files.push_back(new_column_file);
}
else if (auto * t = column_file->tryToTinyFile(); t)
{
// Use a newly created page_id to reference the data page_id of current pack.
PageId new_data_page_id = context.storage_pool.newLogPageId();
wbs.log.putRefPage(new_data_page_id, t->getDataPageId());
auto new_column_file = t->cloneWith(new_data_page_id);

cloned_column_files.push_back(new_column_file);
}
else if (auto * f = column_file->tryToBigFile(); f)
{
auto delegator = context.path_pool.getStableDiskDelegator();
auto new_ref_id = context.storage_pool.newDataPageIdForDTFile(delegator, __PRETTY_FUNCTION__);
auto file_id = f->getFile()->fileId();
wbs.data.putRefPage(new_ref_id, file_id);
auto file_parent_path = delegator.getDTFilePath(file_id);
auto new_file = DMFile::restore(context.db_context.getFileProvider(), file_id, /* ref_id= */ new_ref_id, file_parent_path, DMFile::ReadMetaMode::all());

auto new_column_file = f->cloneWith(context, new_file, target_range);
cloned_column_files.push_back(new_column_file);
}
}
return cloned_column_files;
}

void MemTableSet::appendColumnFile(const ColumnFilePtr & column_file)
{
appendColumnFileInner(column_file);
Expand Down Expand Up @@ -125,7 +176,7 @@ ColumnFileSetSnapshotPtr MemTableSet::createSnapshot()

if (unlikely(total_rows != rows || total_deletes != deletes))
{
LOG_FMT_ERROR(log, "{}: Rows and deletes check failed. Actual: rows[{}], deletes[{}]. Expected: rows[{}], deletes[{}].", __PRETTY_FUNCTION__, total_rows, total_deletes, rows.load(), deletes.load());
LOG_FMT_ERROR(log, "Rows and deletes check failed. Actual: rows[{}], deletes[{}]. Expected: rows[{}], deletes[{}].", total_rows, total_deletes, rows.load(), deletes.load());
throw Exception("Rows and deletes check failed.", ErrorCodes::LOGICAL_ERROR);
}

Expand Down Expand Up @@ -158,7 +209,7 @@ ColumnFileFlushTaskPtr MemTableSet::buildFlushTask(DMContext & context, size_t r
}
if (unlikely(flush_task->getFlushRows() != rows || flush_task->getFlushDeletes() != deletes))
{
LOG_FMT_ERROR(log, "{}: Rows and deletes check failed. Actual: rows[{}], deletes[{}]. Expected: rows[{}], deletes[{}].", __PRETTY_FUNCTION__, flush_task->getFlushRows(), flush_task->getFlushDeletes(), rows.load(), deletes.load());
LOG_FMT_ERROR(log, "Rows and deletes check failed. Actual: rows[{}], deletes[{}]. Expected: rows[{}], deletes[{}]. Column Files: {}", flush_task->getFlushRows(), flush_task->getFlushDeletes(), rows.load(), deletes.load(), columnFilesToString(column_files));
throw Exception("Rows and deletes check failed.", ErrorCodes::LOGICAL_ERROR);
}

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/Delta/MemTableSet.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class MemTableSet : public std::enable_shared_from_this<MemTableSet>
deletes.load());
}

ColumnFiles cloneColumnFiles() { return column_files; }
ColumnFiles cloneColumnFiles(DMContext & context, const RowKeyRange & target_range, WriteBatches & wbs);

size_t getColumnFileCount() const { return column_files.size(); }
size_t getRows() const { return rows; }
Expand Down

0 comments on commit 2d59d38

Please sign in to comment.