Skip to content

Commit

Permalink
Introduce least-outstanding-connections load balancing (#282)
Browse files Browse the repository at this point in the history
Least-outstanding-connections (LOC) load balancing can improve load distribution between instances, but for Pgcat it may also improve the handling of slow replicas that don't go completely down. With LOC, traffic quickly moves away from a slow replica without waiting for that replica to be banned.

If all replicas slow down equally (due to a bad query that is hitting all replicas), the algorithm will degenerate to Random Load Balancing (which is what we had in Pgcat until today).

This may also allow Pgcat to accommodate pools with differently-sized replicas.
  • Loading branch information
drdrsh authored Jan 17, 2023
1 parent ab0bad6 commit 7894bba
Show file tree
Hide file tree
Showing 5 changed files with 177 additions and 7 deletions.
36 changes: 35 additions & 1 deletion src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,6 @@ pub enum PoolMode {
#[serde(alias = "session", alias = "Session")]
Session,
}

impl ToString for PoolMode {
fn to_string(&self) -> String {
match *self {
Expand All @@ -274,11 +273,33 @@ impl ToString for PoolMode {
}
}

/// Load balancing strategy selectable per pool through the
/// `load_balancing_mode` setting.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Copy, Hash)]
pub enum LoadBalancingMode {
/// Random load balancing. Besides the variant name itself, the config
/// also accepts the alias `random`.
#[serde(alias = "random", alias = "Random")]
Random,

/// Least-outstanding-connections load balancing. Besides the variant name
/// itself, the config also accepts the aliases `loc`, `LOC` and
/// `least_outstanding_connections`.
#[serde(alias = "loc", alias = "LOC", alias = "least_outstanding_connections")]
LeastOutstandingConnections,
}
/// Renders the mode using its canonical config spelling.
///
/// Implementing `Display` (rather than `ToString` directly) keeps
/// `.to_string()` available through the standard blanket impl while also
/// allowing the mode to be used with `{}` in format strings.
impl std::fmt::Display for LoadBalancingMode {
    /// Writes `random` or `least_outstanding_connections` to the formatter.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let name = match self {
            LoadBalancingMode::Random => "random",
            LoadBalancingMode::LeastOutstandingConnections => "least_outstanding_connections",
        };
        f.write_str(name)
    }
}

#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash)]
pub struct Pool {
#[serde(default = "Pool::default_pool_mode")]
pub pool_mode: PoolMode,

#[serde(default = "Pool::default_load_balancing_mode")]
pub load_balancing_mode: LoadBalancingMode,

pub default_role: String,

#[serde(default)] // False
Expand All @@ -305,6 +326,10 @@ impl Pool {
PoolMode::Transaction
}

/// Load balancing mode used when a pool's config omits `load_balancing_mode`
/// (referenced by the field's `#[serde(default = ...)]` attribute and by
/// `Pool::default()`): random load balancing.
pub fn default_load_balancing_mode() -> LoadBalancingMode {
LoadBalancingMode::Random
}

/// Default automatic sharding key: `None`, i.e. no key is configured.
/// NOTE(review): presumably wired in as a serde default for the matching
/// `Pool` field — the field declaration is not visible here; confirm.
pub fn default_automatic_sharding_key() -> Option<String> {
None
}
Expand Down Expand Up @@ -345,6 +370,7 @@ impl Default for Pool {
fn default() -> Pool {
Pool {
pool_mode: Self::default_pool_mode(),
load_balancing_mode: Self::default_load_balancing_mode(),
shards: BTreeMap::from([(String::from("1"), Shard::default())]),
users: BTreeMap::default(),
default_role: String::from("any"),
Expand Down Expand Up @@ -471,6 +497,10 @@ impl From<&Config> for std::collections::HashMap<String, String> {
format!("pools.{}.pool_mode", pool_name),
pool.pool_mode.to_string(),
),
(
format!("pools.{}.load_balancing_mode", pool_name),
pool.load_balancing_mode.to_string(),
),
(
format!("pools.{}.primary_reads_enabled", pool_name),
pool.primary_reads_enabled.to_string(),
Expand Down Expand Up @@ -594,6 +624,10 @@ impl Config {
"[pool: {}] Pool mode: {:?}",
pool_name, pool_config.pool_mode
);
info!(
"[pool: {}] Load Balancing mode: {:?}",
pool_name, pool_config.load_balancing_mode
);
let connect_timeout = match pool_config.connect_timeout {
Some(connect_timeout) => connect_timeout,
None => self.general.connect_timeout,
Expand Down
32 changes: 30 additions & 2 deletions src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::Instant;

use crate::config::{get_config, Address, General, PoolMode, Role, User};
use crate::config::{get_config, Address, General, LoadBalancingMode, PoolMode, Role, User};
use crate::errors::Error;

use crate::server::Server;
Expand Down Expand Up @@ -62,6 +62,9 @@ pub struct PoolSettings {
/// Transaction or Session.
pub pool_mode: PoolMode,

/// Random or LeastOutstandingConnections.
pub load_balancing_mode: LoadBalancingMode,

// Number of shards.
pub shards: usize,

Expand Down Expand Up @@ -94,6 +97,7 @@ impl Default for PoolSettings {
fn default() -> PoolSettings {
PoolSettings {
pool_mode: PoolMode::Transaction,
load_balancing_mode: LoadBalancingMode::Random,
shards: 1,
user: User::default(),
default_role: None,
Expand Down Expand Up @@ -257,6 +261,7 @@ impl ConnectionPool {
server_info: BytesMut::new(),
settings: PoolSettings {
pool_mode: pool_config.pool_mode,
load_balancing_mode: pool_config.load_balancing_mode,
// shards: pool_config.shards.clone(),
shards: shard_ids.len(),
user: user.clone(),
Expand Down Expand Up @@ -356,8 +361,17 @@ impl ConnectionPool {
.filter(|address| address.role == role)
.collect();

// Random load balancing
// We shuffle even if least_outstanding_connections is used to avoid imbalance
// in cases where all candidates have more or less the same number of outstanding
// connections
candidates.shuffle(&mut thread_rng());
if self.settings.load_balancing_mode == LoadBalancingMode::LeastOutstandingConnections {
candidates.sort_by(|a, b| {
self.busy_connection_count(b)
.partial_cmp(&self.busy_connection_count(a))
.unwrap()
});
}

while !candidates.is_empty() {
// Get the next candidate
Expand Down Expand Up @@ -565,6 +579,20 @@ impl ConnectionPool {
pub fn server_info(&self) -> BytesMut {
self.server_info.clone()
}

/// Returns how many connections to `address` are currently in use.
///
/// The figure is derived from the pool state for the address' shard and
/// index: provisioned connections minus idle connections. If the pool ever
/// reports more idle than provisioned connections, `0` is returned instead
/// of underflowing.
fn busy_connection_count(&self, address: &Address) -> u32 {
    let state = self.pool_state(address.shard, address.address_index);

    // `checked_sub` yields `None` when idle > provisioned. That is unlikely,
    // but treating it as "no busy connections" avoids an overflow panic.
    match state.connections.checked_sub(state.idle_connections) {
        Some(busy) => {
            debug!("{:?} has {:?} busy connections", address, busy);
            busy
        }
        None => 0,
    }
}
}

/// Wrapper for the bb8 connection pool.
Expand Down
1 change: 1 addition & 0 deletions src/query_router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -768,6 +768,7 @@ mod test {

let pool_settings = PoolSettings {
pool_mode: PoolMode::Transaction,
load_balancing_mode: crate::config::LoadBalancingMode::Random,
shards: 2,
user: crate::config::User::default(),
default_role: Some(Role::Replica),
Expand Down
9 changes: 6 additions & 3 deletions tests/ruby/helpers/pgcat_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

module Helpers
module Pgcat
def self.three_shard_setup(pool_name, pool_size, pool_mode="transaction")
def self.three_shard_setup(pool_name, pool_size, pool_mode="transaction", lb_mode="random")
user = {
"password" => "sharding_user",
"pool_size" => pool_size,
Expand All @@ -23,6 +23,7 @@ def self.three_shard_setup(pool_name, pool_size, pool_mode="transaction")
"#{pool_name}" => {
"default_role" => "any",
"pool_mode" => pool_mode,
"load_balancing_mode" => lb_mode,
"primary_reads_enabled" => false,
"query_parser_enabled" => false,
"sharding_function" => "pg_bigint_hash",
Expand All @@ -46,7 +47,7 @@ def self.three_shard_setup(pool_name, pool_size, pool_mode="transaction")
end
end

def self.single_instance_setup(pool_name, pool_size, pool_mode="transaction")
def self.single_instance_setup(pool_name, pool_size, pool_mode="transaction", lb_mode="random")
user = {
"password" => "sharding_user",
"pool_size" => pool_size,
Expand All @@ -64,6 +65,7 @@ def self.single_instance_setup(pool_name, pool_size, pool_mode="transaction")
"#{pool_name}" => {
"default_role" => "primary",
"pool_mode" => pool_mode,
"load_balancing_mode" => lb_mode,
"primary_reads_enabled" => false,
"query_parser_enabled" => false,
"sharding_function" => "pg_bigint_hash",
Expand All @@ -90,7 +92,7 @@ def self.single_instance_setup(pool_name, pool_size, pool_mode="transaction")
end
end

def self.single_shard_setup(pool_name, pool_size, pool_mode="transaction")
def self.single_shard_setup(pool_name, pool_size, pool_mode="transaction", lb_mode="random")
user = {
"password" => "sharding_user",
"pool_size" => pool_size,
Expand All @@ -111,6 +113,7 @@ def self.single_shard_setup(pool_name, pool_size, pool_mode="transaction")
"#{pool_name}" => {
"default_role" => "any",
"pool_mode" => pool_mode,
"load_balancing_mode" => lb_mode,
"primary_reads_enabled" => false,
"query_parser_enabled" => false,
"sharding_function" => "pg_bigint_hash",
Expand Down
106 changes: 105 additions & 1 deletion tests/ruby/load_balancing_spec.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# frozen_string_literal: true
require_relative 'spec_helper'

describe "Load Balancing" do
describe "Random Load Balancing" do
let(:processes) { Helpers::Pgcat.single_shard_setup("sharded_db", 5) }
after do
processes.all_databases.map(&:reset)
Expand Down Expand Up @@ -59,3 +59,107 @@
end
end

# Integration spec for the least-outstanding-connections load balancing mode.
# The pool is created through single_shard_setup with a pool_size of 1, in
# "transaction" pool mode, and with load_balancing_mode set to "loc".
describe "Least Outstanding Queries Load Balancing" do
let(:processes) { Helpers::Pgcat.single_shard_setup("sharded_db", 1, "transaction", "loc") }
# After each example: reset every database and shut pgcat down.
after do
processes.all_databases.map(&:reset)
processes.pgcat.shutdown
end

context "under homogenous load" do
# With no instance busier than another, every instance is expected to
# receive roughly an equal share of the queries.
it "balances query volume between all instances" do
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))

query_count = QUERY_COUNT
# Equal split of the queries across all databases.
expected_share = query_count / processes.all_databases.count
failed_count = 0

# Issue the queries one after another on a single client connection,
# counting any that raise.
query_count.times do
conn.async_exec("SELECT 1 + 2")
rescue
failed_count += 1
end

expect(failed_count).to eq(0)
# Each instance's observed count must be within the margin of the equal share.
processes.all_databases.map(&:count_select_1_plus_2).each do |instance_share|
expect(instance_share).to be_within(expected_share * MARGIN_OF_ERROR).of(expected_share)
end
end
end

context "under heterogeneous load" do
it "balances query volume between all instances based on how busy they are" do
slow_query_count = 2
# Each thread opens its own client connection through pgcat and runs a
# one-second sleep, so that some instances have a busy connection.
# NOTE(review): nothing here waits for the sleeps to have started before
# the quick queries below begin — confirm this cannot race.
threads = Array.new(slow_query_count) do
Thread.new do
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
conn.async_exec("SELECT pg_sleep(1)")
end
end

conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))

query_count = QUERY_COUNT
# Quick queries are expected to be split only across the instances that
# are not running a slow query.
expected_share = query_count / (processes.all_databases.count - slow_query_count)
failed_count = 0

query_count.times do
conn.async_exec("SELECT 1 + 2")
rescue
failed_count += 1
end

expect(failed_count).to eq(0)
# Under least-outstanding-connections balancing, we expect replicas
# running the slow pg_sleep to get no selects
expect(
processes.
all_databases.
map(&:count_select_1_plus_2).
count { |instance_share| instance_share == 0 }
).to eq(slow_query_count)

# We also expect the quick queries to be spread across
# the idle servers only
processes.
all_databases.
map(&:count_select_1_plus_2).
reject { |instance_share| instance_share == 0 }.
each do |instance_share|
expect(instance_share).to be_within(expected_share * MARGIN_OF_ERROR).of(expected_share)
end

# Wait for the slow-query threads to finish before the example ends.
threads.map(&:join)
end
end

context "when some replicas are down" do
it "balances query volume between working instances" do
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
# Two replicas are taken down below, so the share is computed over the
# remaining instances.
expected_share = QUERY_COUNT / (processes.all_databases.count - 2)
failed_count = 0

# Run the whole query loop while the first two replicas are down.
processes[:replicas][0].take_down do
processes[:replicas][1].take_down do
QUERY_COUNT.times do
conn.async_exec("SELECT 1 + 2")
rescue
# A failed query triggers a fresh client connection and is counted.
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
failed_count += 1
end
end
end

# Exactly two failures are expected.
# NOTE(review): presumably one per downed replica before pgcat stops
# routing to it — confirm against the banning logic.
expect(failed_count).to eq(2)
processes.all_databases.each do |instance|
queries_routed = instance.count_select_1_plus_2
if processes.replicas[0..1].include?(instance)
# The downed replicas must not have served any of the queries.
expect(queries_routed).to eq(0)
else
expect(queries_routed).to be_within(expected_share * MARGIN_OF_ERROR).of(expected_share)
end
end
end
end
end

0 comments on commit 7894bba

Please sign in to comment.