Skip to content

Commit

Permalink
comments and fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
kostasrim committed Feb 28, 2025
1 parent 033672f commit fc440bf
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 34 deletions.
64 changes: 37 additions & 27 deletions src/server/server_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -699,7 +699,8 @@ void SlowLogGet(dfly::CmdArgList args, std::string_view sub_cmd, util::ProactorP

std::optional<fb2::Fiber> Pause(std::vector<facade::Listener*> listeners, Namespace* ns,
facade::Connection* conn, ClientPause pause_state,
std::function<bool()> is_pause_in_progress) {
std::function<bool()> is_pause_in_progress,
std::function<void()> maybe_cleanup) {
// Track connections and set pause state to be able to wait until all running transactions read
// the new pause state. Exclude already paused commands from the busy count. Exclude tracking
// blocked connections because: a) If the connection is blocked it is paused. b) We read pause
Expand Down Expand Up @@ -729,23 +730,28 @@ std::optional<fb2::Fiber> Pause(std::vector<facade::Listener*> listeners, Namesp
shard_set->RunBriefInParallel(
[ns](EngineShard* shard) { ns->GetDbSlice(shard->shard_id()).SetExpireAllowed(false); });

return fb2::Fiber("client_pause", [is_pause_in_progress, pause_state, ns]() mutable {
// On server shutdown we sleep 10ms to make sure all running tasks finish, therefore 10ms steps
// ensure this fiber will not be left hanging.
constexpr auto step = 10ms;
while (is_pause_in_progress()) {
ThisFiber::SleepFor(step);
}

ServerState& etl = *ServerState::tlocal();
if (etl.gstate() != GlobalState::SHUTTING_DOWN) {
shard_set->pool()->AwaitFiberOnAll([pause_state](util::ProactorBase* pb) {
ServerState::tlocal()->SetPauseState(pause_state, false);
});
shard_set->RunBriefInParallel(
[ns](EngineShard* shard) { ns->GetDbSlice(shard->shard_id()).SetExpireAllowed(true); });
}
});
return fb2::Fiber("client_pause",
[is_pause_in_progress, pause_state, ns, maybe_cleanup]() mutable {
// On server shutdown we sleep 10ms to make sure all running tasks finish,
// therefore 10ms steps ensure this fiber will not be left hanging.
constexpr auto step = 10ms;
while (is_pause_in_progress()) {
ThisFiber::SleepFor(step);
}

ServerState& etl = *ServerState::tlocal();
if (etl.gstate() != GlobalState::SHUTTING_DOWN) {
shard_set->pool()->AwaitFiberOnAll([pause_state](util::ProactorBase* pb) {
ServerState::tlocal()->SetPauseState(pause_state, false);
});
shard_set->RunBriefInParallel([ns](EngineShard* shard) {
ns->GetDbSlice(shard->shard_id()).SetExpireAllowed(true);
});
}
if (maybe_cleanup) {
maybe_cleanup();
}
});
}

ServerFamily::ServerFamily(Service* service) : service_(*service) {
Expand Down Expand Up @@ -935,7 +941,7 @@ void ServerFamily::JoinSnapshotSchedule() {
void ServerFamily::Shutdown() {
VLOG(1) << "ServerFamily::Shutdown";

client_pause_fb_.JoinIfNeeded();
client_pause_.store(true);

load_fiber_.JoinIfNeeded();

Expand All @@ -957,6 +963,8 @@ void ServerFamily::Shutdown() {
});
}

client_pause_ec_.await([this] { return active_pauses_.load() == 0; });

pb_task_->Await([this] {
auto ec = journal_->Close();
LOG_IF(ERROR, ec) << "Error closing journal " << ec;
Expand Down Expand Up @@ -1860,9 +1868,7 @@ void ServerFamily::ClientUnPauseCmd(CmdArgList args, SinkReplyBuilder* builder)
builder->SendError(facade::kSyntaxErr);
return;
}
std::unique_lock lk(client_pause_mu_);
client_pause_.store(false, std::memory_order_relaxed);
client_pause_fb_.JoinIfNeeded();
builder->SendOk();
}

Expand Down Expand Up @@ -3223,16 +3229,20 @@ void ServerFamily::ClientPauseCmd(CmdArgList args, SinkReplyBuilder* builder,
const auto timeout_ms = timeout * 1ms;
auto is_pause_in_progress = [this, end_time = chrono::steady_clock::now() + timeout_ms] {
return ServerState::tlocal()->gstate() != GlobalState::SHUTTING_DOWN &&
chrono::steady_clock::now() < end_time && client_pause_.load(std::memory_order_relaxed);
chrono::steady_clock::now() < end_time;
};

auto cleanup = [this] {
active_pauses_.fetch_sub(1);
client_pause_ec_.notify();
};

std::unique_lock lk(client_pause_mu_);
if (auto pause_fb_opt =
Pause(listeners, cntx->ns, cntx->conn(), pause_state, std::move(is_pause_in_progress));
if (auto pause_fb_opt = Pause(listeners, cntx->ns, cntx->conn(), pause_state,
std::move(is_pause_in_progress), cleanup);
pause_fb_opt) {
client_pause_fb_.JoinIfNeeded();
client_pause_.store(true, std::memory_order_relaxed);
client_pause_fb_ = std::move(*pause_fb_opt);
active_pauses_.fetch_add(1);
pause_fb_opt->Detach();
builder->SendOk();
} else {
builder->SendError("Failed to pause all running clients");
Expand Down
10 changes: 7 additions & 3 deletions src/server/server_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ class ServerFamily {
bool accepting_connections_ = true;
util::ProactorBase* pb_task_ = nullptr;

mutable util::fb2::Mutex replicaof_mu_, save_mu_, client_pause_mu_;
mutable util::fb2::Mutex replicaof_mu_, save_mu_;
std::shared_ptr<Replica> replica_ ABSL_GUARDED_BY(replicaof_mu_);
std::vector<std::unique_ptr<Replica>> cluster_replicas_
ABSL_GUARDED_BY(replicaof_mu_); // used to replicating multiple nodes to single dragonfly
Expand All @@ -362,7 +362,10 @@ class ServerFamily {
std::shared_ptr<detail::SnapshotStorage> snapshot_storage_;

std::atomic<bool> client_pause_ = false;
util::fb2::Fiber client_pause_fb_;
// We need this because if dragonfly shuts down during a pause, ServerState will be destroyed
// before the detached Pause() fiber finishes, causing a segfault.
std::atomic<size_t> active_pauses_ = 0;
util::fb2::EventCount client_pause_ec_;

// protected by save_mu_
util::fb2::Fiber bg_save_fb_;
Expand All @@ -377,6 +380,7 @@ class ServerFamily {
// Reusable CLIENT PAUSE implementation that blocks while polling is_pause_in_progress
std::optional<util::fb2::Fiber> Pause(std::vector<facade::Listener*> listeners, Namespace* ns,
facade::Connection* conn, ClientPause pause_state,
std::function<bool()> is_pause_in_progress);
std::function<bool()> is_pause_in_progress,
std::function<void()> maybe_cleanup = {});

} // namespace dfly
32 changes: 28 additions & 4 deletions tests/dragonfly/connection_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1167,22 +1167,46 @@ async def test_client_unpause(df_factory):
server.start()

async_client = server.client()
await async_client.client_pause(15000, all=False)
await async_client.client_pause(3000, all=False)

async def set_foo():
client = server.client()
await client.execute_command("SET", "foo", "bar")

p1 = asyncio.create_task(set_foo())

await asyncio.sleep(2)
await asyncio.sleep(1)
assert not p1.done()

async with async_timeout.timeout(2):
async with async_timeout.timeout(1):
await async_client.client_unpause()

await p1
assert p1.done()

await async_client.client_pause(5, all=False)
await async_client.client_pause(1, all=False)
await asyncio.sleep(2)
server.stop()


async def test_client_pause_b2b(async_client):
    """Issue two identical CLIENT PAUSE commands back to back.

    Both calls have to return inside the one-second ``async_timeout`` window,
    otherwise the timeout context aborts the test.
    """
    async with async_timeout.timeout(1):
        # Same arguments twice in a row: the second pause is sent while the
        # first one has just been requested.
        for _ in range(2):
            await async_client.client_pause(2000, all=False)


async def test_client_unpause_after_pause_all(async_client):
await async_client.client_pause(2000, all=True)
# Blocks and waits
res = await async_client.client_unpause()
assert res == "OK"
await async_client.client_pause(2000, all=False)
res = await async_client.client_unpause()


async def test_client_detached_crash(df_factory):
server = df_factory.create(proactor_threads=1)
server.start()
async_client = server.client()
await async_client.client_pause(2, all=False)
server.stop()

0 comments on commit fc440bf

Please sign in to comment.