diff --git a/src/kvstore/KVEngine.h b/src/kvstore/KVEngine.h index d178c056f68..dd9364a3de2 100644 --- a/src/kvstore/KVEngine.h +++ b/src/kvstore/KVEngine.h @@ -49,6 +49,18 @@ class KVEngine { bool sync, bool wait) = 0; + /** + * @brief Get the Snapshot from kv engine. + * + * @return const void* snapshot pointer. + */ + virtual const void* GetSnapshot() = 0; + /** + * @brief Release snapshot from kv engine. + * + * @param snapshot + */ + virtual void ReleaseSnapshot(const void* snapshot) = 0; // Read a single key virtual nebula::cpp2::ErrorCode get(const std::string& key, std::string* value) = 0; @@ -62,9 +74,17 @@ class KVEngine { const std::string& end, std::unique_ptr* iter) = 0; - // Get all results with 'prefix' str as prefix. + /** + * @brief Get all results with 'prefix' str as prefix. + * + * @param prefix Prefix string. + * @param snapshot Snapshot from kv engine. nullptr means no snapshot. + * @param iter Iterator for this prefix range. + * @return nebula::cpp2::ErrorCode + */ virtual nebula::cpp2::ErrorCode prefix(const std::string& prefix, - std::unique_ptr* iter) = 0; + std::unique_ptr* iter, + const void* snapshot = nullptr) = 0; // Get all results with 'prefix' str as prefix starting form 'start' virtual nebula::cpp2::ErrorCode rangeWithPrefix(const std::string& start, diff --git a/src/kvstore/KVStore.h b/src/kvstore/KVStore.h index dcda46ae7fd..0a21920979a 100644 --- a/src/kvstore/KVStore.h +++ b/src/kvstore/KVStore.h @@ -78,6 +78,26 @@ class KVStore { return nullptr; } + /** + * @brief Get the Snapshot object + * + * @param spaceId Space id + * @param partID Partition id + * @param canReadFromFollower Flag can read from follower. + * @return const void* Snapshot. + */ + virtual const void* GetSnapshot(GraphSpaceID spaceId, + PartitionID partID, + bool canReadFromFollower = false) = 0; + /** + * @brief Release snapshot. + * + * @param spaceId Space id. + * @param partId Partition id. + * @param snapshot Snapshot to release. + */ + virtual void ReleaseSnapshot(GraphSpaceID spaceId, PartitionID partId, const void* snapshot) = 0; + // Read a single key virtual nebula::cpp2::ErrorCode get(GraphSpaceID spaceId, PartitionID partId, @@ -113,19 +133,41 @@ class KVStore { std::unique_ptr* iter, bool canReadFromFollower = false) = delete; - // Get all results with prefix. + /** + * @brief Get all results with prefix. + * + * @param spaceId + * @param partId + * @param prefix + * @param iter + * @param canReadFromFollower + * @param snapshot If set, read from snapshot. + * @return nebula::cpp2::ErrorCode + */ virtual nebula::cpp2::ErrorCode prefix(GraphSpaceID spaceId, PartitionID partId, const std::string& prefix, std::unique_ptr* iter, - bool canReadFromFollower = false) = 0; + bool canReadFromFollower = false, + const void* snapshot = nullptr) = 0; - // To forbid to pass rvalue via the `prefix' parameter. + /** + * @brief To forbid to pass rvalue via the `prefix' parameter. + * + * @param spaceId + * @param partId + * @param prefix + * @param iter + * @param canReadFromFollower + * @param snapshot + * @return nebula::cpp2::ErrorCode + */ virtual nebula::cpp2::ErrorCode prefix(GraphSpaceID spaceId, PartitionID partId, std::string&& prefix, std::unique_ptr* iter, - bool canReadFromFollower = false) = delete; + bool canReadFromFollower = false, + const void* snapshot = nullptr) = delete; // Get all results with prefix starting from start virtual nebula::cpp2::ErrorCode rangeWithPrefix(GraphSpaceID spaceId, diff --git a/src/kvstore/NebulaSnapshotManager.cpp b/src/kvstore/NebulaSnapshotManager.cpp index 3103840a049..a59d7e8ee46 100644 --- a/src/kvstore/NebulaSnapshotManager.cpp +++ b/src/kvstore/NebulaSnapshotManager.cpp @@ -42,8 +42,23 @@ void NebulaSnapshotManager::accessAllRowsInSnapshot(GraphSpaceID spaceId, FLAGS_snapshot_part_rate_limit, FLAGS_snapshot_batch_size); + const void* snapshot = store_->GetSnapshot(spaceId, partId); + SCOPE_EXIT { + if (snapshot != nullptr) { + store_->ReleaseSnapshot(spaceId, partId, snapshot); + } + }; + for (const auto& prefix : tables) { - if (!accessTable(spaceId, partId, prefix, cb, data, totalCount, totalSize, rateLimiter.get())) { + if (!accessTable(spaceId, + partId, + snapshot, + prefix, + cb, + data, + totalCount, + totalSize, + rateLimiter.get())) { return; } } @@ -54,6 +69,7 @@ void NebulaSnapshotManager::accessAllRowsInSnapshot(GraphSpaceID spaceId, // peers. If send failed, will return false. bool NebulaSnapshotManager::accessTable(GraphSpaceID spaceId, PartitionID partId, + const void* snapshot, const std::string& prefix, raftex::SnapshotCallback& cb, std::vector& data, @@ -61,7 +77,7 @@ bool NebulaSnapshotManager::accessTable(GraphSpaceID spaceId, int64_t& totalSize, kvstore::RateLimiter* rateLimiter) { std::unique_ptr iter; - auto ret = store_->prefix(spaceId, partId, prefix, &iter); + auto ret = store_->prefix(spaceId, partId, prefix, &iter, false, snapshot); if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) { LOG(INFO) << "[spaceId:" << spaceId << ", partId:" << partId << "] access prefix failed" << ", error code:" << static_cast(ret); diff --git a/src/kvstore/NebulaSnapshotManager.h b/src/kvstore/NebulaSnapshotManager.h index 0fa0b4933a7..8dca397619b 100644 --- a/src/kvstore/NebulaSnapshotManager.h +++ b/src/kvstore/NebulaSnapshotManager.h @@ -26,6 +26,7 @@ class NebulaSnapshotManager : public raftex::SnapshotManager { private: bool accessTable(GraphSpaceID spaceId, PartitionID partId, + const void* snapshot, const std::string& prefix, raftex::SnapshotCallback& cb, std::vector& data, diff --git a/src/kvstore/NebulaStore.cpp b/src/kvstore/NebulaStore.cpp index b6be72e3209..ec53d1c0e57 100644 --- a/src/kvstore/NebulaStore.cpp +++ b/src/kvstore/NebulaStore.cpp @@ -589,6 +589,30 @@ nebula::cpp2::ErrorCode NebulaStore::get(GraphSpaceID spaceId, return part->engine()->get(key, value); } +const void* NebulaStore::GetSnapshot(GraphSpaceID spaceId, + PartitionID partId, + bool canReadFromFollower) { + auto ret = part(spaceId, partId); + if (!ok(ret)) { + return nullptr; + } + auto part = nebula::value(ret); + if (!checkLeader(part, canReadFromFollower)) { + return nullptr; + } + return part->engine()->GetSnapshot(); +} + +void NebulaStore::ReleaseSnapshot(GraphSpaceID spaceId, PartitionID partId, const void* snapshot) { + auto ret = part(spaceId, partId); + if (!ok(ret)) { + LOG(INFO) << "Failed to release snapshot for GraphSpaceID " << spaceId << " PartitionID" + << partId; + } + auto part = nebula::value(ret); + return part->engine()->ReleaseSnapshot(snapshot); +} + std::pair> NebulaStore::multiGet( GraphSpaceID spaceId, PartitionID partId, @@ -634,7 +658,8 @@ nebula::cpp2::ErrorCode NebulaStore::prefix(GraphSpaceID spaceId, PartitionID partId, const std::string& prefix, std::unique_ptr* iter, - bool canReadFromFollower) { + bool canReadFromFollower, + const void* snapshot) { auto ret = part(spaceId, partId); if (!ok(ret)) { return error(ret); @@ -643,7 +668,7 @@ nebula::cpp2::ErrorCode NebulaStore::prefix(GraphSpaceID spaceId, if (!checkLeader(part, canReadFromFollower)) { return nebula::cpp2::ErrorCode::E_LEADER_CHANGED; } - return part->engine()->prefix(prefix, iter); + return part->engine()->prefix(prefix, iter, snapshot); } nebula::cpp2::ErrorCode NebulaStore::rangeWithPrefix(GraphSpaceID spaceId, diff --git a/src/kvstore/NebulaStore.h b/src/kvstore/NebulaStore.h index 720bc8c2ee9..985d6e52bd5 100644 --- a/src/kvstore/NebulaStore.h +++ b/src/kvstore/NebulaStore.h @@ -123,6 +123,26 @@ class NebulaStore : public KVStore, public Handler { return options_.dataPaths_; } + /** + * @brief Get the Snapshot from engine. + * + * @param spaceId + * @param partID + * @param canReadFromFollower + * @return const void* Snapshot pointer. + */ + const void* GetSnapshot(GraphSpaceID spaceId, + PartitionID partID, + bool canReadFromFollower = false) override; + /** + * @brief Release snapshot from engine. + * + * @param spaceId + * @param partId + * @param snapshot + */ + void ReleaseSnapshot(GraphSpaceID spaceId, PartitionID partId, const void* snapshot) override; + nebula::cpp2::ErrorCode get(GraphSpaceID spaceId, PartitionID partId, const std::string& key, @@ -157,14 +177,16 @@ class NebulaStore : public KVStore, public Handler { PartitionID partId, const std::string& prefix, std::unique_ptr* iter, - bool canReadFromFollower = false) override; + bool canReadFromFollower = false, + const void* snapshot = nullptr) override; // Delete the overloading with a rvalue `prefix' nebula::cpp2::ErrorCode prefix(GraphSpaceID spaceId, PartitionID partId, std::string&& prefix, std::unique_ptr* iter, - bool canReadFromFollower = false) override = delete; + bool canReadFromFollower = false, + const void* snapshot = nullptr) override = delete; // Get all results with prefix starting from start nebula::cpp2::ErrorCode rangeWithPrefix(GraphSpaceID spaceId, diff --git a/src/kvstore/PartManager.h b/src/kvstore/PartManager.h index a04644a068d..0f7b8b80c1c 100644 --- a/src/kvstore/PartManager.h +++ b/src/kvstore/PartManager.h @@ -110,6 +110,7 @@ class MemPartManager final : public PartManager { FRIEND_TEST(NebulaStoreTest, TransLeaderTest); FRIEND_TEST(NebulaStoreTest, CheckpointTest); FRIEND_TEST(NebulaStoreTest, ThreeCopiesCheckpointTest); + FRIEND_TEST(NebulaStoreTest, ReadSnapshotTest); FRIEND_TEST(NebulaStoreTest, AtomicOpBatchTest); FRIEND_TEST(NebulaStoreTest, RemoveInvalidSpaceTest); FRIEND_TEST(NebulaStoreTest, BackupRestoreTest); diff --git a/src/kvstore/RocksEngine.cpp b/src/kvstore/RocksEngine.cpp index fd9c1a03851..6a5ef04baf6 100644 --- a/src/kvstore/RocksEngine.cpp +++ b/src/kvstore/RocksEngine.cpp @@ -213,19 +213,24 @@ nebula::cpp2::ErrorCode RocksEngine::range(const std::string& start, } nebula::cpp2::ErrorCode RocksEngine::prefix(const std::string& prefix, - std::unique_ptr* storageIter) { + std::unique_ptr* storageIter, + const void* snapshot) { // In fact, we don't need to check prefix.size() >= extractorLen_, which is caller's duty to make // sure the prefix bloom filter exists. But this is quite error-prone, so we do a check here. if (FLAGS_enable_rocksdb_prefix_filtering && prefix.size() >= extractorLen_) { - return prefixWithExtractor(prefix, storageIter); + return prefixWithExtractor(prefix, snapshot, storageIter); } else { - return prefixWithoutExtractor(prefix, storageIter); + return prefixWithoutExtractor(prefix, snapshot, storageIter); } } nebula::cpp2::ErrorCode RocksEngine::prefixWithExtractor(const std::string& prefix, + const void* snapshot, std::unique_ptr* storageIter) { rocksdb::ReadOptions options; + if (snapshot != nullptr) { + options.snapshot = reinterpret_cast(snapshot); + } options.prefix_same_as_start = true; rocksdb::Iterator* iter = db_->NewIterator(options); if (iter) { @@ -236,8 +241,11 @@ nebula::cpp2::ErrorCode RocksEngine::prefixWithExtractor(const std::string& pref } nebula::cpp2::ErrorCode RocksEngine::prefixWithoutExtractor( - const std::string& prefix, std::unique_ptr* storageIter) { + const std::string& prefix, const void* snapshot, std::unique_ptr* storageIter) { rocksdb::ReadOptions options; + if (snapshot != nullptr) { + options.snapshot = reinterpret_cast(snapshot); + } // prefix_same_as_start is false by default options.total_order_seek = FLAGS_enable_rocksdb_prefix_filtering; rocksdb::Iterator* iter = db_->NewIterator(options); diff --git a/src/kvstore/RocksEngine.h b/src/kvstore/RocksEngine.h index 66bf1c358cd..16f5ba0216f 100644 --- a/src/kvstore/RocksEngine.h +++ b/src/kvstore/RocksEngine.h @@ -146,6 +146,14 @@ class RocksEngine : public KVEngine { return walPath_.c_str(); } + const void* GetSnapshot() override { + return db_->GetSnapshot(); + } + + void ReleaseSnapshot(const void* snapshot) override { + db_->ReleaseSnapshot(reinterpret_cast(snapshot)); + } + std::unique_ptr startBatchWrite() override; nebula::cpp2::ErrorCode commitBatchWrite(std::unique_ptr batch, @@ -166,16 +174,19 @@ class RocksEngine : public KVEngine { std::unique_ptr* iter) override; nebula::cpp2::ErrorCode prefix(const std::string& prefix, - std::unique_ptr* iter) override; + std::unique_ptr* iter, + const void* snapshot = nullptr) override; nebula::cpp2::ErrorCode rangeWithPrefix(const std::string& start, const std::string& prefix, std::unique_ptr* iter) override; nebula::cpp2::ErrorCode prefixWithExtractor(const std::string& prefix, + const void* snapshot, std::unique_ptr* storageIter); nebula::cpp2::ErrorCode prefixWithoutExtractor(const std::string& prefix, + const void* snapshot, std::unique_ptr* storageIter); nebula::cpp2::ErrorCode scan(std::unique_ptr* storageIter) override; diff --git a/src/kvstore/plugins/hbase/HBaseStore.cpp b/src/kvstore/plugins/hbase/HBaseStore.cpp index 78784ef57cb..5d05dee490d 100644 --- a/src/kvstore/plugins/hbase/HBaseStore.cpp +++ b/src/kvstore/plugins/hbase/HBaseStore.cpp @@ -292,9 +292,11 @@ ResultCode HBaseStore::prefix(GraphSpaceID spaceId, PartitionID partId, const std::string& prefix, std::unique_ptr* iter, - bool canReadFromFollower) { + bool canReadFromFollower, + const void* snapshot) { UNUSED(partId); UNUSED(canReadFromFollower); + UNUSED(snapshot); return this->prefix(spaceId, prefix, iter); } diff --git a/src/kvstore/plugins/hbase/HBaseStore.h b/src/kvstore/plugins/hbase/HBaseStore.h index a8d628758c0..7b085be7fb0 100644 --- a/src/kvstore/plugins/hbase/HBaseStore.h +++ b/src/kvstore/plugins/hbase/HBaseStore.h @@ -75,6 +75,20 @@ class HBaseStore : public KVStore { return {-1, -1}; } + const void* GetSnapshot(GraphSpaceID spaceId, + PartitionID partID, + bool canReadFromFollower = false) override { + UNUSED(spaceId); + UNUSED(partID); + UNUSED(canReadFromFollower); + return nullptr; + } + void ReleaseSnapshot(GraphSpaceID spaceId, PartitionID partId, const void* snapshot) override { + UNUSED(spaceId); + UNUSED(partId); + UNUSED(snapshot); + return; + } ResultCode get(GraphSpaceID spaceId, PartitionID partId, const std::string& key, @@ -110,14 +124,16 @@ class HBaseStore : public KVStore { PartitionID partId, const std::string& prefix, std::unique_ptr* iter, - bool canReadFromFollower = false) override; + bool canReadFromFollower = false, + const void* snapshot = nullptr) override; // To forbid to pass rvalue via the `prefix' parameter. ResultCode prefix(GraphSpaceID spaceId, PartitionID partId, std::string&& prefix, std::unique_ptr* iter, - bool canReadFromFollower = false) override = delete; + bool canReadFromFollower = false, + const void* snapshot = nullptr) override = delete; // Get all results with prefix starting from start ResultCode rangeWithPrefix(GraphSpaceID spaceId, diff --git a/src/kvstore/test/NebulaStoreTest.cpp b/src/kvstore/test/NebulaStoreTest.cpp index 3bff012cdca..fcd96929f06 100644 --- a/src/kvstore/test/NebulaStoreTest.cpp +++ b/src/kvstore/test/NebulaStoreTest.cpp @@ -760,6 +760,78 @@ TEST(NebulaStoreTest, ThreeCopiesCheckpointTest) { } } +TEST(NebulaStoreTest, ReadSnapshotTest) { + auto partMan = std::make_unique(); + auto ioThreadPool = std::make_shared(4); + // space id : 1 , part id : 0 + partMan->partsMap_[1][0] = PartHosts(); + + VLOG(1) << "Total space num is " << partMan->partsMap_.size() + << ", total local partitions num is " << partMan->parts(HostAddr("", 0)).size(); + + fs::TempDir rootPath("/tmp/nebula_store_test.XXXXXX"); + std::vector paths; + paths.emplace_back(folly::stringPrintf("%s/disk1", rootPath.path())); + + KVOptions options; + options.dataPaths_ = std::move(paths); + options.partMan_ = std::move(partMan); + HostAddr local = {"", 0}; + auto store = + std::make_unique(std::move(options), ioThreadPool, local, getHandlers()); + store->init(); + sleep(FLAGS_raft_heartbeat_interval_secs); + // put kv + { + std::vector> expected, result; + auto atomic = [&]() -> std::string { + std::unique_ptr batchHolder = std::make_unique(); + for (auto i = 0; i < 20; i++) { + auto key = folly::stringPrintf("key_%d", i); + auto val = folly::stringPrintf("val_%d", i); + batchHolder->put(key.data(), val.data()); + expected.emplace_back(std::move(key), std::move(val)); + } + return encodeBatchValue(batchHolder->getBatch()); + }; + + folly::Baton baton; + auto callback = [&](nebula::cpp2::ErrorCode code) { + EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, code); + baton.post(); + }; + store->asyncAtomicOp(1, 0, atomic, callback); + baton.wait(); + + std::unique_ptr iter; + std::string prefix("key"); + const void* snapshot = store->GetSnapshot(1, 0); + SCOPE_EXIT { + store->ReleaseSnapshot(1, 0, snapshot); + }; + std::vector data; + for (auto i = 20; i < 40; i++) { + auto key = folly::stringPrintf("key_%d", i); + auto val = folly::stringPrintf("val_%d", i); + data.emplace_back(key, val); + } + folly::Baton baton2; + store->asyncMultiPut(1, 0, std::move(data), [&](nebula::cpp2::ErrorCode code) { + EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, code); + baton2.post(); + }); + baton2.wait(); + auto ret = store->prefix(1, 0, prefix, &iter, false, snapshot); + EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, ret); + while (iter->valid()) { + result.emplace_back(iter->key(), iter->val()); + iter->next(); + } + std::sort(expected.begin(), expected.end()); + EXPECT_EQ(expected, result); + } +} + TEST(NebulaStoreTest, AtomicOpBatchTest) { auto partMan = std::make_unique(); auto ioThreadPool = std::make_shared(4); diff --git a/src/storage/test/IndexTestUtil.h b/src/storage/test/IndexTestUtil.h index df2f831b91c..09c5747dfa3 100644 --- a/src/storage/test/IndexTestUtil.h +++ b/src/storage/test/IndexTestUtil.h @@ -70,6 +70,21 @@ class MockKVStore : public ::nebula::kvstore::KVStore { CHECK(false); return nebula::cpp2::ErrorCode::SUCCEEDED; } + + const void* GetSnapshot(GraphSpaceID spaceId, + PartitionID partID, + bool canReadFromFollower = false) override { + UNUSED(spaceId); + UNUSED(partID); + UNUSED(canReadFromFollower); + return nullptr; + } + void ReleaseSnapshot(GraphSpaceID spaceId, PartitionID partId, const void* snapshot) override { + UNUSED(spaceId); + UNUSED(partId); + UNUSED(snapshot); + return; + } // Read a single key nebula::cpp2::ErrorCode get(GraphSpaceID spaceId, PartitionID partId, @@ -154,10 +169,12 @@ class MockKVStore : public ::nebula::kvstore::KVStore { PartitionID partId, const std::string& prefix, std::unique_ptr* iter, - bool canReadFromFollower = false) override { + bool canReadFromFollower = false, + const void* snapshot = nullptr) override { UNUSED(canReadFromFollower); UNUSED(spaceId); UNUSED(partId); + UNUSED(snapshot); // Pity that mock kv don't have snap. CHECK_EQ(spaceId, spaceId_); auto mockIter = std::make_unique(kv_, kv_.lower_bound(prefix)); mockIter->setValidFunc([prefix](const decltype(kv_)::iterator& it) { diff --git a/src/tools/db-upgrade/DbUpgrader.cpp b/src/tools/db-upgrade/DbUpgrader.cpp index daedb8451ba..cf66ea4fc87 100644 --- a/src/tools/db-upgrade/DbUpgrader.cpp +++ b/src/tools/db-upgrade/DbUpgrader.cpp @@ -219,7 +219,7 @@ void UpgraderSpace::runPartV1() { << partId; const auto& prefix = NebulaKeyUtilsV1::prefix(partId); std::unique_ptr iter; - auto retCode = readEngine_->prefix(prefix, &iter); + auto retCode = readEngine_->prefix(prefix, nullptr, &iter); if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) { LOG(ERROR) << "Space id " << spaceId_ << " part " << partId << " no found!"; LOG(ERROR) << "Handle vertex/edge/index data in space id " << spaceId_ << " part id " @@ -393,7 +393,7 @@ void UpgraderSpace::doProcessV1() { LOG(INFO) << "Start to handle system data in space id " << spaceId_; auto prefix = NebulaKeyUtilsV1::systemPrefix(); std::unique_ptr iter; - auto retCode = readEngine_->prefix(prefix, &iter); + auto retCode = readEngine_->prefix(prefix, nullptr, &iter); if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) { LOG(ERROR) << "Space id " << spaceId_ << " get system data failed"; LOG(ERROR) << "Handle system data in space id " << spaceId_ << " failed"; @@ -433,7 +433,7 @@ void UpgraderSpace::runPartV2() { << partId; auto prefix = NebulaKeyUtilsV2::partPrefix(partId); std::unique_ptr iter; - auto retCode = readEngine_->prefix(prefix, &iter); + auto retCode = readEngine_->prefix(prefix, nullptr, &iter); if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) { LOG(ERROR) << "Space id " << spaceId_ << " part " << partId << " no found!"; LOG(ERROR) << "Handle vertex/edge/index data in space id " << spaceId_ << " part id " @@ -601,7 +601,7 @@ void UpgraderSpace::doProcessV2() { LOG(INFO) << "Start to handle system data in space id " << spaceId_; auto prefix = NebulaKeyUtilsV2::systemPrefix(); std::unique_ptr iter; - auto retCode = readEngine_->prefix(prefix, &iter); + auto retCode = readEngine_->prefix(prefix, nullptr, &iter); if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) { LOG(ERROR) << "Space id " << spaceId_ << " get system data failed."; LOG(ERROR) << "Handle system data in space id " << spaceId_ << " failed."; @@ -894,7 +894,7 @@ void UpgraderSpace::runPartV3() { << partId; auto prefix = NebulaKeyUtilsV3::partTagPrefix(partId); std::unique_ptr iter; - auto retCode = readEngine_->prefix(prefix, &iter); + auto retCode = readEngine_->prefix(prefix, nullptr, &iter); if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) { LOG(ERROR) << "Space id " << spaceId_ << " part " << partId << " no found!"; LOG(ERROR) << "Handle vertex/edge/index data in space id " << spaceId_ << " part id "