Skip to content

Commit

Permalink
[fix] when two same requests come, 2nd may erase prime even if it doe…
Browse files Browse the repository at this point in the history
…sn't set that
  • Loading branch information
liuyu85cn committed Sep 14, 2021
1 parent 8d9b613 commit 730a39d
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 10 deletions.
22 changes: 15 additions & 7 deletions src/storage/transaction/ChainAddEdgesProcessorLocal.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,14 @@ folly::SemiFuture<Code> ChainAddEdgesProcessorLocal::prepareLocal() {
}

auto [pro, fut] = folly::makePromiseContract<Code>();
auto primes = makePrime();
for (auto& kv : primes) {
LOG(INFO) << this << " put prime " << folly::hexlify(kv.first);
}

erasePrime();
env_->kvstore_->asyncMultiPut(
spaceId_, localPartId_, makePrime(), [p = std::move(pro)](auto rc) mutable {
spaceId_, localPartId_, std::move(primes), [p = std::move(pro)](auto rc) mutable {
LOG_IF(WARNING, rc != nebula::cpp2::ErrorCode::SUCCEEDED)
<< "kvstore err: " << static_cast<int>(rc);
p.setValue(rc);
Expand Down Expand Up @@ -73,14 +79,16 @@ folly::SemiFuture<Code> ChainAddEdgesProcessorLocal::processLocal(Code code) {
markDanglingEdge();
}

erasePrime();
if (code == Code::SUCCEEDED || code == Code::E_RPC_FAILURE) {
return forwardToDelegateProcessor();
} else {
return abort();
}
auto ret = code == Code::E_RPC_FAILURE ? Code::SUCCEEDED : code;
return ret;

if (code == Code::E_RPC_FAILURE) {
return Code::SUCCEEDED;
}
return code;
}

void ChainAddEdgesProcessorLocal::markDanglingEdge() {
Expand All @@ -96,7 +104,7 @@ bool ChainAddEdgesProcessorLocal::prepareRequest(const cpp2::AddEdgesRequest& re
req_ = req;
spaceId_ = req_.get_space_id();
localPartId_ = req.get_parts().begin()->first;
replaceNullWithDefaultValue(req_);
// replaceNullWithDefaultValue(req_);
auto part = env_->kvstore_->part(spaceId_, localPartId_);
if (!nebula::ok(part)) {
pushResultCode(nebula::error(part), localPartId_);
Expand Down Expand Up @@ -239,7 +247,7 @@ std::vector<kvstore::KV> ChainAddEdgesProcessorLocal::makeDoublePrime() {
void ChainAddEdgesProcessorLocal::erasePrime() {
auto fn = [&](const cpp2::NewEdge& edge) {
auto key = ConsistUtil::primeKey(spaceVidLen_, localPartId_, edge.get_key());
LOG(INFO) << this << "prime hex: " << folly::hexlify(key);
// LOG(INFO) << this << " erase prime hex: " << folly::hexlify(key);
return key;
};
for (auto& edge : req_.get_parts().begin()->second) {
Expand All @@ -250,7 +258,7 @@ void ChainAddEdgesProcessorLocal::erasePrime() {
void ChainAddEdgesProcessorLocal::eraseDoublePrime() {
auto fn = [&](const cpp2::NewEdge& edge) {
auto key = ConsistUtil::doublePrime(spaceVidLen_, localPartId_, edge.get_key());
LOG(INFO) << this << "double prime hex: " << folly::hexlify(key);
// LOG(INFO) << this << " erase double prime hex: " << folly::hexlify(key);
return key;
};
for (auto& edge : req_.get_parts().begin()->second) {
Expand Down
8 changes: 5 additions & 3 deletions src/storage/transaction/ChainResumeProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,21 @@ void ChainResumeProcessor::process() {
LOG(INFO) << "no leader found, skip any resume process";
return;
}
LOG(INFO) << "ChainResumeProcessor::process(), leaders.size() = " << leaders.size();
// LOG(INFO) << "ChainResumeProcessor::process(), leaders.size() = " << leaders.size();
std::unique_ptr<kvstore::KVIterator> iter;
for (auto& leader : leaders) {
auto spaceId = leader.first;
LOG(INFO) << "leader.second.size() = " << leader.second.size() << ", spaceId = " << spaceId;
// LOG(INFO) << "leader.second.size() = " << leader.second.size() << ", spaceId = " << spaceId;
for (auto& partInfo : leader.second) {
auto partId = partInfo.get_part_id();
auto prefix = ConsistUtil::primePrefix(partId);
LOG(INFO) << "scanning prefix = " << folly::hexlify(prefix);
// LOG(INFO) << "scanning prefix = " << folly::hexlify(prefix);
auto rc = env_->kvstore_->prefix(spaceId, partId, prefix, &iter);
if (rc != nebula::cpp2::ErrorCode::SUCCEEDED) {
break;
}
for (; iter->valid(); iter->next()) {
LOG(INFO) << "resume prime key: " << folly::hexlify(iter->key());
ResumeOptions opt(ResumeType::RESUME_CHAIN, iter->val().str());
auto* proc = ChainProcessorFactory::makeProcessor(env_, opt);
futs.emplace_back(proc->getFinished());
Expand All @@ -47,6 +48,7 @@ void ChainResumeProcessor::process() {
break;
}
for (; iter->valid(); iter->next()) {
LOG(INFO) << "resume double prime key: " << folly::hexlify(iter->key());
ResumeOptions opt(ResumeType::RESUME_REMOTE, iter->val().str());
auto* proc = ChainProcessorFactory::makeProcessor(env_, opt);
futs.emplace_back(proc->getFinished());
Expand Down

0 comments on commit 730a39d

Please sign in to comment.