Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce least-outstanding-connections load balancing #282

Merged
merged 7 commits into from
Jan 17, 2023
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 35 additions & 1 deletion src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,6 @@ pub enum PoolMode {
#[serde(alias = "session", alias = "Session")]
Session,
}

impl ToString for PoolMode {
fn to_string(&self) -> String {
match *self {
Expand All @@ -274,11 +273,33 @@ impl ToString for PoolMode {
}
}

#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Copy, Hash)]
pub enum LoadBalancingMode {
#[serde(alias = "random", alias = "Random")]
Random,

#[serde(alias = "loc", alias = "LOC", alias = "least_outstanding_connections")]
LeastOutstandingConnections,
}
impl ToString for LoadBalancingMode {
fn to_string(&self) -> String {
match *self {
LoadBalancingMode::Random => "random".to_string(),
LoadBalancingMode::LeastOutstandingConnections => {
"least_outstanding_connections".to_string()
}
}
}
}

#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash)]
pub struct Pool {
#[serde(default = "Pool::default_pool_mode")]
pub pool_mode: PoolMode,

#[serde(default = "Pool::default_load_balancing_mode")]
pub load_balancing_mode: LoadBalancingMode,

pub default_role: String,

#[serde(default)] // False
Expand All @@ -305,6 +326,10 @@ impl Pool {
PoolMode::Transaction
}

pub fn default_load_balancing_mode() -> LoadBalancingMode {
LoadBalancingMode::Random
}

pub fn default_automatic_sharding_key() -> Option<String> {
None
}
Expand Down Expand Up @@ -345,6 +370,7 @@ impl Default for Pool {
fn default() -> Pool {
Pool {
pool_mode: Self::default_pool_mode(),
load_balancing_mode: Self::default_load_balancing_mode(),
shards: BTreeMap::from([(String::from("1"), Shard::default())]),
users: BTreeMap::default(),
default_role: String::from("any"),
Expand Down Expand Up @@ -471,6 +497,10 @@ impl From<&Config> for std::collections::HashMap<String, String> {
format!("pools.{}.pool_mode", pool_name),
pool.pool_mode.to_string(),
),
(
format!("pools.{}.load_balancing_mode", pool_name),
pool.load_balancing_mode.to_string(),
),
(
format!("pools.{}.primary_reads_enabled", pool_name),
pool.primary_reads_enabled.to_string(),
Expand Down Expand Up @@ -594,6 +624,10 @@ impl Config {
"[pool: {}] Pool mode: {:?}",
pool_name, pool_config.pool_mode
);
info!(
"[pool: {}] Load Balancing mode: {:?}",
pool_name, pool_config.load_balancing_mode
);
let connect_timeout = match pool_config.connect_timeout {
Some(connect_timeout) => connect_timeout,
None => self.general.connect_timeout,
Expand Down
32 changes: 30 additions & 2 deletions src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::Instant;

use crate::config::{get_config, Address, General, PoolMode, Role, User};
use crate::config::{get_config, Address, General, LoadBalancingMode, PoolMode, Role, User};
use crate::errors::Error;

use crate::server::Server;
Expand Down Expand Up @@ -62,6 +62,9 @@ pub struct PoolSettings {
/// Transaction or Session.
pub pool_mode: PoolMode,

/// Random or LeastOutstandingConnections.
pub load_balancing_mode: LoadBalancingMode,

// Number of shards.
pub shards: usize,

Expand Down Expand Up @@ -94,6 +97,7 @@ impl Default for PoolSettings {
fn default() -> PoolSettings {
PoolSettings {
pool_mode: PoolMode::Transaction,
load_balancing_mode: LoadBalancingMode::Random,
shards: 1,
user: User::default(),
default_role: None,
Expand Down Expand Up @@ -257,6 +261,7 @@ impl ConnectionPool {
server_info: BytesMut::new(),
settings: PoolSettings {
pool_mode: pool_config.pool_mode,
load_balancing_mode: pool_config.load_balancing_mode,
// shards: pool_config.shards.clone(),
shards: shard_ids.len(),
user: user.clone(),
Expand Down Expand Up @@ -356,8 +361,17 @@ impl ConnectionPool {
.filter(|address| address.role == role)
.collect();

// Random load balancing
// We shuffle even if least_outstanding_queries is used to avoid imbalance
// in cases where all candidates have more or less the same number of outstanding
// queries
candidates.shuffle(&mut thread_rng());
if self.settings.load_balancing_mode == LoadBalancingMode::LeastOutstandingConnections {
candidates.sort_by(|a, b| {
self.busy_connection_count(b)
.partial_cmp(&self.busy_connection_count(a))
.unwrap()
Comment on lines +369 to +372
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is actually, the least_checked_out_connections which can be skewed by clients that are idle in transactions, I think we should be fine though.

});
}

while !candidates.is_empty() {
// Get the next candidate
Expand Down Expand Up @@ -565,6 +579,20 @@ impl ConnectionPool {
pub fn server_info(&self) -> BytesMut {
self.server_info.clone()
}

fn busy_connection_count(&self, address: &Address) -> u32 {
let state = self.databases[address.shard][address.address_index].state();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let idle = state.idle_connections;
let provisioned = state.connections;

if idle > provisioned {
// Unlikely but avoids an overflow panic if this ever happens
return 0;
}
let busy = provisioned - idle;
debug!("{:?} has {:?} busy connections", address, busy);
return busy;
}
}

/// Wrapper for the bb8 connection pool.
Expand Down
1 change: 1 addition & 0 deletions src/query_router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -768,6 +768,7 @@ mod test {

let pool_settings = PoolSettings {
pool_mode: PoolMode::Transaction,
load_balancing_mode: crate::config::LoadBalancingMode::Random,
shards: 2,
user: crate::config::User::default(),
default_role: Some(Role::Replica),
Expand Down
9 changes: 6 additions & 3 deletions tests/ruby/helpers/pgcat_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

module Helpers
module Pgcat
def self.three_shard_setup(pool_name, pool_size, pool_mode="transaction")
def self.three_shard_setup(pool_name, pool_size, pool_mode="transaction", lb_mode="random")
user = {
"password" => "sharding_user",
"pool_size" => pool_size,
Expand All @@ -23,6 +23,7 @@ def self.three_shard_setup(pool_name, pool_size, pool_mode="transaction")
"#{pool_name}" => {
"default_role" => "any",
"pool_mode" => pool_mode,
"load_balancing_mode" => lb_mode,
"primary_reads_enabled" => false,
"query_parser_enabled" => false,
"sharding_function" => "pg_bigint_hash",
Expand All @@ -46,7 +47,7 @@ def self.three_shard_setup(pool_name, pool_size, pool_mode="transaction")
end
end

def self.single_instance_setup(pool_name, pool_size, pool_mode="transaction")
def self.single_instance_setup(pool_name, pool_size, pool_mode="transaction", lb_mode="random")
user = {
"password" => "sharding_user",
"pool_size" => pool_size,
Expand All @@ -64,6 +65,7 @@ def self.single_instance_setup(pool_name, pool_size, pool_mode="transaction")
"#{pool_name}" => {
"default_role" => "primary",
"pool_mode" => pool_mode,
"load_balancing_mode" => lb_mode,
"primary_reads_enabled" => false,
"query_parser_enabled" => false,
"sharding_function" => "pg_bigint_hash",
Expand All @@ -90,7 +92,7 @@ def self.single_instance_setup(pool_name, pool_size, pool_mode="transaction")
end
end

def self.single_shard_setup(pool_name, pool_size, pool_mode="transaction")
def self.single_shard_setup(pool_name, pool_size, pool_mode="transaction", lb_mode="random")
user = {
"password" => "sharding_user",
"pool_size" => pool_size,
Expand All @@ -111,6 +113,7 @@ def self.single_shard_setup(pool_name, pool_size, pool_mode="transaction")
"#{pool_name}" => {
"default_role" => "any",
"pool_mode" => pool_mode,
"load_balancing_mode" => lb_mode,
"primary_reads_enabled" => false,
"query_parser_enabled" => false,
"sharding_function" => "pg_bigint_hash",
Expand Down
106 changes: 105 additions & 1 deletion tests/ruby/load_balancing_spec.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# frozen_string_literal: true
require_relative 'spec_helper'

describe "Load Balancing" do
describe "Random Load Balancing" do
let(:processes) { Helpers::Pgcat.single_shard_setup("sharded_db", 5) }
after do
processes.all_databases.map(&:reset)
Expand Down Expand Up @@ -59,3 +59,107 @@
end
end

describe "Least Outstanding Queries Load Balancing" do
let(:processes) { Helpers::Pgcat.single_shard_setup("sharded_db", 1, "transaction", "loc") }
after do
processes.all_databases.map(&:reset)
processes.pgcat.shutdown
end

context "under homogenous load" do
it "balances query volume between all instances" do
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))

query_count = QUERY_COUNT
expected_share = query_count / processes.all_databases.count
failed_count = 0

query_count.times do
conn.async_exec("SELECT 1 + 2")
rescue
failed_count += 1
end

expect(failed_count).to eq(0)
processes.all_databases.map(&:count_select_1_plus_2).each do |instance_share|
expect(instance_share).to be_within(expected_share * MARGIN_OF_ERROR).of(expected_share)
end
end
end

context "under heterogeneous load" do
it "balances query volume between all instances based on how busy they are" do
slow_query_count = 2
threads = Array.new(slow_query_count) do
Thread.new do
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
conn.async_exec("SELECT pg_sleep(1)")
end
end

conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))

query_count = QUERY_COUNT
expected_share = query_count / (processes.all_databases.count - slow_query_count)
failed_count = 0

query_count.times do
conn.async_exec("SELECT 1 + 2")
rescue
failed_count += 1
end

expect(failed_count).to eq(0)
# Under LOQ, we expect replicas running the slow pg_sleep
# to get no selects
expect(
processes.
all_databases.
map(&:count_select_1_plus_2).
count { |instance_share| instance_share == 0 }
).to eq(slow_query_count)

# We also expect the quick queries to be spread across
# the idle servers only
processes.
all_databases.
map(&:count_select_1_plus_2).
reject { |instance_share| instance_share == 0 }.
each do |instance_share|
expect(instance_share).to be_within(expected_share * MARGIN_OF_ERROR).of(expected_share)
end

threads.map(&:join)
end
end

context "when some replicas are down" do
it "balances query volume between working instances" do
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
expected_share = QUERY_COUNT / (processes.all_databases.count - 2)
failed_count = 0

processes[:replicas][0].take_down do
processes[:replicas][1].take_down do
QUERY_COUNT.times do
conn.async_exec("SELECT 1 + 2")
rescue
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
failed_count += 1
end
end
end

expect(failed_count).to eq(2)
processes.all_databases.each do |instance|
queries_routed = instance.count_select_1_plus_2
if processes.replicas[0..1].include?(instance)
expect(queries_routed).to eq(0)
else
expect(queries_routed).to be_within(expected_share * MARGIN_OF_ERROR).of(expected_share)
end
end
end
end
end