Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reset instead of discard all #549

Merged
merged 3 commits into from
Aug 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -821,6 +821,14 @@ where
message_result = read_message(&mut self.read) => message_result?
};

if message[0] as char == 'X' {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A previous PR had moved the admin handling to before the message code match, an admin client disconnect produces an error log so this moves the 'X' message code check to before the admin client handling

debug!("Client disconnecting");

self.stats.disconnect();

return Ok(());
}

// Handle admin database queries.
if self.admin {
debug!("Handling admin command");
Expand Down Expand Up @@ -940,14 +948,6 @@ where
continue;
}

'X' => {
debug!("Client disconnecting");

self.stats.disconnect();

return Ok(());
}

// Close (F)
'C' => {
if prepared_statements_enabled {
Expand Down
25 changes: 17 additions & 8 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,10 @@ impl StreamInner {

#[derive(Copy, Clone)]
struct CleanupState {
/// If server connection requires DISCARD ALL before checkin because of set statement
/// If server connection requires RESET ALL before checkin because of set statement
needs_cleanup_set: bool,

/// If server connection requires DISCARD ALL before checkin because of prepare statement
/// If server connection requires DEALLOCATE ALL before checkin because of prepare statement
needs_cleanup_prepare: bool,
}

Expand Down Expand Up @@ -296,7 +296,7 @@ pub struct Server {
/// Is the server broken? We'll remote it from the pool if so.
bad: bool,

/// If server connection requires DISCARD ALL before checkin
/// If server connection requires reset statements before checkin
cleanup_state: CleanupState,

/// Mapping of clients and servers used for query cancellation.
Expand Down Expand Up @@ -982,7 +982,7 @@ impl Server {
// We don't detect set statements in transactions
// No great way to differentiate between set and set local
// As a result, we will miss cases when set statements are used in transactions
// This will reduce amount of discard statements sent
// This will reduce amount of reset statements sent
if !self.in_transaction {
debug!("Server connection marked for clean up");
self.cleanup_state.needs_cleanup_set = true;
Expand Down Expand Up @@ -1304,12 +1304,21 @@ impl Server {
// Client disconnected but it performed session-altering operations such as
// SET statement_timeout to 1 or create a prepared statement. We clear that
// to avoid leaking state between clients. For performance reasons we only
// send `DISCARD ALL` if we think the session is altered instead of just sending
// send `RESET ALL` if we think the session is altered instead of just sending
// it before each checkin.
if self.cleanup_state.needs_cleanup() && self.cleanup_connections {
info!(target: "pgcat::server::cleanup", "Server returned with session state altered, discarding state ({}) for application {}", self.cleanup_state, self.application_name);
self.query("DISCARD ALL").await?;
self.query("RESET ROLE").await?;
let mut reset_string = String::from("RESET ROLE;");

if self.cleanup_state.needs_cleanup_set {
reset_string.push_str("RESET ALL;");
};

if self.cleanup_state.needs_cleanup_prepare {
reset_string.push_str("DEALLOCATE ALL;");
};

self.query(&reset_string).await?;
self.cleanup_state.reset();
}

Expand All @@ -1336,7 +1345,7 @@ impl Server {
self.last_activity
}

// Marks a connection as needing DISCARD ALL at checkin
// Marks a connection as needing cleanup at checkin
pub fn mark_dirty(&mut self) {
self.cleanup_state.set_true();
}
Expand Down
20 changes: 10 additions & 10 deletions tests/ruby/misc_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -221,15 +221,15 @@
conn.close
end

it "Does not send DISCARD ALL unless necessary" do
it "Does not send RESET ALL unless necessary" do
10.times do
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
conn.async_exec("SET SERVER ROLE to 'primary'")
conn.async_exec("SELECT 1")
conn.close
end

expect(processes.primary.count_query("DISCARD ALL")).to eq(0)
expect(processes.primary.count_query("RESET ALL")).to eq(0)

10.times do
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
Expand All @@ -239,7 +239,7 @@
conn.close
end

expect(processes.primary.count_query("DISCARD ALL")).to eq(10)
expect(processes.primary.count_query("RESET ALL")).to eq(10)
end

it "Resets server roles correctly" do
Expand Down Expand Up @@ -273,7 +273,7 @@
end
end

it "Does not send DISCARD ALL unless necessary" do
it "Does not send RESET ALL unless necessary" do
10.times do
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
conn.async_exec("SET SERVER ROLE to 'primary'")
Expand All @@ -282,7 +282,7 @@
conn.close
end

expect(processes.primary.count_query("DISCARD ALL")).to eq(0)
expect(processes.primary.count_query("RESET ALL")).to eq(0)

10.times do
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
Expand All @@ -292,7 +292,7 @@
conn.close
end

expect(processes.primary.count_query("DISCARD ALL")).to eq(10)
expect(processes.primary.count_query("RESET ALL")).to eq(10)
end

it "Respects tracked parameters on startup" do
Expand Down Expand Up @@ -331,7 +331,7 @@
conn.async_exec("COMMIT")
conn.close
end
expect(processes.primary.count_query("DISCARD ALL")).to eq(0)
expect(processes.primary.count_query("RESET ALL")).to eq(0)

10.times do
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
Expand All @@ -341,7 +341,7 @@
conn.async_exec("COMMIT")
conn.close
end
expect(processes.primary.count_query("DISCARD ALL")).to eq(0)
expect(processes.primary.count_query("RESET ALL")).to eq(0)
end
end

Expand All @@ -355,7 +355,7 @@
conn.close

puts processes.pgcat.logs
expect(processes.primary.count_query("DISCARD ALL")).to eq(0)
expect(processes.primary.count_query("RESET ALL")).to eq(0)
end

it "will not clean up prepared statements" do
Expand All @@ -366,7 +366,7 @@
conn.close

puts processes.pgcat.logs
expect(processes.primary.count_query("DISCARD ALL")).to eq(0)
expect(processes.primary.count_query("RESET ALL")).to eq(0)
end
end
end
Expand Down