Save storage schema snapshot for decoding Region snapshot

Signed-off-by: JaySon-Huang <jayson.hjs@gmail.com>
JaySon-Huang committed Aug 3, 2021
1 parent 2cec74c commit d20b395
Showing 10 changed files with 152 additions and 142 deletions.
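The commit's central move is to replace the (ingest storage, column defines) pair that used to travel through the snapshot-decoding path with a single DecodingStorageSchemaSnapshot declared in Storages/Transaction/PartitionStreams.h. The struct's actual definition is not part of the files shown below; the following is only a minimal sketch inferred from the members the diff touches (storage, column_defines, is_common_handle), and the type aliases are assumptions:

    // Sketch only -- inferred from call sites in this commit, not the real declaration.
    #include <memory>
    #include <vector>

    class StorageDeltaMerge; // defined in Storages/StorageDeltaMerge.h

    namespace DM
    {
    struct ColumnDefine;
    using ColumnDefines    = std::vector<ColumnDefine>;      // assumed alias
    using ColumnDefinesPtr = std::shared_ptr<ColumnDefines>; // assumed alias
    } // namespace DM

    struct DecodingStorageSchemaSnapshot
    {
        std::shared_ptr<StorageDeltaMerge> storage; // schema_snap.storage->getStore()->preIngestFile(...)
        DM::ColumnDefinesPtr column_defines;        // *(schema_snap.column_defines) builds block headers and DTFile streams
        bool is_common_handle = false;              // consulted when wiring the PK-squashing stream
        // Other members (e.g. a schema version) may exist but are not visible in this diff.
    };

Bundling these into one object is what allows the constructors changed below to accept a single const reference instead of a storage pointer plus a column-defines pointer.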
14 changes: 7 additions & 7 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
@@ -403,14 +403,14 @@ inline Block getSubBlock(const Block & block, size_t offset, size_t limit)
}
}

// Add an extra handle column if handle reused the original column data.
Block DeltaMergeStore::addExtraColumnIfNeed(const Context & db_context, Block && block) const
// Add an extra handle column if pkIsHandle
Block DeltaMergeStore::addExtraColumnIfNeed(const Context & db_context, const ColumnDefine & handle_define, Block && block)
{
if (pkIsHandle())
if (pkIsHandle(handle_define))
{
if (!EXTRA_HANDLE_COLUMN_INT_TYPE->equals(*original_table_handle_define.type))
if (!EXTRA_HANDLE_COLUMN_INT_TYPE->equals(*handle_define.type))
{
auto handle_pos = getPosByColumnId(block, original_table_handle_define.id);
auto handle_pos = getPosByColumnId(block, handle_define.id);
addColumnToBlock(block, //
EXTRA_HANDLE_COLUMN_ID,
EXTRA_HANDLE_COLUMN_NAME,
@@ -423,7 +423,7 @@ Block DeltaMergeStore::addExtraColumnIfNeed(const Context & db_context, Block &&
{
// If types are identical, `FunctionToInt64` just takes a reference to the original column.
// We need a deep copy of the pk column, or it will cause trouble in later processing.
auto pk_col_with_name = getByColumnId(block, original_table_handle_define.id);
auto pk_col_with_name = getByColumnId(block, handle_define.id);
auto pk_column = pk_col_with_name.column;
ColumnPtr handle_column = pk_column->cloneResized(pk_column->size());
addColumnToBlock(block, //
@@ -447,7 +447,7 @@ void DeltaMergeStore::write(const Context & db_context, const DB::Settings & db_
return;

auto dm_context = newDMContext(db_context, db_settings);
Block block = addExtraColumnIfNeed(db_context, std::move(to_write));
Block block = addExtraColumnIfNeed(db_context, original_table_handle_define, std::move(to_write));

const auto bytes = block.bytes();

4 changes: 2 additions & 2 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.h
@@ -301,7 +301,7 @@ class DeltaMergeStore : private boost::noncopyable
// Stop all background tasks.
void shutdown();

Block addExtraColumnIfNeed(const Context & db_context, Block && block) const;
static Block addExtraColumnIfNeed(const Context & db_context, const ColumnDefine & handle_define, Block && block);

void write(const Context & db_context, const DB::Settings & db_settings, Block && block);

@@ -404,7 +404,7 @@ class DeltaMergeStore : private boost::noncopyable

DMContextPtr newDMContext(const Context & db_context, const DB::Settings & db_settings, const String & query_id="");

bool pkIsHandle() const { return original_table_handle_define.id != EXTRA_HANDLE_COLUMN_ID; }
static bool pkIsHandle(const ColumnDefine & handle_define) { return handle_define.id != EXTRA_HANDLE_COLUMN_ID; }

void waitForWrite(const DMContextPtr & context, const SegmentPtr & segment);
void waitForDeleteRange(const DMContextPtr & context, const SegmentPtr & segment);
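With addExtraColumnIfNeed and pkIsHandle now static and taking the handle ColumnDefine explicitly, code that only has a column define in hand (such as the Region-decoding path, or the tests further down) can call them without a DeltaMergeStore instance. A hypothetical call site, with db_context, handle_define, and raw_block assumed to be in scope:

    // Hypothetical usage of the new static interface; all identifiers are placeholders.
    Block block = DeltaMergeStore::addExtraColumnIfNeed(db_context, handle_define, std::move(raw_block));
    if (DeltaMergeStore::pkIsHandle(handle_define))
    {
        // The table's primary key doubles as the handle, so an Int64 extra handle column
        // has been appended (or deep-copied from the pk column) into `block` above.
    }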
47 changes: 17 additions & 30 deletions dbms/src/Storages/DeltaMerge/SSTFilesToBlockInputStream.cpp
@@ -9,6 +9,7 @@
#include <Storages/DeltaMerge/PKSquashingBlockInputStream.h>
#include <Storages/DeltaMerge/SSTFilesToBlockInputStream.h>
#include <Storages/StorageDeltaMerge.h>
#include <Storages/Transaction/PartitionStreams.h>
#include <Storages/Transaction/ProxyFFI.h>
#include <Storages/Transaction/Region.h>
#include <Storages/Transaction/SSTReader.h>
@@ -23,32 +24,22 @@ namespace ErrorCodes
extern const int ILLFORMAT_RAFT_ROW;
} // namespace ErrorCodes

Block GenRegionBlockDatawithSchema( //
const RegionPtr &,
const std::shared_ptr<StorageDeltaMerge> &,
const DM::ColumnDefinesPtr &,
Timestamp,
bool,
TMTContext &);

namespace DM
{

SSTFilesToBlockInputStream::SSTFilesToBlockInputStream( //
RegionPtr region_,
const SSTViewVec & snaps_,
const TiFlashRaftProxyHelper * proxy_helper_,
SSTFilesToBlockInputStream::StorageDeltaMergePtr ingest_storage_,
DM::ColumnDefinesPtr schema_snap_,
Timestamp gc_safepoint_,
bool force_decode_,
TMTContext & tmt_,
size_t expected_size_)
RegionPtr region_,
const SSTViewVec & snaps_,
const TiFlashRaftProxyHelper * proxy_helper_,
const DecodingStorageSchemaSnapshot & schema_snap_,
Timestamp gc_safepoint_,
bool force_decode_,
TMTContext & tmt_,
size_t expected_size_)
: region(std::move(region_)),
snaps(snaps_),
proxy_helper(proxy_helper_),
ingest_storage(std::move(ingest_storage_)),
schema_snap(std::move(schema_snap_)),
schema_snap(schema_snap_),
tmt(tmt_),
gc_safepoint(gc_safepoint_),
expected_size(expected_size_),
@@ -237,7 +228,7 @@ Block SSTFilesToBlockInputStream::readCommitedBlock()
{
// Read block from `region`. If the schema has been updated, it will
// throw an exception with code `ErrorCodes::REGION_DATA_SCHEMA_UPDATED`
return GenRegionBlockDatawithSchema(region, ingest_storage, schema_snap, gc_safepoint, force_decode, tmt);
return GenRegionBlockDatawithSchema(region, schema_snap, gc_safepoint, force_decode, tmt);
}
catch (DB::Exception & e)
{
@@ -260,18 +251,19 @@ Block SSTFilesToBlockInputStream::readCommitedBlock()
/// Methods for BoundedSSTFilesToBlockInputStream

BoundedSSTFilesToBlockInputStream::BoundedSSTFilesToBlockInputStream( //
SSTFilesToBlockInputStreamPtr child,
const ColId pk_column_id_,
const bool is_common_handle_)
: pk_column_id(pk_column_id_), is_common_handle(is_common_handle_), _raw_child(std::move(child))
SSTFilesToBlockInputStreamPtr child,
const ColId pk_column_id_,
const DecodingStorageSchemaSnapshot & schema_snap)
: pk_column_id(pk_column_id_), _raw_child(std::move(child))
{
const bool is_common_handle = schema_snap.is_common_handle;
// Initialize `mvcc_compact_stream`
// First refine the boundary of blocks. Note that the rows decoded from SSTFiles are sorted by primary key asc, timestamp desc
// (https://github.com/tikv/tikv/blob/v5.0.1/components/txn_types/src/types.rs#L103-L108).
// DMVersionFilter, however, requires rows sorted by primary key asc, timestamp asc, so we need an extra sort in PKSquashing.
auto stream = std::make_shared<PKSquashingBlockInputStream</*need_extra_sort=*/true>>(_raw_child, pk_column_id, is_common_handle);
mvcc_compact_stream = std::make_unique<DMVersionFilterBlockInputStream<DM_VERSION_FILTER_MODE_COMPACT>>(
stream, *(_raw_child->schema_snap), _raw_child->gc_safepoint, is_common_handle);
stream, *(schema_snap.column_defines), _raw_child->gc_safepoint, is_common_handle);
}

void BoundedSSTFilesToBlockInputStream::readPrefix()
@@ -289,11 +281,6 @@ Block BoundedSSTFilesToBlockInputStream::read()
return mvcc_compact_stream->read();
}

std::tuple<std::shared_ptr<StorageDeltaMerge>, DM::ColumnDefinesPtr> BoundedSSTFilesToBlockInputStream::ingestingInfo() const
{
return std::make_tuple(_raw_child->ingest_storage, _raw_child->schema_snap);
}

SSTFilesToBlockInputStream::ProcessKeys BoundedSSTFilesToBlockInputStream::getProcessKeys() const
{
return _raw_child->process_keys;
45 changes: 21 additions & 24 deletions dbms/src/Storages/DeltaMerge/SSTFilesToBlockInputStream.h
@@ -3,6 +3,7 @@
#include <DataStreams/IBlockInputStream.h>
#include <RaftStoreProxyFFI/ColumnFamily.h>
#include <Storages/DeltaMerge/DMVersionFilterBlockInputStream.h>
#include <Storages/Transaction/PartitionStreams.h>

#include <memory>
#include <string_view>
@@ -40,21 +41,19 @@ using BoundedSSTFilesToBlockInputStreamPtr = std::shared_ptr<BoundedSSTFilesToBl
class SSTFilesToBlockInputStream final : public IBlockInputStream
{
public:
using StorageDeltaMergePtr = std::shared_ptr<StorageDeltaMerge>;
SSTFilesToBlockInputStream(RegionPtr region_,
const SSTViewVec & snaps_,
const TiFlashRaftProxyHelper * proxy_helper_,
StorageDeltaMergePtr ingest_storage_,
DM::ColumnDefinesPtr schema_snap_,
Timestamp gc_safepoint_,
bool force_decode_,
TMTContext & tmt_,
size_t expected_size_ = DEFAULT_MERGE_BLOCK_SIZE);
SSTFilesToBlockInputStream(RegionPtr region_,
const SSTViewVec & snaps_,
const TiFlashRaftProxyHelper * proxy_helper_,
const DecodingStorageSchemaSnapshot & schema_snap_,
Timestamp gc_safepoint_,
bool force_decode_,
TMTContext & tmt_,
size_t expected_size_ = DEFAULT_MERGE_BLOCK_SIZE);
~SSTFilesToBlockInputStream();

String getName() const override { return "SSTFilesToBlockInputStream"; }

Block getHeader() const override { return toEmptyBlock(*schema_snap); }
Block getHeader() const override { return toEmptyBlock(*(schema_snap.column_defines)); }

void readPrefix() override;
void readSuffix() override;
@@ -76,15 +75,14 @@ class SSTFilesToBlockInputStream final : public IBlockInputStream
Block readCommitedBlock();

private:
RegionPtr region;
const SSTViewVec & snaps;
const TiFlashRaftProxyHelper * proxy_helper{nullptr};
const StorageDeltaMergePtr ingest_storage;
const DM::ColumnDefinesPtr schema_snap;
TMTContext & tmt;
const Timestamp gc_safepoint;
size_t expected_size;
Poco::Logger * log;
RegionPtr region;
const SSTViewVec & snaps;
const TiFlashRaftProxyHelper * proxy_helper{nullptr};
const DecodingStorageSchemaSnapshot & schema_snap;
TMTContext & tmt;
const Timestamp gc_safepoint;
size_t expected_size;
Poco::Logger * log;

using SSTReaderPtr = std::unique_ptr<SSTReader>;
SSTReaderPtr write_cf_reader;
@@ -107,7 +105,9 @@ class SSTFilesToBlockInputStream final : public IBlockInputStream
class BoundedSSTFilesToBlockInputStream final
{
public:
BoundedSSTFilesToBlockInputStream(SSTFilesToBlockInputStreamPtr child, const ColId pk_column_id_, const bool is_common_handle_);
BoundedSSTFilesToBlockInputStream(SSTFilesToBlockInputStreamPtr child,
const ColId pk_column_id_,
const DecodingStorageSchemaSnapshot & schema_snap);

String getName() const { return "BoundedSSTFilesToBlockInputStream"; }

Expand All @@ -117,8 +117,6 @@ class BoundedSSTFilesToBlockInputStream final

Block read();

std::tuple<std::shared_ptr<StorageDeltaMerge>, DM::ColumnDefinesPtr> ingestingInfo() const;

SSTFilesToBlockInputStream::ProcessKeys getProcessKeys() const;

const RegionPtr getRegion() const;
@@ -128,7 +126,6 @@ class BoundedSSTFilesToBlockInputStream final

private:
const ColId pk_column_id;
const bool is_common_handle;

// Note that we only keep _raw_child for getting ingest info / process keys, etc. All blocks should be
// read from `mvcc_compact_stream`
22 changes: 11 additions & 11 deletions dbms/src/Storages/DeltaMerge/SSTFilesToDTFilesOutputStream.cpp
@@ -8,6 +8,7 @@
#include <Storages/DeltaMerge/File/DMFileBlockOutputStream.h>
#include <Storages/DeltaMerge/SSTFilesToDTFilesOutputStream.h>
#include <Storages/StorageDeltaMerge.h>
#include <Storages/Transaction/PartitionStreams.h>
#include <Storages/Transaction/ProxyFFI.h>
#include <Storages/Transaction/Region.h>
#include <Storages/Transaction/SSTReader.h>
@@ -31,11 +32,13 @@ namespace DM
{

SSTFilesToDTFilesOutputStream::SSTFilesToDTFilesOutputStream( //
BoundedSSTFilesToBlockInputStreamPtr child_,
TiDB::SnapshotApplyMethod method_,
FileConvertJobType job_type_,
TMTContext & tmt_)
BoundedSSTFilesToBlockInputStreamPtr child_,
const DecodingStorageSchemaSnapshot & schema_snap_,
TiDB::SnapshotApplyMethod method_,
FileConvertJobType job_type_,
TMTContext & tmt_)
: child(child_), //
schema_snap(schema_snap_),
method(method_),
job_type(job_type_),
tmt(tmt_),
@@ -63,10 +66,8 @@ void SSTFilesToDTFilesOutputStream::writeSuffix()
auto dt_file = dt_stream->getFile();
assert(!dt_file->canGC()); // The DTFile should not be able to gc until it is ingested.
// Add the DTFile to StoragePathPool so that we can restore it later
auto [ingest_storage, _schema_snap] = child->ingestingInfo();
std::ignore = _schema_snap;
const auto bytes_written = dt_file->getBytesOnDisk();
ingest_storage->getStore()->preIngestFile(dt_file->parentPath(), dt_file->fileId(), bytes_written);
const auto bytes_written = dt_file->getBytesOnDisk();
schema_snap.storage->getStore()->preIngestFile(dt_file->parentPath(), dt_file->fileId(), bytes_written);

// Report DMWriteBytes for calculating write amplification
ProfileEvents::increment(ProfileEvents::DMWriteBytes, bytes_written);
@@ -112,8 +113,7 @@ bool SSTFilesToDTFilesOutputStream::newDTFileStream()
}

// The parent_path and file_id are generated by the storage.
auto [ingest_storage, schema_snap] = child->ingestingInfo();
auto [parent_path, file_id] = ingest_storage->getStore()->preAllocateIngestFile();
auto [parent_path, file_id] = schema_snap.storage->getStore()->preAllocateIngestFile();
if (parent_path.empty())
{
// Cannot allocate path and id for storing DTFiles (the storage may be dropped / shut down)
@@ -123,7 +123,7 @@
LOG_INFO(log,
"Create file for snapshot data " << child->getRegion()->toString(true) << " [file=" << dt_file->path()
<< "] [single_file_mode=" << flags.isSingleFile() << "]");
dt_stream = std::make_unique<DMFileBlockOutputStream>(tmt.getContext(), dt_file, *schema_snap, flags);
dt_stream = std::make_unique<DMFileBlockOutputStream>(tmt.getContext(), dt_file, *(schema_snap.column_defines), flags);
dt_stream->writePrefix();
ingest_files.emplace_back(dt_file);
return true;
23 changes: 12 additions & 11 deletions dbms/src/Storages/DeltaMerge/SSTFilesToDTFilesOutputStream.h
@@ -24,6 +24,7 @@ struct SSTViewVec;
struct TiFlashRaftProxyHelper;
struct SSTReader;
class StorageDeltaMerge;
struct DecodingStorageSchemaSnapshot;

namespace DM
{
@@ -48,11 +49,11 @@ enum class FileConvertJobType
class SSTFilesToDTFilesOutputStream : private boost::noncopyable
{
public:
using StorageDeltaMergePtr = std::shared_ptr<StorageDeltaMerge>;
SSTFilesToDTFilesOutputStream(BoundedSSTFilesToBlockInputStreamPtr child_,
TiDB::SnapshotApplyMethod method_,
FileConvertJobType job_type_,
TMTContext & tmt_);
SSTFilesToDTFilesOutputStream(BoundedSSTFilesToBlockInputStreamPtr child_,
const DecodingStorageSchemaSnapshot & schema_snap_,
TiDB::SnapshotApplyMethod method_,
FileConvertJobType job_type_,
TMTContext & tmt_);
~SSTFilesToDTFilesOutputStream();

void writePrefix();
@@ -65,18 +66,18 @@ class SSTFilesToDTFilesOutputStream : private boost::noncopyable
void cancel();

private:

bool newDTFileStream();

// Stop the process for decoding committed data into DTFiles
void stop();

private:
BoundedSSTFilesToBlockInputStreamPtr child;
const TiDB::SnapshotApplyMethod method;
const FileConvertJobType job_type;
TMTContext & tmt;
Poco::Logger * log;
BoundedSSTFilesToBlockInputStreamPtr child;
const DecodingStorageSchemaSnapshot & schema_snap;
const TiDB::SnapshotApplyMethod method;
const FileConvertJobType job_type;
TMTContext & tmt;
Poco::Logger * log;

std::unique_ptr<DMFileBlockOutputStream> dt_stream;

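Taken together, the new signatures let one DecodingStorageSchemaSnapshot drive all three stages of turning a Region snapshot into DTFiles. The sketch below is a hypothetical caller (the real driver that applies snapshots is not among the files shown in this diff) and mainly illustrates that the snapshot is held by const reference, so it must outlive the streams:

    // Sketch of wiring the pipeline under the new constructors; the function name and
    // parameter resolution are made up for illustration.
    #include <Storages/DeltaMerge/SSTFilesToBlockInputStream.h>
    #include <Storages/DeltaMerge/SSTFilesToDTFilesOutputStream.h>

    std::shared_ptr<DM::SSTFilesToDTFilesOutputStream> buildSnapshotConversionPipeline(
        const RegionPtr & region,
        const SSTViewVec & snaps,
        const TiFlashRaftProxyHelper * proxy_helper,
        const DecodingStorageSchemaSnapshot & schema_snap, // must outlive the returned stream
        DM::ColId pk_column_id,
        Timestamp gc_safepoint,
        TiDB::SnapshotApplyMethod apply_method,
        DM::FileConvertJobType job_type,
        TMTContext & tmt)
    {
        // Stage 1: decode SST key-values into blocks with the frozen schema snapshot.
        auto sst_stream = std::make_shared<DM::SSTFilesToBlockInputStream>(
            region, snaps, proxy_helper, schema_snap, gc_safepoint, /*force_decode=*/true, tmt);
        // Stage 2: squash rows by primary key and compact MVCC versions under the GC safepoint.
        auto bounded_stream = std::make_shared<DM::BoundedSSTFilesToBlockInputStream>(
            sst_stream, pk_column_id, schema_snap);
        // Stage 3: write the compacted blocks out as DTFiles through schema_snap.storage.
        return std::make_shared<DM::SSTFilesToDTFilesOutputStream>(
            bounded_stream, schema_snap, apply_method, job_type, tmt);
    }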
@@ -251,7 +251,8 @@ try
store = reload(cols, (pk_type == DMTestEnv::PkType::CommonHandle), 1);

ASSERT_EQ(store->isCommonHandle(), pk_type == DMTestEnv::PkType::CommonHandle) << DMTestEnv::PkTypeToString(pk_type);
ASSERT_EQ(store->pkIsHandle(), (pk_type == DMTestEnv::PkType::PkIsHandleInt64 || pk_type == DMTestEnv::PkType::PkIsHandleInt32))
ASSERT_EQ(store->pkIsHandle(store->original_table_handle_define),
(pk_type == DMTestEnv::PkType::PkIsHandleInt64 || pk_type == DMTestEnv::PkType::PkIsHandleInt32))
<< DMTestEnv::PkTypeToString(pk_type);

const size_t nrows = 20;
@@ -265,7 +266,7 @@
handle.type,
store->isCommonHandle(),
store->getRowKeyColumnSize());
block1 = store->addExtraColumnIfNeed(*db_context, std::move(block1));
block1 = DeltaMergeStore::addExtraColumnIfNeed(*db_context, store->getHandle(), std::move(block1));
ASSERT_EQ(block1.rows(), nrows);
ASSERT_TRUE(block1.has(EXTRA_HANDLE_COLUMN_NAME));
for (const auto & c : block1)
@@ -282,7 +283,7 @@
handle.type,
store->isCommonHandle(),
store->getRowKeyColumnSize());
block2 = store->addExtraColumnIfNeed(*db_context, std::move(block2));
block2 = DeltaMergeStore::addExtraColumnIfNeed(*db_context, store->getHandle(), std::move(block2));
ASSERT_EQ(block2.rows(), nrows_2);
ASSERT_TRUE(block2.has(EXTRA_HANDLE_COLUMN_NAME));
for (const auto & c : block2)