Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduces the amount of time the get_pool operation takes #625

Merged
merged 3 commits into from
Oct 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ lcov.info
dev/.bash_history
dev/cache
!dev/cache/.keepme
.venv
29 changes: 17 additions & 12 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -542,7 +542,7 @@ where
}
// Authenticate normal user.
else {
let mut pool = match get_pool(pool_name, username) {
let pool = match get_pool(pool_name, username) {
Some(pool) => pool,
None => {
error_response(
Expand Down Expand Up @@ -800,6 +800,18 @@ where
&self.pool_name,
);

// Get a pool instance referenced by the most up-to-date
// pointer. This ensures we always read the latest config
// when starting a query.
let mut pool = if self.admin {
// Admin clients do not use pools.
ConnectionPool::default()
} else {
self.get_pool().await?
};

query_router.update_pool_settings(&pool.settings);

// Our custom protocol loop.
// We expect the client to either start a transaction with regular queries
// or issue commands for our sharding and server selection protocol.
Expand Down Expand Up @@ -853,12 +865,6 @@ where
continue;
}

// Get a pool instance referenced by the most up-to-date
// pointer. This ensures we always read the latest config
// when starting a query.
let mut pool = self.get_pool().await?;
query_router.update_pool_settings(pool.settings.clone());

let mut initial_parsed_ast = None;

match message[0] as char {
Expand Down Expand Up @@ -990,12 +996,11 @@ where
};

// Check if the pool is paused and wait until it's resumed.
if pool.wait_paused().await {
// Refresh pool information, something might have changed.
pool = self.get_pool().await?;
}
pool.wait_paused().await;

query_router.update_pool_settings(pool.settings.clone());
// Refresh pool information, something might have changed.
pool = self.get_pool().await?;
query_router.update_pool_settings(&pool.settings);

let current_shard = query_router.shard();

Expand Down
18 changes: 9 additions & 9 deletions src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,11 +190,11 @@ impl Default for PoolSettings {
#[derive(Clone, Debug, Default)]
pub struct ConnectionPool {
/// The pools handled internally by bb8.
databases: Vec<Vec<Pool<ServerPool>>>,
databases: Arc<Vec<Vec<Pool<ServerPool>>>>,

/// The addresses (host, port, role) to handle
/// failover and load balancing deterministically.
addresses: Vec<Vec<Address>>,
addresses: Arc<Vec<Vec<Address>>>,

/// List of banned addresses (see above)
/// that should not be queried.
Expand All @@ -206,7 +206,7 @@ pub struct ConnectionPool {
original_server_parameters: Arc<RwLock<ServerParameters>>,

/// Pool configuration.
pub settings: PoolSettings,
pub settings: Arc<PoolSettings>,

/// If not validated, we need to double check the pool is available before allowing a client
/// to use it.
Expand Down Expand Up @@ -445,13 +445,13 @@ impl ConnectionPool {
}

let pool = ConnectionPool {
databases: shards,
addresses,
databases: Arc::new(shards),
addresses: Arc::new(addresses),
banlist: Arc::new(RwLock::new(banlist)),
config_hash: new_pool_hash_value,
original_server_parameters: Arc::new(RwLock::new(ServerParameters::new())),
auth_hash: pool_auth_hash,
settings: PoolSettings {
settings: Arc::new(PoolSettings {
pool_mode: match user.pool_mode {
Some(pool_mode) => pool_mode,
None => pool_config.pool_mode,
Expand Down Expand Up @@ -494,7 +494,7 @@ impl ConnectionPool {
Some(ref plugins) => Some(plugins.clone()),
None => config.plugins.clone(),
},
},
}),
validated: Arc::new(AtomicBool::new(false)),
paused: Arc::new(AtomicBool::new(false)),
paused_waiter: Arc::new(Notify::new()),
Expand All @@ -504,7 +504,7 @@ impl ConnectionPool {
// before setting it globally.
// Do this async and somewhere else, we don't have to wait here.
if config.general.validate_config {
let mut validate_pool = pool.clone();
let validate_pool = pool.clone();
tokio::task::spawn(async move {
let _ = validate_pool.validate().await;
});
Expand All @@ -525,7 +525,7 @@ impl ConnectionPool {
/// when they connect.
/// This also warms up the pool for clients that connect when
/// the pooler starts up.
pub async fn validate(&mut self) -> Result<(), Error> {
pub async fn validate(&self) -> Result<(), Error> {
let mut futures = Vec::new();
let validated = Arc::clone(&self.validated);

Expand Down
10 changes: 5 additions & 5 deletions src/query_router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,8 @@ impl QueryRouter {
}

/// Pool settings can change because of a config reload.
pub fn update_pool_settings(&mut self, pool_settings: PoolSettings) {
self.pool_settings = pool_settings;
pub fn update_pool_settings(&mut self, pool_settings: &PoolSettings) {
self.pool_settings = pool_settings.clone();
}

pub fn pool_settings(&self) -> &PoolSettings {
Expand Down Expand Up @@ -1403,7 +1403,7 @@ mod test {
assert_eq!(qr.primary_reads_enabled, None);

// Internal state must not be changed due to this, only defaults
qr.update_pool_settings(pool_settings.clone());
qr.update_pool_settings(&pool_settings);

assert_eq!(qr.active_role, None);
assert_eq!(qr.active_shard, None);
Expand Down Expand Up @@ -1476,7 +1476,7 @@ mod test {
};

let mut qr = QueryRouter::new();
qr.update_pool_settings(pool_settings);
qr.update_pool_settings(&pool_settings);

// Shard should start out unset
assert_eq!(qr.active_shard, None);
Expand Down Expand Up @@ -1860,7 +1860,7 @@ mod test {
..Default::default()
};
let mut qr = QueryRouter::new();
qr.update_pool_settings(pool_settings);
qr.update_pool_settings(&pool_settings);

let query = simple_query("SELECT * FROM pg_database");
let ast = qr.parse(&query).unwrap();
Expand Down