diff --git a/src/client.rs b/src/client.rs index 6cdea987..3f6ba13f 100644 --- a/src/client.rs +++ b/src/client.rs @@ -821,6 +821,14 @@ where message_result = read_message(&mut self.read) => message_result? }; + if message[0] as char == 'X' { + debug!("Client disconnecting"); + + self.stats.disconnect(); + + return Ok(()); + } + // Handle admin database queries. if self.admin { debug!("Handling admin command"); @@ -940,14 +948,6 @@ where continue; } - 'X' => { - debug!("Client disconnecting"); - - self.stats.disconnect(); - - return Ok(()); - } - // Close (F) 'C' => { if prepared_statements_enabled { diff --git a/src/server.rs b/src/server.rs index c4d7a1af..037fab2d 100644 --- a/src/server.rs +++ b/src/server.rs @@ -107,10 +107,10 @@ impl StreamInner { #[derive(Copy, Clone)] struct CleanupState { - /// If server connection requires DISCARD ALL before checkin because of set statement + /// If server connection requires RESET ALL before checkin because of set statement needs_cleanup_set: bool, - /// If server connection requires DISCARD ALL before checkin because of prepare statement + /// If server connection requires DEALLOCATE ALL before checkin because of prepare statement needs_cleanup_prepare: bool, } @@ -296,7 +296,7 @@ pub struct Server { /// Is the server broken? We'll remote it from the pool if so. bad: bool, - /// If server connection requires DISCARD ALL before checkin + /// If server connection requires reset statements before checkin cleanup_state: CleanupState, /// Mapping of clients and servers used for query cancellation. @@ -982,7 +982,7 @@ impl Server { // We don't detect set statements in transactions // No great way to differentiate between set and set local // As a result, we will miss cases when set statements are used in transactions - // This will reduce amount of discard statements sent + // This will reduce amount of reset statements sent if !self.in_transaction { debug!("Server connection marked for clean up"); self.cleanup_state.needs_cleanup_set = true; @@ -1304,12 +1304,21 @@ impl Server { // Client disconnected but it performed session-altering operations such as // SET statement_timeout to 1 or create a prepared statement. We clear that // to avoid leaking state between clients. For performance reasons we only - // send `DISCARD ALL` if we think the session is altered instead of just sending + // send `RESET ALL` if we think the session is altered instead of just sending // it before each checkin. if self.cleanup_state.needs_cleanup() && self.cleanup_connections { info!(target: "pgcat::server::cleanup", "Server returned with session state altered, discarding state ({}) for application {}", self.cleanup_state, self.application_name); - self.query("DISCARD ALL").await?; - self.query("RESET ROLE").await?; + let mut reset_string = String::from("RESET ROLE;"); + + if self.cleanup_state.needs_cleanup_set { + reset_string.push_str("RESET ALL;"); + }; + + if self.cleanup_state.needs_cleanup_prepare { + reset_string.push_str("DEALLOCATE ALL;"); + }; + + self.query(&reset_string).await?; self.cleanup_state.reset(); } @@ -1336,7 +1345,7 @@ impl Server { self.last_activity } - // Marks a connection as needing DISCARD ALL at checkin + // Marks a connection as needing cleanup at checkin pub fn mark_dirty(&mut self) { self.cleanup_state.set_true(); } diff --git a/tests/ruby/misc_spec.rb b/tests/ruby/misc_spec.rb index 628680bd..1d4ade4c 100644 --- a/tests/ruby/misc_spec.rb +++ b/tests/ruby/misc_spec.rb @@ -221,7 +221,7 @@ conn.close end - it "Does not send DISCARD ALL unless necessary" do + it "Does not send RESET ALL unless necessary" do 10.times do conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user")) conn.async_exec("SET SERVER ROLE to 'primary'") @@ -229,7 +229,7 @@ conn.close end - expect(processes.primary.count_query("DISCARD ALL")).to eq(0) + expect(processes.primary.count_query("RESET ALL")).to eq(0) 10.times do conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user")) @@ -239,7 +239,7 @@ conn.close end - expect(processes.primary.count_query("DISCARD ALL")).to eq(10) + expect(processes.primary.count_query("RESET ALL")).to eq(10) end it "Resets server roles correctly" do @@ -273,7 +273,7 @@ end end - it "Does not send DISCARD ALL unless necessary" do + it "Does not send RESET ALL unless necessary" do 10.times do conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user")) conn.async_exec("SET SERVER ROLE to 'primary'") @@ -282,7 +282,7 @@ conn.close end - expect(processes.primary.count_query("DISCARD ALL")).to eq(0) + expect(processes.primary.count_query("RESET ALL")).to eq(0) 10.times do conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user")) @@ -292,7 +292,7 @@ conn.close end - expect(processes.primary.count_query("DISCARD ALL")).to eq(10) + expect(processes.primary.count_query("RESET ALL")).to eq(10) end it "Respects tracked parameters on startup" do @@ -331,7 +331,7 @@ conn.async_exec("COMMIT") conn.close end - expect(processes.primary.count_query("DISCARD ALL")).to eq(0) + expect(processes.primary.count_query("RESET ALL")).to eq(0) 10.times do conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user")) @@ -341,7 +341,7 @@ conn.async_exec("COMMIT") conn.close end - expect(processes.primary.count_query("DISCARD ALL")).to eq(0) + expect(processes.primary.count_query("RESET ALL")).to eq(0) end end @@ -355,7 +355,7 @@ conn.close puts processes.pgcat.logs - expect(processes.primary.count_query("DISCARD ALL")).to eq(0) + expect(processes.primary.count_query("RESET ALL")).to eq(0) end it "will not clean up prepared statements" do @@ -366,7 +366,7 @@ conn.close puts processes.pgcat.logs - expect(processes.primary.count_query("DISCARD ALL")).to eq(0) + expect(processes.primary.count_query("RESET ALL")).to eq(0) end end end