Skip to content

Commit

Permalink
Cherry pick v3.1.0 (0408-0411) (#4130)
Browse files Browse the repository at this point in the history
* Fix group by outputs. (#4128)

* Fix group by outputs.

* Add test.

* Fix format.

Co-authored-by: Sophie <84560950+Sophie-Xie@users.noreply.github.com>

* remove the conception of zone in all place (#4119)

Co-authored-by: Sophie <84560950+Sophie-Xie@users.noreply.github.com>

* default add hosts for standalone version (#4129)

Co-authored-by: cpw <13495049+CPWstatic@users.noreply.github.com>
Co-authored-by: liwenhui-soul <38217397+liwenhui-soul@users.noreply.github.com>
Co-authored-by: Alex Xing <90179377+SuperYoko@users.noreply.github.com>
  • Loading branch information
4 people authored Apr 11, 2022
1 parent 6db2cb6 commit 50159ab
Show file tree
Hide file tree
Showing 13 changed files with 112 additions and 106 deletions.
4 changes: 2 additions & 2 deletions src/clients/meta/MetaClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -846,9 +846,9 @@ Status MetaClient::handleResponse(const RESP& resp) {
case nebula::cpp2::ErrorCode::E_WRONGCLUSTER:
return Status::Error("Wrong cluster!");
case nebula::cpp2::ErrorCode::E_ZONE_NOT_ENOUGH:
return Status::Error("Zone not enough!");
return Status::Error("Host not enough!");
case nebula::cpp2::ErrorCode::E_ZONE_IS_EMPTY:
return Status::Error("Zone is empty!");
return Status::Error("Host not exist!");
case nebula::cpp2::ErrorCode::E_STORE_FAILURE:
return Status::Error("Store failure!");
case nebula::cpp2::ErrorCode::E_BAD_BALANCE_PLAN:
Expand Down
4 changes: 0 additions & 4 deletions src/graph/executor/admin/SpaceExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ folly::Future<Status> DescSpaceExecutor::execute() {
"Collate",
"Vid Type",
"Atomic Edge",
"Zones",
"Comment"};
Row row;
row.values.emplace_back(spaceId);
Expand All @@ -94,9 +93,6 @@ folly::Future<Status> DescSpaceExecutor::execute() {
}
row.values.emplace_back(sAtomicEdge);

auto zoneNames = folly::join(",", properties.get_zone_names());
row.values.emplace_back(zoneNames);

if (properties.comment_ref().has_value()) {
row.values.emplace_back(*properties.comment_ref());
} else {
Expand Down
6 changes: 5 additions & 1 deletion src/graph/validator/GroupByValidator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ Status GroupByValidator::validateImpl() {
NG_RETURN_IF_ERROR(validateYield(groupBySentence->yieldClause()));
NG_RETURN_IF_ERROR(groupClauseSemanticCheck());

for (auto* col : groupBySentence->yieldClause()->columns()) {
auto type = deduceExprType(col->expr());
outputs_.emplace_back(col->name(), std::move(type).value());
}

return Status::OK();
}

Expand Down Expand Up @@ -155,7 +160,6 @@ Status GroupByValidator::groupClauseSemanticCheck() {
for (auto i = 0u; i < groupItems_.size(); ++i) {
auto type = deduceExprType(groupItems_[i]);
NG_RETURN_IF_ERROR(type);
outputs_.emplace_back(aggOutputColNames_[i], std::move(type).value());
}
// check exprProps
if (!exprProps_.srcTagProps().empty() || !exprProps_.dstTagProps().empty()) {
Expand Down
18 changes: 8 additions & 10 deletions src/meta/processors/job/ZoneBalanceJobExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,13 @@ nebula::cpp2::ErrorCode ZoneBalanceJobExecutor::prepare() {
for (size_t i = 0; i < paras_.size(); i++) {
lostZones_.emplace_back(paras_[i]);
}
std::vector<std::string> newZones;
newZones.reserve(lostZones_.size());
for (auto& name : lostZones_) {
auto host = HostAddr::fromString(name);
newZones.emplace_back(folly::stringPrintf("default_zone_%s_%d", host.host.c_str(), host.port));
}
lostZones_.swap(newZones);
return nebula::cpp2::ErrorCode::SUCCEEDED;
}

Expand All @@ -49,16 +56,7 @@ folly::Future<Status> ZoneBalanceJobExecutor::executeInternal() {
return status;
}
}
plan_->setFinishCallBack([this](meta::cpp2::JobStatus status) {
if (status == meta::cpp2::JobStatus::FINISHED) {
folly::SharedMutex::WriteHolder holder(LockUtils::lock());
nebula::cpp2::ErrorCode ret = updateMeta();
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
status = meta::cpp2::JobStatus::FAILED;
}
}
executorOnFinished_(status);
});
plan_->setFinishCallBack([this](meta::cpp2::JobStatus status) { executorOnFinished_(status); });
plan_->invoke();
return Status::OK();
}
Expand Down
24 changes: 24 additions & 0 deletions src/meta/processors/zone/AddHostsProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,20 @@ void AddHostsProcessor::process(const cpp2::AddHostsReq& req) {

std::vector<kvstore::KV> data;
nebula::cpp2::ErrorCode code = nebula::cpp2::ErrorCode::SUCCEEDED;
std::map<GraphSpaceID, meta::cpp2::SpaceDesc> spaceMap;
std::string spacePrefix = MetaKeyUtils::spacePrefix();
std::unique_ptr<kvstore::KVIterator> spaceIter;
auto spaceRet = kvstore_->prefix(kDefaultSpaceId, kDefaultPartId, spacePrefix, &spaceIter);
if (spaceRet != nebula::cpp2::ErrorCode::SUCCEEDED) {
handleErrorCode(spaceRet);
onFinished();
return;
}
while (spaceIter->valid()) {
spaceMap.emplace(MetaKeyUtils::spaceId(spaceIter->key()),
MetaKeyUtils::parseSpace(spaceIter->val()));
spaceIter->next();
}
for (auto& host : hosts) {
// Ensure that the node is not registered.
auto machineKey = MetaKeyUtils::machineKey(host.host, host.port);
Expand All @@ -56,6 +70,16 @@ void AddHostsProcessor::process(const cpp2::AddHostsReq& req) {
auto zoneVal = MetaKeyUtils::zoneVal({host});
data.emplace_back(std::move(machineKey), "");
data.emplace_back(std::move(zoneKey), std::move(zoneVal));
for (auto& [spaceId, properties] : spaceMap) {
const std::vector<std::string>& curZones = properties.get_zone_names();
std::set<std::string> zm(curZones.begin(), curZones.end());
zm.emplace(zoneName);
std::vector<std::string> newZones(zm.begin(), zm.end());
properties.zone_names_ref() = std::move(newZones);
}
}
for (auto& [spaceId, properties] : spaceMap) {
data.emplace_back(MetaKeyUtils::spaceKey(spaceId), MetaKeyUtils::spaceVal(properties));
}

if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
Expand Down
15 changes: 13 additions & 2 deletions src/meta/processors/zone/DropHostsProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ void DropHostsProcessor::process(const cpp2::DropHostsReq& req) {
auto spaceIterRet = doPrefix(spacePrefix);
auto spaceIter = nebula::value(spaceIterRet).get();
nebula::cpp2::ErrorCode code = nebula::cpp2::ErrorCode::SUCCEEDED;
std::map<GraphSpaceID, meta::cpp2::SpaceDesc> spaceMap;
while (spaceIter->valid()) {
auto spaceId = MetaKeyUtils::spaceId(spaceIter->key());
auto spaceKey = MetaKeyUtils::spaceKey(spaceId);
Expand All @@ -44,7 +45,7 @@ void DropHostsProcessor::process(const cpp2::DropHostsReq& req) {
<< " error: " << apache::thrift::util::enumNameSafe(code);
break;
}

spaceMap.emplace(spaceId, MetaKeyUtils::parseSpace(spaceIter->val()));
const auto& partPrefix = MetaKeyUtils::partPrefix(spaceId);
auto partIterRet = doPrefix(partPrefix);
auto partIter = nebula::value(partIterRet).get();
Expand Down Expand Up @@ -96,6 +97,14 @@ void DropHostsProcessor::process(const cpp2::DropHostsReq& req) {
}

holder->remove(MetaKeyUtils::zoneKey(zoneName));

for (auto& [spaceId, properties] : spaceMap) {
const std::vector<std::string>& curZones = properties.get_zone_names();
std::set<std::string> zm(curZones.begin(), curZones.end());
zm.erase(zoneName);
std::vector<std::string> newZones(zm.begin(), zm.end());
properties.zone_names_ref() = std::move(newZones);
}
} else {
// Delete some hosts in the zone
for (auto& h : hosts) {
Expand All @@ -112,7 +121,9 @@ void DropHostsProcessor::process(const cpp2::DropHostsReq& req) {
CHECK_CODE_AND_BREAK();
iter->next();
}

for (auto& [spaceId, properties] : spaceMap) {
holder->put(MetaKeyUtils::spaceKey(spaceId), MetaKeyUtils::spaceVal(properties));
}
if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
handleErrorCode(code);
onFinished();
Expand Down
1 change: 0 additions & 1 deletion src/meta/test/BalancerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -790,7 +790,6 @@ TEST(BalanceTest, NormalZoneTest) {
ret = balancer.executeInternal();
baton.wait();
EXPECT_EQ(Status::OK(), ret.value());
verifyMetaZone(kv, balancer.spaceInfo_.spaceId_, {"1", "2", "3", "4"});
verifyBalanceTask(
kv, balancer.jobId_, BalanceTaskStatus::END, BalanceTaskResult::SUCCEEDED, partCount, 12);
}
Expand Down
61 changes: 14 additions & 47 deletions src/parser/parser.yy
Original file line number Diff line number Diff line change
Expand Up @@ -3326,36 +3326,20 @@ admin_job_sentence
meta::cpp2::JobType::LEADER_BALANCE);
$$ = sentence;
}
| KW_SUBMIT KW_JOB KW_BALANCE KW_IN KW_ZONE {
auto sentence = new AdminJobSentence(meta::cpp2::JobOp::ADD,
meta::cpp2::JobType::DATA_BALANCE);
$$ = sentence;
}
| KW_SUBMIT KW_JOB KW_BALANCE KW_IN KW_ZONE KW_REMOVE host_list {
auto sentence = new AdminJobSentence(meta::cpp2::JobOp::ADD,
meta::cpp2::JobType::DATA_BALANCE);
HostList* hl = $7;
std::vector<HostAddr> has = hl->hosts();
for (HostAddr& ha: has) {
sentence->addPara(ha.toString());
}
delete hl;
$$ = sentence;
}
| KW_SUBMIT KW_JOB KW_BALANCE KW_ACROSS KW_ZONE {
| KW_SUBMIT KW_JOB KW_BALANCE KW_DATA {
auto sentence = new AdminJobSentence(meta::cpp2::JobOp::ADD,
meta::cpp2::JobType::ZONE_BALANCE);
$$ = sentence;
}
| KW_SUBMIT KW_JOB KW_BALANCE KW_ACROSS KW_ZONE KW_REMOVE zone_name_list {
| KW_SUBMIT KW_JOB KW_BALANCE KW_DATA KW_REMOVE host_list {
auto sentence = new AdminJobSentence(meta::cpp2::JobOp::ADD,
meta::cpp2::JobType::ZONE_BALANCE);
ZoneNameList* nl = $7;
std::vector<std::string> names = nl->zoneNames();
for (std::string& name: names) {
sentence->addPara(name);
HostList* hl = $6;
std::vector<HostAddr> hosts = hl->hosts();
for (HostAddr& host: hosts) {
sentence->addPara(host.toString());
}
delete nl;
delete hl;
$$ = sentence;
}
;
Expand Down Expand Up @@ -3794,37 +3778,20 @@ balance_sentence
meta::cpp2::JobType::LEADER_BALANCE);
$$ = sentence;
}
|
KW_BALANCE KW_IN KW_ZONE {
auto sentence = new AdminJobSentence(meta::cpp2::JobOp::ADD,
meta::cpp2::JobType::DATA_BALANCE);
$$ = sentence;
}
| KW_BALANCE KW_IN KW_ZONE KW_REMOVE host_list {
auto sentence = new AdminJobSentence(meta::cpp2::JobOp::ADD,
meta::cpp2::JobType::DATA_BALANCE);
HostList* hl = $5;
std::vector<HostAddr> has = hl->hosts();
for (HostAddr& ha: has) {
sentence->addPara(ha.toString());
}
delete hl;
$$ = sentence;
}
| KW_BALANCE KW_ACROSS KW_ZONE {
| KW_BALANCE KW_DATA {
auto sentence = new AdminJobSentence(meta::cpp2::JobOp::ADD,
meta::cpp2::JobType::ZONE_BALANCE);
$$ = sentence;
}
| KW_BALANCE KW_ACROSS KW_ZONE KW_REMOVE zone_name_list {
| KW_BALANCE KW_DATA KW_REMOVE host_list {
auto sentence = new AdminJobSentence(meta::cpp2::JobOp::ADD,
meta::cpp2::JobType::ZONE_BALANCE);
ZoneNameList* nl = $5;
std::vector<std::string> names = nl->zoneNames();
for (std::string& name: names) {
sentence->addPara(name);
HostList* hl = $4;
std::vector<HostAddr> hosts = hl->hosts();
for (HostAddr& host: hosts) {
sentence->addPara(host.toString());
}
delete nl;
delete hl;
$$ = sentence;
}
;
Expand Down
33 changes: 0 additions & 33 deletions src/parser/test/ParserTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2009,34 +2009,6 @@ TEST_F(ParserTest, ConfigOperation) {
}
}

TEST_F(ParserTest, BalanceOperation) {
{
std::string query = "BALANCE LEADER";
auto result = parse(query);
ASSERT_TRUE(result.ok()) << result.status();
}
{
std::string query = "BALANCE IN ZONE";
auto result = parse(query);
ASSERT_TRUE(result.ok()) << result.status();
}
{
std::string query = "BALANCE ACROSS ZONE";
auto result = parse(query);
ASSERT_TRUE(result.ok()) << result.status();
}
{
std::string query = "BALANCE IN ZONE REMOVE 192.168.0.1:50000,192.168.0.1:50001";
auto result = parse(query);
ASSERT_TRUE(result.ok()) << result.status();
}
{
std::string query = "BALANCE IN ZONE REMOVE 192.168.0.1:50000,\"localhost\":50001";
auto result = parse(query);
ASSERT_TRUE(result.ok()) << result.status();
}
}

TEST_F(ParserTest, CrashByFuzzer) {
{
std::string query = ";YIELD\nI41( ,1)GEGE.INGEST";
Expand Down Expand Up @@ -3227,11 +3199,6 @@ TEST_F(ParserTest, JobTest) {
checkTest("SUBMIT JOB INGEST", "SUBMIT JOB INGEST");

checkTest("SUBMIT JOB STATS", "SUBMIT JOB STATS");
checkTest("SUBMIT JOB BALANCE IN ZONE", "SUBMIT JOB BALANCE IN ZONE");
checkTest(
"SUBMIT JOB BALANCE IN ZONE REMOVE 192.168.0.1:50000, 192.168.0.1:50001, 192.168.0.1:50002",
"SUBMIT JOB BALANCE IN ZONE REMOVE \"192.168.0.1\":50000, \"192.168.0.1\":50001, "
"\"192.168.0.1\":50002");
checkTest("SUBMIT JOB BALANCE LEADER", "SUBMIT JOB BALANCE LEADER");
checkTest("SHOW JOBS", "SHOW JOBS");
checkTest("SHOW JOB 111", "SHOW JOB 111");
Expand Down
24 changes: 24 additions & 0 deletions src/storage/StorageServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ DEFINE_bool(local_config, false, "meta client will not retrieve latest configura
DEFINE_int32(storage_port, 44501, "Storage daemon listening port");
DEFINE_int32(storage_num_worker_threads, 32, "Number of workers");
DECLARE_bool(local_config);
DEFINE_bool(add_local_host, true, "Whether add localhost automatically");
DECLARE_string(local_ip);
#endif
DEFINE_bool(storage_kv_mode, false, "True for kv mode");
DEFINE_int32(num_io_threads, 16, "Number of IO threads");
Expand Down Expand Up @@ -168,6 +170,28 @@ bool StorageServer::start() {
options.dataPaths_ = dataPaths_;

metaClient_ = std::make_unique<meta::MetaClient>(ioThreadPool_, metaAddrs_, options);

#ifdef BUILD_STANDALONE
if (FLAGS_add_local_host) {
std::vector<HostAddr> hosts = {{FLAGS_local_ip, FLAGS_storage_port}};
folly::Baton<> baton;
bool isAdded = false;
metaClient_->addHosts(hosts).thenValue([&isAdded, &baton](StatusOr<bool> resp) {
if (!resp.ok() || !resp.value()) {
LOG(ERROR) << "Add hosts for standalone failed.";
} else {
LOG(INFO) << "Add hosts for standalone succeed.";
isAdded = true;
}
baton.post();
});
baton.wait();
if (!isAdded) {
return false;
}
}
#endif

if (!metaClient_->waitForMetadReady()) {
LOG(ERROR) << "waitForMetadReady error!";
return false;
Expand Down
1 change: 1 addition & 0 deletions tests/common/nebula_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,7 @@ def _make_sa_params(self, **kwargs):
self.graphd_param['password_lock_time_in_secs'] = '10'
self.graphd_param['raft_heartbeat_interval_secs'] = '30'
self.graphd_param['skip_wait_in_rate_limiter'] = 'true'
self.graphd_param['add_local_host'] = 'false'
for p in [self.metad_param, self.storaged_param, self.graphd_param]:
p.update(kwargs)

Expand Down
15 changes: 15 additions & 0 deletions tests/tck/features/aggregate/Agg.feature
Original file line number Diff line number Diff line change
Expand Up @@ -613,6 +613,21 @@ Feature: Basic Aggregate and GroupBy
| sum |
| 6 |

Scenario: Reference the output of group by
When executing query:
"""
GO FROM "Tim Duncan" OVER * YIELD dst(edge) as dst, $$.player.age as age
| GROUP BY $-.dst YIELD (sum($-.age)+3) as age
| ORDER BY $-.age
"""
Then the result should be, in order:
| age |
| 3 |
| 34 |
| 36 |
| 75 |
| 85 |

Scenario: Error Check
When executing query:
"""
Expand Down
Loading

0 comments on commit 50159ab

Please sign in to comment.