diff --git a/src/server/CMakeLists.txt b/src/server/CMakeLists.txt index 94b1e337774e..6699a6b44478 100644 --- a/src/server/CMakeLists.txt +++ b/src/server/CMakeLists.txt @@ -18,7 +18,7 @@ if ("${CMAKE_SYSTEM_NAME}" STREQUAL "Linux") tiering/external_alloc.cc) add_executable(dfly_bench dfly_bench.cc) - cxx_link(dfly_bench dfly_parser_lib fibers2 absl::random_random) + cxx_link(dfly_bench dfly_parser_lib fibers2 absl::random_random redis_lib) cxx_test(tiering/disk_storage_test dfly_test_lib LABELS DFLY) cxx_test(tiering/op_manager_test dfly_test_lib LABELS DFLY) cxx_test(tiering/small_bins_test dfly_test_lib LABELS DFLY) diff --git a/src/server/dfly_bench.cc b/src/server/dfly_bench.cc index 72123cf1aca3..5f4304aab51e 100644 --- a/src/server/dfly_bench.cc +++ b/src/server/dfly_bench.cc @@ -2,6 +2,11 @@ // See LICENSE for licensing terms. // +extern "C" { +#include "redis/crc16.h" +} + +#include #include #include #include @@ -9,6 +14,7 @@ #include #include +#include #include "absl/time/clock.h" #include "absl/time/time.h" @@ -17,9 +23,11 @@ #include "base/random.h" #include "base/zipf_gen.h" #include "facade/redis_parser.h" +#include "io/io.h" #include "io/io_buf.h" #include "util/fibers/dns_resolve.h" #include "util/fibers/pool.h" +#include "util/fibers/proactor_base.h" #include "util/fibers/uring_socket.h" // A load-test for DragonflyDB that fixes coordinated omission problem. @@ -59,6 +67,7 @@ using namespace util; using absl::GetFlag; using absl::StrFormat; using facade::RedisParser; +using facade::RespExpr; using facade::RespVec; using tcp = ::boost::asio::ip::tcp; using absl::StrCat; @@ -71,6 +80,7 @@ thread_local base::Xoroshiro128p bit_gen; enum Protocol { RESP, MC_TEXT } protocol; enum DistType { UNIFORM, NORMAL, ZIPFIAN, SEQUENTIAL } dist_type{UNIFORM}; +constexpr uint16_t kNumSlots = 16384; string_view kTmplPatterns[] = {"__key__"sv, "__data__"sv, "__score__"sv}; @@ -94,25 +104,39 @@ static string GetRandomHex(size_t len) { return res; } +struct ShardInfo { + uint16_t slot_start = 0; + uint16_t slot_end = 0; + tcp::endpoint endpoint; +}; + +using ClusterSpec = vector; + class KeyGenerator { public: KeyGenerator(uint32_t min, uint32_t max); - string operator()(); + string operator()(uint16_t slot_id) const; + void EnableClusterMode(); + + bool IsClusterEnabled() const { + return !hash_slots_.empty(); + } private: string prefix_; uint64_t min_, max_, range_; - uint64_t seq_cursor_; + mutable uint64_t seq_cursor_; double stddev_ = 1.0 / 6; - optional zipf_; + mutable optional zipf_; + vector hash_slots_; }; class CommandGenerator { public: CommandGenerator(KeyGenerator* keygen); - string Next(); + string Next(uint16_t slot_min, uint16_t slot_max); bool might_hit() const { return might_hit_; @@ -156,11 +180,15 @@ CommandGenerator::CommandGenerator(KeyGenerator* keygen) : keygen_(keygen) { } } -string CommandGenerator::Next() { +string CommandGenerator::Next(uint16_t slot_min, uint16_t slot_max) { cmd_.clear(); noreply_ = false; + uint16_t slot_id = 0; + if (keygen_->IsClusterEnabled()) { + slot_id = absl::Uniform(absl::IntervalClosedClosed, bit_gen, slot_min, slot_max); + } if (command_.empty()) { - string key = (*keygen_)(); + string key = (*keygen_)(slot_id); if (absl::Uniform(bit_gen, 0U, ratio_get_ + ratio_set_) < ratio_set_) { FillSet(key); @@ -178,7 +206,7 @@ string CommandGenerator::Next() { for (auto [pos, type] : key_indices_) { switch (type) { case KEY: - str = (*keygen_)(); + str = (*keygen_)(slot_id); break; case VALUE: str = GetRandomHex(value_.size()); @@ -254,7 +282,8 @@ class Driver { Driver(Driver&&) = delete; Driver& operator=(Driver&&) = delete; - void Connect(unsigned index, const tcp::endpoint& ep); + void Connect(unsigned index, const tcp::endpoint& ep, + optional> slot_range); void Run(uint64_t* cycle_ns, CommandGenerator* cmd_gen); float done() const { @@ -283,6 +312,7 @@ class Driver { ClientStats& stats_; unique_ptr socket_; + optional> slot_range_; fb2::Fiber receive_fb_; queue reqs_; fb2::CondVarAny cnd_; @@ -295,15 +325,11 @@ class Driver { class TLocalClient { public: explicit TLocalClient(ProactorBase* p) : p_(p) { - drivers_.resize(GetFlag(FLAGS_c)); - for (auto& driver : drivers_) { - driver.reset(new Driver{GetFlag(FLAGS_n), GetFlag(FLAGS_test_time), &stats, p_}); - } } TLocalClient(const TLocalClient&) = delete; - void Connect(tcp::endpoint ep); + void Connect(tcp::endpoint ep, const ClusterSpec& cluster); void Start(uint32_t key_min, uint32_t key_max, uint64_t cycle_ns); void Join(); @@ -344,7 +370,7 @@ class TLocalClient { optional cmd_gen_; vector driver_fbs_; - + ClusterSpec cluster_spec_; uint64_t cur_cycle_ns_; uint64_t target_cycle_; int64_t start_time_; @@ -371,7 +397,7 @@ KeyGenerator::KeyGenerator(uint32_t min, uint32_t max) } } -string KeyGenerator::operator()() { +string KeyGenerator::operator()(uint16_t slot_id) const { uint64_t key_suffix{0}; switch (dist_type) { case UNIFORM: @@ -391,12 +417,35 @@ string KeyGenerator::operator()() { seq_cursor_ = min_; break; } + string res = prefix_; + if (IsClusterEnabled()) { + absl::StrAppend(&res, "{", hash_slots_[slot_id], "}"); + } + absl::StrAppend(&res, key_suffix); + return res; +} - return StrCat(prefix_, key_suffix); +void KeyGenerator::EnableClusterMode() { + hash_slots_.resize(kNumSlots); + uint32_t i = 0; + uint32_t num_slots_filled = 0; + + // Precompute the hash slots for each of the slot ids so given the slot id + // we could generate a key that belongs to that slot. + while (num_slots_filled < kNumSlots) { + string slot = absl::StrCat(i); + uint16_t id = crc16(slot.data(), slot.length()) % kNumSlots; + if (hash_slots_[id].empty()) { + hash_slots_[id] = slot; + num_slots_filled++; + } + ++i; + } } -void Driver::Connect(unsigned index, const tcp::endpoint& ep) { - VLOG(2) << "Connecting " << index; +void Driver::Connect(unsigned index, const tcp::endpoint& ep, + optional> slot_range) { + VLOG(2) << "Connecting " << index << " to " << ep; error_code ec = socket_->Connect(ep); CHECK(!ec) << "Could not connect to " << ep << " " << ec; if (GetFlag(FLAGS_tcp_nodelay)) { @@ -417,6 +466,7 @@ void Driver::Connect(unsigned index, const tcp::endpoint& ep) { string_view resp = io::View(io::Bytes(buf, *res_sz)); CHECK(absl::EndsWith(resp, "\r\n")) << resp; } + slot_range_ = slot_range; receive_fb_ = MakeFiber(fb2::Launch::dispatch, [this] { ReceiveFb(); }); } @@ -427,6 +477,12 @@ void Driver::Run(uint64_t* cycle_ns, CommandGenerator* cmd_gen) { stats_.num_clients++; int64_t time_limit_ns = time_limit_ > 0 ? int64_t(time_limit_) * 1'000'000'000 + start_ns_ : INT64_MAX; + uint16_t slot_min = 0; + uint16_t slot_max = kNumSlots - 1; + if (slot_range_) { + slot_min = slot_range_->first; + slot_max = slot_range_->second; + } for (unsigned i = 0; i < num_reqs_; ++i) { int64_t now = absl::GetCurrentTimeNanos(); @@ -456,7 +512,7 @@ void Driver::Run(uint64_t* cycle_ns, CommandGenerator* cmd_gen) { fb2::NoOpLock lk; cnd_.wait(lk, [this, pipeline] { return reqs_.size() < pipeline; }); } - string cmd = cmd_gen->Next(); + string cmd = cmd_gen->Next(slot_min, slot_max); Req req; req.start = absl::GetCurrentTimeNanos(); @@ -552,9 +608,10 @@ void Driver::ParseRESP() { do { result = parser_.Parse(io_buf_.InputBuffer(), &consumed, &parse_args); if (result == RedisParser::OK && !parse_args.empty()) { - if (parse_args[0].type == facade::RespExpr::ERROR) { + if (parse_args[0].type == RespExpr::ERROR) { + VLOG(2) << "Error " << io::View(io_buf_.InputBuffer()); ++stats_.num_errors; - } else if (reqs_.front().might_hit && parse_args[0].type != facade::RespExpr::NIL) { + } else if (reqs_.front().might_hit && parse_args[0].type != RespExpr::NIL) { ++stats_.hit_count; } parse_args.clear(); @@ -602,14 +659,33 @@ void Driver::ParseMC() { } } -void TLocalClient::Connect(tcp::endpoint ep) { +void TLocalClient::Connect(tcp::endpoint ep, const ClusterSpec& cluster) { VLOG(2) << "Connecting client..."; + + cluster_spec_ = cluster; + unsigned conn_per_shard = GetFlag(FLAGS_c); + if (cluster.empty()) { + drivers_.resize(conn_per_shard); + } else { + drivers_.resize(cluster.size() * conn_per_shard); + } + + for (auto& driver : drivers_) { + driver.reset(new Driver{GetFlag(FLAGS_n), GetFlag(FLAGS_test_time), &stats, p_}); + } vector fbs(drivers_.size()); for (size_t i = 0; i < fbs.size(); ++i) { - fbs[i] = MakeFiber([&, i] { + optional> slot_range; + tcp::endpoint shard_ep = ep; + if (!cluster.empty()) { + size_t shard = i / conn_per_shard; + slot_range = {cluster[shard].slot_start, cluster[shard].slot_end}; + shard_ep = cluster[shard].endpoint; + } + fbs[i] = MakeFiber([&, shard_ep, i, slot_range] { ThisFiber::SetName(StrCat("connect/", i)); - drivers_[i]->Connect(i, ep); + drivers_[i]->Connect(i, shard_ep, slot_range); }); } @@ -622,7 +698,9 @@ void TLocalClient::Start(uint32_t key_min, uint32_t key_max, uint64_t cycle_ns) cmd_gen_.emplace(&key_gen_.value()); driver_fbs_.resize(drivers_.size()); - + if (!cluster_spec_.empty()) { + key_gen_->EnableClusterMode(); + } cur_cycle_ns_ = cycle_ns; target_cycle_ = cycle_ns; start_time_ = absl::GetCurrentTimeNanos(); @@ -724,6 +802,82 @@ void WatchFiber(atomic_bool* finish_signal, ProactorPool* pp) { } } +ClusterSpec FetchCluster(const tcp::endpoint& ep, ProactorBase* proactor) { + unique_ptr socket(proactor->CreateSocket()); + error_code ec = socket->Connect(ep); + CHECK(!ec) << "Could not connect to " << ep << " " << ec; + ec = socket->Write(io::Buffer("cluster nodes\r\n")); + CHECK(!ec); + facade::RedisParser parser{RedisParser::CLIENT, 1024}; + uint8_t buf[1024]; + RespVec resp_vec; + while (true) { + io::Result res = socket->Recv(buf); + CHECK(res) << res.error().message(); + RespExpr::Buffer bytes(buf, *res); + uint32_t consumed = 0; + facade::RedisParser::Result result = parser.Parse(bytes, &consumed, &resp_vec); + if (result == facade::RedisParser::OK) { + break; + } + CHECK_EQ(result, facade::RedisParser::INPUT_PENDING); + } + CHECK_EQ(1u, resp_vec.size()); + std::ignore = socket->Close(); + if (resp_vec.front().type == RespExpr::ERROR) { + LOG(INFO) << "Cluster command failed " << resp_vec.front().GetString(); + return {}; + } + string cluster_spec = resp_vec.front().GetString(); + LOG(INFO) << "Cluster spec: " << cluster_spec; + vector lines = absl::StrSplit(cluster_spec, '\n', absl::SkipEmpty()); + ClusterSpec res; + for (string_view line : lines) { + vector parts = absl::StrSplit(line, ' '); + // + // ... + if (parts.size() < 9) { + LOG(WARNING) << "Skipping line: " << line; + continue; + } + ShardInfo shard; + vector addr_parts = absl::StrSplit(parts[1], ':'); + CHECK_EQ(2u, addr_parts.size()); + auto address = ::boost::asio::ip::make_address(addr_parts[0]); + + uint32_t val; + vector port_parts = absl::StrSplit(addr_parts[1], '@'); + CHECK_EQ(2u, port_parts.size()); + CHECK(absl::SimpleAtoi(port_parts[0], &val)); + CHECK_LT(val, 65536u); + + shard.endpoint = tcp::endpoint(address, val); + + string_view flags = parts[2]; + absl::flat_hash_set flags_set(absl::StrSplit(flags, ',')); + if (!flags_set.contains("master")) { + LOG(INFO) << "Skipping non-master node " << shard.endpoint << " " << flags; + continue; + } + + vector slots = absl::StrSplit(parts[8], '-'); + if (!absl::SimpleAtoi(slots[0], &val) || val >= kNumSlots) { + LOG(ERROR) << "Invalid slot definition " << parts[8]; + continue; + } + shard.slot_start = val; + if (slots.size() > 1) { + CHECK(absl::SimpleAtoi(slots[1], &val)); + shard.slot_end = val; + } else { + shard.slot_end = shard.slot_start; + } + res.push_back(shard); + } + + return res; +} + int main(int argc, char* argv[]) { MainInitGuard guard(&argc, &argv); @@ -763,14 +917,21 @@ int main(int argc, char* argv[]) { auto address = ::boost::asio::ip::make_address(ip_addr); tcp::endpoint ep{address, GetFlag(FLAGS_p)}; - LOG(INFO) << "Connecting threads"; + ClusterSpec shards; + if (protocol == RESP) { + shards = proactor->Await([&] { return FetchCluster(ep, proactor); }); + } + LOG(INFO) << "Connecting threads to " + << (shards.empty() ? string("single node ") + : absl::StrCat(shards.size(), " shard cluster")); + pp->AwaitFiberOnAll([&](unsigned index, auto* p) { base::SplitMix64 seed_mix(GetFlag(FLAGS_seed) + index * 0x6a45554a264d72bULL); auto seed = seed_mix(); VLOG(1) << "Seeding bitgen with seed " << seed; bit_gen.seed(seed); client = make_unique(p); - client->Connect(ep); + client->Connect(ep, shards); }); const uint32_t key_minimum = GetFlag(FLAGS_key_minimum);