From 730a39d786b8e835956c0a9232e1b758da7a72f3 Mon Sep 17 00:00:00 2001 From: liuyu <52276794+liuyu85cn@users.noreply.github.com> Date: Tue, 14 Sep 2021 16:04:14 +0800 Subject: [PATCH] [fix] when two same requests come, 2nd may erase prime even if it doesn't set that --- .../ChainAddEdgesProcessorLocal.cpp | 22 +++++++++++++------ .../transaction/ChainResumeProcessor.cpp | 8 ++++--- 2 files changed, 20 insertions(+), 10 deletions(-) diff --git a/src/storage/transaction/ChainAddEdgesProcessorLocal.cpp b/src/storage/transaction/ChainAddEdgesProcessorLocal.cpp index 79a0d5f1a2b..a5d7e79f188 100644 --- a/src/storage/transaction/ChainAddEdgesProcessorLocal.cpp +++ b/src/storage/transaction/ChainAddEdgesProcessorLocal.cpp @@ -39,8 +39,14 @@ folly::SemiFuture ChainAddEdgesProcessorLocal::prepareLocal() { } auto [pro, fut] = folly::makePromiseContract(); + auto primes = makePrime(); + for (auto& kv : primes) { + LOG(INFO) << this << " put prime " << folly::hexlify(kv.first); + } + + erasePrime(); env_->kvstore_->asyncMultiPut( - spaceId_, localPartId_, makePrime(), [p = std::move(pro)](auto rc) mutable { + spaceId_, localPartId_, std::move(primes), [p = std::move(pro)](auto rc) mutable { LOG_IF(WARNING, rc != nebula::cpp2::ErrorCode::SUCCEEDED) << "kvstore err: " << static_cast(rc); p.setValue(rc); @@ -73,14 +79,16 @@ folly::SemiFuture ChainAddEdgesProcessorLocal::processLocal(Code code) { markDanglingEdge(); } - erasePrime(); if (code == Code::SUCCEEDED || code == Code::E_RPC_FAILURE) { return forwardToDelegateProcessor(); } else { return abort(); } - auto ret = code == Code::E_RPC_FAILURE ? Code::SUCCEEDED : code; - return ret; + + if (code == Code::E_RPC_FAILURE) { + return Code::SUCCEEDED; + } + return code; } void ChainAddEdgesProcessorLocal::markDanglingEdge() { @@ -96,7 +104,7 @@ bool ChainAddEdgesProcessorLocal::prepareRequest(const cpp2::AddEdgesRequest& re req_ = req; spaceId_ = req_.get_space_id(); localPartId_ = req.get_parts().begin()->first; - replaceNullWithDefaultValue(req_); + // replaceNullWithDefaultValue(req_); auto part = env_->kvstore_->part(spaceId_, localPartId_); if (!nebula::ok(part)) { pushResultCode(nebula::error(part), localPartId_); @@ -239,7 +247,7 @@ std::vector ChainAddEdgesProcessorLocal::makeDoublePrime() { void ChainAddEdgesProcessorLocal::erasePrime() { auto fn = [&](const cpp2::NewEdge& edge) { auto key = ConsistUtil::primeKey(spaceVidLen_, localPartId_, edge.get_key()); - LOG(INFO) << this << "prime hex: " << folly::hexlify(key); + // LOG(INFO) << this << " erase prime hex: " << folly::hexlify(key); return key; }; for (auto& edge : req_.get_parts().begin()->second) { @@ -250,7 +258,7 @@ void ChainAddEdgesProcessorLocal::erasePrime() { void ChainAddEdgesProcessorLocal::eraseDoublePrime() { auto fn = [&](const cpp2::NewEdge& edge) { auto key = ConsistUtil::doublePrime(spaceVidLen_, localPartId_, edge.get_key()); - LOG(INFO) << this << "double prime hex: " << folly::hexlify(key); + // LOG(INFO) << this << " erase double prime hex: " << folly::hexlify(key); return key; }; for (auto& edge : req_.get_parts().begin()->second) { diff --git a/src/storage/transaction/ChainResumeProcessor.cpp b/src/storage/transaction/ChainResumeProcessor.cpp index e2922438f77..1d5deed7f50 100644 --- a/src/storage/transaction/ChainResumeProcessor.cpp +++ b/src/storage/transaction/ChainResumeProcessor.cpp @@ -21,20 +21,21 @@ void ChainResumeProcessor::process() { LOG(INFO) << "no leader found, skip any resume process"; return; } - LOG(INFO) << "ChainResumeProcessor::process(), leaders.size() = " << leaders.size(); + // LOG(INFO) << "ChainResumeProcessor::process(), leaders.size() = " << leaders.size(); std::unique_ptr iter; for (auto& leader : leaders) { auto spaceId = leader.first; - LOG(INFO) << "leader.second.size() = " << leader.second.size() << ", spaceId = " << spaceId; + // LOG(INFO) << "leader.second.size() = " << leader.second.size() << ", spaceId = " << spaceId; for (auto& partInfo : leader.second) { auto partId = partInfo.get_part_id(); auto prefix = ConsistUtil::primePrefix(partId); - LOG(INFO) << "scanning prefix = " << folly::hexlify(prefix); + // LOG(INFO) << "scanning prefix = " << folly::hexlify(prefix); auto rc = env_->kvstore_->prefix(spaceId, partId, prefix, &iter); if (rc != nebula::cpp2::ErrorCode::SUCCEEDED) { break; } for (; iter->valid(); iter->next()) { + LOG(INFO) << "resume prime key: " << folly::hexlify(iter->key()); ResumeOptions opt(ResumeType::RESUME_CHAIN, iter->val().str()); auto* proc = ChainProcessorFactory::makeProcessor(env_, opt); futs.emplace_back(proc->getFinished()); @@ -47,6 +48,7 @@ void ChainResumeProcessor::process() { break; } for (; iter->valid(); iter->next()) { + LOG(INFO) << "resume double prime key: " << folly::hexlify(iter->key()); ResumeOptions opt(ResumeType::RESUME_REMOTE, iter->val().str()); auto* proc = ChainProcessorFactory::makeProcessor(env_, opt); futs.emplace_back(proc->getFinished());