Skip to content

Commit

Permalink
Add snapshot for prefix iterate. (#3717)
Browse files Browse the repository at this point in the history
* Add snapshot for prefix iterate.

* Add header comment

* Add happy test

* Fix lint

* Modify engine snap argument position

* Fix other subclass

* Fix code

Co-authored-by: yaphet <4414314+darionyaphet@users.noreply.github.com>
Co-authored-by: Sophie <84560950+Sophie-Xie@users.noreply.github.com>
Co-authored-by: Doodle <13706157+critical27@users.noreply.github.com>
  • Loading branch information
4 people authored Jan 21, 2022
1 parent 2d15fb9 commit e14aba8
Show file tree
Hide file tree
Showing 14 changed files with 279 additions and 26 deletions.
24 changes: 22 additions & 2 deletions src/kvstore/KVEngine.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,18 @@ class KVEngine {
bool sync,
bool wait) = 0;

/**
* @brief Get the Snapshot from kv engine.
*
* @return const void* snapshot pointer.
*/
virtual const void* GetSnapshot() = 0;
/**
* @brief Release snapshot from kv engine.
*
* @param snapshot
*/
virtual void ReleaseSnapshot(const void* snapshot) = 0;
// Read a single key
virtual nebula::cpp2::ErrorCode get(const std::string& key, std::string* value) = 0;

Expand All @@ -62,9 +74,17 @@ class KVEngine {
const std::string& end,
std::unique_ptr<KVIterator>* iter) = 0;

// Get all results with 'prefix' str as prefix.
/**
* @brief Get all results with 'prefix' str as prefix.
*
* @param prefix Prefix string.
* @param snapshot Snapshot from kv engine. nullptr means no snapshot.
* @param iter Iterator for this prefix range.
* @return nebula::cpp2::ErrorCode
*/
virtual nebula::cpp2::ErrorCode prefix(const std::string& prefix,
std::unique_ptr<KVIterator>* iter) = 0;
std::unique_ptr<KVIterator>* iter,
const void* snapshot = nullptr) = 0;

// Get all results with 'prefix' str as prefix starting form 'start'
virtual nebula::cpp2::ErrorCode rangeWithPrefix(const std::string& start,
Expand Down
50 changes: 46 additions & 4 deletions src/kvstore/KVStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,26 @@ class KVStore {
return nullptr;
}

/**
* @brief Get the Snapshot object
*
* @param spaceId Space id
* @param partID Partition id
* @param canReadFromFollower Flag can read from follower.
* @return const void* Snapshot.
*/
virtual const void* GetSnapshot(GraphSpaceID spaceId,
PartitionID partID,
bool canReadFromFollower = false) = 0;
/**
* @brief Release snapshot.
*
* @param spaceId Space id.
* @param partId Partition id.
* @param snapshot Snapshot to release.
*/
virtual void ReleaseSnapshot(GraphSpaceID spaceId, PartitionID partId, const void* snapshot) = 0;

// Read a single key
virtual nebula::cpp2::ErrorCode get(GraphSpaceID spaceId,
PartitionID partId,
Expand Down Expand Up @@ -113,19 +133,41 @@ class KVStore {
std::unique_ptr<KVIterator>* iter,
bool canReadFromFollower = false) = delete;

// Get all results with prefix.
/**
* @brief Get all results with prefix.
*
* @param spaceId
* @param partId
* @param prefix
* @param iter
* @param canReadFromFollower
* @param snapshot If set, read from snapshot.
* @return nebula::cpp2::ErrorCode
*/
virtual nebula::cpp2::ErrorCode prefix(GraphSpaceID spaceId,
PartitionID partId,
const std::string& prefix,
std::unique_ptr<KVIterator>* iter,
bool canReadFromFollower = false) = 0;
bool canReadFromFollower = false,
const void* snapshot = nullptr) = 0;

// To forbid to pass rvalue via the `prefix' parameter.
/**
* @brief To forbid to pass rvalue via the `prefix' parameter.
*
* @param spaceId
* @param partId
* @param prefix
* @param iter
* @param canReadFromFollower
* @param snapshot
* @return nebula::cpp2::ErrorCode
*/
virtual nebula::cpp2::ErrorCode prefix(GraphSpaceID spaceId,
PartitionID partId,
std::string&& prefix,
std::unique_ptr<KVIterator>* iter,
bool canReadFromFollower = false) = delete;
bool canReadFromFollower = false,
const void* snapshot = nullptr) = delete;

// Get all results with prefix starting from start
virtual nebula::cpp2::ErrorCode rangeWithPrefix(GraphSpaceID spaceId,
Expand Down
20 changes: 18 additions & 2 deletions src/kvstore/NebulaSnapshotManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,23 @@ void NebulaSnapshotManager::accessAllRowsInSnapshot(GraphSpaceID spaceId,
FLAGS_snapshot_part_rate_limit,
FLAGS_snapshot_batch_size);

const void* snapshot = store_->GetSnapshot(spaceId, partId);
SCOPE_EXIT {
if (snapshot != nullptr) {
store_->ReleaseSnapshot(spaceId, partId, snapshot);
}
};

for (const auto& prefix : tables) {
if (!accessTable(spaceId, partId, prefix, cb, data, totalCount, totalSize, rateLimiter.get())) {
if (!accessTable(spaceId,
partId,
snapshot,
prefix,
cb,
data,
totalCount,
totalSize,
rateLimiter.get())) {
return;
}
}
Expand All @@ -54,14 +69,15 @@ void NebulaSnapshotManager::accessAllRowsInSnapshot(GraphSpaceID spaceId,
// peers. If send failed, will return false.
bool NebulaSnapshotManager::accessTable(GraphSpaceID spaceId,
PartitionID partId,
const void* snapshot,
const std::string& prefix,
raftex::SnapshotCallback& cb,
std::vector<std::string>& data,
int64_t& totalCount,
int64_t& totalSize,
kvstore::RateLimiter* rateLimiter) {
std::unique_ptr<KVIterator> iter;
auto ret = store_->prefix(spaceId, partId, prefix, &iter);
auto ret = store_->prefix(spaceId, partId, prefix, &iter, false, snapshot);
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(INFO) << "[spaceId:" << spaceId << ", partId:" << partId << "] access prefix failed"
<< ", error code:" << static_cast<int32_t>(ret);
Expand Down
1 change: 1 addition & 0 deletions src/kvstore/NebulaSnapshotManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ class NebulaSnapshotManager : public raftex::SnapshotManager {
private:
bool accessTable(GraphSpaceID spaceId,
PartitionID partId,
const void* snapshot,
const std::string& prefix,
raftex::SnapshotCallback& cb,
std::vector<std::string>& data,
Expand Down
29 changes: 27 additions & 2 deletions src/kvstore/NebulaStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,30 @@ nebula::cpp2::ErrorCode NebulaStore::get(GraphSpaceID spaceId,
return part->engine()->get(key, value);
}

const void* NebulaStore::GetSnapshot(GraphSpaceID spaceId,
PartitionID partId,
bool canReadFromFollower) {
auto ret = part(spaceId, partId);
if (!ok(ret)) {
return nullptr;
}
auto part = nebula::value(ret);
if (!checkLeader(part, canReadFromFollower)) {
return nullptr;
}
return part->engine()->GetSnapshot();
}

void NebulaStore::ReleaseSnapshot(GraphSpaceID spaceId, PartitionID partId, const void* snapshot) {
auto ret = part(spaceId, partId);
if (!ok(ret)) {
LOG(INFO) << "Failed to release snapshot for GraphSpaceID " << spaceId << " PartitionID"
<< partId;
}
auto part = nebula::value(ret);
return part->engine()->ReleaseSnapshot(snapshot);
}

std::pair<nebula::cpp2::ErrorCode, std::vector<Status>> NebulaStore::multiGet(
GraphSpaceID spaceId,
PartitionID partId,
Expand Down Expand Up @@ -634,7 +658,8 @@ nebula::cpp2::ErrorCode NebulaStore::prefix(GraphSpaceID spaceId,
PartitionID partId,
const std::string& prefix,
std::unique_ptr<KVIterator>* iter,
bool canReadFromFollower) {
bool canReadFromFollower,
const void* snapshot) {
auto ret = part(spaceId, partId);
if (!ok(ret)) {
return error(ret);
Expand All @@ -643,7 +668,7 @@ nebula::cpp2::ErrorCode NebulaStore::prefix(GraphSpaceID spaceId,
if (!checkLeader(part, canReadFromFollower)) {
return nebula::cpp2::ErrorCode::E_LEADER_CHANGED;
}
return part->engine()->prefix(prefix, iter);
return part->engine()->prefix(prefix, iter, snapshot);
}

nebula::cpp2::ErrorCode NebulaStore::rangeWithPrefix(GraphSpaceID spaceId,
Expand Down
26 changes: 24 additions & 2 deletions src/kvstore/NebulaStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,26 @@ class NebulaStore : public KVStore, public Handler {
return options_.dataPaths_;
}

/**
* @brief Get the Snapshot from engine.
*
* @param spaceId
* @param partID
* @param canReadFromFollower
* @return const void* Snapshot pointer.
*/
const void* GetSnapshot(GraphSpaceID spaceId,
PartitionID partID,
bool canReadFromFollower = false) override;
/**
* @brief Release snapshot from engine.
*
* @param spaceId
* @param partId
* @param snapshot
*/
void ReleaseSnapshot(GraphSpaceID spaceId, PartitionID partId, const void* snapshot) override;

nebula::cpp2::ErrorCode get(GraphSpaceID spaceId,
PartitionID partId,
const std::string& key,
Expand Down Expand Up @@ -157,14 +177,16 @@ class NebulaStore : public KVStore, public Handler {
PartitionID partId,
const std::string& prefix,
std::unique_ptr<KVIterator>* iter,
bool canReadFromFollower = false) override;
bool canReadFromFollower = false,
const void* snapshot = nullptr) override;

// Delete the overloading with a rvalue `prefix'
nebula::cpp2::ErrorCode prefix(GraphSpaceID spaceId,
PartitionID partId,
std::string&& prefix,
std::unique_ptr<KVIterator>* iter,
bool canReadFromFollower = false) override = delete;
bool canReadFromFollower = false,
const void* snapshot = nullptr) override = delete;

// Get all results with prefix starting from start
nebula::cpp2::ErrorCode rangeWithPrefix(GraphSpaceID spaceId,
Expand Down
1 change: 1 addition & 0 deletions src/kvstore/PartManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ class MemPartManager final : public PartManager {
FRIEND_TEST(NebulaStoreTest, TransLeaderTest);
FRIEND_TEST(NebulaStoreTest, CheckpointTest);
FRIEND_TEST(NebulaStoreTest, ThreeCopiesCheckpointTest);
FRIEND_TEST(NebulaStoreTest, ReadSnapshotTest);
FRIEND_TEST(NebulaStoreTest, AtomicOpBatchTest);
FRIEND_TEST(NebulaStoreTest, RemoveInvalidSpaceTest);
FRIEND_TEST(NebulaStoreTest, BackupRestoreTest);
Expand Down
16 changes: 12 additions & 4 deletions src/kvstore/RocksEngine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -213,19 +213,24 @@ nebula::cpp2::ErrorCode RocksEngine::range(const std::string& start,
}

nebula::cpp2::ErrorCode RocksEngine::prefix(const std::string& prefix,
std::unique_ptr<KVIterator>* storageIter) {
std::unique_ptr<KVIterator>* storageIter,
const void* snapshot) {
// In fact, we don't need to check prefix.size() >= extractorLen_, which is caller's duty to make
// sure the prefix bloom filter exists. But this is quite error-prone, so we do a check here.
if (FLAGS_enable_rocksdb_prefix_filtering && prefix.size() >= extractorLen_) {
return prefixWithExtractor(prefix, storageIter);
return prefixWithExtractor(prefix, snapshot, storageIter);
} else {
return prefixWithoutExtractor(prefix, storageIter);
return prefixWithoutExtractor(prefix, snapshot, storageIter);
}
}

nebula::cpp2::ErrorCode RocksEngine::prefixWithExtractor(const std::string& prefix,
const void* snapshot,
std::unique_ptr<KVIterator>* storageIter) {
rocksdb::ReadOptions options;
if (snapshot != nullptr) {
options.snapshot = reinterpret_cast<const rocksdb::Snapshot*>(snapshot);
}
options.prefix_same_as_start = true;
rocksdb::Iterator* iter = db_->NewIterator(options);
if (iter) {
Expand All @@ -236,8 +241,11 @@ nebula::cpp2::ErrorCode RocksEngine::prefixWithExtractor(const std::string& pref
}

nebula::cpp2::ErrorCode RocksEngine::prefixWithoutExtractor(
const std::string& prefix, std::unique_ptr<KVIterator>* storageIter) {
const std::string& prefix, const void* snapshot, std::unique_ptr<KVIterator>* storageIter) {
rocksdb::ReadOptions options;
if (snapshot != nullptr) {
options.snapshot = reinterpret_cast<const rocksdb::Snapshot*>(snapshot);
}
// prefix_same_as_start is false by default
options.total_order_seek = FLAGS_enable_rocksdb_prefix_filtering;
rocksdb::Iterator* iter = db_->NewIterator(options);
Expand Down
13 changes: 12 additions & 1 deletion src/kvstore/RocksEngine.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,14 @@ class RocksEngine : public KVEngine {
return walPath_.c_str();
}

const void* GetSnapshot() override {
return db_->GetSnapshot();
}

void ReleaseSnapshot(const void* snapshot) override {
db_->ReleaseSnapshot(reinterpret_cast<const rocksdb::Snapshot*>(snapshot));
}

std::unique_ptr<WriteBatch> startBatchWrite() override;

nebula::cpp2::ErrorCode commitBatchWrite(std::unique_ptr<WriteBatch> batch,
Expand All @@ -166,16 +174,19 @@ class RocksEngine : public KVEngine {
std::unique_ptr<KVIterator>* iter) override;

nebula::cpp2::ErrorCode prefix(const std::string& prefix,
std::unique_ptr<KVIterator>* iter) override;
std::unique_ptr<KVIterator>* iter,
const void* snapshot = nullptr) override;

nebula::cpp2::ErrorCode rangeWithPrefix(const std::string& start,
const std::string& prefix,
std::unique_ptr<KVIterator>* iter) override;

nebula::cpp2::ErrorCode prefixWithExtractor(const std::string& prefix,
const void* snapshot,
std::unique_ptr<KVIterator>* storageIter);

nebula::cpp2::ErrorCode prefixWithoutExtractor(const std::string& prefix,
const void* snapshot,
std::unique_ptr<KVIterator>* storageIter);

nebula::cpp2::ErrorCode scan(std::unique_ptr<KVIterator>* storageIter) override;
Expand Down
4 changes: 3 additions & 1 deletion src/kvstore/plugins/hbase/HBaseStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -292,9 +292,11 @@ ResultCode HBaseStore::prefix(GraphSpaceID spaceId,
PartitionID partId,
const std::string& prefix,
std::unique_ptr<KVIterator>* iter,
bool canReadFromFollower) {
bool canReadFromFollower,
const void* snapshot) {
UNUSED(partId);
UNUSED(canReadFromFollower);
UNUSED(snapshot);
return this->prefix(spaceId, prefix, iter);
}

Expand Down
20 changes: 18 additions & 2 deletions src/kvstore/plugins/hbase/HBaseStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,20 @@ class HBaseStore : public KVStore {
return {-1, -1};
}

const void* GetSnapshot(GraphSpaceID spaceId,
PartitionID partID,
bool canReadFromFollower = false) override {
UNUSED(spaceId);
UNUSED(partID);
UNUSED(canReadFromFollower);
return nullptr;
}
void ReleaseSnapshot(GraphSpaceID spaceId, PartitionID partId, const void* snapshot) override {
UNUSED(spaceId);
UNUSED(partId);
UNUSED(snapshot);
return;
}
ResultCode get(GraphSpaceID spaceId,
PartitionID partId,
const std::string& key,
Expand Down Expand Up @@ -110,14 +124,16 @@ class HBaseStore : public KVStore {
PartitionID partId,
const std::string& prefix,
std::unique_ptr<KVIterator>* iter,
bool canReadFromFollower = false) override;
bool canReadFromFollower = false,
const void* snapshot = nullptr) override;

// To forbid to pass rvalue via the `prefix' parameter.
ResultCode prefix(GraphSpaceID spaceId,
PartitionID partId,
std::string&& prefix,
std::unique_ptr<KVIterator>* iter,
bool canReadFromFollower = false) override = delete;
bool canReadFromFollower = false,
const void* snapshot = nullptr) override = delete;

// Get all results with prefix starting from start
ResultCode rangeWithPrefix(GraphSpaceID spaceId,
Expand Down
Loading

0 comments on commit e14aba8

Please sign in to comment.