Skip to content

Commit f6da70f

Browse files
committed
core: Add count min sketch to track sizes
Signed-off-by: Abhijat Malviya <abhijat@dragonflydb.io>
1 parent 1da4817 commit f6da70f

File tree

4 files changed

+401
-1
lines changed

4 files changed

+401
-1
lines changed

src/core/CMakeLists.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ add_library(dfly_core allocation_tracker.cc bloom.cc compact_object.cc dense_set
2727
interpreter.cc glob_matcher.cc mi_memory_resource.cc qlist.cc sds_utils.cc
2828
segment_allocator.cc score_map.cc small_string.cc sorted_map.cc task_queue.cc
2929
tx_queue.cc string_set.cc string_map.cc top_keys.cc detail/bitpacking.cc
30-
page_usage_stats.cc)
30+
page_usage_stats.cc count_min_sketch.cc)
3131

3232
cxx_link(dfly_core base dfly_search_core fibers2 jsonpath
3333
absl::flat_hash_map absl::str_format absl::random_random redis_lib
@@ -56,6 +56,7 @@ cxx_test(qlist_test dfly_core DATA testdata/list.txt.zst LABELS DFLY)
5656
cxx_test(zstd_test dfly_core TRDP::zstd LABELS DFLY)
5757
cxx_test(top_keys_test dfly_core LABELS DFLY)
5858
cxx_test(page_usage_stats_test dfly_core LABELS DFLY)
59+
cxx_test(count_min_sketch_test dfly_core LABELS DFLY)
5960

6061
if(LIB_PCRE2)
6162
target_compile_definitions(dfly_core_test PRIVATE USE_PCRE2=1)

src/core/count_min_sketch.cc

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
// Copyright 2025, DragonflyDB authors. All rights reserved.
2+
// See LICENSE for licensing terms.
3+
//
4+
5+
#include "core/count_min_sketch.h"
6+
7+
#include <absl/time/clock.h>
8+
#include <xxhash.h>
9+
10+
#include <cmath>
11+
#include <iostream>
12+
13+
namespace {
14+
15+
constexpr auto MAX = std::numeric_limits<dfly::CountMinSketch::SizeT>::max();
16+
17+
uint64_t GetCurrentMS() {
18+
return absl::GetCurrentTimeNanos() / 1000 / 1000;
19+
}
20+
21+
// Exponential decay rate per millisecond, ln(2) / 5000: a value halves every 5000 ms.
constexpr double EXP_DECAY_CONST = 0.000138629;

// Scales `value` by exp(-time_delta * EXP_DECAY_CONST), where `time_delta` is the elapsed
// time in milliseconds. The fractional part of the result is truncated.
uint64_t ExponentialDecay(uint64_t value, int64_t time_delta) {
  const double exponent = -time_delta * EXP_DECAY_CONST;
  const double factor = std::exp(exponent);
  return static_cast<uint64_t>(static_cast<double>(value) * factor);
}
27+
28+
// Linear decay rate per millisecond: a value loses one unit every 1000 ms.
constexpr double LIN_DECAY_CONST = 0.001;

// Subtracts time_delta * LIN_DECAY_CONST from `value`, never subtracting more than `value`
// itself. The subtraction happens in floating point and the result is truncated, so any
// fractional remainder is dropped.
uint64_t LinearDecay(uint64_t value, int64_t time_delta) {
  const double decay = time_delta * LIN_DECAY_CONST;
  const double current = static_cast<double>(value);
  const double drop = (decay < current) ? decay : current;
  return static_cast<uint64_t>(current - drop);
}
35+
36+
uint64_t ApplyDecay(uint64_t value, int64_t time_delta, dfly::MultiSketch::Decay decay) {
37+
switch (decay) {
38+
case dfly::MultiSketch::Decay::Exponential:
39+
return ExponentialDecay(value, time_delta);
40+
case dfly::MultiSketch::Decay::Linear:
41+
return LinearDecay(value, time_delta);
42+
case dfly::MultiSketch::Decay::SlidingWindow:
43+
return value;
44+
}
45+
ABSL_UNREACHABLE();
46+
}
47+
48+
} // namespace
49+
50+
namespace dfly {
51+
52+
// Builds a zero-initialised counter table of depth_ rows by width_ columns, where
//   width_ = e / epsilon      (truncated to an integer)
//   depth_ = ln(1 / delta)    (truncated to an integer)
// Both epsilon and delta are expected to be strictly positive.
//
// Each dimension is clamped to at least 1. Without the clamp a large epsilon (> e) yields
// width_ == 0, which makes Hash() compute a modulo by zero, and a large delta (> 1/e) yields
// depth_ == 0, which leaves the table empty for EstimateFrequency(). The clamp is applied in
// the floating point domain so that negative quotients are never converted to an unsigned
// integer.
CountMinSketch::CountMinSketch(double epsilon, double delta) {
  width_ = static_cast<uint64_t>(std::max(1.0, std::exp(1) / epsilon));
  depth_ = static_cast<uint64_t>(std::max(1.0, std::log(1.0 / delta)));

  counters_.reserve(depth_);
  for (uint64_t i = 0; i < depth_; ++i) {
    counters_.emplace_back(width_, 0);
  }
}
60+
61+
void CountMinSketch::Update(uint64_t key, CountMinSketch::SizeT incr) {
62+
uint64_t i = 0;
63+
std::for_each(counters_.begin(), counters_.end(), [&](auto& ctr) {
64+
const uint64_t index = Hash(key, i++);
65+
const SizeT curr = ctr[index];
66+
const SizeT updated = curr + incr;
67+
ctr[index] = updated < curr ? MAX : updated;
68+
});
69+
}
70+
71+
// Returns the estimated frequency of `key`: the minimum, over all rows, of the counter that
// Hash(key, row) selects in that row.
//
// Returns 0 when the table has no rows (for example on a moved-from sketch) instead of
// dereferencing the begin iterator of an empty container.
CountMinSketch::SizeT CountMinSketch::EstimateFrequency(uint64_t key) const {
  auto it = counters_.begin();
  if (it == counters_.end()) {
    return 0;
  }

  // Seed the minimum with row 0, then fold in the remaining rows.
  SizeT estimate = (*it)[Hash(key, 0)];
  uint64_t row = 1;
  for (++it; it != counters_.end(); ++it, ++row) {
    estimate = std::min(estimate, (*it)[Hash(key, row)]);
  }
  return estimate;
}
79+
80+
void CountMinSketch::Reset() {
81+
for (auto& ctr : counters_) {
82+
std::fill(ctr.begin(), ctr.end(), 0);
83+
}
84+
}
85+
86+
uint64_t CountMinSketch::Hash(uint64_t key, uint64_t i) const {
87+
return XXH3_64bits_withSeed(&key, sizeof(key), i) % width_;
88+
}
89+
90+
// Creates every sketch slot with the given epsilon/delta and stamps both its start and end
// time with the current time in milliseconds. The last slot starts out as the current
// sketch.
MultiSketch::MultiSketch(uint64_t rollover_ms, double epsilon, double delta, Decay decay)
    : rollover_ms_(rollover_ms), current_sketch_(sketches_.size() - 1), decay_t_(decay) {
  const uint64_t created_at = GetCurrentMS();
  for (auto& slot : sketches_) {
    slot = SketchWithTimestamp{CountMinSketch{epsilon, delta}, created_at, created_at};
  }
}
97+
98+
void MultiSketch::Update(uint64_t key, CountMinSketch::SizeT incr) {
99+
if (++rollover_check_ >= rollover_check_every_) {
100+
MaybeRolloverCurrentSketch();
101+
rollover_check_ = 0;
102+
}
103+
sketches_[current_sketch_].sketch_.Update(key, incr);
104+
}
105+
106+
// Returns the estimated frequency of `key` summed over all sketches. Each sketch's estimate
// is first decayed according to decay_t_, using the time elapsed since that sketch's
// start_time_.
//
// The sum is accumulated in 64 bits and saturated at MAX before being returned. Adding
// straight into a SizeT would silently wrap once the combined estimate exceeds the counter
// range, whereas CountMinSketch::Update saturates individual counters at the same limit.
CountMinSketch::SizeT MultiSketch::EstimateFrequency(uint64_t key) const {
  uint64_t total = 0;
  const uint64_t now = GetCurrentMS();

  for (const auto& sketch : sketches_) {
    const auto e = sketch.sketch_.EstimateFrequency(key);
    // TODO use average time of sketch to compute delta
    total += ApplyDecay(e, now - sketch.start_time_, decay_t_);
  }

  return total < MAX ? static_cast<CountMinSketch::SizeT>(total) : MAX;
}
117+
118+
void MultiSketch::MaybeRolloverCurrentSketch() {
119+
const uint64_t now = GetCurrentMS();
120+
const uint64_t oldest = (current_sketch_ + 1) % sketches_.size();
121+
if (const uint64_t oldest_ts = sketches_[oldest].start_time_; now - oldest_ts > rollover_ms_) {
122+
sketches_[oldest].sketch_.Reset();
123+
sketches_[oldest].start_time_ = now;
124+
sketches_[current_sketch_].end_time_ = now;
125+
current_sketch_ = oldest;
126+
}
127+
}
128+
129+
} // namespace dfly

src/core/count_min_sketch.h

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
// Copyright 2025, DragonflyDB authors. All rights reserved.
2+
// See LICENSE for licensing terms.
3+
//
4+
#pragma once
5+
6+
#include <array>
7+
#include <cstdint>
8+
#include <vector>
9+
10+
namespace dfly {
11+
12+
// Approximate per-key frequency counter backed by a depth x width table of SizeT cells.
// Move-only.
class CountMinSketch {
 public:
  // Type of a single counter cell.
  using SizeT = uint16_t;

  // epsilon bounds the over-estimation of a single element relative to the sum of all
  // frequencies N:
  //   f_actual <= f_estimated <= f_actual + epsilon * N
  // delta is the failure probability, i.e. the probability that a single estimate exceeds
  // that epsilon bound.
  //
  // With the default arguments the counter table has 27182 columns and 9 rows, which is
  // roughly 478 KiB of two-byte counters.
  explicit CountMinSketch(double epsilon = 0.0001, double delta = 0.0001);

  // Copying is disabled; moving is allowed.
  CountMinSketch(CountMinSketch&& other) noexcept = default;
  CountMinSketch& operator=(CountMinSketch&& other) noexcept = default;
  CountMinSketch(const CountMinSketch& other) = delete;
  CountMinSketch& operator=(const CountMinSketch& other) = delete;

  // Adds incr to the counter selected for key in every row.
  void Update(uint64_t key, SizeT incr = 1);

  // Returns the smallest of the counters selected for key across all rows.
  SizeT EstimateFrequency(uint64_t key) const;

  // Zeroes all counters.
  void Reset();

 private:
  // Column index for key in row i.
  uint64_t Hash(uint64_t key, uint64_t i) const;

  std::vector<std::vector<SizeT>> counters_;
  uint64_t width_;
  uint64_t depth_;
};
46+
47+
// Maintains a list of three sketches with timestamps. Updates are made to the current sketch.
48+
// Once the oldest sketch is older than a fixed limit, it is discarded and becomes the current
49+
// sketch. Estimates are the sum across all sketches.
50+
class MultiSketch {
51+
struct SketchWithTimestamp {
52+
CountMinSketch sketch_;
53+
uint64_t start_time_{0};
54+
uint64_t end_time_{0};
55+
};
56+
57+
public:
58+
enum class Decay : uint8_t {
59+
Exponential,
60+
Linear,
61+
SlidingWindow,
62+
};
63+
64+
explicit MultiSketch(uint64_t rollover_ms = 1000, double epsilon = 0.0001, double delta = 0.0001,
65+
Decay decay = Decay::Exponential);
66+
67+
MultiSketch(const MultiSketch& other) = delete;
68+
MultiSketch& operator=(const MultiSketch& other) = delete;
69+
70+
MultiSketch(MultiSketch&& other) noexcept = default;
71+
MultiSketch& operator=(MultiSketch&& other) noexcept = default;
72+
73+
void Update(uint64_t key, CountMinSketch::SizeT incr = 1);
74+
75+
CountMinSketch::SizeT EstimateFrequency(uint64_t key) const;
76+
77+
// For unit tests, allow setting a smaller limit
78+
void SetRolloverCheckLimit(uint64_t rollover_check_limit) {
79+
rollover_check_every_ = rollover_check_limit;
80+
}
81+
82+
private:
83+
void MaybeRolloverCurrentSketch();
84+
85+
std::array<SketchWithTimestamp, 3> sketches_;
86+
uint64_t rollover_ms_;
87+
uint64_t current_sketch_;
88+
89+
// Do a rollover check every N calls to avoid expensive GetTime calls
90+
uint64_t rollover_check_every_{512};
91+
uint64_t rollover_check_{0};
92+
Decay decay_t_;
93+
};
94+
95+
} // namespace dfly

0 commit comments

Comments
 (0)