Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Statement timeout + replica imbalance #122

Merged
merged 10 commits into from
Aug 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .circleci/pgcat.toml
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,13 @@ password = "sharding_user"
# The maximum number of connections from a single Pgcat process to any database in the cluster
# is the sum of pool_size across all users.
pool_size = 9
statement_timeout = 0

[pools.sharded_db.users.1]
username = "other_user"
password = "other_user"
pool_size = 21
statement_timeout = 30000

# Shard 0
[pools.sharded_db.shards.0]
Expand Down Expand Up @@ -130,6 +132,7 @@ sharding_function = "pg_bigint_hash"
username = "simple_user"
password = "simple_user"
pool_size = 5
statement_timeout = 30000

[pools.simple_db.shards.0]
servers = [
Expand Down
12 changes: 12 additions & 0 deletions .circleci/run_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,18 @@ psql -U sharding_user -e -h 127.0.0.1 -p 6432 -f tests/sharding/query_routing_te
# Replica/primary selection & more sharding tests
psql -U sharding_user -e -h 127.0.0.1 -p 6432 -f tests/sharding/query_routing_test_primary_replica.sql > /dev/null

# Statement timeout tests
sed -i 's/statement_timeout = 0/statement_timeout = 100/' .circleci/pgcat.toml
kill -SIGHUP $(pgrep pgcat) # Reload config
sleep 0.2

# This should time out
(! psql -U sharding_user -e -h 127.0.0.1 -p 6432 -c 'select pg_sleep(0.5)')

# Disable statement timeout
sed -i 's/statement_timeout = 100/statement_timeout = 0/' .circleci/pgcat.toml
kill -SIGHUP $(pgrep pgcat) # Reload config again

#
# ActiveRecord tests
#
Expand Down
5 changes: 5 additions & 0 deletions pgcat.toml
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,14 @@ password = "sharding_user"
# is the sum of pool_size across all users.
pool_size = 9

# Maximum query duration in milliseconds (0 disables the timeout). Dangerous, but protects against DBs that died in a non-obvious way.
statement_timeout = 0

[pools.sharded_db.users.1]
username = "other_user"
password = "other_user"
pool_size = 21
statement_timeout = 15000

# Shard 0
[pools.sharded_db.shards.0]
Expand Down Expand Up @@ -130,6 +134,7 @@ sharding_function = "pg_bigint_hash"
username = "simple_user"
password = "simple_user"
pool_size = 5
statement_timeout = 0

[pools.simple_db.shards.0]
servers = [
Expand Down
51 changes: 44 additions & 7 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -499,7 +499,7 @@ where
// The query router determines where the query is going to go,
// e.g. primary, replica, which shard.
let mut query_router = QueryRouter::new();
let mut round_robin = 0;
let mut round_robin = rand::random();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice catch @drdrsh !


// Our custom protocol loop.
// We expect the client to either start a transaction with regular queries
Expand Down Expand Up @@ -970,17 +970,54 @@ where
}

async fn receive_server_message(
&self,
&mut self,
server: &mut Server,
address: &Address,
shard: usize,
pool: &ConnectionPool,
) -> Result<BytesMut, Error> {
match server.recv().await {
Ok(message) => Ok(message),
Err(err) => {
pool.ban(address, shard, self.process_id);
Err(err)
if pool.settings.user.statement_timeout > 0 {
match tokio::time::timeout(
tokio::time::Duration::from_millis(pool.settings.user.statement_timeout),
server.recv(),
)
.await
{
Ok(result) => match result {
Ok(message) => Ok(message),
Err(err) => {
pool.ban(address, shard, self.process_id);
error_response_terminal(
&mut self.write,
&format!("error receiving data from server: {:?}", err),
)
.await?;
Err(err)
}
},
Err(_) => {
error!(
"Statement timeout while talking to {:?} with user {}",
address, pool.settings.user.username
);
server.mark_bad();
pool.ban(address, shard, self.process_id);
error_response_terminal(&mut self.write, "pool statement timeout").await?;
Err(Error::StatementTimeout)
}
}
} else {
match server.recv().await {
Ok(message) => Ok(message),
Err(err) => {
pool.ban(address, shard, self.process_id);
error_response_terminal(
&mut self.write,
&format!("error receiving data from server: {:?}", err),
)
.await?;
Err(err)
}
}
}
}
Expand Down
12 changes: 12 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ pub struct User {
pub username: String,
pub password: String,
pub pool_size: u32,
pub statement_timeout: u64,
}

impl Default for User {
Expand All @@ -108,6 +109,7 @@ impl Default for User {
username: String::from("postgres"),
password: String::new(),
pool_size: 15,
statement_timeout: 0,
}
}
}
Expand Down Expand Up @@ -326,6 +328,7 @@ impl Config {
};

for (pool_name, pool_config) in &self.pools {
// TODO: Make this output prettier (maybe a table?)
info!("--- Settings for pool {} ---", pool_name);
info!(
"Pool size from all users: {}",
Expand All @@ -340,8 +343,17 @@ impl Config {
info!("Sharding function: {}", pool_config.sharding_function);
info!("Primary reads: {}", pool_config.primary_reads_enabled);
info!("Query router: {}", pool_config.query_parser_enabled);

// TODO: Make this prettier.
info!("Number of shards: {}", pool_config.shards.len());
info!("Number of users: {}", pool_config.users.len());

for user in &pool_config.users {
info!(
"{} pool size: {}, statement timeout: {}",
user.1.username, user.1.pool_size, user.1.statement_timeout
);
}
}
}
}
Expand Down
1 change: 1 addition & 0 deletions src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@ pub enum Error {
AllServersDown,
ClientError,
TlsError,
StatementTimeout,
}