Skip to content

Commit

Permalink
Merge branch 'master' into support-delete-vertex-without-edge
Browse files Browse the repository at this point in the history
  • Loading branch information
cangfengzhs authored Nov 17, 2021
2 parents 643381f + b92c65e commit fd8d1bb
Show file tree
Hide file tree
Showing 85 changed files with 2,683 additions and 4,114 deletions.
51 changes: 0 additions & 51 deletions src/clients/meta/MetaClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2579,57 +2579,6 @@ folly::Future<StatusOr<std::vector<cpp2::RoleItem>>> MetaClient::getUserRoles(st
return future;
}

folly::Future<StatusOr<int64_t>> MetaClient::balance(std::vector<HostAddr> hostDel,
bool isStop,
bool isReset) {
cpp2::BalanceReq req;
if (!hostDel.empty()) {
req.set_host_del(std::move(hostDel));
}
if (isStop) {
req.set_stop(isStop);
}
if (isReset) {
req.set_reset(isReset);
}

folly::Promise<StatusOr<int64_t>> promise;
auto future = promise.getFuture();
getResponse(
std::move(req),
[](auto client, auto request) { return client->future_balance(request); },
[](cpp2::BalanceResp&& resp) -> int64_t { return resp.get_id(); },
std::move(promise));
return future;
}

folly::Future<StatusOr<std::vector<cpp2::BalanceTask>>> MetaClient::showBalance(int64_t balanceId) {
cpp2::BalanceReq req;
req.set_id(balanceId);
folly::Promise<StatusOr<std::vector<cpp2::BalanceTask>>> promise;
auto future = promise.getFuture();
getResponse(
std::move(req),
[](auto client, auto request) { return client->future_balance(request); },
[](cpp2::BalanceResp&& resp) -> std::vector<cpp2::BalanceTask> { return resp.get_tasks(); },
std::move(promise));
return future;
}

folly::Future<StatusOr<bool>> MetaClient::balanceLeader() {
cpp2::LeaderBalanceReq req;
folly::Promise<StatusOr<bool>> promise;
auto future = promise.getFuture();
getResponse(
std::move(req),
[](auto client, auto request) { return client->future_leaderBalance(request); },
[](cpp2::ExecResp&& resp) -> bool {
return resp.get_code() == nebula::cpp2::ErrorCode::SUCCEEDED;
},
std::move(promise));
return future;
}

folly::Future<StatusOr<std::string>> MetaClient::getTagDefaultValue(GraphSpaceID spaceId,
TagID tagId,
const std::string& field) {
Expand Down
9 changes: 0 additions & 9 deletions src/clients/meta/MetaClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -391,15 +391,6 @@ class MetaClient {

folly::Future<StatusOr<std::vector<cpp2::RoleItem>>> getUserRoles(std::string account);

// Operations for admin
folly::Future<StatusOr<int64_t>> balance(std::vector<HostAddr> hostDel,
bool isStop,
bool isReset);

folly::Future<StatusOr<std::vector<cpp2::BalanceTask>>> showBalance(int64_t balanceId);

folly::Future<StatusOr<bool>> balanceLeader();

// Operations for config
folly::Future<StatusOr<bool>> regConfig(const std::vector<cpp2::ConfigItem>& items);

Expand Down
209 changes: 113 additions & 96 deletions src/clients/storage/StorageClientBase-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ folly::SemiFuture<StorageRpcResponse<Response>> StorageClientBase<ClientType>::c
folly::EventBase* evb,
std::unordered_map<HostAddr, Request> requests,
RemoteFunc&& remoteFunc) {
using TransportException = apache::thrift::transport::TTransportException;
auto context = std::make_shared<ResponseContext<Request, RemoteFunc, Response>>(
requests.size(), std::move(remoteFunc));

Expand All @@ -137,49 +138,64 @@ folly::SemiFuture<StorageRpcResponse<Response>> StorageClientBase<ClientType>::c
// Since all requests are sent using the same eventbase, all
// then-callback will be executed on the same IO thread
.via(evb)
.then([this, context, host, spaceId, start](folly::Try<Response>&& val) {
if (val.hasException()) {
auto& r = context->findRequest(host);
LOG(ERROR) << "Request to " << host << " failed: " << val.exception().what();
auto parts = getReqPartsId(r);
context->resp.appendFailedParts(parts, nebula::cpp2::ErrorCode::E_RPC_FAILURE);
invalidLeader(spaceId, parts);
context->resp.markFailure();
} else {
auto resp = std::move(val.value());
auto& result = resp.get_result();
bool hasFailure{false};
for (auto& code : result.get_failed_parts()) {
VLOG(3) << "Failure! Failed part " << code.get_part_id() << ", failed code "
<< static_cast<int32_t>(code.get_code());
hasFailure = true;
context->resp.emplaceFailedPart(code.get_part_id(), code.get_code());
if (code.get_code() == nebula::cpp2::ErrorCode::E_LEADER_CHANGED) {
auto* leader = code.get_leader();
if (isValidHostPtr(leader)) {
updateLeader(spaceId, code.get_part_id(), *leader);
} else {
invalidLeader(spaceId, code.get_part_id());
}
} else if (code.get_code() == nebula::cpp2::ErrorCode::E_PART_NOT_FOUND ||
code.get_code() == nebula::cpp2::ErrorCode::E_SPACE_NOT_FOUND) {
invalidLeader(spaceId, code.get_part_id());
.thenValue([this, context, host, spaceId, start](Response&& resp) {
auto& result = resp.get_result();
bool hasFailure{false};
for (auto& code : result.get_failed_parts()) {
VLOG(3) << "Failure! Failed part " << code.get_part_id() << ", failed code "
<< static_cast<int32_t>(code.get_code());
hasFailure = true;
context->resp.emplaceFailedPart(code.get_part_id(), code.get_code());
if (code.get_code() == nebula::cpp2::ErrorCode::E_LEADER_CHANGED) {
auto* leader = code.get_leader();
if (isValidHostPtr(leader)) {
updateLeader(spaceId, code.get_part_id(), *leader);
} else {
// do nothing
invalidLeader(spaceId, code.get_part_id());
}
} else if (code.get_code() == nebula::cpp2::ErrorCode::E_PART_NOT_FOUND ||
code.get_code() == nebula::cpp2::ErrorCode::E_SPACE_NOT_FOUND) {
invalidLeader(spaceId, code.get_part_id());
} else {
// do nothing
}
if (hasFailure) {
context->resp.markFailure();
}

// Adjust the latency
auto latency = result.get_latency_in_us();
context->resp.setLatency(host, latency, time::WallClock::fastNowInMicroSec() - start);

// Keep the response
context->resp.addResponse(std::move(resp));
}
if (hasFailure) {
context->resp.markFailure();
}

// Adjust the latency
auto latency = result.get_latency_in_us();
context->resp.setLatency(host, latency, time::WallClock::fastNowInMicroSec() - start);

// Keep the response
context->resp.addResponse(std::move(resp));
})
.thenError(folly::tag_t<TransportException>{},
[this, context, host, spaceId](TransportException&& ex) {
auto& r = context->findRequest(host);
auto parts = getReqPartsId(r);
if (ex.getType() == TransportException::TIMED_OUT) {
LOG(ERROR) << "Request to " << host << " time out: " << ex.what();
} else {
invalidLeader(spaceId, parts);
LOG(ERROR) << "Request to " << host << " failed: " << ex.what();
}
context->resp.appendFailedParts(parts,
nebula::cpp2::ErrorCode::E_RPC_FAILURE);
context->resp.markFailure();
})
.thenError(folly::tag_t<std::exception>{},
[this, context, host, spaceId](std::exception&& ex) {
auto& r = context->findRequest(host);
auto parts = getReqPartsId(r);
LOG(ERROR) << "Request to " << host << " failed: " << ex.what();
invalidLeader(spaceId, parts);
context->resp.appendFailedParts(parts,
nebula::cpp2::ErrorCode::E_RPC_FAILURE);
context->resp.markFailure();
})
.ensure([context, host] {
if (context->removeRequest(host)) {
// Received all responses
context->promise.setValue(std::move(context->resp));
Expand All @@ -199,75 +215,76 @@ folly::SemiFuture<StorageRpcResponse<Response>> StorageClientBase<ClientType>::c
template <typename ClientType>
template <class Request, class RemoteFunc, class Response>
folly::Future<StatusOr<Response>> StorageClientBase<ClientType>::getResponse(
folly::EventBase* evb,
std::pair<HostAddr, Request>&& request,
RemoteFunc&& remoteFunc,
folly::Promise<StatusOr<Response>> pro) {
auto f = pro.getFuture();
getResponseImpl(evb,
std::forward<decltype(request)>(request),
std::forward<RemoteFunc>(remoteFunc),
std::move(pro));
folly::EventBase* evb, std::pair<HostAddr, Request>&& request, RemoteFunc&& remoteFunc) {
auto pro = std::make_shared<folly::Promise<StatusOr<Response>>>();
auto f = pro->getFuture();
getResponseImpl(
evb, std::forward<decltype(request)>(request), std::forward<RemoteFunc>(remoteFunc), pro);
return f;
}

template <typename ClientType>
template <class Request, class RemoteFunc, class Response>
void StorageClientBase<ClientType>::getResponseImpl(folly::EventBase* evb,
std::pair<HostAddr, Request> request,
RemoteFunc remoteFunc,
folly::Promise<StatusOr<Response>> pro) {
void StorageClientBase<ClientType>::getResponseImpl(
folly::EventBase* evb,
std::pair<HostAddr, Request> request,
RemoteFunc remoteFunc,
std::shared_ptr<folly::Promise<StatusOr<Response>>> pro) {
using TransportException = apache::thrift::transport::TTransportException;
if (evb == nullptr) {
DCHECK(!!ioThreadPool_);
evb = ioThreadPool_->getEventBase();
}
folly::via(evb,
[evb,
request = std::move(request),
remoteFunc = std::move(remoteFunc),
pro = std::move(pro),
this]() mutable {
auto host = request.first;
auto client = clientsMan_->client(host, evb, false, FLAGS_storage_client_timeout_ms);
auto spaceId = request.second.get_space_id();
auto partsId = getReqPartsId(request.second);
LOG(INFO) << "Send request to storage " << host;
remoteFunc(client.get(), request.second)
.via(evb)
.then([spaceId,
partsId = std::move(partsId),
p = std::move(pro),
request = std::move(request),
remoteFunc = std::move(remoteFunc),
this](folly::Try<Response>&& t) mutable {
// exception occurred during RPC
if (t.hasException()) {
p.setValue(Status::Error(folly::stringPrintf(
"RPC failure in StorageClient: %s", t.exception().what().c_str())));
invalidLeader(spaceId, partsId);
return;
}
auto&& resp = std::move(t.value());
// leader changed
auto& result = resp.get_result();
for (auto& code : result.get_failed_parts()) {
VLOG(3) << "Failure! Failed part " << code.get_part_id() << ", failed code "
<< static_cast<int32_t>(code.get_code());
if (code.get_code() == nebula::cpp2::ErrorCode::E_LEADER_CHANGED) {
auto* leader = code.get_leader();
if (isValidHostPtr(leader)) {
updateLeader(spaceId, code.get_part_id(), *leader);
folly::via(
evb,
[evb, request = std::move(request), remoteFunc = std::move(remoteFunc), pro, this]() mutable {
auto host = request.first;
auto client = clientsMan_->client(host, evb, false, FLAGS_storage_client_timeout_ms);
auto spaceId = request.second.get_space_id();
auto partsId = getReqPartsId(request.second);
LOG(INFO) << "Send request to storage " << host;
remoteFunc(client.get(), request.second)
.via(evb)
.thenValue([spaceId, pro, this](Response&& resp) mutable {
auto& result = resp.get_result();
for (auto& code : result.get_failed_parts()) {
VLOG(3) << "Failure! Failed part " << code.get_part_id() << ", failed code "
<< static_cast<int32_t>(code.get_code());
if (code.get_code() == nebula::cpp2::ErrorCode::E_LEADER_CHANGED) {
auto* leader = code.get_leader();
if (isValidHostPtr(leader)) {
updateLeader(spaceId, code.get_part_id(), *leader);
} else {
invalidLeader(spaceId, code.get_part_id());
}
} else if (code.get_code() == nebula::cpp2::ErrorCode::E_PART_NOT_FOUND ||
code.get_code() == nebula::cpp2::ErrorCode::E_SPACE_NOT_FOUND) {
invalidLeader(spaceId, code.get_part_id());
}
}
pro->setValue(std::move(resp));
})
.thenError(folly::tag_t<TransportException>{},
[spaceId, partsId = std::move(partsId), host, pro, this](
TransportException&& ex) mutable {
if (ex.getType() == TransportException::TIMED_OUT) {
LOG(ERROR) << "Request to " << host << " time out: " << ex.what();
} else {
invalidLeader(spaceId, code.get_part_id());
invalidLeader(spaceId, partsId);
LOG(ERROR) << "Request to " << host << " failed: " << ex.what();
}
} else if (code.get_code() == nebula::cpp2::ErrorCode::E_PART_NOT_FOUND ||
code.get_code() == nebula::cpp2::ErrorCode::E_SPACE_NOT_FOUND) {
invalidLeader(spaceId, code.get_part_id());
}
}
p.setValue(std::move(resp));
});
}); // via
pro->setValue(Status::Error(
folly::stringPrintf("RPC failure in StorageClient: %s", ex.what())));
})
.thenError(folly::tag_t<std::exception>{},
[spaceId, partsId = std::move(partsId), host, pro, this](
std::exception&& ex) mutable {
// exception occurred during RPC
pro->setValue(Status::Error(
folly::stringPrintf("RPC failure in StorageClient: %s", ex.what())));
invalidLeader(spaceId, partsId);
});
}); // via
}

template <typename ClientType>
Expand Down
10 changes: 4 additions & 6 deletions src/clients/storage/StorageClientBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,11 +141,9 @@ class StorageClientBase {
class RemoteFunc,
class Response = typename std::result_of<RemoteFunc(ClientType* client,
const Request&)>::type::value_type>
folly::Future<StatusOr<Response>> getResponse(
folly::EventBase* evb,
std::pair<HostAddr, Request>&& request,
RemoteFunc&& remoteFunc,
folly::Promise<StatusOr<Response>> pro = folly::Promise<StatusOr<Response>>());
folly::Future<StatusOr<Response>> getResponse(folly::EventBase* evb,
std::pair<HostAddr, Request>&& request,
RemoteFunc&& remoteFunc);

template <class Request,
class RemoteFunc,
Expand All @@ -154,7 +152,7 @@ class StorageClientBase {
void getResponseImpl(folly::EventBase* evb,
std::pair<HostAddr, Request> request,
RemoteFunc remoteFunc,
folly::Promise<StatusOr<Response>> pro);
std::shared_ptr<folly::Promise<StatusOr<Response>>> pro);

// Cluster given ids into the host they belong to
// The method returns a map
Expand Down
6 changes: 6 additions & 0 deletions src/common/datatypes/HostAddr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@ bool HostAddr::operator==(const HostAddr& rhs) const {

bool HostAddr::operator!=(const HostAddr& rhs) const { return !(*this == rhs); }

HostAddr& HostAddr::operator=(const HostAddr& rhs) {
host = rhs.host;
port = rhs.port;
return *this;
}

bool HostAddr::operator<(const HostAddr& rhs) const {
if (host == rhs.host) {
return port < rhs.port;
Expand Down
17 changes: 17 additions & 0 deletions src/common/datatypes/HostAddr.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#include <sstream>

#include "common/base/Logging.h"
#include "common/thrift/ThriftTypes.h"

namespace nebula {
Expand All @@ -25,6 +26,8 @@ struct HostAddr {
* */
HostAddr(int h, int p) = delete;
HostAddr(std::string h, Port p) : host(std::move(h)), port(p) {}
HostAddr(const HostAddr& other) : host(other.host), port(other.port) {}
HostAddr(HostAddr&& other) : host(std::move(other.host)), port(std::move(other.port)) {}

void clear() {
host.clear();
Expand All @@ -40,11 +43,25 @@ struct HostAddr {
return os.str();
}

HostAddr& operator=(const HostAddr& rhs);

bool operator==(const HostAddr& rhs) const;

bool operator!=(const HostAddr& rhs) const;

bool operator<(const HostAddr& rhs) const;

static HostAddr fromString(const std::string& str) {
HostAddr ha;
auto pos = str.find(":");
if (pos == std::string::npos) {
LOG(ERROR) << "HostAddr: parse string error";
return ha;
}
ha.host = str.substr(1, pos - 2);
ha.port = std::stoi(str.substr(pos + 1));
return ha;
}
};

inline std::ostream& operator<<(std::ostream& os, const HostAddr& addr) {
Expand Down
Loading

0 comments on commit fd8d1bb

Please sign in to comment.