Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: some streams errors such as pkpatternmatchdel etc #2726

Merged
merged 5 commits into from
Jun 21, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion src/storage/src/base_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include "src/base_value_format.h"
#include "src/base_meta_value_format.h"
#include "src/lists_meta_value_format.h"
#include "src/pika_stream_meta_value.h"
#include "src/strings_value_format.h"
#include "src/zsets_data_key_format.h"
#include "src/debug.h"
Expand Down Expand Up @@ -49,6 +50,13 @@ class BaseMetaFilter : public rocksdb::CompactionFilter {
DEBUG("Reserve");
return false;
}
} else if (type == DataType::kStreams) {
ParsedStreamMetaValue parsed_stream_meta_value(value);
DEBUG("[ListMetaFilter], key: {}, entries_added = {}, first_id: {}, last_id: {}, version: {}",
key.ToString().c_str(), parsed_stream_meta_value.entries_added(),
parsed_stream_meta_value.first_id(), parsed_stream_meta_value.last_id(),
parsed_stream_meta_value.version());
return false;
} else if (type == DataType::kLists) {
ParsedListsMetaValue parsed_lists_meta_value(value);
DEBUG("[ListMetaFilter], key: {}, count = {}, timestamp: {}, cur_time: {}, version: {}", key.ToString().c_str(),
Expand Down Expand Up @@ -143,7 +151,12 @@ class BaseDataFilter : public rocksdb::CompactionFilter {
auto type = static_cast<enum DataType>(static_cast<uint8_t>(meta_value[0]));
if (type != type_) {
return true;
} else if (type == DataType::kHashes || type == DataType::kSets || type == DataType::kStreams || type == DataType::kZSets) {
} else if (type == DataType::kStreams) {
ParsedStreamMetaValue parsed_stream_meta_value(meta_value);
meta_not_found_ = false;
cur_meta_version_ = parsed_stream_meta_value.version();
cur_meta_etime_ = 0; // stream do not support ttl
} else if (type == DataType::kHashes || type == DataType::kSets || type == DataType::kZSets) {
ParsedBaseMetaValue parsed_base_meta_value(&meta_value);
meta_not_found_ = false;
cur_meta_version_ = parsed_base_meta_value.Version();
Expand Down
15 changes: 9 additions & 6 deletions src/storage/src/pika_stream_meta_value.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ class StreamMetaValue {
value_ = std::move(value);
assert(value_.size() == kDefaultStreamValueLength);
if (value_.size() != kDefaultStreamValueLength) {
LOG(ERROR) << "Invalid stream meta value length: ";
LOG(ERROR) << "Invalid stream meta value length: " << value_.size()
<< " expected: " << kDefaultStreamValueLength;
return;
}
char* pos = &value_[0];
Expand Down Expand Up @@ -215,7 +216,8 @@ class ParsedStreamMetaValue {
ParsedStreamMetaValue(const Slice& value) {
assert(value.size() == kDefaultStreamValueLength);
if (value.size() != kDefaultStreamValueLength) {
LOG(ERROR) << "Invalid stream meta value length: ";
LOG(ERROR) << "Invalid stream meta value length: " << value.size()
<< " expected: " << kDefaultStreamValueLength;
return;
}
char* pos = const_cast<char*>(value.data());
Expand Down Expand Up @@ -294,7 +296,7 @@ class StreamCGroupMetaValue {
uint64_t needed = kDefaultStreamCGroupValueLength;
assert(value_.size() == 0);
if (value_.size() != 0) {
LOG(FATAL) << "Init on a existed stream cgroup meta value!";
LOG(ERROR) << "Init on a existed stream cgroup meta value!";
return;
}
value_.resize(needed);
Expand All @@ -314,7 +316,8 @@ class StreamCGroupMetaValue {
value_ = std::move(value);
assert(value_.size() == kDefaultStreamCGroupValueLength);
if (value_.size() != kDefaultStreamCGroupValueLength) {
LOG(FATAL) << "Invalid stream cgroup meta value length: ";
LOG(ERROR) << "Invalid stream cgroup meta value length: " << value_.size()
<< " expected: " << kDefaultStreamValueLength;
return;
}
if (value_.size() == kDefaultStreamCGroupValueLength) {
Expand Down Expand Up @@ -373,7 +376,7 @@ class StreamConsumerMetaValue {
value_ = std::move(value);
assert(value_.size() == kDefaultStreamConsumerValueLength);
if (value_.size() != kDefaultStreamConsumerValueLength) {
LOG(FATAL) << "Invalid stream consumer meta value length: " << value_.size()
LOG(ERROR) << "Invalid stream consumer meta value length: " << value_.size()
<< " expected: " << kDefaultStreamConsumerValueLength;
return;
}
Expand All @@ -391,7 +394,7 @@ class StreamConsumerMetaValue {
pel_ = pel;
assert(value_.size() == 0);
if (value_.size() != 0) {
LOG(FATAL) << "Invalid stream consumer meta value length: " << value_.size() << " expected: 0";
LOG(ERROR) << "Invalid stream consumer meta value length: " << value_.size() << " expected: 0";
return;
}
uint64_t needed = kDefaultStreamConsumerValueLength;
Expand Down
7 changes: 3 additions & 4 deletions src/storage/src/redis_strings.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1589,19 +1589,19 @@ rocksdb::Status Redis::PKPatternMatchDel(const std::string& pattern, int32_t* re
rocksdb::WriteBatch batch;
rocksdb::Iterator* iter = db_->NewIterator(iterator_options, handles_[kMetaCF]);
iter->SeekToFirst();
key = iter->key().ToString();
while (iter->Valid()) {
auto meta_type = static_cast<enum DataType>(static_cast<uint8_t>(iter->value()[0]));
ParsedBaseMetaKey parsed_meta_key(iter->key().ToString());
key = iter->key().ToString();
meta_value = iter->value().ToString();

if (meta_type == DataType::kStrings) {
meta_value = iter->value().ToString();
ParsedStringsValue parsed_strings_value(&meta_value);
if (!parsed_strings_value.IsStale() &&
(StringMatch(pattern.data(), pattern.size(), parsed_meta_key.Key().data(), parsed_meta_key.Key().size(), 0) != 0)) {
batch.Delete(key);
}
} else if (meta_type == DataType::kLists) {
meta_value = iter->value().ToString();
ParsedListsMetaValue parsed_lists_meta_value(&meta_value);
if (!parsed_lists_meta_value.IsStale() && (parsed_lists_meta_value.Count() != 0U) &&
(StringMatch(pattern.data(), pattern.size(), parsed_meta_key.Key().data(), parsed_meta_key.Key().size(), 0) !=
Expand All @@ -1618,7 +1618,6 @@ rocksdb::Status Redis::PKPatternMatchDel(const std::string& pattern, int32_t* re
batch.Put(handles_[kMetaCF], key, stream_meta_value.value());
}
} else {
meta_value = iter->value().ToString();
ParsedBaseMetaValue parsed_meta_value(&meta_value);
if (!parsed_meta_value.IsStale() && (parsed_meta_value.Count() != 0) &&
(StringMatch(pattern.data(), pattern.size(), parsed_meta_key.Key().data(), parsed_meta_key.Key().size(), 0) !=
Expand Down
5 changes: 4 additions & 1 deletion src/storage/src/storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1397,11 +1397,14 @@ Status Storage::PKRScanRange(const DataType& data_type, const Slice& key_start,

Status Storage::PKPatternMatchDel(const DataType& data_type, const std::string& pattern, int32_t* ret) {
Status s;
*ret = 0;
for (const auto& inst : insts_) {
s = inst->PKPatternMatchDel(pattern, ret);
int32_t tmp_ret = 0;
s = inst->PKPatternMatchDel(pattern, &tmp_ret);
if (!s.ok()) {
return s;
}
*ret += tmp_ret;
}
return s;
}
Expand Down
Loading
Loading