diff --git a/src/meta/http/MetaHttpReplaceHostHandler.cpp b/src/meta/http/MetaHttpReplaceHostHandler.cpp index ad3cbc0f062..edc602a1094 100644 --- a/src/meta/http/MetaHttpReplaceHostHandler.cpp +++ b/src/meta/http/MetaHttpReplaceHostHandler.cpp @@ -37,21 +37,20 @@ void MetaHttpReplaceHostHandler::onRequest(std::unique_ptr headers) if (!headers->hasQueryParam("from")) { err_ = HttpCode::E_ILLEGAL_ARGUMENT; - errMsg_ = "miss argument [from]"; + errMsg_ = "Miss argument [from]"; return; } if (!headers->hasQueryParam("to")) { err_ = HttpCode::E_ILLEGAL_ARGUMENT; - errMsg_ = "miss argument [to]"; + errMsg_ = "Miss argument [to]"; return; } ipv4From_ = headers->getQueryParam("from"); - ipv4To_ = headers->getQueryParam("to"); - LOG(INFO) << folly::sformat("change host info from {} to {}", ipv4From_, ipv4To_); + LOG(INFO) << folly::sformat("Change host info from {} to {}", ipv4From_, ipv4To_); } void MetaHttpReplaceHostHandler::onBody(std::unique_ptr) noexcept { @@ -76,15 +75,15 @@ void MetaHttpReplaceHostHandler::onEOM() noexcept { break; } - if (replaceHost(ipv4From_, ipv4To_)) { - LOG(INFO) << "Replace Host successfully"; + if (replaceHostInPart(ipv4From_, ipv4To_) && replaceHostInZone(ipv4From_, ipv4To_)) { + LOG(INFO) << "Replace Host in partition and zone successfully"; ResponseBuilder(downstream_) .status(WebServiceUtils::to(HttpStatusCode::OK), WebServiceUtils::toString(HttpStatusCode::OK)) - .body("Replace Host successfully") + .body("Replace Host in partition and zone successfully") .sendWithEOM(); } else { - LOG(INFO) << "Replace Host failed"; + LOG(INFO) << "Replace Host in partition and zone failed"; ResponseBuilder(downstream_) .status(WebServiceUtils::to(HttpStatusCode::FORBIDDEN), WebServiceUtils::toString(HttpStatusCode::FORBIDDEN)) @@ -106,13 +105,13 @@ void MetaHttpReplaceHostHandler::onError(ProxygenError error) noexcept { << proxygen::getErrorString(error); } -bool MetaHttpReplaceHostHandler::replaceHost(std::string ipv4From, std::string ipv4To) { +bool MetaHttpReplaceHostHandler::replaceHostInPart(std::string ipv4From, std::string ipv4To) { folly::SharedMutex::WriteHolder wHolder(LockUtils::spaceLock()); const auto& spacePrefix = MetaKeyUtils::spacePrefix(); std::unique_ptr iter; auto kvRet = kvstore_->prefix(kDefaultSpaceId, kDefaultPartId, spacePrefix, &iter); if (kvRet != nebula::cpp2::ErrorCode::SUCCEEDED) { - errMsg_ = folly::stringPrintf("can't get space prefix=%s", spacePrefix.c_str()); + errMsg_ = folly::stringPrintf("Can't get space prefix=%s", spacePrefix.c_str()); LOG(INFO) << errMsg_; return false; } @@ -123,14 +122,14 @@ bool MetaHttpReplaceHostHandler::replaceHost(std::string ipv4From, std::string i allSpaceId.emplace_back(spaceId); iter->next(); } - LOG(INFO) << "allSpaceId.size()=" << allSpaceId.size(); + LOG(INFO) << "AllSpaceId.size()=" << allSpaceId.size(); std::vector data; for (const auto& spaceId : allSpaceId) { const auto& partPrefix = MetaKeyUtils::partPrefix(spaceId); kvRet = kvstore_->prefix(kDefaultSpaceId, kDefaultPartId, partPrefix, &iter); if (kvRet != nebula::cpp2::ErrorCode::SUCCEEDED) { - errMsg_ = folly::stringPrintf("can't get partPrefix=%s", partPrefix.c_str()); + errMsg_ = folly::stringPrintf("Can't get partPrefix=%s", partPrefix.c_str()); LOG(INFO) << errMsg_; return false; } @@ -156,12 +155,54 @@ bool MetaHttpReplaceHostHandler::replaceHost(std::string ipv4From, std::string i kvstore_->asyncMultiPut( kDefaultSpaceId, kDefaultPartId, std::move(data), [&](nebula::cpp2::ErrorCode code) { updateSucceed = (code == nebula::cpp2::ErrorCode::SUCCEEDED); - errMsg_ = folly::stringPrintf("write to kvstore failed, %s , %d", __func__, __LINE__); + errMsg_ = folly::stringPrintf("Write to kvstore failed, %s , %d", __func__, __LINE__); baton.post(); }); baton.wait(); return updateSucceed; } +bool MetaHttpReplaceHostHandler::replaceHostInZone(std::string ipv4From, std::string ipv4To) { + folly::SharedMutex::WriteHolder wHolder(LockUtils::spaceLock()); + const auto& zonePrefix = MetaKeyUtils::zonePrefix(); + std::unique_ptr iter; + auto kvRet = kvstore_->prefix(kDefaultSpaceId, kDefaultPartId, zonePrefix, &iter); + if (kvRet != nebula::cpp2::ErrorCode::SUCCEEDED) { + errMsg_ = folly::stringPrintf("Can't get zone prefix=%s", zonePrefix.c_str()); + LOG(INFO) << errMsg_; + return false; + } + + std::vector data; + while (iter->valid()) { + bool needUpdate = false; + auto zoneName = MetaKeyUtils::parseZoneName(iter->key()); + auto hosts = MetaKeyUtils::parseZoneHosts(iter->val()); + for (auto& host : hosts) { + if (host.host == ipv4From) { + host.host = ipv4To; + needUpdate = true; + } + } + + if (needUpdate) { + data.emplace_back(iter->key(), MetaKeyUtils::zoneVal(hosts)); + } + iter->next(); + } + + bool updateSucceed{false}; + folly::Baton baton; + kvstore_->asyncMultiPut( + kDefaultSpaceId, kDefaultPartId, std::move(data), [&](nebula::cpp2::ErrorCode code) { + updateSucceed = (code == nebula::cpp2::ErrorCode::SUCCEEDED); + errMsg_ = folly::stringPrintf("Write to kvstore failed, %s , %d", __func__, __LINE__); + baton.post(); + }); + baton.wait(); + + return updateSucceed; +} + } // namespace meta } // namespace nebula diff --git a/src/meta/http/MetaHttpReplaceHostHandler.h b/src/meta/http/MetaHttpReplaceHostHandler.h index 75d2430ab0a..0b8a793f425 100644 --- a/src/meta/http/MetaHttpReplaceHostHandler.h +++ b/src/meta/http/MetaHttpReplaceHostHandler.h @@ -20,8 +20,8 @@ using nebula::HttpCode; /** * @brief It will replace host info in meta partition table from * backup host to current cluster host. - * It should be called after ingesting meta sst files when - * restore cluster. + * It will replace given host in zone table and partition table. Notice that, + * it only replace the host without port. * Functions such as onRequest, onBody... and requestComplete are inherited * from RequestHandler, we will check request parameters in onRequest and * call main logic in onEOM. @@ -33,7 +33,7 @@ class MetaHttpReplaceHostHandler : public proxygen::RequestHandler { public: MetaHttpReplaceHostHandler() = default; - void init(nebula::kvstore::KVStore *kvstore); + void init(nebula::kvstore::KVStore* kvstore); void onRequest(std::unique_ptr headers) noexcept override; @@ -47,14 +47,15 @@ class MetaHttpReplaceHostHandler : public proxygen::RequestHandler { void onError(proxygen::ProxygenError error) noexcept override; - bool replaceHost(std::string ipv4From, std::string ipv4To); + bool replaceHostInPart(std::string ipv4From, std::string ipv4To); + bool replaceHostInZone(std::string ipv4From, std::string ipv4To); private: HttpCode err_{HttpCode::SUCCEEDED}; std::string errMsg_; std::string ipv4From_; std::string ipv4To_; - nebula::kvstore::KVStore *kvstore_; + nebula::kvstore::KVStore* kvstore_; }; } // namespace meta diff --git a/src/meta/http/test/CMakeLists.txt b/src/meta/http/test/CMakeLists.txt index ad2a7067eea..9f116aefe5e 100644 --- a/src/meta/http/test/CMakeLists.txt +++ b/src/meta/http/test/CMakeLists.txt @@ -33,19 +33,19 @@ nebula_add_test( ) -# nebula_add_test( -# NAME -# meta_http_replace_test -# SOURCES -# MetaHttpReplaceHandlerTest.cpp -# OBJECTS -# $ -# $ -# ${meta_test_deps} -# LIBRARIES -# ${ROCKSDB_LIBRARIES} -# ${THRIFT_LIBRARIES} -# ${PROXYGEN_LIBRARIES} -# wangle -# gtest -# ) +nebula_add_test( + NAME + meta_http_replace_test + SOURCES + MetaHttpReplaceHandlerTest.cpp + OBJECTS + $ + $ + ${meta_test_deps} + LIBRARIES + ${ROCKSDB_LIBRARIES} + ${THRIFT_LIBRARIES} + ${PROXYGEN_LIBRARIES} + wangle + gtest +) diff --git a/src/meta/http/test/MetaHttpReplaceHandlerTest.cpp b/src/meta/http/test/MetaHttpReplaceHandlerTest.cpp index ad22d9667f8..c078d35e1db 100644 --- a/src/meta/http/test/MetaHttpReplaceHandlerTest.cpp +++ b/src/meta/http/test/MetaHttpReplaceHandlerTest.cpp @@ -23,7 +23,8 @@ namespace meta { meta::MetaHttpReplaceHostHandler* gHandler = nullptr; kvstore::KVStore* gKVStore = nullptr; -std::vector dumpKVStore(kvstore::KVStore* kvstore); +std::set gHosts; +std::set dumpHosts(kvstore::KVStore* kvstore); class MetaHttpReplaceHandlerTestEnv : public ::testing::Environment { public: @@ -31,23 +32,32 @@ class MetaHttpReplaceHandlerTestEnv : public ::testing::Environment { FLAGS_ws_http_port = 0; FLAGS_ws_h2_port = 0; LOG(INFO) << "Starting web service..."; - rootPath_ = std::make_unique("/tmp/MetaHttpReplaceHandler.XXXXXX"); kv_ = MockCluster::initMetaKV(rootPath_->path()); - TestUtils::createSomeHosts(kv_.get()); - TestUtils::assembleSpace(kv_.get(), 1, 1, 1); - gKVStore = kv_.get(); + LOG(INFO) << "Prepare data..."; + HostAddr host0("0", 0), host1("1", 1), host2("2", 2), host3("3", 3); + gHosts.insert(host0.host); + gHosts.insert(host1.host); + gHosts.insert(host2.host); + gHosts.insert(host3.host); + + TestUtils::createSomeHosts(gKVStore, {host0, host1, host2, host3}); + // Notice: it will add part1~4 to hosts host0~3, host0~3 are generated in the function which is + // a little wired. + TestUtils::assembleSpace(gKVStore, 1, 4, 1, 4); + ZoneInfo zoneInfo = {{"zone_0", {host0, host1}}, {"zone_1", {host2, host3}}}; + TestUtils::assembleZone(gKVStore, zoneInfo); + + LOG(INFO) << "Setup webservice with replace handler..."; webSvc_ = std::make_unique(); auto& router = webSvc_->router(); - router.get("/replace").handler([&](nebula::web::PathParams&&) { gHandler = new meta::MetaHttpReplaceHostHandler(); gHandler->init(gKVStore); return gHandler; }); - auto status = webSvc_->start(); ASSERT_TRUE(status.ok()) << status; } @@ -56,6 +66,10 @@ class MetaHttpReplaceHandlerTestEnv : public ::testing::Environment { kv_.reset(); rootPath_.reset(); webSvc_.reset(); + + gHandler = nullptr; + gKVStore = nullptr; + gHosts.clear(); LOG(INFO) << "Web service stopped"; } @@ -71,24 +85,26 @@ StatusOr silentCurl(const std::string& path) { } TEST(MetaHttpReplaceHandlerTest, FooTest) { - std::vector dump = dumpKVStore(gKVStore); + auto dump = dumpHosts(gKVStore); for (auto& row : dump) { LOG(INFO) << "host=" << row; } - std::string sFrom{"0.0.0.0"}; + std::string sFrom{"0"}; std::string sTo{"66.66.66.66"}; - std::vector beforeUpdate{sFrom}; - std::vector afterUpdate{sTo}; + std::set beforeUpdate(gHosts); + std::set afterUpdate(gHosts); + afterUpdate.erase(sFrom); + afterUpdate.insert(sTo); { // no [from] static const char* tmp = "http://127.0.0.1:%d/replace?to=%s"; auto url = folly::stringPrintf(tmp, FLAGS_ws_http_port, sTo.c_str()); silentCurl(url); - auto result = dumpKVStore(gKVStore); - EXPECT_TRUE(result == beforeUpdate); + auto result = dumpHosts(gKVStore); + EXPECT_EQ(result, beforeUpdate); } { @@ -96,37 +112,17 @@ TEST(MetaHttpReplaceHandlerTest, FooTest) { static const char* tmp = "http://127.0.0.1:%d/replace?&from=%s"; auto url = folly::stringPrintf(tmp, FLAGS_ws_http_port, sFrom.c_str()); silentCurl(url); - auto result = dumpKVStore(gKVStore); - EXPECT_TRUE(result == beforeUpdate); - } - - { - // invalid [from] - std::string invalidAddr = "10.10.10"; - const char* tmp = "http://127.0.0.1:%d/replace?from=%s&to=%s"; - auto url = folly::stringPrintf(tmp, FLAGS_ws_http_port, invalidAddr.c_str(), sTo.c_str()); - silentCurl(url); - auto result = dumpKVStore(gKVStore); - EXPECT_TRUE(result == beforeUpdate); - } - - { - // invalid [to] - std::string invalidAddr = "10.10.10"; - const char* tmp = "http://127.0.0.1:%d/replace?from=%s&to=%s"; - auto url = folly::stringPrintf(tmp, FLAGS_ws_http_port, sFrom.c_str(), invalidAddr.c_str()); - silentCurl(url); - auto result = dumpKVStore(gKVStore); - EXPECT_TRUE(result == beforeUpdate); + auto result = dumpHosts(gKVStore); + EXPECT_EQ(result, beforeUpdate); } { // valid [from] but not exist - std::string notExistFrom = "1.1.1.1"; + std::string notExistFrom = "10.10.10.10"; const char* tmp = "http://127.0.0.1:%d/replace?from=%s&to=%s"; auto url = folly::stringPrintf(tmp, FLAGS_ws_http_port, notExistFrom.c_str(), sTo.c_str()); silentCurl(url); - auto result = dumpKVStore(gKVStore); + auto result = dumpHosts(gKVStore); EXPECT_EQ(result, beforeUpdate); LOG(INFO) << "valid [from] but not exist"; for (auto& r : result) { @@ -139,15 +135,13 @@ TEST(MetaHttpReplaceHandlerTest, FooTest) { static const char* tmp = "http://127.0.0.1:%d/replace?from=%s&to=%s"; auto url = folly::stringPrintf(tmp, FLAGS_ws_http_port, sFrom.c_str(), sTo.c_str()); silentCurl(url); - auto result = dumpKVStore(gKVStore); + auto result = dumpHosts(gKVStore); EXPECT_EQ(result, afterUpdate); } } -std::vector dumpKVStore(kvstore::KVStore* kvstore) { - LOG(INFO) << __func__ << " enter"; - std::vector ret; - // Get all spaces +std::set dumpHosts(kvstore::KVStore* kvstore) { + // Get all hosts from all partition std::vector allSpaceId; const auto& spacePrefix = MetaKeyUtils::spacePrefix(); std::unique_ptr iter; @@ -159,21 +153,33 @@ std::vector dumpKVStore(kvstore::KVStore* kvstore) { iter->next(); } - LOG(INFO) << "allSpaceId.size()=" << allSpaceId.size(); + std::set hosts; for (const auto& spaceId : allSpaceId) { const auto& partPrefix = MetaKeyUtils::partPrefix(spaceId); kvRet = kvstore->prefix(kDefaultSpaceId, kDefaultPartId, partPrefix, &iter); EXPECT_EQ(kvRet, nebula::cpp2::ErrorCode::SUCCEEDED); while (iter->valid()) { - auto hostAddrs = MetaKeyUtils::parsePartVal(iter->val()); - LOG(INFO) << "hostAddrs.size()=" << hostAddrs.size(); - for (auto& hostAddr : hostAddrs) { - ret.emplace_back(hostAddr.host); + auto addrs = MetaKeyUtils::parsePartVal(iter->val()); + for (auto& addr : addrs) { + hosts.insert(addr.host); } iter->next(); } } - return ret; + + // Get all hosts from all zone + const auto& zonePrefix = MetaKeyUtils::zonePrefix(); + kvRet = kvstore->prefix(kDefaultSpaceId, kDefaultPartId, zonePrefix, &iter); + EXPECT_EQ(kvRet, nebula::cpp2::ErrorCode::SUCCEEDED); + while (iter->valid()) { + auto addrs = MetaKeyUtils::parseZoneHosts(iter->val()); + for (auto& addr : addrs) { + hosts.insert(addr.host); + } + iter->next(); + } + + return hosts; } } // namespace meta diff --git a/src/meta/processors/admin/RestoreProcessor.cpp b/src/meta/processors/admin/RestoreProcessor.cpp index 1c64df3c8d8..264d9315e0e 100644 --- a/src/meta/processors/admin/RestoreProcessor.cpp +++ b/src/meta/processors/admin/RestoreProcessor.cpp @@ -99,7 +99,6 @@ nebula::cpp2::ErrorCode RestoreProcessor::replaceHostInZone(const HostAddr& ipv4 bool needUpdate = false; auto zoneName = MetaKeyUtils::parseZoneName(iter->key()); auto hosts = MetaKeyUtils::parseZoneHosts(iter->val()); - std::vector DesHosts; for (auto& host : hosts) { if (host == ipv4From) { needUpdate = true; diff --git a/src/meta/test/TestUtils.h b/src/meta/test/TestUtils.h index 8e08afdfd89..02db28347c3 100644 --- a/src/meta/test/TestUtils.h +++ b/src/meta/test/TestUtils.h @@ -86,7 +86,7 @@ class TestUtils { // Record machine information std::vector machines; for (auto& host : hosts) { - VLOG(3) << "Registe machine: " << host; + VLOG(3) << "Register machine: " << host; machines.emplace_back(nebula::MetaKeyUtils::machineKey(host.host, host.port), ""); } folly::Baton baton;