Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add snapshot for prefix iterate. #3717

Merged
merged 13 commits into from
Jan 21, 2022
24 changes: 22 additions & 2 deletions src/kvstore/KVEngine.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,18 @@ class KVEngine {
bool sync,
bool wait) = 0;

/**
* @brief Get a snapshot from the kv engine.
*
* The snapshot is handed back as an opaque pointer; pass it to ReleaseSnapshot()
* once it is no longer needed.
*
* @return const void* Opaque snapshot pointer.
*/
virtual const void* GetSnapshot() = 0;
/**
* @brief Release a snapshot of the kv engine.
*
* @param snapshot Snapshot pointer to release.
*/
virtual void ReleaseSnapshot(const void* snapshot) = 0;
// Read a single key
virtual nebula::cpp2::ErrorCode get(const std::string& key, std::string* value) = 0;

Expand All @@ -62,9 +74,17 @@ class KVEngine {
const std::string& end,
std::unique_ptr<KVIterator>* iter) = 0;

// Get all results with 'prefix' str as prefix.
/**
* @brief Get all results with 'prefix' str as prefix.
*
* @param prefix Prefix string.
* @param snapshot Snapshot from kv engine. nullptr means no snapshot.
* @param iter Iterator for this prefix range.
* @return nebula::cpp2::ErrorCode
*/
virtual nebula::cpp2::ErrorCode prefix(const std::string& prefix,
std::unique_ptr<KVIterator>* iter) = 0;
std::unique_ptr<KVIterator>* iter,
const void* snapshot = nullptr) = 0;

// Get all results with 'prefix' str as prefix starting from 'start'
virtual nebula::cpp2::ErrorCode rangeWithPrefix(const std::string& start,
Expand Down
50 changes: 46 additions & 4 deletions src/kvstore/KVStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,26 @@ class KVStore {
return nullptr;
}

/**
* @brief Get a snapshot for the given partition.
*
* @param spaceId Space id.
* @param partID Partition id.
* @param canReadFromFollower Whether reading from a follower is allowed. Defaults to false.
* @return const void* Opaque snapshot pointer; may be nullptr when no snapshot is available.
*/
virtual const void* GetSnapshot(GraphSpaceID spaceId,
PartitionID partID,
bool canReadFromFollower = false) = 0;
/**
* @brief Release a snapshot of the given partition.
*
* @param spaceId Space id.
* @param partId Partition id.
* @param snapshot Snapshot pointer to release.
*/
virtual void ReleaseSnapshot(GraphSpaceID spaceId, PartitionID partId, const void* snapshot) = 0;

// Read a single key
virtual nebula::cpp2::ErrorCode get(GraphSpaceID spaceId,
PartitionID partId,
Expand Down Expand Up @@ -113,19 +133,41 @@ class KVStore {
std::unique_ptr<KVIterator>* iter,
bool canReadFromFollower = false) = delete;

// Get all results with prefix.
/**
* @brief Get all results with prefix.
*
* @param spaceId Space id.
* @param partId Partition id.
* @param prefix Prefix string to match.
* @param iter Output iterator over the matching range.
* @param canReadFromFollower Whether reading from a follower is allowed. Defaults to false.
* @param snapshot If set, read from snapshot. Defaults to nullptr (no snapshot).
* @return nebula::cpp2::ErrorCode
*/
virtual nebula::cpp2::ErrorCode prefix(GraphSpaceID spaceId,
PartitionID partId,
const std::string& prefix,
std::unique_ptr<KVIterator>* iter,
bool canReadFromFollower = false) = 0;
bool canReadFromFollower = false,
const void* snapshot = nullptr) = 0;

// To forbid to pass rvalue via the `prefix' parameter.
/**
* @brief Deleted overload that forbids passing an rvalue via the `prefix' parameter.
*
* @param spaceId Space id.
* @param partId Partition id.
* @param prefix Prefix string (rvalue; not accepted).
* @param iter Output iterator.
* @param canReadFromFollower Whether reading from a follower is allowed.
* @param snapshot Snapshot to read from, or nullptr.
* @return nebula::cpp2::ErrorCode
*/
virtual nebula::cpp2::ErrorCode prefix(GraphSpaceID spaceId,
PartitionID partId,
std::string&& prefix,
std::unique_ptr<KVIterator>* iter,
bool canReadFromFollower = false) = delete;
bool canReadFromFollower = false,
const void* snapshot = nullptr) = delete;

// Get all results with prefix starting from start
virtual nebula::cpp2::ErrorCode rangeWithPrefix(GraphSpaceID spaceId,
Expand Down
20 changes: 18 additions & 2 deletions src/kvstore/NebulaSnapshotManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,23 @@ void NebulaSnapshotManager::accessAllRowsInSnapshot(GraphSpaceID spaceId,
FLAGS_snapshot_part_rate_limit,
FLAGS_snapshot_batch_size);

const void* snapshot = store_->GetSnapshot(spaceId, partId);
SCOPE_EXIT {
if (snapshot != nullptr) {
store_->ReleaseSnapshot(spaceId, partId, snapshot);
}
};

for (const auto& prefix : tables) {
if (!accessTable(spaceId, partId, prefix, cb, data, totalCount, totalSize, rateLimiter.get())) {
if (!accessTable(spaceId,
partId,
snapshot,
prefix,
cb,
data,
totalCount,
totalSize,
rateLimiter.get())) {
return;
}
}
Expand All @@ -54,14 +69,15 @@ void NebulaSnapshotManager::accessAllRowsInSnapshot(GraphSpaceID spaceId,
// peers. If send failed, will return false.
bool NebulaSnapshotManager::accessTable(GraphSpaceID spaceId,
PartitionID partId,
const void* snapshot,
const std::string& prefix,
raftex::SnapshotCallback& cb,
std::vector<std::string>& data,
int64_t& totalCount,
int64_t& totalSize,
kvstore::RateLimiter* rateLimiter) {
std::unique_ptr<KVIterator> iter;
auto ret = store_->prefix(spaceId, partId, prefix, &iter);
auto ret = store_->prefix(spaceId, partId, prefix, &iter, false, snapshot);
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(INFO) << "[spaceId:" << spaceId << ", partId:" << partId << "] access prefix failed"
<< ", error code:" << static_cast<int32_t>(ret);
Expand Down
1 change: 1 addition & 0 deletions src/kvstore/NebulaSnapshotManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ class NebulaSnapshotManager : public raftex::SnapshotManager {
private:
bool accessTable(GraphSpaceID spaceId,
PartitionID partId,
const void* snapshot,
const std::string& prefix,
raftex::SnapshotCallback& cb,
std::vector<std::string>& data,
Expand Down
29 changes: 27 additions & 2 deletions src/kvstore/NebulaStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,30 @@ nebula::cpp2::ErrorCode NebulaStore::get(GraphSpaceID spaceId,
return part->engine()->get(key, value);
}

// Look up the partition and, when the leader check allows the read, take a
// snapshot from that partition's engine. Returns nullptr if the partition
// lookup fails or the leader check rejects the request.
const void* NebulaStore::GetSnapshot(GraphSpaceID spaceId,
                                     PartitionID partId,
                                     bool canReadFromFollower) {
  auto partRet = part(spaceId, partId);
  if (ok(partRet)) {
    auto partPtr = nebula::value(partRet);
    if (checkLeader(partPtr, canReadFromFollower)) {
      return partPtr->engine()->GetSnapshot();
    }
  }
  return nullptr;
}

// Release `snapshot` through the engine of the partition identified by
// (spaceId, partId). If the partition lookup fails, the failure is logged and
// the function returns without touching any engine.
void NebulaStore::ReleaseSnapshot(GraphSpaceID spaceId, PartitionID partId, const void* snapshot) {
  auto ret = part(spaceId, partId);
  if (!ok(ret)) {
    LOG(INFO) << "Failed to release snapshot for GraphSpaceID " << spaceId << " PartitionID"
              << partId;
    // `ret` carries an error here, so there is no part to extract from it;
    // falling through would read an invalid value and dereference it.
    return;
  }
  auto part = nebula::value(ret);
  part->engine()->ReleaseSnapshot(snapshot);
}

std::pair<nebula::cpp2::ErrorCode, std::vector<Status>> NebulaStore::multiGet(
GraphSpaceID spaceId,
PartitionID partId,
Expand Down Expand Up @@ -634,7 +658,8 @@ nebula::cpp2::ErrorCode NebulaStore::prefix(GraphSpaceID spaceId,
PartitionID partId,
const std::string& prefix,
std::unique_ptr<KVIterator>* iter,
bool canReadFromFollower) {
bool canReadFromFollower,
const void* snapshot) {
auto ret = part(spaceId, partId);
if (!ok(ret)) {
return error(ret);
Expand All @@ -643,7 +668,7 @@ nebula::cpp2::ErrorCode NebulaStore::prefix(GraphSpaceID spaceId,
if (!checkLeader(part, canReadFromFollower)) {
return nebula::cpp2::ErrorCode::E_LEADER_CHANGED;
}
return part->engine()->prefix(prefix, iter);
return part->engine()->prefix(prefix, iter, snapshot);
}

nebula::cpp2::ErrorCode NebulaStore::rangeWithPrefix(GraphSpaceID spaceId,
Expand Down
26 changes: 24 additions & 2 deletions src/kvstore/NebulaStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,26 @@ class NebulaStore : public KVStore, public Handler {
return options_.dataPaths_;
}

/**
* @brief Get the Snapshot from engine.
*
* @param spaceId Space id.
* @param partID Partition id.
* @param canReadFromFollower Whether reading from a follower is allowed. Defaults to false.
* @return const void* Snapshot pointer.
*/
const void* GetSnapshot(GraphSpaceID spaceId,
PartitionID partID,
bool canReadFromFollower = false) override;
/**
* @brief Release snapshot from engine.
*
* @param spaceId Space id.
* @param partId Partition id.
* @param snapshot Snapshot pointer to release.
*/
void ReleaseSnapshot(GraphSpaceID spaceId, PartitionID partId, const void* snapshot) override;

nebula::cpp2::ErrorCode get(GraphSpaceID spaceId,
PartitionID partId,
const std::string& key,
Expand Down Expand Up @@ -157,14 +177,16 @@ class NebulaStore : public KVStore, public Handler {
PartitionID partId,
const std::string& prefix,
std::unique_ptr<KVIterator>* iter,
bool canReadFromFollower = false) override;
bool canReadFromFollower = false,
const void* snapshot = nullptr) override;

// Delete the overloading with a rvalue `prefix'
nebula::cpp2::ErrorCode prefix(GraphSpaceID spaceId,
PartitionID partId,
std::string&& prefix,
std::unique_ptr<KVIterator>* iter,
bool canReadFromFollower = false) override = delete;
bool canReadFromFollower = false,
const void* snapshot = nullptr) override = delete;

// Get all results with prefix starting from start
nebula::cpp2::ErrorCode rangeWithPrefix(GraphSpaceID spaceId,
Expand Down
1 change: 1 addition & 0 deletions src/kvstore/PartManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ class MemPartManager final : public PartManager {
FRIEND_TEST(NebulaStoreTest, TransLeaderTest);
FRIEND_TEST(NebulaStoreTest, CheckpointTest);
FRIEND_TEST(NebulaStoreTest, ThreeCopiesCheckpointTest);
FRIEND_TEST(NebulaStoreTest, ReadSnapshotTest);
FRIEND_TEST(NebulaStoreTest, AtomicOpBatchTest);
FRIEND_TEST(NebulaStoreTest, RemoveInvalidSpaceTest);
FRIEND_TEST(NebulaStoreTest, BackupRestoreTest);
Expand Down
16 changes: 12 additions & 4 deletions src/kvstore/RocksEngine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -213,19 +213,24 @@ nebula::cpp2::ErrorCode RocksEngine::range(const std::string& start,
}

// Build an iterator over keys with the given prefix, forwarding `snapshot` to the
// helper that creates it. Dispatches to prefixWithExtractor when prefix filtering
// is enabled and the prefix is at least extractorLen_ long; otherwise to
// prefixWithoutExtractor.
nebula::cpp2::ErrorCode RocksEngine::prefix(const std::string& prefix,
std::unique_ptr<KVIterator>* storageIter) {
std::unique_ptr<KVIterator>* storageIter,
const void* snapshot) {
// In fact, we don't need to check prefix.size() >= extractorLen_, which is caller's duty to make
// sure the prefix bloom filter exists. But this is quite error-prone, so we do a check here.
if (FLAGS_enable_rocksdb_prefix_filtering && prefix.size() >= extractorLen_) {
return prefixWithExtractor(prefix, storageIter);
return prefixWithExtractor(prefix, snapshot, storageIter);
} else {
return prefixWithoutExtractor(prefix, storageIter);
return prefixWithoutExtractor(prefix, snapshot, storageIter);
}
}

nebula::cpp2::ErrorCode RocksEngine::prefixWithExtractor(const std::string& prefix,
const void* snapshot,
std::unique_ptr<KVIterator>* storageIter) {
rocksdb::ReadOptions options;
if (snapshot != nullptr) {
options.snapshot = reinterpret_cast<const rocksdb::Snapshot*>(snapshot);
}
options.prefix_same_as_start = true;
rocksdb::Iterator* iter = db_->NewIterator(options);
if (iter) {
Expand All @@ -236,8 +241,11 @@ nebula::cpp2::ErrorCode RocksEngine::prefixWithExtractor(const std::string& pref
}

nebula::cpp2::ErrorCode RocksEngine::prefixWithoutExtractor(
const std::string& prefix, std::unique_ptr<KVIterator>* storageIter) {
const std::string& prefix, const void* snapshot, std::unique_ptr<KVIterator>* storageIter) {
rocksdb::ReadOptions options;
if (snapshot != nullptr) {
options.snapshot = reinterpret_cast<const rocksdb::Snapshot*>(snapshot);
}
// prefix_same_as_start is false by default
options.total_order_seek = FLAGS_enable_rocksdb_prefix_filtering;
rocksdb::Iterator* iter = db_->NewIterator(options);
Expand Down
13 changes: 12 additions & 1 deletion src/kvstore/RocksEngine.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,14 @@ class RocksEngine : public KVEngine {
return walPath_.c_str();
}

// Obtain a snapshot from db_ and hand it back as an opaque pointer.
const void* GetSnapshot() override {
  const void* handle = db_->GetSnapshot();
  return handle;
}

void ReleaseSnapshot(const void* snapshot) override {
db_->ReleaseSnapshot(reinterpret_cast<const rocksdb::Snapshot*>(snapshot));
Copy link
Contributor

@darionyaphet darionyaphet Jan 18, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would ReleaseSnapshot remove files from the file system?

Copy link
Contributor Author

@SuperYoko SuperYoko Jan 18, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, it's only an in-memory snapshot. But it does prevent files from being deleted before the snapshot is released.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AFAIK a snapshot will create some hard links to the previous SST files; why not remove them?

Copy link
Contributor Author

@SuperYoko SuperYoko Jan 20, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AFAIK a snapshot will create some hard links to the previous SST files; why not remove them?

https://github.com/facebook/rocksdb/wiki/Snapshot

A snapshot captures a point-in-time view of the DB at the time it's created. Snapshots do not persist across DB restarts.

Maybe you mean Checkpoint.

https://github.com/facebook/rocksdb/wiki/Checkpoints

}

std::unique_ptr<WriteBatch> startBatchWrite() override;

nebula::cpp2::ErrorCode commitBatchWrite(std::unique_ptr<WriteBatch> batch,
Expand All @@ -166,16 +174,19 @@ class RocksEngine : public KVEngine {
std::unique_ptr<KVIterator>* iter) override;

nebula::cpp2::ErrorCode prefix(const std::string& prefix,
std::unique_ptr<KVIterator>* iter) override;
std::unique_ptr<KVIterator>* iter,
SuperYoko marked this conversation as resolved.
Show resolved Hide resolved
const void* snapshot = nullptr) override;

nebula::cpp2::ErrorCode rangeWithPrefix(const std::string& start,
const std::string& prefix,
std::unique_ptr<KVIterator>* iter) override;

nebula::cpp2::ErrorCode prefixWithExtractor(const std::string& prefix,
const void* snapshot,
std::unique_ptr<KVIterator>* storageIter);

nebula::cpp2::ErrorCode prefixWithoutExtractor(const std::string& prefix,
const void* snapshot,
std::unique_ptr<KVIterator>* storageIter);

nebula::cpp2::ErrorCode scan(std::unique_ptr<KVIterator>* storageIter) override;
Expand Down
4 changes: 3 additions & 1 deletion src/kvstore/plugins/hbase/HBaseStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -292,9 +292,11 @@ ResultCode HBaseStore::prefix(GraphSpaceID spaceId,
PartitionID partId,
const std::string& prefix,
std::unique_ptr<KVIterator>* iter,
bool canReadFromFollower) {
bool canReadFromFollower,
const void* snapshot) {
UNUSED(partId);
UNUSED(canReadFromFollower);
UNUSED(snapshot);
return this->prefix(spaceId, prefix, iter);
}

Expand Down
20 changes: 18 additions & 2 deletions src/kvstore/plugins/hbase/HBaseStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,20 @@ class HBaseStore : public KVStore {
return {-1, -1};
}

// This store provides no snapshot: every argument is ignored and nullptr is returned.
const void* GetSnapshot(GraphSpaceID spaceId,
                        PartitionID partID,
                        bool canReadFromFollower = false) override {
  UNUSED(canReadFromFollower);
  UNUSED(partID);
  UNUSED(spaceId);
  return nullptr;
}
// No-op: all arguments are ignored and nothing is released.
void ReleaseSnapshot(GraphSpaceID spaceId, PartitionID partId, const void* snapshot) override {
  UNUSED(snapshot);
  UNUSED(partId);
  UNUSED(spaceId);
}
ResultCode get(GraphSpaceID spaceId,
PartitionID partId,
const std::string& key,
Expand Down Expand Up @@ -110,14 +124,16 @@ class HBaseStore : public KVStore {
PartitionID partId,
const std::string& prefix,
std::unique_ptr<KVIterator>* iter,
bool canReadFromFollower = false) override;
bool canReadFromFollower = false,
const void* snapshot = nullptr) override;

// To forbid to pass rvalue via the `prefix' parameter.
ResultCode prefix(GraphSpaceID spaceId,
PartitionID partId,
std::string&& prefix,
std::unique_ptr<KVIterator>* iter,
bool canReadFromFollower = false) override = delete;
bool canReadFromFollower = false,
const void* snapshot = nullptr) override = delete;

// Get all results with prefix starting from start
ResultCode rangeWithPrefix(GraphSpaceID spaceId,
Expand Down
Loading