Skip to content

Commit

Permalink
Fix the error when vertex/edge not exist if update. (#2025)
Browse files Browse the repository at this point in the history
* Fix the error when vertex/edge not exist if update.

* Optimize the atomic error logical.

* Fix one alignment.

Co-authored-by: dutor <440396+dutor@users.noreply.github.com>
  • Loading branch information
Shylock-Hg and dutor authored Apr 2, 2020
1 parent 2a46bc4 commit 9fc4e7f
Show file tree
Hide file tree
Showing 7 changed files with 39 additions and 90 deletions.
2 changes: 1 addition & 1 deletion src/graph/UpdateEdgeExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ void UpdateEdgeExecutor::updateEdge(bool reversely) {
return;
default:
std::string errMsg =
folly::stringPrintf("Maybe edge does not exist or filter failed, "
folly::stringPrintf("Maybe edge does not exist, "
"part: %d, error code: %d!",
code.get_part_id(),
static_cast<int32_t>(code.get_code()));
Expand Down
2 changes: 1 addition & 1 deletion src/graph/UpdateVertexExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ void UpdateVertexExecutor::execute() {
break;
default:
std::string errMsg =
folly::stringPrintf("Maybe vertex does not exist or filter failed, "
folly::stringPrintf("Maybe vertex does not exist, "
"part: %d, error code: %d!",
code.get_part_id(),
static_cast<int32_t>(code.get_code()));
Expand Down
7 changes: 0 additions & 7 deletions src/storage/CommonUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,13 +128,6 @@ bool checkDataExpiredForTTL(const meta::SchemaProviderIf* schema,
int64_t ttlDuration);


enum class FilterResult {
SUCCEEDED = 0, // pass filter
E_FILTER_OUT = -1, // filter out
E_ERROR = -2, // exception when filter
E_BAD_SCHEMA = -3, // Bad schema
};


} // namespace storage
} // namespace nebula
Expand Down
56 changes: 17 additions & 39 deletions src/storage/mutate/UpdateEdgeProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -328,23 +328,20 @@ std::string UpdateEdgeProcessor::updateAndWriteBack(PartitionID partId,
}


FilterResult UpdateEdgeProcessor::checkFilter(const PartitionID partId,
const cpp2::EdgeKey& edgeKey) {
cpp2::ErrorCode UpdateEdgeProcessor::checkFilter(const PartitionID partId,
const cpp2::EdgeKey& edgeKey) {
auto ret = collectEdgesProps(partId, edgeKey);
switch (ret) {
case kvstore::ResultCode::SUCCEEDED:
break;
case kvstore::ResultCode::ERR_CORRUPT_DATA:
return FilterResult::E_BAD_SCHEMA;
default:
return FilterResult::E_ERROR;
if (ret == kvstore::ResultCode::ERR_CORRUPT_DATA) {
return cpp2::ErrorCode::E_EDGE_NOT_FOUND;
} else if (ret != kvstore::ResultCode::SUCCEEDED) {
return to(ret);
}
for (auto& tc : this->tagContexts_) {
VLOG(3) << "partId " << partId << ", vId " << edgeKey.src
<< ", tagId " << tc.tagId_ << ", prop size " << tc.props_.size();
ret = collectVertexProps(partId, edgeKey.src, tc.tagId_, tc.props_);
if (ret != kvstore::ResultCode::SUCCEEDED) {
return FilterResult::E_ERROR;
return to(ret);
}
}

Expand Down Expand Up @@ -379,14 +376,14 @@ FilterResult UpdateEdgeProcessor::checkFilter(const PartitionID partId,
auto filterResult = this->exp_->eval(getters);
if (!filterResult.ok()) {
VLOG(1) << "Invalid filter expression";
return FilterResult::E_ERROR;
return cpp2::ErrorCode::E_INVALID_FILTER;
}
if (!Expression::asBool(filterResult.value())) {
VLOG(1) << "Filter skips the update";
return FilterResult::E_FILTER_OUT;
return cpp2::ErrorCode::E_FILTER_OUT;
}
}
return FilterResult::SUCCEEDED;
return cpp2::ErrorCode::SUCCEEDED;
}


Expand Down Expand Up @@ -488,18 +485,11 @@ void UpdateEdgeProcessor::process(const cpp2::UpdateEdgeRequest& req) {
// TODO(shylock) the AtomicOP can't return various error
// so put it in the processor
filterResult_ = checkFilter(partId, edgeKey);
switch (filterResult_) {
case FilterResult::SUCCEEDED : {
if (filterResult_ == cpp2::ErrorCode::SUCCEEDED) {
return updateAndWriteBack(partId, edgeKey);
}
case FilterResult::E_FILTER_OUT:
// fallthrough
case FilterResult::E_ERROR:
// fallthrough
default: {
} else {
return folly::none;
}
}
},
[this, partId, edgeKey, req] (kvstore::ResultCode code) {
while (true) {
Expand All @@ -518,24 +508,12 @@ void UpdateEdgeProcessor::process(const cpp2::UpdateEdgeRequest& req) {
break;
}
if (code == kvstore::ResultCode::ERR_ATOMIC_OP_FAILED) {
switch (filterResult_) {
case FilterResult::E_FILTER_OUT:
// Filter out
// https://github.com/vesoft-inc/nebula/issues/1888
// Only filter out so we still return the data
onProcessFinished(req.get_return_columns().size());
this->pushResultCode(cpp2::ErrorCode::E_FILTER_OUT, partId);
break;
case FilterResult::E_ERROR:
this->pushResultCode(cpp2::ErrorCode::E_INVALID_FILTER, partId);
break;
case FilterResult::E_BAD_SCHEMA:
this->pushResultCode(cpp2::ErrorCode::E_EDGE_NOT_FOUND, partId);
break;
default:
this->pushResultCode(to(code), partId);
break;
// https://github.com/vesoft-inc/nebula/issues/1888
// Only filter out so we still return the data
if (filterResult_ == cpp2::ErrorCode::E_FILTER_OUT) {
onProcessFinished(req.get_return_columns().size());
}
this->pushResultCode(filterResult_, partId);
} else {
this->pushResultCode(to(code), partId);
}
Expand Down
4 changes: 2 additions & 2 deletions src/storage/mutate/UpdateEdgeProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class UpdateEdgeProcessor
kvstore::ResultCode collectEdgesProps(const PartitionID partId,
const cpp2::EdgeKey& edgeKey);

FilterResult checkFilter(const PartitionID partId, const cpp2::EdgeKey& edgeKey);
cpp2::ErrorCode checkFilter(const PartitionID partId, const cpp2::EdgeKey& edgeKey);

std::string updateAndWriteBack(PartitionID partId, const cpp2::EdgeKey& edgeKey);

Expand All @@ -68,7 +68,7 @@ class UpdateEdgeProcessor
std::unique_ptr<RowUpdater> updater_;
meta::IndexManager* indexMan_{nullptr};
std::vector<std::shared_ptr<nebula::cpp2::IndexItem>> indexes_;
std::atomic<FilterResult> filterResult_{FilterResult::E_ERROR};
std::atomic<cpp2::ErrorCode> filterResult_{cpp2::ErrorCode::SUCCEEDED};
};

} // namespace storage
Expand Down
54 changes: 16 additions & 38 deletions src/storage/mutate/UpdateVertexProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -158,18 +158,15 @@ kvstore::ResultCode UpdateVertexProcessor::collectVertexProps(
}


FilterResult UpdateVertexProcessor::checkFilter(const PartitionID partId, const VertexID vId) {
cpp2::ErrorCode UpdateVertexProcessor::checkFilter(const PartitionID partId, const VertexID vId) {
for (auto& tc : this->tagContexts_) {
VLOG(3) << "partId " << partId << ", vId " << vId
<< ", tagId " << tc.tagId_ << ", prop size " << tc.props_.size();
auto ret = collectVertexProps(partId, vId, tc.tagId_, tc.props_);
switch (ret) {
case kvstore::ResultCode::SUCCEEDED:
break;
case kvstore::ResultCode::ERR_CORRUPT_DATA:
return FilterResult::E_BAD_SCHEMA;
default:
return FilterResult::E_ERROR;
if (ret == kvstore::ResultCode::ERR_CORRUPT_DATA) {
return cpp2::ErrorCode::E_TAG_NOT_FOUND;
} else if (ret != kvstore::ResultCode::SUCCEEDED) {
return to(ret);
}
}

Expand All @@ -194,14 +191,14 @@ FilterResult UpdateVertexProcessor::checkFilter(const PartitionID partId, const
if (this->exp_ != nullptr) {
auto filterResult = this->exp_->eval(getters);
if (!filterResult.ok()) {
return FilterResult::E_ERROR;
return cpp2::ErrorCode::E_INVALID_FILTER;
}
if (!Expression::asBool(filterResult.value())) {
VLOG(1) << "Filter skips the update";
return FilterResult::E_FILTER_OUT;
return cpp2::ErrorCode::E_FILTER_OUT;
}
}
return FilterResult::SUCCEEDED;
return cpp2::ErrorCode::SUCCEEDED;
}


Expand Down Expand Up @@ -428,19 +425,11 @@ void UpdateVertexProcessor::process(const cpp2::UpdateVertexRequest& req) {
// TODO(shylock) the AtomicOP can't return various error
// so put it in the processor
filterResult_ = checkFilter(partId, vId);
switch (filterResult_) {
case FilterResult::SUCCEEDED : {
if (filterResult_ == cpp2::ErrorCode::SUCCEEDED) {
return updateAndWriteBack(partId, vId);
}
case FilterResult::E_FILTER_OUT:
// Fallthrough
case FilterResult::E_ERROR:
// Fallthrough
case FilterResult::E_BAD_SCHEMA:
default: {
} else {
return folly::none;
}
}
},
[this, partId, vId, req] (kvstore::ResultCode code) {
while (true) {
Expand All @@ -455,24 +444,13 @@ void UpdateVertexProcessor::process(const cpp2::UpdateVertexRequest& req) {
break;
}
if (code == kvstore::ResultCode::ERR_ATOMIC_OP_FAILED) {
switch (filterResult_) {
case FilterResult::E_FILTER_OUT:
// Filter out
// https://github.com/vesoft-inc/nebula/issues/1888
// Only filter out so we still return the data
onProcessFinished(req.get_return_columns().size());
this->pushResultCode(cpp2::ErrorCode::E_FILTER_OUT, partId);
break;
case FilterResult::E_ERROR:
this->pushResultCode(cpp2::ErrorCode::E_INVALID_FILTER, partId);
break;
case FilterResult::E_BAD_SCHEMA:
this->pushResultCode(cpp2::ErrorCode::E_TAG_NOT_FOUND, partId);
break;
default:
this->pushResultCode(to(code), partId);
break;
if (filterResult_ == cpp2::ErrorCode::E_FILTER_OUT) {
// Filter out
// https://github.com/vesoft-inc/nebula/issues/1888
// Only filter out so we still return the data
onProcessFinished(req.get_return_columns().size());
}
this->pushResultCode(filterResult_, partId);
} else {
this->pushResultCode(to(code), partId);
}
Expand Down
4 changes: 2 additions & 2 deletions src/storage/mutate/UpdateVertexProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class UpdateVertexProcessor
const TagID tagId,
const std::vector<PropContext>& props);

FilterResult checkFilter(const PartitionID partId, const VertexID vId);
cpp2::ErrorCode checkFilter(const PartitionID partId, const VertexID vId);

std::string updateAndWriteBack(const PartitionID partId, const VertexID vId);

Expand All @@ -70,7 +70,7 @@ class UpdateVertexProcessor
std::unordered_map<TagID, std::unique_ptr<KeyUpdaterPair>> tagUpdaters_;
meta::IndexManager* indexMan_{nullptr};
std::vector<std::shared_ptr<nebula::cpp2::IndexItem>> indexes_;
std::atomic<FilterResult> filterResult_{FilterResult::E_ERROR};
std::atomic<cpp2::ErrorCode> filterResult_{cpp2::ErrorCode::SUCCEEDED};
};

} // namespace storage
Expand Down

0 comments on commit 9fc4e7f

Please sign in to comment.