Skip to content

Commit

Permalink
Reduces the amount of time the get_pool operation takes (#625)
Browse files Browse the repository at this point in the history
* Reduces the amount of time the get_pool operation takes

* trigger build

* Fix admin
  • Loading branch information
zainkabani authored Oct 20, 2023
1 parent 2c7bf52 commit d37df43
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 26 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ lcov.info
dev/.bash_history
dev/cache
!dev/cache/.keepme
.venv
29 changes: 17 additions & 12 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -542,7 +542,7 @@ where
}
// Authenticate normal user.
else {
let mut pool = match get_pool(pool_name, username) {
let pool = match get_pool(pool_name, username) {
Some(pool) => pool,
None => {
error_response(
Expand Down Expand Up @@ -800,6 +800,18 @@ where
&self.pool_name,
);

// Get a pool instance referenced by the most up-to-date
// pointer. This ensures we always read the latest config
// when starting a query.
let mut pool = if self.admin {
// Admin clients do not use pools.
ConnectionPool::default()
} else {
self.get_pool().await?
};

query_router.update_pool_settings(&pool.settings);

// Our custom protocol loop.
// We expect the client to either start a transaction with regular queries
// or issue commands for our sharding and server selection protocol.
Expand Down Expand Up @@ -853,12 +865,6 @@ where
continue;
}

// Get a pool instance referenced by the most up-to-date
// pointer. This ensures we always read the latest config
// when starting a query.
let mut pool = self.get_pool().await?;
query_router.update_pool_settings(pool.settings.clone());

let mut initial_parsed_ast = None;

match message[0] as char {
Expand Down Expand Up @@ -990,12 +996,11 @@ where
};

// Check if the pool is paused and wait until it's resumed.
if pool.wait_paused().await {
// Refresh pool information, something might have changed.
pool = self.get_pool().await?;
}
pool.wait_paused().await;

query_router.update_pool_settings(pool.settings.clone());
// Refresh pool information, something might have changed.
pool = self.get_pool().await?;
query_router.update_pool_settings(&pool.settings);

let current_shard = query_router.shard();

Expand Down
18 changes: 9 additions & 9 deletions src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,11 +190,11 @@ impl Default for PoolSettings {
#[derive(Clone, Debug, Default)]
pub struct ConnectionPool {
/// The pools handled internally by bb8.
databases: Vec<Vec<Pool<ServerPool>>>,
databases: Arc<Vec<Vec<Pool<ServerPool>>>>,

/// The addresses (host, port, role) to handle
/// failover and load balancing deterministically.
addresses: Vec<Vec<Address>>,
addresses: Arc<Vec<Vec<Address>>>,

/// List of banned addresses (see above)
/// that should not be queried.
Expand All @@ -206,7 +206,7 @@ pub struct ConnectionPool {
original_server_parameters: Arc<RwLock<ServerParameters>>,

/// Pool configuration.
pub settings: PoolSettings,
pub settings: Arc<PoolSettings>,

/// If not validated, we need to double check the pool is available before allowing a client
/// to use it.
Expand Down Expand Up @@ -445,13 +445,13 @@ impl ConnectionPool {
}

let pool = ConnectionPool {
databases: shards,
addresses,
databases: Arc::new(shards),
addresses: Arc::new(addresses),
banlist: Arc::new(RwLock::new(banlist)),
config_hash: new_pool_hash_value,
original_server_parameters: Arc::new(RwLock::new(ServerParameters::new())),
auth_hash: pool_auth_hash,
settings: PoolSettings {
settings: Arc::new(PoolSettings {
pool_mode: match user.pool_mode {
Some(pool_mode) => pool_mode,
None => pool_config.pool_mode,
Expand Down Expand Up @@ -494,7 +494,7 @@ impl ConnectionPool {
Some(ref plugins) => Some(plugins.clone()),
None => config.plugins.clone(),
},
},
}),
validated: Arc::new(AtomicBool::new(false)),
paused: Arc::new(AtomicBool::new(false)),
paused_waiter: Arc::new(Notify::new()),
Expand All @@ -504,7 +504,7 @@ impl ConnectionPool {
// before setting it globally.
// Do this async and somewhere else, we don't have to wait here.
if config.general.validate_config {
let mut validate_pool = pool.clone();
let validate_pool = pool.clone();
tokio::task::spawn(async move {
let _ = validate_pool.validate().await;
});
Expand All @@ -525,7 +525,7 @@ impl ConnectionPool {
/// when they connect.
/// This also warms up the pool for clients that connect when
/// the pooler starts up.
pub async fn validate(&mut self) -> Result<(), Error> {
pub async fn validate(&self) -> Result<(), Error> {
let mut futures = Vec::new();
let validated = Arc::clone(&self.validated);

Expand Down
10 changes: 5 additions & 5 deletions src/query_router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,8 @@ impl QueryRouter {
}

/// Pool settings can change because of a config reload.
pub fn update_pool_settings(&mut self, pool_settings: PoolSettings) {
self.pool_settings = pool_settings;
pub fn update_pool_settings(&mut self, pool_settings: &PoolSettings) {
self.pool_settings = pool_settings.clone();
}

pub fn pool_settings(&self) -> &PoolSettings {
Expand Down Expand Up @@ -1403,7 +1403,7 @@ mod test {
assert_eq!(qr.primary_reads_enabled, None);

// Internal state must not be changed due to this, only defaults
qr.update_pool_settings(pool_settings.clone());
qr.update_pool_settings(&pool_settings);

assert_eq!(qr.active_role, None);
assert_eq!(qr.active_shard, None);
Expand Down Expand Up @@ -1476,7 +1476,7 @@ mod test {
};

let mut qr = QueryRouter::new();
qr.update_pool_settings(pool_settings);
qr.update_pool_settings(&pool_settings);

// Shard should start out unset
assert_eq!(qr.active_shard, None);
Expand Down Expand Up @@ -1860,7 +1860,7 @@ mod test {
..Default::default()
};
let mut qr = QueryRouter::new();
qr.update_pool_settings(pool_settings);
qr.update_pool_settings(&pool_settings);

let query = simple_query("SELECT * FROM pg_database");
let ast = qr.parse(&query).unwrap();
Expand Down

0 comments on commit d37df43

Please sign in to comment.