Skip to content

Commit

Permalink
refactor Region::splitInto.
Browse files Browse the repository at this point in the history
  • Loading branch information
solotzg committed Mar 20, 2019
1 parent 405ea15 commit 6528907
Show file tree
Hide file tree
Showing 15 changed files with 192 additions and 150 deletions.
22 changes: 11 additions & 11 deletions dbms/src/DataStreams/RangesFilterBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,15 @@ Block RangesFilterBlockInputStream::readImpl()

size_t rows = block.rows();

auto handle_bg = column->getElement(0);
auto handle_ed = column->getElement(rows - 1);
auto handle_begin = column->getElement(0);
auto handle_end = column->getElement(rows - 1);

if (handle_bg >= ranges.second || ranges.first > handle_ed)
if (handle_begin >= ranges.second || ranges.first > handle_end)
continue;

if (handle_bg >= ranges.first)
if (handle_begin >= ranges.first)
{
if (handle_ed < ranges.second)
if (handle_end < ranges.second)
{
return block;
}
Expand All @@ -58,20 +58,20 @@ Block RangesFilterBlockInputStream::readImpl()
}
else
{
size_t pos_bg
size_t pos_begin
= std::lower_bound(column->getData().cbegin(), column->getData().cend(), ranges.first) - column->getData().cbegin();
size_t pos_ed = rows;
if (handle_ed >= ranges.second)
pos_ed = std::lower_bound(column->getData().cbegin(), column->getData().cend(), ranges.second) - column->getData().cbegin();
size_t pos_end = rows;
if (handle_end >= ranges.second)
pos_end = std::lower_bound(column->getData().cbegin(), column->getData().cend(), ranges.second) - column->getData().cbegin();

size_t len = pos_ed - pos_bg;
size_t len = pos_end - pos_begin;
if (!len)
continue;
for (size_t i = 0; i < block.columns(); i++)
{
ColumnWithTypeAndName & ori_column = block.getByPosition(i);
auto new_column = ori_column.column->cloneEmpty();
new_column->insertRangeFrom(*ori_column.column, pos_bg, len);
new_column->insertRangeFrom(*ori_column.column, pos_begin, len);
ori_column.column = std::move(new_column);
}
}
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,8 @@ void extend_mutable_engine_column_names(Names& column_names_to_read, const Merge
column_names_to_read.erase(std::unique(column_names_to_read.begin(), column_names_to_read.end()), column_names_to_read.end());
}

MarkRanges MarkRangesFromRegionRange(const MergeTreeData::DataPart::Index & index, const Int64 handle_bg,
const Int64 handle_ed, MarkRanges mark_ranges, size_t min_marks_for_seek, const Settings & settings)
MarkRanges MarkRangesFromRegionRange(const MergeTreeData::DataPart::Index & index, const Int64 handle_begin,
const Int64 handle_end, MarkRanges mark_ranges, size_t min_marks_for_seek, const Settings & settings)
{
MarkRanges res;

Expand Down Expand Up @@ -187,7 +187,7 @@ MarkRanges MarkRangesFromRegionRange(const MergeTreeData::DataPart::Index & inde
Int64 index_left_handle = index_left.get<Int64>();
Int64 index_right_handle = index_right.get<Int64>();

if (handle_bg >= index_right_handle || index_left_handle >= handle_ed)
if (handle_begin >= index_right_handle || index_left_handle >= handle_end)
continue;

if (range.end == range.begin + 1)
Expand Down
50 changes: 11 additions & 39 deletions dbms/src/Storages/Transaction/KVStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,28 +10,20 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}

KVStore::KVStore(const std::string & data_dir, Context *) : region_persister(data_dir), log(&Logger::get("KVStore"))
KVStore::KVStore(const std::string & data_dir, Context *, std::vector<RegionID> * regions_to_remove) : region_persister(data_dir), log(&Logger::get("KVStore"))
{
std::lock_guard<std::mutex> lock(mutex);
region_persister.restore(regions);

// Remove regions which pending_remove = true, those regions still exist because progress crash after persisted and before removal.
std::vector<RegionID> to_remove;
for (auto p : regions)
if (regions_to_remove != nullptr)
{
RegionPtr & region = p.second;
if (region->isPendingRemove())
to_remove.push_back(region->id());
}

for (auto & region_id : to_remove)
{
LOG_INFO(log, "Region [" << region_id << "] is removed after restored.");
auto it = regions.find(region_id);
RegionPtr region = it->second;
regions.erase(it);
std::ignore = region;
// TODO: remove region from region_table later, not now
for (auto & p : regions)
{
RegionPtr & region = p.second;
if (region->isPendingRemove())
regions_to_remove->push_back(region->id());
}
}
}

Expand All @@ -42,7 +34,7 @@ RegionPtr KVStore::getRegion(RegionID region_id)
return (it == regions.end()) ? nullptr : it->second;
}

RegionMap KVStore::getRegions()
const RegionMap & KVStore::getRegions()
{
std::lock_guard<std::mutex> lock(mutex);
return regions;
Expand Down Expand Up @@ -112,18 +104,6 @@ void KVStore::onServiceCommand(const enginepb::CommandRequestBatch & cmds, RaftC
curr_region = it->second;
}

if (curr_region->isPendingRemove())
{
// Normally this situation should not exist. Unless some exceptions throw during former removeRegion.
LOG_DEBUG(log, curr_region->toString() << " (before cmd) is in pending remove status, remove it now.");
removeRegion(curr_region_id, context);

LOG_INFO(log, "Sync status because of removal: " << curr_region->toString(true));
*(responseBatch.mutable_responses()->Add()) = curr_region->toCommandResponse();

continue;
}

if (header.destroy())
{
LOG_INFO(log, curr_region->toString() << " is removed by tombstone.");
Expand All @@ -138,7 +118,7 @@ void KVStore::onServiceCommand(const enginepb::CommandRequestBatch & cmds, RaftC
continue;
}

auto [new_region, split_regions, table_ids, sync] = curr_region->onCommand(cmd, callback);
auto [split_regions, table_ids, sync] = curr_region->onCommand(cmd, callback);

if (curr_region->isPendingRemove())
{
Expand All @@ -156,10 +136,6 @@ void KVStore::onServiceCommand(const enginepb::CommandRequestBatch & cmds, RaftC
{
std::lock_guard<std::mutex> lock(mutex);

regions[curr_region_id] = new_region;

curr_region = new_region;

for (const auto & region : split_regions)
{
auto [it, ok] = regions.emplace(region->id(), region);
Expand Down Expand Up @@ -242,19 +218,15 @@ bool KVStore::tryPersistAndReport(RaftContext & context, const Seconds kvstore_t
{
persist_job = true;

size_t persist_parm = region->persistParm();
region_persister.persist(region);
region->markPersisted();
region->updatePersistParm(persist_parm);

ss << "(" << region_id << "," << region->persistParm() << ") ";
*(responseBatch.mutable_responses()->Add()) = region->toCommandResponse();
}

LOG_TRACE(log, "Regions " << ss.str() << "report status");

if (persist_job)
{
LOG_TRACE(log, "Regions " << ss.str() << "report status");
LOG_TRACE(log, "Batch report regions status");
context.send(responseBatch);
}
Expand Down
5 changes: 2 additions & 3 deletions dbms/src/Storages/Transaction/KVStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ static const Seconds KVSTORE_TRY_PERSIST_PERIOD(20); // 20 seconds
class KVStore final : private boost::noncopyable
{
public:
KVStore(const std::string & data_dir, Context * context = nullptr);
KVStore(const std::string & data_dir, Context * context = nullptr, std::vector<RegionID> * regions_to_remove = nullptr);
RegionPtr getRegion(RegionID region_id);
void traverseRegions(std::function<void(const RegionID region_id, const RegionPtr & region)> callback);

Expand All @@ -41,9 +41,8 @@ class KVStore final : private boost::noncopyable
bool tryPersistAndReport(RaftContext & context, const Seconds kvstore_try_persist_period=KVSTORE_TRY_PERSIST_PERIOD,
const Seconds region_persist_period=REGION_PERSIST_PERIOD);

RegionMap getRegions();
const RegionMap & getRegions();

private:
void removeRegion(RegionID region_id, Context * context);

private:
Expand Down
14 changes: 9 additions & 5 deletions dbms/src/Storages/Transaction/PartitionStreams.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,22 @@ std::tuple<BlockInputStreamPtr, RegionTable::RegionReadStatus, size_t> RegionTab
if (!region)
return {nullptr, NOT_FOUND, 0};

if (region_version != InvalidRegionVersion && region->version() != region_version)
return {nullptr, VERSION_ERROR, 0};

if (learner_read)
region->wait_index(region->learner_read());
region->waitIndex(region->learnerRead());

auto schema_fetcher = [&](TableID) {
return std::make_tuple<const TiDB::TableInfo *, const ColumnsDescription *, const Names *>(&table_info, &columns, &ordered_columns);
};

{
auto scanner = region->createCommittedScanRemover(table_id);

if (region->isPendingRemove())
return {nullptr, PENDING_REMOVE, 0};

if (region_version != InvalidRegionVersion && region->version() != region_version)
return {nullptr, VERSION_ERROR, 0};

{
Region::LockInfoPtr lock_info = nullptr;
if (resolve_locks)
Expand Down Expand Up @@ -70,4 +74,4 @@ std::tuple<BlockInputStreamPtr, RegionTable::RegionReadStatus, size_t> RegionTab
}
}

} // namespace DB
} // namespace DB
Loading

0 comments on commit 6528907

Please sign in to comment.